Skip to content
On this page

wasi-async-runtimeを眺める

2024-04-29

先日async-stdのcommitterの yoshuawuyts 氏がDESIGNING AN ASYNC RUNTIME FOR WASI 0.2というブログ記事とwasi-async-runtimeという crate を公開しており、これが面白かったのでメモを残しておく。

wasi-async-runtimeproductionで使ってくれ。というよりはtokiosmolなどが今後wasi-ioをサポートするときに参考となることを念頭においているようだ。

目次

wasi-ioについて

以前はwasi-pollという名前だったが、現在はwasi-ioとなっている。

記事にはio-uringwindowscompletion-basedなので 0.3 でcompletion-basedに統一される可能性が示唆されているが 0.2 ではreadiness-basedとなっている。

witは以下のようなシンプルなものになっている。 pollablereadyblockが生えており、pollにはpollablelistで渡すようだ。

TypeScript
interface poll {
    resource pollable {
      ready: func() -> bool;
      block: func();
    }
    poll: func(in: list<borrow<pollable>>) -> list<u32>;
}

wasi-httpの response をpollで待つ

記事中にwasi-httpの response をpollで待つ例が記載されていた。 これはwasi-httpwitを見るとよりわかりやすい。

まずhandleの interface は以下のようになっている。

TypeScript
handle: func(
  request: outgoing-request,
  options: option<request-options>
) -> result<future-incoming-response, error-code>;

そしてfuture-incoming-response以下のようになっている。 future-incoming-responseにはsubscribeが生えており、pollableを返す。これをpollにわたすとreadyになるまで待つようだ。

その後getによって値を取得する。getの返り値はoptionになっており、readyではない場合はnoneを返しそうだ。 このあたり。

TypeScript
  resource future-incoming-response {
    /// Returns a pollable which becomes ready when either the Response has
    /// been received, or an error has occured. When this pollable is ready,
    /// the `get` method will return `some`.
    subscribe: func() -> pollable;

    /// Returns the incoming HTTP Response, or an error, once one is ready.
    ///
    /// The outer `option` represents future readiness. Users can wait on this
    /// `option` to become `some` using the `subscribe` method.
    ///
    /// The outer `result` is used to retrieve the response or error at most
    /// once. It will be success on the first call in which the outer option
    /// is `some`, and error on subsequent calls.
    ///
    /// The inner `result` represents that either the incoming HTTP Response
    /// status and headers have recieved successfully, or that an error
    /// occured. Errors may also occur while consuming the response body,
    /// but those will be reported by the `incoming-body` and its
    /// `output-stream` child.
    get: func() -> option<result<result<incoming-response, error-code>>>;
  }

上記のような interface となっているので記事中にある以下のコードを書くことで動作するようだ。

このあたりwitを眺めていてもいまいちぴんと来なかったので、今回の記事を読むことで解像度が上がったように思う。

rust
fn main() {
    // Construct an HTTP request
    let fields = Fields::new();
    let req = OutgoingRequest::new(fields);
    req.set_method(&Method::Get).unwrap();
    req.set_scheme(Some(&Scheme::Https)).unwrap();
    req.set_path_with_query(Some("/")).unwrap();
    req.set_authority(Some("example.com")).unwrap();

    // Send the request and wait for it to complete
    let res = handle(req, None).unwrap();  // 1. We're ready to send the request over the network
    let pollable = res.subscribe();        // 2. We obtain the `Pollable` from the response future
    poll::poll(&[&pollable]);              // 3. Block until we're ready to look at the response

    // We're now ready to try and access the response headers. If
    // the request was unsuccessful we might still get an error here,
    // but we won't get an error that we tried to read data before the
    // operation was completed.
    let res = res.get().unwrap().unwrap().unwrap();
    for (name, _) in res.headers().entries() {
        println!("header: {name}");
    }
}

wasi-http-clientblock_until

ここまででPollableの扱い方のイメージは分かった。
これをベースに以下のようにasync/awaitで request を送れるようにするには runtime に対応した client を用意する必要がある。

これはwasi-http-clientというcrateが用意してある。

rust
fn main() {
    block_on(|reactor| async {
        let client = Client::new(reactor);

        let a = async {
            let url = "https://example.com".parse().unwrap();
            let req = Request::new(Method::Get, url);
            let res = client.send(req).await;
            // ...
        };
}

clientは以下のようになっている。 newで受け取ったReactorwait_forres.subscribeから返ってきたPollableを待っているだけのシンプルなものだ。

rust
impl Client {
    /// Create a new instance of `Client`
    pub fn new(reactor: Reactor) -> Self {
        Self { reactor }
    }

    /// Send an HTTP request.
    pub async fn send(&self, req: Request) -> Result<Response> {
        let wasi_req = req.into();
        let res = wasi::http::outgoing_handler::handle(wasi_req, None).unwrap();
        self.reactor.wait_for(res.subscribe()).await;

        // NOTE: the first `unwrap` is to ensure readiness, the second `unwrap`
        // is to trap if we try and get the response more than once. The final
        // `?` is go raise the actual error if there is one.
        let res = res.get().unwrap().unwrap()?;
        Ok(Response::try_from_incoming(res, self.reactor.clone())?)
    }
}

wait_forは記事にあるように以下のようになっている.
pollerSlab<Pollable>Pollableを管理しているのでSlabindexEventKeyとして、Wakerと紐付けつつ、Pollableready()に応じてReadyPendingを返すようだ。

rust
impl Reactor {
    /// Wait for the pollable to resolve.
    pub async fn wait_for(&self, pollable: Pollable) {
        let mut pollable = Some(pollable);
        let mut key = None;

        // This function is the core loop of our function; it will be called
        // multiple times as the future is resolving.
        future::poll_fn(|cx| {
            // Start by taking a lock on the reactor. This is single-threaded
            // and short-lived, so it will never be contended.
            let mut reactor = self.inner.borrow_mut();

            // Schedule interest in the `pollable` on the first iteration. On
            // every iteration, register the waker with the reactor.
            let key = key.get_or_insert_with(|| reactor.poller.insert(pollable.take().unwrap()));
            reactor.wakers.insert(*key, cx.waker().clone());

            // Check whether we're ready or need to keep waiting. If we're
            // ready, we clean up after ourselves.
            if reactor.poller.get(key).unwrap().ready() {
                reactor.poller.remove(*key);
                reactor.wakers.remove(key);
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        })
        .await
    }
}

ここでinsertされたPollableがどこでpollされるかというとReactorblock_untilを経由してPollerblock_untilで以下のように行われている。

未完了のPollablewasi-io::poll::pollpollし、readyになったものをVec<EventKey>で返す。

rust
pub(crate) fn block_until(&mut self) -> Vec<EventKey> {
    let mut indexes = Vec::with_capacity(self.targets.len());
    let mut targets = Vec::with_capacity(self.targets.len());
    for (index, target) in self.targets.iter() {
        indexes.push(index);
        targets.push(target);
    }
    let ready_indexes = poll(&targets);
    ready_indexes
        .into_iter()
        .map(|index| EventKey(indexes[index as usize] as u32))
        .collect()
}

そのVec<EventKey>を受け取ったReactorblock_untilwake_by_refしている。ちなみに、このruntimeではno-op wakerを使用しているのでwake_by_refしても何も起こらないが、他の実装も考慮して作法通りwake_by_refしているようだ。

rust
pub(crate) fn block_until(&self) {
    let mut reactor = self.inner.borrow_mut();
    for key in reactor.poller.block_until() {
        match reactor.wakers.get(&key) {
            Some(waker) => waker.wake_by_ref(),
            None => panic!("tried to wake the waker for non-existent `{key:?}`"),
        }
    }
}

その後 top level のblock_onにおいて再度pollされ、処理が進んでいくという流れだ。

まとめ

wasi-async-runtimeは非常にシンプルなのでwasi-ioはもちろん非同期 runtime を理解するのにも良いサンプルだと感じた。
今の仕様で実現可能なのかわからないけどwasi-threadsと合わせてm:nモデルの runtime を作ってみるのも面白いかもしれない。

以上。