Skip to content
On this page

wasm32-wasip1-threadsでrayonを使ったコードをNode.jsで動かす

2024-09-17

rayonを使用したコードをwasm32-wasip1-threadsbuildし、Node.jsで使用したいケースがあり、色々試した結果を記載しておく。
以前wasi-threads の概要と ComponentModel での利用の現状とその先で簡単なrayonを使ったサンプルは用意していたが、いざ実際に使い始めるといくつかの問題に遭遇したため。

目次

前提

  • rustc --version rustc 1.82.0-nightly (0d634185d 2024-08-29)
  • node --version v20.17.0
  • wasmtime --version wasmtime-cli 20.0.1 (d1feb032c 2024-05-03)

Repository

bokuweb/wasip1-threads-rayon-example
waspi-threads example
JavaScript

wasm32-wasip1-threadsとは

wasm32-wasip1-threadswasm32-wasi-preview1-threadswasm32-wasi-threadsが rename されたもの。tier2。 以下のように build することでwasi-threadsが利用できる。

sh
cargo build --release --target=wasm32-wasip1-threads

成果物のwasmを覗いてみると以下のようにthread-spawnが要求されていることがわかる。

lisp
(module $main.wasm
  (type (;0;) (func))
  ... 省略
  (import "wasi_snapshot_preview1" "environ_get" (func $__imported_wasi_snapshot_preview1_environ_get (type 2)))
  (import "wasi_snapshot_preview1" "environ_sizes_get" (func $__imported_wasi_snapshot_preview1_environ_sizes_get (type 2)))
  (import "wasi_snapshot_preview1" "clock_time_get" (func $__imported_wasi_snapshot_preview1_clock_time_get (type 8)))
  (import "wasi_snapshot_preview1" "proc_exit" (func $__imported_wasi_snapshot_preview1_proc_exit (type 1)))
  (import "wasi_snapshot_preview1" "sched_yield" (func $__imported_wasi_snapshot_preview1_sched_yield (type 9)))
  (import "wasi" "thread-spawn" (func $__imported_wasi_thread_spawn (type 4)))  
  ... 省略

たとえば以下のようなコードをbuildしてwasmtimeで実行すると動作が確認できる。

rust
use rayon::{prelude::*, ThreadPoolBuilder};

fn sum_of_squares(input: &[i32]) -> i32 {
    let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    pool.install(|| input.par_iter().map(|&i| i * i).sum())
}

fn main() {
    let x = sum_of_squares(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
    println!("{x}");
}

wasmtimeで実行

sh
cargo build --release --target=wasm32-wasip1-threads
wasmtime --wasm-features=threads --wasi-modules=experimental-wasi-threads ./target/wasm32-wasip1-threads/release/wasp1-threads-rayon-example.wasm

285

thread-spwanについて

wasi-threadsを利用するにはthread-spawnwasmへのimportObject追加する必要がある。 詳細はwasi-threads の概要と ComponentModel での利用の現状とその先にも書いたが、簡単に書くとwasm側からthread-spawnが呼ばれたらthreadspawnし、spawnされたthreadではwasmから公開されているwasi_thread_startを呼べばよい。詳細はここを見るとよい。

簡略化するとNode.jsであればmain側では以下のようにthread-spawnの呼び出しでWorkerを作成し、引数やSharedArrayBufferなどを送信。

js
let nextId = 1;
const opts = { initial: 17, maximum: 20, shared: true };
const memory = new WebAssembly.Memory(opts);
let instance = await WebAssembly.instantiate(wasm, {
  ...new WASI().getImportObject(),
  wasi: {
    "thread-spawn": (startArg) => {
      const worker = new Worker("./worker.js");
      const tid = nextTid++;
      worker.postMessage({ startArg, tid, memory });
      return tid;
    },
  },
  env: { memory },
});

Worker側はメッセージをうけてwasmを受け取ったSharedArrayBufferとともにinstantiateし、wasi_thread_startthreadIdthread-spawnで受け取った値を渡し、処理を開始する。

js
const handler = async ({ startArg, tid, memory }) => {
  const wasm = await WebAssembly.compile(await file);
  let instance = await WebAssembly.instantiate(wasm, { ... });
  instance.exports.wasi_thread_start(tid, startArg);
};
parentPort.addListener("message", handler);

これで冒頭のrayonを使ったコードは動作する。ここまでがおさらい。
しかし、実際のusecaseにおいて、いくつかの問題に直面したため、記録しておく。

child threadで投げられた例外が補足できずデッドロックする

冒頭のコードを以下のように変更するとデッドロックする。

rust
fn sum_of_squares(input: &[i32]) -> i32 {
    let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    pool.install(|| input.par_iter().map(|&i| {
      panic!("💀")
    }).sum())
}

これはmain threadwasi.startからぬけてこないのでWorkerからのErrorを補足するフェーズまで進まないからだと思われる。 そのため自分は以下のようにmain threadWorkerからのmessageを受けてthread-spawnとエラー処理だけを行うような構成とした。

JavaScript
const { Worker } = require("node:worker_threads");

const worker = new Worker("./entry.js");

let nextTid = 1;
const workers = [worker];

const spawn = (startArg, threadId, memory) => {
  const worker = new Worker("./worker.js");
  workers.push(worker);

  worker.on("message", ({ cmd, startArg, threadId, memory }) => {
    // ...ommited
    if (cmd === "thread-spawn") {
      spawn(startArg, threadId, memory);
    }
  });

  worker.on("error", (e) => {
    workers.forEach((w) => w.terminate());
    throw new Error(e);
  });

  const tid = nextTid++;
  if (threadId) {
    Atomics.store(threadId, 0, tid);
    Atomics.notify(threadId, 0);
  }
  worker.postMessage({ startArg, tid, memory });
  return tid;
};

worker.on("message", ({ cmd, startArg, threadId, memory }) => {
  // ...ommited
  if (cmd === "thread-spawn") {
    spawn(startArg, threadId, memory);
  }
});

worker.on("error", (err) => {
  workers.forEach((w) => w.terminate());
  throw new Error(err);
});

それに伴い以下のようにwasi.startWorkerで呼ぶようにし、wasm側からthread-spawnが呼ばれた場合はmain thread側にspawnを依頼するメッセージを返すように変更した。

JavaScript
const { readFile } = require("node:fs/promises");
const fs = require("node:fs");
const { WASI } = require("@tybys/wasm-util");
const { argv, env } = require("node:process");
const { join } = require("node:path");
const { parentPort } = require("node:worker_threads");

const wasi = new WASI({ ... });

const imports = wasi.getImportObject();
const file = readFile(YOUR_WASM);

(async () => {
  try {
    const wasm = await WebAssembly.compile(await file);
    const opts = { initial: 17, maximum: 200, shared: true };
    const memory = new WebAssembly.Memory(opts);
    let instance = await WebAssembly.instantiate(wasm, {
      ...imports,
      wasi: {
        "thread-spawn": (startArg) => {
          const threadIdBuffer = new SharedArrayBuffer(4);
          const id = new Int32Array(threadIdBuffer);
          Atomics.store(id, 0, -1);
          parentPort.postMessage({
            cmd: "thread-spawn",
            startArg,
            threadId: id,
            memory,
          });
          Atomics.wait(id, 0, -1);
          const tid = Atomics.load(id, 0);
          return tid;
        },
      },
      env: { memory },
    });
    wasi.start(instance);
    parentPort.postMessage({ cmd: "complete" });
  } catch (e) {
    throw e;
  }
})();

これでchild thread側から例外が投げられた場合もハンドルできるようになった。

JSからメモリが使用できずエラーになる

次に冒頭のコードを以下のように変更するとエラーが発生した。

rust
fn sum_of_squares(input: &[i32]) -> i32 {
    let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    pool.install(|| input.par_iter().map(|&i| {
      dbg!(i); // <- 追加
      i * i
    }).sum())
}

これはWASI initialization in child thread #4102という(閉じられているが)issueがありwasi.start()を呼ばないと初期化が完了しない。しかしwasi-threadsにおいてchild threadではwasi.startでなくwasi_thread_startが呼ばれるべきだし、wasi.startを読んだところでエラーになる。という話のようだ。

なのでdbg!によってfd.writeを使おうとするがメモリ周りの初期化が終わっておらずJSから触ることができずエラーとなるようだ。

このissueの作成者の@toyobayashiさんは以下のようなProxyを用意して対処されていた。しかも質問したところPRまでいただけた。Special thanks @toyobayashi

toyobayashi/emnapi
Node-API implementation for Emscripten, wasi-sdk, clang wasm32 and napi-rs
C

どうやらwasi.startを呼んで初期化周りを実行させつつも、Proxyによってwasi._startは何もせずに抜けてくる。という戦略のようだ。

TypeScript
export const kIsProxy = Symbol('kIsProxy')

/** @public */
export function createInstanceProxy (
  instance: WebAssembly.Instance,
  memory?: WebAssembly.Memory | (() => WebAssembly.Memory)
): WebAssembly.Instance {
  if ((instance as any)[kIsProxy]) return instance

  // https://github.com/nodejs/help/issues/4102
  const originalExports = instance.exports
  const createHandler = function (target: WebAssembly.Exports): ProxyHandler<WebAssembly.Exports> {
    const handlers = [
      'apply',
      'construct',
      'defineProperty',
      'deleteProperty',
      'get',
      'getOwnPropertyDescriptor',
      'getPrototypeOf',
      'has',
      'isExtensible',
      'ownKeys',
      'preventExtensions',
      'set',
      'setPrototypeOf'
    ]
    const handler: ProxyHandler<WebAssembly.Exports> = {}
    for (let i = 0; i < handlers.length; i++) {
      const name = handlers[i] as keyof ProxyHandler<WebAssembly.Exports>
      handler[name] = function () {
        const args = Array.prototype.slice.call(arguments, 1)
        args.unshift(target)
        return (Reflect[name] as any).apply(Reflect, args)
      }
    }
    return handler
  }
  const handler = createHandler(originalExports)
  const _initialize = (): void => {}
  const _start = (): number => 0
  handler.get = function (_target, p, receiver) {
    if (p === 'memory') {
      return (typeof memory === 'function' ? memory() : memory) ?? Reflect.get(originalExports, p, receiver)
    }
    if (p === '_initialize') {
      return p in originalExports ? _initialize : undefined
    }
    if (p === '_start') {
      return p in originalExports ? _start : undefined
    }
    return Reflect.get(originalExports, p, receiver)
  }
  handler.has = function (_target, p) {
    if (p === 'memory') return true
    return Reflect.has(originalExports, p)
  }
  const exportsProxy = new Proxy(Object.create(null), handler)
  return new Proxy(instance, {
    get (target, p, receiver) {
      if (p === 'exports') {
        return exportsProxy
      }
      if (p === kIsProxy) {
        return true
      }
      return Reflect.get(target, p, receiver)
    }
  })
}

上記を踏まえ、以下のようにinstanceProxyをかぶせ、wasi.startを実行する。その後、kStartedフラグをこっそりもとに戻し、instance.exports.wasi_thread_startを呼ぶことでこの問題は解決した。

JavaScript
const { createInstanceProxy } = require("./proxy.js");
instance = createInstanceProxy(instance, memory);
wasi.start(instance);
try {
  const symbols = Object.getOwnPropertySymbols(wasi);
  const selectDescription = (description) => (s) => {
    if (s.description) {
      return s.description === description;
    }
    return s.toString() === `Symbol(${description})`;
  };
  if (Array.isArray(description)) {
    return description.map((d) => symbols.filter(selectDescription(d))[0]);
  }
  const kStarted = symbols.filter(selectDescription("kStarted"))[0];
  wasi[kStarted] = false;
} catch (_) {}
instance.exports.wasi_thread_start(tid, startArg);

まとめ

前述の内容を適用することで実際のusecaseにおいても動き始めた。
分かってみるとなるほどという感じだが、この2つの問題が同時に発生しているときはかなり混乱した。

今回はNode.jsを前提としたがBrowserでも動きそう気配がある。SharedArrayBufferのためにCOOP/COEPなどIsorationの対応が面倒だが。

以上。