LinuxのBPF : (3) eBPFの基礎

はじめに(注意)

ちょうど一年ぐらい前にLinuxのBPFについて記事を書いていました. 最初の記事は古典的なBPFについて,二番目の記事はseccompについてです. eBPFに関する文章も書いていて,てっきり公開していたと思っていたのですが今下書きのまま保存され一年以上放置されていたことに気づきました.. BPFの開発は非常に活発で,ここに書いてる情報が古い可能性もあるのですが,せっかくなので公開しておきます.

この記事はLinux 4.7時点での情報に基づきます.プログラムはLinux 4.7(Ubuntu Xeniel)で動作確認しています.

extended BPF (eBPF)

前々回と前回のエントリでLinuxにおいてBPFのプログラムを利用してパケットフィルタリングやseccompを利用する例を見ました.BPFを利用する利点は,大きく以下の2つです.

  • ユーザがフィルタプログラムを自由に設定できる
  • コードはカーネル内安全に実行される (実行前に静的に安全性をチェック)

そんなBPFですが,これを拡張し,カーネル内の様々なイベントに対してユーザが定義した処理を実行できるようにすれば,様々なトレースに使えるのではないかという議論が2011年ごろから*1起こりました. そして,その機能実現のためにBPFのレジスタマシンを拡張したextended BPF (eBPF)が考案されました. 昔からあるBPFをclassic BPF (cBPF),extended BPFをeBPFあるいはinternal BPFとして区別します*2

eBPFは以下のような特徴/機能があります.

  • 10個のレジスタ (cBPFは2つ)
    • R0 : 戻り値格納用
    • R1-R5 : 引数
    • R6-R9 : BPFプログラムが利用
    • R10: スタックへアクセスするためのフレームポインタ (read only)
  • レジスタ幅は64bit (cBPFは32bit)
  • ジャンプが jt/jf ではなく jt/fall through に
  • 負の方向のジャンプを許可
  • 他のbpfプログラムへのジャンプを許可 (ただし無限ループできないように遷移回数は制限されている)
  • bpf_call命令によるカーネル内の関数の呼び出し
  • eBPF mapによるデータのやりとり

eBPF map

eBPFの中でも特にeBPF mapはダイナミックトレースを実現するのに欠かせない機能です. eBPF mapはbpfで利用可能なデータ構造で,eBPFプログラム間やユーザスペース/カーネルスペースのプログラム間でのデータのやりとりに利用します. 例えばeBPFマップを利用することで任意の型のkey/value連想配列を作成することができます. eBPF mapはまず最初にユーザプロセスが作成します.BPFプログラムはそのmapにプログラム中からアクセスすることができます.

eBPFプログラムの例

eBPF mapを作成したり,あるいはeBPFのプログラムをイベントにアタッチするにはLinux 3.18から追加されたbpf(2)システムコールを利用します. bpf(2)のマニュアルに情報がありますが,とりあえず現在(2016/8/1)においては情報がやや古いです.. カーネルのソースのsamples/bpf以下にeBPFのプログラムの例があるので,それと 合わせてドキュメントを見た方がいいと思います.

ここでは,サンプルの中でお手頃なsock_exasmple.cを見てみましょう. このプログラムはeBPFを利用して,受信したTCPパケット,IPパケット,ARPパケットの種類をカウントするものです.

/* eBPF example program:
 * - creates arraymap in kernel with key 4 bytes and value 8 bytes
 *
 * - loads eBPF program:
 *   r0 = skb->data[ETH_HLEN + offsetof(struct iphdr, protocol)];
 *   *(u32*)(fp - 4) = r0;
 *   // assuming packet is IPv4, lookup ip->proto in a map
 *   value = bpf_map_lookup_elem(map_fd, fp - 4);
 *   if (value)
 *        (*(u64*)value) += 1;
 *
 * - attaches this program to eth0 raw socket
 *
 * - every second user space reads map[tcp], map[udp], map[icmp] to see
 *   how many packets of given protocol were seen on eth0
 */
#include <stdio.h>
#include <unistd.h>
#include <assert.h>
#include <linux/bpf.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <linux/if_ether.h>
#include <linux/ip.h>
#include <stddef.h>
#include "libbpf.h"

static int test_sock(void)
{
    int sock = -1, map_fd, prog_fd, i, key;
    long long value = 0, tcp_cnt, udp_cnt, icmp_cnt;

    map_fd = bpf_create_map(BPF_MAP_TYPE_ARRAY, sizeof(key), sizeof(value),
                256, 0);
    if (map_fd < 0) {
        printf("failed to create map '%s'\n", strerror(errno));
        goto cleanup;
    }

    struct bpf_insn prog[] = {
        BPF_MOV64_REG(BPF_REG_6, BPF_REG_1),
        BPF_LD_ABS(BPF_B, ETH_HLEN + offsetof(struct iphdr, protocol) /* R0 = ip->proto */),
        BPF_STX_MEM(BPF_W, BPF_REG_10, BPF_REG_0, -4), /* *(u32 *)(fp - 4) = r0 */
        BPF_MOV64_REG(BPF_REG_2, BPF_REG_10),
        BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, -4), /* r2 = fp - 4 */
        BPF_LD_MAP_FD(BPF_REG_1, map_fd),
        BPF_RAW_INSN(BPF_JMP | BPF_CALL, 0, 0, 0, BPF_FUNC_map_lookup_elem),
        BPF_JMP_IMM(BPF_JEQ, BPF_REG_0, 0, 2),
        BPF_MOV64_IMM(BPF_REG_1, 1), /* r1 = 1 */
        BPF_RAW_INSN(BPF_STX | BPF_XADD | BPF_DW, BPF_REG_0, BPF_REG_1, 0, 0), /* xadd r0 += r1 */
        BPF_MOV64_IMM(BPF_REG_0, 0), /* r0 = 0 */
        BPF_EXIT_INSN(),
    };

    prog_fd = bpf_prog_load(BPF_PROG_TYPE_SOCKET_FILTER, prog, sizeof(prog),
                "GPL", 0);
    if (prog_fd < 0) {
        printf("failed to load prog '%s'\n", strerror(errno));
        goto cleanup;
    }

    sock = open_raw_sock("lo");

    if (setsockopt(sock, SOL_SOCKET, SO_ATTACH_BPF, &prog_fd,
               sizeof(prog_fd)) < 0) {
        printf("setsockopt %s\n", strerror(errno));
        goto cleanup;
    }

    for (i = 0; i < 10; i++) {
        key = IPPROTO_TCP;
        assert(bpf_lookup_elem(map_fd, &key, &tcp_cnt) == 0);

        key = IPPROTO_UDP;
        assert(bpf_lookup_elem(map_fd, &key, &udp_cnt) == 0);

        key = IPPROTO_ICMP;
        assert(bpf_lookup_elem(map_fd, &key, &icmp_cnt) == 0);

        printf("TCP %lld UDP %lld ICMP %lld packets\n",
               tcp_cnt, udp_cnt, icmp_cnt);
        sleep(1);
    }

cleanup:
    /* maps, programs, raw sockets will auto cleanup on process exit */
    return 0;
}

int main(void)
{
    FILE *f;

    f = popen("ping -c5 localhost", "r");
    (void)f;

    return test_sock();
}

プログラムの主要点について見ていきいます.

eBPFのmapの作成

    map_fd = bpf_create_map(BPF_MAP_TYPE_ARRAY, sizeof(key), sizeof(value),
                256, 0);

bpf_create_map()によって大きさが256のeBPF mapを作成します.map_fdがこのeBPF mapのdescriptorです.

eBPFプログラムの作成

/*
 * - loads eBPF program:
 *   r0 = skb->data[ETH_HLEN + offsetof(struct iphdr, protocol)];
 *   *(u32*)(fp - 4) = r0;
 *   // assuming packet is IPv4, lookup ip->proto in a map
 *   value = bpf_map_lookup_elem(map_fd, fp - 4);
 *   if (value)
 *        (*(u64*)value) += 1;
 */

    struct bpf_insn prog[] = {
        BPF_MOV64_REG(BPF_REG_6, BPF_REG_1),
        BPF_LD_ABS(BPF_B, ETH_HLEN + offsetof(struct iphdr, protocol) /* R0 = ip->proto */),
        BPF_STX_MEM(BPF_W, BPF_REG_10, BPF_REG_0, -4), /* *(u32 *)(fp - 4) = r0 */
        BPF_MOV64_REG(BPF_REG_2, BPF_REG_10),
        BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, -4), /* r2 = fp - 4 */
        BPF_LD_MAP_FD(BPF_REG_1, map_fd),
        BPF_RAW_INSN(BPF_JMP | BPF_CALL, 0, 0, 0, BPF_FUNC_map_lookup_elem),
        BPF_JMP_IMM(BPF_JEQ, BPF_REG_0, 0, 2),
        BPF_MOV64_IMM(BPF_REG_1, 1), /* r1 = 1 */
        BPF_RAW_INSN(BPF_STX | BPF_XADD | BPF_DW, BPF_REG_0, BPF_REG_1, 0, 0), /* xadd r0 += r1 */
        BPF_MOV64_IMM(BPF_REG_0, 0), /* r0 = 0 */
        BPF_EXIT_INSN(),
    };

cBPFと同様に,define済みのマクロを利用してeBPFのプログラムを作成することができます. <linux/bpf.h>にeBPFの命令や,eBPFプログラムから呼び出すことのできる関数が宣言されています. eBPFについても詳しくはカーネルのドキュメントに書いてあります.

このプログラムが何をしているかというのは,コメントに書いてある通りですが,ポイントとしては

  1. R1-R5のレジスタが引数として使われる
  2. R10はフレームポインタ
  3. 最初R1にはパケットデータを保持するsk_buffのポインタが格納されている
  4. BPF_LD_ABS (正確にはBPF_ABS | <size> | BPF_LD)は特別な命令で,sk_buffからデータを読み出す.このときR6レジスタsk_buffのポインタを格納する.(注: パケットデータはsk_buff構造体の中のポインタからアクセスするため,単純なeBPFの命令ではアクセスできない.そのため特別扱いしている)
  5. BPF_CALLBPF_FUNC_map_lookup_elem(この関数はカーネル内にあらかじめ存在)を呼び出している
  6. BPF_FUNC_map_lookup_elemの第一引数はeBPF mapのfd,第二引数はキーのアドレス.ここではスタック上にキーを格納し,それを渡している.戻り値はキーに対応するマップが見つかったらそのアドレス

eBPFフィルタプログラムの利用

BPFでパケットフィルタリングする場合は,setsockopt(2)を使ってソケットのfdに対してcBPFのプログラムをアタッチすることができました. このとき実はcBPFのプログラムは内部でeBPFのプログラムに変換され実行されます.

それでは,eBPFプログラムそのものをソケットにアタッチすることはできるのでしょうか? 答えはもちろんイエスです. アタッチする手順は以下の通りです.

  1. bpfシステムコールLinux 3.18から追加)を使って,まずカーネル空間にeBPFのプログラムを追加
  2. setsockoptの第二引数にSO_ATTACH_BPFを指定してeBPFのプログラムをアタッチする

上のプログラムではbpfシステムコールの代わりに,そのラッパー関数であるbpf_prog_loadを利用しています.

    prog_fd = bpf_prog_load(BPF_PROG_TYPE_SOCKET_FILTER, prog, sizeof(prog),
                "GPL", 0);
    if (prog_fd < 0) {
        printf("failed to load prog '%s'\n", strerror(errno));
        goto cleanup;
    }

    sock = open_raw_sock("lo");

    if (setsockopt(sock, SOL_SOCKET, SO_ATTACH_BPF, &prog_fd,
               sizeof(prog_fd)) < 0) {
        printf("setsockopt %s\n", strerror(errno));
        goto cleanup;
    }

eBPFプログラムはカーネル内で実行されるため,カーネルモジュールと同様にライセンスを明示する必要があります. bpf_prog_loadの第四引数でライセンスとして"GPL"を指定しています.GPL compatibleなライセンスにしておかないとカーネル内の他のGPLの関数が呼べなくなります.

setsockoptの挙動

BPFに関するsetsockoptの挙動を整理しておくと,

  • cBPFプログラムのロードは,SO_ATTACH_FILTER
    • このときsk_attach_filterが呼ばれる
      • さらに,sk_attach_filterから,__get_filter() > bpf_prepare_filter() > bpf_migrate_filter() > bpf_convert_filter() と関数が呼ばれcBPFはeBPFに変換される
  • eBPFプログラムのロードは,SO_ATTAH_BPF

JIT

Linux 3.16からeBPFに対してもJITが使えるようになりました. ちなみに,BPFをJIT化しようとする議論は2011年ごろからありました(A JIT for packet filters).

/proc/sys/net/core/bpf_jit_enable を1にするとjitが有効になります.また,2にするとdebugモードとなり,jitの結果がdemsgで確認できます. デフォルトは無効化(0)されています.

ソース

eBPFの主なソースはkernel/bpfにあります.また,tools/net以下にBPFのアセンブラ/ディスアセンブラがあります. eBPFのインタプリタに興味がある方は,eBPFのユーザランド実装であるubpfを見た方が分かりやすいかもしれません.

まとめ

今回はeBPFの基礎について簡単に説明しました. eBPFによるトレースの方法や,実際にeBPFプログラムを作成する方法(もちろん,#defineを使わないで作成する方法があります)などは機会があれば別に書こうと思います.

*1:もっと前からあったかもしれない

*2:最近ではLinuxでBPFといった場合eBPFの方を指すようになってきているようです

nomによるnumpyデータのパース

nom

nomはrust製のパーサコンビネーターライブラリです. 個人的にはno_stdでの動作をサポートしてる点が嬉しいです.

今回勉強のためにnomでnumpyのファイル(のヘッダ)をパースしてみます.

numpyのフォーマット

今回は以下のようにして作成したデータをパースしてみます.

$ python -c "import numpy as np; a = np.arange(9).reshape(3,3); np.save('a.npy', a)";
$ hexdump -C a.npy
00000000  93 4e 55 4d 50 59 01 00  46 00 7b 27 64 65 73 63  |.NUMPY..F.{'desc|
00000010  72 27 3a 20 27 3c 69 38  27 2c 20 27 66 6f 72 74  |r': '<i8', 'fort|
00000020  72 61 6e 5f 6f 72 64 65  72 27 3a 20 46 61 6c 73  |ran_order': Fals|
00000030  65 2c 20 27 73 68 61 70  65 27 3a 20 28 33 2c 20  |e, 'shape': (3, |
00000040  33 29 2c 20 7d 20 20 20  20 20 20 20 20 20 20 0a  |3), }          .|
00000050  00 00 00 00 00 00 00 00  01 00 00 00 00 00 00 00  |................|
00000060  02 00 00 00 00 00 00 00  03 00 00 00 00 00 00 00  |................|
00000070  04 00 00 00 00 00 00 00  05 00 00 00 00 00 00 00  |................|
00000080  06 00 00 00 00 00 00 00  07 00 00 00 00 00 00 00  |................|
00000090  08 00 00 00 00 00 00 00                           |........|
00000098

フォーマットの公式ドキュメント: https://docs.scipy.org/doc/numpy-dev/neps/npy-format.html

以下のようなフォーマットになっているようです.

  • 先頭は \x93NUMPY
  • 次の1byteがメジャーバージョン
  • 次の1byteがマイナーバージョン
  • メジャーバージョン1の場合,次の2byteがヘッダ長(この後に続くpythonのdictを表す文字列の長さ(パディング含む))
  • メジャーバージョン2の場合,次の4byteがヘッダ長
  • 次に文字列でpythonのdictの形式でdescr, fortran_order, shape が格納されている
    • descr : numpyのdtype.通常は'<u8'とか'<f8'などの文字列だが,structured arrayの場合は型情報のリストになる.
    • fortran_order : TrueFalse
    • shape: タプルでarrayの形を表す
  • その後,アラインメントを揃えるために適当なスペースがあり,最後に改行(\x0a)
  • その後が実際のデータ.データの格納形式はdescrで指定されたdtypeに基づく

完成品1

とりあえず,1. dictの部分はdescr, fortran_order, shapeの順に格納されていると仮定 *1 2. dtypeの部分はstrucured arrayは考えないで,endianとワードサイズのみだけ取り出す として作ってみました.

#[derive(Debug, PartialEq)]
pub struct NpyHeader {
    major_version: u8,
    minor_version: u8,
    header_len: u32,
    little_endian: bool,
    fortran_order: bool,
    word_size: u32,
    shape: Vec<u8>,
}
named!(pub parse_header<NpyHeader>,
    do_parse!(
        tag!(b"\x93NUMPY") >>
        major_version: le_u8 >>
        minor_version: le_u8 >>
        header_len: alt!(
            cond_reduce!(major_version == 1, map!(le_u16, |x| x as u32)) |
            cond_reduce!(major_version == 2, le_u32)) >>
        tag!("{'descr': '") >>
        little_endian: map!(alt!(tag!("<") | tag!(">")), |x| x == b"<") >>
        le_u8 >> // skip byte
        word_size: map_res!(map_res!(digit, std::str::from_utf8), std::str::FromStr::from_str) >>
        tag!("', 'fortran_order': ") >>
        fortran_order: map!(alt!(tag!("True") | tag!("False")), |s| s == b"True") >>
        tag!(", 'shape': ") >>
        shape: delimited!(char!('('),
                          separated_list!(ws!(char!(',')),
                                            map_res!(
                                                map_res!(digit, std::str::from_utf8),
                                                    std::str::FromStr::from_str)),
                          char!(')')) >>
        tag!(", }") >>
        take_while!(call!(|c| c == b' ')) >>
        tag!("\n") >>
        (
            NpyHeader{
                major_version,
                minor_version,
                header_len,
                little_endian,
                fortran_order,
                word_size,
                shape,
            }
        )
    )
);

実行結果

#[cfg(test)]
mod test {
    use super::*;
    use std::fs::File;
    use std::io::Read;

    #[test]
    fn parse_nom() {
        let mut buf = vec![];
        File::open("./a.npy")
            .expect("failed to open file")
            .read_to_end(&mut buf)
            .unwrap();
        let r = parse_header(&buf);
        println!("{:?}", r);
        assert_eq!(
            r.to_result().ok().unwrap(),
            NpyHeader {
                major_version: 1,
                minor_version: 0,
                header_len: 70,
                little_endian: true,
                fortran_order: false,
                word_size: 8,
                shape: vec![3, 3],
            }
        );
    }
}
$ cargo test -- --nocapture
...
running 1 test
Done([0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0], NpyHeader { major_version: 1, minor_version: 0, header_len: 70, little_endian: true, fortran_order: false, word_size: 8, shape: [3, 3] })
test test::parse_nom ... ok

ポイント

  • named!() マクロで関数を定義する.今回の場合 fn parse_header(input: &[u8]) -> nom::IResult<&[u8], NpyHeader> みたいな関数が定義されることになる.
    • nom::IResult<I,O>の型パラメータは,Iがまだパースしていない残りの部分の型,Oがパースした結果の型
    • 正しくパースされると,Done(I,O)が返る.エラーだったらError(Err)Incomplete(Needed)
  • do_parse!()マクロを使うと,IResultを返す関数を>>で繋げることができる
    • 具体的には,do_parse!(I->IResult<I,A> >> I->IResult<I,B> >> ... I->IResult<I,X> , ( O ) ) => I -> IResult<I, O> の形になる
    • 最後に()で囲んだものが最終的な戻り値
    • 基本的にnomがいろいろな関数とマクロを提供しているので,それをdo_parse!()の中で組み合わせていくことになる
    • do_parse!()の中でxxx: yyy とするとパース処理した結果が後からxxxで参照できる
  • tag!()で指定したバイト列の読み取り
  • le_u8はlittle endianで1byte読み取る関数*2.同様の関数にle_u16とかle_u32など
  • alt!() は複数のパーサを受け取り,先頭から試していって最初に成功した結果を返す
    • alt!(tag!("True") | tag!("False")の部分は最初がtag!("True")(短い方の文字列)でなければならない.これは"False"が先の場合,"False"にマッチするか確認するために5文字読み込むので,次に"True"にマッチするか確認する際は文字長が足らないため必ず失敗する.詳細はドキュメント参照
  • パース処理で得られた部分の型を変換したい場合は,map!()あるいはmap_res!()を使う
    • スライスを文字列へ変換: map_res!(xxxx, std::str::from_utf8)
    • 文字列を数値へ変換: map_res!(xxxx, std::str::FromStr::from_str)
  • if処理をしたいときはcond!(), if-else処理をしたい時はalt!()cond_reduce!()を組み合わせる
  • delimited!(opening, X, closing)Xを取り出す
  • ws!()は各トークン間のスペースを自動で取り除く
  • separated_list!()でセパレータで区切られた値をVecで取得できる.ちなみに,もし末尾にセパレータがある場合そのセパレータは残る (例えば1,2,3,みたいな場合)

完成品2

もう少し真面目にdictをパースしてみたのがこちら.といっても手抜きでdictのvalueとしてboolかstringか数値のタプルかの3つしか考えてないですが.上の例はあえて全部一つのdo_parse!()にまとめましたが実際には分割した方が,可読性もテストしやすさも向上します.本当ならdictをパースしたあと,さらに個別の各要素をチェックしていく必要がありますが,飽きてきたのでこの辺で.. ちなみにもしdict部分に変なデータがあってもそのままパースされることになります.

named!(to_u8<u8>,
    map_res!(
        map_res!(digit, std::str::from_utf8),
        std::str::FromStr::from_str)
);

#[derive(Debug, PartialEq)]
pub enum DictValue<'a> {
    Str(&'a str),
    Bool(bool),
    Tuple(Vec<u8>),
}

named!(tuple_value<DictValue>,
    map!(
        delimited!(
            char!('('),
            separated_list!(ws!(char!(',')), to_u8),
            pair!(opt!(ws!(char!(','))), char!(')'))),
        DictValue::Tuple)
);

named!(bool_value<DictValue>,
    map!(alt!(tag!("True") | tag!("False")),
        |s| DictValue::Bool(s == b"True"))
);

named!(quoted_string<&str>,
    map_res!(
        delimited!(
            char!('\''),
            is_not!("'"),
            char!('\'')),
        std::str::from_utf8)
);

named!(string_value<DictValue<'a>>,
    map!(
        map_res!(
            delimited!(
                char!('\''),
                is_not!("'"),
                char!('\'')),
            std::str::from_utf8),
        DictValue::Str)
);

named!(key_value<(&str, DictValue)>,
    do_parse!(
        key: quoted_string >>
        ws!(char!(':')) >>
        value: alt!(string_value | bool_value | tuple_value) >>
        (key, value)
    )
);

named!(pub parse_dict<HashMap<&str,DictValue>>,
    delimited!(
        pair!(char!('{'), opt!(multispace)),
        map!(many0!(terminated!(key_value, opt!(ws!(char!(','))))),
                |vec: Vec<_>| vec.into_iter().collect()),
        pair!(opt!(multispace), char!('}')))
);

#[derive(Debug, PartialEq)]
pub struct NpyHeader2<'a> {
    major_version: u8,
    minor_version: u8,
    header_len: u32,
    dict: HashMap<&'a str, DictValue<'a>>,
}

named!(pub parse_header2<NpyHeader2>,
    do_parse!(
        tag!(b"\x93NUMPY") >>
        major_version: le_u8 >>
        minor_version: le_u8 >>
        header_len: alt!(
            cond_reduce!(major_version == 1, map!(le_u16, |x| x as u32)) |
            cond_reduce!(major_version == 2, le_u32)) >>
        dict: parse_dict >>
        take_while!(call!(|c| c == b' ')) >>
        tag!("\n") >>
        (
            NpyHeader2{
                major_version,
                minor_version,
                header_len,
                dict,
            }
        )
    )
);

実行結果

    #[test]
    fn parse_nom2() {
        let mut buf = vec![];
        File::open("./a.npy")
            .expect("failed to open file")
            .read_to_end(&mut buf)
            .unwrap();
        let r = parse_header2(&buf);
        let mut dict = HashMap::new();
        dict.insert("descr", DictValue::Str("<i8"));
        dict.insert("fortran_order", DictValue::Bool(false));
        dict.insert("shape", DictValue::Tuple(vec![3,3]));
        println!("{:?}", r);
        assert_eq!(
            r.to_result().ok().unwrap(),
            NpyHeader2 {
                major_version: 1,
                minor_version: 0,
                header_len: 70,
                dict: dict,
            }
        );
    }
$ cargo test -- --nocapture
...
Done([0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0], NpyHeader2 { major_version: 1, minor_version: 0, header_len: 70, dict: {"descr": Str("<i8"), "fortran_order": Bool(false), "shape": Tuple([3, 3])} })

ポイント

  • alt!()を使う場合,各パーサの戻り値は同じでなければならない.今回はenumを使って対処している

余談

後から気づきましたが,nomを使ってnumpyのstructured arraysをパースしてるnpy-rsというのがあるのでnumpyデータをちゃんとパースしたい人は参考になるかもしれません.dictのパースはValueのenumを作ってパースしてけば綺麗に書ける訳ですね.確かに.

感想

マクロはあまり好きではないので最初とっつきにくい印象がありましたが,書いていくうちにまぁこんなもんかもしれないと思いました.同じ処理をするにもいろいろな書き方ができるので,ライブラリが提供しているマクロ/関数をうまく活用することが簡潔なコードを書く鍵になります.幸いnomで書かれたパーサはいろいろあるのでそれが参考になります.書いていて少し気になったのが,synstasticでの文法チェックが少々遅いこと.体感として明らかにnomでマクロを定義する前と後で遅くなりました.今回の簡単な例でも1s近く遅くなった気がします.これはnom(というかマクロ)の問題なのか,それ以外の問題なのかはわかりませんが..

*1:多分必ずそうなる保証はない

*2:1byteにendianも何もないですが

[rust]ZST/DSTによるflexible array memberの実現

  • flexible array memberとは
  • ZSTを使う方法
  • DSTを使う方法

flexible array memberとは

flexible array memberとはずばり,C言語で以下のようなサイズ定義を持たない構造体メンバのことです(C99から標準).

struct X{
    size_t len;
    char value[];
}

この構造体は以下のようにして使うことができます.

size_t len = 10;
struct X *p = (struct X*)malloc(sizeof(struct X) + len);
p->len = len;
p->value[0] = 'a';
p->value[1] = 'b';
...

このように,flexible array memberを使用することで(ヘッダ+可変サイズなデータ)を持つ構造を簡潔に表現できます.普通はvalueは適当なポインタで持てばいいと思いますが,メモリ的にヘッダとデータが連続して欲しい場合,あるいは連続したバッファが与えられてそこにデータ構造を作成する場合などに有用です. ちなみにsizeof(struct X)の値はflexible array memberを除いたサイズ(上の例だと実質的にsizeof(size_t))になります.

ZSTを使う方法

rustにおいて,flexible array memberそのものとずばり対応するものはありません. 似たようなことをしたい場合は,Zero Sized Types (ZSTs) あるいは Dynamically Sized Types (DSTs)を使います.

最初に言っておくと,ZSTを使う場合はなんちゃってflexible array memberになります.

ZSTはその名の通りサイズが0の型で,例えば()がそれに相当します. これはもともと例えばSetを実現するためにSet<Key> = Map<Key, ()>として,Mapの実装を利用しつつ無駄な領域が割り当てられないようにするといった機能のために用意されたもののようです.

ZSTを利用する場合,最初のC言語の例と同じような構造は以下のようにして定義できます.

#[derive(Debug)]
#[repr(C)]
struct X {
    len: usize,
    val: [u8; 0],
}

ここではvalZSTになります.valのサイズは0なので,std::mem::size_of::<X>()は実質的にstd::mem::size_of::usize()と同じ値を返します.

以下のようにしてこのstructを利用することができます.

#![feature(allocator_api)]
use std::heap::{Alloc, Heap, Layout};

unsafe fn alloc(len: usize) -> *mut u8 {
    let layout = Layout::from_size_align(len, std::mem::align_of::<usize>()).unwrap();
    Heap.alloc(layout.clone()).unwrap() as *mut u8
}

unsafe fn free(p: *mut u8, len: usize) {
    let layout = Layout::from_size_align(len, std::mem::align_of::<usize>()).unwrap();
    Heap.dealloc(p, layout);
}

fn zst() {
    unsafe {
        // メモリの確保
        let value_len = 3;
        let header_len = std::mem::size_of::<X>();
        let total_len = header_len + value_len;
        let p = alloc(total_len);

        // 確保したバッファ先頭をstruct Xにキャスト
        let header: &mut X = std::mem::transmute(p);
        header.len = value_len;

        // valにアクセスするためのスライスの作成
        let val: &mut [u8] =
            std::slice::from_raw_parts_mut(p.offset(header_len as isize), header.len);
        val[0] = 1;
        val[1] = 2;
        val[2] = 3;

        // get_unchecked_mutを利用したアクセス
        *header.val.get_unchecked_mut(0) = 100;
        // これは index out of bounds error
        //header.val[0] = 100;

        println!("header_len = {}", header_len);
        println!("value_len  = {}", value_len);
        println!("header     = {:?}", header);
        println!("val        = {:?}", val);
        println!(
            "p          =  {:?}",
            std::slice::from_raw_parts(p, total_len)
        );

        free(p, total_len);
    }
}

実行結果は以下のようになります.

header_len: 8
value_len: 3
header: X { len: 3, val: [] }
val: [100, 2, 3]
p: [3, 0, 0, 0, 0, 0, 0, 0, 100, 2, 3]

やっていることはC言語の場合とほぼ同じで,まず適当な方法(ここではstd::heap::Alloc)でヘッダ(元々のstructの大きさ)+可変長サイズのデータの長さ分のメモリを確保します.メモリ確保方法の詳細は前回のエントリを参照して下さい.

        let value_len = 3;
        let header_len = std::mem::size_of::<X>();
        let total_len = header_len + value_len;
        let p = alloc(total_len);

その後確保したメモリをtransmuteXへのreferenceにしてヘッダにアクセスします.

        let header: &mut X = std::mem::transmute(p);
        header.len = value_len;

valに関してはfrom_raw_parts_mutでsliceを作成してアクセスします.

        let val: &mut [u8] =
            std::slice::from_raw_parts_mut(p.offset(header_len as isize), header.len);
        val[0] = 1;
        val[1] = 2;
        val[2] = 3;

valget_unchecked_mutを使えばヘッダからのアクセスも一応可能です.普通にアクセスしようとした場合は境界値エラーになります.

        // get_unchecked_mutを利用したアクセス
        *header.val.get_unchecked_mut(0) = 100;
        // これは index out of bounds error
        //header.val[0] = 100;

実のところこの方法は(valへのアクセスに基本的にはスライスを作ることになるので)flexible array memberと呼べるのか微妙なものですが,使うのは楽だと思います.

ちなみに,この方法ではスタック上に変数を確保することはできません.というか,

let x = X{len : 10, val:[]};

とやれば確保できますが,valは必ずemptyでなければならないので実質的に使えません.

DSTを使う方法

DSTを使うことで,本来のflexible array memberに近いものが実現できます. ここでの方法はHacker Newsのコメントから知りました.

DSTはコンパイル時にはサイズが定まらず,実行時にサイズが求まる型のことで,代表的なDSTの一つはスライス[T]です.structは一番最後の要素にDSTを持つことができます.この場合,そのstruct自体がDSTになります.

例えば,以下のようにしてDSTなstructを定義できます.

#[repr(C)]
struct Y{
    len: usize,
    value: [u8],
}

で,このようなstructが定義できるのは良いですが,実のところをこれを直接構成する方法はないです. これは扱いにくいので,代わりに以下のような?Sizedなgeneric data typeを利用します.

#[repr(C)]
struct Y<T: ?Sized> {
    len: usize,
    val: T,
}

コンパイル時に大きさが分かっている場合

そもそもflexible array memberを使いたい目的が動的にデータを割り当てたいという目的からなので,コンパイル時に大きさが分かっている状況というのは本末転倒な状況ですが,この場合は以下のようにして上記のstructをスタック上に構成することができます.

fn as_raw_bytes<T: ?Sized>(x: &T) -> &[u8] {
    unsafe { std::slice::from_raw_parts(x as *const T as *const u8, std::mem::size_of_val(x)) }
}

fn dst1() {
    let len = 10;
    let val: [u8; 10] = [1; 10];
    let y = Y { len, val };

    println!("y      = {:?}", y);
    println!("y      = {:?}", as_raw_bytes(&y));
    println!("&y     = {:p}", &y);
    println!("&y.len = {:p}", &y.len);
    println!("&y.val = {:p}", &y.val);
    println!("size_of_val(&y)     = {}", std::mem::size_of_val(&y));
    println!("size_of_val(&y.len) = {}", std::mem::size_of_val(&y.len));
    println!("size_of_val(&y.val) = {}", std::mem::size_of_val(&y.val));
    println!(
        "size_of(Y<[u8;10]>) = {}",
        std::mem::size_of::<Y<[u8; 10]>>()
    );
    // NG
    //println!("size_of(y) = {}", std::mem::size_of::<Y>());
    //println!("size_of(y) = {}", std::mem::size_of::<Y<[u8]>>());
}

実行結果

y      = Y { len: 10, val: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] }
y      = [10, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 134, 2, 1, 0, 0, 0]
&y     = 0x7fff5d398b68
&y.len = 0x7fff5d398b68
&y.val = 0x7fff5d398b70
size_of_val(&y)     = 24
size_of_val(&y.len) = 8
size_of_val(&y.val) = 10
size_of(Y<[u8;10]>) = 24

ここで,y.lenのサイズは8, y.valはサイズは10ですが,メモリは8バイト単位で確保されるようでy自体のサイズは24になっています.

この場合,DST coercionsを利用して,以下のような関数を呼び出すことが可能になります.

fn f<T: std::fmt::Debug>(y: &Y<[T]>) {
    println!("f: y = {:?}", y);
    // f: y = Y { len: 10, val: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] }
}

fn dst1(){
    let len = 10;
    let val: [u8; 10] = [1; 10];
    let y = Y { len, val };
    f(&y);
}

ちなみに,このコード例において,最初のDSTな構造体

#[repr(C)]
struct Y{
    len: usize,
    value: [u8],
}

を使った場合,以下のようなエラーが発生します.

error[E0308]: mismatched types
   --> src/main.rs:149:22
    |
149 |     let y = Y { len, val };
    |                      ^^^ expected slice, found array of 10 elements
    |
    = note: expected type `[u8]`
               found type `[u8; 10]`

ヒープに割り当てたい場合は,例えば以下のようにVecが利用できます.

    let len = 10;
    let mut vec = Vec::<Y<[u8; 10]>>::with_capacity(1);
    unsafe {
        vec.set_len(1);
    }
    let y = &mut vec[0];
    y.len = len;
    for i in 0..len {
        y.val[i] = i as u8;
    }

動的にサイズを割り当てる

さて,それではようやく本題で,実行時にDSTなstructを構成する方法について説明します.

以下で説明する方法は,servoの一部のコードを参考にその処理を簡略化したものです.元のコードに丁寧にコメントが書いてあるので,興味のある方は一読を勧めます.

fn dst2() {
    let len = 10;

    // サイズの計算
    let fake_slice_ptr: *const u8 = std::mem::align_of::<Y<[u8; 1]>>() as *const u8;
    let fake_slice: &[u8] = unsafe { std::slice::from_raw_parts(fake_slice_ptr, len) };
    let fake_ptr: *const Y<[u8]> = fake_slice as *const [u8] as *const Y<[u8]>;
    let fake_ref: &Y<[u8]> = unsafe { &*fake_ptr };
    let size = std::mem::size_of_val(fake_ref);

    println!("size_of(fake_ref)   = {}", size);
    println!(
        "size_of(fake_slice) = {}",
        std::mem::size_of_val(fake_slice)
    );
    println!("fake_slice.len      = {}", fake_slice.len());
    println!("fake_ref.val.len    = {}", fake_ref.val.len());

    unsafe {
        let p = alloc(size);

        let fake_slice: &mut [u8] = std::slice::from_raw_parts_mut(p as *mut u8, len);
        let ptr = fake_slice as *mut [u8] as *mut Y<[u8]>;
        let y: &mut Y<[u8]> = &mut *ptr;

        y.len = len;
        for i in 0..len {
            y.val[i] = i as u8;
        }

        println!("y.len     = {}", y.len);
        println!("y.val.len = {}", y.val.len());
        println!("size_of_val(y)    = {}", std::mem::size_of_val(y));
        println!("size_of_val(&ptr) = {}", std::mem::size_of_val(&ptr));
        println!("ptr = {:?}", as_raw_bytes(&ptr));

        free(p, size);
    }
}

実行結果

size_of(fake_ref)   = 24
size_of(fake_slice) = 10
fake_slice.len      = 10
fake_ref.val.len    = 10
y.len     = 10
y.val.len = 10
size_of_val(y)    = 24
size_of_val(&ptr) = 16
ptr = [32, 0, 66, 15, 1, 0, 0, 0, 10, 0, 0, 0, 0, 0, 0, 0]

まず,このコードの前半部分では確保するメモリの大きさを計算します.

    len = 10;
    let fake_slice_ptr: *const u8 = std::mem::align_of::<Y<[u8; 1]>>() as *const u8;
    let fake_slice: &[u8] = unsafe { std::slice::from_raw_parts(fake_slice_ptr, len) };
    let fake_ptr: *const Y<[u8]> = fake_slice as *const [u8] as *const Y<[u8]>;
    let fake_ref: &Y<[u8]> = unsafe { &*fake_ptr };
    let size = std::mem::size_of_val(fake_ref);

ここで何をやっているかというと,

  • 長さがlen[u8]のfat pinterを作成
  • それを長さがlenY<u8>のポインタにし,さらにそれをreferenceにする
  • size_of_valを使ってサイズを求める.

どうしてこれでサイズが求まるのかというと,DSTな要素は必ずfat pointer経由でアクセスしますが,このときfat pinterは(アドレス,長さ)を持ちます.このとき,この長さというのはDSTなstructの場合は,structの末尾の要素の長さを意味することになります.上のコードはまず最初にfrom_raw_part()を使って長さがlen[u8]のfat pointerを作成した後,それを*const Y<[u8]>のfat pointerにキャストし,そしてさらにそれをreferenceに変換してサイズを計算している訳です. 上の実行結果のptrを見ると,後半8byteで長さ(10)を保持していることが分かります.

    unsafe {
        let p = alloc(size);

        let fake_slice: &mut [u8] = std::slice::from_raw_parts_mut(p as *mut u8, len);
        let ptr = fake_slice as *mut [u8] as *mut Y<[u8]>;
        let y: &mut Y<[u8]> = &mut *ptr;

        y.len = len;
        for i in 0..len {
            y.val[i] = i as u8;
        }
    }

確保すべきメモリサイズが分かったら,メモリを確保して,サイズ計算時と同様にキャストしてY<[u8]>のreferenceを取得し,それを利用してデータ構造を操作します.

確保すべきサイズ計算のところは

let len = 10;
let size = std::mem::size_of::<Y<()>>() + len;

でもいいんじゃないかと思いましたが,この場合はsizeは18になります.前述の通りrustはメモリを(この環境では)8byte単位で確保しているようで,今回の場合はsize_of_val()で得られる値と齟齬が生じることになります.といっても確保したメモリ外にはアクセスしないので大丈夫じゃないかなと思いますが..

実際に利用する場合には,以下のようにしてgeneric data typeを利用すると汎用性が上がります.

fn dst3<T>(len: usize) -> (*mut Y<[T]>, usize) {
    let fake_slice_ptr: *const T = std::mem::align_of::<Y<[T; 1]>>() as *const T;
    let fake_slice: &[T] = unsafe { std::slice::from_raw_parts(fake_slice_ptr, len) };
    let fake_ptr: *const Y<[T]> = fake_slice as *const [T] as *const Y<[T]>;
    let fake_ref: &Y<[T]> = unsafe { &*fake_ptr };
    let size = std::mem::size_of_val(fake_ref);

    unsafe {
        let p = alloc(size);

        let fake_slice: &mut [T] = std::slice::from_raw_parts_mut(p as *mut T, len);
        let ptr = fake_slice as *mut [T] as *mut Y<[T]>;
        (ptr, size)
    }
}

fn f<T: std::fmt::Debug>(y: &Y<[T]>) {
    println!("f: y = {:?}", y);
}

fn main(){
    let len = 10;
    let (ptr, size) = dst3::<u8>(len);
    unsafe {
        let y: &mut Y<[u8]> = &mut *ptr;
        y.len = len;
        for i in 0..len {
            y.val[i] = i as u8;
        }
        f(&y);
        // f: y = Y { len: 10, val: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] }
        free(ptr as *mut u8, size);
    }
}

未初期化領域への代入の注意点

ここまで例ではprimitive typeしか扱ってないので未初期化の領域にそのまま代入していますが,Drop型を実装している型の場合はservoの例のようにstd::ptr::writeを使って書き込まないと未初期化のデータに対してDropが実行されてしまいます.詳細はnomiconに書いてあります.

まとめ

Cにおけるflexible array memberをrustで実現するための方法として,ZSTとDSTを利用する方法について書きました.実のところ両方ともあまりunsafeなコード量は変わらず,本来のflexible array memberに相当するのはDSTを使う方法ですが,状況によってはZSTの方が使いやすいかなと思いました.

rustで動的にバッファを確保する方法

  1. Boxを使う
  2. Vecを使う
  3. std::heap::Allocを使う
  4. placement-in を使う
  5. mallocを使う

1. Boxを使う

rustで動的にメモリを確保する方法といってまず思いつくのはBoxを使う方法だと思います.例えば,以下のようにすれば長さ1000のu8のバッファを確保できます.

let buffer : Box<[u8]> = Box::new([0;1000]);

ただし,この方法は以下のような特徴があります.

  1. 確保した領域は必ず初期化する必要がある.
  2. 一旦スタック上にデータを確保したあとに,ヒープにそのデータをコピーする

1.に関しては,これは変数は初期化しないと利用できないというrustの原則に則ったものですが,場合によってはこの初期化コストが大きい場合があります.また,2. の方が問題で,あまりにも大きい領域を確保しようとするとスタックオーバフローが発生する可能性があります.コピー分のオーバヘッドも生じます.

2. Vecを使う

Vecを使う場合はVec::new()で可変長配列を作成した後Vec::pushでデータを追加していくのが最も基本的な方法ですが,当然効率はあまり良くありません.

Vec::with_capacityを使うと,指定したサイズ分だけのメモリ領域を確保してくれます.さらに,Vec::set_lenを使うことで,初期化をスキップして確保した領域にアクセスすることができます.ただし,set_lenはunsafeです.

unsafe{
  let len = 1000;
  let mut vec = Vec::<u8>::with_capacity(len);
  vec.set_len(len);
}

また,確保した要素にアクセスする際rustはデフォルトで境界のチェックをしますが, そのオーバヘッドを減らしたい場合はget_uncheckedあるいはget_unchecked_mutが使えます((get_uncheckedget_uchecked_mutはsliceの関数で,boxやvecからderef coercionを通じて呼び出せます)).

ffiで利用するなどでraw pointerが欲しい場合は以下のような手法が使えます.

unsafe fn alloc(len: usize) -> *mut u8 {
    let mut vec = Vec::<u8>::with_capacity(len);
    vec.set_len(len);
    Box::into_raw(vec.into_boxed_slice()) as *mut u8
}

unsafe fn free(raw: *mut u8, len : usize) {
    let s = std::slice::from_raw_parts_mut(raw, len);
    let _ = Box::from_raw(s);
}

Vec::with_capacityで指定した長さ分の領域を確保したあと,それをvec::into_boxed_sliceでBoxに変換し,さらにそれをBox::into_rawすることで実際のポインタを得ます.

確保した領域を解放する場合はポインタを再びslice::from_raw_parts_mutでスライスに直し,それをさらにBox::from_rawでBoxに直してBoxのDropが呼ばれるようにします.

上記のコードはservoの一部のコードを参考にしたものです. 実際のservoのコードではアラインメントをワード境界に揃えるため,以下のようなことをしています (簡単化のため実際のコードとちょっと異なります.ここでは実際に確保したバイト数も戻り値として返しています).

unsafe fn alloc_buffer(len : usize) -> (*mut u8, usize){
    let word_size = std::mem::size_of::<usize>();
    let num = (len + word_size-1) / word_size;
    let mut vec = Vec::<usize>::with_capacity(num);
    vec.set_len(num);
    (Box::into_raw(vec.into_boxed_slice()) as *mut u8, num*word_size)
}

3. std::heap::Allocを使う

rustのメモリアロケータはAllocator traitを実装することになっており,またstd::heap::Heapでデフォルトのアロケータにアクセスできます.コードはliballocにあります. これを利用すると,以下のようにメモリを確保することができます.

#![feature(allocator_api)]
use std::heap::{Alloc, Heap, Layout};

// 第一引数: サイズ, 第二引数: アラインメントサイズ
let layout = Layout::from_size_align(10, std::mem::align_of::<u32>()).unwrap();
unsafe {
    let p = Heap.alloc(layout.clone()).unwrap() as *mut u8;
    // ...
    Heap.dealloc(p, layout);
}

この関数を利用すれば,アラインメントも指定可能です.raw pionterが欲しい場合,この手法が一番良さそうに見えますが,残念ながらこの機能はunstableなため,nightlyな環境でしかまだ使えません.また,今後インタフェースが変わる可能性も十分あると思います.(詳細: https://github.com/rust-lang/rust/issues/32838

4. placement-in を使う

こちらも現時点ではunstableな機能ですが,確保したメモリ上に直接データを配置するplacement-in構文があります(詳細: https://github.com/rust-lang/rust/issues/27779). これを利用すると,boxでも大きなメモリ領域を確保することが可能です.

#![feature(placement_in_syntax, box_heap)]


// OK
let x: Box<[u8]> = std::boxed::HEAP <- [0;10000000];

// これは自分の環境ではoverflow
// let b: Box<[u8]> = Box::new([0; 10000000]);

5. mallocを使う

場合によってはlibc crateを使ってmalloc & freeを直接呼んでしまうのも手かもしれません.こちらも当然unsafeです.

// https://github.com/rust-lang/libc
unsafe {
    let p : *mut c_void = libc::malloc(10);
    // ...
    libc::free(p)
}

rustをnostdで使う

nostdとは

rustは他の多くの言語と同様,標準ライブラリ(std)が同包されており,通常はそれを使ってプログラミングをおこないます.しかしながら,stdの多くの機能はOSの機能を利用しているため,OSそのものを開発したいとか,あるいはOSが存在しない環境でプログラミングをしたい場合などではstdは利用できません.

stdを利用しない場合は明示的にその旨を宣言する必要がり,そのために利用するのが#![no_std] attributeです.

nostdの概要は(昔の)bookのno stdlibの章に書いてあります. なお,nostdに関わることはunstableな機能が多いので,ここに書いてあることが今後変更される可能性は十分あります

core library

stdが使えなくなるとかなりの機能が制限されてしまいますが,rustはcoreと呼ばれるrust onlyで書かれたライブラリを提供しており,nostd環境でもcoreを利用することができます*1.ただし,core libraryを使用するにあったって,最低限以下の関数は自前で定義する必要があります.

  • memcpy, memcmp, memset
  • panic_fmt
  • eh_personality

memcpy, memcmp, memset

これらの関数はrlibc crateにrustでの実装があるので,必要ならばそれを利用できます.

panic_fmt, eh_personality

プログラムがパニックしたとき等に利用される関数です.とりあえず,何もしないなら

#![feature(lang_items)]

#[lang = "eh_personality"] extern fn eh_personality() {}
#[lang = "panic_fmt"] extern fn panic_fmt() -> ! { loop {} }

みたいな関数を作ればokです.

coreとstdの関係

coreのドキュメントを見ていると,stdで提供されている機能のサブセットが提供されていることに気づくと思います.それもそのはずで,これはstdの中でcoreのライブラリがreexportされているからです.coreのドキュメントを見ているとよくstdの方を参照せよと書いてあるのはこういう理由からです. stdはOSがある用,coreはベアメタル用のライブラリですが,stdでもcoreの機能は使いたいのでこうしてるようです.

target

#![no_std]を指定すればnostdなプログラムが開発できるかというと,実際にはそう単純ではありません.多くの場合,プログラムをビルドするためのtargetを正しく設定する必要があります.

rustはさまざまなプラットフォームでの動作を保証しており,それを設定するのがtarget*2です.rustがサポートしているターゲットは ructc --print target-list としてみるか,あるいは https://forge.rust-lang.org/platform-support.html から確認できます.ターゲット名は基本的に{arch}-{vendor}-{sys}[-{abi}]の形をしていて,例えばx86_64-unknown-linux-gnux86_64-apple-darwinのようなターゲットがデフォルトで存在します.

各targetごとに,struct Target及びstruct TargetOptionsで指定されるtargetごとの設定を記述します. rustがデフォルトでサポートしているtargetの設定はsrc/librustc_back/target/の中に書いてあります.例えば,linuxの場合はx86_64_unknown_linux_gnu.rsです.

このtargetはjsonで記述できるようになっており,例えば以下のように記述します.

{
  "llvm-target": "x86_64-unknown-none",
  "data-layout": "e-m:e-i64:64-f80:128-n8:16:32:64-S128",
  "linker-flavor": "gcc",
  "target-endian": "little",
  "target-pointer-width": "64",
  "arch": "x86_64",
  "os": "none",
  "disable-redzone": true,
  "features": "-mmx,-sse,+soft-float",
}

ベースはx86_64_unknown_linux_gnuですが,ここではosが存在しないので"none"を指定し,その他redzonemmx, sseを禁止しています.またfloatのsoftwareサポートを有効にしています.なお,data layoutはLLVMのものです.

こうして作成したtargetのjsonファイルは,例えばx86_64_xxx.jsonという名前で保存したならビルドする際に(Cargo.tomlと同じディレクトリに置いて) cargo build --target=x86_64_xxxのようにすることで指定できます.

sysrootとxargo

さて,targetを無事に指定すればあとはokかというと,残念ながらそうではありません.rustでは実際にプログラムをコンパイルする際に,sysrootと呼ばれるパスからライブラリを検索します.デフォルトのsysrootはrustc --print sysrootで確認できます.このsysrootにcoreのライブラリなどが置かれています.cargo build --target=x86_64_xxx のように指定した場合は x86_64_xxx用のsysrootを探す訳ですが,x86_64_xxxは自分で用意したターゲットなので,sysrootも自分で用意する必要があります.つまり,自分でそのtarget用のcoreをビルドする必要がある訳です.

幸いなことに,自分で用意したtarget用に自動でsysrootを用意してくれる,xargoと呼ばれるプログラムが存在します.cargo install xargoとすればインストールできます.xargoはcargoのラッパーになっており, xargo build --target=x86_64_xxx としたときにtarget x86_64_xxxのsysrootがなければまずそれ用にcoreなどをビルドし,その後プログラムのビルドをおこないます.nostd環境でプログラムを作る際は普通xargoを使うことになると思います.

nostdで使えるcrate

crates.ioのカテゴリ検索で,no-stdを検索すると,stdを利用していないcrateを見つけることができます.といってもCargo.tomlの中でcategoryをちゃんと設定しているcrateがどれだけあるのか分かりませんが..

*1:というか,nostd環境だとstdの代わりにcoreが自動でリンクされるようになる

*2:ちょっと古いのでソースを直接見た方がいいかもしれない

rustのGUIライブラリconrodの使い方

所用でrustのGUIライブラリについて調べる機会がありました.特にこれといったGUIライブラリはまだないような気がします.

普段自分がネイティブなGUIアプリケーションを作成する場合はQtを使います.rustからQtを呼び出すのはいくつか試みがあって,disassemblerのpanopticonGUIとしてQtを使ってますし,rustからQtを呼び出すためのcpp_to_rustというプロジェクトもあります.ただどうもまだいろいろと開発中のようです.

今回はconrodというGUIライブラリを使ってみたので,簡単にそれの使い方について書こうと思います.

conrod

conrodは純rust製のGUIライブラリです.conrodが担当するのはウィジェッ ト(ボタンやテキストボックスなど)の構成の管理やウィジェットに対するイベントの伝播などです.pistonというゲームエンジンを開発しているところが作っているようです.実際の描画やOSからのイベントの受け取りは何か別のライブラリが担当することになります.conrod自体に描画用としてglium (OpenGL), イベント管理用としてwinitのバックエンドが用意されているので,普通はそれを使うことになると思います*1.自分はmacでしか試していませんが,マルチプラットフォームで動作するはずです.ちなみに,まだバージョン1.0にもなってないので仕様が大きく変わる可能性は十分あると思います.

conrodのチュートリアルはまだ未完成のようですが,rustdocは比較的丁寧に書いてあると思います.また,実際にプログラムを作成する場合は,リポジトリにある

あたりを参考にするのが良いかと思います.

exampleはリポジトリをクローンして

$ cargo run --release --features "winit glium" --example hello_world

とかやれば動きます.

簡単な例

ものすごく簡単な例として,以下のようにフィボナッチ数を計算するアプリケーションを作成しようと思います.

f:id:mm_i:20170709234547p:plain

ソース全体はここにあります: https://github.com/mmisono/conrod-examples/tree/master/fibonacci

これから説明するコードの大部分(初期化部分とイベントループ部分)はexamplesのものとほぼ同じです.

Cargo.toml

前述した通り,描画にglium, イベント取得にwinitを使うので,Cargo.tomlには以下のように書いてあげます.また,フォントを登録する際にフォントが格納されているフォルダを探す必要があるので,find_folderもdependenciesに追加しています.

[dependencies.conrod]
version = "0.53.0"
features = ["glium", "winit"]

[dependencies]
find_folder = "*"

初期化

conrodを使うための初期化のコードは以下のようになります.WindowやUiといったコンポーネントを作成したり,必要なフォントや画像を読み込んだりします.

    const TITLE: &'static str = "Fibonacci";
    let width = 300;
    let height = 100;

    // Build the window.
    let display = glium::glutin::WindowBuilder::new()
        .with_vsync()
        .with_dimensions(width, height)
        .with_title(TITLE)
        .with_multisampling(4)
        .build_glium()
        .unwrap();

    // construct our `Ui`.
    let mut ui = conrod::UiBuilder::new([width as f64, height as f64]).build();

    // Add a `Font` to the `Ui`'s `font::Map` from file.
    let assets = find_folder::Search::KidsThenParents(3, 5)
        .for_folder("assets")
        .unwrap();
    let font_path = assets.join("fonts/NotoSans/NotoSans-Regular.ttf");
    ui.fonts.insert_from_file(font_path).unwrap();

    // Generate the widget identifiers.
    let ids = &mut Ids::new(ui.widget_id_generator());

    // A type used for converting `conrod::render::Primitives` into `Command`s that can be used
    // for drawing to the glium `Surface`.
    let mut renderer = conrod::backend::glium::Renderer::new(&display).unwrap();

    // The image map describing each of our widget->image mappings (in our case, none).
    let image_map = conrod::image::Map::<glium::texture::Texture2d>::new();

    let mut text = "0".to_string();
    let mut answer = "0".to_string();

ここで,widget identifierを生成していますが,これはウィジェット管理に必要なIDで,ウィジェットごとにIDが必要です.Idsは以下のようにマクロで生成します.

widget_ids!(
    struct Ids {
        canvas,
        title,
        text_box,
        button,
        result,
    });

イベントループ

conrodではイベントループベースで処理をおこないます.ループ処理は以下のようになります.

    let mut event_loop = EventLoop::new();
    'main: loop {
        // Handle all events.
        for event in event_loop.next(&display) {
            // Use the `winit` backend feature to convert the winit event to a conrod one.
            if let Some(event) = conrod::backend::winit::convert(event.clone(), &display) {
                ui.handle_event(event);
                event_loop.needs_update();
            }

            match event {
                // Break from the loop upon `Escape`.
                glium::glutin::Event::KeyboardInput(
                    _,
                    _,
                    Some(glium::glutin::VirtualKeyCode::Escape),
                ) |
                glium::glutin::Event::Closed => break 'main,
                _ => {}
            }
        }

        set_widgets(ui.set_widgets(), ids, &mut text, &mut answer);

        // Render the `Ui` and then display it on the screen.
        if let Some(primitives) = ui.draw_if_changed() {
            renderer.fill(&display, primitives, &image_map);
            let mut target = display.draw();
            target.clear_color(0.0, 0.0, 0.0, 1.0);
            renderer.draw(&display, &mut target, &image_map).unwrap();
            target.finish().unwrap();
        }
    }

match eventの部分でイベントを補足し,必要な処理をします.通常ここで処理するイベントは終了処理といった大域的なイベントのみで,それ以外の処理(ウィジェットに対するマウスイベントとか)は後でおこないます.その後,set_widgets()という関数でGUIを設定し,必要なら描画をおこないます.

イベントループの本体は以下のようになっています.基本的にはウィンドウが受け取ったイベントを取得してそれを返すだけですが,60fpsを超えないように適当にスリープします.

struct EventLoop {
    ui_needs_update: bool,
    last_update: std::time::Instant,
}

impl EventLoop {
    pub fn new() -> Self {
        EventLoop {
            last_update: std::time::Instant::now(),
            ui_needs_update: true,
        }
    }

    /// Produce an iterator yielding all available events.
    pub fn next(&mut self, display: &glium::Display) -> Vec<glium::glutin::Event> {
        // We don't want to loop any faster than 60 FPS, so wait until it has been at least 16ms
        // since the last yield.
        let last_update = self.last_update;
        let sixteen_ms = std::time::Duration::from_millis(16);
        let duration_since_last_update = std::time::Instant::now().duration_since(last_update);
        if duration_since_last_update < sixteen_ms {
            std::thread::sleep(sixteen_ms - duration_since_last_update);
        }

        // Collect all pending events.
        let mut events = Vec::new();
        events.extend(display.poll_events());

        // If there are no events and the `Ui` does not need updating, wait for the next event.
        if events.is_empty() && !self.ui_needs_update {
            events.extend(display.wait_events().next());
        }

        self.ui_needs_update = false;
        self.last_update = std::time::Instant::now();

        events
    }

    /// Notifies the event loop that the `Ui` requires another update whether or not there are any
    /// pending events.
    ///
    /// This is primarily used on the occasion that some part of the `Ui` is still animating and
    /// requires further updates to do so.
    pub fn needs_update(&mut self) {
        self.ui_needs_update = true;
    }
}

GUI設定の本体

set_widgets()関数が実際にGUIの設定をする箇所です.conrodでアプリケーションを作成する場合は基本的にこのset_widgets()の部分を適当にカスタマイズすることになると思います.

conrodではボタンやテキストボックスなどのウィジェットを内部で木構造として管理しています.wiget::Text::New("aaa").set(ids.title, ui) のようにすると,指定したIDでそのウィジェットが登録されます.set()をしたとき,指定したidのウィジェットに対してもし何かイベントが発生していたらそれが返ってきます.若干分かりにくいと思うのでコードを見てもらった方が早いと思います.

fn set_widgets(ref mut ui: conrod::UiCell, ids: &mut Ids, text: &mut String, answer: &mut String) {
    widget::Canvas::new()
        .pad(0.0)
        .color(conrod::color::rgb(0.2, 0.35, 0.45))
        .set(ids.canvas, ui);

    let canvas_wh = ui.wh_of(ids.canvas).unwrap();

    // title
    widget::Text::new("Fibonacci Calculuator")
        .mid_top_with_margin_on(ids.canvas, 5.0)
        .font_size(20)
        .color(color::WHITE)
        .set(ids.title, ui);

    // textbox
    for event in widget::TextBox::new(text)
        .font_size(15)
        .w_h((canvas_wh[0] - 90.) / 2., 30.0)
        .mid_left_with_margin_on(ids.canvas, 30.0)
        .border(2.0)
        .border_color(color::BLUE)
        .color(color::WHITE)
        .set(ids.text_box, ui)
    {
        match event {
            widget::text_box::Event::Enter => println!("TextBox {:?}", text),
            widget::text_box::Event::Update(string) => *text = string,
        }
    }

    // button
    if widget::Button::new()
        .w_h((canvas_wh[0] - 90.) / 2., 30.0)
        .right_from(ids.text_box, 30.0)
        .rgb(0.4, 0.75, 0.6)
        .border(2.0)
        .label("calc!")
        .set(ids.button, ui)
        .was_clicked()
    {
        if let Ok(num) = text.parse::<u64>() {
            *answer = fib(num).to_string();
        } else {
            println!("invalid number");
        }
    }

    // result
    widget::Text::new(answer)
        .mid_bottom_with_margin_on(ids.canvas, 10.0)
        .font_size(20)
        .color(color::WHITE)
        .set(ids.result, ui);
}

例えば,TextBoxにユーザが何か入力すると,それがset()したときに返ってくるので,入力した内容を変数textに保存しています.また,Buttonの場合はwas_clicked()を呼ぶとボタンが押されたかどうか分かるので,ボタンが押された場合は計算を実行します.

ウィジェットに位置の指定ですが,各ウィジェットPositonableというトレイトを実装していて,これを利用することになります.

例えばタイトルテキストは .mid_top_with_margin_on(ids.canvas, 5.0)としていますが,こうするとids.canvasで指定したウィジェットの中央上に指定したマージン幅を開けてウィジェットを配置することになります.また,.down()を使えば直前にset()したウィジェットの下に指定したマージンだけ空けてウィジェットが配置されます.他にもいろいろ配置用の関数が用意されています.個人的には一番上に適当なウィジェットを配置したあと,.down().align_middle_x()を使ってどんどん下にウィジェットを配置していくのが楽なんじゃないかなと思います.

三目並べ

conrodの特徴はidに紐づけてウィジェットを登録し,そのidを利用してウィジェットが配置できるというところだと思います.こうすることであまり深くレイアウトを考えなくても簡単にウィジェットが配置できます.一方で,各ウィジェットに対して一意なIDが必要なので,ウィジェットが多い場合はその管理が面倒になることもあります.

例として,以下のような三目並べのアプリケーションを考えます.

f:id:mm_i:20170709234602p:plain

(ソース: https://github.com/mmisono/conrod-examples/tree/master/tic_tac_toe)

conrodでは直線や円も一つのウィジェットです.したがって,それぞれに対してIDを振って管理する必要があります.IDは以下のようにして配列で持つことができます.

widget_ids!(
    struct Ids {
        canvas,
        grids[],
        circles[],
        xs[],
    });
let ids = &mut Ids::new(ui.widget_id_generator());
ids.grids
    .resize((rows - 1) * (cols - 1), &mut ui.widget_id_generator());
ids.circles
    .resize(rows * cols, &mut ui.widget_id_generator());
ids.xs
    .resize(rows * cols * 2, &mut ui.widget_id_generator());

丸とバツを描画するために,ここではあらかじめ各セルに対応するIDを生成させています(バツは直線二本なので2倍のIDを用意しています).

グリッドの描画

グリッドの描画は以下のようにしておこないます.注意点としては座標はデカルト座標(中央が(0,0))です.

fn set_widgets(
    ref mut ui: conrod::UiCell,
    ids: &mut Ids,
    board: &mut [&mut [BoardState]],
    turn: &mut Turn,
) {
    widget::Canvas::new()
        .pad(0.0)
        .color(color::WHITE)
        .set(ids.canvas, ui);

    let rows = board.len();
    let cols = board[0].len();
    let canvas_wh = ui.wh_of(ids.canvas).unwrap();
    let tl = [-canvas_wh[0] / 2., canvas_wh[1] / 2.];
    let sw = canvas_wh[0] / (cols as f64);
    let sh = canvas_wh[1] / (rows as f64);

    // draw grid line
    for x in 1..cols {
        widget::Line::abs(
            [tl[0] + (x as f64) * sw, tl[1]],
            [tl[0] + (x as f64) * sw, tl[1] - canvas_wh[1] + 1.],
        ).color(color::BLACK)
            .set(ids.grids[x - 1], ui);
    }
    for y in 1..rows {
        widget::Line::abs(
            [tl[0], tl[1] - (y as f64) * sh],
            [tl[0] + canvas_wh[1] - 1., tl[1] - (y as f64) * sh],
        ).color(color::BLACK)
            .set(ids.grids[y - 1 + cols - 1], ui);
    }

マウスイベントの検知

マウスイベントの検知は,ui.widget_input().mouse()を使います.

    if let Some(mouse) = ui.widget_input(ids.canvas).mouse() {
        if mouse.buttons.left().is_down() {
            let mouse_abs_xy = mouse.abs_xy();
            let x = ((mouse_abs_xy[0] + (canvas_wh[0] / 2.)) / (canvas_wh[0] / (cols as f64))) as
                usize;
            let y = ((canvas_wh[1] / 2. - mouse_abs_xy[1]) / (canvas_wh[1] / (rows as f64))) as
                usize;
            // println!("{:?}, {}, {}", mouse_abs_xy, x, y);

            // when resizing, x can be greater than cols (so do y)
            if x < cols && y < rows {
                if board[y][x] == BoardState::Empty {
                    match *turn {
                        Turn::White => {
                            board[y][x] = BoardState::Circle;
                            *turn = Turn::Black;
                        }
                        Turn::Black => {
                            board[y][x] = BoardState::X;
                            *turn = Turn::White;
                        }
                    }
                }
            }
        }
    }

盤面の描画

盤面の描画をする際は,IDが他のウィジェットと被らないように注意しながらウィジェットを作成します.うっかりIDが被ると上書きされてしまって古いウィジェットが消えてしまいます.

    // draw circle or x
    for y in 0..rows {
        for x in 0..cols {
            match board[y][x] {
                BoardState::Circle => {
                    widget::Circle::outline_styled(
                        sw / 3.,
                        widget::line::Style::new().thickness(2.),
                    ).x(tl[0] + sw * (x as f64) + sw / 2.)
                        .y(tl[1] - sh * (y as f64) - sh / 2.)
                        .color(color::RED)
                        .set(ids.circles[y * cols + x], ui);
                }
                BoardState::X => {
                    widget::Line::abs(
                        [
                            tl[0] + sw * (x as f64) + sw / 5.,
                            tl[1] - sh * (y as f64) - sh / 5.,
                        ],
                        [
                            tl[0] + sw * ((x + 1) as f64) - sw / 5.,
                            tl[1] - sh * ((y + 1) as f64) + sh / 5.,
                        ],
                    ).color(color::BLACK)
                        .thickness(2.)
                        .set(ids.xs[y * cols + x], ui);

                    widget::Line::abs(
                        [
                            tl[0] + sw * ((x + 1) as f64) - sw / 5.,
                            tl[1] - sh * (y as f64) - sh / 5.,
                        ],
                        [
                            tl[0] + sw * (x as f64) + sw / 5.,
                            tl[1] - sh * ((y + 1) as f64) + sh / 5.,
                        ],
                    ).color(color::BLACK)
                        .thickness(2.)
                        .set(ids.xs[y * cols + x + rows * cols], ui);
                }
                _ => {}
            }
        }
    }
}

少々面倒ですね.まぁ,そもそもこういうことをしたいのなら,使ったことないですがゲームエンジンであるpistonとかを使うべきなんだと思います.

まとめ

ということでconrodの基礎的な使い方について書きました.メニューやダイアログといった機能はまだないようですし,ウィジェット自体の数も決して多くはないので複雑なGUIアプリケーションを作成するのはちょっと大変かなという気がしますが,簡単で静的なものを作るにはいいかもしれません.

*1:gliumはoriginal authorがもう積極的にサポートしないことを表明していますが,コミュニティによってメンテナンスされていくようです(https://github.com/PistonDevelopers/conrod/issues/989).今original authorはvulkanのラッパーを作ってるので将来的にはそれに移行していくのかもしれません

DPDKのmbufの構造

DPDKではパケットをmbufというデータ構造で管理しています. mbufについては http://dpdk.org/doc/guides/prog_guide/mbuf_lib.html にまとまっていますが,ここでは具体的なmbufの構成や使われ方についてみていきたいと思います.

mempoolのおさらい

mbufはmempoolを使用しているので,mbufの説明の前に簡単にmempoolについて説明します.

前回説明したとおり,mempoolは以下の特徴を持っています.

  • 固定長サイズのメモリアロケータ
  • 内部で空きオブジェクトをリングバッファとして管理
  • コアごとのキャッシュを持つことができる

mempoolの概要を図にすると,以下のようになります.

f:id:mm_i:20170529110320p:plain

rte_create_mempool()でmempoolを作成しますが,ここで確保したオブジェクトはTAILQであるmp->elt_listに追加されます.またそれと同時にオブジェクトはmp->pool_dataで示されるrte_ringに追加されます(rte_ringのデータ部の実体はポインタの配列で,オブジェクトを指します). mempoolのキャッシュが有効な場合はmempool構造体の後ろにキャッシュ用のデータ構造用意されます.こちらも実体はポインタの配列で,キャッシュされたオブジェクトを指します.キャッシュされたオブジェクトがring bufferの方に入っていることはありません.

mempoolからのオブジェクトの取得は以下のような流れになります.

  • rte_mempool_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned n)
    • mempool mからn個オブジェクトを取得
    • obj_tableに確保したオブジェクトのポインタが格納される
  • このとき実際にはまずmempoolのキャッシュからオブジェクトを取得しようとする
  • もしキャッシュにオブジェクトがなければ,mempool本体のring bufferからオブジェクトを取得

また,確保したオブジェクトをmempoolに戻す場合は,以下のようにします.

  • rte_mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table, unsigned n)
    • obj_tableのn個のオブジェクトをmempool mpに戻す
  • このとき,まずmpのキャッシュに戻そうとする
  • もしキャッシュが一杯なら,mempool本体のバッファにオブジェクトを戻す

ringの初期化

ringの初期化はrte_mempool_populate_phys()内のrte_mempool_ops_alloc()でおこなわれます.

    /* create the internal ring if not already done */
    if ((mp->flags & MEMPOOL_F_POOL_CREATED) == 0) {
        ret = rte_mempool_ops_alloc(mp);
        if (ret != 0)
            return ret;
        mp->flags |= MEMPOOL_F_POOL_CREATED;
    }

rte_mempool_ops_alloc()は設定によって何が呼ばれるか変わりますが,例えばmulti consumer / multi producer の場合はdrivers/mempool/ring/rte_mempool_ring.ccommon_ring_alloc()が呼ばれます.

static int
common_ring_alloc(struct rte_mempool *mp)
{
    int rg_flags = 0, ret;
    char rg_name[RTE_RING_NAMESIZE];
    struct rte_ring *r;

    ret = snprintf(rg_name, sizeof(rg_name),
        RTE_MEMPOOL_MZ_FORMAT, mp->name);
    if (ret < 0 || ret >= (int)sizeof(rg_name)) {
        rte_errno = ENAMETOOLONG;
        return -rte_errno;
    }

    /* ring flags */
    if (mp->flags & MEMPOOL_F_SP_PUT)
        rg_flags |= RING_F_SP_ENQ;
    if (mp->flags & MEMPOOL_F_SC_GET)
        rg_flags |= RING_F_SC_DEQ;

    /*
     * Allocate the ring that will be used to store objects.
     * Ring functions will return appropriate errors if we are
     * running as a secondary process etc., so no checks made
     * in this function for that condition.
     */
    r = rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
        mp->socket_id, rg_flags);
    if (r == NULL)
        return -rte_errno;

    mp->pool_data = r;

    return 0;
}

mbufの構造

mbufの構造はざっくりと以下のようになります.

f:id:mm_i:20170529110606p:plain

mbufはmempoolを利用しており,一つのmbufが一つのmempoolのオブジェクトになります. mbufの先頭にrte_mbuf構造体が存在し,その後ろに実際のデータ領域が続いています. 後からデータを追加できるように,data_off分の領域が先頭から空いています. rte_pktmbuf_prepend(), rte_pktmbuf_append()を使ってdataの先頭/末尾にデータを追加できます. また,もし受信パケットが分割されている場合はmbuf->nextが次のmbufへを指します. mfbu->nb_segsがパケット全体がいくつのmbufから成るか示す値です.

rte_mbufは以下のようになっています.

struct rte_mbuf {
    MARKER cacheline0;

    void *buf_addr;           /**< Virtual address of segment buffer. */
    /**
     * Physical address of segment buffer.
     * Force alignment to 8-bytes, so as to ensure we have the exact
     * same mbuf cacheline0 layout for 32-bit and 64-bit. This makes
     * working on vector drivers easier.
     */
    phys_addr_t buf_physaddr __rte_aligned(sizeof(phys_addr_t));

    /* next 8 bytes are initialised on RX descriptor rearm */
    MARKER64 rearm_data;
    uint16_t data_off;

    /**
     * Reference counter. Its size should at least equal to the size
     * of port field (16 bits), to support zero-copy broadcast.
     * It should only be accessed using the following functions:
     * rte_mbuf_refcnt_update(), rte_mbuf_refcnt_read(), and
     * rte_mbuf_refcnt_set(). The functionality of these functions (atomic,
     * or non-atomic) is controlled by the CONFIG_RTE_MBUF_REFCNT_ATOMIC
     * config option.
     */
    RTE_STD_C11
    union {
        rte_atomic16_t refcnt_atomic; /**< Atomically accessed refcnt */
        uint16_t refcnt;              /**< Non-atomically accessed refcnt */
    };
    uint16_t nb_segs;         /**< Number of segments. */

    /** Input port (16 bits to support more than 256 virtual ports). */
    uint16_t port;

    uint64_t ol_flags;        /**< Offload features. */

    /* remaining bytes are set on RX when pulling packet from descriptor */
    MARKER rx_descriptor_fields1;

    /*
     * The packet type, which is the combination of outer/inner L2, L3, L4
     * and tunnel types. The packet_type is about data really present in the
     * mbuf. Example: if vlan stripping is enabled, a received vlan packet
     * would have RTE_PTYPE_L2_ETHER and not RTE_PTYPE_L2_VLAN because the
     * vlan is stripped from the data.
     */
    RTE_STD_C11
    union {
        uint32_t packet_type; /**< L2/L3/L4 and tunnel information. */
        struct {
            uint32_t l2_type:4; /**< (Outer) L2 type. */
            uint32_t l3_type:4; /**< (Outer) L3 type. */
            uint32_t l4_type:4; /**< (Outer) L4 type. */
            uint32_t tun_type:4; /**< Tunnel type. */
            uint32_t inner_l2_type:4; /**< Inner L2 type. */
            uint32_t inner_l3_type:4; /**< Inner L3 type. */
            uint32_t inner_l4_type:4; /**< Inner L4 type. */
        };
    };

    uint32_t pkt_len;         /**< Total pkt len: sum of all segments. */
    uint16_t data_len;        /**< Amount of data in segment buffer. */
    /** VLAN TCI (CPU order), valid if PKT_RX_VLAN_STRIPPED is set. */
    uint16_t vlan_tci;

    union {
        uint32_t rss;     /**< RSS hash result if RSS enabled */
        struct {
            RTE_STD_C11
            union {
                struct {
                    uint16_t hash;
                    uint16_t id;
                };
                uint32_t lo;
                /**< Second 4 flexible bytes */
            };
            uint32_t hi;
            /**< First 4 flexible bytes or FD ID, dependent on
                 PKT_RX_FDIR_* flag in ol_flags. */
        } fdir;           /**< Filter identifier if FDIR enabled */
        struct {
            uint32_t lo;
            uint32_t hi;
        } sched;          /**< Hierarchical scheduler */
        uint32_t usr;     /**< User defined tags. See rte_distributor_process() */
    } hash;                   /**< hash information */

    /** Outer VLAN TCI (CPU order), valid if PKT_RX_QINQ_STRIPPED is set. */
    uint16_t vlan_tci_outer;

    uint16_t buf_len;         /**< Length of segment buffer. */

    /** Valid if PKT_RX_TIMESTAMP is set. The unit and time reference
     * are not normalized but are always the same for a given port.
     */
    uint64_t timestamp;

    /* second cache line - fields only used in slow path or on TX */
    MARKER cacheline1 __rte_cache_min_aligned;

    RTE_STD_C11
    union {
        void *userdata;   /**< Can be used for external metadata */
        uint64_t udata64; /**< Allow 8-byte userdata on 32-bit */
    };

    struct rte_mempool *pool; /**< Pool from which mbuf was allocated. */
    struct rte_mbuf *next;    /**< Next segment of scattered packet. */

    /* fields to support TX offloads */
    RTE_STD_C11
    union {
        uint64_t tx_offload;       /**< combined for easy fetch */
        __extension__
        struct {
            uint64_t l2_len:7;
            /**< L2 (MAC) Header Length for non-tunneling pkt.
             * Outer_L4_len + ... + Inner_L2_len for tunneling pkt.
             */
            uint64_t l3_len:9; /**< L3 (IP) Header Length. */
            uint64_t l4_len:8; /**< L4 (TCP/UDP) Header Length. */
            uint64_t tso_segsz:16; /**< TCP TSO segment size */

            /* fields for TX offloading of tunnels */
            uint64_t outer_l3_len:9; /**< Outer L3 (IP) Hdr Length. */
            uint64_t outer_l2_len:7; /**< Outer L2 (MAC) Hdr Length. */

            /* uint64_t unused:8; */
        };
    };

    /** Size of the application private data. In case of an indirect
     * mbuf, it stores the direct mbuf private data size. */
    uint16_t priv_size;

    /** Timesync flags for use with IEEE1588. */
    uint16_t timesync;

    /** Sequence number. See also rte_reorder_insert(). */
    uint32_t seqn;

} __rte_cache_aligned;

refcntがあるのはmbufのデータが他のmbufから参照される場合があるからです(下図).

f:id:mm_i:20170529110553p:plain

rte_pktmbuf_attach()で他のmbufのデータを参照するようにできます.また,ヘルパー関数としてrte_pktmbuf_clone()で指定したmbufのデータを参照するmbufを作成できます. dpdkでは他のmbufのデータを指すmbufをindiret,自分自身のデータを指すmbufをdirectと呼んでいます. indirectなmbufは自身のデータ領域を使わないので,オブジェクトサイズが小さいmempoolからmbufを確保した方がメモリ効率が上がります.

mbufの作成

mbufの作成は,rte_pktmbuf_pool_create()でおこないます. rte_pktmbuf_pool_create()では,rte_mempool_create_empty()および rte_mempool_set_ops_byname(), rte_mempool_populate_default()を利用して mempoolを作成したのち,最後にrte_mempool_obj_iter()を使って 各mempoolのelemに対してrte_pktmbuf_init()を実行します.

struct rte_mempool *
rte_pktmbuf_pool_create(const char *name, unsigned n,
    unsigned cache_size, uint16_t priv_size, uint16_t data_room_size,
    int socket_id)
{
    struct rte_mempool *mp;
    struct rte_pktmbuf_pool_private mbp_priv;
    unsigned elt_size;
    int ret;

    if (RTE_ALIGN(priv_size, RTE_MBUF_PRIV_ALIGN) != priv_size) {
        RTE_LOG(ERR, MBUF, "mbuf priv_size=%u is not aligned\n",
            priv_size);
        rte_errno = EINVAL;
        return NULL;
    }
    elt_size = sizeof(struct rte_mbuf) + (unsigned)priv_size +
        (unsigned)data_room_size;
    mbp_priv.mbuf_data_room_size = data_room_size;
    mbp_priv.mbuf_priv_size = priv_size;


    mp = rte_mempool_create_empty(name, n, elt_size, cache_size,
         sizeof(struct rte_pktmbuf_pool_private), socket_id, 0);
    if (mp == NULL)
        return NULL;

    ret = rte_mempool_set_ops_byname(mp,
        RTE_MBUF_DEFAULT_MEMPOOL_OPS, NULL);
    if (ret != 0) {
        RTE_LOG(ERR, MBUF, "error setting mempool handler\n");
        rte_mempool_free(mp);
        rte_errno = -ret;
        return NULL;
    }
    rte_pktmbuf_pool_init(mp, &mbp_priv);

    ret = rte_mempool_populate_default(mp);
    if (ret < 0) {
        rte_mempool_free(mp);
        rte_errno = -ret;
        return NULL;
    }

    rte_mempool_obj_iter(mp, rte_pktmbuf_init, NULL);

    return mp;
}

rte_pktmbuf_init()ではmempoolの一つのオブジェクトの先頭にrte_mbufの構造体 領域を確保し,それを初期化しています.

void
rte_pktmbuf_init(struct rte_mempool *mp,
         __attribute__((unused)) void *opaque_arg,
         void *_m,
         __attribute__((unused)) unsigned i)
{
    struct rte_mbuf *m = _m;
    uint32_t mbuf_size, buf_len, priv_size;

    priv_size = rte_pktmbuf_priv_size(mp);
    mbuf_size = sizeof(struct rte_mbuf) + priv_size;
    buf_len = rte_pktmbuf_data_room_size(mp);

    RTE_ASSERT(RTE_ALIGN(priv_size, RTE_MBUF_PRIV_ALIGN) == priv_size);
    RTE_ASSERT(mp->elt_size >= mbuf_size);
    RTE_ASSERT(buf_len <= UINT16_MAX);

    memset(m, 0, mp->elt_size);

    /* start of buffer is after mbuf structure and priv data */
    m->priv_size = priv_size;
    m->buf_addr = (char *)m + mbuf_size;
    m->buf_physaddr = rte_mempool_virt2phy(mp, m) + mbuf_size;
    m->buf_len = (uint16_t)buf_len;

    /* keep some headroom between start of buffer and data */
    m->data_off = RTE_MIN(RTE_PKTMBUF_HEADROOM, (uint16_t)m->buf_len);

    /* init some constant fields */
    m->pool = mp;
    m->nb_segs = 1;
    m->port = 0xff;
    rte_mbuf_refcnt_set(m, 1);
    m->next = NULL;
}

mbufの利用: パケットの受信

実際にパケットの受信時にどのようにmbufが利用されるか見ていきたいと思います.

rx/txバッファの設定

dpdkのexampleにあるl2fwdのプログラムでは以下のように初期化をおこなっています.

// mbufの作成
pktmbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", NB_MBUF,
        MEMPOOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
        rte_socket_id());
...

// ポートごとに初期化
for (portid = 0; portid < nb_ports; portid++) {
    ...
    ret = rte_eth_dev_configure(portid, 1, 1, &port_conf);
    ...
    ret = rte_eth_rx_queue_setup(portid, 0, nb_rxd,
                     rte_eth_dev_socket_id(portid),
                     NULL,
                     l2fwd_pktmbuf_pool);
    ...
    ret = rte_eth_tx_queue_setup(portid, 0, nb_txd,
            rte_eth_dev_socket_id(portid),
}

まずrte_pktmbuf_pool_create()でプログラム全体が利用するmbufを確保します. ちなみに,NB_MBUFは8192, RTE_MBUF_DEFAULT_BUF_SIZEは2KB + RTE_PKTMBUF_HEADROOM(自分の環境では128B)です.

その後各NICのポートの設定をおこないますが,特に重要なのがrte_eth_rx_queue_setup()rte_eth_tx_queue_setup()です. ixgbeを使用する場合,rte_eth_rx_queue_setup() から最終的には drivers/net/ixgbe/ixgbe_rxtx.cixgbe_dev_rx_queue_setup()が呼ばれます.

ixgbe_dev_rx_queue_setup()ではrx queue用のデータ構造(ixgbe_rx_queue)やring descriptor用のメモリを確保したりします.

int __attribute__((cold))
ixgbe_dev_rx_queue_setup(struct rte_eth_dev *dev,
             uint16_t queue_idx,
             uint16_t nb_desc,
             unsigned int socket_id,
             const struct rte_eth_rxconf *rx_conf,
             struct rte_mempool *mp)
{
    ...
    /* First allocate the rx queue data structure */
    rxq = rte_zmalloc_socket("ethdev RX queue", sizeof(struct ixgbe_rx_queue),
                 RTE_CACHE_LINE_SIZE, socket_id);
    if (rxq == NULL)
        return -ENOMEM;
    rxq->mb_pool = mp;
    rxq->nb_rx_desc = nb_desc;
    rxq->rx_free_thresh = rx_conf->rx_free_thresh;
    rxq->queue_id = queue_idx;
    rxq->reg_idx = (uint16_t)((RTE_ETH_DEV_SRIOV(dev).active == 0) ?
        queue_idx : RTE_ETH_DEV_SRIOV(dev).def_pool_q_idx + queue_idx);
    rxq->port_id = dev->data->port_id;
    rxq->crc_len = (uint8_t) ((dev->data->dev_conf.rxmode.hw_strip_crc) ?
                            0 : ETHER_CRC_LEN);
    rxq->drop_en = rx_conf->rx_drop_en;
    rxq->rx_deferred_start = rx_conf->rx_deferred_start;

    ...

    /*
     * Allocate RX ring hardware descriptors. A memzone large enough to
     * handle the maximum ring size is allocated in order to allow for
     * resizing in later calls to the queue setup function.
     */
    rz = rte_eth_dma_zone_reserve(dev, "rx_ring", queue_idx,
                      RX_RING_SZ, IXGBE_ALIGN, socket_id);
    ...

    rxq->rx_ring_phys_addr = rte_mem_phy2mch(rz->memseg_id, rz->phys_addr);
    rxq->rx_ring = (union ixgbe_adv_rx_desc *) rz->addr;

    ...

    /*
     * Allocate software ring. Allow for space at the end of the
     * S/W ring to make sure look-ahead logic in bulk alloc Rx burst
     * function does not access an invalid memory region.
     */
    len = nb_desc;
    if (adapter->rx_bulk_alloc_allowed)
        len += RTE_PMD_IXGBE_RX_MAX_BURST;

    rxq->sw_ring = rte_zmalloc_socket("rxq->sw_ring",
                      sizeof(struct ixgbe_rx_entry) * len,
                      RTE_CACHE_LINE_SIZE, socket_id);
    ...

    dev->data->rx_queues[queue_idx] = rxq;

    ixgbe_reset_rx_queue(adapter, rxq);

    return 0;
}

rx ring

ixgbeの場合のrx ringのデータ構造は以下のようになっています.

f:id:mm_i:20170529110825p:plain

struct ixgbe_rx_queue {
    struct rte_mempool  *mb_pool; /**< mbuf pool to populate RX ring. */
    volatile union ixgbe_adv_rx_desc *rx_ring; /**< RX ring virtual address. */
    uint64_t            rx_ring_phys_addr; /**< RX ring DMA address. */
    volatile uint32_t   *rdt_reg_addr; /**< RDT register address. */
    volatile uint32_t   *rdh_reg_addr; /**< RDH register address. */
    struct ixgbe_rx_entry *sw_ring; /**< address of RX software ring. */
    struct ixgbe_scattered_rx_entry *sw_sc_ring; /**< address of scattered Rx software ring. */
    struct rte_mbuf *pkt_first_seg; /**< First segment of current packet. */
    struct rte_mbuf *pkt_last_seg; /**< Last segment of current packet. */
    uint64_t            mbuf_initializer; /**< value to init mbufs */
    uint16_t            nb_rx_desc; /**< number of RX descriptors. */
    uint16_t            rx_tail;  /**< current value of RDT register. */
    uint16_t            nb_rx_hold; /**< number of held free RX desc. */
    uint16_t rx_nb_avail; /**< nr of staged pkts ready to ret to app */
    uint16_t rx_next_avail; /**< idx of next staged pkt to ret to app */
    uint16_t rx_free_trigger; /**< triggers rx buffer allocation */
    uint16_t            rx_using_sse;
    /**< indicates that vector RX is in use */
#ifdef RTE_IXGBE_INC_VECTOR
    uint16_t            rxrearm_nb;     /**< number of remaining to be re-armed */
    uint16_t            rxrearm_start;  /**< the idx we start the re-arming from */
#endif
    uint16_t            rx_free_thresh; /**< max free RX desc to hold. */
    uint16_t            queue_id; /**< RX queue index. */
    uint16_t            reg_idx;  /**< RX queue register index. */
    uint16_t            pkt_type_mask;  /**< Packet type mask for different NICs. */
    uint8_t             port_id;  /**< Device port identifier. */
    uint8_t             crc_len;  /**< 0 if CRC stripped, 4 otherwise. */
    uint8_t             drop_en;  /**< If not 0, set SRRCTL.Drop_En. */
    uint8_t             rx_deferred_start; /**< not in global dev start. */
    /** flags to set in mbuf when a vlan is detected. */
    uint64_t            vlan_flags;
    /** need to alloc dummy mbuf, for wraparound when scanning hw ring */
    struct rte_mbuf fake_mbuf;
    /** hold packets to return to application */
    struct rte_mbuf *rx_stage[RTE_PMD_IXGBE_RX_MAX_BURST*2];
};

rx_ringが実際のixgbeのrx ringを指すポインタです.rx ringのbuffer addressはmbufのデータ部分を指します.sw_ringがrx ringで利用されるmbufを保持しています. rx_ringpkt_addrが対応するsw_ringのmbufのデータ部分を指します. 受信時はrx_tailで示されるインデックスからdescriptorを確認していきます(NICのtailとは異なります).

x540のreceive descriptor

ここで簡単にX540のreceive descriptorについて説明しておきます. 詳しくはデータシートの7.1に書いてあります.

受信する際に利用するring descriptor queueは以下のような構造をしています(データシートより引用).

f:id:mm_i:20170529110750p:plain

キューの各エントリがdescriptorと呼ばれます. ソフトウェア側はdescriptorを設定したのち,tailを進めます.これは実際にはX540のRDTレジスタの値を更新することでおこないます. またNICは内部的に空きdescriptorの先頭位置をheadとして覚えており,パケットを受信したらheadのdescriptorに従ってパケットをメモリにDMAしたあと,headを進めます.

f:id:mm_i:20170529110801p:plainf:id:mm_i:20170529110808p:plain

descriptorのformatは以下のようになっています(descriptorのフォーマットにもいくつかありますが,ここではそのうちのadvance descritorを示しています).

descriptorのpacket buffer addressにパケットをDMAする先のアドレスを指定します. このアドレスはdpdkではmbufのデータのアドレスになります. X540では受信パケットのヘッダとデータを別々の領域へDMAする(header splitting)ことが可能であるためheader buffer addressを指定する場所もありますが,DPDKでは利用していません. ソフトウェアはDDビットが1になっているかどうかを調べることで,パケットが到着しているかどうかを判断することができます. またdescriptor write-back formatのフィールドにはNICが受信したパケットの情報を書き込みます.

パケットの受信

パケットの受信はrte_eth_rx_burst()でおこないます.

nb_rx = rte_eth_rx_burst((uint8_t) portid, 0,
             pkts_burst, MAX_PKT_BURST);

rte_eth_rx_burst()は最大MAX_PKT_BURST個のパケットを受信し,結果をpkts_burstに格納します. NICのオフロード機能を使用するかなどの設定によって,rte_eth_rx_burst()を呼んだときに呼ばれる関数は異なります.ixgbeではixgbe_dev_rx_init()の中で呼ばれるixgbe_set_rx_function()でrecv時の関数を設定しています. ここではrecv時にixgbe_recv_pkts_bulk_alloc()が設定されていると仮定して,その具体的な処理をみてみます.

ixgbe_recv_pkts_bulk_alloc()では,RTE_PMD_IXGBE_RX_MAX_BURST (実際は32)単位でのパケット読み出しをおこない,実際に受信したパケットを返します.

uint16_t
ixgbe_recv_pkts_bulk_alloc(void *rx_queue, struct rte_mbuf **rx_pkts,
               uint16_t nb_pkts)
{
    uint16_t nb_rx;

    if (unlikely(nb_pkts == 0))
        return 0;

    if (likely(nb_pkts <= RTE_PMD_IXGBE_RX_MAX_BURST))
        return rx_recv_pkts(rx_queue, rx_pkts, nb_pkts);

    /* request is relatively large, chunk it up */
    nb_rx = 0;
    while (nb_pkts) {
        uint16_t ret, n;

        n = (uint16_t)RTE_MIN(nb_pkts, RTE_PMD_IXGBE_RX_MAX_BURST);
        ret = rx_recv_pkts(rx_queue, &rx_pkts[nb_rx], n);
        nb_rx = (uint16_t)(nb_rx + ret);
        nb_pkts = (uint16_t)(nb_pkts - ret);
        if (ret < n)
            break;
    }

    return nb_rx;
}

実際の受信処理をおこなっているのはrx_recv_pkts()になります.

static inline uint16_t
rx_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
         uint16_t nb_pkts)
{
    struct ixgbe_rx_queue *rxq = (struct ixgbe_rx_queue *)rx_queue;
    uint16_t nb_rx = 0;

    /* Any previously recv'd pkts will be returned from the Rx stage */
    if (rxq->rx_nb_avail)
        return ixgbe_rx_fill_from_stage(rxq, rx_pkts, nb_pkts);

前回受信したパケットがまだ残っていたらそれを処理します (これはこの関数がnb_pkts単位で受信したパケットを返すからです).

    /* Scan the H/W ring for packets to receive */
    nb_rx = (uint16_t)ixgbe_rx_scan_hw_ring(rxq);

ixgbe_rx_scan_hw_ring()でパケットが到着しているかチェックします. nb_rxに受信したパケット数が入ります.

    /* update internal queue state */
    rxq->rx_next_avail = 0;
    rxq->rx_nb_avail = nb_rx;
    rxq->rx_tail = (uint16_t)(rxq->rx_tail + nb_rx);

rx_tailnb_rxだけ進めます.

    /* if required, allocate new buffers to replenish descriptors */
    if (rxq->rx_tail > rxq->rx_free_trigger) {
        uint16_t cur_free_trigger = rxq->rx_free_trigger;

        if (ixgbe_rx_alloc_bufs(rxq, true) != 0) {
            int i, j;

            PMD_RX_LOG(DEBUG, "RX mbuf alloc failed port_id=%u "
                   "queue_id=%u", (unsigned) rxq->port_id,
                   (unsigned) rxq->queue_id);

            rte_eth_devices[rxq->port_id].data->rx_mbuf_alloc_failed +=
                rxq->rx_free_thresh;

            /*
             * Need to rewind any previous receives if we cannot
             * allocate new buffers to replenish the old ones.
             */
            rxq->rx_nb_avail = 0;
            rxq->rx_tail = (uint16_t)(rxq->rx_tail - nb_rx);
            for (i = 0, j = rxq->rx_tail; i < nb_rx; ++i, ++j)
                rxq->sw_ring[j].mbuf = rxq->rx_stage[i];

            return 0;
        }

        /* update tail pointer */
        rte_wmb();
        IXGBE_PCI_REG_WRITE_RELAXED(rxq->rdt_reg_addr,
                        cur_free_trigger);
    }

rx_tailの値が設定した閾値以上になったら,ixgbe_rx_alloc_bufs()を使って新しいmbufを取得し,それをデスクリプタに設定します.またNICのtail pointerを更新します.

    if (rxq->rx_tail >= rxq->nb_rx_desc)
        rxq->rx_tail = 0;

この関数は一気にパケットを指定した数(8個)読もうとしますが,その際ring bufferの先頭に巻き戻って読むことはしません.もし末尾に到達したらここで先頭にインデックスを戻しています.(したがって,ring descriptorの最大数はこのバースト読み出し分を考慮して余分に確保しておく必要があります)

    /* received any packets this loop? */
    if (rxq->rx_nb_avail)
        return ixgbe_rx_fill_from_stage(rxq, rx_pkts, nb_pkts);

    return 0;
}

受信したパケットがあればixgbe_rx_fill_from_stage()でそのパケットの後処理をして返します.

デスクリプタをチェックしてパケットの受信を判断するixgbe_rx_scan_hw_ring()は以下のようになっています.

static inline int
ixgbe_rx_scan_hw_ring(struct ixgbe_rx_queue *rxq)
{
    volatile union ixgbe_adv_rx_desc *rxdp;
    struct ixgbe_rx_entry *rxep;
    struct rte_mbuf *mb;
    uint16_t pkt_len;
    uint64_t pkt_flags;
    int nb_dd;
    uint32_t s[LOOK_AHEAD];
    uint32_t pkt_info[LOOK_AHEAD];
    int i, j, nb_rx = 0;
    uint32_t status;
    uint64_t vlan_flags = rxq->vlan_flags;

    /* get references to current descriptor and S/W ring entry */
    rxdp = &rxq->rx_ring[rxq->rx_tail];
    rxep = &rxq->sw_ring[rxq->rx_tail];

    status = rxdp->wb.upper.status_error;
    /* check to make sure there is at least 1 packet to receive */
    if (!(status & rte_cpu_to_le_32(IXGBE_RXDADV_STAT_DD)))
        return 0;

    /*
     * Scan LOOK_AHEAD descriptors at a time to determine which descriptors
     * reference packets that are ready to be received.
     */
    for (i = 0; i < RTE_PMD_IXGBE_RX_MAX_BURST;
         i += LOOK_AHEAD, rxdp += LOOK_AHEAD, rxep += LOOK_AHEAD) {
        /* Read desc statuses backwards to avoid race condition */
        for (j = 0; j < LOOK_AHEAD; j++)
            s[j] = rte_le_to_cpu_32(rxdp[j].wb.upper.status_error);

        rte_smp_rmb();

        /* Compute how many status bits were set */
        for (nb_dd = 0; nb_dd < LOOK_AHEAD &&
                (s[nb_dd] & IXGBE_RXDADV_STAT_DD); nb_dd++)
            ;

ここでDDビットを見てパケットを受信したか見ています.

        for (j = 0; j < nb_dd; j++)
            pkt_info[j] = rte_le_to_cpu_32(rxdp[j].wb.lower.
                               lo_dword.data);

        nb_rx += nb_dd;

        /* Translate descriptor info to mbuf format */
        for (j = 0; j < nb_dd; ++j) {
            mb = rxep[j].mbuf;
            pkt_len = rte_le_to_cpu_16(rxdp[j].wb.upper.length) -
                  rxq->crc_len;
            mb->data_len = pkt_len;
            mb->pkt_len = pkt_len;
            mb->vlan_tci = rte_le_to_cpu_16(rxdp[j].wb.upper.vlan);

            /* convert descriptor fields to rte mbuf flags */
            pkt_flags = rx_desc_status_to_pkt_flags(s[j],
                vlan_flags);
            pkt_flags |= rx_desc_error_to_pkt_flags(s[j]);
            pkt_flags |= ixgbe_rxd_pkt_info_to_pkt_flags
                    ((uint16_t)pkt_info[j]);
            mb->ol_flags = pkt_flags;
            mb->packet_type =
                ixgbe_rxd_pkt_info_to_pkt_type
                    (pkt_info[j], rxq->pkt_type_mask);

            if (likely(pkt_flags & PKT_RX_RSS_HASH))
                mb->hash.rss = rte_le_to_cpu_32(
                    rxdp[j].wb.lower.hi_dword.rss);
            else if (pkt_flags & PKT_RX_FDIR) {
                mb->hash.fdir.hash = rte_le_to_cpu_16(
                    rxdp[j].wb.lower.hi_dword.csum_ip.csum) &
                    IXGBE_ATR_HASH_MASK;
                mb->hash.fdir.id = rte_le_to_cpu_16(
                    rxdp[j].wb.lower.hi_dword.csum_ip.ip_id);
            }
        }

この部分で受信したパケットに関して,mbufのメタデータ部分ににパケットの情報を格納しています.

        /* Move mbuf pointers from the S/W ring to the stage */
        for (j = 0; j < LOOK_AHEAD; ++j) {
            rxq->rx_stage[i + j] = rxep[j].mbuf;
        }

        /* stop if all requested packets could not be received */
        if (nb_dd != LOOK_AHEAD)
            break;
    }

    /* clear software ring entries so we can cleanup correctly */
    for (i = 0; i < nb_rx; ++i) {
        rxq->sw_ring[rxq->rx_tail + i].mbuf = NULL;
    }


    return nb_rx;
}

最後に,受信したパケットのmbufのアドレスをrx_stageに入れ,sw_ringの方はNULLにします.

ixgbe_rx_fill_from_stage()では最大nb_pkts分の受信データをrx_pktsに格納して返します.

static inline uint16_t
ixgbe_rx_fill_from_stage(struct ixgbe_rx_queue *rxq, struct rte_mbuf **rx_pkts,
             uint16_t nb_pkts)
{
    struct rte_mbuf **stage = &rxq->rx_stage[rxq->rx_next_avail];
    int i;

    /* how many packets are ready to return? */
    nb_pkts = (uint16_t)RTE_MIN(nb_pkts, rxq->rx_nb_avail);

    /* copy mbuf pointers to the application's packet list */
    for (i = 0; i < nb_pkts; ++i)
        rx_pkts[i] = stage[i];

    /* update internal queue state */
    rxq->rx_nb_avail = (uint16_t)(rxq->rx_nb_avail - nb_pkts);
    rxq->rx_next_avail = (uint16_t)(rxq->rx_next_avail + nb_pkts);

    return nb_pkts;
}

また,ixgbe_rx_alloc_bufs()では以下のようにしてmbufを取得しています.

static inline int
ixgbe_rx_alloc_bufs(struct ixgbe_rx_queue *rxq, bool reset_mbuf)
{
    volatile union ixgbe_adv_rx_desc *rxdp;
    struct ixgbe_rx_entry *rxep;
    struct rte_mbuf *mb;
    uint16_t alloc_idx;
    __le64 dma_addr;
    int diag, i;

    /* allocate buffers in bulk directly into the S/W ring */
    alloc_idx = rxq->rx_free_trigger - (rxq->rx_free_thresh - 1);
    rxep = &rxq->sw_ring[alloc_idx];
    diag = rte_mempool_get_bulk(rxq->mb_pool, (void *)rxep,
                    rxq->rx_free_thresh);

rte_mempool_get_bulk()を使ってmempoolから空いているmbufを取得します.

    if (unlikely(diag != 0))
        return -ENOMEM;

    rxdp = &rxq->rx_ring[alloc_idx];
    for (i = 0; i < rxq->rx_free_thresh; ++i) {
        /* populate the static rte mbuf fields */
        mb = rxep[i].mbuf;
        if (reset_mbuf) {
            mb->port = rxq->port_id;
        }

        rte_mbuf_refcnt_set(mb, 1);
        mb->data_off = RTE_PKTMBUF_HEADROOM;

        /* populate the descriptors */
        dma_addr = rte_cpu_to_le_64(rte_mbuf_data_dma_addr_default(mb));
        rxdp[i].read.hdr_addr = 0;
        rxdp[i].read.pkt_addr = dma_addr;
    }

各デスクリプタのpkt_addrにmbufのデータ領域のアドレスを書き込みます.

    /* update state of internal queue structure */
    rxq->rx_free_trigger = rxq->rx_free_trigger + rxq->rx_free_thresh;
    if (rxq->rx_free_trigger >= rxq->nb_rx_desc)
        rxq->rx_free_trigger = rxq->rx_free_thresh - 1;

    /* no errors */
    return 0;
}

最後に次にmbufを確保する際の閾値となるrx_free_triggerを更新しています. ちなみに,rx_free_threshはデフォルトで32のようです.

mbufの利用: マルチキャスト

受信以外のmbuf利用の例として,サンプルプログラムに含まれているマルチキャストのプログラムを見てみます.

http://dpdk.org/doc/guides/sample_app_ug/ipv4_multicast.html は受信したパケットを複数のポートからマルチキャストするサンプルプログラムです. マルチキャストの流れは以下のようになります.

f:id:mm_i:20170529110435p:plain

  1. 受信パケットからヘッダを取り除く (rte_pktmbuf_adj()を使ってdata_offを長くする)
  2. rte_pktmbuf_clone()を使って受信パケットのデータ部分を指すmbufを作成
  3. rte_pktmbuf_create()を使ってヘッダ部分のパケットを作成,mbuf->nextはcloneしたmbufを指すようにする 4. 作成したパケットを指定したポートから送信

cloneを使って複数のポートから送信する場合,各パケットは自身のメタデータを持つことになります.したがって最後のポートに関してはcloneをする必要がなく,受信パケットのメタデータを直接変更して使用することができます. もちろん,cloneをせず直接受信パケットをmbuf->nextで指す方法もありますが,この場合は受信パケットのメタデータを変更できないため,最後のポートに関しても他の場合と同様にして送信をおこないます. コードにはcloneする場合としない場合,2通りのコードが書いてあります.