Rustのasync/awaitを使ったecho serverの実装

rust 1.39でasync/await構文が安定化されます. 巷で話題の機能ですが,asycn/await何も分からん...ということで話題に乗り遅れないためにasync/awaitを利用してecho serverを実装してみます.

OSはLinuxが対象です.ソースはここにあります.

echo server

async/awaitを使用する前に,まずは通常のecho serverを作成してみます(ソース). あえてstd::netを使わないで実装しています.

主処理は以下のようになっています.

fn handle_client(stream: TcpStream) -> io::Result<()> {
    let mut buf = [0u8; 1024];
    loop {
        let n = syscall!(read(
            stream.0,
            buf.as_mut_ptr() as *mut libc::c_void,
            buf.len()
        ))?;
        if n == 0 {
            break;
        }
        // NOTE: write() possibly writes less than n bytes. So we should check the return value to
        // check how many bytes written but omit it here for simplicity.
        syscall!(write(
            stream.0,
            buf.as_ptr() as *const libc::c_void,
            n as usize
        ))?;
    }
    Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = Ipv4Addr::new(127, 0, 0, 1);
    let port = 8080;

    let listner = TcpListener::bind(addr, port)?;
    for stream in listner.incoming() {
        let stream = stream?;
        handle_client(stream)?;
    }

    Ok(())
}

さて,一見良さそうなこのecho serverですが,致命的な問題があります.…そうです,一度に一コネクションしか対応できません.

問題の直接的な原因はクライアント処理をwhileループ内で実施していることですが,それとは別にI/O処理(listen()/accept()/read()/write())がブロックするというのもあります.

ブロックするというのは,処理が完了するまでシステムコールがリターンしない(結果としてスレッドが停止する)という意味です. 例えば,クライアントのパケットを読もうとしてread()をしても,その時点ではクライアントからのパケットが到着していないということは十分ありえます.ブロックする場合はパケット到着までそのスレッドはスリープすることになります. この間もし別のクライアントからの接続要求が先に来たとしても,そのスレッドはそれを優先的に処理することはできません.

multi-thread版 echo server

上記問題を解決する方法はいくつかありますが,単純にはクライアントごとにスレッドを立ち上げれば良いです(ソース).

    for stream in listner.incoming() {
        let stream = stream?;
        std::thread::spawn(move || {
            handle_client(stream).unwrap();
        });
    }

多くの場合,これで上手くいくと思います. そして,これで問題が無いのであればあえて他の手法を利用する必要は特にないと思います. よくスレッドの生成コスト云々言われますが,本当にそこがボトルネックになっているでしょうか? 適当にスレッドプールを用意することもできます.

...が,そうするとここで話が終わってしまうので以下で別の方法を説明します.

ノンブロッキングI/O (O_NONBLOCK)

Linuxでは,ソケットのfile descriptorにO_NONBLOCK属性を付与すると,そのソケットに対するアクセスがnon-blockingになります.

具体的には,ソケットに対してread()write()した際に,I/O処理がすぐに完了できない場合(クライアントからパケットが到着していない場合や,書き込みバッファが一杯の場合など)は,OSが処理可能になるまで待つ代わりに,errnoにEWOULDBLOCKEAGAINが設定されすぐにリターンします. そのためスレッドは別の処理を実行することができます.

echo serverの例で言うと,これを使って以下のように1スレッドで複数コネクションが扱えます.

(擬似コード)

loop {

    // handle client 1
    match read(fd1, ...) {
        Ok(n) => {...}
        Err(WOULDBLOCK) => ,
        ...
    }

    // handle client 2
    match read(fd2, ...) {
        Ok(n) => {...}
        Err(WOULDBLOCK) => ,
        ...
    }

    ....
}

ただし,この方法はユーザ空間でポーリングしていることになり,CPU利用的には非効率です. そこで次で説明するepollを利用します.

(ちなみに,ここで説明した意味でのノンブロッキング版 echo serverは実装していないのでソースはないです.)

epoll版 echo server

epoll(7)を利用すると,複数のfile descriptorを監視し,いずれかのfile descriptorが準備完了になった時点で通知を受け取ることができます. epollの使い方はざっと以下のようになります.

  • epoll_create()でepollを作成
  • epoll_ctl(EPOLL_CTL_ADD)を使ってイベントを登録
    • このとき,EPOLLINで入力待ち,EPOLLOUTが出力待ちの設定
  • epoll_wait()で登録したイベントのいずれかが準備完了になるまで待つ
  • epollから通知を受けたらそのfile descriptorに対して処理する

epoll版のメイン処理は以下のようになります(ソース).

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = Ipv4Addr::new(127, 0, 0, 1);
    let port = 8080;

    let mut epoll = Epoll::new(32)?;
    let listner = TcpListener::bind(addr, port)?;
    let mut clients: HashMap<RawFd, ClientState> = HashMap::new();

    epoll.add_event(listner.0, EpollEventType::In)?;

    loop {
        let nfd = epoll.wait()?;

        for i in 0..nfd {
            let fd = epoll.events[i].u64 as RawFd;
            if fd == listner.0 {
                /* Accept a new connection */
                let client = listner.accept()?;
                client.setnonblocking()?;
                epoll.add_event(client.0, EpollEventType::In)?;
                clients.insert(
                    client.0,
                    ClientState {
                        stream: client,
                        buf: [0u8; 1024],
                        buf_cursor: 0,
                    },
                );
            } else if clients.contains_key(&fd) {
                /* Handle a client */
                let events = epoll.events[i].events as i32;
                let client_state = clients.get_mut(&fd).unwrap();
                if (events & libc::EPOLLIN) > 0 {
                    /* read */
                    let n = syscall!(read(
                        fd,
                        client_state.buf.as_mut_ptr() as *mut libc::c_void,
                        client_state.buf.len()
                    ))?;
                    client_state.buf_cursor = n as usize;
                    if n == 0 {
                        epoll.del_event(fd)?;
                        clients.remove(&fd);
                        break;
                    }
                    epoll.mod_event(fd, EpollEventType::Out)?;
                } else if (events & libc::EPOLLOUT) > 0 {
                    /* write */
                    syscall!(write(
                        fd,
                        client_state.buf.as_ptr() as *const libc::c_void,
                        client_state.buf_cursor
                    ))?;
                    epoll.mod_event(fd, EpollEventType::In)?;
                } else {
                    unreachable!();
                }
            } else {
                unreachable!();
            }
        }
    }

    #[allow(unreachable_code)]
    Ok(())
}

epollを利用する場合はメインのイベントループ内で複数のコネクションを扱います. ここではlisten()のソケットと,accept()したソケットの双方をepollで待っていて,どちらかが処理完了になったらそれを処理します. epoll_wait()している間はスレッドはスリープするので,無駄なCPUを利用しません.

ここではepollの適当なラッパーを作成して利用していますが,実際にepollを使う場合はmioが利用できます. mioはLinux以外にもBSDWindowsに対応した,ノンブロッキングネットワーク処理のためのライブラリです.(epollはLinuxの機能で,mioはBSDではkqueue, WindowsではIOCPという機能を利用しています).mioは後述のtokioやasync-stdでも利用されています.

(補足) epollのレベルトリガとエッジトリガ

epollを利用するにあたり,ソケットにO_NONBLCOKを設定する必要は必ずしもありません. O_NONBLOCK属性の有無に関わらずepoll_wait()は登録したdescriptorのいずれかが準備完了になったらそれを通知します.

epoll_wait()にはレベルトリガ(デフォルト)とエッジトリガモードがあります. この違いはイベントの通知がイベント発生時にのみ生じるかどうかです. 例えば,epoll_wait()して2048byteのデータが読み込み可能になったとします. このときread()で1024byteだけ読みだした場合,レベルトリガでは再びepoll_wait()するとまた読み出し可能であることを通知しますが,エッジトリガモードだと通知しません.

エッジトリガモードは通常O_NONBLOCKなdescriptorと合わせて利用します. エッジトリガモードで読み出し可能であることを検出したら,プログラムはEWOULDBLOCKが返ってくるまでread()をしてデータを読みだします.この方が逐一readの度にepoll_wait()する必要がなく,効率的です. readに関して使い方をまとめると,以下のようになると思います.

  • レベルトリガ => read()のたびに逐一epoll_wait()する.さもないとread()がブロックすることがある.
  • レベルトリガ + O_NONBLOCK => epoll_wait()からリターンしたら,EWOULDBLOCKが返るまでread()する.あるいは,逐一epoll_wait()をすることもできる.
  • エッジトリガ => これは使用しない.さもないとデータを取りこぼす可能性がある.
  • エッジトリガ + O_NONBLOCK => epoll_wait()からリターンしたら,EWOULDBLOCKが返るまでread()する.

また,epollの前身であるselectには,もしselectが準備完了を通知しても,実際には準備完了でないという spurious wakeup *1 が生じる可能性がありました. この場合,O_NONBLOCKが指定されていなければread()はブロックすることになります. epollで同様の問題が生じるのかはmanページは何も記述が無いですが,安全のためにはO_NONBLOCKを指定するのが吉かと思います.

epollは複数スレッドから同じepoll fdに対してepoll_wait()することも可能ですが(主にロードバランシング用途),この場合もエッジトリガモードを利用する必要があります. さもなければ,複数のスレッドが同じfdに対してwake upする可能性があります. また,実際にはエッジトリガモード (EPOLLET)を使うだけでは駄目で,他にもいろいろと嵌りどころがあります(参考). マルチスレッドでepoll fdを共有して扱うのは非常に難しいので,避けられるなら避けた方が良いです.

なお,mioはデフォルトはエッジトリガモードで動作するようになっています().

async/await版 echo server

ようやく本題.

epollを利用したecho serverではイベントループ内で各コネクションを処理していました. これを最初に作成したecho serverのように処理できないか? ということで登場するのがasync/awaitです.

async/await + αを利用すると,以下のようなecho serverが作成できます(ソース).

async fn handle_client(stream: TcpStream) -> io::Result<()> {
    let mut buf = [0u8; 1024];
    info!("(handle client) {}", stream.0);
    loop {
        let n = stream.read(&mut buf).await?;
        if n == 0 {
            break;
        }
        stream.write(&buf[..n]).await?;
    }
    Ok(())
}

fn main() {
    init_log();

    let (executor, spawner) = new_executor_and_spawner();
    let spawner_clone = spawner.clone();

    let mainloop = async move {
        let addr = Ipv4Addr::new(127, 0, 0, 1);
        let port = 8080;
        let listner = TcpListener::bind(addr, port)?;

        let incoming = listner.incoming();

        while let Some(stream) = incoming.next().await {
            let stream = stream?;
            spawner.spawn(handle_client(stream));
        }

        Ok(())
    };

    spawner_clone.spawn(mainloop);
    drop(spawner_clone);
    executor.run();
}

以下いくつかポイントを説明します.なお,基本的なところはasync-bookを読むと分かりやすいです.

std::future::Future

rustの非同期処理の肝になるのがstd::future::Futureトレイト(および,std::task::Pollです. これは以下のように定義されます.

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}

Futureはpoll()されると,処理が実行できるのなら処理を実行してPoll::Ready(T)を返し,そうでなければPoll::Pendingをリターンします.pendingのfutureは後で再びpollします. ここでPin<T>は自身がmoveしない/or moveしても問題ないことを保証するためのもで,またContextは後述のfutureのwake upに利用されます.

accept()に関するfutureは以下のように定義できます.

struct AcceptFuture<'a>(&'a TcpListener);

impl<'a> Future for AcceptFuture<'a> {
    type Output = Option<io::Result<TcpStream>>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.0.accept() {
            Ok(stream) => {
                stream.setnonblocking()?;
                Poll::Ready(Some(Ok(stream)))
            }
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                REACTOR.add_event((self.0).0, EpollEventType::In, cx.waker().clone())?;
                Poll::Pending
            }
            Err(e) => Poll::Ready(Some(Err(e))),
        }
    }

ここで,

  • accept()が成功か,あるいはEWOULDBLOCK以外のエラーであれば,それをそのまま返す
  • そうでなければ(EWOULDBLOCKの場合),reactorにfile descriptorの監視を依頼して,Poll::Pendingを返す.

reactorに関しては後述します.

Executor

futureはpoll()しないと何も処理してくれません.futureを実行するものをexecutorと呼びます.

executorをどう作成するか,というのはいろいろと設計の余地がありますが,今回はasync-bookに書かれた方法と同じ手法を利用しています. executorはキューを持っていて,そのキューに入ってきたfutureを順に実行します.

(本当はfuturesクレートは使わない予定でしたが,流石に辛くなったのでBoxFutureとArcWakeは使いました.)

Reactor

ソケットIOがEWOULDBLOCKになったとき,どこかでepoll_wait()してそのソケットを監視する必要があります. このIOの準備状態を監視するものをreactorと呼びます.

今回はreactor用のスレッドを立ち上げて,そこでepoll_wait()しています.これはasync-stdと同様の方法です. なので,このasync/await版のecho serverは実は2スレッドで動作しています. なお,epollは監視対象のdescriptorが何も無い状態でepoll_wait()を開始することができ,epoll_wait()しているスレッド外からepoll_ctl(EPOLL_CTL_ADD)で監視するdescriptorを追加できます.

epoll_wait()から通知を受けたらどうするか? というとこれはもともとpoll()されたときのContextに入っていたwakerを利用して,executorに対して準備完了となったfutureを追加します.これもasync-bookと同様の手法を利用しています.

全体の動作は簡単に図にすると以下のようになります(ちょっと無理して図にしている点があります).

f:id:mm_i:20190929192745p:plain

executorとreactorの設計はこうしなければならないと決まっている訳ではありません. executorとreactorを1スレッドで動作させることも十分ありえます. futureがSendでなければローカルなexecutorが,一つ一つのfutureの計算処理が割と重いのであれば,パフォーマンスを出すにはマルチスレッドなexecutor/reactorが必要になります.

async/await構文

ここまでasync/await構文に触れていないので,結局何なんだという話ですが,async fn () -> R {...}fn () -> impl Future<Output=R> {...}とだいたい同じになります. 要するに,コンパイラstd::future::Futureを実装したstructを作成してくれるということです. また,.awaitを利用すると,async関数内で別のfutureの実行完了を待つことができます. つまり,poll()したときに,.awaitしているfutureがPoll::Pendingを返したらそこで処理が中断され,再びpoll()したらそこから処理が再開されるようになります. これによって,自分でいちいちFutureを実装する必要が無くなります(本当にコアの部分に関しては実装する必要がありますが). 他にも借用権周りで従来できなかった記述ができるようになっているようです.

もう少し具体例をいうと,例えば今回作成したasync fn handle_client(stream: TcpStream) -> io::Result<()>を呼び出して,let client = handle_client(stream);とすると,下で定義したようなstructが得られます(なお,例で示してるだけで,実際にコンパイラが出力したコードではないです).

struct handle_client_future {
    stream: TcpStream,
    buf: [0u8; 1024],
    ...
}

impl Future for handle_client future {
    type Output = io::Result<()>;
    fn poll(...) -> Poll::<Self::Output> {
        ...
    }
}

fn handle_client(stream: TcpStream) -> handle_client_future {
    return handle_client_future { stream: stream, ... }
}

このことがわかると,let client = handle_client(stream);しただけではfutureが生成されるだけなので,誰かがpollしないとfutureが実行されないということが理解できます. futureの実行はexecutorでおこないます.

epollの挙動

async/await版のecho serverと,その前に作成したepoll版echo serverでは,以下の点でepollの使い方が異なります.

  • epoll版
    • コネクション設立時にepoll_ctl(EPOLL_CTL_ADD)でfdを登録
    • epoll_ctl(EPOLL_CTL_MOD)を利用して,in/outの待ちを切り替える
  • async/await版
    • read/writeの度に逐一 epoll_ctl(EPOLL_CTL_ADD) / epoll_ctl(EPOLL_CTL_DEL)を実行

async/await版の方が2倍多くシステムコールが発行されることになります. これはread().awaitwrite().awaitごとに独立したfutureが生成されることに起因しています. これを減らすには構成を変更する必要がありますが,ちょっと工夫が必要な気がします.

async/awaitの実際

async/awaitに対応したライブラリが続々出ています.通常はこれらを利用することになると思います.

例えば,async-stdというrustのstdの非同期版を目指すライブラリを利用すればecho serverは以下のように書けます(examples/tcp-echo.rs).

use async_std::io;
use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*;
use async_std::task;

async fn process(stream: TcpStream) -> io::Result<()> {
    println!("Accepted from: {}", stream.peer_addr()?);

    let (reader, writer) = &mut (&stream, &stream);
    io::copy(reader, writer).await?;

    Ok(())
}

fn main() -> io::Result<()> {
    task::block_on(async {
        let listener = TcpListener::bind("127.0.0.1:8080").await?;
        println!("Listening on {}", listener.local_addr()?);

        let mut incoming = listener.incoming();

        while let Some(stream) = incoming.next().await {
            let stream = stream?;
            task::spawn(async {
                process(stream).await.unwrap();
            });
        }
        Ok(())
    })
}

他の非同期処理ライブラリとしてはかの有名なtokio*2やもうちょっと上位の(?)runtimeno_std向けのembrioなどがあります.

まとめ

rustのasync/await構文は非同期処理を簡潔に記述する上で非常に重要な一方で,それだけでは非同期処理は実現できません. async/awaitをつければ勝手に非同期化されるような魔法のキーワードでもありません.

実際に利用する際は何かライブラリ(ランタイム)が必要になります. そして,ランタイムのアルゴリズム(executorやreactorの動作)はライブラリが柔軟に選択できるような設計になっています. 逆にいえば,アプリケーションのコードは多少なりともライブラリに依存する形になります.

個人的にasync/awaitの利点は,パフォーマンス的な側面もありますが,これまでのイベントループやコールバックで書いていた処理が分かりやすく書くことができるようなるというのが大きいと思っています. 一方で,rustは言語自体にはランタイムを(ほぼ)持たないという選択をしているため,現状async/await実行のためのランタイムライブラリが乱立状態にあり,あまりまとまっていない印象を受けます. 例えばtokioとasync-stdが共存できるのか,ぱっと見ただけでは分かりません(簡単にはできないような気がします). またどんなときに何を使うべきかというのもはっきりしていないように思います. このあたりは今後rust 1.39が出てから徐々に解決されていくのかなと期待します.

async/awaitに見せかけたepollの文章でした

参考文献

*1:Under Linux, select() may report a socket file descriptor as "ready for reading", while nevertheless a subsequent read blocks. This could for example happen when data has arrived but upon examination has wrong checksum and is discarded. There may be other circumstances in which a file descriptor is spuriously reported as ready. Thus it may be safer to use O_NONBLOCK on sockets that should not block. - http://man7.org/linux/man-pages/man2/select.2.html

*2:ちなみに読み方はトキオじゃなくてトーキョーっぽい