DPDKのmbufの構造

DPDKではパケットをmbufというデータ構造で管理しています. mbufについては http://dpdk.org/doc/guides/prog_guide/mbuf_lib.html にまとまっていますが,ここでは具体的なmbufの構成や使われ方についてみていきたいと思います.

mempoolのおさらい

mbufはmempoolを使用しているので,mbufの説明の前に簡単にmempoolについて説明します.

前回説明したとおり,mempoolは以下の特徴を持っています.

  • 固定長サイズのメモリアロケータ
  • 内部で空きオブジェクトをリングバッファとして管理
  • コアごとのキャッシュを持つことができる

mempoolの概要を図にすると,以下のようになります.

f:id:mm_i:20170529110320p:plain

rte_create_mempool()でmempoolを作成しますが,ここで確保したオブジェクトはTAILQであるmp->elt_listに追加されます.またそれと同時にオブジェクトはmp->pool_dataで示されるrte_ringに追加されます(rte_ringのデータ部の実体はポインタの配列で,オブジェクトを指します). mempoolのキャッシュが有効な場合はmempool構造体の後ろにキャッシュ用のデータ構造用意されます.こちらも実体はポインタの配列で,キャッシュされたオブジェクトを指します.キャッシュされたオブジェクトがring bufferの方に入っていることはありません.

mempoolからのオブジェクトの取得は以下のような流れになります.

  • rte_mempool_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned n)
    • mempool mからn個オブジェクトを取得
    • obj_tableに確保したオブジェクトのポインタが格納される
  • このとき実際にはまずmempoolのキャッシュからオブジェクトを取得しようとする
  • もしキャッシュにオブジェクトがなければ,mempool本体のring bufferからオブジェクトを取得

また,確保したオブジェクトをmempoolに戻す場合は,以下のようにします.

  • rte_mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table, unsigned n)
    • obj_tableのn個のオブジェクトをmempool mpに戻す
  • このとき,まずmpのキャッシュに戻そうとする
  • もしキャッシュが一杯なら,mempool本体のバッファにオブジェクトを戻す

ringの初期化

ringの初期化はrte_mempool_populate_phys()内のrte_mempool_ops_alloc()でおこなわれます.

    /* create the internal ring if not already done */
    if ((mp->flags & MEMPOOL_F_POOL_CREATED) == 0) {
        ret = rte_mempool_ops_alloc(mp);
        if (ret != 0)
            return ret;
        mp->flags |= MEMPOOL_F_POOL_CREATED;
    }

rte_mempool_ops_alloc()は設定によって何が呼ばれるか変わりますが,例えばmulti consumer / multi producer の場合はdrivers/mempool/ring/rte_mempool_ring.ccommon_ring_alloc()が呼ばれます.

static int
common_ring_alloc(struct rte_mempool *mp)
{
    int rg_flags = 0, ret;
    char rg_name[RTE_RING_NAMESIZE];
    struct rte_ring *r;

    ret = snprintf(rg_name, sizeof(rg_name),
        RTE_MEMPOOL_MZ_FORMAT, mp->name);
    if (ret < 0 || ret >= (int)sizeof(rg_name)) {
        rte_errno = ENAMETOOLONG;
        return -rte_errno;
    }

    /* ring flags */
    if (mp->flags & MEMPOOL_F_SP_PUT)
        rg_flags |= RING_F_SP_ENQ;
    if (mp->flags & MEMPOOL_F_SC_GET)
        rg_flags |= RING_F_SC_DEQ;

    /*
     * Allocate the ring that will be used to store objects.
     * Ring functions will return appropriate errors if we are
     * running as a secondary process etc., so no checks made
     * in this function for that condition.
     */
    r = rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
        mp->socket_id, rg_flags);
    if (r == NULL)
        return -rte_errno;

    mp->pool_data = r;

    return 0;
}

mbufの構造

mbufの構造はざっくりと以下のようになります.

f:id:mm_i:20170529110606p:plain

mbufはmempoolを利用しており,一つのmbufが一つのmempoolのオブジェクトになります. mbufの先頭にrte_mbuf構造体が存在し,その後ろに実際のデータ領域が続いています. 後からデータを追加できるように,data_off分の領域が先頭から空いています. rte_pktmbuf_prepend(), rte_pktmbuf_append()を使ってdataの先頭/末尾にデータを追加できます. また,もし受信パケットが分割されている場合はmbuf->nextが次のmbufへを指します. mfbu->nb_segsがパケット全体がいくつのmbufから成るか示す値です.

rte_mbufは以下のようになっています.

struct rte_mbuf {
    MARKER cacheline0;

    void *buf_addr;           /**< Virtual address of segment buffer. */
    /**
     * Physical address of segment buffer.
     * Force alignment to 8-bytes, so as to ensure we have the exact
     * same mbuf cacheline0 layout for 32-bit and 64-bit. This makes
     * working on vector drivers easier.
     */
    phys_addr_t buf_physaddr __rte_aligned(sizeof(phys_addr_t));

    /* next 8 bytes are initialised on RX descriptor rearm */
    MARKER64 rearm_data;
    uint16_t data_off;

    /**
     * Reference counter. Its size should at least equal to the size
     * of port field (16 bits), to support zero-copy broadcast.
     * It should only be accessed using the following functions:
     * rte_mbuf_refcnt_update(), rte_mbuf_refcnt_read(), and
     * rte_mbuf_refcnt_set(). The functionality of these functions (atomic,
     * or non-atomic) is controlled by the CONFIG_RTE_MBUF_REFCNT_ATOMIC
     * config option.
     */
    RTE_STD_C11
    union {
        rte_atomic16_t refcnt_atomic; /**< Atomically accessed refcnt */
        uint16_t refcnt;              /**< Non-atomically accessed refcnt */
    };
    uint16_t nb_segs;         /**< Number of segments. */

    /** Input port (16 bits to support more than 256 virtual ports). */
    uint16_t port;

    uint64_t ol_flags;        /**< Offload features. */

    /* remaining bytes are set on RX when pulling packet from descriptor */
    MARKER rx_descriptor_fields1;

    /*
     * The packet type, which is the combination of outer/inner L2, L3, L4
     * and tunnel types. The packet_type is about data really present in the
     * mbuf. Example: if vlan stripping is enabled, a received vlan packet
     * would have RTE_PTYPE_L2_ETHER and not RTE_PTYPE_L2_VLAN because the
     * vlan is stripped from the data.
     */
    RTE_STD_C11
    union {
        uint32_t packet_type; /**< L2/L3/L4 and tunnel information. */
        struct {
            uint32_t l2_type:4; /**< (Outer) L2 type. */
            uint32_t l3_type:4; /**< (Outer) L3 type. */
            uint32_t l4_type:4; /**< (Outer) L4 type. */
            uint32_t tun_type:4; /**< Tunnel type. */
            uint32_t inner_l2_type:4; /**< Inner L2 type. */
            uint32_t inner_l3_type:4; /**< Inner L3 type. */
            uint32_t inner_l4_type:4; /**< Inner L4 type. */
        };
    };

    uint32_t pkt_len;         /**< Total pkt len: sum of all segments. */
    uint16_t data_len;        /**< Amount of data in segment buffer. */
    /** VLAN TCI (CPU order), valid if PKT_RX_VLAN_STRIPPED is set. */
    uint16_t vlan_tci;

    union {
        uint32_t rss;     /**< RSS hash result if RSS enabled */
        struct {
            RTE_STD_C11
            union {
                struct {
                    uint16_t hash;
                    uint16_t id;
                };
                uint32_t lo;
                /**< Second 4 flexible bytes */
            };
            uint32_t hi;
            /**< First 4 flexible bytes or FD ID, dependent on
                 PKT_RX_FDIR_* flag in ol_flags. */
        } fdir;           /**< Filter identifier if FDIR enabled */
        struct {
            uint32_t lo;
            uint32_t hi;
        } sched;          /**< Hierarchical scheduler */
        uint32_t usr;     /**< User defined tags. See rte_distributor_process() */
    } hash;                   /**< hash information */

    /** Outer VLAN TCI (CPU order), valid if PKT_RX_QINQ_STRIPPED is set. */
    uint16_t vlan_tci_outer;

    uint16_t buf_len;         /**< Length of segment buffer. */

    /** Valid if PKT_RX_TIMESTAMP is set. The unit and time reference
     * are not normalized but are always the same for a given port.
     */
    uint64_t timestamp;

    /* second cache line - fields only used in slow path or on TX */
    MARKER cacheline1 __rte_cache_min_aligned;

    RTE_STD_C11
    union {
        void *userdata;   /**< Can be used for external metadata */
        uint64_t udata64; /**< Allow 8-byte userdata on 32-bit */
    };

    struct rte_mempool *pool; /**< Pool from which mbuf was allocated. */
    struct rte_mbuf *next;    /**< Next segment of scattered packet. */

    /* fields to support TX offloads */
    RTE_STD_C11
    union {
        uint64_t tx_offload;       /**< combined for easy fetch */
        __extension__
        struct {
            uint64_t l2_len:7;
            /**< L2 (MAC) Header Length for non-tunneling pkt.
             * Outer_L4_len + ... + Inner_L2_len for tunneling pkt.
             */
            uint64_t l3_len:9; /**< L3 (IP) Header Length. */
            uint64_t l4_len:8; /**< L4 (TCP/UDP) Header Length. */
            uint64_t tso_segsz:16; /**< TCP TSO segment size */

            /* fields for TX offloading of tunnels */
            uint64_t outer_l3_len:9; /**< Outer L3 (IP) Hdr Length. */
            uint64_t outer_l2_len:7; /**< Outer L2 (MAC) Hdr Length. */

            /* uint64_t unused:8; */
        };
    };

    /** Size of the application private data. In case of an indirect
     * mbuf, it stores the direct mbuf private data size. */
    uint16_t priv_size;

    /** Timesync flags for use with IEEE1588. */
    uint16_t timesync;

    /** Sequence number. See also rte_reorder_insert(). */
    uint32_t seqn;

} __rte_cache_aligned;

refcntがあるのはmbufのデータが他のmbufから参照される場合があるからです(下図).

f:id:mm_i:20170529110553p:plain

rte_pktmbuf_attach()で他のmbufのデータを参照するようにできます.また,ヘルパー関数としてrte_pktmbuf_clone()で指定したmbufのデータを参照するmbufを作成できます. dpdkでは他のmbufのデータを指すmbufをindiret,自分自身のデータを指すmbufをdirectと呼んでいます. indirectなmbufは自身のデータ領域を使わないので,オブジェクトサイズが小さいmempoolからmbufを確保した方がメモリ効率が上がります.

mbufの作成

mbufの作成は,rte_pktmbuf_pool_create()でおこないます. rte_pktmbuf_pool_create()では,rte_mempool_create_empty()および rte_mempool_set_ops_byname(), rte_mempool_populate_default()を利用して mempoolを作成したのち,最後にrte_mempool_obj_iter()を使って 各mempoolのelemに対してrte_pktmbuf_init()を実行します.

struct rte_mempool *
rte_pktmbuf_pool_create(const char *name, unsigned n,
    unsigned cache_size, uint16_t priv_size, uint16_t data_room_size,
    int socket_id)
{
    struct rte_mempool *mp;
    struct rte_pktmbuf_pool_private mbp_priv;
    unsigned elt_size;
    int ret;

    if (RTE_ALIGN(priv_size, RTE_MBUF_PRIV_ALIGN) != priv_size) {
        RTE_LOG(ERR, MBUF, "mbuf priv_size=%u is not aligned\n",
            priv_size);
        rte_errno = EINVAL;
        return NULL;
    }
    elt_size = sizeof(struct rte_mbuf) + (unsigned)priv_size +
        (unsigned)data_room_size;
    mbp_priv.mbuf_data_room_size = data_room_size;
    mbp_priv.mbuf_priv_size = priv_size;


    mp = rte_mempool_create_empty(name, n, elt_size, cache_size,
         sizeof(struct rte_pktmbuf_pool_private), socket_id, 0);
    if (mp == NULL)
        return NULL;

    ret = rte_mempool_set_ops_byname(mp,
        RTE_MBUF_DEFAULT_MEMPOOL_OPS, NULL);
    if (ret != 0) {
        RTE_LOG(ERR, MBUF, "error setting mempool handler\n");
        rte_mempool_free(mp);
        rte_errno = -ret;
        return NULL;
    }
    rte_pktmbuf_pool_init(mp, &mbp_priv);

    ret = rte_mempool_populate_default(mp);
    if (ret < 0) {
        rte_mempool_free(mp);
        rte_errno = -ret;
        return NULL;
    }

    rte_mempool_obj_iter(mp, rte_pktmbuf_init, NULL);

    return mp;
}

rte_pktmbuf_init()ではmempoolの一つのオブジェクトの先頭にrte_mbufの構造体 領域を確保し,それを初期化しています.

void
rte_pktmbuf_init(struct rte_mempool *mp,
         __attribute__((unused)) void *opaque_arg,
         void *_m,
         __attribute__((unused)) unsigned i)
{
    struct rte_mbuf *m = _m;
    uint32_t mbuf_size, buf_len, priv_size;

    priv_size = rte_pktmbuf_priv_size(mp);
    mbuf_size = sizeof(struct rte_mbuf) + priv_size;
    buf_len = rte_pktmbuf_data_room_size(mp);

    RTE_ASSERT(RTE_ALIGN(priv_size, RTE_MBUF_PRIV_ALIGN) == priv_size);
    RTE_ASSERT(mp->elt_size >= mbuf_size);
    RTE_ASSERT(buf_len <= UINT16_MAX);

    memset(m, 0, mp->elt_size);

    /* start of buffer is after mbuf structure and priv data */
    m->priv_size = priv_size;
    m->buf_addr = (char *)m + mbuf_size;
    m->buf_physaddr = rte_mempool_virt2phy(mp, m) + mbuf_size;
    m->buf_len = (uint16_t)buf_len;

    /* keep some headroom between start of buffer and data */
    m->data_off = RTE_MIN(RTE_PKTMBUF_HEADROOM, (uint16_t)m->buf_len);

    /* init some constant fields */
    m->pool = mp;
    m->nb_segs = 1;
    m->port = 0xff;
    rte_mbuf_refcnt_set(m, 1);
    m->next = NULL;
}

mbufの利用: パケットの受信

実際にパケットの受信時にどのようにmbufが利用されるか見ていきたいと思います.

rx/txバッファの設定

dpdkのexampleにあるl2fwdのプログラムでは以下のように初期化をおこなっています.

// mbufの作成
pktmbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", NB_MBUF,
        MEMPOOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
        rte_socket_id());
...

// ポートごとに初期化
for (portid = 0; portid < nb_ports; portid++) {
    ...
    ret = rte_eth_dev_configure(portid, 1, 1, &port_conf);
    ...
    ret = rte_eth_rx_queue_setup(portid, 0, nb_rxd,
                     rte_eth_dev_socket_id(portid),
                     NULL,
                     l2fwd_pktmbuf_pool);
    ...
    ret = rte_eth_tx_queue_setup(portid, 0, nb_txd,
            rte_eth_dev_socket_id(portid),
}

まずrte_pktmbuf_pool_create()でプログラム全体が利用するmbufを確保します. ちなみに,NB_MBUFは8192, RTE_MBUF_DEFAULT_BUF_SIZEは2KB + RTE_PKTMBUF_HEADROOM(自分の環境では128B)です.

その後各NICのポートの設定をおこないますが,特に重要なのがrte_eth_rx_queue_setup()rte_eth_tx_queue_setup()です. ixgbeを使用する場合,rte_eth_rx_queue_setup() から最終的には drivers/net/ixgbe/ixgbe_rxtx.cixgbe_dev_rx_queue_setup()が呼ばれます.

ixgbe_dev_rx_queue_setup()ではrx queue用のデータ構造(ixgbe_rx_queue)やring descriptor用のメモリを確保したりします.

int __attribute__((cold))
ixgbe_dev_rx_queue_setup(struct rte_eth_dev *dev,
             uint16_t queue_idx,
             uint16_t nb_desc,
             unsigned int socket_id,
             const struct rte_eth_rxconf *rx_conf,
             struct rte_mempool *mp)
{
    ...
    /* First allocate the rx queue data structure */
    rxq = rte_zmalloc_socket("ethdev RX queue", sizeof(struct ixgbe_rx_queue),
                 RTE_CACHE_LINE_SIZE, socket_id);
    if (rxq == NULL)
        return -ENOMEM;
    rxq->mb_pool = mp;
    rxq->nb_rx_desc = nb_desc;
    rxq->rx_free_thresh = rx_conf->rx_free_thresh;
    rxq->queue_id = queue_idx;
    rxq->reg_idx = (uint16_t)((RTE_ETH_DEV_SRIOV(dev).active == 0) ?
        queue_idx : RTE_ETH_DEV_SRIOV(dev).def_pool_q_idx + queue_idx);
    rxq->port_id = dev->data->port_id;
    rxq->crc_len = (uint8_t) ((dev->data->dev_conf.rxmode.hw_strip_crc) ?
                            0 : ETHER_CRC_LEN);
    rxq->drop_en = rx_conf->rx_drop_en;
    rxq->rx_deferred_start = rx_conf->rx_deferred_start;

    ...

    /*
     * Allocate RX ring hardware descriptors. A memzone large enough to
     * handle the maximum ring size is allocated in order to allow for
     * resizing in later calls to the queue setup function.
     */
    rz = rte_eth_dma_zone_reserve(dev, "rx_ring", queue_idx,
                      RX_RING_SZ, IXGBE_ALIGN, socket_id);
    ...

    rxq->rx_ring_phys_addr = rte_mem_phy2mch(rz->memseg_id, rz->phys_addr);
    rxq->rx_ring = (union ixgbe_adv_rx_desc *) rz->addr;

    ...

    /*
     * Allocate software ring. Allow for space at the end of the
     * S/W ring to make sure look-ahead logic in bulk alloc Rx burst
     * function does not access an invalid memory region.
     */
    len = nb_desc;
    if (adapter->rx_bulk_alloc_allowed)
        len += RTE_PMD_IXGBE_RX_MAX_BURST;

    rxq->sw_ring = rte_zmalloc_socket("rxq->sw_ring",
                      sizeof(struct ixgbe_rx_entry) * len,
                      RTE_CACHE_LINE_SIZE, socket_id);
    ...

    dev->data->rx_queues[queue_idx] = rxq;

    ixgbe_reset_rx_queue(adapter, rxq);

    return 0;
}

rx ring

ixgbeの場合のrx ringのデータ構造は以下のようになっています.

f:id:mm_i:20170529110825p:plain

struct ixgbe_rx_queue {
    struct rte_mempool  *mb_pool; /**< mbuf pool to populate RX ring. */
    volatile union ixgbe_adv_rx_desc *rx_ring; /**< RX ring virtual address. */
    uint64_t            rx_ring_phys_addr; /**< RX ring DMA address. */
    volatile uint32_t   *rdt_reg_addr; /**< RDT register address. */
    volatile uint32_t   *rdh_reg_addr; /**< RDH register address. */
    struct ixgbe_rx_entry *sw_ring; /**< address of RX software ring. */
    struct ixgbe_scattered_rx_entry *sw_sc_ring; /**< address of scattered Rx software ring. */
    struct rte_mbuf *pkt_first_seg; /**< First segment of current packet. */
    struct rte_mbuf *pkt_last_seg; /**< Last segment of current packet. */
    uint64_t            mbuf_initializer; /**< value to init mbufs */
    uint16_t            nb_rx_desc; /**< number of RX descriptors. */
    uint16_t            rx_tail;  /**< current value of RDT register. */
    uint16_t            nb_rx_hold; /**< number of held free RX desc. */
    uint16_t rx_nb_avail; /**< nr of staged pkts ready to ret to app */
    uint16_t rx_next_avail; /**< idx of next staged pkt to ret to app */
    uint16_t rx_free_trigger; /**< triggers rx buffer allocation */
    uint16_t            rx_using_sse;
    /**< indicates that vector RX is in use */
#ifdef RTE_IXGBE_INC_VECTOR
    uint16_t            rxrearm_nb;     /**< number of remaining to be re-armed */
    uint16_t            rxrearm_start;  /**< the idx we start the re-arming from */
#endif
    uint16_t            rx_free_thresh; /**< max free RX desc to hold. */
    uint16_t            queue_id; /**< RX queue index. */
    uint16_t            reg_idx;  /**< RX queue register index. */
    uint16_t            pkt_type_mask;  /**< Packet type mask for different NICs. */
    uint8_t             port_id;  /**< Device port identifier. */
    uint8_t             crc_len;  /**< 0 if CRC stripped, 4 otherwise. */
    uint8_t             drop_en;  /**< If not 0, set SRRCTL.Drop_En. */
    uint8_t             rx_deferred_start; /**< not in global dev start. */
    /** flags to set in mbuf when a vlan is detected. */
    uint64_t            vlan_flags;
    /** need to alloc dummy mbuf, for wraparound when scanning hw ring */
    struct rte_mbuf fake_mbuf;
    /** hold packets to return to application */
    struct rte_mbuf *rx_stage[RTE_PMD_IXGBE_RX_MAX_BURST*2];
};

rx_ringが実際のixgbeのrx ringを指すポインタです.rx ringのbuffer addressはmbufのデータ部分を指します.sw_ringがrx ringで利用されるmbufを保持しています. rx_ringpkt_addrが対応するsw_ringのmbufのデータ部分を指します. 受信時はrx_tailで示されるインデックスからdescriptorを確認していきます(NICのtailとは異なります).

x540のreceive descriptor

ここで簡単にX540のreceive descriptorについて説明しておきます. 詳しくはデータシートの7.1に書いてあります.

受信する際に利用するring descriptor queueは以下のような構造をしています(データシートより引用).

f:id:mm_i:20170529110750p:plain

キューの各エントリがdescriptorと呼ばれます. ソフトウェア側はdescriptorを設定したのち,tailを進めます.これは実際にはX540のRDTレジスタの値を更新することでおこないます. またNICは内部的に空きdescriptorの先頭位置をheadとして覚えており,パケットを受信したらheadのdescriptorに従ってパケットをメモリにDMAしたあと,headを進めます.

f:id:mm_i:20170529110801p:plainf:id:mm_i:20170529110808p:plain

descriptorのformatは以下のようになっています(descriptorのフォーマットにもいくつかありますが,ここではそのうちのadvance descritorを示しています).

descriptorのpacket buffer addressにパケットをDMAする先のアドレスを指定します. このアドレスはdpdkではmbufのデータのアドレスになります. X540では受信パケットのヘッダとデータを別々の領域へDMAする(header splitting)ことが可能であるためheader buffer addressを指定する場所もありますが,DPDKでは利用していません. ソフトウェアはDDビットが1になっているかどうかを調べることで,パケットが到着しているかどうかを判断することができます. またdescriptor write-back formatのフィールドにはNICが受信したパケットの情報を書き込みます.

パケットの受信

パケットの受信はrte_eth_rx_burst()でおこないます.

nb_rx = rte_eth_rx_burst((uint8_t) portid, 0,
             pkts_burst, MAX_PKT_BURST);

rte_eth_rx_burst()は最大MAX_PKT_BURST個のパケットを受信し,結果をpkts_burstに格納します. NICのオフロード機能を使用するかなどの設定によって,rte_eth_rx_burst()を呼んだときに呼ばれる関数は異なります.ixgbeではixgbe_dev_rx_init()の中で呼ばれるixgbe_set_rx_function()でrecv時の関数を設定しています. ここではrecv時にixgbe_recv_pkts_bulk_alloc()が設定されていると仮定して,その具体的な処理をみてみます.

ixgbe_recv_pkts_bulk_alloc()では,RTE_PMD_IXGBE_RX_MAX_BURST (実際は32)単位でのパケット読み出しをおこない,実際に受信したパケットを返します.

uint16_t
ixgbe_recv_pkts_bulk_alloc(void *rx_queue, struct rte_mbuf **rx_pkts,
               uint16_t nb_pkts)
{
    uint16_t nb_rx;

    if (unlikely(nb_pkts == 0))
        return 0;

    if (likely(nb_pkts <= RTE_PMD_IXGBE_RX_MAX_BURST))
        return rx_recv_pkts(rx_queue, rx_pkts, nb_pkts);

    /* request is relatively large, chunk it up */
    nb_rx = 0;
    while (nb_pkts) {
        uint16_t ret, n;

        n = (uint16_t)RTE_MIN(nb_pkts, RTE_PMD_IXGBE_RX_MAX_BURST);
        ret = rx_recv_pkts(rx_queue, &rx_pkts[nb_rx], n);
        nb_rx = (uint16_t)(nb_rx + ret);
        nb_pkts = (uint16_t)(nb_pkts - ret);
        if (ret < n)
            break;
    }

    return nb_rx;
}

実際の受信処理をおこなっているのはrx_recv_pkts()になります.

static inline uint16_t
rx_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
         uint16_t nb_pkts)
{
    struct ixgbe_rx_queue *rxq = (struct ixgbe_rx_queue *)rx_queue;
    uint16_t nb_rx = 0;

    /* Any previously recv'd pkts will be returned from the Rx stage */
    if (rxq->rx_nb_avail)
        return ixgbe_rx_fill_from_stage(rxq, rx_pkts, nb_pkts);

前回受信したパケットがまだ残っていたらそれを処理します (これはこの関数がnb_pkts単位で受信したパケットを返すからです).

    /* Scan the H/W ring for packets to receive */
    nb_rx = (uint16_t)ixgbe_rx_scan_hw_ring(rxq);

ixgbe_rx_scan_hw_ring()でパケットが到着しているかチェックします. nb_rxに受信したパケット数が入ります.

    /* update internal queue state */
    rxq->rx_next_avail = 0;
    rxq->rx_nb_avail = nb_rx;
    rxq->rx_tail = (uint16_t)(rxq->rx_tail + nb_rx);

rx_tailnb_rxだけ進めます.

    /* if required, allocate new buffers to replenish descriptors */
    if (rxq->rx_tail > rxq->rx_free_trigger) {
        uint16_t cur_free_trigger = rxq->rx_free_trigger;

        if (ixgbe_rx_alloc_bufs(rxq, true) != 0) {
            int i, j;

            PMD_RX_LOG(DEBUG, "RX mbuf alloc failed port_id=%u "
                   "queue_id=%u", (unsigned) rxq->port_id,
                   (unsigned) rxq->queue_id);

            rte_eth_devices[rxq->port_id].data->rx_mbuf_alloc_failed +=
                rxq->rx_free_thresh;

            /*
             * Need to rewind any previous receives if we cannot
             * allocate new buffers to replenish the old ones.
             */
            rxq->rx_nb_avail = 0;
            rxq->rx_tail = (uint16_t)(rxq->rx_tail - nb_rx);
            for (i = 0, j = rxq->rx_tail; i < nb_rx; ++i, ++j)
                rxq->sw_ring[j].mbuf = rxq->rx_stage[i];

            return 0;
        }

        /* update tail pointer */
        rte_wmb();
        IXGBE_PCI_REG_WRITE_RELAXED(rxq->rdt_reg_addr,
                        cur_free_trigger);
    }

rx_tailの値が設定した閾値以上になったら,ixgbe_rx_alloc_bufs()を使って新しいmbufを取得し,それをデスクリプタに設定します.またNICのtail pointerを更新します.

    if (rxq->rx_tail >= rxq->nb_rx_desc)
        rxq->rx_tail = 0;

この関数は一気にパケットを指定した数(8個)読もうとしますが,その際ring bufferの先頭に巻き戻って読むことはしません.もし末尾に到達したらここで先頭にインデックスを戻しています.(したがって,ring descriptorの最大数はこのバースト読み出し分を考慮して余分に確保しておく必要があります)

    /* received any packets this loop? */
    if (rxq->rx_nb_avail)
        return ixgbe_rx_fill_from_stage(rxq, rx_pkts, nb_pkts);

    return 0;
}

受信したパケットがあればixgbe_rx_fill_from_stage()でそのパケットの後処理をして返します.

デスクリプタをチェックしてパケットの受信を判断するixgbe_rx_scan_hw_ring()は以下のようになっています.

static inline int
ixgbe_rx_scan_hw_ring(struct ixgbe_rx_queue *rxq)
{
    volatile union ixgbe_adv_rx_desc *rxdp;
    struct ixgbe_rx_entry *rxep;
    struct rte_mbuf *mb;
    uint16_t pkt_len;
    uint64_t pkt_flags;
    int nb_dd;
    uint32_t s[LOOK_AHEAD];
    uint32_t pkt_info[LOOK_AHEAD];
    int i, j, nb_rx = 0;
    uint32_t status;
    uint64_t vlan_flags = rxq->vlan_flags;

    /* get references to current descriptor and S/W ring entry */
    rxdp = &rxq->rx_ring[rxq->rx_tail];
    rxep = &rxq->sw_ring[rxq->rx_tail];

    status = rxdp->wb.upper.status_error;
    /* check to make sure there is at least 1 packet to receive */
    if (!(status & rte_cpu_to_le_32(IXGBE_RXDADV_STAT_DD)))
        return 0;

    /*
     * Scan LOOK_AHEAD descriptors at a time to determine which descriptors
     * reference packets that are ready to be received.
     */
    for (i = 0; i < RTE_PMD_IXGBE_RX_MAX_BURST;
         i += LOOK_AHEAD, rxdp += LOOK_AHEAD, rxep += LOOK_AHEAD) {
        /* Read desc statuses backwards to avoid race condition */
        for (j = 0; j < LOOK_AHEAD; j++)
            s[j] = rte_le_to_cpu_32(rxdp[j].wb.upper.status_error);

        rte_smp_rmb();

        /* Compute how many status bits were set */
        for (nb_dd = 0; nb_dd < LOOK_AHEAD &&
                (s[nb_dd] & IXGBE_RXDADV_STAT_DD); nb_dd++)
            ;

ここでDDビットを見てパケットを受信したか見ています.

        for (j = 0; j < nb_dd; j++)
            pkt_info[j] = rte_le_to_cpu_32(rxdp[j].wb.lower.
                               lo_dword.data);

        nb_rx += nb_dd;

        /* Translate descriptor info to mbuf format */
        for (j = 0; j < nb_dd; ++j) {
            mb = rxep[j].mbuf;
            pkt_len = rte_le_to_cpu_16(rxdp[j].wb.upper.length) -
                  rxq->crc_len;
            mb->data_len = pkt_len;
            mb->pkt_len = pkt_len;
            mb->vlan_tci = rte_le_to_cpu_16(rxdp[j].wb.upper.vlan);

            /* convert descriptor fields to rte mbuf flags */
            pkt_flags = rx_desc_status_to_pkt_flags(s[j],
                vlan_flags);
            pkt_flags |= rx_desc_error_to_pkt_flags(s[j]);
            pkt_flags |= ixgbe_rxd_pkt_info_to_pkt_flags
                    ((uint16_t)pkt_info[j]);
            mb->ol_flags = pkt_flags;
            mb->packet_type =
                ixgbe_rxd_pkt_info_to_pkt_type
                    (pkt_info[j], rxq->pkt_type_mask);

            if (likely(pkt_flags & PKT_RX_RSS_HASH))
                mb->hash.rss = rte_le_to_cpu_32(
                    rxdp[j].wb.lower.hi_dword.rss);
            else if (pkt_flags & PKT_RX_FDIR) {
                mb->hash.fdir.hash = rte_le_to_cpu_16(
                    rxdp[j].wb.lower.hi_dword.csum_ip.csum) &
                    IXGBE_ATR_HASH_MASK;
                mb->hash.fdir.id = rte_le_to_cpu_16(
                    rxdp[j].wb.lower.hi_dword.csum_ip.ip_id);
            }
        }

この部分で受信したパケットに関して,mbufのメタデータ部分ににパケットの情報を格納しています.

        /* Move mbuf pointers from the S/W ring to the stage */
        for (j = 0; j < LOOK_AHEAD; ++j) {
            rxq->rx_stage[i + j] = rxep[j].mbuf;
        }

        /* stop if all requested packets could not be received */
        if (nb_dd != LOOK_AHEAD)
            break;
    }

    /* clear software ring entries so we can cleanup correctly */
    for (i = 0; i < nb_rx; ++i) {
        rxq->sw_ring[rxq->rx_tail + i].mbuf = NULL;
    }


    return nb_rx;
}

最後に,受信したパケットのmbufのアドレスをrx_stageに入れ,sw_ringの方はNULLにします.

ixgbe_rx_fill_from_stage()では最大nb_pkts分の受信データをrx_pktsに格納して返します.

static inline uint16_t
ixgbe_rx_fill_from_stage(struct ixgbe_rx_queue *rxq, struct rte_mbuf **rx_pkts,
             uint16_t nb_pkts)
{
    struct rte_mbuf **stage = &rxq->rx_stage[rxq->rx_next_avail];
    int i;

    /* how many packets are ready to return? */
    nb_pkts = (uint16_t)RTE_MIN(nb_pkts, rxq->rx_nb_avail);

    /* copy mbuf pointers to the application's packet list */
    for (i = 0; i < nb_pkts; ++i)
        rx_pkts[i] = stage[i];

    /* update internal queue state */
    rxq->rx_nb_avail = (uint16_t)(rxq->rx_nb_avail - nb_pkts);
    rxq->rx_next_avail = (uint16_t)(rxq->rx_next_avail + nb_pkts);

    return nb_pkts;
}

また,ixgbe_rx_alloc_bufs()では以下のようにしてmbufを取得しています.

static inline int
ixgbe_rx_alloc_bufs(struct ixgbe_rx_queue *rxq, bool reset_mbuf)
{
    volatile union ixgbe_adv_rx_desc *rxdp;
    struct ixgbe_rx_entry *rxep;
    struct rte_mbuf *mb;
    uint16_t alloc_idx;
    __le64 dma_addr;
    int diag, i;

    /* allocate buffers in bulk directly into the S/W ring */
    alloc_idx = rxq->rx_free_trigger - (rxq->rx_free_thresh - 1);
    rxep = &rxq->sw_ring[alloc_idx];
    diag = rte_mempool_get_bulk(rxq->mb_pool, (void *)rxep,
                    rxq->rx_free_thresh);

rte_mempool_get_bulk()を使ってmempoolから空いているmbufを取得します.

    if (unlikely(diag != 0))
        return -ENOMEM;

    rxdp = &rxq->rx_ring[alloc_idx];
    for (i = 0; i < rxq->rx_free_thresh; ++i) {
        /* populate the static rte mbuf fields */
        mb = rxep[i].mbuf;
        if (reset_mbuf) {
            mb->port = rxq->port_id;
        }

        rte_mbuf_refcnt_set(mb, 1);
        mb->data_off = RTE_PKTMBUF_HEADROOM;

        /* populate the descriptors */
        dma_addr = rte_cpu_to_le_64(rte_mbuf_data_dma_addr_default(mb));
        rxdp[i].read.hdr_addr = 0;
        rxdp[i].read.pkt_addr = dma_addr;
    }

各デスクリプタのpkt_addrにmbufのデータ領域のアドレスを書き込みます.

    /* update state of internal queue structure */
    rxq->rx_free_trigger = rxq->rx_free_trigger + rxq->rx_free_thresh;
    if (rxq->rx_free_trigger >= rxq->nb_rx_desc)
        rxq->rx_free_trigger = rxq->rx_free_thresh - 1;

    /* no errors */
    return 0;
}

最後に次にmbufを確保する際の閾値となるrx_free_triggerを更新しています. ちなみに,rx_free_threshはデフォルトで32のようです.

mbufの利用: マルチキャスト

受信以外のmbuf利用の例として,サンプルプログラムに含まれているマルチキャストのプログラムを見てみます.

http://dpdk.org/doc/guides/sample_app_ug/ipv4_multicast.html は受信したパケットを複数のポートからマルチキャストするサンプルプログラムです. マルチキャストの流れは以下のようになります.

f:id:mm_i:20170529110435p:plain

  1. 受信パケットからヘッダを取り除く (rte_pktmbuf_adj()を使ってdata_offを長くする)
  2. rte_pktmbuf_clone()を使って受信パケットのデータ部分を指すmbufを作成
  3. rte_pktmbuf_create()を使ってヘッダ部分のパケットを作成,mbuf->nextはcloneしたmbufを指すようにする 4. 作成したパケットを指定したポートから送信

cloneを使って複数のポートから送信する場合,各パケットは自身のメタデータを持つことになります.したがって最後のポートに関してはcloneをする必要がなく,受信パケットのメタデータを直接変更して使用することができます. もちろん,cloneをせず直接受信パケットをmbuf->nextで指す方法もありますが,この場合は受信パケットのメタデータを変更できないため,最後のポートに関しても他の場合と同様にして送信をおこないます. コードにはcloneする場合としない場合,2通りのコードが書いてあります.