wasi-async-runtimeを眺める
2024-04-29
先日async-std
の作者の yoshuawuyts 氏がDESIGNING AN ASYNC RUNTIME FOR WASI 0.2
というブログ記事とwasi-async-runtime
という crate を公開しており、これが面白かったのでメモを残しておく。
wasi-async-runtime
はproduction
で使ってくれ。というよりはtokio
やsmol
などが今後wasi-io
をサポートするときに参考となることを念頭においているようだ。
目次
wasi-io
について
以前はwasi-poll
という名前だったが、現在はwasi-io
となっている。
記事にはio-uring
とwindows
がcompletion-based
なので 0.3 でcompletion-based
可能性が示唆されているが 0.2 ではreadiness-based
となっている。
wit
は以下のようなシンプルなものになっている。 pollable
はready
とblock
が生えており、poll
にはpollable
をlist
で渡すようだ。
interface poll {
resource pollable {
ready: func() -> bool;
block: func();
}
poll: func(in: list<borrow<pollable>>) -> list<u32>;
}
wasi-http
の response をpoll
で待つ
記事中にwasi-http
の response をpoll
で待つ例が記載されていた。 これはwasi-http
のwit
を見るとよりわかりやすい。
まずhandle
の interface は以下のようになっている。
handle: func(
request: outgoing-request,
options: option<request-options>
) -> result<future-incoming-response, error-code>;
そしてfuture-incoming-response
は以下のようになっている。 future-incoming-response
にはsubscribe
が生えており、pollable
を返す。これをpoll
にわたすとready
になるまで待つようだ。
その後get
によって値を取得する。get
の返り値はoption
になっており、ready
ではない場合はnone
を返しそうだ。 このあたり。
resource future-incoming-response {
/// Returns a pollable which becomes ready when either the Response has
/// been received, or an error has occured. When this pollable is ready,
/// the `get` method will return `some`.
subscribe: func() -> pollable;
/// Returns the incoming HTTP Response, or an error, once one is ready.
///
/// The outer `option` represents future readiness. Users can wait on this
/// `option` to become `some` using the `subscribe` method.
///
/// The outer `result` is used to retrieve the response or error at most
/// once. It will be success on the first call in which the outer option
/// is `some`, and error on subsequent calls.
///
/// The inner `result` represents that either the incoming HTTP Response
/// status and headers have recieved successfully, or that an error
/// occured. Errors may also occur while consuming the response body,
/// but those will be reported by the `incoming-body` and its
/// `output-stream` child.
get: func() -> option<result<result<incoming-response, error-code>>>;
}
上記のような interface となっているので記事中にある以下のコードを書くことで動作するようだ。
このあたりwit
を眺めていてもいまいちぴんと来なかったので、今回の記事を読むことで解像度が上がったように思う。
fn main() {
// Construct an HTTP request
let fields = Fields::new();
let req = OutgoingRequest::new(fields);
req.set_method(&Method::Get).unwrap();
req.set_scheme(Some(&Scheme::Https)).unwrap();
req.set_path_with_query(Some("/")).unwrap();
req.set_authority(Some("example.com")).unwrap();
// Send the request and wait for it to complete
let res = handle(req, None).unwrap(); // 1. We're ready to send the request over the network
let pollable = res.subscribe(); // 2. We obtain the `Pollable` from the response future
poll::poll(&[&pollable]); // 3. Block until we're ready to look at the response
// We're now ready to try and access the response headers. If
// the request was unsuccessful we might still get an error here,
// but we won't get an error that we tried to read data before the
// operation was completed.
let res = res.get().unwrap().unwrap().unwrap();
for (name, _) in res.headers().entries() {
println!("header: {name}");
}
}
wasi-http-client
とblock_until
ここまででPollable
の扱い方のイメージは分かった。
これをベースに以下のようにasync/await
で request を送れるようにするには runtime に対応した client を用意する必要がある。
これはwasi-http-client
というcrate
が用意してある。
fn main() {
block_on(|reactor| async {
let client = Client::new(reactor);
let a = async {
let url = "https://example.com".parse().unwrap();
let req = Request::new(Method::Get, url);
let res = client.send(req).await;
// ...
};
}
client
は以下のようになっている。 new
で受け取ったReactor
のwait_for
でres.subscribe
から返ってきたPollable
を待っているだけのシンプルなものだ。
impl Client {
/// Create a new instance of `Client`
pub fn new(reactor: Reactor) -> Self {
Self { reactor }
}
/// Send an HTTP request.
pub async fn send(&self, req: Request) -> Result<Response> {
let wasi_req = req.into();
let res = wasi::http::outgoing_handler::handle(wasi_req, None).unwrap();
self.reactor.wait_for(res.subscribe()).await;
// NOTE: the first `unwrap` is to ensure readiness, the second `unwrap`
// is to trap if we try and get the response more than once. The final
// `?` is go raise the actual error if there is one.
let res = res.get().unwrap().unwrap()?;
Ok(Response::try_from_incoming(res, self.reactor.clone())?)
}
}
wait_for
は記事にあるように以下のようになっている.poller
はSlab<Pollable>
でPollable
を管理しているのでSlab
のindex
をEventKey
として、Waker
と紐付けつつ、Pollable
のready()
に応じてReady
、Pending
を返すようだ。
impl Reactor {
/// Wait for the pollable to resolve.
pub async fn wait_for(&self, pollable: Pollable) {
let mut pollable = Some(pollable);
let mut key = None;
// This function is the core loop of our function; it will be called
// multiple times as the future is resolving.
future::poll_fn(|cx| {
// Start by taking a lock on the reactor. This is single-threaded
// and short-lived, so it will never be contended.
let mut reactor = self.inner.borrow_mut();
// Schedule interest in the `pollable` on the first iteration. On
// every iteration, register the waker with the reactor.
let key = key.get_or_insert_with(|| reactor.poller.insert(pollable.take().unwrap()));
reactor.wakers.insert(*key, cx.waker().clone());
// Check whether we're ready or need to keep waiting. If we're
// ready, we clean up after ourselves.
if reactor.poller.get(key).unwrap().ready() {
reactor.poller.remove(*key);
reactor.wakers.remove(key);
Poll::Ready(())
} else {
Poll::Pending
}
})
.await
}
}
ここでinsert
されたPollable
がどこでpoll
されるかというとReactor
のblock_until
を経由してPoller
のblock_until
で以下のように行われている。
未完了のPollable
をwasi-io::poll::poll
でpoll
し、ready
になったものをVec<EventKey>
で返す。
pub(crate) fn block_until(&mut self) -> Vec<EventKey> {
let mut indexes = Vec::with_capacity(self.targets.len());
let mut targets = Vec::with_capacity(self.targets.len());
for (index, target) in self.targets.iter() {
indexes.push(index);
targets.push(target);
}
let ready_indexes = poll(&targets);
ready_indexes
.into_iter()
.map(|index| EventKey(indexes[index as usize] as u32))
.collect()
}
そのVec<EventKey>
を受け取ったReactor
のblock_until
でwake_by_ref
している。ちなみに、このruntime
ではno-op waker
を使用しているのでwake_by_ref
しても何も起こらないが、他の実装も考慮して作法通りwake_by_ref
しているようだ。
pub(crate) fn block_until(&self) {
let mut reactor = self.inner.borrow_mut();
for key in reactor.poller.block_until() {
match reactor.wakers.get(&key) {
Some(waker) => waker.wake_by_ref(),
None => panic!("tried to wake the waker for non-existent `{key:?}`"),
}
}
}
その後 top level のblock_on
において再度poll
され、処理が進んでいくという流れだ。
まとめ
wasi-async-runtime
は非常にシンプルなのでwasi-io
はもちろん非同期 runtime を理解するのにも良いサンプルだと感じた。
今の仕様で実現可能なのかわからないけどwasi-threads
と合わせてm:n
モデルの runtime を作ってみるのも面白いかもしれない。
以上。