[rust]ZST/DSTによるflexible array memberの実現

  • flexible array memberとは
  • ZSTを使う方法
  • DSTを使う方法

flexible array memberとは

flexible array memberとはずばり,C言語で以下のようなサイズ定義を持たない構造体メンバのことです(C99から標準).

struct X{
    size_t len;
    char value[];
}

この構造体は以下のようにして使うことができます.

size_t len = 10;
struct X *p = (struct X*)malloc(sizeof(struct X) + len);
p->len = len;
p->value[0] = 'a';
p->value[1] = 'b';
...

このように,flexible array memberを使用することで(ヘッダ+可変サイズなデータ)を持つ構造を簡潔に表現できます.普通はvalueは適当なポインタで持てばいいと思いますが,メモリ的にヘッダとデータが連続して欲しい場合,あるいは連続したバッファが与えられてそこにデータ構造を作成する場合などに有用です. ちなみにsizeof(struct X)の値はflexible array memberを除いたサイズ(上の例だと実質的にsizeof(size_t))になります.

ZSTを使う方法

rustにおいて,flexible array memberそのものとずばり対応するものはありません. 似たようなことをしたい場合は,Zero Sized Types (ZSTs) あるいは Dynamically Sized Types (DSTs)を使います.

最初に言っておくと,ZSTを使う場合はなんちゃってflexible array memberになります.

ZSTはその名の通りサイズが0の型で,例えば()がそれに相当します. これはもともと例えばSetを実現するためにSet<Key> = Map<Key, ()>として,Mapの実装を利用しつつ無駄な領域が割り当てられないようにするといった機能のために用意されたもののようです.

ZSTを利用する場合,最初のC言語の例と同じような構造は以下のようにして定義できます.

#[derive(Debug)]
#[repr(C)]
struct X {
    len: usize,
    val: [u8; 0],
}

ここではvalZSTになります.valのサイズは0なので,std::mem::size_of::<X>()は実質的にstd::mem::size_of::usize()と同じ値を返します.

以下のようにしてこのstructを利用することができます.

#![feature(allocator_api)]
use std::heap::{Alloc, Heap, Layout};

unsafe fn alloc(len: usize) -> *mut u8 {
    let layout = Layout::from_size_align(len, std::mem::align_of::<usize>()).unwrap();
    Heap.alloc(layout.clone()).unwrap() as *mut u8
}

unsafe fn free(p: *mut u8, len: usize) {
    let layout = Layout::from_size_align(len, std::mem::align_of::<usize>()).unwrap();
    Heap.dealloc(p, layout);
}

fn zst() {
    unsafe {
        // メモリの確保
        let value_len = 3;
        let header_len = std::mem::size_of::<X>();
        let total_len = header_len + value_len;
        let p = alloc(total_len);

        // 確保したバッファ先頭をstruct Xにキャスト
        let header: &mut X = std::mem::transmute(p);
        header.len = value_len;

        // valにアクセスするためのスライスの作成
        let val: &mut [u8] =
            std::slice::from_raw_parts_mut(p.offset(header_len as isize), header.len);
        val[0] = 1;
        val[1] = 2;
        val[2] = 3;

        // get_unchecked_mutを利用したアクセス
        *header.val.get_unchecked_mut(0) = 100;
        // これは index out of bounds error
        //header.val[0] = 100;

        println!("header_len = {}", header_len);
        println!("value_len  = {}", value_len);
        println!("header     = {:?}", header);
        println!("val        = {:?}", val);
        println!(
            "p          =  {:?}",
            std::slice::from_raw_parts(p, total_len)
        );

        free(p, total_len);
    }
}

実行結果は以下のようになります.

header_len: 8
value_len: 3
header: X { len: 3, val: [] }
val: [100, 2, 3]
p: [3, 0, 0, 0, 0, 0, 0, 0, 100, 2, 3]

やっていることはC言語の場合とほぼ同じで,まず適当な方法(ここではstd::heap::Alloc)でヘッダ(元々のstructの大きさ)+可変長サイズのデータの長さ分のメモリを確保します.メモリ確保方法の詳細は前回のエントリを参照して下さい.

        let value_len = 3;
        let header_len = std::mem::size_of::<X>();
        let total_len = header_len + value_len;
        let p = alloc(total_len);

その後確保したメモリをtransmuteXへのreferenceにしてヘッダにアクセスします.

        let header: &mut X = std::mem::transmute(p);
        header.len = value_len;

valに関してはfrom_raw_parts_mutでsliceを作成してアクセスします.

        let val: &mut [u8] =
            std::slice::from_raw_parts_mut(p.offset(header_len as isize), header.len);
        val[0] = 1;
        val[1] = 2;
        val[2] = 3;

valget_unchecked_mutを使えばヘッダからのアクセスも一応可能です.普通にアクセスしようとした場合は境界値エラーになります.

        // get_unchecked_mutを利用したアクセス
        *header.val.get_unchecked_mut(0) = 100;
        // これは index out of bounds error
        //header.val[0] = 100;

実のところこの方法は(valへのアクセスに基本的にはスライスを作ることになるので)flexible array memberと呼べるのか微妙なものですが,使うのは楽だと思います.

ちなみに,この方法ではスタック上に変数を確保することはできません.というか,

let x = X{len : 10, val:[]};

とやれば確保できますが,valは必ずemptyでなければならないので実質的に使えません.

DSTを使う方法

DSTを使うことで,本来のflexible array memberに近いものが実現できます. ここでの方法はHacker Newsのコメントから知りました.

DSTはコンパイル時にはサイズが定まらず,実行時にサイズが求まる型のことで,代表的なDSTの一つはスライス[T]です.structは一番最後の要素にDSTを持つことができます.この場合,そのstruct自体がDSTになります.

例えば,以下のようにしてDSTなstructを定義できます.

#[repr(C)]
struct Y{
    len: usize,
    value: [u8],
}

で,このようなstructが定義できるのは良いですが,実のところをこれを直接構成する方法はないです. これは扱いにくいので,代わりに以下のような?Sizedなgeneric data typeを利用します.

#[repr(C)]
struct Y<T: ?Sized> {
    len: usize,
    val: T,
}

コンパイル時に大きさが分かっている場合

そもそもflexible array memberを使いたい目的が動的にデータを割り当てたいという目的からなので,コンパイル時に大きさが分かっている状況というのは本末転倒な状況ですが,この場合は以下のようにして上記のstructをスタック上に構成することができます.

fn as_raw_bytes<T: ?Sized>(x: &T) -> &[u8] {
    unsafe { std::slice::from_raw_parts(x as *const T as *const u8, std::mem::size_of_val(x)) }
}

fn dst1() {
    let len = 10;
    let val: [u8; 10] = [1; 10];
    let y = Y { len, val };

    println!("y      = {:?}", y);
    println!("y      = {:?}", as_raw_bytes(&y));
    println!("&y     = {:p}", &y);
    println!("&y.len = {:p}", &y.len);
    println!("&y.val = {:p}", &y.val);
    println!("size_of_val(&y)     = {}", std::mem::size_of_val(&y));
    println!("size_of_val(&y.len) = {}", std::mem::size_of_val(&y.len));
    println!("size_of_val(&y.val) = {}", std::mem::size_of_val(&y.val));
    println!(
        "size_of(Y<[u8;10]>) = {}",
        std::mem::size_of::<Y<[u8; 10]>>()
    );
    // NG
    //println!("size_of(y) = {}", std::mem::size_of::<Y>());
    //println!("size_of(y) = {}", std::mem::size_of::<Y<[u8]>>());
}

実行結果

y      = Y { len: 10, val: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] }
y      = [10, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 134, 2, 1, 0, 0, 0]
&y     = 0x7fff5d398b68
&y.len = 0x7fff5d398b68
&y.val = 0x7fff5d398b70
size_of_val(&y)     = 24
size_of_val(&y.len) = 8
size_of_val(&y.val) = 10
size_of(Y<[u8;10]>) = 24

ここで,y.lenのサイズは8, y.valはサイズは10ですが,メモリは8バイト単位で確保されるようでy自体のサイズは24になっています.

この場合,DST coercionsを利用して,以下のような関数を呼び出すことが可能になります.

fn f<T: std::fmt::Debug>(y: &Y<[T]>) {
    println!("f: y = {:?}", y);
    // f: y = Y { len: 10, val: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] }
}

fn dst1(){
    let len = 10;
    let val: [u8; 10] = [1; 10];
    let y = Y { len, val };
    f(&y);
}

ちなみに,このコード例において,最初のDSTな構造体

#[repr(C)]
struct Y{
    len: usize,
    value: [u8],
}

を使った場合,以下のようなエラーが発生します.

error[E0308]: mismatched types
   --> src/main.rs:149:22
    |
149 |     let y = Y { len, val };
    |                      ^^^ expected slice, found array of 10 elements
    |
    = note: expected type `[u8]`
               found type `[u8; 10]`

ヒープに割り当てたい場合は,例えば以下のようにVecが利用できます.

    let len = 10;
    let mut vec = Vec::<Y<[u8; 10]>>::with_capacity(1);
    unsafe {
        vec.set_len(1);
    }
    let y = &mut vec[0];
    y.len = len;
    for i in 0..len {
        y.val[i] = i as u8;
    }

動的にサイズを割り当てる

さて,それではようやく本題で,実行時にDSTなstructを構成する方法について説明します.

以下で説明する方法は,servoの一部のコードを参考にその処理を簡略化したものです.元のコードに丁寧にコメントが書いてあるので,興味のある方は一読を勧めます.

fn dst2() {
    let len = 10;

    // サイズの計算
    let fake_slice_ptr: *const u8 = std::mem::align_of::<Y<[u8; 1]>>() as *const u8;
    let fake_slice: &[u8] = unsafe { std::slice::from_raw_parts(fake_slice_ptr, len) };
    let fake_ptr: *const Y<[u8]> = fake_slice as *const [u8] as *const Y<[u8]>;
    let fake_ref: &Y<[u8]> = unsafe { &*fake_ptr };
    let size = std::mem::size_of_val(fake_ref);

    println!("size_of(fake_ref)   = {}", size);
    println!(
        "size_of(fake_slice) = {}",
        std::mem::size_of_val(fake_slice)
    );
    println!("fake_slice.len      = {}", fake_slice.len());
    println!("fake_ref.val.len    = {}", fake_ref.val.len());

    unsafe {
        let p = alloc(size);

        let fake_slice: &mut [u8] = std::slice::from_raw_parts_mut(p as *mut u8, len);
        let ptr = fake_slice as *mut [u8] as *mut Y<[u8]>;
        let y: &mut Y<[u8]> = &mut *ptr;

        y.len = len;
        for i in 0..len {
            y.val[i] = i as u8;
        }

        println!("y.len     = {}", y.len);
        println!("y.val.len = {}", y.val.len());
        println!("size_of_val(y)    = {}", std::mem::size_of_val(y));
        println!("size_of_val(&ptr) = {}", std::mem::size_of_val(&ptr));
        println!("ptr = {:?}", as_raw_bytes(&ptr));

        free(p, size);
    }
}

実行結果

size_of(fake_ref)   = 24
size_of(fake_slice) = 10
fake_slice.len      = 10
fake_ref.val.len    = 10
y.len     = 10
y.val.len = 10
size_of_val(y)    = 24
size_of_val(&ptr) = 16
ptr = [32, 0, 66, 15, 1, 0, 0, 0, 10, 0, 0, 0, 0, 0, 0, 0]

まず,このコードの前半部分では確保するメモリの大きさを計算します.

    len = 10;
    let fake_slice_ptr: *const u8 = std::mem::align_of::<Y<[u8; 1]>>() as *const u8;
    let fake_slice: &[u8] = unsafe { std::slice::from_raw_parts(fake_slice_ptr, len) };
    let fake_ptr: *const Y<[u8]> = fake_slice as *const [u8] as *const Y<[u8]>;
    let fake_ref: &Y<[u8]> = unsafe { &*fake_ptr };
    let size = std::mem::size_of_val(fake_ref);

ここで何をやっているかというと,

  • 長さがlen[u8]のfat pinterを作成
  • それを長さがlenY<u8>のポインタにし,さらにそれをreferenceにする
  • size_of_valを使ってサイズを求める.

どうしてこれでサイズが求まるのかというと,DSTな要素は必ずfat pointer経由でアクセスしますが,このときfat pinterは(アドレス,長さ)を持ちます.このとき,この長さというのはDSTなstructの場合は,structの末尾の要素の長さを意味することになります.上のコードはまず最初にfrom_raw_part()を使って長さがlen[u8]のfat pointerを作成した後,それを*const Y<[u8]>のfat pointerにキャストし,そしてさらにそれをreferenceに変換してサイズを計算している訳です. 上の実行結果のptrを見ると,後半8byteで長さ(10)を保持していることが分かります.

    unsafe {
        let p = alloc(size);

        let fake_slice: &mut [u8] = std::slice::from_raw_parts_mut(p as *mut u8, len);
        let ptr = fake_slice as *mut [u8] as *mut Y<[u8]>;
        let y: &mut Y<[u8]> = &mut *ptr;

        y.len = len;
        for i in 0..len {
            y.val[i] = i as u8;
        }
    }

確保すべきメモリサイズが分かったら,メモリを確保して,サイズ計算時と同様にキャストしてY<[u8]>のreferenceを取得し,それを利用してデータ構造を操作します.

確保すべきサイズ計算のところは

let len = 10;
let size = std::mem::size_of::<Y<()>>() + len;

でもいいんじゃないかと思いましたが,この場合はsizeは18になります.前述の通りrustはメモリを(この環境では)8byte単位で確保しているようで,今回の場合はsize_of_val()で得られる値と齟齬が生じることになります.といっても確保したメモリ外にはアクセスしないので大丈夫じゃないかなと思いますが..

実際に利用する場合には,以下のようにしてgeneric data typeを利用すると汎用性が上がります.

fn dst3<T>(len: usize) -> (*mut Y<[T]>, usize) {
    let fake_slice_ptr: *const T = std::mem::align_of::<Y<[T; 1]>>() as *const T;
    let fake_slice: &[T] = unsafe { std::slice::from_raw_parts(fake_slice_ptr, len) };
    let fake_ptr: *const Y<[T]> = fake_slice as *const [T] as *const Y<[T]>;
    let fake_ref: &Y<[T]> = unsafe { &*fake_ptr };
    let size = std::mem::size_of_val(fake_ref);

    unsafe {
        let p = alloc(size);

        let fake_slice: &mut [T] = std::slice::from_raw_parts_mut(p as *mut T, len);
        let ptr = fake_slice as *mut [T] as *mut Y<[T]>;
        (ptr, size)
    }
}

fn f<T: std::fmt::Debug>(y: &Y<[T]>) {
    println!("f: y = {:?}", y);
}

fn main(){
    let len = 10;
    let (ptr, size) = dst3::<u8>(len);
    unsafe {
        let y: &mut Y<[u8]> = &mut *ptr;
        y.len = len;
        for i in 0..len {
            y.val[i] = i as u8;
        }
        f(&y);
        // f: y = Y { len: 10, val: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] }
        free(ptr as *mut u8, size);
    }
}

未初期化領域への代入の注意点

ここまで例ではprimitive typeしか扱ってないので未初期化の領域にそのまま代入していますが,Drop型を実装している型の場合はservoの例のようにstd::ptr::writeを使って書き込まないと未初期化のデータに対してDropが実行されてしまいます.詳細はnomiconに書いてあります.

まとめ

Cにおけるflexible array memberをrustで実現するための方法として,ZSTとDSTを利用する方法について書きました.実のところ両方ともあまりunsafeなコード量は変わらず,本来のflexible array memberに相当するのはDSTを使う方法ですが,状況によってはZSTの方が使いやすいかなと思いました.

rustで動的にバッファを確保する方法

  1. Boxを使う
  2. Vecを使う
  3. std::heap::Allocを使う
  4. placement-in を使う
  5. mallocを使う

1. Boxを使う

rustで動的にメモリを確保する方法といってまず思いつくのはBoxを使う方法だと思います.例えば,以下のようにすれば長さ1000のu8のバッファを確保できます.

let buffer : Box<[u8]> = Box::new([0;1000]);

ただし,この方法は以下のような特徴があります.

  1. 確保した領域は必ず初期化する必要がある.
  2. 一旦スタック上にデータを確保したあとに,ヒープにそのデータをコピーする

1.に関しては,これは変数は初期化しないと利用できないというrustの原則に則ったものですが,場合によってはこの初期化コストが大きい場合があります.また,2. の方が問題で,あまりにも大きい領域を確保しようとするとスタックオーバフローが発生する可能性があります.コピー分のオーバヘッドも生じます.

2. Vecを使う

Vecを使う場合はVec::new()で可変長配列を作成した後Vec::pushでデータを追加していくのが最も基本的な方法ですが,当然効率はあまり良くありません.

Vec::with_capacityを使うと,指定したサイズ分だけのメモリ領域を確保してくれます.さらに,Vec::set_lenを使うことで,初期化をスキップして確保した領域にアクセスすることができます.ただし,set_lenはunsafeです.

unsafe{
  let len = 1000;
  let mut vec = Vec::<u8>::with_capacity(len);
  vec.set_len(len);
}

また,確保した要素にアクセスする際rustはデフォルトで境界のチェックをしますが, そのオーバヘッドを減らしたい場合はget_uncheckedあるいはget_unchecked_mutが使えます((get_uncheckedget_uchecked_mutはsliceの関数で,boxやvecからderef coercionを通じて呼び出せます)).

ffiで利用するなどでraw pointerが欲しい場合は以下のような手法が使えます.

unsafe fn alloc(len: usize) -> *mut u8 {
    let mut vec = Vec::<u8>::with_capacity(len);
    vec.set_len(len);
    Box::into_raw(vec.into_boxed_slice()) as *mut u8
}

unsafe fn free(raw: *mut u8, len : usize) {
    let s = std::slice::from_raw_parts_mut(raw, len);
    let _ = Box::from_raw(s);
}

Vec::with_capacityで指定した長さ分の領域を確保したあと,それをvec::into_boxed_sliceでBoxに変換し,さらにそれをBox::into_rawすることで実際のポインタを得ます.

確保した領域を解放する場合はポインタを再びslice::from_raw_parts_mutでスライスに直し,それをさらにBox::from_rawでBoxに直してBoxのDropが呼ばれるようにします.

上記のコードはservoの一部のコードを参考にしたものです. 実際のservoのコードではアラインメントをワード境界に揃えるため,以下のようなことをしています (簡単化のため実際のコードとちょっと異なります.ここでは実際に確保したバイト数も戻り値として返しています).

unsafe fn alloc_buffer(len : usize) -> (*mut u8, usize){
    let word_size = std::mem::size_of::<usize>();
    let num = (len + word_size-1) / word_size;
    let mut vec = Vec::<usize>::with_capacity(num);
    vec.set_len(num);
    (Box::into_raw(vec.into_boxed_slice()) as *mut u8, num*word_size)
}

3. std::heap::Allocを使う

rustのメモリアロケータはAllocator traitを実装することになっており,またstd::heap::Heapでデフォルトのアロケータにアクセスできます.コードはliballocにあります. これを利用すると,以下のようにメモリを確保することができます.

#![feature(allocator_api)]
use std::heap::{Alloc, Heap, Layout};

// 第一引数: サイズ, 第二引数: アラインメントサイズ
let layout = Layout::from_size_align(10, std::mem::align_of::<u32>()).unwrap();
unsafe {
    let p = Heap.alloc(layout.clone()).unwrap() as *mut u8;
    // ...
    Heap.dealloc(p, layout);
}

この関数を利用すれば,アラインメントも指定可能です.raw pionterが欲しい場合,この手法が一番良さそうに見えますが,残念ながらこの機能はunstableなため,nightlyな環境でしかまだ使えません.また,今後インタフェースが変わる可能性も十分あると思います.(詳細: https://github.com/rust-lang/rust/issues/32838

4. placement-in を使う

こちらも現時点ではunstableな機能ですが,確保したメモリ上に直接データを配置するplacement-in構文があります(詳細: https://github.com/rust-lang/rust/issues/27779). これを利用すると,boxでも大きなメモリ領域を確保することが可能です.

#![feature(placement_in_syntax, box_heap)]


// OK
let x: Box<[u8]> = std::boxed::HEAP <- [0;10000000];

// これは自分の環境ではoverflow
// let b: Box<[u8]> = Box::new([0; 10000000]);

5. mallocを使う

場合によってはlibc crateを使ってmalloc & freeを直接呼んでしまうのも手かもしれません.こちらも当然unsafeです.

// https://github.com/rust-lang/libc
unsafe {
    let p : *mut c_void = libc::malloc(10);
    // ...
    libc::free(p)
}

rustをnostdで使う

nostdとは

rustは他の多くの言語と同様,標準ライブラリ(std)が同包されており,通常はそれを使ってプログラミングをおこないます.しかしながら,stdの多くの機能はOSの機能を利用しているため,OSそのものを開発したいとか,あるいはOSが存在しない環境でプログラミングをしたい場合などではstdは利用できません.

stdを利用しない場合は明示的にその旨を宣言する必要がり,そのために利用するのが#![no_std] attributeです.

nostdの概要は(昔の)bookのno stdlibの章に書いてあります. なお,nostdに関わることはunstableな機能が多いので,ここに書いてあることが今後変更される可能性は十分あります

core library

stdが使えなくなるとかなりの機能が制限されてしまいますが,rustはcoreと呼ばれるrust onlyで書かれたライブラリを提供しており,nostd環境でもcoreを利用することができます*1.ただし,core libraryを使用するにあったって,最低限以下の関数は自前で定義する必要があります.

  • memcpy, memcmp, memset
  • panic_fmt
  • eh_personality

memcpy, memcmp, memset

これらの関数はrlibc crateにrustでの実装があるので,必要ならばそれを利用できます.

panic_fmt, eh_personality

プログラムがパニックしたとき等に利用される関数です.とりあえず,何もしないなら

#![feature(lang_items)]

#[lang = "eh_personality"] extern fn eh_personality() {}
#[lang = "panic_fmt"] extern fn panic_fmt() -> ! { loop {} }

みたいな関数を作ればokです.

coreとstdの関係

coreのドキュメントを見ていると,stdで提供されている機能のサブセットが提供されていることに気づくと思います.それもそのはずで,これはstdの中でcoreのライブラリがreexportされているからです.coreのドキュメントを見ているとよくstdの方を参照せよと書いてあるのはこういう理由からです. stdはOSがある用,coreはベアメタル用のライブラリですが,stdでもcoreの機能は使いたいのでこうしてるようです.

target

#![no_std]を指定すればnostdなプログラムが開発できるかというと,実際にはそう単純ではありません.多くの場合,プログラムをビルドするためのtargetを正しく設定する必要があります.

rustはさまざまなプラットフォームでの動作を保証しており,それを設定するのがtarget*2です.rustがサポートしているターゲットは ructc --print target-list としてみるか,あるいは https://forge.rust-lang.org/platform-support.html から確認できます.ターゲット名は基本的に{arch}-{vendor}-{sys}[-{abi}]の形をしていて,例えばx86_64-unknown-linux-gnux86_64-apple-darwinのようなターゲットがデフォルトで存在します.

各targetごとに,struct Target及びstruct TargetOptionsで指定されるtargetごとの設定を記述します. rustがデフォルトでサポートしているtargetの設定はsrc/librustc_back/target/の中に書いてあります.例えば,linuxの場合はx86_64_unknown_linux_gnu.rsです.

このtargetはjsonで記述できるようになっており,例えば以下のように記述します.

{
  "llvm-target": "x86_64-unknown-none",
  "data-layout": "e-m:e-i64:64-f80:128-n8:16:32:64-S128",
  "linker-flavor": "gcc",
  "target-endian": "little",
  "target-pointer-width": "64",
  "arch": "x86_64",
  "os": "none",
  "disable-redzone": true,
  "features": "-mmx,-sse,+soft-float",
}

ベースはx86_64_unknown_linux_gnuですが,ここではosが存在しないので"none"を指定し,その他redzonemmx, sseを禁止しています.またfloatのsoftwareサポートを有効にしています.なお,data layoutはLLVMのものです.

こうして作成したtargetのjsonファイルは,例えばx86_64_xxx.jsonという名前で保存したならビルドする際に(Cargo.tomlと同じディレクトリに置いて) cargo build --target=x86_64_xxxのようにすることで指定できます.

sysrootとxargo

さて,targetを無事に指定すればあとはokかというと,残念ながらそうではありません.rustでは実際にプログラムをコンパイルする際に,sysrootと呼ばれるパスからライブラリを検索します.デフォルトのsysrootはrustc --print sysrootで確認できます.このsysrootにcoreのライブラリなどが置かれています.cargo build --target=x86_64_xxx のように指定した場合は x86_64_xxx用のsysrootを探す訳ですが,x86_64_xxxは自分で用意したターゲットなので,sysrootも自分で用意する必要があります.つまり,自分でそのtarget用のcoreをビルドする必要がある訳です.

幸いなことに,自分で用意したtarget用に自動でsysrootを用意してくれる,xargoと呼ばれるプログラムが存在します.cargo install xargoとすればインストールできます.xargoはcargoのラッパーになっており, xargo build --target=x86_64_xxx としたときにtarget x86_64_xxxのsysrootがなければまずそれ用にcoreなどをビルドし,その後プログラムのビルドをおこないます.nostd環境でプログラムを作る際は普通xargoを使うことになると思います.

nostdで使えるcrate

crates.ioのカテゴリ検索で,no-stdを検索すると,stdを利用していないcrateを見つけることができます.といってもCargo.tomlの中でcategoryをちゃんと設定しているcrateがどれだけあるのか分かりませんが..

*1:というか,nostd環境だとstdの代わりにcoreが自動でリンクされるようになる

*2:ちょっと古いのでソースを直接見た方がいいかもしれない

rustのGUIライブラリconrodの使い方

所用でrustのGUIライブラリについて調べる機会がありました.特にこれといったGUIライブラリはまだないような気がします.

普段自分がネイティブなGUIアプリケーションを作成する場合はQtを使います.rustからQtを呼び出すのはいくつか試みがあって,disassemblerのpanopticonGUIとしてQtを使ってますし,rustからQtを呼び出すためのcpp_to_rustというプロジェクトもあります.ただどうもまだいろいろと開発中のようです.

今回はconrodというGUIライブラリを使ってみたので,簡単にそれの使い方について書こうと思います.

conrod

conrodは純rust製のGUIライブラリです.conrodが担当するのはウィジェッ ト(ボタンやテキストボックスなど)の構成の管理やウィジェットに対するイベントの伝播などです.pistonというゲームエンジンを開発しているところが作っているようです.実際の描画やOSからのイベントの受け取りは何か別のライブラリが担当することになります.conrod自体に描画用としてglium (OpenGL), イベント管理用としてwinitのバックエンドが用意されているので,普通はそれを使うことになると思います*1.自分はmacでしか試していませんが,マルチプラットフォームで動作するはずです.ちなみに,まだバージョン1.0にもなってないので仕様が大きく変わる可能性は十分あると思います.

conrodのチュートリアルはまだ未完成のようですが,rustdocは比較的丁寧に書いてあると思います.また,実際にプログラムを作成する場合は,リポジトリにある

あたりを参考にするのが良いかと思います.

exampleはリポジトリをクローンして

$ cargo run --release --features "winit glium" --example hello_world

とかやれば動きます.

簡単な例

ものすごく簡単な例として,以下のようにフィボナッチ数を計算するアプリケーションを作成しようと思います.

f:id:mm_i:20170709234547p:plain

ソース全体はここにあります: https://github.com/mmisono/conrod-examples/tree/master/fibonacci

これから説明するコードの大部分(初期化部分とイベントループ部分)はexamplesのものとほぼ同じです.

Cargo.toml

前述した通り,描画にglium, イベント取得にwinitを使うので,Cargo.tomlには以下のように書いてあげます.また,フォントを登録する際にフォントが格納されているフォルダを探す必要があるので,find_folderもdependenciesに追加しています.

[dependencies.conrod]
version = "0.53.0"
features = ["glium", "winit"]

[dependencies]
find_folder = "*"

初期化

conrodを使うための初期化のコードは以下のようになります.WindowやUiといったコンポーネントを作成したり,必要なフォントや画像を読み込んだりします.

    const TITLE: &'static str = "Fibonacci";
    let width = 300;
    let height = 100;

    // Build the window.
    let display = glium::glutin::WindowBuilder::new()
        .with_vsync()
        .with_dimensions(width, height)
        .with_title(TITLE)
        .with_multisampling(4)
        .build_glium()
        .unwrap();

    // construct our `Ui`.
    let mut ui = conrod::UiBuilder::new([width as f64, height as f64]).build();

    // Add a `Font` to the `Ui`'s `font::Map` from file.
    let assets = find_folder::Search::KidsThenParents(3, 5)
        .for_folder("assets")
        .unwrap();
    let font_path = assets.join("fonts/NotoSans/NotoSans-Regular.ttf");
    ui.fonts.insert_from_file(font_path).unwrap();

    // Generate the widget identifiers.
    let ids = &mut Ids::new(ui.widget_id_generator());

    // A type used for converting `conrod::render::Primitives` into `Command`s that can be used
    // for drawing to the glium `Surface`.
    let mut renderer = conrod::backend::glium::Renderer::new(&display).unwrap();

    // The image map describing each of our widget->image mappings (in our case, none).
    let image_map = conrod::image::Map::<glium::texture::Texture2d>::new();

    let mut text = "0".to_string();
    let mut answer = "0".to_string();

ここで,widget identifierを生成していますが,これはウィジェット管理に必要なIDで,ウィジェットごとにIDが必要です.Idsは以下のようにマクロで生成します.

widget_ids!(
    struct Ids {
        canvas,
        title,
        text_box,
        button,
        result,
    });

イベントループ

conrodではイベントループベースで処理をおこないます.ループ処理は以下のようになります.

    let mut event_loop = EventLoop::new();
    'main: loop {
        // Handle all events.
        for event in event_loop.next(&display) {
            // Use the `winit` backend feature to convert the winit event to a conrod one.
            if let Some(event) = conrod::backend::winit::convert(event.clone(), &display) {
                ui.handle_event(event);
                event_loop.needs_update();
            }

            match event {
                // Break from the loop upon `Escape`.
                glium::glutin::Event::KeyboardInput(
                    _,
                    _,
                    Some(glium::glutin::VirtualKeyCode::Escape),
                ) |
                glium::glutin::Event::Closed => break 'main,
                _ => {}
            }
        }

        set_widgets(ui.set_widgets(), ids, &mut text, &mut answer);

        // Render the `Ui` and then display it on the screen.
        if let Some(primitives) = ui.draw_if_changed() {
            renderer.fill(&display, primitives, &image_map);
            let mut target = display.draw();
            target.clear_color(0.0, 0.0, 0.0, 1.0);
            renderer.draw(&display, &mut target, &image_map).unwrap();
            target.finish().unwrap();
        }
    }

match eventの部分でイベントを補足し,必要な処理をします.通常ここで処理するイベントは終了処理といった大域的なイベントのみで,それ以外の処理(ウィジェットに対するマウスイベントとか)は後でおこないます.その後,set_widgets()という関数でGUIを設定し,必要なら描画をおこないます.

イベントループの本体は以下のようになっています.基本的にはウィンドウが受け取ったイベントを取得してそれを返すだけですが,60fpsを超えないように適当にスリープします.

struct EventLoop {
    ui_needs_update: bool,
    last_update: std::time::Instant,
}

impl EventLoop {
    pub fn new() -> Self {
        EventLoop {
            last_update: std::time::Instant::now(),
            ui_needs_update: true,
        }
    }

    /// Produce an iterator yielding all available events.
    pub fn next(&mut self, display: &glium::Display) -> Vec<glium::glutin::Event> {
        // We don't want to loop any faster than 60 FPS, so wait until it has been at least 16ms
        // since the last yield.
        let last_update = self.last_update;
        let sixteen_ms = std::time::Duration::from_millis(16);
        let duration_since_last_update = std::time::Instant::now().duration_since(last_update);
        if duration_since_last_update < sixteen_ms {
            std::thread::sleep(sixteen_ms - duration_since_last_update);
        }

        // Collect all pending events.
        let mut events = Vec::new();
        events.extend(display.poll_events());

        // If there are no events and the `Ui` does not need updating, wait for the next event.
        if events.is_empty() && !self.ui_needs_update {
            events.extend(display.wait_events().next());
        }

        self.ui_needs_update = false;
        self.last_update = std::time::Instant::now();

        events
    }

    /// Notifies the event loop that the `Ui` requires another update whether or not there are any
    /// pending events.
    ///
    /// This is primarily used on the occasion that some part of the `Ui` is still animating and
    /// requires further updates to do so.
    pub fn needs_update(&mut self) {
        self.ui_needs_update = true;
    }
}

GUI設定の本体

set_widgets()関数が実際にGUIの設定をする箇所です.conrodでアプリケーションを作成する場合は基本的にこのset_widgets()の部分を適当にカスタマイズすることになると思います.

conrodではボタンやテキストボックスなどのウィジェットを内部で木構造として管理しています.wiget::Text::New("aaa").set(ids.title, ui) のようにすると,指定したIDでそのウィジェットが登録されます.set()をしたとき,指定したidのウィジェットに対してもし何かイベントが発生していたらそれが返ってきます.若干分かりにくいと思うのでコードを見てもらった方が早いと思います.

fn set_widgets(ref mut ui: conrod::UiCell, ids: &mut Ids, text: &mut String, answer: &mut String) {
    widget::Canvas::new()
        .pad(0.0)
        .color(conrod::color::rgb(0.2, 0.35, 0.45))
        .set(ids.canvas, ui);

    let canvas_wh = ui.wh_of(ids.canvas).unwrap();

    // title
    widget::Text::new("Fibonacci Calculuator")
        .mid_top_with_margin_on(ids.canvas, 5.0)
        .font_size(20)
        .color(color::WHITE)
        .set(ids.title, ui);

    // textbox
    for event in widget::TextBox::new(text)
        .font_size(15)
        .w_h((canvas_wh[0] - 90.) / 2., 30.0)
        .mid_left_with_margin_on(ids.canvas, 30.0)
        .border(2.0)
        .border_color(color::BLUE)
        .color(color::WHITE)
        .set(ids.text_box, ui)
    {
        match event {
            widget::text_box::Event::Enter => println!("TextBox {:?}", text),
            widget::text_box::Event::Update(string) => *text = string,
        }
    }

    // button
    if widget::Button::new()
        .w_h((canvas_wh[0] - 90.) / 2., 30.0)
        .right_from(ids.text_box, 30.0)
        .rgb(0.4, 0.75, 0.6)
        .border(2.0)
        .label("calc!")
        .set(ids.button, ui)
        .was_clicked()
    {
        if let Ok(num) = text.parse::<u64>() {
            *answer = fib(num).to_string();
        } else {
            println!("invalid number");
        }
    }

    // result
    widget::Text::new(answer)
        .mid_bottom_with_margin_on(ids.canvas, 10.0)
        .font_size(20)
        .color(color::WHITE)
        .set(ids.result, ui);
}

例えば,TextBoxにユーザが何か入力すると,それがset()したときに返ってくるので,入力した内容を変数textに保存しています.また,Buttonの場合はwas_clicked()を呼ぶとボタンが押されたかどうか分かるので,ボタンが押された場合は計算を実行します.

ウィジェットに位置の指定ですが,各ウィジェットPositonableというトレイトを実装していて,これを利用することになります.

例えばタイトルテキストは .mid_top_with_margin_on(ids.canvas, 5.0)としていますが,こうするとids.canvasで指定したウィジェットの中央上に指定したマージン幅を開けてウィジェットを配置することになります.また,.down()を使えば直前にset()したウィジェットの下に指定したマージンだけ空けてウィジェットが配置されます.他にもいろいろ配置用の関数が用意されています.個人的には一番上に適当なウィジェットを配置したあと,.down().align_middle_x()を使ってどんどん下にウィジェットを配置していくのが楽なんじゃないかなと思います.

三目並べ

conrodの特徴はidに紐づけてウィジェットを登録し,そのidを利用してウィジェットが配置できるというところだと思います.こうすることであまり深くレイアウトを考えなくても簡単にウィジェットが配置できます.一方で,各ウィジェットに対して一意なIDが必要なので,ウィジェットが多い場合はその管理が面倒になることもあります.

例として,以下のような三目並べのアプリケーションを考えます.

f:id:mm_i:20170709234602p:plain

(ソース: https://github.com/mmisono/conrod-examples/tree/master/tic_tac_toe)

conrodでは直線や円も一つのウィジェットです.したがって,それぞれに対してIDを振って管理する必要があります.IDは以下のようにして配列で持つことができます.

widget_ids!(
    struct Ids {
        canvas,
        grids[],
        circles[],
        xs[],
    });
let ids = &mut Ids::new(ui.widget_id_generator());
ids.grids
    .resize((rows - 1) * (cols - 1), &mut ui.widget_id_generator());
ids.circles
    .resize(rows * cols, &mut ui.widget_id_generator());
ids.xs
    .resize(rows * cols * 2, &mut ui.widget_id_generator());

丸とバツを描画するために,ここではあらかじめ各セルに対応するIDを生成させています(バツは直線二本なので2倍のIDを用意しています).

グリッドの描画

グリッドの描画は以下のようにしておこないます.注意点としては座標はデカルト座標(中央が(0,0))です.

fn set_widgets(
    ref mut ui: conrod::UiCell,
    ids: &mut Ids,
    board: &mut [&mut [BoardState]],
    turn: &mut Turn,
) {
    widget::Canvas::new()
        .pad(0.0)
        .color(color::WHITE)
        .set(ids.canvas, ui);

    let rows = board.len();
    let cols = board[0].len();
    let canvas_wh = ui.wh_of(ids.canvas).unwrap();
    let tl = [-canvas_wh[0] / 2., canvas_wh[1] / 2.];
    let sw = canvas_wh[0] / (cols as f64);
    let sh = canvas_wh[1] / (rows as f64);

    // draw grid line
    for x in 1..cols {
        widget::Line::abs(
            [tl[0] + (x as f64) * sw, tl[1]],
            [tl[0] + (x as f64) * sw, tl[1] - canvas_wh[1] + 1.],
        ).color(color::BLACK)
            .set(ids.grids[x - 1], ui);
    }
    for y in 1..rows {
        widget::Line::abs(
            [tl[0], tl[1] - (y as f64) * sh],
            [tl[0] + canvas_wh[1] - 1., tl[1] - (y as f64) * sh],
        ).color(color::BLACK)
            .set(ids.grids[y - 1 + cols - 1], ui);
    }

マウスイベントの検知

マウスイベントの検知は,ui.widget_input().mouse()を使います.

    if let Some(mouse) = ui.widget_input(ids.canvas).mouse() {
        if mouse.buttons.left().is_down() {
            let mouse_abs_xy = mouse.abs_xy();
            let x = ((mouse_abs_xy[0] + (canvas_wh[0] / 2.)) / (canvas_wh[0] / (cols as f64))) as
                usize;
            let y = ((canvas_wh[1] / 2. - mouse_abs_xy[1]) / (canvas_wh[1] / (rows as f64))) as
                usize;
            // println!("{:?}, {}, {}", mouse_abs_xy, x, y);

            // when resizing, x can be greater than cols (so do y)
            if x < cols && y < rows {
                if board[y][x] == BoardState::Empty {
                    match *turn {
                        Turn::White => {
                            board[y][x] = BoardState::Circle;
                            *turn = Turn::Black;
                        }
                        Turn::Black => {
                            board[y][x] = BoardState::X;
                            *turn = Turn::White;
                        }
                    }
                }
            }
        }
    }

盤面の描画

盤面の描画をする際は,IDが他のウィジェットと被らないように注意しながらウィジェットを作成します.うっかりIDが被ると上書きされてしまって古いウィジェットが消えてしまいます.

    // draw circle or x
    for y in 0..rows {
        for x in 0..cols {
            match board[y][x] {
                BoardState::Circle => {
                    widget::Circle::outline_styled(
                        sw / 3.,
                        widget::line::Style::new().thickness(2.),
                    ).x(tl[0] + sw * (x as f64) + sw / 2.)
                        .y(tl[1] - sh * (y as f64) - sh / 2.)
                        .color(color::RED)
                        .set(ids.circles[y * cols + x], ui);
                }
                BoardState::X => {
                    widget::Line::abs(
                        [
                            tl[0] + sw * (x as f64) + sw / 5.,
                            tl[1] - sh * (y as f64) - sh / 5.,
                        ],
                        [
                            tl[0] + sw * ((x + 1) as f64) - sw / 5.,
                            tl[1] - sh * ((y + 1) as f64) + sh / 5.,
                        ],
                    ).color(color::BLACK)
                        .thickness(2.)
                        .set(ids.xs[y * cols + x], ui);

                    widget::Line::abs(
                        [
                            tl[0] + sw * ((x + 1) as f64) - sw / 5.,
                            tl[1] - sh * (y as f64) - sh / 5.,
                        ],
                        [
                            tl[0] + sw * (x as f64) + sw / 5.,
                            tl[1] - sh * ((y + 1) as f64) + sh / 5.,
                        ],
                    ).color(color::BLACK)
                        .thickness(2.)
                        .set(ids.xs[y * cols + x + rows * cols], ui);
                }
                _ => {}
            }
        }
    }
}

少々面倒ですね.まぁ,そもそもこういうことをしたいのなら,使ったことないですがゲームエンジンであるpistonとかを使うべきなんだと思います.

まとめ

ということでconrodの基礎的な使い方について書きました.メニューやダイアログといった機能はまだないようですし,ウィジェット自体の数も決して多くはないので複雑なGUIアプリケーションを作成するのはちょっと大変かなという気がしますが,簡単で静的なものを作るにはいいかもしれません.

*1:gliumはoriginal authorがもう積極的にサポートしないことを表明していますが,コミュニティによってメンテナンスされていくようです(https://github.com/PistonDevelopers/conrod/issues/989).今original authorはvulkanのラッパーを作ってるので将来的にはそれに移行していくのかもしれません

DPDKのmbufの構造

DPDKではパケットをmbufというデータ構造で管理しています. mbufについては http://dpdk.org/doc/guides/prog_guide/mbuf_lib.html にまとまっていますが,ここでは具体的なmbufの構成や使われ方についてみていきたいと思います.

mempoolのおさらい

mbufはmempoolを使用しているので,mbufの説明の前に簡単にmempoolについて説明します.

前回説明したとおり,mempoolは以下の特徴を持っています.

  • 固定長サイズのメモリアロケータ
  • 内部で空きオブジェクトをリングバッファとして管理
  • コアごとのキャッシュを持つことができる

mempoolの概要を図にすると,以下のようになります.

f:id:mm_i:20170529110320p:plain

rte_create_mempool()でmempoolを作成しますが,ここで確保したオブジェクトはTAILQであるmp->elt_listに追加されます.またそれと同時にオブジェクトはmp->pool_dataで示されるrte_ringに追加されます(rte_ringのデータ部の実体はポインタの配列で,オブジェクトを指します). mempoolのキャッシュが有効な場合はmempool構造体の後ろにキャッシュ用のデータ構造用意されます.こちらも実体はポインタの配列で,キャッシュされたオブジェクトを指します.キャッシュされたオブジェクトがring bufferの方に入っていることはありません.

mempoolからのオブジェクトの取得は以下のような流れになります.

  • rte_mempool_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned n)
    • mempool mからn個オブジェクトを取得
    • obj_tableに確保したオブジェクトのポインタが格納される
  • このとき実際にはまずmempoolのキャッシュからオブジェクトを取得しようとする
  • もしキャッシュにオブジェクトがなければ,mempool本体のring bufferからオブジェクトを取得

また,確保したオブジェクトをmempoolに戻す場合は,以下のようにします.

  • rte_mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table, unsigned n)
    • obj_tableのn個のオブジェクトをmempool mpに戻す
  • このとき,まずmpのキャッシュに戻そうとする
  • もしキャッシュが一杯なら,mempool本体のバッファにオブジェクトを戻す

ringの初期化

ringの初期化はrte_mempool_populate_phys()内のrte_mempool_ops_alloc()でおこなわれます.

    /* create the internal ring if not already done */
    if ((mp->flags & MEMPOOL_F_POOL_CREATED) == 0) {
        ret = rte_mempool_ops_alloc(mp);
        if (ret != 0)
            return ret;
        mp->flags |= MEMPOOL_F_POOL_CREATED;
    }

rte_mempool_ops_alloc()は設定によって何が呼ばれるか変わりますが,例えばmulti consumer / multi producer の場合はdrivers/mempool/ring/rte_mempool_ring.ccommon_ring_alloc()が呼ばれます.

static int
common_ring_alloc(struct rte_mempool *mp)
{
    int rg_flags = 0, ret;
    char rg_name[RTE_RING_NAMESIZE];
    struct rte_ring *r;

    ret = snprintf(rg_name, sizeof(rg_name),
        RTE_MEMPOOL_MZ_FORMAT, mp->name);
    if (ret < 0 || ret >= (int)sizeof(rg_name)) {
        rte_errno = ENAMETOOLONG;
        return -rte_errno;
    }

    /* ring flags */
    if (mp->flags & MEMPOOL_F_SP_PUT)
        rg_flags |= RING_F_SP_ENQ;
    if (mp->flags & MEMPOOL_F_SC_GET)
        rg_flags |= RING_F_SC_DEQ;

    /*
     * Allocate the ring that will be used to store objects.
     * Ring functions will return appropriate errors if we are
     * running as a secondary process etc., so no checks made
     * in this function for that condition.
     */
    r = rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
        mp->socket_id, rg_flags);
    if (r == NULL)
        return -rte_errno;

    mp->pool_data = r;

    return 0;
}

mbufの構造

mbufの構造はざっくりと以下のようになります.

f:id:mm_i:20170529110606p:plain

mbufはmempoolを利用しており,一つのmbufが一つのmempoolのオブジェクトになります. mbufの先頭にrte_mbuf構造体が存在し,その後ろに実際のデータ領域が続いています. 後からデータを追加できるように,data_off分の領域が先頭から空いています. rte_pktmbuf_prepend(), rte_pktmbuf_append()を使ってdataの先頭/末尾にデータを追加できます. また,もし受信パケットが分割されている場合はmbuf->nextが次のmbufへを指します. mfbu->nb_segsがパケット全体がいくつのmbufから成るか示す値です.

rte_mbufは以下のようになっています.

struct rte_mbuf {
    MARKER cacheline0;

    void *buf_addr;           /**< Virtual address of segment buffer. */
    /**
     * Physical address of segment buffer.
     * Force alignment to 8-bytes, so as to ensure we have the exact
     * same mbuf cacheline0 layout for 32-bit and 64-bit. This makes
     * working on vector drivers easier.
     */
    phys_addr_t buf_physaddr __rte_aligned(sizeof(phys_addr_t));

    /* next 8 bytes are initialised on RX descriptor rearm */
    MARKER64 rearm_data;
    uint16_t data_off;

    /**
     * Reference counter. Its size should at least equal to the size
     * of port field (16 bits), to support zero-copy broadcast.
     * It should only be accessed using the following functions:
     * rte_mbuf_refcnt_update(), rte_mbuf_refcnt_read(), and
     * rte_mbuf_refcnt_set(). The functionality of these functions (atomic,
     * or non-atomic) is controlled by the CONFIG_RTE_MBUF_REFCNT_ATOMIC
     * config option.
     */
    RTE_STD_C11
    union {
        rte_atomic16_t refcnt_atomic; /**< Atomically accessed refcnt */
        uint16_t refcnt;              /**< Non-atomically accessed refcnt */
    };
    uint16_t nb_segs;         /**< Number of segments. */

    /** Input port (16 bits to support more than 256 virtual ports). */
    uint16_t port;

    uint64_t ol_flags;        /**< Offload features. */

    /* remaining bytes are set on RX when pulling packet from descriptor */
    MARKER rx_descriptor_fields1;

    /*
     * The packet type, which is the combination of outer/inner L2, L3, L4
     * and tunnel types. The packet_type is about data really present in the
     * mbuf. Example: if vlan stripping is enabled, a received vlan packet
     * would have RTE_PTYPE_L2_ETHER and not RTE_PTYPE_L2_VLAN because the
     * vlan is stripped from the data.
     */
    RTE_STD_C11
    union {
        uint32_t packet_type; /**< L2/L3/L4 and tunnel information. */
        struct {
            uint32_t l2_type:4; /**< (Outer) L2 type. */
            uint32_t l3_type:4; /**< (Outer) L3 type. */
            uint32_t l4_type:4; /**< (Outer) L4 type. */
            uint32_t tun_type:4; /**< Tunnel type. */
            uint32_t inner_l2_type:4; /**< Inner L2 type. */
            uint32_t inner_l3_type:4; /**< Inner L3 type. */
            uint32_t inner_l4_type:4; /**< Inner L4 type. */
        };
    };

    uint32_t pkt_len;         /**< Total pkt len: sum of all segments. */
    uint16_t data_len;        /**< Amount of data in segment buffer. */
    /** VLAN TCI (CPU order), valid if PKT_RX_VLAN_STRIPPED is set. */
    uint16_t vlan_tci;

    union {
        uint32_t rss;     /**< RSS hash result if RSS enabled */
        struct {
            RTE_STD_C11
            union {
                struct {
                    uint16_t hash;
                    uint16_t id;
                };
                uint32_t lo;
                /**< Second 4 flexible bytes */
            };
            uint32_t hi;
            /**< First 4 flexible bytes or FD ID, dependent on
                 PKT_RX_FDIR_* flag in ol_flags. */
        } fdir;           /**< Filter identifier if FDIR enabled */
        struct {
            uint32_t lo;
            uint32_t hi;
        } sched;          /**< Hierarchical scheduler */
        uint32_t usr;     /**< User defined tags. See rte_distributor_process() */
    } hash;                   /**< hash information */

    /** Outer VLAN TCI (CPU order), valid if PKT_RX_QINQ_STRIPPED is set. */
    uint16_t vlan_tci_outer;

    uint16_t buf_len;         /**< Length of segment buffer. */

    /** Valid if PKT_RX_TIMESTAMP is set. The unit and time reference
     * are not normalized but are always the same for a given port.
     */
    uint64_t timestamp;

    /* second cache line - fields only used in slow path or on TX */
    MARKER cacheline1 __rte_cache_min_aligned;

    RTE_STD_C11
    union {
        void *userdata;   /**< Can be used for external metadata */
        uint64_t udata64; /**< Allow 8-byte userdata on 32-bit */
    };

    struct rte_mempool *pool; /**< Pool from which mbuf was allocated. */
    struct rte_mbuf *next;    /**< Next segment of scattered packet. */

    /* fields to support TX offloads */
    RTE_STD_C11
    union {
        uint64_t tx_offload;       /**< combined for easy fetch */
        __extension__
        struct {
            uint64_t l2_len:7;
            /**< L2 (MAC) Header Length for non-tunneling pkt.
             * Outer_L4_len + ... + Inner_L2_len for tunneling pkt.
             */
            uint64_t l3_len:9; /**< L3 (IP) Header Length. */
            uint64_t l4_len:8; /**< L4 (TCP/UDP) Header Length. */
            uint64_t tso_segsz:16; /**< TCP TSO segment size */

            /* fields for TX offloading of tunnels */
            uint64_t outer_l3_len:9; /**< Outer L3 (IP) Hdr Length. */
            uint64_t outer_l2_len:7; /**< Outer L2 (MAC) Hdr Length. */

            /* uint64_t unused:8; */
        };
    };

    /** Size of the application private data. In case of an indirect
     * mbuf, it stores the direct mbuf private data size. */
    uint16_t priv_size;

    /** Timesync flags for use with IEEE1588. */
    uint16_t timesync;

    /** Sequence number. See also rte_reorder_insert(). */
    uint32_t seqn;

} __rte_cache_aligned;

refcntがあるのはmbufのデータが他のmbufから参照される場合があるからです(下図).

f:id:mm_i:20170529110553p:plain

rte_pktmbuf_attach()で他のmbufのデータを参照するようにできます.また,ヘルパー関数としてrte_pktmbuf_clone()で指定したmbufのデータを参照するmbufを作成できます. dpdkでは他のmbufのデータを指すmbufをindiret,自分自身のデータを指すmbufをdirectと呼んでいます. indirectなmbufは自身のデータ領域を使わないので,オブジェクトサイズが小さいmempoolからmbufを確保した方がメモリ効率が上がります.

mbufの作成

mbufの作成は,rte_pktmbuf_pool_create()でおこないます. rte_pktmbuf_pool_create()では,rte_mempool_create_empty()および rte_mempool_set_ops_byname(), rte_mempool_populate_default()を利用して mempoolを作成したのち,最後にrte_mempool_obj_iter()を使って 各mempoolのelemに対してrte_pktmbuf_init()を実行します.

struct rte_mempool *
rte_pktmbuf_pool_create(const char *name, unsigned n,
    unsigned cache_size, uint16_t priv_size, uint16_t data_room_size,
    int socket_id)
{
    struct rte_mempool *mp;
    struct rte_pktmbuf_pool_private mbp_priv;
    unsigned elt_size;
    int ret;

    if (RTE_ALIGN(priv_size, RTE_MBUF_PRIV_ALIGN) != priv_size) {
        RTE_LOG(ERR, MBUF, "mbuf priv_size=%u is not aligned\n",
            priv_size);
        rte_errno = EINVAL;
        return NULL;
    }
    elt_size = sizeof(struct rte_mbuf) + (unsigned)priv_size +
        (unsigned)data_room_size;
    mbp_priv.mbuf_data_room_size = data_room_size;
    mbp_priv.mbuf_priv_size = priv_size;


    mp = rte_mempool_create_empty(name, n, elt_size, cache_size,
         sizeof(struct rte_pktmbuf_pool_private), socket_id, 0);
    if (mp == NULL)
        return NULL;

    ret = rte_mempool_set_ops_byname(mp,
        RTE_MBUF_DEFAULT_MEMPOOL_OPS, NULL);
    if (ret != 0) {
        RTE_LOG(ERR, MBUF, "error setting mempool handler\n");
        rte_mempool_free(mp);
        rte_errno = -ret;
        return NULL;
    }
    rte_pktmbuf_pool_init(mp, &mbp_priv);

    ret = rte_mempool_populate_default(mp);
    if (ret < 0) {
        rte_mempool_free(mp);
        rte_errno = -ret;
        return NULL;
    }

    rte_mempool_obj_iter(mp, rte_pktmbuf_init, NULL);

    return mp;
}

rte_pktmbuf_init()ではmempoolの一つのオブジェクトの先頭にrte_mbufの構造体 領域を確保し,それを初期化しています.

void
rte_pktmbuf_init(struct rte_mempool *mp,
         __attribute__((unused)) void *opaque_arg,
         void *_m,
         __attribute__((unused)) unsigned i)
{
    struct rte_mbuf *m = _m;
    uint32_t mbuf_size, buf_len, priv_size;

    priv_size = rte_pktmbuf_priv_size(mp);
    mbuf_size = sizeof(struct rte_mbuf) + priv_size;
    buf_len = rte_pktmbuf_data_room_size(mp);

    RTE_ASSERT(RTE_ALIGN(priv_size, RTE_MBUF_PRIV_ALIGN) == priv_size);
    RTE_ASSERT(mp->elt_size >= mbuf_size);
    RTE_ASSERT(buf_len <= UINT16_MAX);

    memset(m, 0, mp->elt_size);

    /* start of buffer is after mbuf structure and priv data */
    m->priv_size = priv_size;
    m->buf_addr = (char *)m + mbuf_size;
    m->buf_physaddr = rte_mempool_virt2phy(mp, m) + mbuf_size;
    m->buf_len = (uint16_t)buf_len;

    /* keep some headroom between start of buffer and data */
    m->data_off = RTE_MIN(RTE_PKTMBUF_HEADROOM, (uint16_t)m->buf_len);

    /* init some constant fields */
    m->pool = mp;
    m->nb_segs = 1;
    m->port = 0xff;
    rte_mbuf_refcnt_set(m, 1);
    m->next = NULL;
}

mbufの利用: パケットの受信

実際にパケットの受信時にどのようにmbufが利用されるか見ていきたいと思います.

rx/txバッファの設定

dpdkのexampleにあるl2fwdのプログラムでは以下のように初期化をおこなっています.

// mbufの作成
pktmbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", NB_MBUF,
        MEMPOOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
        rte_socket_id());
...

// ポートごとに初期化
for (portid = 0; portid < nb_ports; portid++) {
    ...
    ret = rte_eth_dev_configure(portid, 1, 1, &port_conf);
    ...
    ret = rte_eth_rx_queue_setup(portid, 0, nb_rxd,
                     rte_eth_dev_socket_id(portid),
                     NULL,
                     l2fwd_pktmbuf_pool);
    ...
    ret = rte_eth_tx_queue_setup(portid, 0, nb_txd,
            rte_eth_dev_socket_id(portid),
}

まずrte_pktmbuf_pool_create()でプログラム全体が利用するmbufを確保します. ちなみに,NB_MBUFは8192, RTE_MBUF_DEFAULT_BUF_SIZEは2KB + RTE_PKTMBUF_HEADROOM(自分の環境では128B)です.

その後各NICのポートの設定をおこないますが,特に重要なのがrte_eth_rx_queue_setup()rte_eth_tx_queue_setup()です. ixgbeを使用する場合,rte_eth_rx_queue_setup() から最終的には drivers/net/ixgbe/ixgbe_rxtx.cixgbe_dev_rx_queue_setup()が呼ばれます.

ixgbe_dev_rx_queue_setup()ではrx queue用のデータ構造(ixgbe_rx_queue)やring descriptor用のメモリを確保したりします.

int __attribute__((cold))
ixgbe_dev_rx_queue_setup(struct rte_eth_dev *dev,
             uint16_t queue_idx,
             uint16_t nb_desc,
             unsigned int socket_id,
             const struct rte_eth_rxconf *rx_conf,
             struct rte_mempool *mp)
{
    ...
    /* First allocate the rx queue data structure */
    rxq = rte_zmalloc_socket("ethdev RX queue", sizeof(struct ixgbe_rx_queue),
                 RTE_CACHE_LINE_SIZE, socket_id);
    if (rxq == NULL)
        return -ENOMEM;
    rxq->mb_pool = mp;
    rxq->nb_rx_desc = nb_desc;
    rxq->rx_free_thresh = rx_conf->rx_free_thresh;
    rxq->queue_id = queue_idx;
    rxq->reg_idx = (uint16_t)((RTE_ETH_DEV_SRIOV(dev).active == 0) ?
        queue_idx : RTE_ETH_DEV_SRIOV(dev).def_pool_q_idx + queue_idx);
    rxq->port_id = dev->data->port_id;
    rxq->crc_len = (uint8_t) ((dev->data->dev_conf.rxmode.hw_strip_crc) ?
                            0 : ETHER_CRC_LEN);
    rxq->drop_en = rx_conf->rx_drop_en;
    rxq->rx_deferred_start = rx_conf->rx_deferred_start;

    ...

    /*
     * Allocate RX ring hardware descriptors. A memzone large enough to
     * handle the maximum ring size is allocated in order to allow for
     * resizing in later calls to the queue setup function.
     */
    rz = rte_eth_dma_zone_reserve(dev, "rx_ring", queue_idx,
                      RX_RING_SZ, IXGBE_ALIGN, socket_id);
    ...

    rxq->rx_ring_phys_addr = rte_mem_phy2mch(rz->memseg_id, rz->phys_addr);
    rxq->rx_ring = (union ixgbe_adv_rx_desc *) rz->addr;

    ...

    /*
     * Allocate software ring. Allow for space at the end of the
     * S/W ring to make sure look-ahead logic in bulk alloc Rx burst
     * function does not access an invalid memory region.
     */
    len = nb_desc;
    if (adapter->rx_bulk_alloc_allowed)
        len += RTE_PMD_IXGBE_RX_MAX_BURST;

    rxq->sw_ring = rte_zmalloc_socket("rxq->sw_ring",
                      sizeof(struct ixgbe_rx_entry) * len,
                      RTE_CACHE_LINE_SIZE, socket_id);
    ...

    dev->data->rx_queues[queue_idx] = rxq;

    ixgbe_reset_rx_queue(adapter, rxq);

    return 0;
}

rx ring

ixgbeの場合のrx ringのデータ構造は以下のようになっています.

f:id:mm_i:20170529110825p:plain

struct ixgbe_rx_queue {
    struct rte_mempool  *mb_pool; /**< mbuf pool to populate RX ring. */
    volatile union ixgbe_adv_rx_desc *rx_ring; /**< RX ring virtual address. */
    uint64_t            rx_ring_phys_addr; /**< RX ring DMA address. */
    volatile uint32_t   *rdt_reg_addr; /**< RDT register address. */
    volatile uint32_t   *rdh_reg_addr; /**< RDH register address. */
    struct ixgbe_rx_entry *sw_ring; /**< address of RX software ring. */
    struct ixgbe_scattered_rx_entry *sw_sc_ring; /**< address of scattered Rx software ring. */
    struct rte_mbuf *pkt_first_seg; /**< First segment of current packet. */
    struct rte_mbuf *pkt_last_seg; /**< Last segment of current packet. */
    uint64_t            mbuf_initializer; /**< value to init mbufs */
    uint16_t            nb_rx_desc; /**< number of RX descriptors. */
    uint16_t            rx_tail;  /**< current value of RDT register. */
    uint16_t            nb_rx_hold; /**< number of held free RX desc. */
    uint16_t rx_nb_avail; /**< nr of staged pkts ready to ret to app */
    uint16_t rx_next_avail; /**< idx of next staged pkt to ret to app */
    uint16_t rx_free_trigger; /**< triggers rx buffer allocation */
    uint16_t            rx_using_sse;
    /**< indicates that vector RX is in use */
#ifdef RTE_IXGBE_INC_VECTOR
    uint16_t            rxrearm_nb;     /**< number of remaining to be re-armed */
    uint16_t            rxrearm_start;  /**< the idx we start the re-arming from */
#endif
    uint16_t            rx_free_thresh; /**< max free RX desc to hold. */
    uint16_t            queue_id; /**< RX queue index. */
    uint16_t            reg_idx;  /**< RX queue register index. */
    uint16_t            pkt_type_mask;  /**< Packet type mask for different NICs. */
    uint8_t             port_id;  /**< Device port identifier. */
    uint8_t             crc_len;  /**< 0 if CRC stripped, 4 otherwise. */
    uint8_t             drop_en;  /**< If not 0, set SRRCTL.Drop_En. */
    uint8_t             rx_deferred_start; /**< not in global dev start. */
    /** flags to set in mbuf when a vlan is detected. */
    uint64_t            vlan_flags;
    /** need to alloc dummy mbuf, for wraparound when scanning hw ring */
    struct rte_mbuf fake_mbuf;
    /** hold packets to return to application */
    struct rte_mbuf *rx_stage[RTE_PMD_IXGBE_RX_MAX_BURST*2];
};

rx_ringが実際のixgbeのrx ringを指すポインタです.rx ringのbuffer addressはmbufのデータ部分を指します.sw_ringがrx ringで利用されるmbufを保持しています. rx_ringpkt_addrが対応するsw_ringのmbufのデータ部分を指します. 受信時はrx_tailで示されるインデックスからdescriptorを確認していきます(NICのtailとは異なります).

x540のreceive descriptor

ここで簡単にX540のreceive descriptorについて説明しておきます. 詳しくはデータシートの7.1に書いてあります.

受信する際に利用するring descriptor queueは以下のような構造をしています(データシートより引用).

f:id:mm_i:20170529110750p:plain

キューの各エントリがdescriptorと呼ばれます. ソフトウェア側はdescriptorを設定したのち,tailを進めます.これは実際にはX540のRDTレジスタの値を更新することでおこないます. またNICは内部的に空きdescriptorの先頭位置をheadとして覚えており,パケットを受信したらheadのdescriptorに従ってパケットをメモリにDMAしたあと,headを進めます.

f:id:mm_i:20170529110801p:plainf:id:mm_i:20170529110808p:plain

descriptorのformatは以下のようになっています(descriptorのフォーマットにもいくつかありますが,ここではそのうちのadvance descritorを示しています).

descriptorのpacket buffer addressにパケットをDMAする先のアドレスを指定します. このアドレスはdpdkではmbufのデータのアドレスになります. X540では受信パケットのヘッダとデータを別々の領域へDMAする(header splitting)ことが可能であるためheader buffer addressを指定する場所もありますが,DPDKでは利用していません. ソフトウェアはDDビットが1になっているかどうかを調べることで,パケットが到着しているかどうかを判断することができます. またdescriptor write-back formatのフィールドにはNICが受信したパケットの情報を書き込みます.

パケットの受信

パケットの受信はrte_eth_rx_burst()でおこないます.

nb_rx = rte_eth_rx_burst((uint8_t) portid, 0,
             pkts_burst, MAX_PKT_BURST);

rte_eth_rx_burst()は最大MAX_PKT_BURST個のパケットを受信し,結果をpkts_burstに格納します. NICのオフロード機能を使用するかなどの設定によって,rte_eth_rx_burst()を呼んだときに呼ばれる関数は異なります.ixgbeではixgbe_dev_rx_init()の中で呼ばれるixgbe_set_rx_function()でrecv時の関数を設定しています. ここではrecv時にixgbe_recv_pkts_bulk_alloc()が設定されていると仮定して,その具体的な処理をみてみます.

ixgbe_recv_pkts_bulk_alloc()では,RTE_PMD_IXGBE_RX_MAX_BURST (実際は32)単位でのパケット読み出しをおこない,実際に受信したパケットを返します.

uint16_t
ixgbe_recv_pkts_bulk_alloc(void *rx_queue, struct rte_mbuf **rx_pkts,
               uint16_t nb_pkts)
{
    uint16_t nb_rx;

    if (unlikely(nb_pkts == 0))
        return 0;

    if (likely(nb_pkts <= RTE_PMD_IXGBE_RX_MAX_BURST))
        return rx_recv_pkts(rx_queue, rx_pkts, nb_pkts);

    /* request is relatively large, chunk it up */
    nb_rx = 0;
    while (nb_pkts) {
        uint16_t ret, n;

        n = (uint16_t)RTE_MIN(nb_pkts, RTE_PMD_IXGBE_RX_MAX_BURST);
        ret = rx_recv_pkts(rx_queue, &rx_pkts[nb_rx], n);
        nb_rx = (uint16_t)(nb_rx + ret);
        nb_pkts = (uint16_t)(nb_pkts - ret);
        if (ret < n)
            break;
    }

    return nb_rx;
}

実際の受信処理をおこなっているのはrx_recv_pkts()になります.

static inline uint16_t
rx_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
         uint16_t nb_pkts)
{
    struct ixgbe_rx_queue *rxq = (struct ixgbe_rx_queue *)rx_queue;
    uint16_t nb_rx = 0;

    /* Any previously recv'd pkts will be returned from the Rx stage */
    if (rxq->rx_nb_avail)
        return ixgbe_rx_fill_from_stage(rxq, rx_pkts, nb_pkts);

前回受信したパケットがまだ残っていたらそれを処理します (これはこの関数がnb_pkts単位で受信したパケットを返すからです).

    /* Scan the H/W ring for packets to receive */
    nb_rx = (uint16_t)ixgbe_rx_scan_hw_ring(rxq);

ixgbe_rx_scan_hw_ring()でパケットが到着しているかチェックします. nb_rxに受信したパケット数が入ります.

    /* update internal queue state */
    rxq->rx_next_avail = 0;
    rxq->rx_nb_avail = nb_rx;
    rxq->rx_tail = (uint16_t)(rxq->rx_tail + nb_rx);

rx_tailnb_rxだけ進めます.

    /* if required, allocate new buffers to replenish descriptors */
    if (rxq->rx_tail > rxq->rx_free_trigger) {
        uint16_t cur_free_trigger = rxq->rx_free_trigger;

        if (ixgbe_rx_alloc_bufs(rxq, true) != 0) {
            int i, j;

            PMD_RX_LOG(DEBUG, "RX mbuf alloc failed port_id=%u "
                   "queue_id=%u", (unsigned) rxq->port_id,
                   (unsigned) rxq->queue_id);

            rte_eth_devices[rxq->port_id].data->rx_mbuf_alloc_failed +=
                rxq->rx_free_thresh;

            /*
             * Need to rewind any previous receives if we cannot
             * allocate new buffers to replenish the old ones.
             */
            rxq->rx_nb_avail = 0;
            rxq->rx_tail = (uint16_t)(rxq->rx_tail - nb_rx);
            for (i = 0, j = rxq->rx_tail; i < nb_rx; ++i, ++j)
                rxq->sw_ring[j].mbuf = rxq->rx_stage[i];

            return 0;
        }

        /* update tail pointer */
        rte_wmb();
        IXGBE_PCI_REG_WRITE_RELAXED(rxq->rdt_reg_addr,
                        cur_free_trigger);
    }

rx_tailの値が設定した閾値以上になったら,ixgbe_rx_alloc_bufs()を使って新しいmbufを取得し,それをデスクリプタに設定します.またNICのtail pointerを更新します.

    if (rxq->rx_tail >= rxq->nb_rx_desc)
        rxq->rx_tail = 0;

この関数は一気にパケットを指定した数(8個)読もうとしますが,その際ring bufferの先頭に巻き戻って読むことはしません.もし末尾に到達したらここで先頭にインデックスを戻しています.(したがって,ring descriptorの最大数はこのバースト読み出し分を考慮して余分に確保しておく必要があります)

    /* received any packets this loop? */
    if (rxq->rx_nb_avail)
        return ixgbe_rx_fill_from_stage(rxq, rx_pkts, nb_pkts);

    return 0;
}

受信したパケットがあればixgbe_rx_fill_from_stage()でそのパケットの後処理をして返します.

デスクリプタをチェックしてパケットの受信を判断するixgbe_rx_scan_hw_ring()は以下のようになっています.

static inline int
ixgbe_rx_scan_hw_ring(struct ixgbe_rx_queue *rxq)
{
    volatile union ixgbe_adv_rx_desc *rxdp;
    struct ixgbe_rx_entry *rxep;
    struct rte_mbuf *mb;
    uint16_t pkt_len;
    uint64_t pkt_flags;
    int nb_dd;
    uint32_t s[LOOK_AHEAD];
    uint32_t pkt_info[LOOK_AHEAD];
    int i, j, nb_rx = 0;
    uint32_t status;
    uint64_t vlan_flags = rxq->vlan_flags;

    /* get references to current descriptor and S/W ring entry */
    rxdp = &rxq->rx_ring[rxq->rx_tail];
    rxep = &rxq->sw_ring[rxq->rx_tail];

    status = rxdp->wb.upper.status_error;
    /* check to make sure there is at least 1 packet to receive */
    if (!(status & rte_cpu_to_le_32(IXGBE_RXDADV_STAT_DD)))
        return 0;

    /*
     * Scan LOOK_AHEAD descriptors at a time to determine which descriptors
     * reference packets that are ready to be received.
     */
    for (i = 0; i < RTE_PMD_IXGBE_RX_MAX_BURST;
         i += LOOK_AHEAD, rxdp += LOOK_AHEAD, rxep += LOOK_AHEAD) {
        /* Read desc statuses backwards to avoid race condition */
        for (j = 0; j < LOOK_AHEAD; j++)
            s[j] = rte_le_to_cpu_32(rxdp[j].wb.upper.status_error);

        rte_smp_rmb();

        /* Compute how many status bits were set */
        for (nb_dd = 0; nb_dd < LOOK_AHEAD &&
                (s[nb_dd] & IXGBE_RXDADV_STAT_DD); nb_dd++)
            ;

ここでDDビットを見てパケットを受信したか見ています.

        for (j = 0; j < nb_dd; j++)
            pkt_info[j] = rte_le_to_cpu_32(rxdp[j].wb.lower.
                               lo_dword.data);

        nb_rx += nb_dd;

        /* Translate descriptor info to mbuf format */
        for (j = 0; j < nb_dd; ++j) {
            mb = rxep[j].mbuf;
            pkt_len = rte_le_to_cpu_16(rxdp[j].wb.upper.length) -
                  rxq->crc_len;
            mb->data_len = pkt_len;
            mb->pkt_len = pkt_len;
            mb->vlan_tci = rte_le_to_cpu_16(rxdp[j].wb.upper.vlan);

            /* convert descriptor fields to rte mbuf flags */
            pkt_flags = rx_desc_status_to_pkt_flags(s[j],
                vlan_flags);
            pkt_flags |= rx_desc_error_to_pkt_flags(s[j]);
            pkt_flags |= ixgbe_rxd_pkt_info_to_pkt_flags
                    ((uint16_t)pkt_info[j]);
            mb->ol_flags = pkt_flags;
            mb->packet_type =
                ixgbe_rxd_pkt_info_to_pkt_type
                    (pkt_info[j], rxq->pkt_type_mask);

            if (likely(pkt_flags & PKT_RX_RSS_HASH))
                mb->hash.rss = rte_le_to_cpu_32(
                    rxdp[j].wb.lower.hi_dword.rss);
            else if (pkt_flags & PKT_RX_FDIR) {
                mb->hash.fdir.hash = rte_le_to_cpu_16(
                    rxdp[j].wb.lower.hi_dword.csum_ip.csum) &
                    IXGBE_ATR_HASH_MASK;
                mb->hash.fdir.id = rte_le_to_cpu_16(
                    rxdp[j].wb.lower.hi_dword.csum_ip.ip_id);
            }
        }

この部分で受信したパケットに関して,mbufのメタデータ部分ににパケットの情報を格納しています.

        /* Move mbuf pointers from the S/W ring to the stage */
        for (j = 0; j < LOOK_AHEAD; ++j) {
            rxq->rx_stage[i + j] = rxep[j].mbuf;
        }

        /* stop if all requested packets could not be received */
        if (nb_dd != LOOK_AHEAD)
            break;
    }

    /* clear software ring entries so we can cleanup correctly */
    for (i = 0; i < nb_rx; ++i) {
        rxq->sw_ring[rxq->rx_tail + i].mbuf = NULL;
    }


    return nb_rx;
}

最後に,受信したパケットのmbufのアドレスをrx_stageに入れ,sw_ringの方はNULLにします.

ixgbe_rx_fill_from_stage()では最大nb_pkts分の受信データをrx_pktsに格納して返します.

static inline uint16_t
ixgbe_rx_fill_from_stage(struct ixgbe_rx_queue *rxq, struct rte_mbuf **rx_pkts,
             uint16_t nb_pkts)
{
    struct rte_mbuf **stage = &rxq->rx_stage[rxq->rx_next_avail];
    int i;

    /* how many packets are ready to return? */
    nb_pkts = (uint16_t)RTE_MIN(nb_pkts, rxq->rx_nb_avail);

    /* copy mbuf pointers to the application's packet list */
    for (i = 0; i < nb_pkts; ++i)
        rx_pkts[i] = stage[i];

    /* update internal queue state */
    rxq->rx_nb_avail = (uint16_t)(rxq->rx_nb_avail - nb_pkts);
    rxq->rx_next_avail = (uint16_t)(rxq->rx_next_avail + nb_pkts);

    return nb_pkts;
}

また,ixgbe_rx_alloc_bufs()では以下のようにしてmbufを取得しています.

static inline int
ixgbe_rx_alloc_bufs(struct ixgbe_rx_queue *rxq, bool reset_mbuf)
{
    volatile union ixgbe_adv_rx_desc *rxdp;
    struct ixgbe_rx_entry *rxep;
    struct rte_mbuf *mb;
    uint16_t alloc_idx;
    __le64 dma_addr;
    int diag, i;

    /* allocate buffers in bulk directly into the S/W ring */
    alloc_idx = rxq->rx_free_trigger - (rxq->rx_free_thresh - 1);
    rxep = &rxq->sw_ring[alloc_idx];
    diag = rte_mempool_get_bulk(rxq->mb_pool, (void *)rxep,
                    rxq->rx_free_thresh);

rte_mempool_get_bulk()を使ってmempoolから空いているmbufを取得します.

    if (unlikely(diag != 0))
        return -ENOMEM;

    rxdp = &rxq->rx_ring[alloc_idx];
    for (i = 0; i < rxq->rx_free_thresh; ++i) {
        /* populate the static rte mbuf fields */
        mb = rxep[i].mbuf;
        if (reset_mbuf) {
            mb->port = rxq->port_id;
        }

        rte_mbuf_refcnt_set(mb, 1);
        mb->data_off = RTE_PKTMBUF_HEADROOM;

        /* populate the descriptors */
        dma_addr = rte_cpu_to_le_64(rte_mbuf_data_dma_addr_default(mb));
        rxdp[i].read.hdr_addr = 0;
        rxdp[i].read.pkt_addr = dma_addr;
    }

各デスクリプタのpkt_addrにmbufのデータ領域のアドレスを書き込みます.

    /* update state of internal queue structure */
    rxq->rx_free_trigger = rxq->rx_free_trigger + rxq->rx_free_thresh;
    if (rxq->rx_free_trigger >= rxq->nb_rx_desc)
        rxq->rx_free_trigger = rxq->rx_free_thresh - 1;

    /* no errors */
    return 0;
}

最後に次にmbufを確保する際の閾値となるrx_free_triggerを更新しています. ちなみに,rx_free_threshはデフォルトで32のようです.

mbufの利用: マルチキャスト

受信以外のmbuf利用の例として,サンプルプログラムに含まれているマルチキャストのプログラムを見てみます.

http://dpdk.org/doc/guides/sample_app_ug/ipv4_multicast.html は受信したパケットを複数のポートからマルチキャストするサンプルプログラムです. マルチキャストの流れは以下のようになります.

f:id:mm_i:20170529110435p:plain

  1. 受信パケットからヘッダを取り除く (rte_pktmbuf_adj()を使ってdata_offを長くする)
  2. rte_pktmbuf_clone()を使って受信パケットのデータ部分を指すmbufを作成
  3. rte_pktmbuf_create()を使ってヘッダ部分のパケットを作成,mbuf->nextはcloneしたmbufを指すようにする 4. 作成したパケットを指定したポートから送信

cloneを使って複数のポートから送信する場合,各パケットは自身のメタデータを持つことになります.したがって最後のポートに関してはcloneをする必要がなく,受信パケットのメタデータを直接変更して使用することができます. もちろん,cloneをせず直接受信パケットをmbuf->nextで指す方法もありますが,この場合は受信パケットのメタデータを変更できないため,最後のポートに関しても他の場合と同様にして送信をおこないます. コードにはcloneする場合としない場合,2通りのコードが書いてあります.

DPDKにおけるメモリ管理

DPDKにおいてどのようにメモリが使用されるかをここにまとめたいと思います. なお,ここに書いてあるものはあくまで私がコードを読んで解釈した結果になります. DPDKのバージョンは17.05,OSはLinuxを対象としています.

全体像

DPDKでは独自のメモリアロケータを利用してメモリを管理しています. メモリ管理のために以下のようなデータ構造が利用されます.

  • memconfig : 作成したmemsegやmemzoneなどの情報を保持
  • memseg : 連続した物理メモリ領域情報を保持
  • heap : メモリの空きブロックの管理 / 割り当て.memsegのメモリ領域を使用する.
  • memzone : 連続した物理メモリ領域を名前をつけて割り当て.内部でheapを利用
  • mempool : 固定長メモリアロケータ.内部でmemzoneを利用

前提知識: queue(3)

本題に入る前に,DPDKでは内部で複数のリストを保持していますが,このリストには queue(3)を利用しています. queueについて知っておいた方がコードが読みやすいです.

queueにはSLIST,STAILQおよびLISTとTAILQの4種類があります. SLISTとSTAILQが単方向リスト,LISTとTAILQが連結リストで,TAILQの場合は末尾への挿入が可能です.queueは全てマクロとして実装されています.

TAILQの簡単な使用例は以下のようになります.

struct entry{
    TAILQ_ENTRY(entry) tq;
    int data;
} e1, e2, e3;

TAILQ_HEAD(entry_head, entry);

int main(){
    struct entry_head head;
    TAILQ_INIT(&head);

    e1.data = 1;
    e2.data = 2;
    e3.data = 3;

    TAILQ_INSERT_HEAD(&head, &e1, tq);
    TAILQ_INSERT_HEAD(&head, &e2, tq);
    TAILQ_INSERT_TAIL(&head, &e3, tq);

    struct entry* p = TAILQ_FIRST(&head);
    while(p != NULL){
        printf("%d\n", p->data);
        p = TAILQ_NEXT(p, tq);
    }
}

初期化部分がやや分かりにくいですが,TAILQ_ENTRY()TAILQ_HEAD()が以下のよ うに展開されることが理解できれば納得できると思います.

#define _TAILQ_HEAD(name, type, qual)                   \
struct name {                               \
    qual type *tqh_first;       /* first element */     \
    qual type *qual *tqh_last;  /* addr of last next element */ \
}
#define TAILQ_HEAD(name, type)  _TAILQ_HEAD(name, struct type,)

#define _TAILQ_ENTRY(type, qual)                    \
struct {                                \
    qual type *tqe_next;        /* next element */      \
    qual type *qual *tqe_prev;  /* address of previous next element */\
}
#define TAILQ_ENTRY(type)   _TAILQ_ENTRY(struct type,)

基本的にマクロにはポインタを渡します. queueの実装は全てヘッダファイルに記述してあるので,見てみると参考になると思います.

glibcの実装: https://sourceware.org/git/?p=glibc.git;a=blob_plain;f=misc/sys/queue.h;hb=HEAD

ちなみに,TAILQの実装を読む際は,

  • tqe_prev は 前の要素の tqe_next のアドレスを保持している
    • これを利用して要素を挿入した際に前の要素のtqe_nextを書き換える
  • ある要素の前の要素を取得する際は,前の要素のteq_prevを使う
    • 実質的に前の要素のtqe_nextを参照していることになる

ということを抑えておけば理解が容易だと思います.

memconfig

rte_mem_configは,EALの中でメモリ全体の情報を保持しているデータ構造です. 具体的には作成したmemsegやmemzone,heap等(後述)を保持しています. EALの中ではrte_eal_get_configuration()->mem_configからmemconfigを取得できます.

librte_eal/common/rte_eal_memconfig.h:

struct rte_mem_config {
    volatile uint32_t magic;   /**< Magic number - Sanity check. */

    /* memory topology */
    uint32_t nchannel;    /**< Number of channels (0 if unknown). */
    uint32_t nrank;       /**< Number of ranks (0 if unknown). */

    /**
     * current lock nest order
     *  - qlock->mlock (ring/hash/lpm)
     *  - mplock->qlock->mlock (mempool)
     * Notice:
     *  *ALWAYS* obtain qlock first if having to obtain both qlock and mlock
     */
    rte_rwlock_t mlock;   /**< only used by memzone LIB for thread-safe. */
    rte_rwlock_t qlock;   /**< used for tailq operation for thread safe. */
    rte_rwlock_t mplock;  /**< only used by mempool LIB for thread-safe. */

    uint32_t memzone_cnt; /**< Number of allocated memzones */

    /* memory segments and zones */
    struct rte_memseg memseg[RTE_MAX_MEMSEG];    /**< Physmem descriptors. */
    struct rte_memzone memzone[RTE_MAX_MEMZONE]; /**< Memzone descriptors. */

    struct rte_tailq_head tailq_head[RTE_MAX_TAILQ]; /**< Tailqs for objects */

    /* Heaps of Malloc per socket */
    struct malloc_heap malloc_heaps[RTE_MAX_NUMA_NODES];

    /* address of mem_config in primary process. used to map shared config into
     * exact same address the primary process maps it.
     */
    uint64_t mem_cfg_addr;
} __attribute__((__packed__));

初期化

DPDKのプログラムで最初にrte_eal_init()を呼ぶと様々な初期化処理がおこなわれますが, その中でも rte_eal_memory_init()でmemsegの初期化,rte_eal_memzone_init()で memzone及びheapの初期化がおこなわれます.

memseg

memsegは連続した物理アドレス空間の情報を保持するデータ構造です.

librte_eal/common/include/rte_memory.h:

struct rte_memseg {
    phys_addr_t phys_addr;      /**< Start physical address. */
    RTE_STD_C11
    union {
        void *addr;         /**< Start virtual address. */
        uint64_t addr_64;   /**< Makes sure addr is always 64 bits */
    };
    size_t len;               /**< Length of the segment. */
    uint64_t hugepage_sz;       /**< The pagesize of underlying memory */
    int32_t socket_id;          /**< NUMA socket ID. */
    uint32_t nchannel;          /**< Number of channels. */
    uint32_t nrank;             /**< Number of ranks. */
#ifdef RTE_LIBRTE_XEN_DOM0
     /**< store segment MFNs */
    uint64_t mfn[DOM0_NUM_MEMBLOCK];
#endif
}

phys_addrに先頭の物理アドレスが,lenに長さが格納されています. memsegの初期化は rte_eal_init() => rte_eal_memory_init() => rte_eal_hugepage_init() の中でおこなわれます.

具体的な物理アドレス確保の方法は前回のエントリに書いた通りです : DPDKにおけるhugepageの初期化部分. なるべく物理的に連続した領域を確保しようとします.

heap

連続したメモリの空きブロックの管理にはheapを利用します.

librte_eal/common/malloc_heap.h:

struct malloc_heap {
     rte_spinlock_t lock;
     LIST_HEAD(, malloc_elem) free_head[RTE_HEAP_NUM_FREELISTS];
     unsigned alloc_count;
     size_t total_size;
 } __rte_cache_aligned;

struct malloc_elem {
    struct malloc_heap *heap;
    struct malloc_elem *volatile prev;      /* points to prev elem in memseg */
    LIST_ENTRY(malloc_elem) free_list;      /* list of free elements in heap */
    const struct rte_memseg *ms;
    volatile enum elem_state state;
    uint32_t pad;
    size_t size;
#ifdef RTE_LIBRTE_MALLOC_DEBUG
    uint64_t header_cookie;         /* Cookie marking start of data */
                                    /* trailer cookie at start + size */
#endif
} __rte_cache_aligned;

malloc_elemが一つのメモリブロックで,それはqueue(3)のLISTの一要素となっています. LISTの先頭はmalloc_heap構造体のfree_headからアクセス可能です. フリーリストはメモリブロックのサイズの大きさごとに分けられています(後述のmalloc_elem_free_list_index()参照). またmalloc_heap自体はmemconfigのmalloc_heapsからアクセスできます.

heapの初期化はrte_eal_memory_init()のあとに呼ばれるrte_eal_memzone_init() の中のrte_malloc_heap_init()でおこなわれます.

rte_malloc_heap_init()ではmalloc_heap_add_memseg()を呼び,各memsegをheapに 追加します.ここでは一つのmemseg全体が一つのmalloc_elemとして追加されます.

librte_eal/common/malloc_heap.c:

static void
malloc_heap_add_memseg(struct malloc_heap *heap, struct rte_memseg *ms)
{
    /* allocate the memory block headers, one at end, one at start */
    struct malloc_elem *start_elem = (struct malloc_elem *)ms->addr;
    struct malloc_elem *end_elem = RTE_PTR_ADD(ms->addr,
            ms->len - MALLOC_ELEM_OVERHEAD);
    end_elem = RTE_PTR_ALIGN_FLOOR(end_elem, RTE_CACHE_LINE_SIZE);
    const size_t elem_size = (uintptr_t)end_elem - (uintptr_t)start_elem;

    malloc_elem_init(start_elem, heap, ms, elem_size);
    malloc_elem_mkend(end_elem, start_elem);
    malloc_elem_free_list_insert(start_elem);

    heap->total_size += elem_size;
}

librte_eal/common/malloc_elem.c:

void
malloc_elem_init(struct malloc_elem *elem,
        struct malloc_heap *heap, const struct rte_memseg *ms, size_t size)
{
    elem->heap = heap;
    elem->ms = ms;
    elem->prev = NULL;
    memset(&elem->free_list, 0, sizeof(elem->free_list));
    elem->state = ELEM_FREE;
    elem->size = size;
    elem->pad = 0;
    set_header(elem);
    set_trailer(elem);
}

ここでは各memsegの先頭と末尾にmalloc_elem用の領域を確保してそれを初期化しています. 末尾にもmalloc_elemを用意するのはオーバフロー防止のためのようです. malloc_elemのsizeはmalloc_elem構造体自身を含んだサイズのようです.

malloc_elem_free_list_insert()で作成したmalloc_elemmaloc_heapfree_headで先頭が示されるフリーリストに追加します. このフリーリストはメモリブロックの大きさごとに分かれており,どこのフリーリスト に入れるかはmalloc_elem_free_list_index()で決めています. コメントを読む限り,

heap->free_head[0] - (0   , 2^8]
heap->free_head[1] - (2^8 , 2^10]
heap->free_head[2] - (2^10 ,2^12]
heap->free_head[3] - (2^12, 2^14]
...

というような感じに分けられて格納されるようです.

memzone

memzoneは連続した物理アドレス空間に名前をつけて保持するためのデータ構造です. 取得したmemzoneはmemconfig.memzoneに保持されます. memzoneの確保の際に内部的にはheapからメモリを確保します.

librte_eal/common/include/rte_memzone.h:

struct rte_memzone {

#define RTE_MEMZONE_NAMESIZE 32       /**< Maximum length of memory zone name.*/
    char name[RTE_MEMZONE_NAMESIZE];  /**< Name of the memory zone. */

    phys_addr_t phys_addr;            /**< Start physical address. */
    RTE_STD_C11
    union {
        void *addr;                   /**< Start virtual address. */
        uint64_t addr_64;             /**< Makes sure addr is always 64-bits */
    };
    size_t len;                       /**< Length of the memzone. */

    uint64_t hugepage_sz;             /**< The page size of underlying memory */

    int32_t socket_id;                /**< NUMA socket ID. */

    uint32_t flags;                   /**< Characteristics of this memzone. */
    uint32_t memseg_id;               /**< Memseg it belongs. */
} __attribute__((__packed__));

rte_eal_memzone_init()の中ではmemzoneは空に初期化されるだけです.

heapからのメモリの割り当て

heapからの割り当てはmalloc_heap_alloc()からおこないます.

librte_eal/common/malloc_heap.c:

void *
malloc_heap_alloc(struct malloc_heap *heap,
        const char *type __attribute__((unused)), size_t size, unsigned flags,
        size_t align, size_t bound)
{
    struct malloc_elem *elem;

    size = RTE_CACHE_LINE_ROUNDUP(size);
    align = RTE_CACHE_LINE_ROUNDUP(align);

    rte_spinlock_lock(&heap->lock);

    elem = find_suitable_element(heap, size, flags, align, bound);
    if (elem != NULL) {
        elem = malloc_elem_alloc(elem, size, align, bound);
        /* increase heap's count of allocated elements */
        heap->alloc_count++;
    }
    rte_spinlock_unlock(&heap->lock);

    return elem == NULL ? NULL : (void *)(&elem[1]);
}

ここではまずfind_suitable_element()を利用して,ヒープのフリーリストからズの空きブロックを見つけます. やっていることはフリーリストを先頭から順に辿って行って,十分な大きさの空きブロックが見つかればそれを返します.ただしこの際指定したhugepage要件を満たすかチェックしています.

librte_eal/common/malloc_elem.c:

static struct malloc_elem *
find_suitable_element(struct malloc_heap *heap, size_t size,
        unsigned flags, size_t align, size_t bound)
{
    size_t idx;
    struct malloc_elem *elem, *alt_elem = NULL;

    for (idx = malloc_elem_free_list_index(size);
            idx < RTE_HEAP_NUM_FREELISTS; idx++) {
        for (elem = LIST_FIRST(&heap->free_head[idx]);
                !!elem; elem = LIST_NEXT(elem, free_list)) {
            if (malloc_elem_can_hold(elem, size, align, bound)) {
                if (check_hugepage_sz(flags, elem->ms->hugepage_sz))
                    return elem;
                if (alt_elem == NULL)
                    alt_elem = elem;
            }
        }
    }

    if ((alt_elem != NULL) && (flags & RTE_MEMZONE_SIZE_HINT_ONLY))
        return alt_elem;

    return NULL;
}

その後実際のmalloc_elem_alloc()で割り当てを行います.

struct malloc_elem *
malloc_elem_alloc(struct malloc_elem *elem, size_t size, unsigned align,
        size_t bound)
{
    struct malloc_elem *new_elem = elem_start_pt(elem, size, align, bound);
    const size_t old_elem_size = (uintptr_t)new_elem - (uintptr_t)elem;
    const size_t trailer_size = elem->size - old_elem_size - size -
        MALLOC_ELEM_OVERHEAD;

    elem_free_list_remove(elem);

割り当てはelemの末尾から指定したサイズ分だけ取得して割り当てます. まずelem_start_pt()で新しいオブジェクトの先頭のアドレスを取得します.

    if (trailer_size > MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) {
        /* split it, too much free space after elem */
        struct malloc_elem *new_free_elem =
                RTE_PTR_ADD(new_elem, size + MALLOC_ELEM_OVERHEAD);

        split_elem(elem, new_free_elem);
        malloc_elem_free_list_insert(new_free_elem);
    }

あんまり詳しく説明していませんが,割り当ての際に指定した境界に合うようにアラインメントしているので,その際にnew_elemの後にMIN_DATA_SIZE以上(自分の環境では64byte)の領域ができたらそれをフリーリストに追加しています*1

    if (old_elem_size < MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) {
        /* don't split it, pad the element instead */
        elem->state = ELEM_BUSY;
        elem->pad = old_elem_size;


        /* put a dummy header in padding, to point to real element header */
        if (elem->pad > 0){ /* pad will be at least 64-bytes, as everything
                             * is cache-line aligned */
            new_elem->pad = elem->pad;
            new_elem->state = ELEM_PAD;
            new_elem->size = elem->size - elem->pad;
            set_header(new_elem);
        }

        return new_elem;
    }

分割した際に,空きスペースがあまりにも小さい場合はそれをフリーリストに追加せ ず,ただ単にELEM_BUSYとするようです.

    /* we are going to split the element in two. The original element
     * remains free, and the new element is the one allocated.
     * Re-insert original element, in case its new size makes it
     * belong on a different list.
     */
    split_elem(elem, new_elem);
    new_elem->state = ELEM_BUSY;
    malloc_elem_free_list_insert(elem);

    return new_elem;
}

通常の場合はこのsplit_elem()で領域が分割され,利用しない前半部分はまたフリーリストに追加されます.

static void
split_elem(struct malloc_elem *elem, struct malloc_elem *split_pt)
{
    struct malloc_elem *next_elem = RTE_PTR_ADD(elem, elem->size);
    const size_t old_elem_size = (uintptr_t)split_pt - (uintptr_t)elem;
    const size_t new_elem_size = elem->size - old_elem_size;

    malloc_elem_init(split_pt, elem->heap, elem->ms, new_elem_size);
    split_pt->prev = elem;
    next_elem->prev = split_pt;
    elem->size = old_elem_size;
    set_trailer(elem);
}

図にすると以下のような感じだと思います.

f:id:mm_i:20170524154902p:plain

malloc_heap_allog()では最後(void *)(&elem[1])を返します.つまり,返されたアドレスはmalloc_elemのヘッダ部分を除いたアドレスになります.

memzoneの確保

メモリ領域を名前をつけて管理する場合はmemzoneを利用します. memzoneの確保はrte_memzone_reserve()からおこないます.

rte_memzone_reserve()を呼ぶと,最終的にmemzone_reserve_aligned_thread_unsafe()が呼ばれます. memzone_reserve_aligned_thread_unsafe()はそこそこ長いですが,主要部のみ抜き 出すと以下のようになります.

static const struct rte_memzone *
memzone_reserve_aligned_thread_unsafe(const char *name, size_t len,
        int socket_id, unsigned flags, unsigned align, unsigned bound)
{
    struct rte_memzone *mz;
    struct rte_mem_config *mcfg;
    size_t requested_len;
    int socket, i;

    ...

    /* get pointer to global configuration */
    mcfg = rte_eal_get_configuration()->mem_config;

    ...
    /* allocate memory on heap */
    void *mz_addr = malloc_heap_alloc(&mcfg->malloc_heaps[socket], NULL,
            requested_len, flags, align, bound);
    ...

    const struct malloc_elem *elem = malloc_elem_from_data(mz_addr);

    /* fill the zone in config */
    mz = get_next_free_memzone();
    ...

    mcfg->memzone_cnt++;
    snprintf(mz->name, sizeof(mz->name), "%s", name);
    mz->phys_addr = rte_malloc_virt2phy(mz_addr);
    mz->addr = mz_addr;
    mz->len = (requested_len == 0 ? elem->size : requested_len);
    mz->hugepage_sz = elem->ms->hugepage_sz;
    mz->socket_id = elem->ms->socket_id;
    mz->flags = 0;
    mz->memseg_id = elem->ms - rte_eal_get_configuration()->mem_config->memseg;

    return mz;
}

malloc_heap_alloc()からメモリを確保したあと,その情報をmemconfig.memzone[]に登録しています.

mempool

mempoolは固定長サイズのオブジェクトのアロケータです. フリーのオブジェクトはring構造で保持されます. mempoolでは各コアが占有して利用できるキャッシュを提供する機能などもあります. mempoolを作成する際,確保したいオブジェクトのサイズ及びその個数を指定しますが, 内部的にはそのオブジェクトが割り当てられるだけのmemzoneを取得します.

struct rte_mempool {
    /*
     * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
     * compatibility requirements, it could be changed to
     * RTE_MEMPOOL_NAMESIZE next time the ABI changes
     */
    char name[RTE_MEMZONE_NAMESIZE]; /**< Name of mempool. */
    RTE_STD_C11
    union {
        void *pool_data;         /**< Ring or pool to store objects. */
        uint64_t pool_id;        /**< External mempool identifier. */
    };
    void *pool_config;               /**< optional args for ops alloc. */
    const struct rte_memzone *mz;    /**< Memzone where pool is alloc'd. */
    int flags;                       /**< Flags of the mempool. */
    int socket_id;                   /**< Socket id passed at create. */
    uint32_t size;                   /**< Max size of the mempool. */
    uint32_t cache_size;
    /**< Size of per-lcore default local cache. */

    uint32_t elt_size;               /**< Size of an element. */
    uint32_t header_size;            /**< Size of header (before elt). */
    uint32_t trailer_size;           /**< Size of trailer (after elt). */

    unsigned private_data_size;      /**< Size of private data. */
    /**
     * Index into rte_mempool_ops_table array of mempool ops
     * structs, which contain callback function pointers.
     * We're using an index here rather than pointers to the callbacks
     * to facilitate any secondary processes that may want to use
     * this mempool.
     */

    int32_t ops_index;

    struct rte_mempool_cache *local_cache; /**< Per-lcore local cache */

    uint32_t populated_size;         /**< Number of populated objects. */
    struct rte_mempool_objhdr_list elt_list; /**< List of objects in pool */
    uint32_t nb_mem_chunks;          /**< Number of memory chunks */
    struct rte_mempool_memhdr_list mem_list; /**< List of memory chunks */

#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
    /** Per-lcore statistics. */
    struct rte_mempool_debug_stats stats[RTE_MAX_LCORE];
#endif
}  __rte_cache_aligned;

実際にmempoolで確保したオブジェクトはelt_listで先頭が示されるSTAILQで保持されます. また,確保した連続したメモリ領域はmem_listで先頭が示されるSTAILQで保持されます.

mempoolの初期化

mempoolの確保は以下のようにしておこなわれます.

  1. mempool構造体自体を格納する領域を一つのmemzoneとして取得
  2. 固定長のオブジェクトをn個格納するための領域を取得. この際領域は複数のmemzoneにまたがることがある.

mempoolの作成はrte_mempool_create()からおこないます. rte_mampool_create()は以下のようになっています.

librte_mempool/rte_mempool.c:

struct rte_mempool *
rte_mempool_create(const char *name, unsigned n, unsigned elt_size,
    unsigned cache_size, unsigned private_data_size,
    rte_mempool_ctor_t *mp_init, void *mp_init_arg,
    rte_mempool_obj_cb_t *obj_init, void *obj_init_arg,
    int socket_id, unsigned flags)
{
    int ret;
    struct rte_mempool *mp;

    mp = rte_mempool_create_empty(name, n, elt_size, cache_size,
        private_data_size, socket_id, flags);
    if (mp == NULL)
        return NULL;

    /*
     * Since we have 4 combinations of the SP/SC/MP/MC examine the flags to
     * set the correct index into the table of ops structs.
     */
    if ((flags & MEMPOOL_F_SP_PUT) && (flags & MEMPOOL_F_SC_GET))
        ret = rte_mempool_set_ops_byname(mp, "ring_sp_sc", NULL);
    else if (flags & MEMPOOL_F_SP_PUT)
        ret = rte_mempool_set_ops_byname(mp, "ring_sp_mc", NULL);
    else if (flags & MEMPOOL_F_SC_GET)
        ret = rte_mempool_set_ops_byname(mp, "ring_mp_sc", NULL);
    else
        ret = rte_mempool_set_ops_byname(mp, "ring_mp_mc", NULL);

    if (ret)
        goto fail;

    /* call the mempool priv initializer */
    if (mp_init)
        mp_init(mp, mp_init_arg);

    if (rte_mempool_populate_default(mp) < 0)
        goto fail;

    /* call the object initializers */
    if (obj_init)
        rte_mempool_obj_iter(mp, obj_init, obj_init_arg);

    return mp;

 fail:
    rte_mempool_free(mp);
    return NULL;
}

nameがmempoolの名前,nが割り当てたいオブジェクトの数,elt_sizeが各オブジェクトのサイズになります. 基本的にはrte_mempool_create_emtpy()で空のmempoolを作成し,その後rte_mempool_populate_default()で実際にオブジェクト分のメモリを確保します. また,rte_mempool_set_ops_byname()のところは,mempoolでオブジェクトを確保するスレッドが一つかどうかと解放するスレッドが一つかどうか(SC: Single Consumer, SP: Single Producer)のフラグによって,mempoolからオブジェクトを取得/解放する関数を切り替えて登録しています.

rte_mempool_create_empty()の主要部は以下のようになっています.

librte_mempool/rte_mempool.c:

/* create an empty mempool */
struct rte_mempool *
rte_mempool_create_empty(const char *name, unsigned n, unsigned elt_size,
    unsigned cache_size, unsigned private_data_size,
    int socket_id, unsigned flags)
{
    ...

    mempool_list = RTE_TAILQ_CAST(rte_mempool_tailq.head, rte_mempool_list);

    ...

    mempool_size = MEMPOOL_HEADER_SIZE(mp, cache_size);
    mempool_size += private_data_size;
    mempool_size = RTE_ALIGN_CEIL(mempool_size, RTE_MEMPOOL_ALIGN);

    ret = snprintf(mz_name, sizeof(mz_name), RTE_MEMPOOL_MZ_FORMAT, name);
    if (ret < 0 || ret >= (int)sizeof(mz_name)) {
        rte_errno = ENAMETOOLONG;
        goto exit_unlock;
    }
    mz = rte_memzone_reserve(mz_name, mempool_size, socket_id, mz_flags);
    if (mz == NULL)
        goto exit_unlock;

    /* init the mempool structure */
    mp = mz->addr;

    memset(mp, 0, MEMPOOL_HEADER_SIZE(mp, cache_size));
    ret = snprintf(mp->name, sizeof(mp->name), "%s", name);
    if (ret < 0 || ret >= (int)sizeof(mp->name)) {
        rte_errno = ENAMETOOLONG;
        goto exit_unlock;
    }
    mp->mz = mz;
    mp->size = n;
    mp->flags = flags;
    mp->socket_id = socket_id;
    mp->elt_size = objsz.elt_size;
    mp->header_size = objsz.header_size;
    mp->trailer_size = objsz.trailer_size;
    /* Size of default caches, zero means disabled. */
    mp->cache_size = cache_size;
    mp->private_data_size = private_data_size;
    STAILQ_INIT(&mp->elt_list);
    STAILQ_INIT(&mp->mem_list);

    ...
    return mp;
}

mempool自体もTAILQで管理されています.mempool_listはそのTAILQのheadを指すデータ構造です. rte_mempool_create_empty()では,mempoool構造体自体を格納するためのメモリ領域をrte_memzone_reserve()で確保し,初期化します. この時点ではまだオブジェクトのメモリ領域は確保していません.

実際にオブジェクトのメモリを確保するのはrte_mempool_populate_default()になります.

librte_mempool/rte_mempool.c:

int
rte_mempool_populate_default(struct rte_mempool *mp)
{
    int mz_flags = RTE_MEMZONE_1GB|RTE_MEMZONE_SIZE_HINT_ONLY;
    char mz_name[RTE_MEMZONE_NAMESIZE];
    const struct rte_memzone *mz;
    size_t size, total_elt_sz, align, pg_sz, pg_shift;
    phys_addr_t paddr;
    unsigned mz_id, n;
    int ret;

    /* mempool must not be populated */
    if (mp->nb_mem_chunks != 0){
        RTE_LOG(ERR, MBUF, "a\n");
        return -EEXIST;
    }

    if (rte_xen_dom0_supported()) {
        pg_sz = RTE_PGSIZE_2M;
        pg_shift = rte_bsf32(pg_sz);
        align = pg_sz;

    } else if (rte_eal_has_hugepages()) {
        pg_shift = 0; /* not needed, zone is physically contiguous */
        pg_sz = 0;
        align = RTE_CACHE_LINE_SIZE;
    } else {
        pg_sz = getpagesize();
        pg_shift = rte_bsf32(pg_sz);
        align = pg_sz;
    }

    total_elt_sz = mp->header_size + mp->elt_size + mp->trailer_size;
    for (mz_id = 0, n = mp->size; n > 0; mz_id++, n -= ret) {
        size = rte_mempool_xmem_size(n, total_elt_sz, pg_shift);

        ret = snprintf(mz_name, sizeof(mz_name),
            RTE_MEMPOOL_MZ_FORMAT "_%d", mp->name, mz_id);
        if (ret < 0 || ret >= (int)sizeof(mz_name)) {
            RTE_LOG(ERR, MBUF, "b\n");
            ret = -ENAMETOOLONG;
            goto fail;
        }

        mz = rte_memzone_reserve_aligned(mz_name, size,
            mp->socket_id, mz_flags, align);
        /* not enough memory, retry with the biggest zone we have */
        if (mz == NULL)
            mz = rte_memzone_reserve_aligned(mz_name, 0,
                mp->socket_id, mz_flags, align);
        if (mz == NULL) {
            RTE_LOG(ERR, MBUF, "c\n");
            ret = -rte_errno;
            goto fail;
        }

        if (mp->flags & MEMPOOL_F_NO_PHYS_CONTIG)
            paddr = RTE_BAD_PHYS_ADDR;
        else
            paddr = mz->phys_addr;

        if (rte_eal_has_hugepages() && !rte_xen_dom0_supported())
            ret = rte_mempool_populate_phys(mp, mz->addr,
                paddr, mz->len,
                rte_mempool_memchunk_mz_free,
                (void *)(uintptr_t)mz);
        else
            ret = rte_mempool_populate_virt(mp, mz->addr,
                mz->len, pg_sz,
                rte_mempool_memchunk_mz_free,
                (void *)(uintptr_t)mz);
        if (ret < 0) {
            RTE_LOG(ERR, MBUF, "d\n");
            rte_memzone_free(mz);
            goto fail;
        }
    }

    return mp->size;

 fail:
    rte_mempool_free_memchunks(mp);
    return ret;

重要なのはforループの内部の処理で,rte_memzone_reserve_aligned()を利用して確 保したいメモリ(オブジェクトサイズ ×個数分のメモリ)をmemzoneから確保しようとします. このときもし確保できなかった場合(それだけ連続するメモリがなかった場合)は, memzoneから最も大きな連続する領域を取得します. この後通常はrte_eal_has_hugepages() = 1, rte_xen_dom0_supported() = 0なのでrte_mempool_populate_phys()を呼び,確保したメモリ上でオブジェクトのブロックを作成します. rte_mempool_populate_phys()の戻り値が確保できたオブジェクトの数です. 目標のオブジェクト数だけ確保できるまでこの操作を繰り返します.

rte_mempool_popoulate_phys()の中では,確保したメモリ領域からオブジェクトを切り出し,それをmempool_add_elem()を使ってmempool.elt_listに入れ,また空きオブジェクトを管理するring bufferに入れています.また,確保したメモリ(memzone)をmempool.mem_listに登録します.

int
rte_mempool_populate_phys(struct rte_mempool *mp, char *vaddr,
    phys_addr_t paddr, size_t len, rte_mempool_memchunk_free_cb_t *free_cb,
    void *opaque)
{
    unsigned total_elt_sz;
    unsigned i = 0;
    size_t off;
    struct rte_mempool_memhdr *memhdr;
    int ret;

    /* create the internal ring if not already done */
    if ((mp->flags & MEMPOOL_F_POOL_CREATED) == 0) {
        ret = rte_mempool_ops_alloc(mp);
        if (ret != 0)
            return ret;
        mp->flags |= MEMPOOL_F_POOL_CREATED;
    }

    /* mempool is already populated */
    if (mp->populated_size >= mp->size)
        return -ENOSPC;

    total_elt_sz = mp->header_size + mp->elt_size + mp->trailer_size;

    memhdr = rte_zmalloc("MEMPOOL_MEMHDR", sizeof(*memhdr), 0);
    if (memhdr == NULL)
        return -ENOMEM;

    memhdr->mp = mp;
    memhdr->addr = vaddr;
    memhdr->phys_addr = paddr;
    memhdr->len = len;
    memhdr->free_cb = free_cb;
    memhdr->opaque = opaque;

    if (mp->flags & MEMPOOL_F_NO_CACHE_ALIGN)
        off = RTE_PTR_ALIGN_CEIL(vaddr, 8) - vaddr;
    else
        off = RTE_PTR_ALIGN_CEIL(vaddr, RTE_CACHE_LINE_SIZE) - vaddr;

    while (off + total_elt_sz <= len && mp->populated_size < mp->size) {
        off += mp->header_size;
        if (paddr == RTE_BAD_PHYS_ADDR)
            mempool_add_elem(mp, (char *)vaddr + off,
                RTE_BAD_PHYS_ADDR);
        else
            mempool_add_elem(mp, (char *)vaddr + off, paddr + off);
        off += mp->elt_size + mp->trailer_size;
        i++;
    }

    /* not enough room to store one object */
    if (i == 0)
        return -EINVAL;

    STAILQ_INSERT_TAIL(&mp->mem_list, memhdr, next);
    mp->nb_mem_chunks++;
    return i;
}
static void
mempool_add_elem(struct rte_mempool *mp, void *obj, phys_addr_t physaddr)
{
    struct rte_mempool_objhdr *hdr;
    struct rte_mempool_objtlr *tlr __rte_unused;

    /* set mempool ptr in header */
    hdr = RTE_PTR_SUB(obj, sizeof(*hdr));
    hdr->mp = mp;
    hdr->physaddr = physaddr;
    STAILQ_INSERT_TAIL(&mp->elt_list, hdr, next);
    mp->populated_size++;

#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
    hdr->cookie = RTE_MEMPOOL_HEADER_COOKIE2;
    tlr = __mempool_get_trailer(obj);
    tlr->cookie = RTE_MEMPOOL_TRAILER_COOKIE;
#endif

    /* enqueue in ring */
    rte_mempool_ops_enqueue_bulk(mp, &obj, 1);
}

rte_mempool_ops_enqueue_bulk()では,最終的には次で説明するenqueue()が呼ばれます.

mempoolの操作

rte_create_mempol_empty()のところで説明したとおり,初期化時のフラグによって mempoolからオブジェクトを取得/解放する際の関数を切り替えています. 関数の定義は drivers/mempool/ring/rte_mempool_ring.cに書いてあります.

static const struct rte_mempool_ops ops_mp_mc = {
    .name = "ring_mp_mc",
    .alloc = common_ring_alloc,
    .free = common_ring_free,
    .enqueue = common_ring_mp_enqueue,
    .dequeue = common_ring_mc_dequeue,
    .get_count = common_ring_get_count,
};

static const struct rte_mempool_ops ops_sp_sc = {
    .name = "ring_sp_sc",
    .alloc = common_ring_alloc,
    .free = common_ring_free,
    .enqueue = common_ring_sp_enqueue,
    .dequeue = common_ring_sc_dequeue,
    .get_count = common_ring_get_count,
};

static const struct rte_mempool_ops ops_mp_sc = {
    .name = "ring_mp_sc",
    .alloc = common_ring_alloc,
    .free = common_ring_free,
    .enqueue = common_ring_mp_enqueue,
    .dequeue = common_ring_sc_dequeue,
    .get_count = common_ring_get_count,
};


static const struct rte_mempool_ops ops_sp_mc = {
    .name = "ring_sp_mc",
    .alloc = common_ring_alloc,
    .free = common_ring_free,
    .enqueue = common_ring_sp_enqueue,
    .dequeue = common_ring_mc_dequeue,
    .get_count = common_ring_get_count,
};

MEMPOOL_REGISTER_OPS(ops_mp_mc);
MEMPOOL_REGISTER_OPS(ops_sp_sc);
MEMPOOL_REGISTER_OPS(ops_mp_sc);
MEMPOOL_REGISTER_OPS(ops_sp_mc);

singleかmultiかによって,rte_ringを操作する関数を切り替えています.

基本的にはrte_mempool_get()でmempoolから空きオブジェクトを取得し,rte_mempool_put()でオブジェクトを再びring bufferに追加します.

DPDKでパケットを格納するために利用するmbufは内部でmempolを利用しています. mbufに関してはまた後で書こうと思います.

*1:たぶん

DPDKにおけるhugepageの初期化部分

DPDKはhugepageをデフォルトで利用しています.ここではDPDKにおけるhugepageの初期化部分についてまとめようと思います. OSはLinux , DPDKのバージョンはv17.05-rc2です.

初期化のおおまかな流れ

librte_eal/linuxapp/eal/eal.c:rte_eal_init()がDPDKの基本的な初期化処理をおこないますが,この中でhugepageに関してはまず librte_eal/linuxap/eal/eal_hugepages_info.c:eal_hugepage_info_init() を呼びます. ここでは /sys/kernel/mm/hugepages/ の中のファイルからhugepageの基礎情報を読み取ります.その後 librte_eal/common/eal_common_memory.c:rte_eal_memory_init()を呼びますが,この中でlibrte_eal/linuxapp/eal/eal_memory.c:rte_eal_hugepage_init()が呼ばれます. このrte_eal_hugepage_init() でhugepage領域の確保をおこないます.

rte_eal_hugepage_init()

rte_eal_hugepage_init()の最初に何をするか書いてあります.

/*
 * Prepare physical memory mapping: fill configuration structure with
 * these infos, return 0 on success.
 *  1. map N huge pages in separate files in hugetlbfs
 *  2. find associated physical addr
 *  3. find associated NUMA socket ID
 *  4. sort all huge pages by physical address
 *  5. remap these N huge pages in the correct order
 *  6. unmap the first mapping
 *  7. fill memsegs in configuration with contiguous zones
 */

このコメントの通りですが,要するに

  • 最初にhugetlb上にhugepageと同じサイズのN個のファイルを作成し,それをメモリ上にマップする
  • マップしたhugetlbファイルの物理アドレスを求める
  • 物理アドレスが連続のものを仮想アドレスが連続になるように再マッピングする

ということをおこないます.

rte_eal_hugepage_init()はエラーハンドリングであったり複数アーキテクチャ対応のため若干長いですが,主要部分だけ抜粋すると以下のようになります.

http://dpdk.org/browse/dpdk/tree/lib/librte_eal/linuxapp/eal/eal_memory.c?h=v17.05-rc2#n964

    ....
    pages_new = map_all_hugepages(&tmp_hp[hp_offset], hpi, 1);
    ...
    find_physaddrs(&tmp_hp[hp_offset], hpi);
    ...
    qsort(&tmp_hp[hp_offset], hpi->num_pages[0],
          sizeof(struct hugepage_file), cmp_physaddr);
    ...
    map_all_hugepages(&tmp_hp[hp_offset], hpi, 0);
    ...

ここで,

をしています.

まず,map_all_hugepages()は以下のようになっています.

static unsigned
map_all_hugepages(struct hugepage_file *hugepg_tbl,
        struct hugepage_info *hpi, int orig)
{
    int fd;
    unsigned i;
    void *virtaddr;
    void *vma_addr = NULL;
    size_t vma_len = 0;

    for (i = 0; i < hpi->num_pages[0]; i++) {
        uint64_t hugepage_sz = hpi->hugepage_sz;
        ...

        hugepg_tbl[i].file_id = i;
        hugepg_tbl[i].size = hugepage_sz;
        eal_get_hugefile_path(hugepg_tbl[i].filepath,
                sizeof(hugepg_tbl[i].filepath), hpi->hugedir,
                hugepg_tbl[i].file_id);
        hugepg_tbl[i].filepath[sizeof(hugepg_tbl[i].filepath) - 1] = '\0';
        ...

        fd = open(hugepg_tbl[i].filepath, O_CREAT | O_RDWR, 0600);
        ...

        virtaddr = mmap(vma_addr, hugepage_sz, PROT_READ | PROT_WRITE,
                MAP_SHARED | MAP_POPULATE, fd, 0);
        hugepg_tbl[i].orig_va = virtaddr;
        ...

        *(int *)virtaddr = 0;
        ...
        flock(fd, LOCK_SH | LOCK_NB);
            RTE_LOG(DEBUG, EAL, "%s(): Locking file failed:%s \n",
        ...
        vma_addr = (char *)vma_addr + hugepage_sz;
        vma_len -= hugepage_sz;
    }

    return i;
}

最初は orig=1 なので orig=1 の処理のみ抜粋しています(エラーハンドリングは省略). hpi->num_pages[0]sys/kernel/mm/hugepages/hugepages-xxx/nr_hugepagesの値が入っており,その数だけhugetlb上にファイルを作成し,mmap()マッピング,仮想アドレスを hugepg_tbl[i].orig_va に格納します.

その後,rte_eal_hugepage_init() では find_physaddrs() で仮想アドレスから物理アドレスをもとめ,さらにそれを物理アドレスでソートします.仮想アドレスから物理アドレスを求める方法は,前のエントリに書いたとおりです.

物理アドレスを求めソートが終わったら,もう一度 map_allhugepaegs()を呼びます.ただし,今回はorig=0です. orig=0の場合の処理を抜粋すると以下のようになります.

http://dpdk.org/browse/dpdk/tree/lib/librte_eal/linuxapp/eal/eal_memory.c?h=v17.05-rc2#n393

static unsigned
map_all_hugepages(struct hugepage_file *hugepg_tbl,
        struct hugepage_info *hpi, int orig)
{
    int fd;
    unsigned i;
    void *virtaddr;
    void *vma_addr = NULL;
    size_t vma_len = 0;

    for (i = 0; i < hpi->num_pages[0]; i++) {
        uint64_t hugepage_sz = hpi->hugepage_sz;

        if (vma_len == 0) {
             unsigned j, num_pages;

             /* reserve a virtual area for next contiguous
              * physical block: count the number of
              * contiguous physical pages. */
             for (j = i+1; j < hpi->num_pages[0] ; j++) {
                if (hugepg_tbl[j].physaddr !=
                     hugepg_tbl[j-1].physaddr + hugepage_sz)
                     break;
             }

            num_pages = j - i;
            vma_len = num_pages * hugepage_sz;

            /* get the biggest virtual memory area up to
             * vma_len. If it fails, vma_addr is NULL, so
             * let the kernel provide the address. */
            vma_addr = get_virtual_area(&vma_len, hpi->hugepage_sz);
            if (vma_addr == NULL)
            vma_len = hugepage_sz;
        }
        ...
        fd = open(hugepg_tbl[i].filepath, O_CREAT | O_RDWR, 0600);
        ...
        virtaddr = mmap(vma_addr, hugepage_sz, PROT_READ | PROT_WRITE,
                MAP_SHARED | MAP_POPULATE, fd, 0);
        ...
        hugepg_tbl[i].final_va = virtaddr;
        ...
        flock(fd, LOCK_SH | LOCK_NB) == -1)
        ...
        close(fd);
        ...

        vma_addr = (char *)vma_addr + hugepage_sz;
        vma_len -= hugepage_sz;
    }

    return i;
}

ループの中で,まず最初は vma_len == 0 の条件が成立し,if文の中が実行されます.ここで連続した物理アドレスの長さがvma_lenに設定されます.その後get_virtual_area()を使いvma_lenの長さ以上の使用可能な仮想アドレス空間を求め,仮想アドレスが正しく求められたらそれを hugepg_tbl[i].final_va に設定しています.

get_virtual_area()は以下のようになっています.

http://dpdk.org/browse/dpdk/tree/lib/librte_eal/linuxapp/eal/eal_memory.c?h=v17.05-rc2#n312

static void *
get_virtual_area(size_t *size, size_t hugepage_sz)
{
    ...
    fd = open("/dev/zero", O_RDONLY);
    ...
    addr = mmap(addr, (*size) + hugepage_sz, PROT_READ, MAP_PRIVATE, fd, 0);
    ...
    aligned_addr = (long)addr;
    aligned_addr += (hugepage_sz - 1);
    aligned_addr &= (~(hugepage_sz - 1));
    addr = (void *)(aligned_addr);
    ...
    return addr;
}

やっていることは単純で, 求めたいサイズ(をhugepage境界にアラインしたもの)だけのアドレス空間mmap()で取得し,成功したならそのアドレスを返します.

rte_eal_hugepage_init()では最終的に rte_mem_config構造体の mcfgにメモリ情報を格納しています.memseg[j]が一つの連続した領域を表します.

    mcfg->memseg[j].phys_addr = hugepage[i].physaddr;
    mcfg->memseg[j].addr = hugepage[i].final_va;
    mcfg->memseg[j].len = hugepage[i].size;
    mcfg->memseg[j].socket_id = hugepage[i].socket_id;
    mcfg->memseg[j].hugepage_sz = hugepage[i].size;

またrte_eal_hugepage_init()ではこの他にNUMA環境の場合の初期化処理等もおこなっています.

全体的に,少々泥臭いことをしているというか,このあたりカーネル側でもう少しサポートがあれば綺麗にかけそうです(まぁ普通はユーザ側で物理アドレスが必要なことなんてないわけですが).

hugepageを使わない場合

DPDK(というかEAL)では,--no-hugeというオプションを渡すと,内部のinternal_config.no_hugetlbfsが1になります. rte_eal_hugepage_init()では一番最初にこのオプションをチェックしていてhugetableが無効の場合は以下の処理をおこないます.

if (internal_config.no_hugetlbfs) {
    addr = mmap(NULL, internal_config.memory, PROT_READ | PROT_WRITE,
                MAP_PRIVATE | MAP_ANONYMOUS, 0, 0);
    ...
    mcfg->memseg[0].phys_addr = (phys_addr_t)(uintptr_t)addr;
    mcfg->memseg[0].addr = addr;
    mcfg->memseg[0].hugepage_sz = RTE_PGSIZE_4K;
    mcfg->memseg[0].len = internal_config.memory;
    mcfg->memseg[0].socket_id = 0;
    return 0;
}

hugepageを使わない代わりにmmap()で一括でメモリを取得し,それをmemseg[0]に設定します.このときmemseg[0].phy_addr物理アドレスは格納していません (そもそも仮にphys_addrを求めて格納したとしても後続の仮想アドレス空間に対応する物理アドレスが連続している保証はどこにもありません). ということで,--no-hugeオプションを渡した場合はDPDKが正常に利用できないような気がします.少なくとも,自分の環境(X540+VFIO)では動きませんでした.

2017-05-19 追記 改めてソースを眺めていたらmempoolの初期化部分でhugepageでない場合はrte_mempool_popoiulate_virt()を使ってなるべく連続した領域を割り当てようとしていることに気づきました. http://dpdk.org/browse/dpdk/tree/lib/librte_mempool/rte_mempool.c?h=v17.05#n451 ということでhugepageなしでもうまくいけば環境によっては動作するのかもしれません.