rustのGUIライブラリconrodの使い方

所用でrustのGUIライブラリについて調べる機会がありました.特にこれといったGUIライブラリはまだないような気がします.

普段自分がネイティブなGUIアプリケーションを作成する場合はQtを使います.rustからQtを呼び出すのはいくつか試みがあって,disassemblerのpanopticonGUIとしてQtを使ってますし,rustからQtを呼び出すためのcpp_to_rustというプロジェクトもあります.ただどうもまだいろいろと開発中のようです.

今回はconrodというGUIライブラリを使ってみたので,簡単にそれの使い方について書こうと思います.

conrod

conrodは純rust製のGUIライブラリです.conrodが担当するのはウィジェッ ト(ボタンやテキストボックスなど)の構成の管理やウィジェットに対するイベントの伝播などです.pistonというゲームエンジンを開発しているところが作っているようです.実際の描画やOSからのイベントの受け取りは何か別のライブラリが担当することになります.conrod自体に描画用としてglium (OpenGL), イベント管理用としてwinitのバックエンドが用意されているので,普通はそれを使うことになると思います*1.自分はmacでしか試していませんが,マルチプラットフォームで動作するはずです.ちなみに,まだバージョン1.0にもなってないので仕様が大きく変わる可能性は十分あると思います.

conrodのチュートリアルはまだ未完成のようですが,rustdocは比較的丁寧に書いてあると思います.また,実際にプログラムを作成する場合は,リポジトリにある

あたりを参考にするのが良いかと思います.

exampleはリポジトリをクローンして

$ cargo run --release --features "winit glium" --example hello_world

とかやれば動きます.

簡単な例

ものすごく簡単な例として,以下のようにフィボナッチ数を計算するアプリケーションを作成しようと思います.

f:id:mm_i:20170709234547p:plain

ソース全体はここにあります: https://github.com/mmisono/conrod-examples/tree/master/fibonacci

これから説明するコードの大部分(初期化部分とイベントループ部分)はexamplesのものとほぼ同じです.

Cargo.toml

前述した通り,描画にglium, イベント取得にwinitを使うので,Cargo.tomlには以下のように書いてあげます.また,フォントを登録する際にフォントが格納されているフォルダを探す必要があるので,find_folderもdependenciesに追加しています.

[dependencies.conrod]
version = "0.53.0"
features = ["glium", "winit"]

[dependencies]
find_folder = "*"

初期化

conrodを使うための初期化のコードは以下のようになります.WindowやUiといったコンポーネントを作成したり,必要なフォントや画像を読み込んだりします.

    const TITLE: &'static str = "Fibonacci";
    let width = 300;
    let height = 100;

    // Build the window.
    let display = glium::glutin::WindowBuilder::new()
        .with_vsync()
        .with_dimensions(width, height)
        .with_title(TITLE)
        .with_multisampling(4)
        .build_glium()
        .unwrap();

    // construct our `Ui`.
    let mut ui = conrod::UiBuilder::new([width as f64, height as f64]).build();

    // Add a `Font` to the `Ui`'s `font::Map` from file.
    let assets = find_folder::Search::KidsThenParents(3, 5)
        .for_folder("assets")
        .unwrap();
    let font_path = assets.join("fonts/NotoSans/NotoSans-Regular.ttf");
    ui.fonts.insert_from_file(font_path).unwrap();

    // Generate the widget identifiers.
    let ids = &mut Ids::new(ui.widget_id_generator());

    // A type used for converting `conrod::render::Primitives` into `Command`s that can be used
    // for drawing to the glium `Surface`.
    let mut renderer = conrod::backend::glium::Renderer::new(&display).unwrap();

    // The image map describing each of our widget->image mappings (in our case, none).
    let image_map = conrod::image::Map::<glium::texture::Texture2d>::new();

    let mut text = "0".to_string();
    let mut answer = "0".to_string();

ここで,widget identifierを生成していますが,これはウィジェット管理に必要なIDで,ウィジェットごとにIDが必要です.Idsは以下のようにマクロで生成します.

widget_ids!(
    struct Ids {
        canvas,
        title,
        text_box,
        button,
        result,
    });

イベントループ

conrodではイベントループベースで処理をおこないます.ループ処理は以下のようになります.

    let mut event_loop = EventLoop::new();
    'main: loop {
        // Handle all events.
        for event in event_loop.next(&display) {
            // Use the `winit` backend feature to convert the winit event to a conrod one.
            if let Some(event) = conrod::backend::winit::convert(event.clone(), &display) {
                ui.handle_event(event);
                event_loop.needs_update();
            }

            match event {
                // Break from the loop upon `Escape`.
                glium::glutin::Event::KeyboardInput(
                    _,
                    _,
                    Some(glium::glutin::VirtualKeyCode::Escape),
                ) |
                glium::glutin::Event::Closed => break 'main,
                _ => {}
            }
        }

        set_widgets(ui.set_widgets(), ids, &mut text, &mut answer);

        // Render the `Ui` and then display it on the screen.
        if let Some(primitives) = ui.draw_if_changed() {
            renderer.fill(&display, primitives, &image_map);
            let mut target = display.draw();
            target.clear_color(0.0, 0.0, 0.0, 1.0);
            renderer.draw(&display, &mut target, &image_map).unwrap();
            target.finish().unwrap();
        }
    }

match eventの部分でイベントを補足し,必要な処理をします.通常ここで処理するイベントは終了処理といった大域的なイベントのみで,それ以外の処理(ウィジェットに対するマウスイベントとか)は後でおこないます.その後,set_widgets()という関数でGUIを設定し,必要なら描画をおこないます.

イベントループの本体は以下のようになっています.基本的にはウィンドウが受け取ったイベントを取得してそれを返すだけですが,60fpsを超えないように適当にスリープします.

struct EventLoop {
    ui_needs_update: bool,
    last_update: std::time::Instant,
}

impl EventLoop {
    pub fn new() -> Self {
        EventLoop {
            last_update: std::time::Instant::now(),
            ui_needs_update: true,
        }
    }

    /// Produce an iterator yielding all available events.
    pub fn next(&mut self, display: &glium::Display) -> Vec<glium::glutin::Event> {
        // We don't want to loop any faster than 60 FPS, so wait until it has been at least 16ms
        // since the last yield.
        let last_update = self.last_update;
        let sixteen_ms = std::time::Duration::from_millis(16);
        let duration_since_last_update = std::time::Instant::now().duration_since(last_update);
        if duration_since_last_update < sixteen_ms {
            std::thread::sleep(sixteen_ms - duration_since_last_update);
        }

        // Collect all pending events.
        let mut events = Vec::new();
        events.extend(display.poll_events());

        // If there are no events and the `Ui` does not need updating, wait for the next event.
        if events.is_empty() && !self.ui_needs_update {
            events.extend(display.wait_events().next());
        }

        self.ui_needs_update = false;
        self.last_update = std::time::Instant::now();

        events
    }

    /// Notifies the event loop that the `Ui` requires another update whether or not there are any
    /// pending events.
    ///
    /// This is primarily used on the occasion that some part of the `Ui` is still animating and
    /// requires further updates to do so.
    pub fn needs_update(&mut self) {
        self.ui_needs_update = true;
    }
}

GUI設定の本体

set_widgets()関数が実際にGUIの設定をする箇所です.conrodでアプリケーションを作成する場合は基本的にこのset_widgets()の部分を適当にカスタマイズすることになると思います.

conrodではボタンやテキストボックスなどのウィジェットを内部で木構造として管理しています.wiget::Text::New("aaa").set(ids.title, ui) のようにすると,指定したIDでそのウィジェットが登録されます.set()をしたとき,指定したidのウィジェットに対してもし何かイベントが発生していたらそれが返ってきます.若干分かりにくいと思うのでコードを見てもらった方が早いと思います.

fn set_widgets(ref mut ui: conrod::UiCell, ids: &mut Ids, text: &mut String, answer: &mut String) {
    widget::Canvas::new()
        .pad(0.0)
        .color(conrod::color::rgb(0.2, 0.35, 0.45))
        .set(ids.canvas, ui);

    let canvas_wh = ui.wh_of(ids.canvas).unwrap();

    // title
    widget::Text::new("Fibonacci Calculuator")
        .mid_top_with_margin_on(ids.canvas, 5.0)
        .font_size(20)
        .color(color::WHITE)
        .set(ids.title, ui);

    // textbox
    for event in widget::TextBox::new(text)
        .font_size(15)
        .w_h((canvas_wh[0] - 90.) / 2., 30.0)
        .mid_left_with_margin_on(ids.canvas, 30.0)
        .border(2.0)
        .border_color(color::BLUE)
        .color(color::WHITE)
        .set(ids.text_box, ui)
    {
        match event {
            widget::text_box::Event::Enter => println!("TextBox {:?}", text),
            widget::text_box::Event::Update(string) => *text = string,
        }
    }

    // button
    if widget::Button::new()
        .w_h((canvas_wh[0] - 90.) / 2., 30.0)
        .right_from(ids.text_box, 30.0)
        .rgb(0.4, 0.75, 0.6)
        .border(2.0)
        .label("calc!")
        .set(ids.button, ui)
        .was_clicked()
    {
        if let Ok(num) = text.parse::<u64>() {
            *answer = fib(num).to_string();
        } else {
            println!("invalid number");
        }
    }

    // result
    widget::Text::new(answer)
        .mid_bottom_with_margin_on(ids.canvas, 10.0)
        .font_size(20)
        .color(color::WHITE)
        .set(ids.result, ui);
}

例えば,TextBoxにユーザが何か入力すると,それがset()したときに返ってくるので,入力した内容を変数textに保存しています.また,Buttonの場合はwas_clicked()を呼ぶとボタンが押されたかどうか分かるので,ボタンが押された場合は計算を実行します.

ウィジェットに位置の指定ですが,各ウィジェットPositonableというトレイトを実装していて,これを利用することになります.

例えばタイトルテキストは .mid_top_with_margin_on(ids.canvas, 5.0)としていますが,こうするとids.canvasで指定したウィジェットの中央上に指定したマージン幅を開けてウィジェットを配置することになります.また,.down()を使えば直前にset()したウィジェットの下に指定したマージンだけ空けてウィジェットが配置されます.他にもいろいろ配置用の関数が用意されています.個人的には一番上に適当なウィジェットを配置したあと,.down().align_middle_x()を使ってどんどん下にウィジェットを配置していくのが楽なんじゃないかなと思います.

三目並べ

conrodの特徴はidに紐づけてウィジェットを登録し,そのidを利用してウィジェットが配置できるというところだと思います.こうすることであまり深くレイアウトを考えなくても簡単にウィジェットが配置できます.一方で,各ウィジェットに対して一意なIDが必要なので,ウィジェットが多い場合はその管理が面倒になることもあります.

例として,以下のような三目並べのアプリケーションを考えます.

f:id:mm_i:20170709234602p:plain

(ソース: https://github.com/mmisono/conrod-examples/tree/master/tic_tac_toe)

conrodでは直線や円も一つのウィジェットです.したがって,それぞれに対してIDを振って管理する必要があります.IDは以下のようにして配列で持つことができます.

widget_ids!(
    struct Ids {
        canvas,
        grids[],
        circles[],
        xs[],
    });
let ids = &mut Ids::new(ui.widget_id_generator());
ids.grids
    .resize((rows - 1) * (cols - 1), &mut ui.widget_id_generator());
ids.circles
    .resize(rows * cols, &mut ui.widget_id_generator());
ids.xs
    .resize(rows * cols * 2, &mut ui.widget_id_generator());

丸とバツを描画するために,ここではあらかじめ各セルに対応するIDを生成させています(バツは直線二本なので2倍のIDを用意しています).

グリッドの描画

グリッドの描画は以下のようにしておこないます.注意点としては座標はデカルト座標(中央が(0,0))です.

fn set_widgets(
    ref mut ui: conrod::UiCell,
    ids: &mut Ids,
    board: &mut [&mut [BoardState]],
    turn: &mut Turn,
) {
    widget::Canvas::new()
        .pad(0.0)
        .color(color::WHITE)
        .set(ids.canvas, ui);

    let rows = board.len();
    let cols = board[0].len();
    let canvas_wh = ui.wh_of(ids.canvas).unwrap();
    let tl = [-canvas_wh[0] / 2., canvas_wh[1] / 2.];
    let sw = canvas_wh[0] / (cols as f64);
    let sh = canvas_wh[1] / (rows as f64);

    // draw grid line
    for x in 1..cols {
        widget::Line::abs(
            [tl[0] + (x as f64) * sw, tl[1]],
            [tl[0] + (x as f64) * sw, tl[1] - canvas_wh[1] + 1.],
        ).color(color::BLACK)
            .set(ids.grids[x - 1], ui);
    }
    for y in 1..rows {
        widget::Line::abs(
            [tl[0], tl[1] - (y as f64) * sh],
            [tl[0] + canvas_wh[1] - 1., tl[1] - (y as f64) * sh],
        ).color(color::BLACK)
            .set(ids.grids[y - 1 + cols - 1], ui);
    }

マウスイベントの検知

マウスイベントの検知は,ui.widget_input().mouse()を使います.

    if let Some(mouse) = ui.widget_input(ids.canvas).mouse() {
        if mouse.buttons.left().is_down() {
            let mouse_abs_xy = mouse.abs_xy();
            let x = ((mouse_abs_xy[0] + (canvas_wh[0] / 2.)) / (canvas_wh[0] / (cols as f64))) as
                usize;
            let y = ((canvas_wh[1] / 2. - mouse_abs_xy[1]) / (canvas_wh[1] / (rows as f64))) as
                usize;
            // println!("{:?}, {}, {}", mouse_abs_xy, x, y);

            // when resizing, x can be greater than cols (so do y)
            if x < cols && y < rows {
                if board[y][x] == BoardState::Empty {
                    match *turn {
                        Turn::White => {
                            board[y][x] = BoardState::Circle;
                            *turn = Turn::Black;
                        }
                        Turn::Black => {
                            board[y][x] = BoardState::X;
                            *turn = Turn::White;
                        }
                    }
                }
            }
        }
    }

盤面の描画

盤面の描画をする際は,IDが他のウィジェットと被らないように注意しながらウィジェットを作成します.うっかりIDが被ると上書きされてしまって古いウィジェットが消えてしまいます.

    // draw circle or x
    for y in 0..rows {
        for x in 0..cols {
            match board[y][x] {
                BoardState::Circle => {
                    widget::Circle::outline_styled(
                        sw / 3.,
                        widget::line::Style::new().thickness(2.),
                    ).x(tl[0] + sw * (x as f64) + sw / 2.)
                        .y(tl[1] - sh * (y as f64) - sh / 2.)
                        .color(color::RED)
                        .set(ids.circles[y * cols + x], ui);
                }
                BoardState::X => {
                    widget::Line::abs(
                        [
                            tl[0] + sw * (x as f64) + sw / 5.,
                            tl[1] - sh * (y as f64) - sh / 5.,
                        ],
                        [
                            tl[0] + sw * ((x + 1) as f64) - sw / 5.,
                            tl[1] - sh * ((y + 1) as f64) + sh / 5.,
                        ],
                    ).color(color::BLACK)
                        .thickness(2.)
                        .set(ids.xs[y * cols + x], ui);

                    widget::Line::abs(
                        [
                            tl[0] + sw * ((x + 1) as f64) - sw / 5.,
                            tl[1] - sh * (y as f64) - sh / 5.,
                        ],
                        [
                            tl[0] + sw * (x as f64) + sw / 5.,
                            tl[1] - sh * ((y + 1) as f64) + sh / 5.,
                        ],
                    ).color(color::BLACK)
                        .thickness(2.)
                        .set(ids.xs[y * cols + x + rows * cols], ui);
                }
                _ => {}
            }
        }
    }
}

少々面倒ですね.まぁ,そもそもこういうことをしたいのなら,使ったことないですがゲームエンジンであるpistonとかを使うべきなんだと思います.

まとめ

ということでconrodの基礎的な使い方について書きました.メニューやダイアログといった機能はまだないようですし,ウィジェット自体の数も決して多くはないので複雑なGUIアプリケーションを作成するのはちょっと大変かなという気がしますが,簡単で静的なものを作るにはいいかもしれません.

*1:gliumはoriginal authorがもう積極的にサポートしないことを表明していますが,コミュニティによってメンテナンスされていくようです(https://github.com/PistonDevelopers/conrod/issues/989).今original authorはvulkanのラッパーを作ってるので将来的にはそれに移行していくのかもしれません

DPDKのmbufの構造

DPDKではパケットをmbufというデータ構造で管理しています. mbufについては http://dpdk.org/doc/guides/prog_guide/mbuf_lib.html にまとまっていますが,ここでは具体的なmbufの構成や使われ方についてみていきたいと思います.

mempoolのおさらい

mbufはmempoolを使用しているので,mbufの説明の前に簡単にmempoolについて説明します.

前回説明したとおり,mempoolは以下の特徴を持っています.

  • 固定長サイズのメモリアロケータ
  • 内部で空きオブジェクトをリングバッファとして管理
  • コアごとのキャッシュを持つことができる

mempoolの概要を図にすると,以下のようになります.

f:id:mm_i:20170529110320p:plain

rte_create_mempool()でmempoolを作成しますが,ここで確保したオブジェクトはTAILQであるmp->elt_listに追加されます.またそれと同時にオブジェクトはmp->pool_dataで示されるrte_ringに追加されます(rte_ringのデータ部の実体はポインタの配列で,オブジェクトを指します). mempoolのキャッシュが有効な場合はmempool構造体の後ろにキャッシュ用のデータ構造用意されます.こちらも実体はポインタの配列で,キャッシュされたオブジェクトを指します.キャッシュされたオブジェクトがring bufferの方に入っていることはありません.

mempoolからのオブジェクトの取得は以下のような流れになります.

  • rte_mempool_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned n)
    • mempool mからn個オブジェクトを取得
    • obj_tableに確保したオブジェクトのポインタが格納される
  • このとき実際にはまずmempoolのキャッシュからオブジェクトを取得しようとする
  • もしキャッシュにオブジェクトがなければ,mempool本体のring bufferからオブジェクトを取得

また,確保したオブジェクトをmempoolに戻す場合は,以下のようにします.

  • rte_mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table, unsigned n)
    • obj_tableのn個のオブジェクトをmempool mpに戻す
  • このとき,まずmpのキャッシュに戻そうとする
  • もしキャッシュが一杯なら,mempool本体のバッファにオブジェクトを戻す

ringの初期化

ringの初期化はrte_mempool_populate_phys()内のrte_mempool_ops_alloc()でおこなわれます.

    /* create the internal ring if not already done */
    if ((mp->flags & MEMPOOL_F_POOL_CREATED) == 0) {
        ret = rte_mempool_ops_alloc(mp);
        if (ret != 0)
            return ret;
        mp->flags |= MEMPOOL_F_POOL_CREATED;
    }

rte_mempool_ops_alloc()は設定によって何が呼ばれるか変わりますが,例えばmulti consumer / multi producer の場合はdrivers/mempool/ring/rte_mempool_ring.ccommon_ring_alloc()が呼ばれます.

static int
common_ring_alloc(struct rte_mempool *mp)
{
    int rg_flags = 0, ret;
    char rg_name[RTE_RING_NAMESIZE];
    struct rte_ring *r;

    ret = snprintf(rg_name, sizeof(rg_name),
        RTE_MEMPOOL_MZ_FORMAT, mp->name);
    if (ret < 0 || ret >= (int)sizeof(rg_name)) {
        rte_errno = ENAMETOOLONG;
        return -rte_errno;
    }

    /* ring flags */
    if (mp->flags & MEMPOOL_F_SP_PUT)
        rg_flags |= RING_F_SP_ENQ;
    if (mp->flags & MEMPOOL_F_SC_GET)
        rg_flags |= RING_F_SC_DEQ;

    /*
     * Allocate the ring that will be used to store objects.
     * Ring functions will return appropriate errors if we are
     * running as a secondary process etc., so no checks made
     * in this function for that condition.
     */
    r = rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
        mp->socket_id, rg_flags);
    if (r == NULL)
        return -rte_errno;

    mp->pool_data = r;

    return 0;
}

mbufの構造

mbufの構造はざっくりと以下のようになります.

f:id:mm_i:20170529110606p:plain

mbufはmempoolを利用しており,一つのmbufが一つのmempoolのオブジェクトになります. mbufの先頭にrte_mbuf構造体が存在し,その後ろに実際のデータ領域が続いています. 後からデータを追加できるように,data_off分の領域が先頭から空いています. rte_pktmbuf_prepend(), rte_pktmbuf_append()を使ってdataの先頭/末尾にデータを追加できます. また,もし受信パケットが分割されている場合はmbuf->nextが次のmbufへを指します. mfbu->nb_segsがパケット全体がいくつのmbufから成るか示す値です.

rte_mbufは以下のようになっています.

struct rte_mbuf {
    MARKER cacheline0;

    void *buf_addr;           /**< Virtual address of segment buffer. */
    /**
     * Physical address of segment buffer.
     * Force alignment to 8-bytes, so as to ensure we have the exact
     * same mbuf cacheline0 layout for 32-bit and 64-bit. This makes
     * working on vector drivers easier.
     */
    phys_addr_t buf_physaddr __rte_aligned(sizeof(phys_addr_t));

    /* next 8 bytes are initialised on RX descriptor rearm */
    MARKER64 rearm_data;
    uint16_t data_off;

    /**
     * Reference counter. Its size should at least equal to the size
     * of port field (16 bits), to support zero-copy broadcast.
     * It should only be accessed using the following functions:
     * rte_mbuf_refcnt_update(), rte_mbuf_refcnt_read(), and
     * rte_mbuf_refcnt_set(). The functionality of these functions (atomic,
     * or non-atomic) is controlled by the CONFIG_RTE_MBUF_REFCNT_ATOMIC
     * config option.
     */
    RTE_STD_C11
    union {
        rte_atomic16_t refcnt_atomic; /**< Atomically accessed refcnt */
        uint16_t refcnt;              /**< Non-atomically accessed refcnt */
    };
    uint16_t nb_segs;         /**< Number of segments. */

    /** Input port (16 bits to support more than 256 virtual ports). */
    uint16_t port;

    uint64_t ol_flags;        /**< Offload features. */

    /* remaining bytes are set on RX when pulling packet from descriptor */
    MARKER rx_descriptor_fields1;

    /*
     * The packet type, which is the combination of outer/inner L2, L3, L4
     * and tunnel types. The packet_type is about data really present in the
     * mbuf. Example: if vlan stripping is enabled, a received vlan packet
     * would have RTE_PTYPE_L2_ETHER and not RTE_PTYPE_L2_VLAN because the
     * vlan is stripped from the data.
     */
    RTE_STD_C11
    union {
        uint32_t packet_type; /**< L2/L3/L4 and tunnel information. */
        struct {
            uint32_t l2_type:4; /**< (Outer) L2 type. */
            uint32_t l3_type:4; /**< (Outer) L3 type. */
            uint32_t l4_type:4; /**< (Outer) L4 type. */
            uint32_t tun_type:4; /**< Tunnel type. */
            uint32_t inner_l2_type:4; /**< Inner L2 type. */
            uint32_t inner_l3_type:4; /**< Inner L3 type. */
            uint32_t inner_l4_type:4; /**< Inner L4 type. */
        };
    };

    uint32_t pkt_len;         /**< Total pkt len: sum of all segments. */
    uint16_t data_len;        /**< Amount of data in segment buffer. */
    /** VLAN TCI (CPU order), valid if PKT_RX_VLAN_STRIPPED is set. */
    uint16_t vlan_tci;

    union {
        uint32_t rss;     /**< RSS hash result if RSS enabled */
        struct {
            RTE_STD_C11
            union {
                struct {
                    uint16_t hash;
                    uint16_t id;
                };
                uint32_t lo;
                /**< Second 4 flexible bytes */
            };
            uint32_t hi;
            /**< First 4 flexible bytes or FD ID, dependent on
                 PKT_RX_FDIR_* flag in ol_flags. */
        } fdir;           /**< Filter identifier if FDIR enabled */
        struct {
            uint32_t lo;
            uint32_t hi;
        } sched;          /**< Hierarchical scheduler */
        uint32_t usr;     /**< User defined tags. See rte_distributor_process() */
    } hash;                   /**< hash information */

    /** Outer VLAN TCI (CPU order), valid if PKT_RX_QINQ_STRIPPED is set. */
    uint16_t vlan_tci_outer;

    uint16_t buf_len;         /**< Length of segment buffer. */

    /** Valid if PKT_RX_TIMESTAMP is set. The unit and time reference
     * are not normalized but are always the same for a given port.
     */
    uint64_t timestamp;

    /* second cache line - fields only used in slow path or on TX */
    MARKER cacheline1 __rte_cache_min_aligned;

    RTE_STD_C11
    union {
        void *userdata;   /**< Can be used for external metadata */
        uint64_t udata64; /**< Allow 8-byte userdata on 32-bit */
    };

    struct rte_mempool *pool; /**< Pool from which mbuf was allocated. */
    struct rte_mbuf *next;    /**< Next segment of scattered packet. */

    /* fields to support TX offloads */
    RTE_STD_C11
    union {
        uint64_t tx_offload;       /**< combined for easy fetch */
        __extension__
        struct {
            uint64_t l2_len:7;
            /**< L2 (MAC) Header Length for non-tunneling pkt.
             * Outer_L4_len + ... + Inner_L2_len for tunneling pkt.
             */
            uint64_t l3_len:9; /**< L3 (IP) Header Length. */
            uint64_t l4_len:8; /**< L4 (TCP/UDP) Header Length. */
            uint64_t tso_segsz:16; /**< TCP TSO segment size */

            /* fields for TX offloading of tunnels */
            uint64_t outer_l3_len:9; /**< Outer L3 (IP) Hdr Length. */
            uint64_t outer_l2_len:7; /**< Outer L2 (MAC) Hdr Length. */

            /* uint64_t unused:8; */
        };
    };

    /** Size of the application private data. In case of an indirect
     * mbuf, it stores the direct mbuf private data size. */
    uint16_t priv_size;

    /** Timesync flags for use with IEEE1588. */
    uint16_t timesync;

    /** Sequence number. See also rte_reorder_insert(). */
    uint32_t seqn;

} __rte_cache_aligned;

refcntがあるのはmbufのデータが他のmbufから参照される場合があるからです(下図).

f:id:mm_i:20170529110553p:plain

rte_pktmbuf_attach()で他のmbufのデータを参照するようにできます.また,ヘルパー関数としてrte_pktmbuf_clone()で指定したmbufのデータを参照するmbufを作成できます. dpdkでは他のmbufのデータを指すmbufをindiret,自分自身のデータを指すmbufをdirectと呼んでいます. indirectなmbufは自身のデータ領域を使わないので,オブジェクトサイズが小さいmempoolからmbufを確保した方がメモリ効率が上がります.

mbufの作成

mbufの作成は,rte_pktmbuf_pool_create()でおこないます. rte_pktmbuf_pool_create()では,rte_mempool_create_empty()および rte_mempool_set_ops_byname(), rte_mempool_populate_default()を利用して mempoolを作成したのち,最後にrte_mempool_obj_iter()を使って 各mempoolのelemに対してrte_pktmbuf_init()を実行します.

struct rte_mempool *
rte_pktmbuf_pool_create(const char *name, unsigned n,
    unsigned cache_size, uint16_t priv_size, uint16_t data_room_size,
    int socket_id)
{
    struct rte_mempool *mp;
    struct rte_pktmbuf_pool_private mbp_priv;
    unsigned elt_size;
    int ret;

    if (RTE_ALIGN(priv_size, RTE_MBUF_PRIV_ALIGN) != priv_size) {
        RTE_LOG(ERR, MBUF, "mbuf priv_size=%u is not aligned\n",
            priv_size);
        rte_errno = EINVAL;
        return NULL;
    }
    elt_size = sizeof(struct rte_mbuf) + (unsigned)priv_size +
        (unsigned)data_room_size;
    mbp_priv.mbuf_data_room_size = data_room_size;
    mbp_priv.mbuf_priv_size = priv_size;


    mp = rte_mempool_create_empty(name, n, elt_size, cache_size,
         sizeof(struct rte_pktmbuf_pool_private), socket_id, 0);
    if (mp == NULL)
        return NULL;

    ret = rte_mempool_set_ops_byname(mp,
        RTE_MBUF_DEFAULT_MEMPOOL_OPS, NULL);
    if (ret != 0) {
        RTE_LOG(ERR, MBUF, "error setting mempool handler\n");
        rte_mempool_free(mp);
        rte_errno = -ret;
        return NULL;
    }
    rte_pktmbuf_pool_init(mp, &mbp_priv);

    ret = rte_mempool_populate_default(mp);
    if (ret < 0) {
        rte_mempool_free(mp);
        rte_errno = -ret;
        return NULL;
    }

    rte_mempool_obj_iter(mp, rte_pktmbuf_init, NULL);

    return mp;
}

rte_pktmbuf_init()ではmempoolの一つのオブジェクトの先頭にrte_mbufの構造体 領域を確保し,それを初期化しています.

void
rte_pktmbuf_init(struct rte_mempool *mp,
         __attribute__((unused)) void *opaque_arg,
         void *_m,
         __attribute__((unused)) unsigned i)
{
    struct rte_mbuf *m = _m;
    uint32_t mbuf_size, buf_len, priv_size;

    priv_size = rte_pktmbuf_priv_size(mp);
    mbuf_size = sizeof(struct rte_mbuf) + priv_size;
    buf_len = rte_pktmbuf_data_room_size(mp);

    RTE_ASSERT(RTE_ALIGN(priv_size, RTE_MBUF_PRIV_ALIGN) == priv_size);
    RTE_ASSERT(mp->elt_size >= mbuf_size);
    RTE_ASSERT(buf_len <= UINT16_MAX);

    memset(m, 0, mp->elt_size);

    /* start of buffer is after mbuf structure and priv data */
    m->priv_size = priv_size;
    m->buf_addr = (char *)m + mbuf_size;
    m->buf_physaddr = rte_mempool_virt2phy(mp, m) + mbuf_size;
    m->buf_len = (uint16_t)buf_len;

    /* keep some headroom between start of buffer and data */
    m->data_off = RTE_MIN(RTE_PKTMBUF_HEADROOM, (uint16_t)m->buf_len);

    /* init some constant fields */
    m->pool = mp;
    m->nb_segs = 1;
    m->port = 0xff;
    rte_mbuf_refcnt_set(m, 1);
    m->next = NULL;
}

mbufの利用: パケットの受信

実際にパケットの受信時にどのようにmbufが利用されるか見ていきたいと思います.

rx/txバッファの設定

dpdkのexampleにあるl2fwdのプログラムでは以下のように初期化をおこなっています.

// mbufの作成
pktmbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", NB_MBUF,
        MEMPOOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
        rte_socket_id());
...

// ポートごとに初期化
for (portid = 0; portid < nb_ports; portid++) {
    ...
    ret = rte_eth_dev_configure(portid, 1, 1, &port_conf);
    ...
    ret = rte_eth_rx_queue_setup(portid, 0, nb_rxd,
                     rte_eth_dev_socket_id(portid),
                     NULL,
                     l2fwd_pktmbuf_pool);
    ...
    ret = rte_eth_tx_queue_setup(portid, 0, nb_txd,
            rte_eth_dev_socket_id(portid),
}

まずrte_pktmbuf_pool_create()でプログラム全体が利用するmbufを確保します. ちなみに,NB_MBUFは8192, RTE_MBUF_DEFAULT_BUF_SIZEは2KB + RTE_PKTMBUF_HEADROOM(自分の環境では128B)です.

その後各NICのポートの設定をおこないますが,特に重要なのがrte_eth_rx_queue_setup()rte_eth_tx_queue_setup()です. ixgbeを使用する場合,rte_eth_rx_queue_setup() から最終的には drivers/net/ixgbe/ixgbe_rxtx.cixgbe_dev_rx_queue_setup()が呼ばれます.

ixgbe_dev_rx_queue_setup()ではrx queue用のデータ構造(ixgbe_rx_queue)やring descriptor用のメモリを確保したりします.

int __attribute__((cold))
ixgbe_dev_rx_queue_setup(struct rte_eth_dev *dev,
             uint16_t queue_idx,
             uint16_t nb_desc,
             unsigned int socket_id,
             const struct rte_eth_rxconf *rx_conf,
             struct rte_mempool *mp)
{
    ...
    /* First allocate the rx queue data structure */
    rxq = rte_zmalloc_socket("ethdev RX queue", sizeof(struct ixgbe_rx_queue),
                 RTE_CACHE_LINE_SIZE, socket_id);
    if (rxq == NULL)
        return -ENOMEM;
    rxq->mb_pool = mp;
    rxq->nb_rx_desc = nb_desc;
    rxq->rx_free_thresh = rx_conf->rx_free_thresh;
    rxq->queue_id = queue_idx;
    rxq->reg_idx = (uint16_t)((RTE_ETH_DEV_SRIOV(dev).active == 0) ?
        queue_idx : RTE_ETH_DEV_SRIOV(dev).def_pool_q_idx + queue_idx);
    rxq->port_id = dev->data->port_id;
    rxq->crc_len = (uint8_t) ((dev->data->dev_conf.rxmode.hw_strip_crc) ?
                            0 : ETHER_CRC_LEN);
    rxq->drop_en = rx_conf->rx_drop_en;
    rxq->rx_deferred_start = rx_conf->rx_deferred_start;

    ...

    /*
     * Allocate RX ring hardware descriptors. A memzone large enough to
     * handle the maximum ring size is allocated in order to allow for
     * resizing in later calls to the queue setup function.
     */
    rz = rte_eth_dma_zone_reserve(dev, "rx_ring", queue_idx,
                      RX_RING_SZ, IXGBE_ALIGN, socket_id);
    ...

    rxq->rx_ring_phys_addr = rte_mem_phy2mch(rz->memseg_id, rz->phys_addr);
    rxq->rx_ring = (union ixgbe_adv_rx_desc *) rz->addr;

    ...

    /*
     * Allocate software ring. Allow for space at the end of the
     * S/W ring to make sure look-ahead logic in bulk alloc Rx burst
     * function does not access an invalid memory region.
     */
    len = nb_desc;
    if (adapter->rx_bulk_alloc_allowed)
        len += RTE_PMD_IXGBE_RX_MAX_BURST;

    rxq->sw_ring = rte_zmalloc_socket("rxq->sw_ring",
                      sizeof(struct ixgbe_rx_entry) * len,
                      RTE_CACHE_LINE_SIZE, socket_id);
    ...

    dev->data->rx_queues[queue_idx] = rxq;

    ixgbe_reset_rx_queue(adapter, rxq);

    return 0;
}

rx ring

ixgbeの場合のrx ringのデータ構造は以下のようになっています.

f:id:mm_i:20170529110825p:plain

struct ixgbe_rx_queue {
    struct rte_mempool  *mb_pool; /**< mbuf pool to populate RX ring. */
    volatile union ixgbe_adv_rx_desc *rx_ring; /**< RX ring virtual address. */
    uint64_t            rx_ring_phys_addr; /**< RX ring DMA address. */
    volatile uint32_t   *rdt_reg_addr; /**< RDT register address. */
    volatile uint32_t   *rdh_reg_addr; /**< RDH register address. */
    struct ixgbe_rx_entry *sw_ring; /**< address of RX software ring. */
    struct ixgbe_scattered_rx_entry *sw_sc_ring; /**< address of scattered Rx software ring. */
    struct rte_mbuf *pkt_first_seg; /**< First segment of current packet. */
    struct rte_mbuf *pkt_last_seg; /**< Last segment of current packet. */
    uint64_t            mbuf_initializer; /**< value to init mbufs */
    uint16_t            nb_rx_desc; /**< number of RX descriptors. */
    uint16_t            rx_tail;  /**< current value of RDT register. */
    uint16_t            nb_rx_hold; /**< number of held free RX desc. */
    uint16_t rx_nb_avail; /**< nr of staged pkts ready to ret to app */
    uint16_t rx_next_avail; /**< idx of next staged pkt to ret to app */
    uint16_t rx_free_trigger; /**< triggers rx buffer allocation */
    uint16_t            rx_using_sse;
    /**< indicates that vector RX is in use */
#ifdef RTE_IXGBE_INC_VECTOR
    uint16_t            rxrearm_nb;     /**< number of remaining to be re-armed */
    uint16_t            rxrearm_start;  /**< the idx we start the re-arming from */
#endif
    uint16_t            rx_free_thresh; /**< max free RX desc to hold. */
    uint16_t            queue_id; /**< RX queue index. */
    uint16_t            reg_idx;  /**< RX queue register index. */
    uint16_t            pkt_type_mask;  /**< Packet type mask for different NICs. */
    uint8_t             port_id;  /**< Device port identifier. */
    uint8_t             crc_len;  /**< 0 if CRC stripped, 4 otherwise. */
    uint8_t             drop_en;  /**< If not 0, set SRRCTL.Drop_En. */
    uint8_t             rx_deferred_start; /**< not in global dev start. */
    /** flags to set in mbuf when a vlan is detected. */
    uint64_t            vlan_flags;
    /** need to alloc dummy mbuf, for wraparound when scanning hw ring */
    struct rte_mbuf fake_mbuf;
    /** hold packets to return to application */
    struct rte_mbuf *rx_stage[RTE_PMD_IXGBE_RX_MAX_BURST*2];
};

rx_ringが実際のixgbeのrx ringを指すポインタです.rx ringのbuffer addressはmbufのデータ部分を指します.sw_ringがrx ringで利用されるmbufを保持しています. rx_ringpkt_addrが対応するsw_ringのmbufのデータ部分を指します. 受信時はrx_tailで示されるインデックスからdescriptorを確認していきます(NICのtailとは異なります).

x540のreceive descriptor

ここで簡単にX540のreceive descriptorについて説明しておきます. 詳しくはデータシートの7.1に書いてあります.

受信する際に利用するring descriptor queueは以下のような構造をしています(データシートより引用).

f:id:mm_i:20170529110750p:plain

キューの各エントリがdescriptorと呼ばれます. ソフトウェア側はdescriptorを設定したのち,tailを進めます.これは実際にはX540のRDTレジスタの値を更新することでおこないます. またNICは内部的に空きdescriptorの先頭位置をheadとして覚えており,パケットを受信したらheadのdescriptorに従ってパケットをメモリにDMAしたあと,headを進めます.

f:id:mm_i:20170529110801p:plainf:id:mm_i:20170529110808p:plain

descriptorのformatは以下のようになっています(descriptorのフォーマットにもいくつかありますが,ここではそのうちのadvance descritorを示しています).

descriptorのpacket buffer addressにパケットをDMAする先のアドレスを指定します. このアドレスはdpdkではmbufのデータのアドレスになります. X540では受信パケットのヘッダとデータを別々の領域へDMAする(header splitting)ことが可能であるためheader buffer addressを指定する場所もありますが,DPDKでは利用していません. ソフトウェアはDDビットが1になっているかどうかを調べることで,パケットが到着しているかどうかを判断することができます. またdescriptor write-back formatのフィールドにはNICが受信したパケットの情報を書き込みます.

パケットの受信

パケットの受信はrte_eth_rx_burst()でおこないます.

nb_rx = rte_eth_rx_burst((uint8_t) portid, 0,
             pkts_burst, MAX_PKT_BURST);

rte_eth_rx_burst()は最大MAX_PKT_BURST個のパケットを受信し,結果をpkts_burstに格納します. NICのオフロード機能を使用するかなどの設定によって,rte_eth_rx_burst()を呼んだときに呼ばれる関数は異なります.ixgbeではixgbe_dev_rx_init()の中で呼ばれるixgbe_set_rx_function()でrecv時の関数を設定しています. ここではrecv時にixgbe_recv_pkts_bulk_alloc()が設定されていると仮定して,その具体的な処理をみてみます.

ixgbe_recv_pkts_bulk_alloc()では,RTE_PMD_IXGBE_RX_MAX_BURST (実際は32)単位でのパケット読み出しをおこない,実際に受信したパケットを返します.

uint16_t
ixgbe_recv_pkts_bulk_alloc(void *rx_queue, struct rte_mbuf **rx_pkts,
               uint16_t nb_pkts)
{
    uint16_t nb_rx;

    if (unlikely(nb_pkts == 0))
        return 0;

    if (likely(nb_pkts <= RTE_PMD_IXGBE_RX_MAX_BURST))
        return rx_recv_pkts(rx_queue, rx_pkts, nb_pkts);

    /* request is relatively large, chunk it up */
    nb_rx = 0;
    while (nb_pkts) {
        uint16_t ret, n;

        n = (uint16_t)RTE_MIN(nb_pkts, RTE_PMD_IXGBE_RX_MAX_BURST);
        ret = rx_recv_pkts(rx_queue, &rx_pkts[nb_rx], n);
        nb_rx = (uint16_t)(nb_rx + ret);
        nb_pkts = (uint16_t)(nb_pkts - ret);
        if (ret < n)
            break;
    }

    return nb_rx;
}

実際の受信処理をおこなっているのはrx_recv_pkts()になります.

static inline uint16_t
rx_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
         uint16_t nb_pkts)
{
    struct ixgbe_rx_queue *rxq = (struct ixgbe_rx_queue *)rx_queue;
    uint16_t nb_rx = 0;

    /* Any previously recv'd pkts will be returned from the Rx stage */
    if (rxq->rx_nb_avail)
        return ixgbe_rx_fill_from_stage(rxq, rx_pkts, nb_pkts);

前回受信したパケットがまだ残っていたらそれを処理します (これはこの関数がnb_pkts単位で受信したパケットを返すからです).

    /* Scan the H/W ring for packets to receive */
    nb_rx = (uint16_t)ixgbe_rx_scan_hw_ring(rxq);

ixgbe_rx_scan_hw_ring()でパケットが到着しているかチェックします. nb_rxに受信したパケット数が入ります.

    /* update internal queue state */
    rxq->rx_next_avail = 0;
    rxq->rx_nb_avail = nb_rx;
    rxq->rx_tail = (uint16_t)(rxq->rx_tail + nb_rx);

rx_tailnb_rxだけ進めます.

    /* if required, allocate new buffers to replenish descriptors */
    if (rxq->rx_tail > rxq->rx_free_trigger) {
        uint16_t cur_free_trigger = rxq->rx_free_trigger;

        if (ixgbe_rx_alloc_bufs(rxq, true) != 0) {
            int i, j;

            PMD_RX_LOG(DEBUG, "RX mbuf alloc failed port_id=%u "
                   "queue_id=%u", (unsigned) rxq->port_id,
                   (unsigned) rxq->queue_id);

            rte_eth_devices[rxq->port_id].data->rx_mbuf_alloc_failed +=
                rxq->rx_free_thresh;

            /*
             * Need to rewind any previous receives if we cannot
             * allocate new buffers to replenish the old ones.
             */
            rxq->rx_nb_avail = 0;
            rxq->rx_tail = (uint16_t)(rxq->rx_tail - nb_rx);
            for (i = 0, j = rxq->rx_tail; i < nb_rx; ++i, ++j)
                rxq->sw_ring[j].mbuf = rxq->rx_stage[i];

            return 0;
        }

        /* update tail pointer */
        rte_wmb();
        IXGBE_PCI_REG_WRITE_RELAXED(rxq->rdt_reg_addr,
                        cur_free_trigger);
    }

rx_tailの値が設定した閾値以上になったら,ixgbe_rx_alloc_bufs()を使って新しいmbufを取得し,それをデスクリプタに設定します.またNICのtail pointerを更新します.

    if (rxq->rx_tail >= rxq->nb_rx_desc)
        rxq->rx_tail = 0;

この関数は一気にパケットを指定した数(8個)読もうとしますが,その際ring bufferの先頭に巻き戻って読むことはしません.もし末尾に到達したらここで先頭にインデックスを戻しています.(したがって,ring descriptorの最大数はこのバースト読み出し分を考慮して余分に確保しておく必要があります)

    /* received any packets this loop? */
    if (rxq->rx_nb_avail)
        return ixgbe_rx_fill_from_stage(rxq, rx_pkts, nb_pkts);

    return 0;
}

受信したパケットがあればixgbe_rx_fill_from_stage()でそのパケットの後処理をして返します.

デスクリプタをチェックしてパケットの受信を判断するixgbe_rx_scan_hw_ring()は以下のようになっています.

static inline int
ixgbe_rx_scan_hw_ring(struct ixgbe_rx_queue *rxq)
{
    volatile union ixgbe_adv_rx_desc *rxdp;
    struct ixgbe_rx_entry *rxep;
    struct rte_mbuf *mb;
    uint16_t pkt_len;
    uint64_t pkt_flags;
    int nb_dd;
    uint32_t s[LOOK_AHEAD];
    uint32_t pkt_info[LOOK_AHEAD];
    int i, j, nb_rx = 0;
    uint32_t status;
    uint64_t vlan_flags = rxq->vlan_flags;

    /* get references to current descriptor and S/W ring entry */
    rxdp = &rxq->rx_ring[rxq->rx_tail];
    rxep = &rxq->sw_ring[rxq->rx_tail];

    status = rxdp->wb.upper.status_error;
    /* check to make sure there is at least 1 packet to receive */
    if (!(status & rte_cpu_to_le_32(IXGBE_RXDADV_STAT_DD)))
        return 0;

    /*
     * Scan LOOK_AHEAD descriptors at a time to determine which descriptors
     * reference packets that are ready to be received.
     */
    for (i = 0; i < RTE_PMD_IXGBE_RX_MAX_BURST;
         i += LOOK_AHEAD, rxdp += LOOK_AHEAD, rxep += LOOK_AHEAD) {
        /* Read desc statuses backwards to avoid race condition */
        for (j = 0; j < LOOK_AHEAD; j++)
            s[j] = rte_le_to_cpu_32(rxdp[j].wb.upper.status_error);

        rte_smp_rmb();

        /* Compute how many status bits were set */
        for (nb_dd = 0; nb_dd < LOOK_AHEAD &&
                (s[nb_dd] & IXGBE_RXDADV_STAT_DD); nb_dd++)
            ;

ここでDDビットを見てパケットを受信したか見ています.

        for (j = 0; j < nb_dd; j++)
            pkt_info[j] = rte_le_to_cpu_32(rxdp[j].wb.lower.
                               lo_dword.data);

        nb_rx += nb_dd;

        /* Translate descriptor info to mbuf format */
        for (j = 0; j < nb_dd; ++j) {
            mb = rxep[j].mbuf;
            pkt_len = rte_le_to_cpu_16(rxdp[j].wb.upper.length) -
                  rxq->crc_len;
            mb->data_len = pkt_len;
            mb->pkt_len = pkt_len;
            mb->vlan_tci = rte_le_to_cpu_16(rxdp[j].wb.upper.vlan);

            /* convert descriptor fields to rte mbuf flags */
            pkt_flags = rx_desc_status_to_pkt_flags(s[j],
                vlan_flags);
            pkt_flags |= rx_desc_error_to_pkt_flags(s[j]);
            pkt_flags |= ixgbe_rxd_pkt_info_to_pkt_flags
                    ((uint16_t)pkt_info[j]);
            mb->ol_flags = pkt_flags;
            mb->packet_type =
                ixgbe_rxd_pkt_info_to_pkt_type
                    (pkt_info[j], rxq->pkt_type_mask);

            if (likely(pkt_flags & PKT_RX_RSS_HASH))
                mb->hash.rss = rte_le_to_cpu_32(
                    rxdp[j].wb.lower.hi_dword.rss);
            else if (pkt_flags & PKT_RX_FDIR) {
                mb->hash.fdir.hash = rte_le_to_cpu_16(
                    rxdp[j].wb.lower.hi_dword.csum_ip.csum) &
                    IXGBE_ATR_HASH_MASK;
                mb->hash.fdir.id = rte_le_to_cpu_16(
                    rxdp[j].wb.lower.hi_dword.csum_ip.ip_id);
            }
        }

この部分で受信したパケットに関して,mbufのメタデータ部分ににパケットの情報を格納しています.

        /* Move mbuf pointers from the S/W ring to the stage */
        for (j = 0; j < LOOK_AHEAD; ++j) {
            rxq->rx_stage[i + j] = rxep[j].mbuf;
        }

        /* stop if all requested packets could not be received */
        if (nb_dd != LOOK_AHEAD)
            break;
    }

    /* clear software ring entries so we can cleanup correctly */
    for (i = 0; i < nb_rx; ++i) {
        rxq->sw_ring[rxq->rx_tail + i].mbuf = NULL;
    }


    return nb_rx;
}

最後に,受信したパケットのmbufのアドレスをrx_stageに入れ,sw_ringの方はNULLにします.

ixgbe_rx_fill_from_stage()では最大nb_pkts分の受信データをrx_pktsに格納して返します.

static inline uint16_t
ixgbe_rx_fill_from_stage(struct ixgbe_rx_queue *rxq, struct rte_mbuf **rx_pkts,
             uint16_t nb_pkts)
{
    struct rte_mbuf **stage = &rxq->rx_stage[rxq->rx_next_avail];
    int i;

    /* how many packets are ready to return? */
    nb_pkts = (uint16_t)RTE_MIN(nb_pkts, rxq->rx_nb_avail);

    /* copy mbuf pointers to the application's packet list */
    for (i = 0; i < nb_pkts; ++i)
        rx_pkts[i] = stage[i];

    /* update internal queue state */
    rxq->rx_nb_avail = (uint16_t)(rxq->rx_nb_avail - nb_pkts);
    rxq->rx_next_avail = (uint16_t)(rxq->rx_next_avail + nb_pkts);

    return nb_pkts;
}

また,ixgbe_rx_alloc_bufs()では以下のようにしてmbufを取得しています.

static inline int
ixgbe_rx_alloc_bufs(struct ixgbe_rx_queue *rxq, bool reset_mbuf)
{
    volatile union ixgbe_adv_rx_desc *rxdp;
    struct ixgbe_rx_entry *rxep;
    struct rte_mbuf *mb;
    uint16_t alloc_idx;
    __le64 dma_addr;
    int diag, i;

    /* allocate buffers in bulk directly into the S/W ring */
    alloc_idx = rxq->rx_free_trigger - (rxq->rx_free_thresh - 1);
    rxep = &rxq->sw_ring[alloc_idx];
    diag = rte_mempool_get_bulk(rxq->mb_pool, (void *)rxep,
                    rxq->rx_free_thresh);

rte_mempool_get_bulk()を使ってmempoolから空いているmbufを取得します.

    if (unlikely(diag != 0))
        return -ENOMEM;

    rxdp = &rxq->rx_ring[alloc_idx];
    for (i = 0; i < rxq->rx_free_thresh; ++i) {
        /* populate the static rte mbuf fields */
        mb = rxep[i].mbuf;
        if (reset_mbuf) {
            mb->port = rxq->port_id;
        }

        rte_mbuf_refcnt_set(mb, 1);
        mb->data_off = RTE_PKTMBUF_HEADROOM;

        /* populate the descriptors */
        dma_addr = rte_cpu_to_le_64(rte_mbuf_data_dma_addr_default(mb));
        rxdp[i].read.hdr_addr = 0;
        rxdp[i].read.pkt_addr = dma_addr;
    }

各デスクリプタのpkt_addrにmbufのデータ領域のアドレスを書き込みます.

    /* update state of internal queue structure */
    rxq->rx_free_trigger = rxq->rx_free_trigger + rxq->rx_free_thresh;
    if (rxq->rx_free_trigger >= rxq->nb_rx_desc)
        rxq->rx_free_trigger = rxq->rx_free_thresh - 1;

    /* no errors */
    return 0;
}

最後に次にmbufを確保する際の閾値となるrx_free_triggerを更新しています. ちなみに,rx_free_threshはデフォルトで32のようです.

mbufの利用: マルチキャスト

受信以外のmbuf利用の例として,サンプルプログラムに含まれているマルチキャストのプログラムを見てみます.

http://dpdk.org/doc/guides/sample_app_ug/ipv4_multicast.html は受信したパケットを複数のポートからマルチキャストするサンプルプログラムです. マルチキャストの流れは以下のようになります.

f:id:mm_i:20170529110435p:plain

  1. 受信パケットからヘッダを取り除く (rte_pktmbuf_adj()を使ってdata_offを長くする)
  2. rte_pktmbuf_clone()を使って受信パケットのデータ部分を指すmbufを作成
  3. rte_pktmbuf_create()を使ってヘッダ部分のパケットを作成,mbuf->nextはcloneしたmbufを指すようにする 4. 作成したパケットを指定したポートから送信

cloneを使って複数のポートから送信する場合,各パケットは自身のメタデータを持つことになります.したがって最後のポートに関してはcloneをする必要がなく,受信パケットのメタデータを直接変更して使用することができます. もちろん,cloneをせず直接受信パケットをmbuf->nextで指す方法もありますが,この場合は受信パケットのメタデータを変更できないため,最後のポートに関しても他の場合と同様にして送信をおこないます. コードにはcloneする場合としない場合,2通りのコードが書いてあります.

DPDKにおけるメモリ管理

DPDKにおいてどのようにメモリが使用されるかをここにまとめたいと思います. なお,ここに書いてあるものはあくまで私がコードを読んで解釈した結果になります. DPDKのバージョンは17.05,OSはLinuxを対象としています.

全体像

DPDKでは独自のメモリアロケータを利用してメモリを管理しています. メモリ管理のために以下のようなデータ構造が利用されます.

  • memconfig : 作成したmemsegやmemzoneなどの情報を保持
  • memseg : 連続した物理メモリ領域情報を保持
  • heap : メモリの空きブロックの管理 / 割り当て.memsegのメモリ領域を使用する.
  • memzone : 連続した物理メモリ領域を名前をつけて割り当て.内部でheapを利用
  • mempool : 固定長メモリアロケータ.内部でmemzoneを利用

前提知識: queue(3)

本題に入る前に,DPDKでは内部で複数のリストを保持していますが,このリストには queue(3)を利用しています. queueについて知っておいた方がコードが読みやすいです.

queueにはSLIST,STAILQおよびLISTとTAILQの4種類があります. SLISTとSTAILQが単方向リスト,LISTとTAILQが連結リストで,TAILQの場合は末尾への挿入が可能です.queueは全てマクロとして実装されています.

TAILQの簡単な使用例は以下のようになります.

struct entry{
    TAILQ_ENTRY(entry) tq;
    int data;
} e1, e2, e3;

TAILQ_HEAD(entry_head, entry);

int main(){
    struct entry_head head;
    TAILQ_INIT(&head);

    e1.data = 1;
    e2.data = 2;
    e3.data = 3;

    TAILQ_INSERT_HEAD(&head, &e1, tq);
    TAILQ_INSERT_HEAD(&head, &e2, tq);
    TAILQ_INSERT_TAIL(&head, &e3, tq);

    struct entry* p = TAILQ_FIRST(&head);
    while(p != NULL){
        printf("%d\n", p->data);
        p = TAILQ_NEXT(p, tq);
    }
}

初期化部分がやや分かりにくいですが,TAILQ_ENTRY()TAILQ_HEAD()が以下のよ うに展開されることが理解できれば納得できると思います.

#define _TAILQ_HEAD(name, type, qual)                   \
struct name {                               \
    qual type *tqh_first;       /* first element */     \
    qual type *qual *tqh_last;  /* addr of last next element */ \
}
#define TAILQ_HEAD(name, type)  _TAILQ_HEAD(name, struct type,)

#define _TAILQ_ENTRY(type, qual)                    \
struct {                                \
    qual type *tqe_next;        /* next element */      \
    qual type *qual *tqe_prev;  /* address of previous next element */\
}
#define TAILQ_ENTRY(type)   _TAILQ_ENTRY(struct type,)

基本的にマクロにはポインタを渡します. queueの実装は全てヘッダファイルに記述してあるので,見てみると参考になると思います.

glibcの実装: https://sourceware.org/git/?p=glibc.git;a=blob_plain;f=misc/sys/queue.h;hb=HEAD

ちなみに,TAILQの実装を読む際は,

  • tqe_prev は 前の要素の tqe_next のアドレスを保持している
    • これを利用して要素を挿入した際に前の要素のtqe_nextを書き換える
  • ある要素の前の要素を取得する際は,前の要素のteq_prevを使う
    • 実質的に前の要素のtqe_nextを参照していることになる

ということを抑えておけば理解が容易だと思います.

memconfig

rte_mem_configは,EALの中でメモリ全体の情報を保持しているデータ構造です. 具体的には作成したmemsegやmemzone,heap等(後述)を保持しています. EALの中ではrte_eal_get_configuration()->mem_configからmemconfigを取得できます.

librte_eal/common/rte_eal_memconfig.h:

struct rte_mem_config {
    volatile uint32_t magic;   /**< Magic number - Sanity check. */

    /* memory topology */
    uint32_t nchannel;    /**< Number of channels (0 if unknown). */
    uint32_t nrank;       /**< Number of ranks (0 if unknown). */

    /**
     * current lock nest order
     *  - qlock->mlock (ring/hash/lpm)
     *  - mplock->qlock->mlock (mempool)
     * Notice:
     *  *ALWAYS* obtain qlock first if having to obtain both qlock and mlock
     */
    rte_rwlock_t mlock;   /**< only used by memzone LIB for thread-safe. */
    rte_rwlock_t qlock;   /**< used for tailq operation for thread safe. */
    rte_rwlock_t mplock;  /**< only used by mempool LIB for thread-safe. */

    uint32_t memzone_cnt; /**< Number of allocated memzones */

    /* memory segments and zones */
    struct rte_memseg memseg[RTE_MAX_MEMSEG];    /**< Physmem descriptors. */
    struct rte_memzone memzone[RTE_MAX_MEMZONE]; /**< Memzone descriptors. */

    struct rte_tailq_head tailq_head[RTE_MAX_TAILQ]; /**< Tailqs for objects */

    /* Heaps of Malloc per socket */
    struct malloc_heap malloc_heaps[RTE_MAX_NUMA_NODES];

    /* address of mem_config in primary process. used to map shared config into
     * exact same address the primary process maps it.
     */
    uint64_t mem_cfg_addr;
} __attribute__((__packed__));

初期化

DPDKのプログラムで最初にrte_eal_init()を呼ぶと様々な初期化処理がおこなわれますが, その中でも rte_eal_memory_init()でmemsegの初期化,rte_eal_memzone_init()で memzone及びheapの初期化がおこなわれます.

memseg

memsegは連続した物理アドレス空間の情報を保持するデータ構造です.

librte_eal/common/include/rte_memory.h:

struct rte_memseg {
    phys_addr_t phys_addr;      /**< Start physical address. */
    RTE_STD_C11
    union {
        void *addr;         /**< Start virtual address. */
        uint64_t addr_64;   /**< Makes sure addr is always 64 bits */
    };
    size_t len;               /**< Length of the segment. */
    uint64_t hugepage_sz;       /**< The pagesize of underlying memory */
    int32_t socket_id;          /**< NUMA socket ID. */
    uint32_t nchannel;          /**< Number of channels. */
    uint32_t nrank;             /**< Number of ranks. */
#ifdef RTE_LIBRTE_XEN_DOM0
     /**< store segment MFNs */
    uint64_t mfn[DOM0_NUM_MEMBLOCK];
#endif
}

phys_addrに先頭の物理アドレスが,lenに長さが格納されています. memsegの初期化は rte_eal_init() => rte_eal_memory_init() => rte_eal_hugepage_init() の中でおこなわれます.

具体的な物理アドレス確保の方法は前回のエントリに書いた通りです : DPDKにおけるhugepageの初期化部分. なるべく物理的に連続した領域を確保しようとします.

heap

連続したメモリの空きブロックの管理にはheapを利用します.

librte_eal/common/malloc_heap.h:

struct malloc_heap {
     rte_spinlock_t lock;
     LIST_HEAD(, malloc_elem) free_head[RTE_HEAP_NUM_FREELISTS];
     unsigned alloc_count;
     size_t total_size;
 } __rte_cache_aligned;

struct malloc_elem {
    struct malloc_heap *heap;
    struct malloc_elem *volatile prev;      /* points to prev elem in memseg */
    LIST_ENTRY(malloc_elem) free_list;      /* list of free elements in heap */
    const struct rte_memseg *ms;
    volatile enum elem_state state;
    uint32_t pad;
    size_t size;
#ifdef RTE_LIBRTE_MALLOC_DEBUG
    uint64_t header_cookie;         /* Cookie marking start of data */
                                    /* trailer cookie at start + size */
#endif
} __rte_cache_aligned;

malloc_elemが一つのメモリブロックで,それはqueue(3)のLISTの一要素となっています. LISTの先頭はmalloc_heap構造体のfree_headからアクセス可能です. フリーリストはメモリブロックのサイズの大きさごとに分けられています(後述のmalloc_elem_free_list_index()参照). またmalloc_heap自体はmemconfigのmalloc_heapsからアクセスできます.

heapの初期化はrte_eal_memory_init()のあとに呼ばれるrte_eal_memzone_init() の中のrte_malloc_heap_init()でおこなわれます.

rte_malloc_heap_init()ではmalloc_heap_add_memseg()を呼び,各memsegをheapに 追加します.ここでは一つのmemseg全体が一つのmalloc_elemとして追加されます.

librte_eal/common/malloc_heap.c:

static void
malloc_heap_add_memseg(struct malloc_heap *heap, struct rte_memseg *ms)
{
    /* allocate the memory block headers, one at end, one at start */
    struct malloc_elem *start_elem = (struct malloc_elem *)ms->addr;
    struct malloc_elem *end_elem = RTE_PTR_ADD(ms->addr,
            ms->len - MALLOC_ELEM_OVERHEAD);
    end_elem = RTE_PTR_ALIGN_FLOOR(end_elem, RTE_CACHE_LINE_SIZE);
    const size_t elem_size = (uintptr_t)end_elem - (uintptr_t)start_elem;

    malloc_elem_init(start_elem, heap, ms, elem_size);
    malloc_elem_mkend(end_elem, start_elem);
    malloc_elem_free_list_insert(start_elem);

    heap->total_size += elem_size;
}

librte_eal/common/malloc_elem.c:

void
malloc_elem_init(struct malloc_elem *elem,
        struct malloc_heap *heap, const struct rte_memseg *ms, size_t size)
{
    elem->heap = heap;
    elem->ms = ms;
    elem->prev = NULL;
    memset(&elem->free_list, 0, sizeof(elem->free_list));
    elem->state = ELEM_FREE;
    elem->size = size;
    elem->pad = 0;
    set_header(elem);
    set_trailer(elem);
}

ここでは各memsegの先頭と末尾にmalloc_elem用の領域を確保してそれを初期化しています. 末尾にもmalloc_elemを用意するのはオーバフロー防止のためのようです. malloc_elemのsizeはmalloc_elem構造体自身を含んだサイズのようです.

malloc_elem_free_list_insert()で作成したmalloc_elemmaloc_heapfree_headで先頭が示されるフリーリストに追加します. このフリーリストはメモリブロックの大きさごとに分かれており,どこのフリーリスト に入れるかはmalloc_elem_free_list_index()で決めています. コメントを読む限り,

heap->free_head[0] - (0   , 2^8]
heap->free_head[1] - (2^8 , 2^10]
heap->free_head[2] - (2^10 ,2^12]
heap->free_head[3] - (2^12, 2^14]
...

というような感じに分けられて格納されるようです.

memzone

memzoneは連続した物理アドレス空間に名前をつけて保持するためのデータ構造です. 取得したmemzoneはmemconfig.memzoneに保持されます. memzoneの確保の際に内部的にはheapからメモリを確保します.

librte_eal/common/include/rte_memzone.h:

struct rte_memzone {

#define RTE_MEMZONE_NAMESIZE 32       /**< Maximum length of memory zone name.*/
    char name[RTE_MEMZONE_NAMESIZE];  /**< Name of the memory zone. */

    phys_addr_t phys_addr;            /**< Start physical address. */
    RTE_STD_C11
    union {
        void *addr;                   /**< Start virtual address. */
        uint64_t addr_64;             /**< Makes sure addr is always 64-bits */
    };
    size_t len;                       /**< Length of the memzone. */

    uint64_t hugepage_sz;             /**< The page size of underlying memory */

    int32_t socket_id;                /**< NUMA socket ID. */

    uint32_t flags;                   /**< Characteristics of this memzone. */
    uint32_t memseg_id;               /**< Memseg it belongs. */
} __attribute__((__packed__));

rte_eal_memzone_init()の中ではmemzoneは空に初期化されるだけです.

heapからのメモリの割り当て

heapからの割り当てはmalloc_heap_alloc()からおこないます.

librte_eal/common/malloc_heap.c:

void *
malloc_heap_alloc(struct malloc_heap *heap,
        const char *type __attribute__((unused)), size_t size, unsigned flags,
        size_t align, size_t bound)
{
    struct malloc_elem *elem;

    size = RTE_CACHE_LINE_ROUNDUP(size);
    align = RTE_CACHE_LINE_ROUNDUP(align);

    rte_spinlock_lock(&heap->lock);

    elem = find_suitable_element(heap, size, flags, align, bound);
    if (elem != NULL) {
        elem = malloc_elem_alloc(elem, size, align, bound);
        /* increase heap's count of allocated elements */
        heap->alloc_count++;
    }
    rte_spinlock_unlock(&heap->lock);

    return elem == NULL ? NULL : (void *)(&elem[1]);
}

ここではまずfind_suitable_element()を利用して,ヒープのフリーリストからズの空きブロックを見つけます. やっていることはフリーリストを先頭から順に辿って行って,十分な大きさの空きブロックが見つかればそれを返します.ただしこの際指定したhugepage要件を満たすかチェックしています.

librte_eal/common/malloc_elem.c:

static struct malloc_elem *
find_suitable_element(struct malloc_heap *heap, size_t size,
        unsigned flags, size_t align, size_t bound)
{
    size_t idx;
    struct malloc_elem *elem, *alt_elem = NULL;

    for (idx = malloc_elem_free_list_index(size);
            idx < RTE_HEAP_NUM_FREELISTS; idx++) {
        for (elem = LIST_FIRST(&heap->free_head[idx]);
                !!elem; elem = LIST_NEXT(elem, free_list)) {
            if (malloc_elem_can_hold(elem, size, align, bound)) {
                if (check_hugepage_sz(flags, elem->ms->hugepage_sz))
                    return elem;
                if (alt_elem == NULL)
                    alt_elem = elem;
            }
        }
    }

    if ((alt_elem != NULL) && (flags & RTE_MEMZONE_SIZE_HINT_ONLY))
        return alt_elem;

    return NULL;
}

その後実際のmalloc_elem_alloc()で割り当てを行います.

struct malloc_elem *
malloc_elem_alloc(struct malloc_elem *elem, size_t size, unsigned align,
        size_t bound)
{
    struct malloc_elem *new_elem = elem_start_pt(elem, size, align, bound);
    const size_t old_elem_size = (uintptr_t)new_elem - (uintptr_t)elem;
    const size_t trailer_size = elem->size - old_elem_size - size -
        MALLOC_ELEM_OVERHEAD;

    elem_free_list_remove(elem);

割り当てはelemの末尾から指定したサイズ分だけ取得して割り当てます. まずelem_start_pt()で新しいオブジェクトの先頭のアドレスを取得します.

    if (trailer_size > MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) {
        /* split it, too much free space after elem */
        struct malloc_elem *new_free_elem =
                RTE_PTR_ADD(new_elem, size + MALLOC_ELEM_OVERHEAD);

        split_elem(elem, new_free_elem);
        malloc_elem_free_list_insert(new_free_elem);
    }

あんまり詳しく説明していませんが,割り当ての際に指定した境界に合うようにアラインメントしているので,その際にnew_elemの後にMIN_DATA_SIZE以上(自分の環境では64byte)の領域ができたらそれをフリーリストに追加しています*1

    if (old_elem_size < MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) {
        /* don't split it, pad the element instead */
        elem->state = ELEM_BUSY;
        elem->pad = old_elem_size;


        /* put a dummy header in padding, to point to real element header */
        if (elem->pad > 0){ /* pad will be at least 64-bytes, as everything
                             * is cache-line aligned */
            new_elem->pad = elem->pad;
            new_elem->state = ELEM_PAD;
            new_elem->size = elem->size - elem->pad;
            set_header(new_elem);
        }

        return new_elem;
    }

分割した際に,空きスペースがあまりにも小さい場合はそれをフリーリストに追加せ ず,ただ単にELEM_BUSYとするようです.

    /* we are going to split the element in two. The original element
     * remains free, and the new element is the one allocated.
     * Re-insert original element, in case its new size makes it
     * belong on a different list.
     */
    split_elem(elem, new_elem);
    new_elem->state = ELEM_BUSY;
    malloc_elem_free_list_insert(elem);

    return new_elem;
}

通常の場合はこのsplit_elem()で領域が分割され,利用しない前半部分はまたフリーリストに追加されます.

static void
split_elem(struct malloc_elem *elem, struct malloc_elem *split_pt)
{
    struct malloc_elem *next_elem = RTE_PTR_ADD(elem, elem->size);
    const size_t old_elem_size = (uintptr_t)split_pt - (uintptr_t)elem;
    const size_t new_elem_size = elem->size - old_elem_size;

    malloc_elem_init(split_pt, elem->heap, elem->ms, new_elem_size);
    split_pt->prev = elem;
    next_elem->prev = split_pt;
    elem->size = old_elem_size;
    set_trailer(elem);
}

図にすると以下のような感じだと思います.

f:id:mm_i:20170524154902p:plain

malloc_heap_allog()では最後(void *)(&elem[1])を返します.つまり,返されたアドレスはmalloc_elemのヘッダ部分を除いたアドレスになります.

memzoneの確保

メモリ領域を名前をつけて管理する場合はmemzoneを利用します. memzoneの確保はrte_memzone_reserve()からおこないます.

rte_memzone_reserve()を呼ぶと,最終的にmemzone_reserve_aligned_thread_unsafe()が呼ばれます. memzone_reserve_aligned_thread_unsafe()はそこそこ長いですが,主要部のみ抜き 出すと以下のようになります.

static const struct rte_memzone *
memzone_reserve_aligned_thread_unsafe(const char *name, size_t len,
        int socket_id, unsigned flags, unsigned align, unsigned bound)
{
    struct rte_memzone *mz;
    struct rte_mem_config *mcfg;
    size_t requested_len;
    int socket, i;

    ...

    /* get pointer to global configuration */
    mcfg = rte_eal_get_configuration()->mem_config;

    ...
    /* allocate memory on heap */
    void *mz_addr = malloc_heap_alloc(&mcfg->malloc_heaps[socket], NULL,
            requested_len, flags, align, bound);
    ...

    const struct malloc_elem *elem = malloc_elem_from_data(mz_addr);

    /* fill the zone in config */
    mz = get_next_free_memzone();
    ...

    mcfg->memzone_cnt++;
    snprintf(mz->name, sizeof(mz->name), "%s", name);
    mz->phys_addr = rte_malloc_virt2phy(mz_addr);
    mz->addr = mz_addr;
    mz->len = (requested_len == 0 ? elem->size : requested_len);
    mz->hugepage_sz = elem->ms->hugepage_sz;
    mz->socket_id = elem->ms->socket_id;
    mz->flags = 0;
    mz->memseg_id = elem->ms - rte_eal_get_configuration()->mem_config->memseg;

    return mz;
}

malloc_heap_alloc()からメモリを確保したあと,その情報をmemconfig.memzone[]に登録しています.

mempool

mempoolは固定長サイズのオブジェクトのアロケータです. フリーのオブジェクトはring構造で保持されます. mempoolでは各コアが占有して利用できるキャッシュを提供する機能などもあります. mempoolを作成する際,確保したいオブジェクトのサイズ及びその個数を指定しますが, 内部的にはそのオブジェクトが割り当てられるだけのmemzoneを取得します.

struct rte_mempool {
    /*
     * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
     * compatibility requirements, it could be changed to
     * RTE_MEMPOOL_NAMESIZE next time the ABI changes
     */
    char name[RTE_MEMZONE_NAMESIZE]; /**< Name of mempool. */
    RTE_STD_C11
    union {
        void *pool_data;         /**< Ring or pool to store objects. */
        uint64_t pool_id;        /**< External mempool identifier. */
    };
    void *pool_config;               /**< optional args for ops alloc. */
    const struct rte_memzone *mz;    /**< Memzone where pool is alloc'd. */
    int flags;                       /**< Flags of the mempool. */
    int socket_id;                   /**< Socket id passed at create. */
    uint32_t size;                   /**< Max size of the mempool. */
    uint32_t cache_size;
    /**< Size of per-lcore default local cache. */

    uint32_t elt_size;               /**< Size of an element. */
    uint32_t header_size;            /**< Size of header (before elt). */
    uint32_t trailer_size;           /**< Size of trailer (after elt). */

    unsigned private_data_size;      /**< Size of private data. */
    /**
     * Index into rte_mempool_ops_table array of mempool ops
     * structs, which contain callback function pointers.
     * We're using an index here rather than pointers to the callbacks
     * to facilitate any secondary processes that may want to use
     * this mempool.
     */

    int32_t ops_index;

    struct rte_mempool_cache *local_cache; /**< Per-lcore local cache */

    uint32_t populated_size;         /**< Number of populated objects. */
    struct rte_mempool_objhdr_list elt_list; /**< List of objects in pool */
    uint32_t nb_mem_chunks;          /**< Number of memory chunks */
    struct rte_mempool_memhdr_list mem_list; /**< List of memory chunks */

#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
    /** Per-lcore statistics. */
    struct rte_mempool_debug_stats stats[RTE_MAX_LCORE];
#endif
}  __rte_cache_aligned;

実際にmempoolで確保したオブジェクトはelt_listで先頭が示されるSTAILQで保持されます. また,確保した連続したメモリ領域はmem_listで先頭が示されるSTAILQで保持されます.

mempoolの初期化

mempoolの確保は以下のようにしておこなわれます.

  1. mempool構造体自体を格納する領域を一つのmemzoneとして取得
  2. 固定長のオブジェクトをn個格納するための領域を取得. この際領域は複数のmemzoneにまたがることがある.

mempoolの作成はrte_mempool_create()からおこないます. rte_mampool_create()は以下のようになっています.

librte_mempool/rte_mempool.c:

struct rte_mempool *
rte_mempool_create(const char *name, unsigned n, unsigned elt_size,
    unsigned cache_size, unsigned private_data_size,
    rte_mempool_ctor_t *mp_init, void *mp_init_arg,
    rte_mempool_obj_cb_t *obj_init, void *obj_init_arg,
    int socket_id, unsigned flags)
{
    int ret;
    struct rte_mempool *mp;

    mp = rte_mempool_create_empty(name, n, elt_size, cache_size,
        private_data_size, socket_id, flags);
    if (mp == NULL)
        return NULL;

    /*
     * Since we have 4 combinations of the SP/SC/MP/MC examine the flags to
     * set the correct index into the table of ops structs.
     */
    if ((flags & MEMPOOL_F_SP_PUT) && (flags & MEMPOOL_F_SC_GET))
        ret = rte_mempool_set_ops_byname(mp, "ring_sp_sc", NULL);
    else if (flags & MEMPOOL_F_SP_PUT)
        ret = rte_mempool_set_ops_byname(mp, "ring_sp_mc", NULL);
    else if (flags & MEMPOOL_F_SC_GET)
        ret = rte_mempool_set_ops_byname(mp, "ring_mp_sc", NULL);
    else
        ret = rte_mempool_set_ops_byname(mp, "ring_mp_mc", NULL);

    if (ret)
        goto fail;

    /* call the mempool priv initializer */
    if (mp_init)
        mp_init(mp, mp_init_arg);

    if (rte_mempool_populate_default(mp) < 0)
        goto fail;

    /* call the object initializers */
    if (obj_init)
        rte_mempool_obj_iter(mp, obj_init, obj_init_arg);

    return mp;

 fail:
    rte_mempool_free(mp);
    return NULL;
}

nameがmempoolの名前,nが割り当てたいオブジェクトの数,elt_sizeが各オブジェクトのサイズになります. 基本的にはrte_mempool_create_emtpy()で空のmempoolを作成し,その後rte_mempool_populate_default()で実際にオブジェクト分のメモリを確保します. また,rte_mempool_set_ops_byname()のところは,mempoolでオブジェクトを確保するスレッドが一つかどうかと解放するスレッドが一つかどうか(SC: Single Consumer, SP: Single Producer)のフラグによって,mempoolからオブジェクトを取得/解放する関数を切り替えて登録しています.

rte_mempool_create_empty()の主要部は以下のようになっています.

librte_mempool/rte_mempool.c:

/* create an empty mempool */
struct rte_mempool *
rte_mempool_create_empty(const char *name, unsigned n, unsigned elt_size,
    unsigned cache_size, unsigned private_data_size,
    int socket_id, unsigned flags)
{
    ...

    mempool_list = RTE_TAILQ_CAST(rte_mempool_tailq.head, rte_mempool_list);

    ...

    mempool_size = MEMPOOL_HEADER_SIZE(mp, cache_size);
    mempool_size += private_data_size;
    mempool_size = RTE_ALIGN_CEIL(mempool_size, RTE_MEMPOOL_ALIGN);

    ret = snprintf(mz_name, sizeof(mz_name), RTE_MEMPOOL_MZ_FORMAT, name);
    if (ret < 0 || ret >= (int)sizeof(mz_name)) {
        rte_errno = ENAMETOOLONG;
        goto exit_unlock;
    }
    mz = rte_memzone_reserve(mz_name, mempool_size, socket_id, mz_flags);
    if (mz == NULL)
        goto exit_unlock;

    /* init the mempool structure */
    mp = mz->addr;

    memset(mp, 0, MEMPOOL_HEADER_SIZE(mp, cache_size));
    ret = snprintf(mp->name, sizeof(mp->name), "%s", name);
    if (ret < 0 || ret >= (int)sizeof(mp->name)) {
        rte_errno = ENAMETOOLONG;
        goto exit_unlock;
    }
    mp->mz = mz;
    mp->size = n;
    mp->flags = flags;
    mp->socket_id = socket_id;
    mp->elt_size = objsz.elt_size;
    mp->header_size = objsz.header_size;
    mp->trailer_size = objsz.trailer_size;
    /* Size of default caches, zero means disabled. */
    mp->cache_size = cache_size;
    mp->private_data_size = private_data_size;
    STAILQ_INIT(&mp->elt_list);
    STAILQ_INIT(&mp->mem_list);

    ...
    return mp;
}

mempool自体もTAILQで管理されています.mempool_listはそのTAILQのheadを指すデータ構造です. rte_mempool_create_empty()では,mempoool構造体自体を格納するためのメモリ領域をrte_memzone_reserve()で確保し,初期化します. この時点ではまだオブジェクトのメモリ領域は確保していません.

実際にオブジェクトのメモリを確保するのはrte_mempool_populate_default()になります.

librte_mempool/rte_mempool.c:

int
rte_mempool_populate_default(struct rte_mempool *mp)
{
    int mz_flags = RTE_MEMZONE_1GB|RTE_MEMZONE_SIZE_HINT_ONLY;
    char mz_name[RTE_MEMZONE_NAMESIZE];
    const struct rte_memzone *mz;
    size_t size, total_elt_sz, align, pg_sz, pg_shift;
    phys_addr_t paddr;
    unsigned mz_id, n;
    int ret;

    /* mempool must not be populated */
    if (mp->nb_mem_chunks != 0){
        RTE_LOG(ERR, MBUF, "a\n");
        return -EEXIST;
    }

    if (rte_xen_dom0_supported()) {
        pg_sz = RTE_PGSIZE_2M;
        pg_shift = rte_bsf32(pg_sz);
        align = pg_sz;

    } else if (rte_eal_has_hugepages()) {
        pg_shift = 0; /* not needed, zone is physically contiguous */
        pg_sz = 0;
        align = RTE_CACHE_LINE_SIZE;
    } else {
        pg_sz = getpagesize();
        pg_shift = rte_bsf32(pg_sz);
        align = pg_sz;
    }

    total_elt_sz = mp->header_size + mp->elt_size + mp->trailer_size;
    for (mz_id = 0, n = mp->size; n > 0; mz_id++, n -= ret) {
        size = rte_mempool_xmem_size(n, total_elt_sz, pg_shift);

        ret = snprintf(mz_name, sizeof(mz_name),
            RTE_MEMPOOL_MZ_FORMAT "_%d", mp->name, mz_id);
        if (ret < 0 || ret >= (int)sizeof(mz_name)) {
            RTE_LOG(ERR, MBUF, "b\n");
            ret = -ENAMETOOLONG;
            goto fail;
        }

        mz = rte_memzone_reserve_aligned(mz_name, size,
            mp->socket_id, mz_flags, align);
        /* not enough memory, retry with the biggest zone we have */
        if (mz == NULL)
            mz = rte_memzone_reserve_aligned(mz_name, 0,
                mp->socket_id, mz_flags, align);
        if (mz == NULL) {
            RTE_LOG(ERR, MBUF, "c\n");
            ret = -rte_errno;
            goto fail;
        }

        if (mp->flags & MEMPOOL_F_NO_PHYS_CONTIG)
            paddr = RTE_BAD_PHYS_ADDR;
        else
            paddr = mz->phys_addr;

        if (rte_eal_has_hugepages() && !rte_xen_dom0_supported())
            ret = rte_mempool_populate_phys(mp, mz->addr,
                paddr, mz->len,
                rte_mempool_memchunk_mz_free,
                (void *)(uintptr_t)mz);
        else
            ret = rte_mempool_populate_virt(mp, mz->addr,
                mz->len, pg_sz,
                rte_mempool_memchunk_mz_free,
                (void *)(uintptr_t)mz);
        if (ret < 0) {
            RTE_LOG(ERR, MBUF, "d\n");
            rte_memzone_free(mz);
            goto fail;
        }
    }

    return mp->size;

 fail:
    rte_mempool_free_memchunks(mp);
    return ret;

重要なのはforループの内部の処理で,rte_memzone_reserve_aligned()を利用して確 保したいメモリ(オブジェクトサイズ ×個数分のメモリ)をmemzoneから確保しようとします. このときもし確保できなかった場合(それだけ連続するメモリがなかった場合)は, memzoneから最も大きな連続する領域を取得します. この後通常はrte_eal_has_hugepages() = 1, rte_xen_dom0_supported() = 0なのでrte_mempool_populate_phys()を呼び,確保したメモリ上でオブジェクトのブロックを作成します. rte_mempool_populate_phys()の戻り値が確保できたオブジェクトの数です. 目標のオブジェクト数だけ確保できるまでこの操作を繰り返します.

rte_mempool_popoulate_phys()の中では,確保したメモリ領域からオブジェクトを切り出し,それをmempool_add_elem()を使ってmempool.elt_listに入れ,また空きオブジェクトを管理するring bufferに入れています.また,確保したメモリ(memzone)をmempool.mem_listに登録します.

int
rte_mempool_populate_phys(struct rte_mempool *mp, char *vaddr,
    phys_addr_t paddr, size_t len, rte_mempool_memchunk_free_cb_t *free_cb,
    void *opaque)
{
    unsigned total_elt_sz;
    unsigned i = 0;
    size_t off;
    struct rte_mempool_memhdr *memhdr;
    int ret;

    /* create the internal ring if not already done */
    if ((mp->flags & MEMPOOL_F_POOL_CREATED) == 0) {
        ret = rte_mempool_ops_alloc(mp);
        if (ret != 0)
            return ret;
        mp->flags |= MEMPOOL_F_POOL_CREATED;
    }

    /* mempool is already populated */
    if (mp->populated_size >= mp->size)
        return -ENOSPC;

    total_elt_sz = mp->header_size + mp->elt_size + mp->trailer_size;

    memhdr = rte_zmalloc("MEMPOOL_MEMHDR", sizeof(*memhdr), 0);
    if (memhdr == NULL)
        return -ENOMEM;

    memhdr->mp = mp;
    memhdr->addr = vaddr;
    memhdr->phys_addr = paddr;
    memhdr->len = len;
    memhdr->free_cb = free_cb;
    memhdr->opaque = opaque;

    if (mp->flags & MEMPOOL_F_NO_CACHE_ALIGN)
        off = RTE_PTR_ALIGN_CEIL(vaddr, 8) - vaddr;
    else
        off = RTE_PTR_ALIGN_CEIL(vaddr, RTE_CACHE_LINE_SIZE) - vaddr;

    while (off + total_elt_sz <= len && mp->populated_size < mp->size) {
        off += mp->header_size;
        if (paddr == RTE_BAD_PHYS_ADDR)
            mempool_add_elem(mp, (char *)vaddr + off,
                RTE_BAD_PHYS_ADDR);
        else
            mempool_add_elem(mp, (char *)vaddr + off, paddr + off);
        off += mp->elt_size + mp->trailer_size;
        i++;
    }

    /* not enough room to store one object */
    if (i == 0)
        return -EINVAL;

    STAILQ_INSERT_TAIL(&mp->mem_list, memhdr, next);
    mp->nb_mem_chunks++;
    return i;
}
static void
mempool_add_elem(struct rte_mempool *mp, void *obj, phys_addr_t physaddr)
{
    struct rte_mempool_objhdr *hdr;
    struct rte_mempool_objtlr *tlr __rte_unused;

    /* set mempool ptr in header */
    hdr = RTE_PTR_SUB(obj, sizeof(*hdr));
    hdr->mp = mp;
    hdr->physaddr = physaddr;
    STAILQ_INSERT_TAIL(&mp->elt_list, hdr, next);
    mp->populated_size++;

#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
    hdr->cookie = RTE_MEMPOOL_HEADER_COOKIE2;
    tlr = __mempool_get_trailer(obj);
    tlr->cookie = RTE_MEMPOOL_TRAILER_COOKIE;
#endif

    /* enqueue in ring */
    rte_mempool_ops_enqueue_bulk(mp, &obj, 1);
}

rte_mempool_ops_enqueue_bulk()では,最終的には次で説明するenqueue()が呼ばれます.

mempoolの操作

rte_create_mempol_empty()のところで説明したとおり,初期化時のフラグによって mempoolからオブジェクトを取得/解放する際の関数を切り替えています. 関数の定義は drivers/mempool/ring/rte_mempool_ring.cに書いてあります.

static const struct rte_mempool_ops ops_mp_mc = {
    .name = "ring_mp_mc",
    .alloc = common_ring_alloc,
    .free = common_ring_free,
    .enqueue = common_ring_mp_enqueue,
    .dequeue = common_ring_mc_dequeue,
    .get_count = common_ring_get_count,
};

static const struct rte_mempool_ops ops_sp_sc = {
    .name = "ring_sp_sc",
    .alloc = common_ring_alloc,
    .free = common_ring_free,
    .enqueue = common_ring_sp_enqueue,
    .dequeue = common_ring_sc_dequeue,
    .get_count = common_ring_get_count,
};

static const struct rte_mempool_ops ops_mp_sc = {
    .name = "ring_mp_sc",
    .alloc = common_ring_alloc,
    .free = common_ring_free,
    .enqueue = common_ring_mp_enqueue,
    .dequeue = common_ring_sc_dequeue,
    .get_count = common_ring_get_count,
};


static const struct rte_mempool_ops ops_sp_mc = {
    .name = "ring_sp_mc",
    .alloc = common_ring_alloc,
    .free = common_ring_free,
    .enqueue = common_ring_sp_enqueue,
    .dequeue = common_ring_mc_dequeue,
    .get_count = common_ring_get_count,
};

MEMPOOL_REGISTER_OPS(ops_mp_mc);
MEMPOOL_REGISTER_OPS(ops_sp_sc);
MEMPOOL_REGISTER_OPS(ops_mp_sc);
MEMPOOL_REGISTER_OPS(ops_sp_mc);

singleかmultiかによって,rte_ringを操作する関数を切り替えています.

基本的にはrte_mempool_get()でmempoolから空きオブジェクトを取得し,rte_mempool_put()でオブジェクトを再びring bufferに追加します.

DPDKでパケットを格納するために利用するmbufは内部でmempolを利用しています. mbufに関してはまた後で書こうと思います.

*1:たぶん

DPDKにおけるhugepageの初期化部分

DPDKはhugepageをデフォルトで利用しています.ここではDPDKにおけるhugepageの初期化部分についてまとめようと思います. OSはLinux , DPDKのバージョンはv17.05-rc2です.

初期化のおおまかな流れ

librte_eal/linuxapp/eal/eal.c:rte_eal_init()がDPDKの基本的な初期化処理をおこないますが,この中でhugepageに関してはまず librte_eal/linuxap/eal/eal_hugepages_info.c:eal_hugepage_info_init() を呼びます. ここでは /sys/kernel/mm/hugepages/ の中のファイルからhugepageの基礎情報を読み取ります.その後 librte_eal/common/eal_common_memory.c:rte_eal_memory_init()を呼びますが,この中でlibrte_eal/linuxapp/eal/eal_memory.c:rte_eal_hugepage_init()が呼ばれます. このrte_eal_hugepage_init() でhugepage領域の確保をおこないます.

rte_eal_hugepage_init()

rte_eal_hugepage_init()の最初に何をするか書いてあります.

/*
 * Prepare physical memory mapping: fill configuration structure with
 * these infos, return 0 on success.
 *  1. map N huge pages in separate files in hugetlbfs
 *  2. find associated physical addr
 *  3. find associated NUMA socket ID
 *  4. sort all huge pages by physical address
 *  5. remap these N huge pages in the correct order
 *  6. unmap the first mapping
 *  7. fill memsegs in configuration with contiguous zones
 */

このコメントの通りですが,要するに

  • 最初にhugetlb上にhugepageと同じサイズのN個のファイルを作成し,それをメモリ上にマップする
  • マップしたhugetlbファイルの物理アドレスを求める
  • 物理アドレスが連続のものを仮想アドレスが連続になるように再マッピングする

ということをおこないます.

rte_eal_hugepage_init()はエラーハンドリングであったり複数アーキテクチャ対応のため若干長いですが,主要部分だけ抜粋すると以下のようになります.

http://dpdk.org/browse/dpdk/tree/lib/librte_eal/linuxapp/eal/eal_memory.c?h=v17.05-rc2#n964

    ....
    pages_new = map_all_hugepages(&tmp_hp[hp_offset], hpi, 1);
    ...
    find_physaddrs(&tmp_hp[hp_offset], hpi);
    ...
    qsort(&tmp_hp[hp_offset], hpi->num_pages[0],
          sizeof(struct hugepage_file), cmp_physaddr);
    ...
    map_all_hugepages(&tmp_hp[hp_offset], hpi, 0);
    ...

ここで,

をしています.

まず,map_all_hugepages()は以下のようになっています.

static unsigned
map_all_hugepages(struct hugepage_file *hugepg_tbl,
        struct hugepage_info *hpi, int orig)
{
    int fd;
    unsigned i;
    void *virtaddr;
    void *vma_addr = NULL;
    size_t vma_len = 0;

    for (i = 0; i < hpi->num_pages[0]; i++) {
        uint64_t hugepage_sz = hpi->hugepage_sz;
        ...

        hugepg_tbl[i].file_id = i;
        hugepg_tbl[i].size = hugepage_sz;
        eal_get_hugefile_path(hugepg_tbl[i].filepath,
                sizeof(hugepg_tbl[i].filepath), hpi->hugedir,
                hugepg_tbl[i].file_id);
        hugepg_tbl[i].filepath[sizeof(hugepg_tbl[i].filepath) - 1] = '\0';
        ...

        fd = open(hugepg_tbl[i].filepath, O_CREAT | O_RDWR, 0600);
        ...

        virtaddr = mmap(vma_addr, hugepage_sz, PROT_READ | PROT_WRITE,
                MAP_SHARED | MAP_POPULATE, fd, 0);
        hugepg_tbl[i].orig_va = virtaddr;
        ...

        *(int *)virtaddr = 0;
        ...
        flock(fd, LOCK_SH | LOCK_NB);
            RTE_LOG(DEBUG, EAL, "%s(): Locking file failed:%s \n",
        ...
        vma_addr = (char *)vma_addr + hugepage_sz;
        vma_len -= hugepage_sz;
    }

    return i;
}

最初は orig=1 なので orig=1 の処理のみ抜粋しています(エラーハンドリングは省略). hpi->num_pages[0]sys/kernel/mm/hugepages/hugepages-xxx/nr_hugepagesの値が入っており,その数だけhugetlb上にファイルを作成し,mmap()マッピング,仮想アドレスを hugepg_tbl[i].orig_va に格納します.

その後,rte_eal_hugepage_init() では find_physaddrs() で仮想アドレスから物理アドレスをもとめ,さらにそれを物理アドレスでソートします.仮想アドレスから物理アドレスを求める方法は,前のエントリに書いたとおりです.

物理アドレスを求めソートが終わったら,もう一度 map_allhugepaegs()を呼びます.ただし,今回はorig=0です. orig=0の場合の処理を抜粋すると以下のようになります.

http://dpdk.org/browse/dpdk/tree/lib/librte_eal/linuxapp/eal/eal_memory.c?h=v17.05-rc2#n393

static unsigned
map_all_hugepages(struct hugepage_file *hugepg_tbl,
        struct hugepage_info *hpi, int orig)
{
    int fd;
    unsigned i;
    void *virtaddr;
    void *vma_addr = NULL;
    size_t vma_len = 0;

    for (i = 0; i < hpi->num_pages[0]; i++) {
        uint64_t hugepage_sz = hpi->hugepage_sz;

        if (vma_len == 0) {
             unsigned j, num_pages;

             /* reserve a virtual area for next contiguous
              * physical block: count the number of
              * contiguous physical pages. */
             for (j = i+1; j < hpi->num_pages[0] ; j++) {
                if (hugepg_tbl[j].physaddr !=
                     hugepg_tbl[j-1].physaddr + hugepage_sz)
                     break;
             }

            num_pages = j - i;
            vma_len = num_pages * hugepage_sz;

            /* get the biggest virtual memory area up to
             * vma_len. If it fails, vma_addr is NULL, so
             * let the kernel provide the address. */
            vma_addr = get_virtual_area(&vma_len, hpi->hugepage_sz);
            if (vma_addr == NULL)
            vma_len = hugepage_sz;
        }
        ...
        fd = open(hugepg_tbl[i].filepath, O_CREAT | O_RDWR, 0600);
        ...
        virtaddr = mmap(vma_addr, hugepage_sz, PROT_READ | PROT_WRITE,
                MAP_SHARED | MAP_POPULATE, fd, 0);
        ...
        hugepg_tbl[i].final_va = virtaddr;
        ...
        flock(fd, LOCK_SH | LOCK_NB) == -1)
        ...
        close(fd);
        ...

        vma_addr = (char *)vma_addr + hugepage_sz;
        vma_len -= hugepage_sz;
    }

    return i;
}

ループの中で,まず最初は vma_len == 0 の条件が成立し,if文の中が実行されます.ここで連続した物理アドレスの長さがvma_lenに設定されます.その後get_virtual_area()を使いvma_lenの長さ以上の使用可能な仮想アドレス空間を求め,仮想アドレスが正しく求められたらそれを hugepg_tbl[i].final_va に設定しています.

get_virtual_area()は以下のようになっています.

http://dpdk.org/browse/dpdk/tree/lib/librte_eal/linuxapp/eal/eal_memory.c?h=v17.05-rc2#n312

static void *
get_virtual_area(size_t *size, size_t hugepage_sz)
{
    ...
    fd = open("/dev/zero", O_RDONLY);
    ...
    addr = mmap(addr, (*size) + hugepage_sz, PROT_READ, MAP_PRIVATE, fd, 0);
    ...
    aligned_addr = (long)addr;
    aligned_addr += (hugepage_sz - 1);
    aligned_addr &= (~(hugepage_sz - 1));
    addr = (void *)(aligned_addr);
    ...
    return addr;
}

やっていることは単純で, 求めたいサイズ(をhugepage境界にアラインしたもの)だけのアドレス空間mmap()で取得し,成功したならそのアドレスを返します.

rte_eal_hugepage_init()では最終的に rte_mem_config構造体の mcfgにメモリ情報を格納しています.memseg[j]が一つの連続した領域を表します.

    mcfg->memseg[j].phys_addr = hugepage[i].physaddr;
    mcfg->memseg[j].addr = hugepage[i].final_va;
    mcfg->memseg[j].len = hugepage[i].size;
    mcfg->memseg[j].socket_id = hugepage[i].socket_id;
    mcfg->memseg[j].hugepage_sz = hugepage[i].size;

またrte_eal_hugepage_init()ではこの他にNUMA環境の場合の初期化処理等もおこなっています.

全体的に,少々泥臭いことをしているというか,このあたりカーネル側でもう少しサポートがあれば綺麗にかけそうです(まぁ普通はユーザ側で物理アドレスが必要なことなんてないわけですが).

hugepageを使わない場合

DPDK(というかEAL)では,--no-hugeというオプションを渡すと,内部のinternal_config.no_hugetlbfsが1になります. rte_eal_hugepage_init()では一番最初にこのオプションをチェックしていてhugetableが無効の場合は以下の処理をおこないます.

if (internal_config.no_hugetlbfs) {
    addr = mmap(NULL, internal_config.memory, PROT_READ | PROT_WRITE,
                MAP_PRIVATE | MAP_ANONYMOUS, 0, 0);
    ...
    mcfg->memseg[0].phys_addr = (phys_addr_t)(uintptr_t)addr;
    mcfg->memseg[0].addr = addr;
    mcfg->memseg[0].hugepage_sz = RTE_PGSIZE_4K;
    mcfg->memseg[0].len = internal_config.memory;
    mcfg->memseg[0].socket_id = 0;
    return 0;
}

hugepageを使わない代わりにmmap()で一括でメモリを取得し,それをmemseg[0]に設定します.このときmemseg[0].phy_addr物理アドレスは格納していません (そもそも仮にphys_addrを求めて格納したとしても後続の仮想アドレス空間に対応する物理アドレスが連続している保証はどこにもありません). ということで,--no-hugeオプションを渡した場合はDPDKが正常に利用できないような気がします.少なくとも,自分の環境(X540+VFIO)では動きませんでした.

2017-05-19 追記 改めてソースを眺めていたらmempoolの初期化部分でhugepageでない場合はrte_mempool_popoiulate_virt()を使ってなるべく連続した領域を割り当てようとしていることに気づきました. http://dpdk.org/browse/dpdk/tree/lib/librte_mempool/rte_mempool.c?h=v17.05#n451 ということでhugepageなしでもうまくいけば環境によっては動作するのかもしれません.

/proc/pid/pagemapの話

前回 /proc/pid/pagemap について簡単に触れたので,実際にpagemapを読み取るときどういうことが起きるか少し追ってみようと思います.

/proc/pid/pagemapに関する操作はカーネルfs/proc/task_mmu.c内でproc_pagemap_operationsに定義してあります.

https://github.com/torvalds/linux/blob/v4.11/fs/proc/task_mmu.c#L1463

const struct file_operations proc_pagemap_operations = {
    .llseek     = mem_lseek, /* borrow this */
    .read       = pagemap_read,
    .open       = pagemap_open,
    .release    = pagemap_release,
};

要するに,/proc/pid/pagemapをreadする場合はpagemap_read()が呼ばれるということです. pagemap_read()の処理を抜粋すると以下のようになります.

https://github.com/torvalds/linux/blob/v4.11/fs/proc/task_mmu.c#L1351

static ssize_t pagemap_read(struct file *file, char __user *buf,
               size_t count, loff_t *ppos)
{
   struct mm_struct *mm = file->private_data;
   struct pagemapread pm;
   struct mm_walk pagemap_walk = {};
   unsigned long src;
   unsigned long svpfn;
   unsigned long start_vaddr;
   unsigned long end_vaddr;
...
   pagemap_walk.pmd_entry = pagemap_pmd_range;
   pagemap_walk.pte_hole = pagemap_pte_hole;
#ifdef CONFIG_HUGETLB_PAGE
   pagemap_walk.hugetlb_entry = pagemap_hugetlb_range;
#endif
   pagemap_walk.mm = mm;
   pagemap_walk.private = &pm;

   src = *ppos;
   svpfn = src / PM_ENTRY_BYTES;
   start_vaddr = svpfn << PAGE_SHIFT;
   end_vaddr = mm->task_size;
...
   while (count && (start_vaddr < end_vaddr)) {
        ...
       down_read(&mm->mmap_sem);
       ret = walk_page_range(start_vaddr, end, &pagemap_walk);
       up_read(&mm->mmap_sem);
       start_vaddr = end;

       len = min(count, PM_ENTRY_BYTES * pm.pos);
       if (copy_to_user(buf, pm.buffer, len)) {
           ret = -EFAULT;
           goto out_free;
       }
       copied += len;
       buf += len;
       count -= len;
   }

この関数ではread()のオフセットから求めたい仮想アドレスを計算し(start_vaddr)その後 walk_page_range() 関数を呼びます.このwalk_page_range()関数は https://github.com/torvalds/linux/blob/master/mm/pagewalk.c#L283 に定義されており,指定したページ範囲を探索し,必要に応じて設定したコールバック関数を呼ぶ関数です.

今回の場合,基本的には walk_page_range() => __walk_page_range() => walk_pgd_range() => walk_pud_range() => walk_pmd_range() と探索していったのち,walk->pmd_entry() が呼ばれます.このwalk->pmd_entry()pagemap_read()の前半部分で設定したコールバック関数で,実体はpagemap_pmd_range()です.

pagemap_pmd_range()の主要部を抜き出すと以下のようになっています. https://github.com/torvalds/linux/blob/v4.11/fs/proc/task_mmu.c#L1205

static int pagemap_pmd_range(pmd_t *pmdp, unsigned long addr, unsigned long end,
                 struct mm_walk *walk)
{
    ...
    orig_pte = pte = pte_offset_map_lock(walk->mm, pmdp, addr, &ptl);
    for (; addr < end; pte++, addr += PAGE_SIZE) {
        pagemap_entry_t pme;

        pme = pte_to_pagemap_entry(pm, vma, addr, *pte);
        err = add_to_pagemap(addr, &pme, pm);
        if (err)
            break;
    }
    ....
}

また,add_to_pagemap()は以下のようになっています. https://github.com/torvalds/linux/blob/v4.11/fs/proc/task_mmu.c#L1121

static int add_to_pagemap(unsigned long addr, pagemap_entry_t *pme,
              struct pagemapread *pm)
{
    pm->buffer[pm->pos++] = *pme;
    if (pm->pos >= pm->len)
        return PM_END_OF_BUFFER;
    return 0;
}

やっていることはpmdからpteを求め,そのptepte_to_pagemap_entryでpagemapの64bitのエントリに変換し,add_to_pagemapでそれをバッファに保存します.こうして得られたpagemapのエントリを最終的にpagemap_read()のい中でcopy_to_user()でコピーしています.

ということで,pagemapをreadするとページテーブルを探索してそれの結果を返すという,そりゃそうだろうという話ですがこうなっているという話でした.

仮想アドレスから物理アドレスを求める

××な理由で仮想アドレスではなく実際の物理アドレスを求めたいことがあると思います.

linuxの場合,カーネルからならvirt2phys()等使えますが,ユーザランドからはそのような関数はありません. その代わり, /proc/self/pagemap を参照して解決することができます.

pagemap ファイルは各仮想ページがどの物理ページに対応しているかを保持している特殊ファイルです.一つのエントリの長さが64bitで,一つのエントリが一つの仮想ページの情報を保持しています.物理アドレスを求めたい場合はまず仮想アドレスページ番号をインデックスとしてpagemapのエントリを読み取り,そこから物理ページのアドレスを求めます.物理ページのアドレスが分かればそこにオフセットを足せば実際の物理アドレスが求まります.エントリの63bit目がページがメモリ上にあるかどうかを示すフラグになります.

DPDKにあるrte_mem_virt2phy() という関数を参考に*1以下のような関数で解決できました. http://dpdk.org/browse/dpdk/tree/lib/librte_eal/linuxapp/eal/eal_memory.c?h=v17.05-rc2#n159

#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/time.h>
#include <inttypes.h>

#define PFN_MASK_SIZE 8

unsigned long
virt2phy(const void *virtaddr)
{
    unsigned long page, physaddr, virt_pfn;
    off_t offset;
    int page_size = sysconf(_SC_PAGESIZE);
    int fd, retval;

    fd = open("/proc/self/pagemap", O_RDONLY);
    if (fd < 0) {
        printf("error: open");
        return 0;
    }

    virt_pfn = (unsigned long)virtaddr / page_size;
    offset = sizeof(uint64_t) * virt_pfn;
    if (lseek(fd, offset, SEEK_SET) == (off_t) -1) {
        printf("error: lseek\n");
        close(fd);
        return 0;
    }

    retval = read(fd, &page, PFN_MASK_SIZE);
    close(fd);
    if (retval < 0) {
        printf("error: read\n");
        return 0;
    } else if (retval != PFN_MASK_SIZE) {
        printf("error: read2\n");
        return 0;
    }

    if ((page & 0x7fffffffffffffULL) == 0){
        printf("pfn == 0\n");
        printf("page = %016lX\n", page);
        return 0;
    }

    physaddr = ((page & 0x7fffffffffffffULL) * page_size)
        + ((unsigned long)virtaddr % page_size);

    return physaddr;
}

実のところ,最初は以下のstack overflowの回答をみつけ,この関数を試してみたのですが,malloc()で求めたアドレスには正しく動くものの,mmap()やローカル変数のアドレスにはなぜか動きませんでした.fread()が失敗しているようですが何が原因でしょうか..

c - How to find the physical address of a variable from user-space in Linux? - Stack Overflow

*1:というかほぼそのまま

I/O APICについて

前回LAPICについて書きました.LAPICは今ではCPU内に埋め込まれており,IntelのSDMにその使用方法が書いてあります.一方でI/O APICは今ではチップセットに埋め込まれているため,I/O APICに関して知りたい場合はチップセットのデータシートを見ることになります.

もともとはLAPICもI/O APICもCPUとは別の独立したチップとして実装されていました.特に,Intel 82093AAが最初のI/O APICチップです.例によって基本的に製品の互換性が保たれているため,I/O APICについて知りたい場合は82093AAのデータシートも参考になります.巨大なチップセットのデータシートと比べコンパクトにまとまっています.

文献

I/O APICの基本

f:id:mm_i:20170327201320p:plain (Intel SDMより)

図に示したように,I/O APICは外部からの割り込みを受け取り,それを各LAPICへ送ります.外部からの割り込みをどのLAPICへ送るかはRemaping Tableで設定します.I/O APICを使用するデバイスは,タイマやキーボードコントローラ(i8042)などです./proc/interruptsから何がI/O APICを利用しているか確認できます.

% cat /proc/interrupts
           CPU0       CPU1       CPU2       CPU3       CPU4       CPU5       CPU6       CPU7
  0:         16          0          0          0          0          0          0          0   IO-APIC   2-edge      timer
  1:          1          0          0          0          1          0          0          0   IO-APIC   1-edge      i8042
  8:          0          0          0          0          0          1          0          0   IO-APIC   8-edge      rtc0
  9:          0          0          0          0          4          0          0          0   IO-APIC   9-fasteoi   acpi
 12:          3          0          0          0          1          0          0          0   IO-APIC  12-edge      i8042
 16:        188          0          0          0        187         48          0          0   IO-APIC  16-fasteoi   ehci_hcd:usb1
 18:          0          0          0          0          0          2          0          0   IO-APIC  18-fasteoi   firewire_ohci
 23:         28          0          0          0          2          3          0          0   IO-APIC  23-fasteoi   ehci_hcd:usb4
...

なお,PCIeデバイスMSIを利用する場合は,I/O APICを介さずに直接割り込みがLAPICへ送られます.

I/O APICレジスタ

LAPICではアドレスfee00000h以下にLAPIC レジスタがmemory-mappedされていました(x2APICの場合はMSRを使います).I/O APICの場合,memory-mappedされているレジスタを利用して,I/O APICレジスタへアクセスします.以下にmemory-mappedされているレジスタを示します.

f:id:mm_i:20170409131928p:plain (9-series datasheetより)

具体的には,Indexレジスタ(IOREGSEL レジスタ),Dataレジスタ(IOWINレジスタ)の2つを使います*1.この2つのレジスタはmemory-mappedされており,IndexでI/O APICレジスタのインデックス(下図参照)を指定し,Dataレジスタでデータの読み書きをおこないます.間接的にアクセスできるレジスタは以下の通りです.

f:id:mm_i:20170409132110p:plain (9-series datasheetより)

IndexレジスタやDataレジスタがどこにmemory-mappedされているかはACPIから取得します.linuxの場合,iaslを利用して以下のようにACPIの情報が取得できます.

% sudo apt-get install iasl
% sudo cp /sys/firmware/acpi/tables/APIC .
% iasl -d APIC

これを実行すると,APIC.dslというファイルが作成されます.このファイルを見ると,以下のようにI/O APICについて書かれた部分が見つかります.

...
[06Ch 0108   1]                Subtable Type : 01 [I/O APIC]
[06Dh 0109   1]                       Length : 0C
[06Eh 0110   1]                  I/O Apic ID : 02
[06Fh 0111   1]                     Reserved : 00
[070h 0112   4]                      Address : FEC00000
[074h 0116   4]                    Interrupt : 00000000
...

これより,Indexレジスタが fec00000hにマッピングされていることがわかります.また,Dataレジスタはfec00010hにマッピングされています.

I/O APICレジスタの読み出し

カーネルモジュールからI/O APICレジスタを読み出してみます.読み出し方法は,まずIndexレジスタにoffsetを書き込み,その後Data Registerを読み出すことでおこないます(書き込みたい場合は,Data Registerに直接書き込みます).これらのレジスタのアクセスは全て32bit単位でおこないます.

オフセット0にあるI/O APIC IDと,オフセット1にあるI/O APIC Versionは以下のようにして読み出すことができます.

static int hello_init(){
    void *addr = ioremap(0xFEC00000, 0x10);

    writel(0, addr);
    pr_info("I/O APIC ID : %08x\n", readl(addr+0x10));

    writel(1, addr);
    pr_info("I/O APIC VER : %08x\n", readl(addr+0x10));

    iounmap(addr);

    return 0;
}
% sudo insmode hello.ko && rmmod helo && dmesg | tail -n2
[663870.752166] I/O APIC ID : 02000000
[663870.752185] I/O APIC VER : 00170020

IDは以下に示すように27-24bit目に格納されており,その値は02です.これはACPIで得られたI/O APIC IDと一致します.

f:id:mm_i:20170409132150p:plain (9-series datasheetより)

また,I/O APIC VERは7-0bit目がバージョン,23-16bit目がRedirection Tableのエントリインデックスの最大値です.したがって,このマシンではI/O APICのバージョンは0x20,Redirection Tableのエントリ数の最大(サポートする割り込みの数)は0x17+1 (=24) になります.どのI/O APICでもRedirection Tableの最大エントリ数は24のようです. → 200 seriesのデータシート見たらエントリ数の最大は120でした.

f:id:mm_i:20170409132217p:plain (9-series datasheetより)

Redirection Table

I/O APICレジスタの中でも重要なのが,Redirection Table Registerです.Redirectoin Table Registerは64bitのレジスタで,24個のRedirection Table RegisterそれぞれにはIndexレジスタで10-11h, 12-13h, …, 3e-3fhを指定することでアクセスします..

Redirection Table Registerの構造は以下の通りです.

f:id:mm_i:20170409132231p:plain f:id:mm_i:20170409132238p:plain (9-series datasheetより)

63-56bitでDestinationを決めます.LAPICと同様にDestinationにはLogical modeとPhysical modeがあって,11bit目でどちらのモードかを指定します.下位8bitが割り込みベクタ番号,10-8bit目がDelivery modeになっています.

各Redirection Table Registerと実際の割り込みの対応付けは以下のようになっています.

f:id:mm_i:20170409132320p:plain f:id:mm_i:20170409132328p:plain (9-series datasheetより)

Redirection Tableの読み出し

実際にRedirection Tableを読み出して見ます. 以下のようなカーネルモジュールで読み出すことができます.

long read_redirection_table(int idx){
    void *addr = ioremap_nocache(0xFEC00000, 0x14);
    int t1,t2;
    writel(0x10+idx*2, addr);
    t1 = readl(addr+0x10);

    writel(0x11+idx*2, addr);
    t2 = readl(addr+0x10);
    iounmap(addr);

    return ((long)(t2) << 32) | (long)(t1);
}

void read_rt(){
    int i = 0;
    for(i = 0; i < 24; i++){
        long reg = read_redirection_table(i);
        pr_info("RT Entry%2d : %016lx\n", i, reg);
    }
 }

実行結果:

[869496.874591] RT Entry 0 : 0000000000010000
[869496.874623] RT Entry 1 : ff00000000000931
[869496.874655] RT Entry 2 : ff00000000000930
[869496.874686] RT Entry 3 : ff00000000000933
[869496.874717] RT Entry 4 : ff00000000000934
[869496.874750] RT Entry 5 : ff00000000000935
[869496.874781] RT Entry 6 : ff00000000000936
[869496.874812] RT Entry 7 : ff00000000000937
[869496.874844] RT Entry 8 : ff00000000000938
[869496.874874] RT Entry 9 : ff00000000008939
[869496.874906] RT Entry10 : ff0000000000093a
[869496.874937] RT Entry11 : ff0000000000093b
[869496.874968] RT Entry12 : ff0000000000093c
[869496.875946] RT Entry13 : ff0000000000093d
[869496.876926] RT Entry14 : ff0000000000093e
[869496.877858] RT Entry15 : ff0000000000093f
[869496.878775] RT Entry16 : ff0000000000a971
[869496.879704] RT Entry17 : 0000000000010000
[869496.880615] RT Entry18 : ff0000000000a9a1
[869496.881520] RT Entry19 : 0000000000010000
[869496.882421] RT Entry20 : 0000000000010000
[869496.883306] RT Entry21 : 0000000000010000
[869496.884220] RT Entry22 : 0000000000010000
[869496.885118] RT Entry23 : ff0000000000a973

これだとちょっと分かりにくいので,以下の様な関数で表示してみます.

void print_rt(long reg){
    int dest      = (reg >> 56) & 0xff;
    int edid      = (reg >> 48) & 0xff;
    int mask      = (reg >> 16) & 0x1;
    int trigger   = (reg >> 15) & 0x1;
    int irr       = (reg >> 14) & 0x1;
    int polarity  = (reg >> 13) & 0x1;
    int status    = (reg >> 12) & 0x1;
    int dest_mode = (reg >> 11) & 0x1;
    int delv_mode = (reg >>  8) & 0x3;
    int vector = reg & 0xff;

    pr_info("  Dest           : %02X\n",  dest);
    pr_info("  EDID           : %02X\n",  edid);
    pr_info("  Mask           : %s\n",  mask ? "masked" : "not masked");
    pr_info("  Trigger mode   : %s\n",  trigger ? "level" : "edge");
    pr_info("  Remote IRR mode: %d\n",  irr);
    pr_info("  Polarity       : %s\n",  polarity ? "active low" : "active high");
    pr_info("  Delivery status: %s\n",  status ? "pending" : "idle");
    pr_info("  Dest mode      : %s\n",  dest_mode ? "logical" : "physical");
    pr_info("  Delivery mode  : %03x\n",  delv_mode);
    pr_info("  vector         : %02x\n",  vector);
}

たとえばエントリ16に関しては以下の様になります.

[865418.043710] RT Entry16 : ff0000000000a971
[865418.044335]   Dest           : FF
[865418.044967]   EDID           : 00
[865418.045572]   Mask           : not masked
[865418.046177]   Trigger mode   : level
[865418.046779]   Remote IRR mode: 0
[865418.047381]   Polarity       : active low
[865418.047989]   Delivery status: idle
[865418.048612]   Dest mode      : logical
[865418.049224]   Delivery mode  : 001
[865418.049806]   vector         : 71

ちなみに,ここでdestination modeがlogical, delivery modeは001 (lowest priority), destinationはffですが,LAPICの時と同様自分の環境では割り込みは散りませんでした.. 何故でしょうか..

x2APICの場合

x2APICを利用する場合は,MSIの場合と同様にInterrupt Remappingを使うことになります.下図のように,Redirection Tableの63-49bit目でInterrupt Rmeppaing Tableのインデックスを指定します.

f:id:mm_i:20170409132351p:plain (Intel® Virtualization Technology for Directed I/Oより)

*1:82093AAではIOREGSELとIOWINという名前ですが,9 seriesのデータシートではIndexとDataという名前になっています