Skip to content
On this page

tokio-rs/tokio を読む(1)

2023-03-07

最近tokio-rs/tokioをちゃんと理解したいと思いちょっとずつ読んでいる。
一気に理解するのは難しく色々と発散してしまいそうなので、ちょっとずつ本記事ような形態でまとめていくことにした。

読むのは以下の version とする。

tokio-rs/tokio

目次

Goals

今回は以下のコードで何が起こっているか理解することを目標とする。

playground

rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct Hello {
    state: StateHello,
}

enum StateHello {
    Hello,
    World,
}

impl Hello {
    fn new() -> Self {
        Hello {
            state: StateHello::Hello,
        }
    }
}

impl Future for Hello {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        match (*self).state {
            StateHello::Hello => {
                println!("Hello ");
                (*self).state = StateHello::World;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            StateHello::World => {
                println!("World!");
                Poll::Ready(())
            }
        }
    }
}

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(Hello::new());
}

tokio::runtime::Runtime::new()

まずはRuntime作成を一旦流す。
Runtime自身は以下のような構造体で default すなわちrt-multi-threadであればBuilderを介してbuild_threaded_runtimeで作成される。

rust
pub fn new() -> std::io::Result<Runtime> {
    Builder::new_multi_thread().enable_all().build()
}
rust
pub struct Runtime {
    /// Task scheduler
    scheduler: Scheduler,
    /// Handle to runtime, also contains driver handles
    handle: Handle,
    /// Blocking pool handle, used to signal shutdown
    blocking_pool: BlockingPool,
}

fn new_multi_thread() -> Builder

Builderは以下のように初期化される。

rust
pub fn new_multi_thread() -> Builder {
    // The number `61` is fairly arbitrary. I believe this value was copied from golang.
    Builder::new(Kind::MultiThread, 61, 61)
}

この61という数字はglobal_queueのタスクを確認する周期のようでgolang由来のようだ。
golangの該当箇所は以下のようになっていた。

go
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {

ただ、この数字の根拠は分からず調べて見ると以下のスライドや動画が見つかった。

スライドには以下のように書いてある。

Why 61?
It is not even 42! ¯\_(ツ)_/¯
Want something:
● not too small
● not too large
● prime to break any patterns

global queueを覗きにいくタイミングなので小さすぎるとlocal_queueよりglobal_queueのほうが優先されすぎてlocal_queueが捌けていかないし、大きすぎるとglobal_queueのタスクがなかなか実行されない。

そのへんのバランスを考えると64あたりになるが、2の倍数などで設定しまうとアプリケーション/CPUの周波数と同期し、意図しない偏りを産む懸念があり、素数にしている。と解釈した。

たとえば64にした場合、具体的にどのような問題として健在化するのかはわからないが、ちょうど読んでいた「詳解システムパフォーマンス 6.5.4 プロファイリング」にもサンプリング周波数は99Hzにする話が載っていた。これと似たような話なのかと思っている。

頻度を 99Hz にしているのは、CPU と歩調が揃ってプロファイルに偏りができるのを防ぐためである

実際にglobal_queueを覗く処理には今回はたどり着かないので次回以降で見ていこうと思う。

fn build_threaded_runtime(&mut self) -> io::Result<Runtime>

Details
rust
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
    use crate::loom::sys::num_cpus;
    use crate::runtime::{Config, runtime::Scheduler};
    use crate::runtime::scheduler::{self, MultiThread};
    let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
    let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
    // Create the blocking pool
    let blocking_pool =
        blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
    let blocking_spawner = blocking_pool.spawner().clone();
    // Generate a rng seed for this runtime.
    let seed_generator_1 = self.seed_generator.next_generator();
    let seed_generator_2 = self.seed_generator.next_generator();
    let (scheduler, handle, launch) = MultiThread::new(
        core_threads,
        driver,
        driver_handle,
        blocking_spawner,
        seed_generator_2,
        Config {
            before_park: self.before_park.clone(),
            after_unpark: self.after_unpark.clone(),
            global_queue_interval: self.global_queue_interval,
            event_interval: self.event_interval,
            #[cfg(tokio_unstable)]
            unhandled_panic: self.unhandled_panic.clone(),
            disable_lifo_slot: self.disable_lifo_slot,
            seed_generator: seed_generator_1,
        },
    );
    let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
    // Spawn the thread pool workers
    let _enter = handle.enter();
    launch.launch();
    Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
}

まずは指定された数もしくはcpuのコア数からworkerの数を算出している。

rust
let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

次にDriverを作成するが、今回対象としているコードではDriverは登場しないように見えたので scope 外にしておく。

DriverというのはIO,time,signal等を駆動する君に見える。最終的にはRuntime.handleの中にdriverとしてdriver::Handleが保持されるようだ。

rust
let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

次にblocking_poolを作成し、spawnercloneしておく。
引数はthread数の上限のようで、ここではdefault512+cpu数が設定されている。

rust
let blocking_pool =
    blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();

fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool

BlockingPool自体は以下のようになっており更にSpawnerなどを見ていく。

rust
pub(crate) struct BlockingPool {
    spawner: Spawner,
    shutdown_rx: shutdown::Receiver,
}

Spawner自身はArcで包んだInnerを持っており、Innerは以下のようになっている。thread_namestack_sizeを持っており、まさにspawnをするための情報を持っているようだ。after_start,before_stopはコメント(Call after a thread starts)からthread開始後、停止前の hook のようだ。

rust
#[derive(Clone)]
pub(crate) struct Spawner {
    inner: Arc<Inner>,
}

struct Inner {
    /// State shared between worker threads.
    shared: Mutex<Shared>,
    /// Pool threads wait on this.
    condvar: Condvar,
    /// Spawned threads use this name.
    thread_name: ThreadNameFn,
    /// Spawned thread stack size.
    stack_size: Option<usize>,
    /// Call after a thread starts.
    after_start: Option<Callback>,
    /// Call before a thread stops.
    before_stop: Option<Callback>,
    // Maximum number of threads.
    thread_cap: usize,
    // Customizable wait timeout.
    keep_alive: Duration,
    // Metrics about the pool.
    metrics: SpawnerMetrics,
}

Mutex<Shared>を見てみると以下のようになっている。 Taskの実体はBlockingScheduleとなっておりblockingタスクを共有するキューがqueue: VecDeque<Task>だろうか。
今回は scope 外とするが、spawn_blockingを読んだときの流れを読んでいくとわかりそうだ。

あとはshutdown_txworker_threads: HashMap<usize, thread::JoinHandle<()>>,など shutdown 処理に必要なものが多くありそうだ。

rust
struct Shared {
    queue: VecDeque<Task>,
    num_notify: u32,
    shutdown: bool,
    shutdown_tx: Option<shutdown::Sender>,
    /// Prior to shutdown, we clean up JoinHandles by having each timed-out
    /// thread join on the previous timed-out thread. This is not strictly
    /// necessary but helps avoid Valgrind false positives, see
    /// <https://github.com/tokio-rs/tokio/commit/646fbae76535e397ef79dbcaacb945d4c829f666>
    /// for more information.
    last_exiting_thread: Option<thread::JoinHandle<()>>,
    /// This holds the JoinHandles for all running threads; on shutdown, the thread
    /// calling shutdown handles joining on these.
    worker_threads: HashMap<usize, thread::JoinHandle<()>>,
    /// This is a counter used to iterate worker_threads in a consistent order (for loom's
    /// benefit).
    worker_thread_index: usize,
}

pub(crate) struct Task {
    task: task::UnownedTask<BlockingSchedule>,
    mandatory: Mandatory,
}

さて、肝心のblocking::create_blocking_poolまで遡ると実体は以下のようだ。
つまり先まで見てきた構造を初期値で埋めているだけだ。

またblocking_pool.spawner().clone()はつまりArc::cloneであることがわかる。

rust
pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
    let (shutdown_tx, shutdown_rx) = shutdown::channel();
    let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE);

    BlockingPool {
        spawner: Spawner {
            inner: Arc::new(Inner {
                shared: Mutex::new(Shared {
                    queue: VecDeque::new(),
                    num_notify: 0,
                    shutdown: false,
                    shutdown_tx: Some(shutdown_tx),
                    last_exiting_thread: None,
                    worker_threads: HashMap::new(),
                    worker_thread_index: 0,
                }),
                condvar: Condvar::new(),
                thread_name: builder.thread_name.clone(),
                stack_size: builder.thread_stack_size,
                after_start: builder.after_start.clone(),
                before_stop: builder.before_stop.clone(),
                thread_cap,
                keep_alive,
                metrics: Default::default(),
            }),
        },
        shutdown_rx,
    }
}

build_threaded_runtimeに戻りMultiThread::newを見ていく。

rust
let (scheduler, handle, launch) = MultiThread::new(
    core_threads,
    driver,
    driver_handle,
    blocking_spawner,
    seed_generator_2,
    Config {
        before_park: self.before_park.clone(),
        after_unpark: self.after_unpark.clone(),
        global_queue_interval: self.global_queue_interval,
        event_interval: self.event_interval,
        #[cfg(tokio_unstable)]
        unhandled_panic: self.unhandled_panic.clone(),
        disable_lifo_slot: self.disable_lifo_slot,
        seed_generator: seed_generator_1,
    },
);

MultiThread::newはほぼworker::createに移譲しているようなのでそちらを見ていく。

fn create(size: usize, park: Parker, driver_handle: driver::Handle, blocking_spawner: blocking::Spawner, seed_generator: RngSeedGenerator, config: Config) -> (Arc<Handle>, Launch)

長いので以下に畳んで記載しておく。

Details
rust
pub(super) fn create(
    size: usize,
    park: Parker,
    driver_handle: driver::Handle,
    blocking_spawner: blocking::Spawner,
    seed_generator: RngSeedGenerator,
    config: Config,
) -> (Arc<Handle>, Launch) {
    let mut cores = Vec::with_capacity(size);
    let mut remotes = Vec::with_capacity(size);
    let mut worker_metrics = Vec::with_capacity(size);

    // Create the local queues
    for _ in 0..size {
        let (steal, run_queue) = queue::local();

        let park = park.clone();
        let unpark = park.unpark();

        cores.push(Box::new(Core {
            tick: 0,
            lifo_slot: None,
            run_queue,
            is_searching: false,
            is_shutdown: false,
            park: Some(park),
            metrics: MetricsBatch::new(),
            rand: FastRand::new(config.seed_generator.next_seed()),
        }));

        remotes.push(Remote { steal, unpark });
        worker_metrics.push(WorkerMetrics::new());
    }

    let handle = Arc::new(Handle {
        shared: Shared {
            remotes: remotes.into_boxed_slice(),
            inject: Inject::new(),
            idle: Idle::new(size),
            owned: OwnedTasks::new(),
            shutdown_cores: Mutex::new(vec![]),
            config,
            scheduler_metrics: SchedulerMetrics::new(),
            worker_metrics: worker_metrics.into_boxed_slice(),
        },
        driver: driver_handle,
        blocking_spawner,
        seed_generator,
    });

    let mut launch = Launch(vec![]);

    for (index, core) in cores.drain(..).enumerate() {
        launch.0.push(Arc::new(Worker {
            handle: handle.clone(),
            index,
            core: AtomicCell::new(Some(core)),
        }));
    }

    (handle, launch)
}

まずはworkerの数だけcore,remote,metricsを格納するためのVecを作る。
これからが何かは後でみる。

rust
let mut cores = Vec::with_capacity(size);
let mut remotes = Vec::with_capacity(size);
let mut worker_metrics = Vec::with_capacity(size);

次にloopでそれらを詰めていくようだ。 まずはlocalキューを作っているのでここを見ていく。

rust
for _ in 0..size {
    let (steal, run_queue) = queue::local();
    // ...
}

以下のような関数になっている。

LOCAL_QUEUE_CAPACITY256になっていて、bufferを作ったあと、UnsafeCell<MaybuUninit>で埋めている。

rust
pub(crate) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
    let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY);

    for _ in 0..LOCAL_QUEUE_CAPACITY {
        buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
    }

    let inner = Arc::new(Inner {
        head: AtomicUnsignedLong::new(0),
        tail: AtomicUnsignedShort::new(0),
        buffer: make_fixed_size(buffer.into_boxed_slice()),
    });

    let local = Local {
        inner: inner.clone(),
    };

    let remote = Steal(inner);

    (remote, local)
}

その後Arcで wrap したInnerを作成し、Local,Stealで wrap して返す。
Innerは以下のようになっている。

rust
pub(crate) struct Inner<T: 'static> {
    /// Concurrently updated by many threads.
    ///
    /// Contains two `UnsignedShort` values. The LSB byte is the "real" head of
    /// the queue. The `UnsignedShort` in the MSB is set by a stealer in process
    /// of stealing values. It represents the first value being stolen in the
    /// batch. The `UnsignedShort` indices are intentionally wider than strictly
    /// required for buffer indexing in order to provide ABA mitigation and make
    /// it possible to distinguish between full and empty buffers.
    ///
    /// When both `UnsignedShort` values are the same, there is no active
    /// stealer.
    ///
    /// Tracking an in-progress stealer prevents a wrapping scenario.
    head: AtomicUnsignedLong,

    /// Only updated by producer thread but read by many threads.
    tail: AtomicUnsignedShort,

    /// Elements
    buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
}

forに戻ると、あとはParker,Unparkerを作成し、parkrun_queueからCoreと呼ばれるデータを作成する。

queue::localの返り値は(Steal<Arc<T>>, Local<Arc<T>>)であったがCorerun_queueLocal<Arc<Handle>>なのでTHandleと確定する。

あとはWorkerMetricsstealunparkからRemoteを作成する。

rust
for _ in 0..size {
    let (steal, run_queue) = queue::local();
    let park = park.clone();
    let unpark = park.unpark();
    cores.push(Box::new(Core {
        tick: 0,
        lifo_slot: None,
        run_queue,
        is_searching: false,
        is_shutdown: false,
        park: Some(park),
        metrics: MetricsBatch::new(),
        rand: FastRand::new(config.seed_generator.next_seed()),
    }));
    remotes.push(Remote { steal, unpark });
    worker_metrics.push(WorkerMetrics::new());
}

次に先に作ったremotesworker_metrics、引数で渡されたdriver_handle,blocking_spawner,seed_generatorなどからHandleを作成する。

InjectIdleなどが登場しているが一旦見送る。
このHandlescheduler::Handleで wrap され、さらにstruct Handle { pub(crate) inner: scheduler::Handle }として wrap したものをRuntimeでは保持している。

rust
let handle = Arc::new(Handle {
    shared: Shared {
        remotes: remotes.into_boxed_slice(),
        inject: Inject::new(),
        idle: Idle::new(size),
        owned: OwnedTasks::new(),
        shutdown_cores: Mutex::new(vec![]),
        config,
        scheduler_metrics: SchedulerMetrics::new(),
        worker_metrics: worker_metrics.into_boxed_slice(),
    },
    driver: driver_handle,
    blocking_spawner,
    seed_generator,
});

次にCoreHandleからLaunch,Workerを作成し、(Arc<Handle>, Launch)を返しcreateは完了する。

rust
    let mut launch = Launch(vec![]);
    for (index, core) in cores.drain(..).enumerate() {
        launch.0.push(Arc::new(Worker {
            handle: handle.clone(),
            index,
            core: AtomicCell::new(Some(core)),
        }));
    }
    (handle, launch)

MultiThread::newは上記のArc<Handle>,LaunchMultiThreadを返して終了する。
MultiThreadの定義は以下のようになっており、block_onshutdownが生えている。

rust
/// Work-stealing based thread pool for executing futures.
pub(crate) struct MultiThread;
rust
pub(crate) fn block_on<F>(&self, handle: &scheduler::Handle, future: F) -> F::Output
where
    F: Future,
{
    let mut enter = crate::runtime::context::enter_runtime(handle, true);
    enter
        .blocking
        .block_on(future)
        .expect("failed to park thread")
}

pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
    match handle {
        scheduler::Handle::MultiThread(handle) => handle.shutdown(),
        _ => panic!("expected MultiThread scheduler"),
    }
}

再度build_threaded_runtimeまで戻り続きを見る。

先も述べたようにHandleを wrap し、enterを実行。
その後、launch.launch()を実行し、blocking_poolhandleからfrom_partsで作ったRuntimeを返し終了だ。

rust
let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
// Spawn the thread pool workers
let _enter = handle.enter();
launch.launch();
Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))

handle.enterを追っていくと以下のようにスレッドローカルにHandleをセットしている。

rust
pub(crate) fn try_set_current(handle: &scheduler::Handle) -> Option<SetCurrentGuard> {
    CONTEXT.try_with(|ctx| ctx.set_current(handle)).ok()
}

CONTEXT以下のようになっており、handlespawingdriversにアクセスするため、現在のruntime handleを保持するような仕組みのようだ。

rust
static CONTEXT: Context = {
    Context {
        #[cfg(feature = "rt")]
        thread_id: Cell::new(None),
        /// Tracks the current runtime handle to use when spawning,
        /// accessing drivers, etc...
        #[cfg(feature = "rt")]
        handle: RefCell::new(None),
        #[cfg(feature = "rt")]
        current_task_id: Cell::new(None),
        // ....
    }
}

fn launch(mut self)

次にlaunch.launch()を見ていく。

launchは以下のようにruntime::spawn_blockingworkerblocking処理として立ち上げていっている。

launch内のWorkerがデフォルトではcpu分生成されていたので、ここでcpu数分のworkerが立ち上がるということになる。

ちなみに手元のM2 MacBook Airでは8個立ち上がった。

rust
pub(crate) fn launch(mut self) {
    for worker in self.0.drain(..) {
        runtime::spawn_blocking(move || run(worker));
    }
}

runtime::spawn_blockingは以下のようになっている。
先程保存したCONTEXT内のHandleを取り出し、そこからspawn_blockingを呼んでいる。

rust
pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    let rt = Handle::current();
    rt.spawn_blocking(func)
}

更に追っていくと、まずspawn_blocking_innerにてfuncBlockingSheduleで wrap される。task::unownedUnownedTaskとなり、spawn_taskに渡されるようだ。

rust
pub(crate) fn spawn_blocking_inner<F, R>(
    &self,
    func: F,
    is_mandatory: Mandatory,
    name: Option<&str>,
    rt: &Handle,
) -> (JoinHandle<R>, Result<(), SpawnError>)
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    let fut = BlockingTask::new(func);
    let id = task::Id::next();
    let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);
    let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);
    (handle, spawned)
}

BlockingTask<T>

まずはfuncBlockingTaskで wrap している。確認のために書くが現時点でfuncの実体は|| run(worker)のはず。

そして、こいつはFutureを実装している。
coop::stop()を無視すれば、基本的にはtakeしてPoll::Readyで返しているだけだ。
つまりpollされるがPendingが返ることはない。

rust
impl<T, R> Future for BlockingTask<T>
where
    T: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    type Output = R;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<R> {
        let me = &mut *self;
        let func = me
            .func
            .take()
            .expect("[internal exception] blocking task ran twice.");

        // This is a little subtle:
        // For convenience, we'd like _every_ call tokio ever makes to Task::poll() to be budgeted
        // using coop. However, the way things are currently modeled, even running a blocking task
        // currently goes through Task::poll(), and so is subject to budgeting. That isn't really
        // what we want; a blocking task may itself want to run tasks (it might be a Worker!), so
        // we want it to start without any budgeting.
        crate::runtime::coop::stop();

        Poll::Ready(func())
    }
}

次にtask::unownedを覗いてみる。 コメントにはOwnedTasksに格納されないタスクを使うときのみ使用され、blocking_taskのみがこれを使用するとある。

rust
pub(crate) fn unowned<T, S>(task: T, scheduler: S, id: Id) -> (UnownedTask<S>, JoinHandle<T::Output>)
where
    S: Schedule,
    T: Send + Future + 'static,
    T::Output: Send + 'static,
{
    let (task, notified, join) = new_task(task, scheduler, id);
    // This transfers the ref-count of task and notified into an UnownedTask.
    // This is valid because an UnownedTask holds two ref-counts.
    let unowned = UnownedTask {
        raw: task.raw,
        _p: PhantomData,
    };
    std::mem::forget(task);
    std::mem::forget(notified);
    (unowned, join)
}

RawTaskまわりやNotified、またそれらをmem::forgetしてる理由などがいまいちわからなかった。

おそらくRawTaskが持っているHeaderUnownedTaskと同期間生存させようとしているのかもしれない。

ちなみにUnownedTaskDropは以下のようになっているので、この処理を紐解いていくと分かるのかもしれない。確かにvtable経由でRawTaskdealllocを呼んでいる。

このあたりは再度理解を試みたい。

rust
impl<S: 'static> Drop for UnownedTask<S> {
    fn drop(&mut self) {
        // Decrement the ref count
        if self.raw.header().state.ref_dec_twice() {
            // Deallocate if this is the final ref count
            self.raw.dealloc();
        }
    }
}

new_taskは以下のようになっておりRawTask,Notified,JoinHandleを作成するだけのようだ。

rust
    fn new_task<T, S>(
        task: T,
        scheduler: S,
        id: Id,
    ) -> (Task<S>, Notified<S>, JoinHandle<T::Output>)
    where
        S: Schedule,
        T: Future + 'static,
        T::Output: 'static,
    {
        let raw = RawTask::new::<T, S>(task, scheduler, id);
        let task = Task {
            raw,
            _p: PhantomData,
        };
        let notified = Notified(Task {
            raw,
            _p: PhantomData,
        });
        let join = JoinHandle::new(raw);

        (task, notified, join)
    }

RawTask::newでは以下のようにtaskschedulerを wrap してpointerに変換している。

rust
pub(super) fn new<T, S>(task: T, scheduler: S, id: Id) -> RawTask
where
    T: Future,
    S: Schedule,
{
    let ptr = Box::into_raw(Cell::<_, S>::new(task, scheduler, State::new(), id));
    let ptr = unsafe { NonNull::new_unchecked(ptr as *mut Header) };
    RawTask { ptr }
}

不明点が多くなってきたが、UnownedTaskが更にTaskで wrap され、spawn_taskに渡されるように見える。

spawn_taskはいくつか省略しているがだいたい以下のような感じだ。

sharedからlockをとって、local_queuetaskpush_backする。

idle状態のthreadがなく、かつ、thread数の上限に到達していなければspawn_threadspawnし成功すればmetricsthread数を加算する。

(WorkerMetricsについてはスルーしてきたが、idle状態のthread数やthreadの数、local_queueの深さなどをカウントするような責務をもっているように見える。)

その後spawn_threadspawnする。

rust
fn spawn_task(&self, task: Task, rt: &Handle) -> Result<(), SpawnError> {
    let mut shared = self.inner.shared.lock();
    // ...
    shared.queue.push_back(task);
    self.inner.metrics.inc_queue_depth();
    if self.inner.metrics.num_idle_threads() == 0 {
        // No threads are able to process the task.
        if self.inner.metrics.num_threads() == self.inner.thread_cap {
            // At max number of threads
        } else {
            // ...
                let id = shared.worker_thread_index;
                match self.spawn_thread(shutdown_tx, rt, id) {
                    Ok(handle) => {
                        self.inner.metrics.inc_num_threads();
                        shared.worker_thread_index += 1;
                        shared.worker_threads.insert(id, handle);
                    }
                    Err(e) => {
                        return Err(/*...*/);
                    }
                }
            //
        }
    } else {
        // ....
    }
    Ok(())
}

spawn_threadは以下のようになっており、ようやくstd::threadが現れる。
ここでは、シンプルにthread::Builderthreadを作りspawnしている。
その後前述したようにCONTEXTHandleを登録し、blocking::Spawnerinner.run()を実行する。

rust
fn spawn_thread(
    &self,
    shutdown_tx: shutdown::Sender,
    rt: &Handle,
    id: usize,
) -> std::io::Result<thread::JoinHandle<()>> {
    let mut builder = thread::Builder::new().name((self.inner.thread_name)());
    if let Some(stack_size) = self.inner.stack_size {
        builder = builder.stack_size(stack_size);
    }
    let rt = rt.clone();
    builder.spawn(move || {
        // Only the reference should be moved into the closure
        let _enter = rt.enter();
        rt.inner.blocking_spawner().inner.run(id);
        drop(shutdown_tx);
    })
}

runは以下。少し長いがポイントになりそうな箇所を絞って読んでいく。

Details
rust
    fn run(&self, worker_thread_id: usize) {
        if let Some(f) = &self.after_start {
            f()
        }

        let mut shared = self.shared.lock();
        let mut join_on_thread = None;

        'main: loop {
            // BUSY
            while let Some(task) = shared.queue.pop_front() {
                self.metrics.dec_queue_depth();
                drop(shared);
                task.run();

                shared = self.shared.lock();
            }

            // IDLE
            self.metrics.inc_num_idle_threads();

            while !shared.shutdown {
                let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();

                shared = lock_result.0;
                let timeout_result = lock_result.1;

                if shared.num_notify != 0 {
                    // We have received a legitimate wakeup,
                    // acknowledge it by decrementing the counter
                    // and transition to the BUSY state.
                    shared.num_notify -= 1;
                    break;
                }

                // Even if the condvar "timed out", if the pool is entering the
                // shutdown phase, we want to perform the cleanup logic.
                if !shared.shutdown && timeout_result.timed_out() {
                    // We'll join the prior timed-out thread's JoinHandle after dropping the lock.
                    // This isn't done when shutting down, because the thread calling shutdown will
                    // handle joining everything.
                    let my_handle = shared.worker_threads.remove(&worker_thread_id);
                    join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);

                    break 'main;
                }

                // Spurious wakeup detected, go back to sleep.
            }

            if shared.shutdown {
                // Drain the queue
                while let Some(task) = shared.queue.pop_front() {
                    self.metrics.dec_queue_depth();
                    drop(shared);

                    task.shutdown_or_run_if_mandatory();

                    shared = self.shared.lock();
                }

                // Work was produced, and we "took" it (by decrementing num_notify).
                // This means that num_idle was decremented once for our wakeup.
                // But, since we are exiting, we need to "undo" that, as we'll stay idle.
                self.metrics.inc_num_idle_threads();
                // NOTE: Technically we should also do num_notify++ and notify again,
                // but since we're shutting down anyway, that won't be necessary.
                break;
            }
        }

        // Thread exit
        self.metrics.dec_num_threads();

        // num_idle should now be tracked exactly, panic
        // with a descriptive message if it is not the
        // case.
        let prev_idle = self.metrics.dec_num_idle_threads();
        if prev_idle < self.metrics.num_idle_threads() {
            panic!("num_idle_threads underflowed on thread exit")
        }

        if shared.shutdown && self.metrics.num_threads() == 0 {
            self.condvar.notify_one();
        }

        drop(shared);

        if let Some(f) = &self.before_stop {
            f()
        }

        if let Some(handle) = join_on_thread {
            let _ = handle.join();
        }
    }

基本的には以下のloopとなっておりshutdownflag が立つまで、またはtimeoutするまでloopから抜けないようだ。 loop内の処理はBUSYIDLEで構成されている。

rust
'main: loop {
    // BUSY
    while let Some(task) = shared.queue.pop_front() {
        self.metrics.dec_queue_depth();
        drop(shared);
        task.run();
        shared = self.shared.lock();
    }

    // IDLE
    self.metrics.inc_num_idle_threads();
    while !shared.shutdown {
        let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
        shared = lock_result.0;
        let timeout_result = lock_result.1;
        if shared.num_notify != 0 {
            // We have received a legitimate wakeup,
            // acknowledge it by decrementing the counter
            // and transition to the BUSY state.
            shared.num_notify -= 1;
            break;
        }
        // Even if the condvar "timed out", if the pool is entering the
        // shutdown phase, we want to perform the cleanup logic.
        if !shared.shutdown && timeout_result.timed_out() {
            // We'll join the prior timed-out thread's JoinHandle after dropping the lock.
            // This isn't done when shutting down, because the thread calling shutdown will
            // handle joining everything.
            let my_handle = shared.worker_threads.remove(&worker_thread_id);
            join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);
            break 'main;
        }
        // Spurious wakeup detected, go back to sleep.
    }
    // ....
}
BUSY

基本的にはlocal_queueからタスクがなくなるまで実行するようだ。
taskの実行前にlockdropし、task完了後に取り直している。

rust
while let Some(task) = shared.queue.pop_front() {
    self.metrics.dec_queue_depth();
    drop(shared);
    task.run();
    shared = self.shared.lock();
}

task.runの実体は以下のように先程作成したRawTask経由でpollしている。

rust
pub(crate) fn run(self) {
    let raw = self.raw;
    mem::forget(self);
    // Transfer one ref-count to a Task object.
    let task = Task::<S> {
        raw,
        _p: PhantomData,
    };
    // Use the other ref-count to poll the task.
    raw.poll();
    // Decrement our extra ref-count
    drop(task);
}

RawTaskpollvtableを経由しHarnessを介して呼ばれる。

rust
pub(super) fn poll(self) {
    let vtable = self.header().vtable;
    unsafe { (vtable.poll)(self.ptr) }
}

unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) {
    let harness = Harness::<T, S>::from_raw(ptr);
    harness.poll();
}

その後poll_innerを介し、更にpoll_futureguard.core.poll()にたどり着く。

そしてCorepollからようやくBlockingTaskpollが呼ばれるようだ。

以下がCore内のpoll

rust
pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> { 
    let res = {
        self.stage.stage.with_mut(|ptr| {
            // Safety: The caller ensures mutual exclusion to the field.
            let future = match unsafe { &mut *ptr } {
                Stage::Running(future) => future,
                _ => unreachable!("unexpected stage"),
            };
            // Safety: The caller ensures the future is pinned.
            let future = unsafe { Pin::new_unchecked(future) };
            let _guard = TaskIdGuard::enter(self.task_id);
            future.poll(&mut cx) 
        })
    };
    if res.is_ready() {
        self.drop_future_or_output();
    }
    res
}

ここで現れたCoreは何者かと振り返ってみたが、RawTask::newから呼ばれている以下で作成されていた。

つまりWorkerを作成するときにでてきたCoreとはまた別物のようだ。

rust
pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box<Cell<T, S>> {
    let result = Box::new(Cell {
        header: Header {
            state,
            queue_next: UnsafeCell::new(None),
            vtable: raw::vtable::<T, S>(),
            owner_id: UnsafeCell::new(0),
        },
        core: Core {
            scheduler,
            stage: CoreStage {
                stage: UnsafeCell::new(Stage::Running(future)),
            },
            task_id,
        },
        trailer: Trailer {
            waker: UnsafeCell::new(None),
            owned: linked_list::Pointers::new(),
        },
    });
}

先程のHarnessを介してpollしている箇所が出てきていたが、RawTaskHarnessをつないでいるのはこのvtableのようだ。 raw::vtableとそこで設定されるpollはそれぞれ以下のようになっている。

rust
pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
    &Vtable {
        poll: poll::<T, S>,
        schedule: schedule::<S>,
        dealloc: dealloc::<T, S>,
        try_read_output: try_read_output::<T, S>,
        drop_join_handle_slow: drop_join_handle_slow::<T, S>,
        drop_abort_handle: drop_abort_handle::<T, S>,
        shutdown: shutdown::<T, S>,
        trailer_offset: OffsetHelper::<T, S>::TRAILER_OFFSET,
        scheduler_offset: OffsetHelper::<T, S>::SCHEDULER_OFFSET,
        id_offset: OffsetHelper::<T, S>::ID_OFFSET,
    }
}
rust
unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) {
    let harness = Harness::<T, S>::from_raw(ptr);
    harness.poll();
}

次にIDLEのブロックを見ていく。

IDLE

まずはidlethread数を増加させ、shutdownになるまでloopする。
とはいえ、基本的にはCondvarで待ちつつtimeoutしたらshutdown状態じゃなくともworker_threadを’掃除するように見える。

rust
// IDLE
self.metrics.inc_num_idle_threads();
while !shared.shutdown {
    let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
    shared = lock_result.0;
    let timeout_result = lock_result.1;
    if shared.num_notify != 0 {
        // We have received a legitimate wakeup,
        // acknowledge it by decrementing the counter
        // and transition to the BUSY state.
        shared.num_notify -= 1;
        break;
    }
    // Even if the condvar "timed out", if the pool is entering the
    // shutdown phase, we want to perform the cleanup logic.
    if !shared.shutdown && timeout_result.timed_out() {
        // We'll join the prior timed-out thread's JoinHandle after dropping the lock.
        // This isn't done when shutting down, because the thread calling shutdown will
        // handle joining everything.
        let my_handle = shared.worker_threads.remove(&worker_thread_id);
        join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);
        break 'main;
    }
    // Spurious wakeup detected, go back to sleep.
}

不明点も増えてきたが、少なくともworkerBlockingTaskとしてpollされるまでの流れは見えてきた。 なので、launch.launch()まで戻ってrun(worker)を見ていく。

fn run(worker: Arc<Worker>)

runのは以下のようになっている。
worker.coreの取得、worker.handleからHandleを作成し、enter_runtimeCONTEXTHandleを登録する。

さらにworker::Contextを作成し、それをCURRENT.setCURRENTというthread_localにセットしつつ、cx.run(core)で実行する。

rust
fn run(worker: Arc<Worker>) {
    // ...
    let core = match worker.core.take() {
        Some(core) => core,
        None => return,
    };

    let handle = scheduler::Handle::MultiThread(worker.handle.clone());
    let _enter = crate::runtime::context::enter_runtime(&handle, true);

    // Set the worker context.
    let cx = Context {
        worker,
        core: RefCell::new(None),
    };

    CURRENT.set(&cx, || {
        assert!(cx.run(core).is_err());
        wake_deferred_tasks();
    });
}

CURRENTは以下のような定義になっており、型ごとにscopeを切ってthread_localへの参照を保持しているように見える。

CURRENTには今回、Arc<Worker>,RefCell<Option<Box<Core>>>を格納しており、これをどう使っていくかは読み進めていけば次回以降わかるかもしれない。

rust
// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);

/// Sets a reference as a thread-local.
macro_rules! scoped_thread_local {
    ($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty) => (
        $(#[$attrs])*
        $vis static $name: $crate::macros::scoped_tls::ScopedKey<$ty>
            = $crate::macros::scoped_tls::ScopedKey {
                inner: {
                    tokio_thread_local!(static FOO: ::std::cell::Cell<*const ()> = const {
                        std::cell::Cell::new(::std::ptr::null())
                    });
                    &FOO
                },
                _marker: ::std::marker::PhantomData,
            };
    )
}

fn run(&self, mut core: Box<Core>) -> RunResult

これでようやくworkerのメイン処理まで到達した。

Details
rust
 fn run(&self, mut core: Box<Core>) -> RunResult {
        while !core.is_shutdown {
            // Increment the tick
            core.tick();

            // Run maintenance, if needed
            core = self.maintenance(core);

            // First, check work available to the current worker.
            if let Some(task) = core.next_task(&self.worker) {
                core = self.run_task(task, core)?;
                continue;
            }

            // There is no more **local** work to process, try to steal work
            // from other workers.
            if let Some(task) = core.steal_work(&self.worker) {
                core = self.run_task(task, core)?;
            } else {
                // Wait for work
                core = if did_defer_tasks() {
                    self.park_timeout(core, Some(Duration::from_millis(0)))
                } else {
                    self.park(core)
                };
            }
        }

        core.pre_shutdown(&self.worker);

        // Signal shutdown
        self.worker.handle.shutdown_core(core);
        Err(())
    }
    // ...
}

重要そうな部分をピックアップして以下に記載しておく。

ざっと見た感じ、shutdownされるまで以下を行っているように見える。

    1. tick
    1. maintenance
    1. local_queueから次のタスクを探して実行
    1. local_queueが無くなったら steal を試みる
    1. stealできるものがなければpark

本当はmaintenancesteal_workなどの挙動に興味があるので読み進めたいが、ここまでで結構な分量になっているのと、今回目的としているコードではtaskworkerで処理されることはなく、立ち上げられたworkerはみんなすぐparkしてしまうので今回はスルーし次回以降の課題としておく。

rust
fn run(&self, mut core: Box<Core>) -> RunResult {
    while !core.is_shutdown {
        core.tick();
        core = self.maintenance(core);
        // First, check work available to the current worker.
        if let Some(task) = core.next_task(&self.worker) {
            core = self.run_task(task, core)?;
            continue;
        }
        // There is no more **local** work to process, try to steal work
        // from other workers.
        if let Some(task) = core.steal_work(&self.worker) {
            core = self.run_task(task, core)?;
        } else {
            // Wait for work
            core = if did_defer_tasks() {
                self.park_timeout(core, Some(Duration::from_millis(0)))
            } else {
                self.park(core)
            };
        }
    }
    // ...
}

これでlaunchの流れは分かったのでRuntime::newの大枠の流れもつかめたように思う。
workerunparkなどは次回の課題としつつmainまで戻りblock_onを読んでいく。

rt.block_on(Hello::new())

Runtimeblock_onenterし、Schedulerを選択しているだけのようだ。

rust
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
    let _enter = self.enter();
    match &self.scheduler {
        Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
        #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
        Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
    }
}

exec.block_on(&self.handle.inner, future)は前述した以下のMultiThreadblock_onを呼んでいる。 ここではcontext::enter_runtimeのちblocking.block_onを実行している。

rust
pub(crate) fn block_on<F>(&self, handle: &scheduler::Handle, future: F) -> F::Output
where
    F: Future,
{
    let mut enter = crate::runtime::context::enter_runtime(handle, true);
    enter
        .blocking
        .block_on(future)
        .expect("failed to park thread")
}

fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError>

さらにblock_onを掘っていくとruntime/park.rsCachedParkThreadblocK_onにたどり着く。以下だ。

coop::budgetdefer.wakeなど不明な箇所もあるが、大筋は教科書とおりだ。wakerを取得しContextを作る。FuturePin止めし、Readyを返すまでpollparkloopで回す。ということをやっている。

rust
pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError> {
    use std::task::Context;
    use std::task::Poll::Ready;
    // `get_unpark()` should not return a Result
    let waker = self.waker()?;
    let mut cx = Context::from_waker(&waker);
    pin!(f);
    loop {
        if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
            return Ok(v);
        }
        // Wake any yielded tasks before parking in order to avoid blocking.
        crate::runtime::context::with_defer(|defer| defer.wake());
        self.park();
    }
}

まずはself.wakerを見てみる。
self.unpark経由でunparkを取得しそこからWakerに変換している。

rust
pub(crate) fn waker(&self) -> Result<Waker, AccessError> {
    self.unpark().map(|unpark| unpark.into_waker())
}

unparkCURRENT_PARKER経由で取得している。

rust
fn unpark(&self) -> Result<UnparkThread, AccessError> {
    self.with_current(|park_thread| park_thread.unpark())
}

fn with_current<F, R>(&self, f: F) -> Result<R, AccessError>
where
    F: FnOnce(&ParkThread) -> R,
{
    CURRENT_PARKER.try_with(|inner| f(inner))
}

CURRENT_PARKERは以下のように作成されており、AtomicUsize,Mutex,CondvarのセットとなっておりAtomicUsizeEMPTY,PARKEDNOTIFIEDの 3 つの状態を表しているようだ。

rust
tokio_thread_local! {
    static CURRENT_PARKER: ParkThread = ParkThread::new();
}

impl ParkThread {
    pub(crate) fn new() -> Self {
        Self {
            inner: Arc::new(Inner {
                state: AtomicUsize::new(EMPTY),
                mutex: Mutex::new(()),
                condvar: Condvar::new(),
            }),
        }
    }
}

unparkはこのInnerUnparkThreadwrapし、into_wakerWakerに変換される。
これでself.waker()からWakerをどうやって取得しているのかが分かった。

rust
pub(crate) fn unpark(&self) -> UnparkThread {
    let inner = self.inner.clone();
    UnparkThread { inner }
}

impl UnparkThread {
    pub(crate) fn into_waker(self) -> Waker {
        unsafe {
            let raw = unparker_to_raw_waker(self.inner);
            Waker::from_raw(raw)
        }
    }
}

unparker_to_raw_wakerでは以下のようにRawWakerを作成している。

RawWaker::newの第一引数はwakeなどが受けとるデータへのポインタなので、wakeなどはInnerへのポインタを受け取ることになる。

rust
unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
    RawWaker::new(
        Inner::into_raw(unparker),
        &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
    )
}

RawWakerVTableが出てきたのでwake_by_refの実装だけ見ておくこととする。
前述したように第一引数にInnerへのポインタが渡されるので、そこからInnerに変換しunparkを呼ぶ。

最後にmem::forgetしているのは、実際にはcloneされたInner(ここではunparker)をReactorが所持しているにも関わらずdropが走り、参照カウントが意図せずデクリメントされるのを防ぐためだと理解している。これはRawWakerdata*const ()なのでArc::into_rawで変換する必要があり、この際にArcforgetされるためだ。(つまりArc::into_rawした場合はleakを許容するか、自分で参照カウントを管理するかのいずれかになるという理解。)

rust
unsafe fn wake_by_ref(raw: *const ()) {
    let unparker = Inner::from_raw(raw);
    unparker.unpark();

    // We don't actually own a reference to the unparker
    mem::forget(unparker);
}

これでwakerを作るまでが分かった。
取得したWakerContextで wrap され、pollにわたす準備ができる。

次にPin止めしているが、基本的にはpin_utilsやっていることは大差なさそう

その後loopに入っていく。

crate::runtime::coop::budgetというのはpollの予算のようでpollごとにdecrementしていくような仕組みに見える。が今回はdecrementするようなパスが見当たらなかったので、次回以降の課題としておく。

そしてようやくpollだ。

rust
loop { 
    if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) { 
        return Ok(v); 
    } 
    // Wake any yielded tasks before parking in order to avoid
    // blocking.
    #[cfg(feature = "rt")]
    crate::runtime::context::with_defer(|defer| defer.wake());
    self.park();
} 

先程用意したContextを渡し実行することで今回のターゲットとしている以下のコードが呼び出される。

これによりHello が表示され、状態遷移する。

また、渡されたContextから取り出したwaker経由でwake_by_refを呼ぶことでRawWakerVTableに設定したwake_by_refが発火し、その後Poll::Pendingが返る。

rust
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {  
    match (*self).state { 
        StateHello::Hello => { 
            println!("Hello "); 
            (*self).state = StateHello::World; 
            cx.waker().wake_by_ref(); 
            Poll::Pending 
        } 
        StateHello::World => {
            println!("World!");
            Poll::Ready(())
        }
    }
}

wake_by_refとは、すなわち前述した以下のコードだ。

rust
unsafe fn wake_by_ref(raw: *const ()) {
    let unparker = Inner::from_raw(raw);
    unparker.unpark();

    // We don't actually own a reference to the unparker
    mem::forget(unparker);
}

unparker.unpark()を掘ってもいいが、先にparkを見ておいたほうが分かり易そうなので先回りして見ておく。

rust
loop { 
    if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
        return Ok(v);
    }
    // Wake any yielded tasks before parking in order to avoid
    // blocking.
    #[cfg(feature = "rt")]
    crate::runtime::context::with_defer(|defer| defer.wake());
    self.park(); 
} 

self.parkは以下のようになっており、更に掘っていくとInnerparkにたどり着く。

rust
pub(crate) fn park(&mut self) {
    self.with_current(|park_thread| park_thread.inner.park())
        .unwrap();
}

fn park(&self)

コードは以下に畳んで掲載し、小分けにして見ていく。

Details
rust
    fn park(&self) {
        // If we were previously notified then we consume this notification and
        // return quickly.
        if self
            .state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
        {
            return;
        }

        // Otherwise we need to coordinate going to sleep
        let mut m = self.mutex.lock();

        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            Err(NOTIFIED) => {
                // We must read here, even though we know it will be `NOTIFIED`.
                // This is because `unpark` may have been called again since we read
                // `NOTIFIED` in the `compare_exchange` above. We must perform an
                // acquire operation that synchronizes with that `unpark` to observe
                // any writes it made before the call to unpark. To do that we must
                // read from the write it made to `state`.
                let old = self.state.swap(EMPTY, SeqCst);
                debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");

                return;
            }
            Err(actual) => panic!("inconsistent park state; actual = {}", actual),
        }

        loop {
            m = self.condvar.wait(m).unwrap();

            if self
                .state
                .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
                .is_ok()
            {
                // got a notification
                return;
            }

            // spurious wakeup, go back to sleep
        }
    }

InnerAtomicUsizeは 3 つの状態を表現していると前述したが、以下のように通知状態を検査している。

ここはコメントにある通り、Innerparkに到達したがすでに通知済である場合にwaitすることなく再度pollするため、早期 returnしているようだ。

今回ターゲットにしているコードではpollの中で即座にwake_by_refが呼ばれているので恐らくここが真になり早期returnになるものと推測される。

rust
// If we were previously notified then we consume this notification and
// return quickly.
if self
    .state
    .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
    .is_ok()
{
    return;
}

上記においてreturnしない想定で読みすすめると、次にlockをとり、statePARKEDにし、parkの準備にはいる。

ここでコメントにあるようにunparkが再度実行されているケースを考慮し、再度stateNOTIFIEDではないかチェックし、通知済であればstateEMPTYにし早期returnする。

このあたりはGuard run_executor of local_pool.rs against use of park / unpark in user-code. #2010と同様の話のように見える。

このチェックを怠るとunparkの呼び出し回数とstateが食い違い二度と起きないケースが発生するように思う。

rust
// Otherwise we need to coordinate going to sleep
let mut m = self.mutex.lock();
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
    Ok(_) => {}
    Err(NOTIFIED) => {
        // We must read here, even though we know it will be `NOTIFIED`.
        // This is because `unpark` may have been called again since we read
        // `NOTIFIED` in the `compare_exchange` above. We must perform an
        // acquire operation that synchronizes with that `unpark` to observe
        // any writes it made before the call to unpark. To do that we must
        // read from the write it made to `state`.
        let old = self.state.swap(EMPTY, SeqCst);
        debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
        return;
    }
    Err(actual) => panic!("inconsistent park state; actual = {}", actual),
}

あとは定番のloopspurious wakeupをケアしつつCondvar.waitするパターンだ。
waitを抜けたあとstateEMPTYへのcompare_exchangeを行いreturnすることにより、再度pollが実行されるはずだ。

rust
loop {
    m = self.condvar.wait(m).unwrap();
    if self
        .state
        .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
        .is_ok()
    {
        // got a notification
        return;
    }
    // spurious wakeup, go back to sleep
}

parkは以上なのでunpark側に戻るとする。

fn unpark(&self)

unparker.unpark()はすなわちInnerunparkだ。
parkを先に読んだことである程度想像はつくかと思うが、以下のようになっている。

rust
fn unpark(&self) {
    // To ensure the unparked thread will observe any writes we made before
    // this call, we must perform a release operation that `park` can
    // synchronize with. To do that we must write `NOTIFIED` even if `state`
    // is already `NOTIFIED`. That is why this must be a swap rather than a
    // compare-and-swap that returns if it reads `NOTIFIED` on failure.
    match self.state.swap(NOTIFIED, SeqCst) {
        EMPTY => return,    // no one was waiting
        NOTIFIED => return, // already unparked
        PARKED => {}        // gotta go wake someone up
        _ => panic!("inconsistent state in unpark"),
    }
    // There is a period between when the parked thread sets `state` to
    // `PARKED` (or last checked `state` in the case of a spurious wake
    // up) and when it actually waits on `cvar`. If we were to notify
    // during this period it would be ignored and then when the parked
    // thread went to sleep it would never wake up. Fortunately, it has
    // `lock` locked at this stage so we can acquire `lock` to wait until
    // it is ready to receive the notification.
    //
    // Releasing `lock` before the call to `notify_one` means that when the
    // parked thread wakes it doesn't get woken only to have to wait for us
    // to release `lock`.
    drop(self.mutex.lock());
    self.condvar.notify_one()
}

まずはstateNOTIFIEDswapする。
すでにEMPTYNOTIFIEDの場合は何もせずreturnし、PARKEDの場合だけ次に通知処理に移る。

rust
// To ensure the unparked thread will observe any writes we made before
// this call, we must perform a release operation that `park` can
// synchronize with. To do that we must write `NOTIFIED` even if `state`
// is already `NOTIFIED`. That is why this must be a swap rather than a
// compare-and-swap that returns if it reads `NOTIFIED` on failure.
match self.state.swap(NOTIFIED, SeqCst) {
    EMPTY => return,    // no one was waiting
    NOTIFIED => return, // already unparked
    PARKED => {}        // gotta go wake someone up
    _ => panic!("inconsistent state in unpark"),
}

まずコメントの冒頭には、statePARKEDになる隙間で通知が飛んでしまうことを防ぐ旨が書いてある。 これはpark側のlet mut m = self.mutex.lock(); match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst)の部分を指しているように見える。

後半はlockdropしてから通知を出さないと無駄なlockが解放されるまでの無駄な待ちが入ってしまうから。という旨が書いてあるように見える。確かにこのlockPARKEDになるまで待つ意図なので通知までにdropしてしまってもいいのか。

lockを解放したらnotify_oneで通知して終了だ。

rust
// There is a period between when the parked thread sets `state` to
// `PARKED` (or last checked `state` in the case of a spurious wake
// up) and when it actually waits on `cvar`. If we were to notify
// during this period it would be ignored and then when the parked
// thread went to sleep it would never wake up. Fortunately, it has
// `lock` locked at this stage so we can acquire `lock` to wait until
// it is ready to receive the notification.
//
// Releasing `lock` before the call to `notify_one` means that when the
// parked thread wakes it doesn't get woken only to have to wait for us
// to release `lock`.
drop(self.mutex.lock());

self.condvar.notify_one()

これでpark,unparkは読めた。
もう一度block_onまで戻って眺めてみると全体像が分かる。
budgetcontext::with_deferは今後見ていくとして今回ターゲットとしているコードの大枠は以下の流れだ。

    1. wakerを取得する
    1. Contextwakerを wrap する
    1. FuturePin止めする
    1. 3 のas_mutをとってpollする。
    1. Hello が表示される
    1. wake_by_refすなわちunparkが呼ばれる
    1. parkするがすでにNOTIFIEDなので再度pollする
    1. World!が表示され、Readyが返る
    1. 終了
rust
pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError> {
    use std::task::Context;
    use std::task::Poll::Ready;
    // `get_unpark()` should not return a Result
    let waker = self.waker()?;
    let mut cx = Context::from_waker(&waker);
    pin!(f);
    loop {
        if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
            return Ok(v);
        }
        // Wake any yielded tasks before parking in order to avoid
        // blocking.
        #[cfg(feature = "rt")]
        crate::runtime::context::with_defer(|defer| defer.wake());
        self.park();
    }
}

おそらくこのあと、shutdown状態となり各threadの後片付けが行われていくのだろうが、今回は skip する。

まとめ

ひとまず今回の goal はある程度達成できたが、不明点も多くでてきたので継続して見ていきたい。

特に今回はworkerが仕事をしていなかったり、mioまでたどり着いたりしていなかったり、肝心のsteal周りをスキップしたりしたので今後、理解を深めたい。次のステップとしてはsmolを読んでみたり、spawn_blockingを読んでみたりsleepを読んでみたりを試そうかと思っている。

以上。