containerd/runwasiを読む
2023-02-24
以下を見て興味が湧いたので、軽く試したり読んだりしてみた。 とりあえず分かったことを記録しておく。
コンテナランタイムのcontainerdに、WebAssemblyをコンテナとして扱うための「runwasi」が統合。これからのコンテナランタイムはWebAssemblyと統合されていくhttps://t.co/FAFSwMpOY5
— Publickey (@publickey) February 21, 2023
目次
Hello runwasi
とりあずREADME通りに試してみる。 wasmedgeをインストールし、make build, sudo make install。 demo を build しloadする。
$ cd crates/wasi-demo-app && cargo build --features oci-v1-tar
$ make load
これで demo が動く。
$ sudo ctr run --rm --runtime=io.containerd.wasmedge.v1 ghcr.io/containerd/runwasi/wasi-demo-app:latest testwasm /wasi-demo-app.wasm echo 'hello'
hello
exiting
build.rs と img.tar
demo のbuild.rsを見るとだいたいctr image importするためにどんなことをしているのか分かる気がする。 難しいことはやっていないようで生成されたwasmを追加したり。
tar::Builder::new(File::create(&layer_path).unwrap())
.append_path_with_name(&app_path, "wasi-demo-app.wasm")
ociに準拠するように(oci ちゃんと見てないのでわからない。後で読む)entrypointを指定し、configを作ったり。
let config = spec::ConfigBuilder::default()
.entrypoint(vec!["/wasi-demo-app.wasm".to_owned()])
.build()
arch,os,rootfsなどを設定してこれらをtarに固めているだけのようだ。
let img = spec::ImageConfigurationBuilder::default()
.config(config)
.os("wasi")
.architecture("wasm")
.rootfs(
spec::RootFsBuilder::default()
.diff_ids(vec!["sha256:".to_owned() + &layer_digest])
.build()
.unwrap(),
)
.build()
そうすると以下のようなjsonたちやwasmが入ったimg.tarが’出来上がる
{
"architecture": "wasm",
"os": "wasi",
"config": {
"Entrypoint": ["/wasi-demo-app.wasm"]
},
"rootfs": {
"type": "layers",
"diff_ids": [
"sha256:6b4d2287efa13c803d9c065aee49b796e677af88082b29be9fe8e6986f9ff9b9"
]
},
"history": []
}
[
{
"Config": "blobs/sha256/addb778fc7e3a95acee51ace991c6ca3a823f327aeeb22c2fd329f84f631687a",
"RepoTags": ["ghcr.io/containerd/runwasi/wasi-demo-app:latest"],
"Layers": [
"blobs/sha256/6b4d2287efa13c803d9c065aee49b796e677af88082b29be9fe8e6986f9ff9b9"
]
}
]
{ "imageLayoutVersion": "1.0.0" }
containerd と shim とのやりとり
よく分かっておらずpublickeyの記事を見たときはcontainerdにshimを設定しruntime指定によってrunwasiを使いわけるような実装を想像をしたけどそうではなかった。
Makefileを見れば分かるのだがinstallは以下のようになっている。
つまりcontainerd-shim-wasmtime-v1のようなバイナリを/usr/localに配置して、そのバイナリとはttrpcでやりとりするような決まりになっているっぽい。
PREFIX ?= /usr/local
INSTALL ?= install
RUNTIMES ?= wasmedge wasmtime
.PHONY: install
install:
mkdir -p $(PREFIX)/bin
$(foreach runtime,$(RUNTIMES), \
$(INSTALL) target/$(TARGET)/containerd-shim-$(runtime)-v1 $(PREFIX)/bin/; \
$(INSTALL) target/$(TARGET)/containerd-shim-$(runtime)d-v1 $(PREFIX)/bin/; \
$(INSTALL) target/$(TARGET)/containerd-$(runtime)d $(PREFIX)/bin/; \
)
ttrpcはhttp,http2,tlsを必須としないgRPCの省機能版でGRPC for low-memory environmentsと記載されている。なのでgRPC同様、protobufを定義して通信するようだ。
また、サンプルなどを見る限りUNIX domain socketを使ってるっぽい。
この定義はおそらくcontainerd/rust-extensionsのshim-protoscrate にある以下あたりだろうか。
service Task {
rpc State(StateRequest) returns (StateResponse);
rpc Create(CreateTaskRequest) returns (CreateTaskResponse);
rpc Start(StartRequest) returns (StartResponse);
rpc Delete(DeleteRequest) returns (DeleteResponse);
rpc Pids(PidsRequest) returns (PidsResponse);
rpc Pause(PauseRequest) returns (google.protobuf.Empty);
rpc Resume(ResumeRequest) returns (google.protobuf.Empty);
rpc Checkpoint(CheckpointTaskRequest) returns (google.protobuf.Empty);
rpc Kill(KillRequest) returns (google.protobuf.Empty);
rpc Exec(ExecProcessRequest) returns (google.protobuf.Empty);
rpc ResizePty(ResizePtyRequest) returns (google.protobuf.Empty);
rpc CloseIO(CloseIORequest) returns (google.protobuf.Empty);
rpc Update(UpdateTaskRequest) returns (google.protobuf.Empty);
rpc Wait(WaitRequest) returns (WaitResponse);
rpc Stats(StatsRequest) returns (StatsResponse);
rpc Connect(ConnectRequest) returns (ConnectResponse);
rpc Shutdown(ShutdownRequest) returns (google.protobuf.Empty);
}
このような取り決めとなているためrunwasiのREADMEにある以下ようなオレオレshimも実装できるという話っぽい。この例ではmyshimになっているが、これがhogeでもfugaでもいいという話になりそう。
impl Instance for MyInstance {
// ...
}
fn main() {
shim::run::<ShimCli<MyInstance>>("io.containerd.myshim.v1", opts);
}
そしてそのshimの振る舞いはInstancetrait の実装で決定される。
今回はwasmtimeを追ってみようと思っていた。であれば以下のようなInstanceを実装している箇所見れば良い。
impl Instance for Wasi {
type E = wasmtime::Engine;
// ...省略
}
impl Instance for Wasi {}
wasmtimeのInstanceを見ていくわけだが、どういう形で使用されるのかtest_wasiというテストを見ていくとInstanceを使用するイメージがしやすい。
fn test_wasi()
流れは以下のような感じ。
まずはrootfsとwasm,stdoutを用意する。
let dir = tempdir()?;
create_dir(dir.path().join("rootfs"))?;
let mut f = File::create(dir.path().join("rootfs/hello.wat"))?;
f.write_all(WASI_HELLO_WAT)?;
let stdout = File::create(dir.path().join("stdout"))?;
drop(stdout);
次にSpecを用意してconfig.jsonとしてrootfsに保存する。
本来はcontainerdから共有されるもののように見える(要確認)
let spec = SpecBuilder::default()
.root(RootBuilder::default().path("rootfs").build()?)
.process(
ProcessBuilder::default()
.cwd("/")
.args(vec!["hello.wat".to_string()])
.build()?,
)
.build()?;
spec.save(dir.path().join("config.json"))?;
一部省略するがspecはこんなデータとなっている。
version: "1.0.2-dev",
root: Some(
Root {
path: "/tmp/.tmpNNCrAK/rootfs",
readonly: Some(
true,
),
},
),
mounts: Some(
[
Mount {
destination: "/proc",
typ: "proc",
source: "proc",
options: None,
},
// ...
],
),
process: Some(
Process {
terminal: false,
user: User {
uid: 0,
gid: 0,
umask: None,
additional_gids: None,
username: None,
},
args: ["hello.wat"],
command_line: None,
env: [
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
"TERM=xterm",
],
cwd: "/",
capabilities: Some(
LinuxCapabilities { ... },
),
rlimits:[
LinuxRlimit {
typ: RlimitNofile,
hard: 1024,
soft: 1024,
},
],
// ...
},
),
hostname: "youki",
linux: Linux {
uid_mappings: None,
gid_mappings: None,
sysctl: None,
resources: LinuxResources {/* ... */ },
namespaces: [
LinuxNamespace { typ: Pid, path: None },
LinuxNamespace { typ: Network, path: None },
LinuxNamespace { typ: Ipc, path: None },
LinuxNamespace { typ: Uts, path: None },
LinuxNamespace { typ: Mount, path: None },
],
),
devices: None,
seccomp: None,
rootfs_propagation: None,
masked_paths: [/* ... */]
readonly_paths: [/* ... */]
// ...
},
// ...
config.jsonはrunc specでデフォルトのものを出力できるらしく、試してみると以下が出力された。
Details
{
"ociVersion": "1.0.2-dev",
"process": {
"terminal": true,
"user": {
"uid": 0,
"gid": 0
},
"args": [
"sh"
],
"env": [
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
"TERM=xterm"
],
"cwd": "/",
"capabilities": {
"bounding": [
"CAP_AUDIT_WRITE",
"CAP_KILL",
"CAP_NET_BIND_SERVICE"
],
"effective": [
"CAP_AUDIT_WRITE",
"CAP_KILL",
"CAP_NET_BIND_SERVICE"
],
"inheritable": [
"CAP_AUDIT_WRITE",
"CAP_KILL",
"CAP_NET_BIND_SERVICE"
],
"permitted": [
"CAP_AUDIT_WRITE",
"CAP_KILL",
"CAP_NET_BIND_SERVICE"
],
"ambient": [
"CAP_AUDIT_WRITE",
"CAP_KILL",
"CAP_NET_BIND_SERVICE"
]
},
"rlimits": [
{
"type": "RLIMIT_NOFILE",
"hard": 1024,
"soft": 1024
}
],
"noNewPrivileges": true
},
"root": {
"path": "rootfs",
"readonly": true
},
"hostname": "runc",
"mounts": [
{
"destination": "/proc",
"type": "proc",
"source": "proc"
},
{
"destination": "/dev",
"type": "tmpfs",
"source": "tmpfs",
"options": [
"nosuid",
"strictatime",
"mode=755",
"size=65536k"
]
},
{
"destination": "/dev/pts",
"type": "devpts",
"source": "devpts",
"options": [
"nosuid",
"noexec",
"newinstance",
"ptmxmode=0666",
"mode=0620",
"gid=5"
]
},
{
"destination": "/dev/shm",
"type": "tmpfs",
"source": "shm",
"options": [
"nosuid",
"noexec",
"nodev",
"mode=1777",
"size=65536k"
]
},
{
"destination": "/dev/mqueue",
"type": "mqueue",
"source": "mqueue",
"options": [
"nosuid",
"noexec",
"nodev"
]
},
{
"destination": "/sys",
"type": "sysfs",
"source": "sysfs",
"options": [
"nosuid",
"noexec",
"nodev",
"ro"
]
},
{
"destination": "/sys/fs/cgroup",
"type": "cgroup",
"source": "cgroup",
"options": [
"nosuid",
"noexec",
"nodev",
"relatime",
"ro"
]
}
],
"linux": {
"resources": {
"devices": [
{
"allow": false,
"access": "rwm"
}
]
},
"namespaces": [
{
"type": "pid"
},
{
"type": "network"
},
{
"type": "ipc"
},
{
"type": "uts"
},
{
"type": "mount"
},
{
"type": "cgroup"
}
],
"maskedPaths": [
"/proc/acpi",
"/proc/asound",
"/proc/kcore",
"/proc/keys",
"/proc/latency_stats",
"/proc/timer_list",
"/proc/timer_stats",
"/proc/sched_debug",
"/sys/firmware",
"/proc/scsi"
],
"readonlyPaths": [
"/proc/bus",
"/proc/fs",
"/proc/irq",
"/proc/sys",
"/proc/sysrq-trigger"
]
}
}
その後はstdoutを設定したconfigなど作ってwasiを作る。
let mut cfg = InstanceConfig::new(Engine::default());
let cfg = cfg
.set_bundle(dir.path().to_str().unwrap().to_string())
.set_stdout(dir.path().join("stdout").to_str().unwrap().to_string());
let wasi = Arc::new(Wasi::new("test".to_string(), Some(cfg)));
wasiができたらstartする。
上記のWasi::newと下記startはInstancetrait で定義されているものだ。
wasi.start()?;
あとはchannelを作ってthread::spawnしつつ、w.waitにSenderを渡して実行する。
これで完了し次第waitからSender経由でmessageを投げてくれる。
このw.waitもInstancetrait で定義されているものだ。
let w = wasi.clone();
let (tx, rx) = channel();
thread::spawn(move || {
w.wait(tx).unwrap();
});
あとは rx.recv_timeoutで完了を待ち、完了メッセージが到着し次第、killして終了というのが一番シンプルな流れだ。
let res = match rx.recv_timeout(Duration::from_secs(10)) {
Ok(res) => res,
Err(e) => {
wasi.kill(SIGKILL as u32).unwrap();
return Err( /* ... */);
}
};
なので、この流れを思い浮かべながらstart,wait,killあたりの実装見れば良さそうだ。
fn start(&self) -> Result<u32, Error>
まずはwasmtimeを準備している。
let engine = self.engine.clone();
let mut linker = Linker::new(&engine);
wasmtime_wasi::add_to_linker(&mut linker, |s| s).map_err( /* ... */)?;
add_to_linkerは追ってないが、以下のようになっているのでおそらくfd_writeなどwasiの実装をlinkしているんじゃないかと思う。
snapshots::preview_1::add_wasi_snapshot_preview1_to_linker(linker, get_cx)?;
snapshots::preview_0::add_wasi_unstable_to_linker(linker, get_cx)?;
Ok(())
次にSpecを作る。これは前述したconfig.jsonからSpecを作っている。
let spec = load_spec(self.bundle.clone())?;
その後prepare_module内でWasiCtxBuilder経由で標準入出力のつなぎこみなどを行ったあと、それを wrap したStoreを作り先程のlinkerからinstanceを作成する。
let m = prepare_module(engine.clone(), &spec, stdin, stdout, stderr).map_err( /* ... */)
let mut store = Store::new(&engine, m.0);
let i = linker.instantiate(&mut store, &m.1).map_err( /* ... */)
instanceができたら、そこからwasmのエントリポイントである_start関数を取り出しておく。
let f = i.get_func(&mut store, "_start").ok_or_else(|| { /* ... */ })?;
その後Specにcgroupの設定が含まれている場合はここで設定が行われるようだ。(今回未確認)
let cg = oci::get_cgroup(&spec)?;
oci::setup_cgroup(cg.as_ref(), &spec).map_err(/* ... */);
ここまで来たらexec::forkにより新しいprocessを生成する。
let res = unsafe { exec::fork(Some(cg.as_ref())) }?;
こいつは追っていくとuapiというcrateを使用したsyscallでclone3呼んでいる箇所にたどりつく。つまりここがprocess生成の実体のはず。
pub unsafe fn clone3_system_call(cl_args: &CloneArgs) -> c_long {
syscall(
SYS_clone3,
cl_args as *const CloneArgs,
core::mem::size_of::<CloneArgs>(),
)
}
INFO
uapiは、いわゆるnixcrate と同じようなcrateだと思うのだが、このcrateを選択している理由はわからなかった。uapiのREADMEには以下のようにnixとの比較も書いてあったのだが、この内容ならnixでいいのでは、と思ったりはした。
- nix uses a nested module structure. uapi exports all APIs in the crate root.
- nix I/O works on
[u8]. uapi I/O works on[MaybeUninit<u8>].- nix uses enums and bitflags for integer/flag parameters. uapi uses plain integers.
- nix uses methods declared on wrapper types to expose APIs. uapi uses free functions unless doing so would be unsafe.
- nix uses enums for the values produced and consumed by certain generic OS APIs (e.g. control messages.) uapi uses generic functions that do not restrict the types that can be used.
exec::forkの返り値は親か子かの情報と、親の場合、pidとpidfdが返るようになっている。 まずシンプルな子の場合を見てみる。
基本的には以前instanceより取りだした_startすなわちfを実行しているだけだ。
つまり、ここでようやくwasmが実行されることになる。
子は終了し次第process.exitするのでneverだ。
let _ret = match f.call(&mut store, &[], &mut []) {
Ok(_) => std::process::exit(0),
Err(_) => std::process::exit(137),
};
次に親の処理を見てみる。
まずはpidfdをself.pidfdに保持する。
これは後述するkillなどで使用するためだ。
let mut lr = self.pidfd.lock().unwrap();
*lr = Some(pidfd.clone());
let code = self.exit_code.clone();
その後thread::spawnしpidを返す。
spawnされたthreadではまずself.exit_codeからcondvarを取り出す。
あとはpidfd経由で子プロセスの終了を待つ。
完了後はexit_codeを書き込んでcondvarで完了通知を送る。
基本的なstartの内容はこんな感じだ。
let _ = thread::spawn(move || {
let (lock, cvar) = &*code;
let status = match pidfd.wait() {
Ok(status) => status,
Err(e) => {
cvar.notify_all();
return;
}
};
let mut ec = lock.lock().unwrap();
*ec = Some((status.status, Utc::now()));
drop(ec);
cvar.notify_all();
});
Ok(tid)
fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error>
次にwaitを見てみる。 こちらはシンプルだ。
fn test_wasi()の際に言及したが、waitはSenderを受け取る。 thread::spawnし、condvarの通知を待ち、たしかに完了していたらSender経由で上位に終了通知を返すという流れだろう。
fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
let code = self.exit_code.clone();
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
channel.send(ec).unwrap();
});
Ok(())
}
ttrpcからのwaitrequest を扱う関数(おそらく)は以下のようになっている。 なのでinstanceのwaitを実行しrx.recv()で完了まで block する。 そして完了し次第、responseにtimestampを設定して返す。
fn task_wait(&self, req: api::WaitRequest) -> Result<api::WaitResponse> {
let i = self.get_instance(req.get_id())?;
let (lock, cvar) = &*i.status.clone();
let mut status = lock.lock().unwrap();
while (*status).is_none() {
status = cvar.wait(status).unwrap();
}
let (tx, rx) = channel::<(u32, DateTime<Utc>)>();
i.wait(tx)?;
let code = rx.recv().unwrap();
let mut timestamp = Timestamp::new();
// ...
let mut wr = api::WaitResponse {
exit_status: code.0,
..Default::default()
};
wr.set_exited_at(timestamp);
Ok(wr)
}
kill(&self, signal: u32) -> Result<(), Error>
最後にkillも見ておく。
こちらもシンプルで基本的にはpidfd経由で子プロセスをkillしているだけっぽい。
fn kill(&self, signal: u32) -> Result<(), Error> {
if signal != SIGKILL as u32 {
return Err(Error::InvalidArgument(
"only SIGKILL is supported".to_string(),
));
}
let lr = self.pidfd.lock().unwrap();
let fd = lr.as_ref().ok_or_else(|| /* ... */)?;
fd.kill(signal as i32)
}
まとめ
大雑把にまとめると以下だろうか
shim名と関連付けられたbinaryが起動されるよttrpc経由でtaskを作ったり、開始したり、待ったりがrequestされるよrequestがきたらrootfsのconfigとwasmからstdoutをつないだりしてinstantiateするよconfigにcgroupの設定がある場合は設定するよclone3で子プロセスを生成してinstanceから取り出した_startを実行するよ- 親プロセスは子プロセスの完了を待つよ
container周りとても疎くわからないことだらけだったが、こうやって追ってみると色々と学びがあったし「とても面白いな!」となったので良かった。
以上。
追記
わかりやすいスライドを紹介していただいたので貼っておく。
Runtime Shimに関しては @_moricho_ が昔発表したhttps://t.co/EliSt2PaJI が日本語だと一番詳しいかもしれません。
— inductor / Kohei Ota (@_inductor_) February 27, 2023