Skip to content
On this page

containerd/runwasiを読む

2023-02-24

以下を見て興味が湧いたので、軽く試したり読んだりしてみた。 とりあえず分かったことを記録しておく。

目次

Hello runwasi

とりあずREADME通りに試してみる。 wasmedgeをインストールし、make build, sudo make install。 demo を build しloadする。

sh
$ cd crates/wasi-demo-app && cargo build --features oci-v1-tar
$ make load

これで demo が動く。

sh
$ sudo ctr run --rm --runtime=io.containerd.wasmedge.v1 ghcr.io/containerd/runwasi/wasi-demo-app:latest testwasm /wasi-demo-app.wasm echo 'hello'

hello
exiting

build.rs と img.tar

demo のbuild.rsを見るとだいたいctr image importするためにどんなことをしているのか分かる気がする。 難しいことはやっていないようで生成されたwasmを追加したり。

rust
tar::Builder::new(File::create(&layer_path).unwrap())
    .append_path_with_name(&app_path, "wasi-demo-app.wasm")

ociに準拠するように(oci ちゃんと見てないのでわからない。後で読む)entrypointを指定し、configを作ったり。

rust
let config = spec::ConfigBuilder::default()
    .entrypoint(vec!["/wasi-demo-app.wasm".to_owned()])
    .build()

arch,os,rootfsなどを設定してこれらをtarに固めているだけのようだ。

rust
let img = spec::ImageConfigurationBuilder::default()
     .config(config)
     .os("wasi")
     .architecture("wasm")
     .rootfs(
         spec::RootFsBuilder::default()
             .diff_ids(vec!["sha256:".to_owned() + &layer_digest])
             .build()
             .unwrap(),
     )
     .build()

そうすると以下のようなjsonたちやwasmが入ったimg.tarが’出来上がる

json
{
  "architecture": "wasm",
  "os": "wasi",
  "config": {
    "Entrypoint": ["/wasi-demo-app.wasm"]
  },
  "rootfs": {
    "type": "layers",
    "diff_ids": [
      "sha256:6b4d2287efa13c803d9c065aee49b796e677af88082b29be9fe8e6986f9ff9b9"
    ]
  },
  "history": []
}
json
[
  {
    "Config": "blobs/sha256/addb778fc7e3a95acee51ace991c6ca3a823f327aeeb22c2fd329f84f631687a",
    "RepoTags": ["ghcr.io/containerd/runwasi/wasi-demo-app:latest"],
    "Layers": [
      "blobs/sha256/6b4d2287efa13c803d9c065aee49b796e677af88082b29be9fe8e6986f9ff9b9"
    ]
  }
]
json
{ "imageLayoutVersion": "1.0.0" }

containerd と shim とのやりとり

よく分かっておらずpublickeyの記事を見たときはcontainerdshimを設定しruntime指定によってrunwasiを使いわけるような実装を想像をしたけどそうではなかった。

Makefileを見れば分かるのだがinstallは以下のようになっている。

つまりcontainerd-shim-wasmtime-v1のようなバイナリを/usr/localに配置して、そのバイナリとはttrpcでやりとりするような決まりになっているっぽい。

Makefile
PREFIX ?= /usr/local
INSTALL ?= install
RUNTIMES ?= wasmedge wasmtime

.PHONY: install
install:
	mkdir -p $(PREFIX)/bin
	$(foreach runtime,$(RUNTIMES), \
		$(INSTALL) target/$(TARGET)/containerd-shim-$(runtime)-v1 $(PREFIX)/bin/; \
		$(INSTALL) target/$(TARGET)/containerd-shim-$(runtime)d-v1 $(PREFIX)/bin/; \
		$(INSTALL) target/$(TARGET)/containerd-$(runtime)d $(PREFIX)/bin/; \
	)

ttrpchttp,http2,tlsを必須としないgRPCの省機能版でGRPC for low-memory environmentsと記載されている。なのでgRPC同様、protobufを定義して通信するようだ。

また、サンプルなどを見る限りUNIX domain socketを使ってるっぽい。

この定義はおそらくcontainerd/rust-extensionsshim-protoscrate にある以下あたりだろうか。

proto
service Task {
	rpc State(StateRequest) returns (StateResponse);
	rpc Create(CreateTaskRequest) returns (CreateTaskResponse);
	rpc Start(StartRequest) returns (StartResponse);
	rpc Delete(DeleteRequest) returns (DeleteResponse);
	rpc Pids(PidsRequest) returns (PidsResponse);
	rpc Pause(PauseRequest) returns (google.protobuf.Empty);
	rpc Resume(ResumeRequest) returns (google.protobuf.Empty);
	rpc Checkpoint(CheckpointTaskRequest) returns (google.protobuf.Empty);
	rpc Kill(KillRequest) returns (google.protobuf.Empty);
	rpc Exec(ExecProcessRequest) returns (google.protobuf.Empty);
	rpc ResizePty(ResizePtyRequest) returns (google.protobuf.Empty);
	rpc CloseIO(CloseIORequest) returns (google.protobuf.Empty);
	rpc Update(UpdateTaskRequest) returns (google.protobuf.Empty);
	rpc Wait(WaitRequest) returns (WaitResponse);
	rpc Stats(StatsRequest) returns (StatsResponse);
	rpc Connect(ConnectRequest) returns (ConnectResponse);
	rpc Shutdown(ShutdownRequest) returns (google.protobuf.Empty);
}

このような取り決めとなているためrunwasiREADMEにある以下ようなオレオレshimも実装できるという話っぽい。この例ではmyshimになっているが、これがhogeでもfugaでもいいという話になりそう。

rust
impl Instance for MyInstance {
    // ...
}

fn main() {
    shim::run::<ShimCli<MyInstance>>("io.containerd.myshim.v1", opts);
}

そしてそのshimの振る舞いはInstancetrait の実装で決定される。

今回はwasmtimeを追ってみようと思っていた。であれば以下のようなInstance実装している箇所見れば良い。

rust
impl Instance for Wasi {
    type E = wasmtime::Engine;

    // ...省略
}

impl Instance for Wasi {}

wasmtimeInstanceを見ていくわけだが、どういう形で使用されるのかtest_wasiというテストを見ていくとInstanceを使用するイメージがしやすい。

fn test_wasi()

流れは以下のような感じ。

まずはrootfswasm,stdoutを用意する。

rust
let dir = tempdir()?;
create_dir(dir.path().join("rootfs"))?;
rust
let mut f = File::create(dir.path().join("rootfs/hello.wat"))?;
f.write_all(WASI_HELLO_WAT)?;
rust
let stdout = File::create(dir.path().join("stdout"))?;
drop(stdout);

次にSpecを用意してconfig.jsonとしてrootfsに保存する。
本来はcontainerdから共有されるもののように見える(要確認)

rust
let spec = SpecBuilder::default()
    .root(RootBuilder::default().path("rootfs").build()?)
    .process(
        ProcessBuilder::default()
            .cwd("/")
            .args(vec!["hello.wat".to_string()])
            .build()?,
    )
    .build()?;
spec.save(dir.path().join("config.json"))?;

一部省略するがspecはこんなデータとなっている。

rust
version: "1.0.2-dev",
    root: Some(
        Root {
            path: "/tmp/.tmpNNCrAK/rootfs",
            readonly: Some(
                true,
            ),
        },
    ),
    mounts: Some(
        [
            Mount {
                destination: "/proc",
                typ: "proc",
                source: "proc",
                options: None,
            },
            // ...
        ],
    ),
    process: Some(
        Process {
            terminal: false,
            user: User {
                uid: 0,
                gid: 0,
                umask: None,
                additional_gids: None,
                username: None,
            },
            args: ["hello.wat"],
            command_line: None,
            env: [
                "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
                "TERM=xterm",
            ],
            cwd: "/",
            capabilities: Some(
                LinuxCapabilities { ... },
            ),
            rlimits:[
                    LinuxRlimit {
                        typ: RlimitNofile,
                        hard: 1024,
                        soft: 1024,
                    },
            ],
            // ...
        },
    ),
    hostname: "youki",
    linux: Linux {
            uid_mappings: None,
            gid_mappings: None,
            sysctl: None,
            resources: LinuxResources {/* ... */ },
            namespaces: [
                    LinuxNamespace { typ: Pid, path: None },
                    LinuxNamespace { typ: Network, path: None },
                    LinuxNamespace { typ: Ipc, path: None },
                    LinuxNamespace { typ: Uts, path: None },
                    LinuxNamespace { typ: Mount, path: None },
                ],
            ),
            devices: None,
            seccomp: None,
            rootfs_propagation: None,
            masked_paths: [/* ... */]
            readonly_paths: [/* ... */]
            // ...
    },
    // ...

config.jsonrunc specでデフォルトのものを出力できるらしく、試してみると以下が出力された。

Details
json
{
    "ociVersion": "1.0.2-dev",
    "process": {
        "terminal": true,
        "user": {
            "uid": 0,
            "gid": 0
        },
        "args": [
            "sh"
        ],
        "env": [
            "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
            "TERM=xterm"
        ],
        "cwd": "/",
        "capabilities": {
            "bounding": [
                "CAP_AUDIT_WRITE",
                "CAP_KILL",
                "CAP_NET_BIND_SERVICE"
            ],
            "effective": [
                "CAP_AUDIT_WRITE",
                "CAP_KILL",
                "CAP_NET_BIND_SERVICE"
            ],
            "inheritable": [
                "CAP_AUDIT_WRITE",
                "CAP_KILL",
                "CAP_NET_BIND_SERVICE"
            ],
            "permitted": [
                "CAP_AUDIT_WRITE",
                "CAP_KILL",
                "CAP_NET_BIND_SERVICE"
            ],
            "ambient": [
                "CAP_AUDIT_WRITE",
                "CAP_KILL",
                "CAP_NET_BIND_SERVICE"
            ]
        },
        "rlimits": [
            {
                "type": "RLIMIT_NOFILE",
                "hard": 1024,
                "soft": 1024
            }
        ],
        "noNewPrivileges": true
    },
    "root": {
        "path": "rootfs",
        "readonly": true
    },
    "hostname": "runc",
    "mounts": [
        {
            "destination": "/proc",
            "type": "proc",
            "source": "proc"
        },
        {
            "destination": "/dev",
            "type": "tmpfs",
            "source": "tmpfs",
            "options": [
                "nosuid",
                "strictatime",
                "mode=755",
                "size=65536k"
            ]
        },
        {
            "destination": "/dev/pts",
            "type": "devpts",
            "source": "devpts",
            "options": [
                "nosuid",
                "noexec",
                "newinstance",
                "ptmxmode=0666",
                "mode=0620",
                "gid=5"
            ]
        },
        {
            "destination": "/dev/shm",
            "type": "tmpfs",
            "source": "shm",
            "options": [
                "nosuid",
                "noexec",
                "nodev",
                "mode=1777",
                "size=65536k"
            ]
        },
        {
            "destination": "/dev/mqueue",
            "type": "mqueue",
            "source": "mqueue",
            "options": [
                "nosuid",
                "noexec",
                "nodev"
            ]
        },
        {
            "destination": "/sys",
            "type": "sysfs",
            "source": "sysfs",
            "options": [
                "nosuid",
                "noexec",
                "nodev",
                "ro"
            ]
        },
        {
            "destination": "/sys/fs/cgroup",
            "type": "cgroup",
            "source": "cgroup",
            "options": [
                "nosuid",
                "noexec",
                "nodev",
                "relatime",
                "ro"
            ]
        }
    ],
    "linux": {
        "resources": {
            "devices": [
                {
                    "allow": false,
                    "access": "rwm"
                }
            ]
        },
        "namespaces": [
            {
                "type": "pid"
            },
            {
                "type": "network"
            },
            {
                "type": "ipc"
            },
            {
                "type": "uts"
            },
            {
                "type": "mount"
            },
            {
                "type": "cgroup"
            }
        ],
        "maskedPaths": [
            "/proc/acpi",
            "/proc/asound",
            "/proc/kcore",
            "/proc/keys",
            "/proc/latency_stats",
            "/proc/timer_list",
            "/proc/timer_stats",
            "/proc/sched_debug",
            "/sys/firmware",
            "/proc/scsi"
        ],
        "readonlyPaths": [
            "/proc/bus",
            "/proc/fs",
            "/proc/irq",
            "/proc/sys",
            "/proc/sysrq-trigger"
        ]
    }
}

その後はstdoutを設定したconfigなど作ってwasiを作る。

rust
let mut cfg = InstanceConfig::new(Engine::default());
let cfg = cfg
    .set_bundle(dir.path().to_str().unwrap().to_string())
    .set_stdout(dir.path().join("stdout").to_str().unwrap().to_string());
let wasi = Arc::new(Wasi::new("test".to_string(), Some(cfg)));

wasiができたらstartする。
上記のWasi::newと下記startInstancetrait で定義されているものだ。

rust
wasi.start()?;

あとはchannelを作ってthread::spawnしつつ、w.waitSenderを渡して実行する。

これで完了し次第waitからSender経由でmessageを投げてくれる。
このw.waitInstancetrait で定義されているものだ。

rust
let w = wasi.clone();
let (tx, rx) = channel();
thread::spawn(move || {
    w.wait(tx).unwrap();
});

あとは rx.recv_timeoutで完了を待ち、完了メッセージが到着し次第、killして終了というのが一番シンプルな流れだ。

rust
let res = match rx.recv_timeout(Duration::from_secs(10)) {
    Ok(res) => res,
    Err(e) => {
        wasi.kill(SIGKILL as u32).unwrap();
        return Err( /* ... */);
    }
};

なので、この流れを思い浮かべながらstart,wait,killあたりの実装見れば良さそうだ。

fn start(&self) -> Result<u32, Error>

まずはwasmtimeを準備している。

rust
let engine = self.engine.clone();
let mut linker = Linker::new(&engine);
wasmtime_wasi::add_to_linker(&mut linker, |s| s).map_err( /* ... */)?;

add_to_linkerは追ってないが、以下のようになっているのでおそらくfd_writeなどwasiの実装をlinkしているんじゃないかと思う。

rust
snapshots::preview_1::add_wasi_snapshot_preview1_to_linker(linker, get_cx)?;
snapshots::preview_0::add_wasi_unstable_to_linker(linker, get_cx)?;
Ok(())

次にSpecを作る。これは前述したconfig.jsonからSpecを作っている。

rust
let spec = load_spec(self.bundle.clone())?;

その後prepare_module内でWasiCtxBuilder経由で標準入出力のつなぎこみなどを行ったあと、それを wrap したStoreを作り先程のlinkerからinstanceを作成する。

rust
let m = prepare_module(engine.clone(), &spec, stdin, stdout, stderr).map_err( /* ... */)
let mut store = Store::new(&engine, m.0);
let i = linker.instantiate(&mut store, &m.1).map_err( /* ... */)

instanceができたら、そこからwasmのエントリポイントである_start関数を取り出しておく。

rust
let f = i.get_func(&mut store, "_start").ok_or_else(|| {  /* ... */ })?;

その後Speccgroupの設定が含まれている場合はここで設定が行われるようだ。(今回未確認)

rust
let cg = oci::get_cgroup(&spec)?;
oci::setup_cgroup(cg.as_ref(), &spec).map_err(/* ... */);

ここまで来たらexec::forkにより新しいprocessを生成する。

rust
let res = unsafe { exec::fork(Some(cg.as_ref())) }?;

こいつは追っていくとuapiというcrateを使用したsyscallclone3呼んでいる箇所にたどりつく。つまりここがprocess生成の実体のはず。

rust
pub unsafe fn clone3_system_call(cl_args: &CloneArgs) -> c_long {
    syscall(
        SYS_clone3,
        cl_args as *const CloneArgs,
        core::mem::size_of::<CloneArgs>(),
    )
}

INFO

uapiは、いわゆるnixcrate と同じようなcrateだと思うのだが、このcrateを選択している理由はわからなかった。uapiREADMEには以下のようにnixとの比較も書いてあったのだが、この内容ならnixでいいのでは、と思ったりはした。

  • nix uses a nested module structure. uapi exports all APIs in the crate root.
  • nix I/O works on [u8]. uapi I/O works on [MaybeUninit<u8>].
  • nix uses enums and bitflags for integer/flag parameters. uapi uses plain integers.
  • nix uses methods declared on wrapper types to expose APIs. uapi uses free functions unless doing so would be unsafe.
  • nix uses enums for the values produced and consumed by certain generic OS APIs (e.g. control messages.) uapi uses generic functions that do not restrict the types that can be used.

exec::forkの返り値は親か子かの情報と、親の場合、pidpidfdが返るようになっている。 まずシンプルな子の場合を見てみる。

基本的には以前instanceより取りだした_startすなわちfを実行しているだけだ。
つまり、ここでようやくwasmが実行されることになる。
子は終了し次第process.exitするのでneverだ。

rust
let _ret = match f.call(&mut store, &[], &mut []) {
    Ok(_) => std::process::exit(0),
    Err(_) => std::process::exit(137),
};

次に親の処理を見てみる。
まずはpidfdself.pidfdに保持する。
これは後述するkillなどで使用するためだ。

rust
let mut lr = self.pidfd.lock().unwrap();
*lr = Some(pidfd.clone());
let code = self.exit_code.clone();

その後thread::spawnpidを返す。

spawnされたthreadではまずself.exit_codeからcondvarを取り出す。
あとはpidfd経由で子プロセスの終了を待つ。

完了後はexit_codeを書き込んでcondvarで完了通知を送る。
基本的なstartの内容はこんな感じだ。

rust
let _ = thread::spawn(move || {
    let (lock, cvar) = &*code;
    let status = match pidfd.wait() {
        Ok(status) => status,
        Err(e) => {
            cvar.notify_all();
            return;
        }
    };
    let mut ec = lock.lock().unwrap();
    *ec = Some((status.status, Utc::now()));
    drop(ec);
    cvar.notify_all();
});

Ok(tid)

fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error>

次にwaitを見てみる。 こちらはシンプルだ。

fn test_wasi()の際に言及したが、waitSenderを受け取る。 thread::spawnし、condvarの通知を待ち、たしかに完了していたらSender経由で上位に終了通知を返すという流れだろう。

rust
fn wait(&self, channel: Sender<(u32, DateTime<Utc>)>) -> Result<(), Error> {
    let code = self.exit_code.clone();
    thread::spawn(move || {
        let (lock, cvar) = &*code;
        let mut exit = lock.lock().unwrap();
        while (*exit).is_none() {
            exit = cvar.wait(exit).unwrap();
        }
        let ec = (*exit).unwrap();
        channel.send(ec).unwrap();
    });
    Ok(())
}

ttrpcからのwaitrequest を扱う関数(おそらく)は以下のようになっている。 なのでinstancewaitを実行しrx.recv()で完了まで block する。 そして完了し次第、responsetimestampを設定して返す。

rust
fn task_wait(&self, req: api::WaitRequest) -> Result<api::WaitResponse> {  
    let i = self.get_instance(req.get_id())?;
    let (lock, cvar) = &*i.status.clone();
    let mut status = lock.lock().unwrap();
    while (*status).is_none() {
        status = cvar.wait(status).unwrap();
    }
    let (tx, rx) = channel::<(u32, DateTime<Utc>)>();  
    i.wait(tx)?;  
    let code = rx.recv().unwrap();  
    let mut timestamp = Timestamp::new();
    // ...
    let mut wr = api::WaitResponse {
        exit_status: code.0,
        ..Default::default()
    };
    wr.set_exited_at(timestamp);
    Ok(wr)
}

kill(&self, signal: u32) -> Result<(), Error>

最後にkillも見ておく。
こちらもシンプルで基本的にはpidfd経由で子プロセスをkillしているだけっぽい。

rust
    fn kill(&self, signal: u32) -> Result<(), Error> { 
        if signal != SIGKILL as u32 {
            return Err(Error::InvalidArgument(
                "only SIGKILL is supported".to_string(),
            ));
        }

        let lr = self.pidfd.lock().unwrap(); 
        let fd = lr.as_ref().ok_or_else(|| /* ... */)?; 
        fd.kill(signal as i32) 
    }

まとめ

大雑把にまとめると以下だろうか

  • shim名と関連付けられたbinaryが起動されるよ
  • ttrpc経由でtaskを作ったり、開始したり、待ったりがrequestされるよ
  • requestがきたらrootfsconfigwasmからstdoutをつないだりしてinstantiateするよ
  • configcgroupの設定がある場合は設定するよ
  • clone3で子プロセスを生成してinstanceから取り出した_startを実行するよ
  • 親プロセスは子プロセスの完了を待つよ

container周りとても疎くわからないことだらけだったが、こうやって追ってみると色々と学びがあったし「とても面白いな!」となったので良かった。

以上。

追記

わかりやすいスライドを紹介していただいたので貼っておく。