containerd/runwasiを読む
2023-02-24
以下を見て興味が湧いたので、軽く試したり読んだりしてみた。 とりあえず分かったことを記録しておく。
コンテナランタイムのcontainerdに、WebAssemblyをコンテナとして扱うための「runwasi」が統合。これからのコンテナランタイムはWebAssemblyと統合されていくhttps://t.co/FAFSwMpOY5
— Publickey (@publickey) February 21, 2023
目次
Hello runwasi
とりあずREADME
通りに試してみる。 wasmedge
をインストールし、make build
, sudo make install
。 demo を build しload
する。
$ cd crates/wasi-demo-app && cargo build --features oci-v1-tar
$ make load
これで demo が動く。
$ sudo ctr run --rm --runtime=io.containerd.wasmedge.v1 ghcr.io/containerd/runwasi/wasi-demo-app:latest testwasm /wasi-demo-app.wasm echo 'hello'
hello
exiting
build.rs
と img.tar
demo のbuild.rs
を見るとだいたいctr image import
するためにどんなことをしているのか分かる気がする。 難しいことはやっていないようで生成されたwasm
を追加したり。
tar::Builder::new(File::create(&layer_path).unwrap())
.append_path_with_name(&app_path, "wasi-demo-app.wasm")
oci
に準拠するように(oci ちゃんと見てないのでわからない。後で読む)entrypoint
を指定し、config
を作ったり。
let config = spec::ConfigBuilder::default()
.entrypoint(vec!["/wasi-demo-app.wasm".to_owned()])
.build()
arch
,os
,rootfs
などを設定してこれらをtar
に固めているだけのようだ。
let img = spec::ImageConfigurationBuilder::default()
.config(config)
.os("wasi")
.architecture("wasm")
.rootfs(
spec::RootFsBuilder::default()
.diff_ids(vec!["sha256:".to_owned() + &layer_digest])
.build()
.unwrap(),
)
.build()
そうすると以下のようなjson
たちやwasm
が入ったimg.tar
が’出来上がる
{
"architecture": "wasm",
"os": "wasi",
"config": {
"Entrypoint": ["/wasi-demo-app.wasm"]
},
"rootfs": {
"type": "layers",
"diff_ids": [
"sha256:6b4d2287efa13c803d9c065aee49b796e677af88082b29be9fe8e6986f9ff9b9"
]
},
"history": []
}
[
{
"Config": "blobs/sha256/addb778fc7e3a95acee51ace991c6ca3a823f327aeeb22c2fd329f84f631687a",
"RepoTags": ["ghcr.io/containerd/runwasi/wasi-demo-app:latest"],
"Layers": [
"blobs/sha256/6b4d2287efa13c803d9c065aee49b796e677af88082b29be9fe8e6986f9ff9b9"
]
}
]
{ "imageLayoutVersion": "1.0.0" }
containerd と shim とのやりとり
よく分かっておらずpublickey
の記事を見たときはcontainerd
にshim
を設定しruntime
指定によってrunwasi
を使いわけるような実装を想像をしたけどそうではなかった。
Makefile
を見れば分かるのだがinstall
は以下のようになっている。
つまりcontainerd-shim-wasmtime-v1
のようなバイナリを/usr/local
に配置して、そのバイナリとはttrpc
でやりとりするような決まりになっているっぽい。
PREFIX ?= /usr/local
INSTALL ?= install
RUNTIMES ?= wasmedge wasmtime
.PHONY: install
install:
mkdir -p $(PREFIX)/bin
$(foreach runtime,$(RUNTIMES), \
$(INSTALL) target/$(TARGET)/containerd-shim-$(runtime)-v1 $(PREFIX)/bin/; \
$(INSTALL) target/$(TARGET)/containerd-shim-$(runtime)d-v1 $(PREFIX)/bin/; \
$(INSTALL) target/$(TARGET)/containerd-$(runtime)d $(PREFIX)/bin/; \
)
ttrpcはhttp
,http2
,tls
を必須としないgRPC
の省機能版でGRPC for low-memory environments
と記載されている。なのでgRPC
同様、protobuf
を定義して通信するようだ。
また、サンプルなどを見る限りUNIX domain socket
を使ってるっぽい。
この定義はおそらくcontainerd/rust-extensions
のshim-protos
crate にある以下あたりだろうか。
service Task {
rpc State(StateRequest) returns (StateResponse);
rpc Create(CreateTaskRequest) returns (CreateTaskResponse);
rpc Start(StartRequest) returns (StartResponse);
rpc Delete(DeleteRequest) returns (DeleteResponse);
rpc Pids(PidsRequest) returns (PidsResponse);
rpc Pause(PauseRequest) returns (google.protobuf.Empty);
rpc Resume(ResumeRequest) returns (google.protobuf.Empty);
rpc Checkpoint(CheckpointTaskRequest) returns (google.protobuf.Empty);
rpc Kill(KillRequest) returns (google.protobuf.Empty);
rpc Exec(ExecProcessRequest) returns (google.protobuf.Empty);
rpc ResizePty(ResizePtyRequest) returns (google.protobuf.Empty);
rpc CloseIO(CloseIORequest) returns (google.protobuf.Empty);
rpc Update(UpdateTaskRequest) returns (google.protobuf.Empty);
rpc Wait(WaitRequest) returns (WaitResponse);
rpc Stats(StatsRequest) returns (StatsResponse);
rpc Connect(ConnectRequest) returns (ConnectResponse);
rpc Shutdown(ShutdownRequest) returns (google.protobuf.Empty);
}
このような取り決めとなているためrunwasi
のREADME
にある以下ようなオレオレshim
も実装できるという話っぽい。この例ではmyshim
になっているが、これがhoge
でもfuga
でもいいという話になりそう。
impl Instance for MyInstance {
// ...
}
fn main() {
shim::run::<ShimCli<MyInstance>>("io.containerd.myshim.v1", opts);
}
そしてそのshim
の振る舞いはInstance
trait の実装で決定される。
今回はwasmtime
を追ってみようと思っていた。であれば以下のようなInstance
を実装している箇所見れば良い。
impl Instance for Wasi {
type E = wasmtime::Engine;
// ...省略
}
impl Instance for Wasi {}
wasmtime
のInstance
を見ていくわけだが、どういう形で使用されるのかtest_wasi
というテストを見ていくとInstance
を使用するイメージがしやすい。
fn test_wasi()
流れは以下のような感じ。
まずはrootfs
とwasm
,stdout
を用意する。
let dir = tempdir()?;
create_dir(dir.path().join("rootfs"))?;
let mut f = File::create(dir.path().join("rootfs/hello.wat"))?;
f.write_all(WASI_HELLO_WAT)?;
let stdout = File::create(dir.path().join("stdout"))?;
drop(stdout);
次にSpec
を用意してconfig.json
としてrootfs
に保存する。
本来はcontainerd
から共有されるもののように見える(要確認)
let spec = SpecBuilder::default()
.root(RootBuilder::default().path("rootfs").build()?)
.process(
ProcessBuilder::default()
.cwd("/")
.args(vec!["hello.wat".to_string()])
.build()?,
)
.build()?;
spec.save(dir.path().join("config.json"))?;
一部省略するがspec
はこんなデータとなっている。
version: "1.0.2-dev",
root: Some(
Root {
path: "/tmp/.tmpNNCrAK/rootfs",
readonly: Some(
true,
),
},
),
mounts: Some(
[
Mount {
destination: "/proc",
typ: "proc",
source: "proc",
options: None,
},
// ...
],
),
process: Some(
Process {
terminal: false,
user: User {
uid: 0,
gid: 0,
umask: None,
additional_gids: None,
username: None,
},
args: ["hello.wat"],
command_line: None,
env: [
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
"TERM=xterm",
],
cwd: "/",
capabilities: Some(
LinuxCapabilities { ... },
),
rlimits:[
LinuxRlimit {
typ: RlimitNofile,
hard: 1024,
soft: 1024,
},
],
// ...
},
),
hostname: "youki",
linux: Linux {
uid_mappings: None,
gid_mappings: None,
sysctl: None,
resources: LinuxResources {/* ... */ },
namespaces: [
LinuxNamespace { typ: Pid, path: None },
LinuxNamespace { typ: Network, path: None },
LinuxNamespace { typ: Ipc, path: None },
LinuxNamespace { typ: Uts, path: None },
LinuxNamespace { typ: Mount, path: None },
],
),
devices: None,
seccomp: None,
rootfs_propagation: None,
masked_paths: [/* ... */]
readonly_paths: [/* ... */]
// ...
},
// ...
config.json
はrunc spec
でデフォルトのものを出力できるらしく、試してみると以下が出力された。
Details
{
"ociVersion": "1.0.2-dev",
"process": {
"terminal": true,
"user": {
"uid": 0,
"gid": 0
},
"args": [
"sh"
],
"env": [
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
"TERM=xterm"
],
"cwd": "/",
"capabilities": {
"bounding": [
"CAP_AUDIT_WRITE",
"CAP_KILL",
"CAP_NET_BIND_SERVICE"
],
"effective": [
"CAP_AUDIT_WRITE",
"CAP_KILL",
"CAP_NET_BIND_SERVICE"
],
"inheritable": [
"CAP_AUDIT_WRITE",
"CAP_KILL",
"CAP_NET_BIND_SERVICE"
],
"permitted": [
"CAP_AUDIT_WRITE",
"CAP_KILL",
"CAP_NET_BIND_SERVICE"
],
"ambient": [
"CAP_AUDIT_WRITE",
"CAP_KILL",
"CAP_NET_BIND_SERVICE"
]
},
"rlimits": [
{
"type": "RLIMIT_NOFILE",
"hard": 1024,
"soft": 1024
}
],
"noNewPrivileges": true
},
"root": {
"path": "rootfs",
"readonly": true
},
"hostname": "runc",
"mounts": [
{
"destination": "/proc",
"type": "proc",
"source": "proc"
},
{
"destination": "/dev",
"type": "tmpfs",
"source": "tmpfs",
"options": [
"nosuid",
"strictatime",
"mode=755",
"size=65536k"
]
},
{
"destination": "/dev/pts",
"type": "devpts",
"source": "devpts",
"options": [
"nosuid",
"noexec",
"newinstance",
"ptmxmode=0666",
"mode=0620",
"gid=5"
]
},
{
"destination": "/dev/shm",
"type": "tmpfs",
"source": "shm",
"options": [
"nosuid",
"noexec",
"nodev",
"mode=1777",
"size=65536k"
]
},
{
"destination": "/dev/mqueue",
"type": "mqueue",
"source": "mqueue",
"options": [
"nosuid",
"noexec",
"nodev"
]
},
{
"destination": "/sys",
"type": "sysfs",
"source": "sysfs",
"options": [
"nosuid",
"noexec",
"nodev",
"ro"
]
},
{
"destination": "/sys/fs/cgroup",
"type": "cgroup",
"source": "cgroup",
"options": [
"nosuid",
"noexec",
"nodev",
"relatime",
"ro"
]
}
],
"linux": {
"resources": {
"devices": [
{
"allow": false,
"access": "rwm"
}
]
},
"namespaces": [
{
"type": "pid"
},
{
"type": "network"
},
{
"type": "ipc"
},
{
"type": "uts"
},
{
"type": "mount"
},
{
"type": "cgroup"
}
],
"maskedPaths": [
"/proc/acpi",
"/proc/asound",
"/proc/kcore",
"/proc/keys",
"/proc/latency_stats",
"/proc/timer_list",
"/proc/timer_stats",
"/proc/sched_debug",
"/sys/firmware",
"/proc/scsi"
],
"readonlyPaths": [
"/proc/bus",
"/proc/fs",
"/proc/irq",
"/proc/sys",
"/proc/sysrq-trigger"
]
}
}
その後はstdout
を設定したconfig
など作ってwasi
を作る。
let mut cfg = InstanceConfig::new(Engine::default());
let cfg = cfg
.set_bundle(dir.path().to_str().unwrap().to_string())
.set_stdout(dir.path().join("stdout").to_str().unwrap().to_string());
let wasi = Arc::new(Wasi::new("test".to_string(), Some(cfg)));
wasi
ができたらstart
する。
上記のWasi::new
と下記start
はInstance
trait で定義されているものだ。
wasi.start()?;
あとはchannel
を作ってthread::spawn
しつつ、w.wait
にSender
を渡して実行する。
これで完了し次第wait
からSender
経由でmessage
を投げてくれる。
このw.wait
もInstance
trait で定義されているものだ。
let w = wasi.clone();
let (tx, rx) = channel();
thread::spawn(move || {
w.wait(tx).unwrap();
});
あとは rx.recv_timeout
で完了を待ち、完了メッセージが到着し次第、kill
して終了というのが一番シンプルな流れだ。
let res = match rx.recv_timeout(Duration::from_secs(10)) {
Ok(res) => res,
Err(e) => {
wasi.kill(SIGKILL as u32).unwrap();
return Err( /* ... */);
}
};
なので、この流れを思い浮かべながらstart
,wait
,kill
あたりの実装見れば良さそうだ。
fn start(&self) -> Result<u32, Error>
まずはwasmtime
を準備している。
let engine = self.engine.clone();
let mut linker = Linker::new(&engine);
wasmtime_wasi::add_to_linker(&mut linker, |s| s).map_err( /* ... */)?;
add_to_linker
は追ってないが、以下のようになっているのでおそらくfd_write
などwasi
の実装をlink
しているんじゃないかと思う。
snapshots::preview_1::add_wasi_snapshot_preview1_to_linker(linker, get_cx)?;
snapshots::preview_0::add_wasi_unstable_to_linker(linker, get_cx)?;
Ok(())
次にSpec
を作る。これは前述したconfig.json
からSpec
を作っている。
let spec = load_spec(self.bundle.clone())?;
その後prepare_module
内でWasiCtxBuilder
経由で標準入出力のつなぎこみなどを行ったあと、それを wrap したStore
を作り先程のlinker
からinstance
を作成する。
let m = prepare_module(engine.clone(), &spec, stdin, stdout, stderr).map_err( /* ... */)
let mut store = Store::new(&engine, m.0);
let i = linker.instantiate(&mut store, &m.1).map_err( /* ... */)
instance
ができたら、そこからwasm
のエントリポイントである_start
関数を取り出しておく。
let f = i.get_func(&mut store, "_start").ok_or_else(|| { /* ... */ })?;
その後Spec
にcgroup
の設定が含まれている場合はここで設定が行われるようだ。(今回未確認)
let cg = oci::get_cgroup(&spec)?;
oci::setup_cgroup(cg.as_ref(), &spec).map_err(/* ... */);
ここまで来たらexec::fork
により新しいprocess
を生成する。
let res = unsafe { exec::fork(Some(cg.as_ref())) }?;
こいつは追っていくとuapi
というcrate
を使用したsyscall
でclone3
呼んでいる箇所にたどりつく。つまりここがprocess
生成の実体のはず。
pub unsafe fn clone3_system_call(cl_args: &CloneArgs) -> c_long {
syscall(
SYS_clone3,
cl_args as *const CloneArgs,
core::mem::size_of::<CloneArgs>(),
)
}
INFO
uapi
は、いわゆるnix
crate と同じようなcrate
だと思うのだが、このcrate
を選択している理由はわからなかった。uapi
のREADME
には以下のようにnix
との比較も書いてあったのだが、この内容ならnix
でいいのでは、と思ったりはした。
- nix uses a nested module structure. uapi exports all APIs in the crate root.
- nix I/O works on
[u8]
. uapi I/O works on[MaybeUninit<u8>]
.- nix uses enums and bitflags for integer/flag parameters. uapi uses plain integers.
- nix uses methods declared on wrapper types to expose APIs. uapi uses free functions unless doing so would be unsafe.
- nix uses enums for the values produced and consumed by certain generic OS APIs (e.g. control messages.) uapi uses generic functions that do not restrict the types that can be used.
exec::fork
の返り値は親か子かの情報と、親の場合、pid
とpidfd
が返るようになっている。 まずシンプルな子の場合を見てみる。
基本的には以前instance
より取りだした_start
すなわちf
を実行しているだけだ。
つまり、ここでようやくwasm
が実行されることになる。
子は終了し次第process.exit
するのでnever
だ。
let _ret = match f.call(&mut store, &[], &mut []) {
Ok(_) => std::process::exit(0),
Err(_) => std::process::exit(137),
};
次に親の処理を見てみる。
まずはpidfd
をself.pidfd
に保持する。
これは後述するkill
などで使用するためだ。
let mut lr = self.pidfd.lock().unwrap();
*lr = Some(pidfd.clone());
let code = self.exit_code.clone();
その後thread::spawn
しpid
を返す。
spawn
されたthread
ではまずself.exit_code
からcondvar
を取り出す。
あとはpidfd
経由で子プロセスの終了を待つ。
完了後はexit_code
を書き込んでcondvar
で完了通知を送る。
基本的なstart
の内容はこんな感じだ。
let _ = thread::spawn(move || {
let (lock, cvar) = &*code;
let status = match pidfd.wait() {
Ok(status) => status,
Err(e) => {
cvar.notify_all();
return;
}
};
let mut ec = lock.lock().unwrap();
*ec = Some((status.status, Utc::now()));
drop(ec);
cvar.notify_all();
});
Ok(tid)
fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error>
次にwait
を見てみる。 こちらはシンプルだ。
fn test_wasi()
の際に言及したが、wait
はSender
を受け取る。 thread::spawn
し、condvar
の通知を待ち、たしかに完了していたらSender
経由で上位に終了通知を返すという流れだろう。
fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
let code = self.exit_code.clone();
thread::spawn(move || {
let (lock, cvar) = &*code;
let mut exit = lock.lock().unwrap();
while (*exit).is_none() {
exit = cvar.wait(exit).unwrap();
}
let ec = (*exit).unwrap();
channel.send(ec).unwrap();
});
Ok(())
}
ttrpc
からのwait
request を扱う関数(おそらく)は以下のようになっている。 なのでinstance
のwait
を実行しrx.recv()
で完了まで block する。 そして完了し次第、response
にtimestamp
を設定して返す。
fn task_wait(&self, req: api::WaitRequest) -> Result<api::WaitResponse> {
let i = self.get_instance(req.get_id())?;
let (lock, cvar) = &*i.status.clone();
let mut status = lock.lock().unwrap();
while (*status).is_none() {
status = cvar.wait(status).unwrap();
}
let (tx, rx) = channel::<(u32, DateTime<Utc>)>();
i.wait(tx)?;
let code = rx.recv().unwrap();
let mut timestamp = Timestamp::new();
// ...
let mut wr = api::WaitResponse {
exit_status: code.0,
..Default::default()
};
wr.set_exited_at(timestamp);
Ok(wr)
}
kill(&self, signal: u32) -> Result<(), Error>
最後にkill
も見ておく。
こちらもシンプルで基本的にはpidfd
経由で子プロセスをkill
しているだけっぽい。
fn kill(&self, signal: u32) -> Result<(), Error> {
if signal != SIGKILL as u32 {
return Err(Error::InvalidArgument(
"only SIGKILL is supported".to_string(),
));
}
let lr = self.pidfd.lock().unwrap();
let fd = lr.as_ref().ok_or_else(|| /* ... */)?;
fd.kill(signal as i32)
}
まとめ
大雑把にまとめると以下だろうか
shim
名と関連付けられたbinary
が起動されるよttrpc
経由でtask
を作ったり、開始したり、待ったりがrequest
されるよrequest
がきたらrootfs
のconfig
とwasm
からstdout
をつないだりしてinstantiate
するよconfig
にcgroup
の設定がある場合は設定するよclone3
で子プロセスを生成してinstance
から取り出した_start
を実行するよ- 親プロセスは子プロセスの完了を待つよ
container
周りとても疎くわからないことだらけだったが、こうやって追ってみると色々と学びがあったし「とても面白いな!」となったので良かった。
以上。
追記
わかりやすいスライドを紹介していただいたので貼っておく。
Runtime Shimに関しては @_moricho_ が昔発表したhttps://t.co/EliSt2PaJI が日本語だと一番詳しいかもしれません。
— inductor / Kohei Ota (@_inductor_) February 27, 2023