wasm32-wasip1-threadsでrayonを使ったコードをNode.jsで動かす
2024-09-17
rayon
を使用したコードをwasm32-wasip1-threads
でbuild
し、Node.js
で使用したいケースがあり、色々試した結果を記載しておく。
以前wasi-threads の概要と ComponentModel での利用の現状とその先で簡単なrayon
を使ったサンプルは用意していたが、いざ実際に使い始めるといくつかの問題に遭遇したため。
目次
前提
rustc --version rustc 1.82.0-nightly (0d634185d 2024-08-29)
node --version v20.17.0
wasmtime --version wasmtime-cli 20.0.1 (d1feb032c 2024-05-03)
Repository
wasm32-wasip1-threads
とは
wasm32-wasip1-threads
はwasm32-wasi-preview1-threads
やwasm32-wasi-threads
が rename されたもの。tier2
。 以下のように build することでwasi-threads
が利用できる。
cargo build --release --target=wasm32-wasip1-threads
成果物のwasm
を覗いてみると以下のようにthread-spawn
が要求されていることがわかる。
(module $main.wasm
(type (;0;) (func))
... 省略
(import "wasi_snapshot_preview1" "environ_get" (func $__imported_wasi_snapshot_preview1_environ_get (type 2)))
(import "wasi_snapshot_preview1" "environ_sizes_get" (func $__imported_wasi_snapshot_preview1_environ_sizes_get (type 2)))
(import "wasi_snapshot_preview1" "clock_time_get" (func $__imported_wasi_snapshot_preview1_clock_time_get (type 8)))
(import "wasi_snapshot_preview1" "proc_exit" (func $__imported_wasi_snapshot_preview1_proc_exit (type 1)))
(import "wasi_snapshot_preview1" "sched_yield" (func $__imported_wasi_snapshot_preview1_sched_yield (type 9)))
(import "wasi" "thread-spawn" (func $__imported_wasi_thread_spawn (type 4)))
... 省略
たとえば以下のようなコードをbuild
してwasmtime
で実行すると動作が確認できる。
use rayon::{prelude::*, ThreadPoolBuilder};
fn sum_of_squares(input: &[i32]) -> i32 {
let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
pool.install(|| input.par_iter().map(|&i| i * i).sum())
}
fn main() {
let x = sum_of_squares(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
println!("{x}");
}
wasmtime
で実行
cargo build --release --target=wasm32-wasip1-threads
wasmtime --wasm-features=threads --wasi-modules=experimental-wasi-threads ./target/wasm32-wasip1-threads/release/wasp1-threads-rayon-example.wasm
285
thread-spwan
について
wasi-threads
を利用するにはthread-spawn
をwasm
へのimportObject
追加する必要がある。 詳細はwasi-threads の概要と ComponentModel での利用の現状とその先にも書いたが、簡単に書くとwasm
側からthread-spawn
が呼ばれたらthread
をspawn
し、spawn
されたthread
ではwasm
から公開されているwasi_thread_start
を呼べばよい。詳細はここを見るとよい。
簡略化するとNode.js
であればmain
側では以下のようにthread-spawn
の呼び出しでWorker
を作成し、引数やSharedArrayBuffer
などを送信。
let nextId = 1;
const opts = { initial: 17, maximum: 20, shared: true };
const memory = new WebAssembly.Memory(opts);
let instance = await WebAssembly.instantiate(wasm, {
...new WASI().getImportObject(),
wasi: {
"thread-spawn": (startArg) => {
const worker = new Worker("./worker.js");
const tid = nextTid++;
worker.postMessage({ startArg, tid, memory });
return tid;
},
},
env: { memory },
});
Worker
側はメッセージをうけてwasm
を受け取ったSharedArrayBuffer
とともにinstantiate
し、wasi_thread_start
にthreadId
とthread-spawn
で受け取った値を渡し、処理を開始する。
const handler = async ({ startArg, tid, memory }) => {
const wasm = await WebAssembly.compile(await file);
let instance = await WebAssembly.instantiate(wasm, { ... });
instance.exports.wasi_thread_start(tid, startArg);
};
parentPort.addListener("message", handler);
これで冒頭のrayon
を使ったコードは動作する。ここまでがおさらい。
しかし、実際のusecase
において、いくつかの問題に直面したため、記録しておく。
child thread
で投げられた例外が補足できずデッドロックする
冒頭のコードを以下のように変更するとデッドロックする。
fn sum_of_squares(input: &[i32]) -> i32 {
let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
pool.install(|| input.par_iter().map(|&i| {
panic!("💀")
}).sum())
}
これはmain thread
がwasi.start
からぬけてこないのでWorker
からのError
を補足するフェーズまで進まないからだと思われる。 そのため自分は以下のようにmain thread
はWorker
からのmessage
を受けてthread-spawn
とエラー処理だけを行うような構成とした。
const { Worker } = require("node:worker_threads");
const worker = new Worker("./entry.js");
let nextTid = 1;
const workers = [worker];
const spawn = (startArg, threadId, memory) => {
const worker = new Worker("./worker.js");
workers.push(worker);
worker.on("message", ({ cmd, startArg, threadId, memory }) => {
// ...ommited
if (cmd === "thread-spawn") {
spawn(startArg, threadId, memory);
}
});
worker.on("error", (e) => {
workers.forEach((w) => w.terminate());
throw new Error(e);
});
const tid = nextTid++;
if (threadId) {
Atomics.store(threadId, 0, tid);
Atomics.notify(threadId, 0);
}
worker.postMessage({ startArg, tid, memory });
return tid;
};
worker.on("message", ({ cmd, startArg, threadId, memory }) => {
// ...ommited
if (cmd === "thread-spawn") {
spawn(startArg, threadId, memory);
}
});
worker.on("error", (err) => {
workers.forEach((w) => w.terminate());
throw new Error(err);
});
それに伴い以下のようにwasi.start
はWorker
で呼ぶようにし、wasm
側からthread-spawn
が呼ばれた場合はmain thread
側にspawn
を依頼するメッセージを返すように変更した。
const { readFile } = require("node:fs/promises");
const fs = require("node:fs");
const { WASI } = require("@tybys/wasm-util");
const { argv, env } = require("node:process");
const { join } = require("node:path");
const { parentPort } = require("node:worker_threads");
const wasi = new WASI({ ... });
const imports = wasi.getImportObject();
const file = readFile(YOUR_WASM);
(async () => {
try {
const wasm = await WebAssembly.compile(await file);
const opts = { initial: 17, maximum: 200, shared: true };
const memory = new WebAssembly.Memory(opts);
let instance = await WebAssembly.instantiate(wasm, {
...imports,
wasi: {
"thread-spawn": (startArg) => {
const threadIdBuffer = new SharedArrayBuffer(4);
const id = new Int32Array(threadIdBuffer);
Atomics.store(id, 0, -1);
parentPort.postMessage({
cmd: "thread-spawn",
startArg,
threadId: id,
memory,
});
Atomics.wait(id, 0, -1);
const tid = Atomics.load(id, 0);
return tid;
},
},
env: { memory },
});
wasi.start(instance);
parentPort.postMessage({ cmd: "complete" });
} catch (e) {
throw e;
}
})();
これでchild thread
側から例外が投げられた場合もハンドルできるようになった。
JSからメモリが使用できずエラーになる
次に冒頭のコードを以下のように変更するとエラーが発生した。
fn sum_of_squares(input: &[i32]) -> i32 {
let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
pool.install(|| input.par_iter().map(|&i| {
dbg!(i); // <- 追加
i * i
}).sum())
}
これはWASI initialization in child thread #4102
という(閉じられているが)issue
がありwasi.start()
を呼ばないと初期化が完了しない。しかしwasi-threads
においてchild thread
ではwasi.start
でなくwasi_thread_start
が呼ばれるべきだし、wasi.start
を読んだところでエラーになる。という話のようだ。
なのでdbg!
によってfd.write
を使おうとするがメモリ周りの初期化が終わっておらずJS
から触ることができずエラーとなるようだ。
このissue
の作成者の@toyobayashiさんは以下のようなProxy
を用意して対処されていた。しかも質問したところPR
までいただけた。Special thanks @toyobayashi。
どうやらwasi.start
を呼んで初期化周りを実行させつつも、Proxy
によってwasi._start
は何もせずに抜けてくる。という戦略のようだ。
export const kIsProxy = Symbol('kIsProxy')
/** @public */
export function createInstanceProxy (
instance: WebAssembly.Instance,
memory?: WebAssembly.Memory | (() => WebAssembly.Memory)
): WebAssembly.Instance {
if ((instance as any)[kIsProxy]) return instance
// https://github.com/nodejs/help/issues/4102
const originalExports = instance.exports
const createHandler = function (target: WebAssembly.Exports): ProxyHandler<WebAssembly.Exports> {
const handlers = [
'apply',
'construct',
'defineProperty',
'deleteProperty',
'get',
'getOwnPropertyDescriptor',
'getPrototypeOf',
'has',
'isExtensible',
'ownKeys',
'preventExtensions',
'set',
'setPrototypeOf'
]
const handler: ProxyHandler<WebAssembly.Exports> = {}
for (let i = 0; i < handlers.length; i++) {
const name = handlers[i] as keyof ProxyHandler<WebAssembly.Exports>
handler[name] = function () {
const args = Array.prototype.slice.call(arguments, 1)
args.unshift(target)
return (Reflect[name] as any).apply(Reflect, args)
}
}
return handler
}
const handler = createHandler(originalExports)
const _initialize = (): void => {}
const _start = (): number => 0
handler.get = function (_target, p, receiver) {
if (p === 'memory') {
return (typeof memory === 'function' ? memory() : memory) ?? Reflect.get(originalExports, p, receiver)
}
if (p === '_initialize') {
return p in originalExports ? _initialize : undefined
}
if (p === '_start') {
return p in originalExports ? _start : undefined
}
return Reflect.get(originalExports, p, receiver)
}
handler.has = function (_target, p) {
if (p === 'memory') return true
return Reflect.has(originalExports, p)
}
const exportsProxy = new Proxy(Object.create(null), handler)
return new Proxy(instance, {
get (target, p, receiver) {
if (p === 'exports') {
return exportsProxy
}
if (p === kIsProxy) {
return true
}
return Reflect.get(target, p, receiver)
}
})
}
上記を踏まえ、以下のようにinstance
にProxy
をかぶせ、wasi.start
を実行する。その後、kStarted
フラグをこっそりもとに戻し、instance.exports.wasi_thread_start
を呼ぶことでこの問題は解決した。
const { createInstanceProxy } = require("./proxy.js");
instance = createInstanceProxy(instance, memory);
wasi.start(instance);
try {
const symbols = Object.getOwnPropertySymbols(wasi);
const selectDescription = (description) => (s) => {
if (s.description) {
return s.description === description;
}
return s.toString() === `Symbol(${description})`;
};
if (Array.isArray(description)) {
return description.map((d) => symbols.filter(selectDescription(d))[0]);
}
const kStarted = symbols.filter(selectDescription("kStarted"))[0];
wasi[kStarted] = false;
} catch (_) {}
instance.exports.wasi_thread_start(tid, startArg);
まとめ
前述の内容を適用することで実際のusecase
においても動き始めた。
分かってみるとなるほどという感じだが、この2つの問題が同時に発生しているときはかなり混乱した。
今回はNode.js
を前提としたがBrowser
でも動きそう気配がある。SharedArrayBuffer
のためにCOOP/COEP
などIsoration
の対応が面倒だが。
以上。