DPDKにおけるメモリ管理

DPDKにおいてどのようにメモリが使用されるかをここにまとめたいと思います. なお,ここに書いてあるものはあくまで私がコードを読んで解釈した結果になります. DPDKのバージョンは17.05,OSはLinuxを対象としています.

全体像

DPDKでは独自のメモリアロケータを利用してメモリを管理しています. メモリ管理のために以下のようなデータ構造が利用されます.

  • memconfig : 作成したmemsegやmemzoneなどの情報を保持
  • memseg : 連続した物理メモリ領域情報を保持
  • heap : メモリの空きブロックの管理 / 割り当て.memsegのメモリ領域を使用する.
  • memzone : 連続した物理メモリ領域を名前をつけて割り当て.内部でheapを利用
  • mempool : 固定長メモリアロケータ.内部でmemzoneを利用

前提知識: queue(3)

本題に入る前に,DPDKでは内部で複数のリストを保持していますが,このリストには queue(3)を利用しています. queueについて知っておいた方がコードが読みやすいです.

queueにはSLIST,STAILQおよびLISTとTAILQの4種類があります. SLISTとSTAILQが単方向リスト,LISTとTAILQが連結リストで,TAILQの場合は末尾への挿入が可能です.queueは全てマクロとして実装されています.

TAILQの簡単な使用例は以下のようになります.

struct entry{
    TAILQ_ENTRY(entry) tq;
    int data;
} e1, e2, e3;

TAILQ_HEAD(entry_head, entry);

int main(){
    struct entry_head head;
    TAILQ_INIT(&head);

    e1.data = 1;
    e2.data = 2;
    e3.data = 3;

    TAILQ_INSERT_HEAD(&head, &e1, tq);
    TAILQ_INSERT_HEAD(&head, &e2, tq);
    TAILQ_INSERT_TAIL(&head, &e3, tq);

    struct entry* p = TAILQ_FIRST(&head);
    while(p != NULL){
        printf("%d\n", p->data);
        p = TAILQ_NEXT(p, tq);
    }
}

初期化部分がやや分かりにくいですが,TAILQ_ENTRY()TAILQ_HEAD()が以下のよ うに展開されることが理解できれば納得できると思います.

#define _TAILQ_HEAD(name, type, qual)                   \
struct name {                               \
    qual type *tqh_first;       /* first element */     \
    qual type *qual *tqh_last;  /* addr of last next element */ \
}
#define TAILQ_HEAD(name, type)  _TAILQ_HEAD(name, struct type,)

#define _TAILQ_ENTRY(type, qual)                    \
struct {                                \
    qual type *tqe_next;        /* next element */      \
    qual type *qual *tqe_prev;  /* address of previous next element */\
}
#define TAILQ_ENTRY(type)   _TAILQ_ENTRY(struct type,)

基本的にマクロにはポインタを渡します. queueの実装は全てヘッダファイルに記述してあるので,見てみると参考になると思います.

glibcの実装: https://sourceware.org/git/?p=glibc.git;a=blob_plain;f=misc/sys/queue.h;hb=HEAD

ちなみに,TAILQの実装を読む際は,

  • tqe_prev は 前の要素の tqe_next のアドレスを保持している
    • これを利用して要素を挿入した際に前の要素のtqe_nextを書き換える
  • ある要素の前の要素を取得する際は,前の要素のteq_prevを使う
    • 実質的に前の要素のtqe_nextを参照していることになる

ということを抑えておけば理解が容易だと思います.

memconfig

rte_mem_configは,EALの中でメモリ全体の情報を保持しているデータ構造です. 具体的には作成したmemsegやmemzone,heap等(後述)を保持しています. EALの中ではrte_eal_get_configuration()->mem_configからmemconfigを取得できます.

librte_eal/common/rte_eal_memconfig.h:

struct rte_mem_config {
    volatile uint32_t magic;   /**< Magic number - Sanity check. */

    /* memory topology */
    uint32_t nchannel;    /**< Number of channels (0 if unknown). */
    uint32_t nrank;       /**< Number of ranks (0 if unknown). */

    /**
     * current lock nest order
     *  - qlock->mlock (ring/hash/lpm)
     *  - mplock->qlock->mlock (mempool)
     * Notice:
     *  *ALWAYS* obtain qlock first if having to obtain both qlock and mlock
     */
    rte_rwlock_t mlock;   /**< only used by memzone LIB for thread-safe. */
    rte_rwlock_t qlock;   /**< used for tailq operation for thread safe. */
    rte_rwlock_t mplock;  /**< only used by mempool LIB for thread-safe. */

    uint32_t memzone_cnt; /**< Number of allocated memzones */

    /* memory segments and zones */
    struct rte_memseg memseg[RTE_MAX_MEMSEG];    /**< Physmem descriptors. */
    struct rte_memzone memzone[RTE_MAX_MEMZONE]; /**< Memzone descriptors. */

    struct rte_tailq_head tailq_head[RTE_MAX_TAILQ]; /**< Tailqs for objects */

    /* Heaps of Malloc per socket */
    struct malloc_heap malloc_heaps[RTE_MAX_NUMA_NODES];

    /* address of mem_config in primary process. used to map shared config into
     * exact same address the primary process maps it.
     */
    uint64_t mem_cfg_addr;
} __attribute__((__packed__));

初期化

DPDKのプログラムで最初にrte_eal_init()を呼ぶと様々な初期化処理がおこなわれますが, その中でも rte_eal_memory_init()でmemsegの初期化,rte_eal_memzone_init()で memzone及びheapの初期化がおこなわれます.

memseg

memsegは連続した物理アドレス空間の情報を保持するデータ構造です.

librte_eal/common/include/rte_memory.h:

struct rte_memseg {
    phys_addr_t phys_addr;      /**< Start physical address. */
    RTE_STD_C11
    union {
        void *addr;         /**< Start virtual address. */
        uint64_t addr_64;   /**< Makes sure addr is always 64 bits */
    };
    size_t len;               /**< Length of the segment. */
    uint64_t hugepage_sz;       /**< The pagesize of underlying memory */
    int32_t socket_id;          /**< NUMA socket ID. */
    uint32_t nchannel;          /**< Number of channels. */
    uint32_t nrank;             /**< Number of ranks. */
#ifdef RTE_LIBRTE_XEN_DOM0
     /**< store segment MFNs */
    uint64_t mfn[DOM0_NUM_MEMBLOCK];
#endif
}

phys_addrに先頭の物理アドレスが,lenに長さが格納されています. memsegの初期化は rte_eal_init() => rte_eal_memory_init() => rte_eal_hugepage_init() の中でおこなわれます.

具体的な物理アドレス確保の方法は前回のエントリに書いた通りです : DPDKにおけるhugepageの初期化部分. なるべく物理的に連続した領域を確保しようとします.

heap

連続したメモリの空きブロックの管理にはheapを利用します.

librte_eal/common/malloc_heap.h:

struct malloc_heap {
     rte_spinlock_t lock;
     LIST_HEAD(, malloc_elem) free_head[RTE_HEAP_NUM_FREELISTS];
     unsigned alloc_count;
     size_t total_size;
 } __rte_cache_aligned;

struct malloc_elem {
    struct malloc_heap *heap;
    struct malloc_elem *volatile prev;      /* points to prev elem in memseg */
    LIST_ENTRY(malloc_elem) free_list;      /* list of free elements in heap */
    const struct rte_memseg *ms;
    volatile enum elem_state state;
    uint32_t pad;
    size_t size;
#ifdef RTE_LIBRTE_MALLOC_DEBUG
    uint64_t header_cookie;         /* Cookie marking start of data */
                                    /* trailer cookie at start + size */
#endif
} __rte_cache_aligned;

malloc_elemが一つのメモリブロックで,それはqueue(3)のLISTの一要素となっています. LISTの先頭はmalloc_heap構造体のfree_headからアクセス可能です. フリーリストはメモリブロックのサイズの大きさごとに分けられています(後述のmalloc_elem_free_list_index()参照). またmalloc_heap自体はmemconfigのmalloc_heapsからアクセスできます.

heapの初期化はrte_eal_memory_init()のあとに呼ばれるrte_eal_memzone_init() の中のrte_malloc_heap_init()でおこなわれます.

rte_malloc_heap_init()ではmalloc_heap_add_memseg()を呼び,各memsegをheapに 追加します.ここでは一つのmemseg全体が一つのmalloc_elemとして追加されます.

librte_eal/common/malloc_heap.c:

static void
malloc_heap_add_memseg(struct malloc_heap *heap, struct rte_memseg *ms)
{
    /* allocate the memory block headers, one at end, one at start */
    struct malloc_elem *start_elem = (struct malloc_elem *)ms->addr;
    struct malloc_elem *end_elem = RTE_PTR_ADD(ms->addr,
            ms->len - MALLOC_ELEM_OVERHEAD);
    end_elem = RTE_PTR_ALIGN_FLOOR(end_elem, RTE_CACHE_LINE_SIZE);
    const size_t elem_size = (uintptr_t)end_elem - (uintptr_t)start_elem;

    malloc_elem_init(start_elem, heap, ms, elem_size);
    malloc_elem_mkend(end_elem, start_elem);
    malloc_elem_free_list_insert(start_elem);

    heap->total_size += elem_size;
}

librte_eal/common/malloc_elem.c:

void
malloc_elem_init(struct malloc_elem *elem,
        struct malloc_heap *heap, const struct rte_memseg *ms, size_t size)
{
    elem->heap = heap;
    elem->ms = ms;
    elem->prev = NULL;
    memset(&elem->free_list, 0, sizeof(elem->free_list));
    elem->state = ELEM_FREE;
    elem->size = size;
    elem->pad = 0;
    set_header(elem);
    set_trailer(elem);
}

ここでは各memsegの先頭と末尾にmalloc_elem用の領域を確保してそれを初期化しています. 末尾にもmalloc_elemを用意するのはオーバフロー防止のためのようです. malloc_elemのsizeはmalloc_elem構造体自身を含んだサイズのようです.

malloc_elem_free_list_insert()で作成したmalloc_elemmaloc_heapfree_headで先頭が示されるフリーリストに追加します. このフリーリストはメモリブロックの大きさごとに分かれており,どこのフリーリスト に入れるかはmalloc_elem_free_list_index()で決めています. コメントを読む限り,

heap->free_head[0] - (0   , 2^8]
heap->free_head[1] - (2^8 , 2^10]
heap->free_head[2] - (2^10 ,2^12]
heap->free_head[3] - (2^12, 2^14]
...

というような感じに分けられて格納されるようです.

memzone

memzoneは連続した物理アドレス空間に名前をつけて保持するためのデータ構造です. 取得したmemzoneはmemconfig.memzoneに保持されます. memzoneの確保の際に内部的にはheapからメモリを確保します.

librte_eal/common/include/rte_memzone.h:

struct rte_memzone {

#define RTE_MEMZONE_NAMESIZE 32       /**< Maximum length of memory zone name.*/
    char name[RTE_MEMZONE_NAMESIZE];  /**< Name of the memory zone. */

    phys_addr_t phys_addr;            /**< Start physical address. */
    RTE_STD_C11
    union {
        void *addr;                   /**< Start virtual address. */
        uint64_t addr_64;             /**< Makes sure addr is always 64-bits */
    };
    size_t len;                       /**< Length of the memzone. */

    uint64_t hugepage_sz;             /**< The page size of underlying memory */

    int32_t socket_id;                /**< NUMA socket ID. */

    uint32_t flags;                   /**< Characteristics of this memzone. */
    uint32_t memseg_id;               /**< Memseg it belongs. */
} __attribute__((__packed__));

rte_eal_memzone_init()の中ではmemzoneは空に初期化されるだけです.

heapからのメモリの割り当て

heapからの割り当てはmalloc_heap_alloc()からおこないます.

librte_eal/common/malloc_heap.c:

void *
malloc_heap_alloc(struct malloc_heap *heap,
        const char *type __attribute__((unused)), size_t size, unsigned flags,
        size_t align, size_t bound)
{
    struct malloc_elem *elem;

    size = RTE_CACHE_LINE_ROUNDUP(size);
    align = RTE_CACHE_LINE_ROUNDUP(align);

    rte_spinlock_lock(&heap->lock);

    elem = find_suitable_element(heap, size, flags, align, bound);
    if (elem != NULL) {
        elem = malloc_elem_alloc(elem, size, align, bound);
        /* increase heap's count of allocated elements */
        heap->alloc_count++;
    }
    rte_spinlock_unlock(&heap->lock);

    return elem == NULL ? NULL : (void *)(&elem[1]);
}

ここではまずfind_suitable_element()を利用して,ヒープのフリーリストからズの空きブロックを見つけます. やっていることはフリーリストを先頭から順に辿って行って,十分な大きさの空きブロックが見つかればそれを返します.ただしこの際指定したhugepage要件を満たすかチェックしています.

librte_eal/common/malloc_elem.c:

static struct malloc_elem *
find_suitable_element(struct malloc_heap *heap, size_t size,
        unsigned flags, size_t align, size_t bound)
{
    size_t idx;
    struct malloc_elem *elem, *alt_elem = NULL;

    for (idx = malloc_elem_free_list_index(size);
            idx < RTE_HEAP_NUM_FREELISTS; idx++) {
        for (elem = LIST_FIRST(&heap->free_head[idx]);
                !!elem; elem = LIST_NEXT(elem, free_list)) {
            if (malloc_elem_can_hold(elem, size, align, bound)) {
                if (check_hugepage_sz(flags, elem->ms->hugepage_sz))
                    return elem;
                if (alt_elem == NULL)
                    alt_elem = elem;
            }
        }
    }

    if ((alt_elem != NULL) && (flags & RTE_MEMZONE_SIZE_HINT_ONLY))
        return alt_elem;

    return NULL;
}

その後実際のmalloc_elem_alloc()で割り当てを行います.

struct malloc_elem *
malloc_elem_alloc(struct malloc_elem *elem, size_t size, unsigned align,
        size_t bound)
{
    struct malloc_elem *new_elem = elem_start_pt(elem, size, align, bound);
    const size_t old_elem_size = (uintptr_t)new_elem - (uintptr_t)elem;
    const size_t trailer_size = elem->size - old_elem_size - size -
        MALLOC_ELEM_OVERHEAD;

    elem_free_list_remove(elem);

割り当てはelemの末尾から指定したサイズ分だけ取得して割り当てます. まずelem_start_pt()で新しいオブジェクトの先頭のアドレスを取得します.

    if (trailer_size > MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) {
        /* split it, too much free space after elem */
        struct malloc_elem *new_free_elem =
                RTE_PTR_ADD(new_elem, size + MALLOC_ELEM_OVERHEAD);

        split_elem(elem, new_free_elem);
        malloc_elem_free_list_insert(new_free_elem);
    }

あんまり詳しく説明していませんが,割り当ての際に指定した境界に合うようにアラインメントしているので,その際にnew_elemの後にMIN_DATA_SIZE以上(自分の環境では64byte)の領域ができたらそれをフリーリストに追加しています*1

    if (old_elem_size < MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) {
        /* don't split it, pad the element instead */
        elem->state = ELEM_BUSY;
        elem->pad = old_elem_size;


        /* put a dummy header in padding, to point to real element header */
        if (elem->pad > 0){ /* pad will be at least 64-bytes, as everything
                             * is cache-line aligned */
            new_elem->pad = elem->pad;
            new_elem->state = ELEM_PAD;
            new_elem->size = elem->size - elem->pad;
            set_header(new_elem);
        }

        return new_elem;
    }

分割した際に,空きスペースがあまりにも小さい場合はそれをフリーリストに追加せ ず,ただ単にELEM_BUSYとするようです.

    /* we are going to split the element in two. The original element
     * remains free, and the new element is the one allocated.
     * Re-insert original element, in case its new size makes it
     * belong on a different list.
     */
    split_elem(elem, new_elem);
    new_elem->state = ELEM_BUSY;
    malloc_elem_free_list_insert(elem);

    return new_elem;
}

通常の場合はこのsplit_elem()で領域が分割され,利用しない前半部分はまたフリーリストに追加されます.

static void
split_elem(struct malloc_elem *elem, struct malloc_elem *split_pt)
{
    struct malloc_elem *next_elem = RTE_PTR_ADD(elem, elem->size);
    const size_t old_elem_size = (uintptr_t)split_pt - (uintptr_t)elem;
    const size_t new_elem_size = elem->size - old_elem_size;

    malloc_elem_init(split_pt, elem->heap, elem->ms, new_elem_size);
    split_pt->prev = elem;
    next_elem->prev = split_pt;
    elem->size = old_elem_size;
    set_trailer(elem);
}

図にすると以下のような感じだと思います.

f:id:mm_i:20170524154902p:plain

malloc_heap_allog()では最後(void *)(&elem[1])を返します.つまり,返されたアドレスはmalloc_elemのヘッダ部分を除いたアドレスになります.

memzoneの確保

メモリ領域を名前をつけて管理する場合はmemzoneを利用します. memzoneの確保はrte_memzone_reserve()からおこないます.

rte_memzone_reserve()を呼ぶと,最終的にmemzone_reserve_aligned_thread_unsafe()が呼ばれます. memzone_reserve_aligned_thread_unsafe()はそこそこ長いですが,主要部のみ抜き 出すと以下のようになります.

static const struct rte_memzone *
memzone_reserve_aligned_thread_unsafe(const char *name, size_t len,
        int socket_id, unsigned flags, unsigned align, unsigned bound)
{
    struct rte_memzone *mz;
    struct rte_mem_config *mcfg;
    size_t requested_len;
    int socket, i;

    ...

    /* get pointer to global configuration */
    mcfg = rte_eal_get_configuration()->mem_config;

    ...
    /* allocate memory on heap */
    void *mz_addr = malloc_heap_alloc(&mcfg->malloc_heaps[socket], NULL,
            requested_len, flags, align, bound);
    ...

    const struct malloc_elem *elem = malloc_elem_from_data(mz_addr);

    /* fill the zone in config */
    mz = get_next_free_memzone();
    ...

    mcfg->memzone_cnt++;
    snprintf(mz->name, sizeof(mz->name), "%s", name);
    mz->phys_addr = rte_malloc_virt2phy(mz_addr);
    mz->addr = mz_addr;
    mz->len = (requested_len == 0 ? elem->size : requested_len);
    mz->hugepage_sz = elem->ms->hugepage_sz;
    mz->socket_id = elem->ms->socket_id;
    mz->flags = 0;
    mz->memseg_id = elem->ms - rte_eal_get_configuration()->mem_config->memseg;

    return mz;
}

malloc_heap_alloc()からメモリを確保したあと,その情報をmemconfig.memzone[]に登録しています.

mempool

mempoolは固定長サイズのオブジェクトのアロケータです. フリーのオブジェクトはring構造で保持されます. mempoolでは各コアが占有して利用できるキャッシュを提供する機能などもあります. mempoolを作成する際,確保したいオブジェクトのサイズ及びその個数を指定しますが, 内部的にはそのオブジェクトが割り当てられるだけのmemzoneを取得します.

struct rte_mempool {
    /*
     * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
     * compatibility requirements, it could be changed to
     * RTE_MEMPOOL_NAMESIZE next time the ABI changes
     */
    char name[RTE_MEMZONE_NAMESIZE]; /**< Name of mempool. */
    RTE_STD_C11
    union {
        void *pool_data;         /**< Ring or pool to store objects. */
        uint64_t pool_id;        /**< External mempool identifier. */
    };
    void *pool_config;               /**< optional args for ops alloc. */
    const struct rte_memzone *mz;    /**< Memzone where pool is alloc'd. */
    int flags;                       /**< Flags of the mempool. */
    int socket_id;                   /**< Socket id passed at create. */
    uint32_t size;                   /**< Max size of the mempool. */
    uint32_t cache_size;
    /**< Size of per-lcore default local cache. */

    uint32_t elt_size;               /**< Size of an element. */
    uint32_t header_size;            /**< Size of header (before elt). */
    uint32_t trailer_size;           /**< Size of trailer (after elt). */

    unsigned private_data_size;      /**< Size of private data. */
    /**
     * Index into rte_mempool_ops_table array of mempool ops
     * structs, which contain callback function pointers.
     * We're using an index here rather than pointers to the callbacks
     * to facilitate any secondary processes that may want to use
     * this mempool.
     */

    int32_t ops_index;

    struct rte_mempool_cache *local_cache; /**< Per-lcore local cache */

    uint32_t populated_size;         /**< Number of populated objects. */
    struct rte_mempool_objhdr_list elt_list; /**< List of objects in pool */
    uint32_t nb_mem_chunks;          /**< Number of memory chunks */
    struct rte_mempool_memhdr_list mem_list; /**< List of memory chunks */

#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
    /** Per-lcore statistics. */
    struct rte_mempool_debug_stats stats[RTE_MAX_LCORE];
#endif
}  __rte_cache_aligned;

実際にmempoolで確保したオブジェクトはelt_listで先頭が示されるSTAILQで保持されます. また,確保した連続したメモリ領域はmem_listで先頭が示されるSTAILQで保持されます.

mempoolの初期化

mempoolの確保は以下のようにしておこなわれます.

  1. mempool構造体自体を格納する領域を一つのmemzoneとして取得
  2. 固定長のオブジェクトをn個格納するための領域を取得. この際領域は複数のmemzoneにまたがることがある.

mempoolの作成はrte_mempool_create()からおこないます. rte_mampool_create()は以下のようになっています.

librte_mempool/rte_mempool.c:

struct rte_mempool *
rte_mempool_create(const char *name, unsigned n, unsigned elt_size,
    unsigned cache_size, unsigned private_data_size,
    rte_mempool_ctor_t *mp_init, void *mp_init_arg,
    rte_mempool_obj_cb_t *obj_init, void *obj_init_arg,
    int socket_id, unsigned flags)
{
    int ret;
    struct rte_mempool *mp;

    mp = rte_mempool_create_empty(name, n, elt_size, cache_size,
        private_data_size, socket_id, flags);
    if (mp == NULL)
        return NULL;

    /*
     * Since we have 4 combinations of the SP/SC/MP/MC examine the flags to
     * set the correct index into the table of ops structs.
     */
    if ((flags & MEMPOOL_F_SP_PUT) && (flags & MEMPOOL_F_SC_GET))
        ret = rte_mempool_set_ops_byname(mp, "ring_sp_sc", NULL);
    else if (flags & MEMPOOL_F_SP_PUT)
        ret = rte_mempool_set_ops_byname(mp, "ring_sp_mc", NULL);
    else if (flags & MEMPOOL_F_SC_GET)
        ret = rte_mempool_set_ops_byname(mp, "ring_mp_sc", NULL);
    else
        ret = rte_mempool_set_ops_byname(mp, "ring_mp_mc", NULL);

    if (ret)
        goto fail;

    /* call the mempool priv initializer */
    if (mp_init)
        mp_init(mp, mp_init_arg);

    if (rte_mempool_populate_default(mp) < 0)
        goto fail;

    /* call the object initializers */
    if (obj_init)
        rte_mempool_obj_iter(mp, obj_init, obj_init_arg);

    return mp;

 fail:
    rte_mempool_free(mp);
    return NULL;
}

nameがmempoolの名前,nが割り当てたいオブジェクトの数,elt_sizeが各オブジェクトのサイズになります. 基本的にはrte_mempool_create_emtpy()で空のmempoolを作成し,その後rte_mempool_populate_default()で実際にオブジェクト分のメモリを確保します. また,rte_mempool_set_ops_byname()のところは,mempoolでオブジェクトを確保するスレッドが一つかどうかと解放するスレッドが一つかどうか(SC: Single Consumer, SP: Single Producer)のフラグによって,mempoolからオブジェクトを取得/解放する関数を切り替えて登録しています.

rte_mempool_create_empty()の主要部は以下のようになっています.

librte_mempool/rte_mempool.c:

/* create an empty mempool */
struct rte_mempool *
rte_mempool_create_empty(const char *name, unsigned n, unsigned elt_size,
    unsigned cache_size, unsigned private_data_size,
    int socket_id, unsigned flags)
{
    ...

    mempool_list = RTE_TAILQ_CAST(rte_mempool_tailq.head, rte_mempool_list);

    ...

    mempool_size = MEMPOOL_HEADER_SIZE(mp, cache_size);
    mempool_size += private_data_size;
    mempool_size = RTE_ALIGN_CEIL(mempool_size, RTE_MEMPOOL_ALIGN);

    ret = snprintf(mz_name, sizeof(mz_name), RTE_MEMPOOL_MZ_FORMAT, name);
    if (ret < 0 || ret >= (int)sizeof(mz_name)) {
        rte_errno = ENAMETOOLONG;
        goto exit_unlock;
    }
    mz = rte_memzone_reserve(mz_name, mempool_size, socket_id, mz_flags);
    if (mz == NULL)
        goto exit_unlock;

    /* init the mempool structure */
    mp = mz->addr;

    memset(mp, 0, MEMPOOL_HEADER_SIZE(mp, cache_size));
    ret = snprintf(mp->name, sizeof(mp->name), "%s", name);
    if (ret < 0 || ret >= (int)sizeof(mp->name)) {
        rte_errno = ENAMETOOLONG;
        goto exit_unlock;
    }
    mp->mz = mz;
    mp->size = n;
    mp->flags = flags;
    mp->socket_id = socket_id;
    mp->elt_size = objsz.elt_size;
    mp->header_size = objsz.header_size;
    mp->trailer_size = objsz.trailer_size;
    /* Size of default caches, zero means disabled. */
    mp->cache_size = cache_size;
    mp->private_data_size = private_data_size;
    STAILQ_INIT(&mp->elt_list);
    STAILQ_INIT(&mp->mem_list);

    ...
    return mp;
}

mempool自体もTAILQで管理されています.mempool_listはそのTAILQのheadを指すデータ構造です. rte_mempool_create_empty()では,mempoool構造体自体を格納するためのメモリ領域をrte_memzone_reserve()で確保し,初期化します. この時点ではまだオブジェクトのメモリ領域は確保していません.

実際にオブジェクトのメモリを確保するのはrte_mempool_populate_default()になります.

librte_mempool/rte_mempool.c:

int
rte_mempool_populate_default(struct rte_mempool *mp)
{
    int mz_flags = RTE_MEMZONE_1GB|RTE_MEMZONE_SIZE_HINT_ONLY;
    char mz_name[RTE_MEMZONE_NAMESIZE];
    const struct rte_memzone *mz;
    size_t size, total_elt_sz, align, pg_sz, pg_shift;
    phys_addr_t paddr;
    unsigned mz_id, n;
    int ret;

    /* mempool must not be populated */
    if (mp->nb_mem_chunks != 0){
        RTE_LOG(ERR, MBUF, "a\n");
        return -EEXIST;
    }

    if (rte_xen_dom0_supported()) {
        pg_sz = RTE_PGSIZE_2M;
        pg_shift = rte_bsf32(pg_sz);
        align = pg_sz;

    } else if (rte_eal_has_hugepages()) {
        pg_shift = 0; /* not needed, zone is physically contiguous */
        pg_sz = 0;
        align = RTE_CACHE_LINE_SIZE;
    } else {
        pg_sz = getpagesize();
        pg_shift = rte_bsf32(pg_sz);
        align = pg_sz;
    }

    total_elt_sz = mp->header_size + mp->elt_size + mp->trailer_size;
    for (mz_id = 0, n = mp->size; n > 0; mz_id++, n -= ret) {
        size = rte_mempool_xmem_size(n, total_elt_sz, pg_shift);

        ret = snprintf(mz_name, sizeof(mz_name),
            RTE_MEMPOOL_MZ_FORMAT "_%d", mp->name, mz_id);
        if (ret < 0 || ret >= (int)sizeof(mz_name)) {
            RTE_LOG(ERR, MBUF, "b\n");
            ret = -ENAMETOOLONG;
            goto fail;
        }

        mz = rte_memzone_reserve_aligned(mz_name, size,
            mp->socket_id, mz_flags, align);
        /* not enough memory, retry with the biggest zone we have */
        if (mz == NULL)
            mz = rte_memzone_reserve_aligned(mz_name, 0,
                mp->socket_id, mz_flags, align);
        if (mz == NULL) {
            RTE_LOG(ERR, MBUF, "c\n");
            ret = -rte_errno;
            goto fail;
        }

        if (mp->flags & MEMPOOL_F_NO_PHYS_CONTIG)
            paddr = RTE_BAD_PHYS_ADDR;
        else
            paddr = mz->phys_addr;

        if (rte_eal_has_hugepages() && !rte_xen_dom0_supported())
            ret = rte_mempool_populate_phys(mp, mz->addr,
                paddr, mz->len,
                rte_mempool_memchunk_mz_free,
                (void *)(uintptr_t)mz);
        else
            ret = rte_mempool_populate_virt(mp, mz->addr,
                mz->len, pg_sz,
                rte_mempool_memchunk_mz_free,
                (void *)(uintptr_t)mz);
        if (ret < 0) {
            RTE_LOG(ERR, MBUF, "d\n");
            rte_memzone_free(mz);
            goto fail;
        }
    }

    return mp->size;

 fail:
    rte_mempool_free_memchunks(mp);
    return ret;

重要なのはforループの内部の処理で,rte_memzone_reserve_aligned()を利用して確 保したいメモリ(オブジェクトサイズ ×個数分のメモリ)をmemzoneから確保しようとします. このときもし確保できなかった場合(それだけ連続するメモリがなかった場合)は, memzoneから最も大きな連続する領域を取得します. この後通常はrte_eal_has_hugepages() = 1, rte_xen_dom0_supported() = 0なのでrte_mempool_populate_phys()を呼び,確保したメモリ上でオブジェクトのブロックを作成します. rte_mempool_populate_phys()の戻り値が確保できたオブジェクトの数です. 目標のオブジェクト数だけ確保できるまでこの操作を繰り返します.

rte_mempool_popoulate_phys()の中では,確保したメモリ領域からオブジェクトを切り出し,それをmempool_add_elem()を使ってmempool.elt_listに入れ,また空きオブジェクトを管理するring bufferに入れています.また,確保したメモリ(memzone)をmempool.mem_listに登録します.

int
rte_mempool_populate_phys(struct rte_mempool *mp, char *vaddr,
    phys_addr_t paddr, size_t len, rte_mempool_memchunk_free_cb_t *free_cb,
    void *opaque)
{
    unsigned total_elt_sz;
    unsigned i = 0;
    size_t off;
    struct rte_mempool_memhdr *memhdr;
    int ret;

    /* create the internal ring if not already done */
    if ((mp->flags & MEMPOOL_F_POOL_CREATED) == 0) {
        ret = rte_mempool_ops_alloc(mp);
        if (ret != 0)
            return ret;
        mp->flags |= MEMPOOL_F_POOL_CREATED;
    }

    /* mempool is already populated */
    if (mp->populated_size >= mp->size)
        return -ENOSPC;

    total_elt_sz = mp->header_size + mp->elt_size + mp->trailer_size;

    memhdr = rte_zmalloc("MEMPOOL_MEMHDR", sizeof(*memhdr), 0);
    if (memhdr == NULL)
        return -ENOMEM;

    memhdr->mp = mp;
    memhdr->addr = vaddr;
    memhdr->phys_addr = paddr;
    memhdr->len = len;
    memhdr->free_cb = free_cb;
    memhdr->opaque = opaque;

    if (mp->flags & MEMPOOL_F_NO_CACHE_ALIGN)
        off = RTE_PTR_ALIGN_CEIL(vaddr, 8) - vaddr;
    else
        off = RTE_PTR_ALIGN_CEIL(vaddr, RTE_CACHE_LINE_SIZE) - vaddr;

    while (off + total_elt_sz <= len && mp->populated_size < mp->size) {
        off += mp->header_size;
        if (paddr == RTE_BAD_PHYS_ADDR)
            mempool_add_elem(mp, (char *)vaddr + off,
                RTE_BAD_PHYS_ADDR);
        else
            mempool_add_elem(mp, (char *)vaddr + off, paddr + off);
        off += mp->elt_size + mp->trailer_size;
        i++;
    }

    /* not enough room to store one object */
    if (i == 0)
        return -EINVAL;

    STAILQ_INSERT_TAIL(&mp->mem_list, memhdr, next);
    mp->nb_mem_chunks++;
    return i;
}
static void
mempool_add_elem(struct rte_mempool *mp, void *obj, phys_addr_t physaddr)
{
    struct rte_mempool_objhdr *hdr;
    struct rte_mempool_objtlr *tlr __rte_unused;

    /* set mempool ptr in header */
    hdr = RTE_PTR_SUB(obj, sizeof(*hdr));
    hdr->mp = mp;
    hdr->physaddr = physaddr;
    STAILQ_INSERT_TAIL(&mp->elt_list, hdr, next);
    mp->populated_size++;

#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
    hdr->cookie = RTE_MEMPOOL_HEADER_COOKIE2;
    tlr = __mempool_get_trailer(obj);
    tlr->cookie = RTE_MEMPOOL_TRAILER_COOKIE;
#endif

    /* enqueue in ring */
    rte_mempool_ops_enqueue_bulk(mp, &obj, 1);
}

rte_mempool_ops_enqueue_bulk()では,最終的には次で説明するenqueue()が呼ばれます.

mempoolの操作

rte_create_mempol_empty()のところで説明したとおり,初期化時のフラグによって mempoolからオブジェクトを取得/解放する際の関数を切り替えています. 関数の定義は drivers/mempool/ring/rte_mempool_ring.cに書いてあります.

static const struct rte_mempool_ops ops_mp_mc = {
    .name = "ring_mp_mc",
    .alloc = common_ring_alloc,
    .free = common_ring_free,
    .enqueue = common_ring_mp_enqueue,
    .dequeue = common_ring_mc_dequeue,
    .get_count = common_ring_get_count,
};

static const struct rte_mempool_ops ops_sp_sc = {
    .name = "ring_sp_sc",
    .alloc = common_ring_alloc,
    .free = common_ring_free,
    .enqueue = common_ring_sp_enqueue,
    .dequeue = common_ring_sc_dequeue,
    .get_count = common_ring_get_count,
};

static const struct rte_mempool_ops ops_mp_sc = {
    .name = "ring_mp_sc",
    .alloc = common_ring_alloc,
    .free = common_ring_free,
    .enqueue = common_ring_mp_enqueue,
    .dequeue = common_ring_sc_dequeue,
    .get_count = common_ring_get_count,
};


static const struct rte_mempool_ops ops_sp_mc = {
    .name = "ring_sp_mc",
    .alloc = common_ring_alloc,
    .free = common_ring_free,
    .enqueue = common_ring_sp_enqueue,
    .dequeue = common_ring_mc_dequeue,
    .get_count = common_ring_get_count,
};

MEMPOOL_REGISTER_OPS(ops_mp_mc);
MEMPOOL_REGISTER_OPS(ops_sp_sc);
MEMPOOL_REGISTER_OPS(ops_mp_sc);
MEMPOOL_REGISTER_OPS(ops_sp_mc);

singleかmultiかによって,rte_ringを操作する関数を切り替えています.

基本的にはrte_mempool_get()でmempoolから空きオブジェクトを取得し,rte_mempool_put()でオブジェクトを再びring bufferに追加します.

DPDKでパケットを格納するために利用するmbufは内部でmempolを利用しています. mbufに関してはまた後で書こうと思います.

*1:たぶん