Rust: tokioを使って独自プロトコルのメッセージを受け取るTCPサーバーを作る

オンラインゲーム開発などで独自プロトコルのデータを読み書きするTCPサーバーを作ることが時々あるが、 Rustのtokio を使うとそのようなTCPサーバーがとても簡潔に記述できる。

独自プロトコルといっても様々だが、TCPのようなストリーム指向の経路の場合ある程度のバイト配列のかたまりをひとつのフレームとして解釈することが多い。よってストリーム内で個々のフレームを区切る必要がある。 よく使われるやり方だとたとえば次の2つがある。

  • 各フレームの先頭に長さの情報を付ける
  • 特定の値を区切り文字とする

今回は1番目の「各フレームの先頭に長さの情報を付ける」パターンを例に考える。 たとえば先頭4バイトに長さがあり、その後にその長さ分のデータが入っているというプロトコルだとする。

+---- len: u32 ----+---- data ----+
| \x00\x00\x00\x05 |  hello       |
+------------------+--------------+

TCPによるソケットプログラミング経験者ならわかると思うが、readを一度呼び出してもフレーム1個分のデータすべてが取得できるとは限らない。フレームの一部しか届かない場合もある。よって次のようなケースが普通にあり得る。

  • readしたデータがフレーム1つ分よりも小さい
  • readしたデータに次のフレームの情報も一緒に入っている

こういったケースを考慮しつつフレームを解釈するTCPサーバーを実装するのはなかなかに複雑になってしまう。 やりたいことは特定プロトコルのフレームを順番に処理したいだけなのに…。

そこで、Rustの有名な非同期ランタイムであるtokio はそういったストリームから特定のフレームをきれいに読み取ってくれる機能を提供している。

まず最初に完成形のコードを全部見せておく。 tokioが提供する3つのクレートを入れ、mainのコードはたったこれだけ! (エラー処理は unwrap() でサボっているが)

# Cargo.toml
[dependencies]
tokio = { version = "1.16.1", features = ["full"] }
tokio-util = { version = "0.7.0", features = ["full"]}
tokio-stream = "0.1"
use tokio::net::TcpListener;
use tokio_stream::StreamExt;
use tokio_util::codec::{FramedRead, LengthDelimitedCodec};

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:12345").await.unwrap();
    loop {
        let (client, _) = listener.accept().await.unwrap();
        let mut frame_reader = FramedRead::new(client, LengthDelimitedCodec::new());
        while let Some(frame) = frame_reader.next().await {
            match frame {
                Ok(data) => println!("received: {:?}", data),
                Err(err) => eprintln!("error: {:?}", err),
            }
        }
    }
}

このmainを実行して、次のように先頭に4byteの長さと後続するデータをTCPで送信すると、received: b"abc" がちゃんと表示される。

# pythonからTCP経由でフレームを送ってみる
$ python3
>>> import socket
>>> s = socket.socket()
>>> s.connect(("127.0.0.1", 12345))
>>> s.send(b"\x00\x00\x00\x03abc")
7
# rust TCP server側の出力
received: b"abc"

確かに短いコードで実現できた!……が、隠蔽されすぎていて何がどうなっているのかわかりづらいかもしれない。 そこで、tokioはどのように簡潔な独自プロトコルのRead/Writeの仕組みを用意しているのか、表層だけさらっと紹介する。

Codec

tokioの関連クレート tokio_util には Codec というものが用意されている。 これはまさにバイナリからなる独自プロトコルを扱うための仕組みである。

Codecは EncoderDecoder という2つのTraitが用意されていて、それぞれを実装することでバイナリとフレームの相互変換が可能になる。 また、前述した「各フレームの先頭に長さの情報を付ける」パターンは LengthDelimitedCodec という名前で最初から tokio_util が提供している。今回のコード例でもこの LengthDelimitedCodec をそのまま使った。 もちろん独自の Encoder, Decoder を作ることも可能である。

tokio_util::codec::length_delimited - Rust

モジュール自体のドキュメントが実例も含めて充実しているので具体的な作り方などは以下を参照するとよい。

https://docs.rs/tokio-util/0.7.0/tokio_util/codec/index.html

Framed

前述した Codec はバイナリとフレームの相互変換を提供したが、それをTCPソケットと組み合わせるには非同期I/O に対応する必要がある。その組み合わせを提供するのがこの Framed という構造体である。

Framed は StreamSink を実装しているので、非同期的な読み書きができる。 特に Stream は非同期版Iteratorといった感じなので、StreamExt trait とawaitを組み合わせると単純な while ループで順番に読み取りができる。

while let Some(frame) = frame_reader.next().await {
    match frame {
        Ok(data) => println!("received: {:?}", data),
        Err(err) => eprintln!("error: {:?}", err),
    }
}

Framed 自体は、対象となる非同期I/Oオブジェクト(たとえばTcpStream)と、変換に使うコーデックを渡せば作れる。 また、今回の場合は read/write のうち read のみだったので read専用の FramedRead を利用した。

let mut frame_reader = FramedRead::new(client, LengthDelimitedCodec::new());

参考資料