tokio-rs/tokio を読む(1)
2023-03-07
最近tokio-rs/tokio
をちゃんと理解したいと思いちょっとずつ読んでいる。
一気に理解するのは難しく色々と発散してしまいそうなので、ちょっとずつ本記事ような形態でまとめていくことにした。
読むのは以下の version とする。
目次
Goals
今回は以下のコードで何が起こっているか理解することを目標とする。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct Hello {
state: StateHello,
}
enum StateHello {
Hello,
World,
}
impl Hello {
fn new() -> Self {
Hello {
state: StateHello::Hello,
}
}
}
impl Future for Hello {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
match (*self).state {
StateHello::Hello => {
println!("Hello ");
(*self).state = StateHello::World;
cx.waker().wake_by_ref();
Poll::Pending
}
StateHello::World => {
println!("World!");
Poll::Ready(())
}
}
}
}
fn main() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(Hello::new());
}
tokio::runtime::Runtime::new()
まずはRuntime
作成を一旦流す。Runtime
自身は以下のような構造体で default すなわちrt-multi-thread
であればBuilder
を介してbuild_threaded_runtimeで作成される。
pub fn new() -> std::io::Result<Runtime> {
Builder::new_multi_thread().enable_all().build()
}
pub struct Runtime {
/// Task scheduler
scheduler: Scheduler,
/// Handle to runtime, also contains driver handles
handle: Handle,
/// Blocking pool handle, used to signal shutdown
blocking_pool: BlockingPool,
}
fn new_multi_thread() -> Builder
Builder
は以下のように初期化される。
pub fn new_multi_thread() -> Builder {
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThread, 61, 61)
}
この61
という数字はglobal_queue
のタスクを確認する周期のようでgolang
由来のようだ。golang
の該当箇所は以下のようになっていた。
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
ただ、この数字の根拠は分からず調べて見ると以下のスライドや動画が見つかった。
スライドには以下のように書いてある。
Why 61?
It is not even 42! ¯\_(ツ)_/¯
Want something:
● not too small
● not too large
● prime to break any patterns
global queue
を覗きにいくタイミングなので小さすぎるとlocal_queue
よりglobal_queue
のほうが優先されすぎてlocal_queue
が捌けていかないし、大きすぎるとglobal_queue
のタスクがなかなか実行されない。
そのへんのバランスを考えると64
あたりになるが、2
の倍数などで設定しまうとアプリケーション/CPU
の周波数と同期し、意図しない偏りを産む懸念があり、素数にしている。と解釈した。
たとえば64
にした場合、具体的にどのような問題として健在化するのかはわからないが、ちょうど読んでいた「詳解システムパフォーマンス 6.5.4 プロファイリング」にもサンプリング周波数は99Hz
にする話が載っていた。これと似たような話なのかと思っている。
頻度を 99Hz にしているのは、CPU と歩調が揃ってプロファイルに偏りができるのを防ぐためである
実際にglobal_queue
を覗く処理には今回はたどり着かないので次回以降で見ていこうと思う。
fn build_threaded_runtime(&mut self) -> io::Result<Runtime>
Details
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, runtime::Scheduler};
use crate::runtime::scheduler::{self, MultiThread};
let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
// Create the blocking pool
let blocking_pool =
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();
// Generate a rng seed for this runtime.
let seed_generator_1 = self.seed_generator.next_generator();
let seed_generator_2 = self.seed_generator.next_generator();
let (scheduler, handle, launch) = MultiThread::new(
core_threads,
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: seed_generator_1,
},
);
let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
// Spawn the thread pool workers
let _enter = handle.enter();
launch.launch();
Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
}
まずは指定された数もしくはcpu
のコア数からworker
の数を算出している。
let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
次にDriver
を作成するが、今回対象としているコードではDriver
は登場しないように見えたので scope 外にしておく。
Driver
というのはIO
,time
,signal
等を駆動する君に見える。最終的にはRuntime.handle
の中にdriver
としてdriver::Handle
が保持されるようだ。
let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
次にblocking_pool
を作成し、spawner
をclone
しておく。
引数はthread
数の上限のようで、ここではdefault
の512
+cpu数
が設定されている。
let blocking_pool =
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();
fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool
BlockingPool
自体は以下のようになっており更にSpawner
などを見ていく。
pub(crate) struct BlockingPool {
spawner: Spawner,
shutdown_rx: shutdown::Receiver,
}
Spawner
自身はArc
で包んだInner
を持っており、Inner
は以下のようになっている。thread_name
やstack_size
を持っており、まさにspawn
をするための情報を持っているようだ。after_start
,before_stop
はコメント(Call after a thread starts
)からthread
開始後、停止前の hook のようだ。
#[derive(Clone)]
pub(crate) struct Spawner {
inner: Arc<Inner>,
}
struct Inner {
/// State shared between worker threads.
shared: Mutex<Shared>,
/// Pool threads wait on this.
condvar: Condvar,
/// Spawned threads use this name.
thread_name: ThreadNameFn,
/// Spawned thread stack size.
stack_size: Option<usize>,
/// Call after a thread starts.
after_start: Option<Callback>,
/// Call before a thread stops.
before_stop: Option<Callback>,
// Maximum number of threads.
thread_cap: usize,
// Customizable wait timeout.
keep_alive: Duration,
// Metrics about the pool.
metrics: SpawnerMetrics,
}
Mutex<Shared>
を見てみると以下のようになっている。 Task
の実体はBlockingSchedule
となっておりblocking
タスクを共有するキューがqueue: VecDeque<Task>
だろうか。
今回は scope 外とするが、spawn_blocking
を読んだときの流れを読んでいくとわかりそうだ。
あとはshutdown_tx
やworker_threads: HashMap<usize, thread::JoinHandle<()>>,
など shutdown 処理に必要なものが多くありそうだ。
struct Shared {
queue: VecDeque<Task>,
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
/// Prior to shutdown, we clean up JoinHandles by having each timed-out
/// thread join on the previous timed-out thread. This is not strictly
/// necessary but helps avoid Valgrind false positives, see
/// <https://github.com/tokio-rs/tokio/commit/646fbae76535e397ef79dbcaacb945d4c829f666>
/// for more information.
last_exiting_thread: Option<thread::JoinHandle<()>>,
/// This holds the JoinHandles for all running threads; on shutdown, the thread
/// calling shutdown handles joining on these.
worker_threads: HashMap<usize, thread::JoinHandle<()>>,
/// This is a counter used to iterate worker_threads in a consistent order (for loom's
/// benefit).
worker_thread_index: usize,
}
pub(crate) struct Task {
task: task::UnownedTask<BlockingSchedule>,
mandatory: Mandatory,
}
さて、肝心のblocking::create_blocking_pool
まで遡ると実体は以下のようだ。
つまり先まで見てきた構造を初期値で埋めているだけだ。
またblocking_pool.spawner().clone()
はつまりArc::clone
であることがわかる。
pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
let (shutdown_tx, shutdown_rx) = shutdown::channel();
let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE);
BlockingPool {
spawner: Spawner {
inner: Arc::new(Inner {
shared: Mutex::new(Shared {
queue: VecDeque::new(),
num_notify: 0,
shutdown: false,
shutdown_tx: Some(shutdown_tx),
last_exiting_thread: None,
worker_threads: HashMap::new(),
worker_thread_index: 0,
}),
condvar: Condvar::new(),
thread_name: builder.thread_name.clone(),
stack_size: builder.thread_stack_size,
after_start: builder.after_start.clone(),
before_stop: builder.before_stop.clone(),
thread_cap,
keep_alive,
metrics: Default::default(),
}),
},
shutdown_rx,
}
}
build_threaded_runtime
に戻りMultiThread::new
を見ていく。
let (scheduler, handle, launch) = MultiThread::new(
core_threads,
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: seed_generator_1,
},
);
MultiThread::new
はほぼworker::create
に移譲しているようなのでそちらを見ていく。
fn create(size: usize, park: Parker, driver_handle: driver::Handle, blocking_spawner: blocking::Spawner, seed_generator: RngSeedGenerator, config: Config) -> (Arc<Handle>, Launch)
長いので以下に畳んで記載しておく。
Details
pub(super) fn create(
size: usize,
park: Parker,
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
config: Config,
) -> (Arc<Handle>, Launch) {
let mut cores = Vec::with_capacity(size);
let mut remotes = Vec::with_capacity(size);
let mut worker_metrics = Vec::with_capacity(size);
// Create the local queues
for _ in 0..size {
let (steal, run_queue) = queue::local();
let park = park.clone();
let unpark = park.unpark();
cores.push(Box::new(Core {
tick: 0,
lifo_slot: None,
run_queue,
is_searching: false,
is_shutdown: false,
park: Some(park),
metrics: MetricsBatch::new(),
rand: FastRand::new(config.seed_generator.next_seed()),
}));
remotes.push(Remote { steal, unpark });
worker_metrics.push(WorkerMetrics::new());
}
let handle = Arc::new(Handle {
shared: Shared {
remotes: remotes.into_boxed_slice(),
inject: Inject::new(),
idle: Idle::new(size),
owned: OwnedTasks::new(),
shutdown_cores: Mutex::new(vec![]),
config,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: worker_metrics.into_boxed_slice(),
},
driver: driver_handle,
blocking_spawner,
seed_generator,
});
let mut launch = Launch(vec![]);
for (index, core) in cores.drain(..).enumerate() {
launch.0.push(Arc::new(Worker {
handle: handle.clone(),
index,
core: AtomicCell::new(Some(core)),
}));
}
(handle, launch)
}
まずはworker
の数だけcore
,remote
,metrics
を格納するためのVec
を作る。
これからが何かは後でみる。
let mut cores = Vec::with_capacity(size);
let mut remotes = Vec::with_capacity(size);
let mut worker_metrics = Vec::with_capacity(size);
次にloop
でそれらを詰めていくようだ。 まずはlocal
キューを作っているのでここを見ていく。
for _ in 0..size {
let (steal, run_queue) = queue::local();
// ...
}
以下のような関数になっている。
LOCAL_QUEUE_CAPACITY
は256
になっていて、buffer
を作ったあと、UnsafeCell<MaybuUninit>
で埋めている。
pub(crate) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY);
for _ in 0..LOCAL_QUEUE_CAPACITY {
buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
}
let inner = Arc::new(Inner {
head: AtomicUnsignedLong::new(0),
tail: AtomicUnsignedShort::new(0),
buffer: make_fixed_size(buffer.into_boxed_slice()),
});
let local = Local {
inner: inner.clone(),
};
let remote = Steal(inner);
(remote, local)
}
その後Arc
で wrap したInner
を作成し、Local
,Steal
で wrap して返す。Inner
は以下のようになっている。
pub(crate) struct Inner<T: 'static> {
/// Concurrently updated by many threads.
///
/// Contains two `UnsignedShort` values. The LSB byte is the "real" head of
/// the queue. The `UnsignedShort` in the MSB is set by a stealer in process
/// of stealing values. It represents the first value being stolen in the
/// batch. The `UnsignedShort` indices are intentionally wider than strictly
/// required for buffer indexing in order to provide ABA mitigation and make
/// it possible to distinguish between full and empty buffers.
///
/// When both `UnsignedShort` values are the same, there is no active
/// stealer.
///
/// Tracking an in-progress stealer prevents a wrapping scenario.
head: AtomicUnsignedLong,
/// Only updated by producer thread but read by many threads.
tail: AtomicUnsignedShort,
/// Elements
buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
}
for
に戻ると、あとはParker
,Unparker
を作成し、park
やrun_queue
からCore
と呼ばれるデータを作成する。
queue::local
の返り値は(Steal<Arc<T>>, Local<Arc<T>>)
であったがCore
のrun_queue
はLocal<Arc<Handle>>
なのでT
がHandle
と確定する。
あとはWorkerMetrics
とsteal
とunpark
からRemote
を作成する。
for _ in 0..size {
let (steal, run_queue) = queue::local();
let park = park.clone();
let unpark = park.unpark();
cores.push(Box::new(Core {
tick: 0,
lifo_slot: None,
run_queue,
is_searching: false,
is_shutdown: false,
park: Some(park),
metrics: MetricsBatch::new(),
rand: FastRand::new(config.seed_generator.next_seed()),
}));
remotes.push(Remote { steal, unpark });
worker_metrics.push(WorkerMetrics::new());
}
次に先に作ったremotes
やworker_metrics
、引数で渡されたdriver_handle
,blocking_spawner
,seed_generator
などからHandle
を作成する。
Inject
やIdle
などが登場しているが一旦見送る。
このHandle
がscheduler::Handle
で wrap され、さらにstruct Handle { pub(crate) inner: scheduler::Handle }
として wrap したものをRuntime
では保持している。
let handle = Arc::new(Handle {
shared: Shared {
remotes: remotes.into_boxed_slice(),
inject: Inject::new(),
idle: Idle::new(size),
owned: OwnedTasks::new(),
shutdown_cores: Mutex::new(vec![]),
config,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: worker_metrics.into_boxed_slice(),
},
driver: driver_handle,
blocking_spawner,
seed_generator,
});
次にCore
とHandle
からLaunch
,Worker
を作成し、(Arc<Handle>, Launch)
を返しcreate
は完了する。
let mut launch = Launch(vec![]);
for (index, core) in cores.drain(..).enumerate() {
launch.0.push(Arc::new(Worker {
handle: handle.clone(),
index,
core: AtomicCell::new(Some(core)),
}));
}
(handle, launch)
MultiThread::new
は上記のArc<Handle>
,Launch
とMultiThread
を返して終了する。MultiThread
の定義は以下のようになっており、block_on
とshutdown
が生えている。
/// Work-stealing based thread pool for executing futures.
pub(crate) struct MultiThread;
pub(crate) fn block_on<F>(&self, handle: &scheduler::Handle, future: F) -> F::Output
where
F: Future,
{
let mut enter = crate::runtime::context::enter_runtime(handle, true);
enter
.blocking
.block_on(future)
.expect("failed to park thread")
}
pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
match handle {
scheduler::Handle::MultiThread(handle) => handle.shutdown(),
_ => panic!("expected MultiThread scheduler"),
}
}
再度build_threaded_runtime
まで戻り続きを見る。
先も述べたようにHandle
を wrap し、enter
を実行。
その後、launch.launch()
を実行し、blocking_pool
やhandle
からfrom_parts
で作ったRuntime
を返し終了だ。
let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
// Spawn the thread pool workers
let _enter = handle.enter();
launch.launch();
Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
handle.enter
を追っていくと以下のようにスレッドローカルにHandle
をセットしている。
pub(crate) fn try_set_current(handle: &scheduler::Handle) -> Option<SetCurrentGuard> {
CONTEXT.try_with(|ctx| ctx.set_current(handle)).ok()
}
CONTEXT
は以下のようになっており、handle
をspawing
やdrivers
にアクセスするため、現在のruntime handle
を保持するような仕組みのようだ。
static CONTEXT: Context = {
Context {
#[cfg(feature = "rt")]
thread_id: Cell::new(None),
/// Tracks the current runtime handle to use when spawning,
/// accessing drivers, etc...
#[cfg(feature = "rt")]
handle: RefCell::new(None),
#[cfg(feature = "rt")]
current_task_id: Cell::new(None),
// ....
}
}
fn launch(mut self)
次にlaunch.launch()
を見ていく。
launch
は以下のようにruntime::spawn_blocking
でworker
をblocking
処理として立ち上げていっている。
launch
内のWorker
がデフォルトではcpu
分生成されていたので、ここでcpu
数分のworker
が立ち上がるということになる。
ちなみに手元のM2 MacBook Air
では8
個立ち上がった。
pub(crate) fn launch(mut self) {
for worker in self.0.drain(..) {
runtime::spawn_blocking(move || run(worker));
}
}
runtime::spawn_blocking
は以下のようになっている。
先程保存したCONTEXT
内のHandle
を取り出し、そこからspawn_blocking
を呼んでいる。
pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let rt = Handle::current();
rt.spawn_blocking(func)
}
更に追っていくと、まずspawn_blocking_inner
にてfunc
がBlockingShedule
で wrap される。task::unowned
でUnownedTask
となり、spawn_task
に渡されるようだ。
pub(crate) fn spawn_blocking_inner<F, R>(
&self,
func: F,
is_mandatory: Mandatory,
name: Option<&str>,
rt: &Handle,
) -> (JoinHandle<R>, Result<(), SpawnError>)
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let fut = BlockingTask::new(func);
let id = task::Id::next();
let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);
let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);
(handle, spawned)
}
BlockingTask<T>
まずはfunc
をBlockingTask
で wrap している。確認のために書くが現時点でfunc
の実体は|| run(worker)
のはず。
そして、こいつはFuture
を実装している。coop::stop()
を無視すれば、基本的にはtake
してPoll::Ready
で返しているだけだ。
つまりpoll
されるがPending
が返ることはない。
impl<T, R> Future for BlockingTask<T>
where
T: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
type Output = R;
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<R> {
let me = &mut *self;
let func = me
.func
.take()
.expect("[internal exception] blocking task ran twice.");
// This is a little subtle:
// For convenience, we'd like _every_ call tokio ever makes to Task::poll() to be budgeted
// using coop. However, the way things are currently modeled, even running a blocking task
// currently goes through Task::poll(), and so is subject to budgeting. That isn't really
// what we want; a blocking task may itself want to run tasks (it might be a Worker!), so
// we want it to start without any budgeting.
crate::runtime::coop::stop();
Poll::Ready(func())
}
}
次にtask::unowned
を覗いてみる。 コメントにはOwnedTasks
に格納されないタスクを使うときのみ使用され、blocking_task
のみがこれを使用するとある。
pub(crate) fn unowned<T, S>(task: T, scheduler: S, id: Id) -> (UnownedTask<S>, JoinHandle<T::Output>)
where
S: Schedule,
T: Send + Future + 'static,
T::Output: Send + 'static,
{
let (task, notified, join) = new_task(task, scheduler, id);
// This transfers the ref-count of task and notified into an UnownedTask.
// This is valid because an UnownedTask holds two ref-counts.
let unowned = UnownedTask {
raw: task.raw,
_p: PhantomData,
};
std::mem::forget(task);
std::mem::forget(notified);
(unowned, join)
}
RawTask
まわりやNotified
、またそれらをmem::forget
してる理由などがいまいちわからなかった。
おそらくRawTask
が持っているHeader
をUnownedTask
と同期間生存させようとしているのかもしれない。
ちなみにUnownedTask
のDrop
は以下のようになっているので、この処理を紐解いていくと分かるのかもしれない。確かにvtable
経由でRawTask
のdeallloc
を呼んでいる。
このあたりは再度理解を試みたい。
impl<S: 'static> Drop for UnownedTask<S> {
fn drop(&mut self) {
// Decrement the ref count
if self.raw.header().state.ref_dec_twice() {
// Deallocate if this is the final ref count
self.raw.dealloc();
}
}
}
new_task
は以下のようになっておりRawTask
,Notified
,JoinHandle
を作成するだけのようだ。
fn new_task<T, S>(
task: T,
scheduler: S,
id: Id,
) -> (Task<S>, Notified<S>, JoinHandle<T::Output>)
where
S: Schedule,
T: Future + 'static,
T::Output: 'static,
{
let raw = RawTask::new::<T, S>(task, scheduler, id);
let task = Task {
raw,
_p: PhantomData,
};
let notified = Notified(Task {
raw,
_p: PhantomData,
});
let join = JoinHandle::new(raw);
(task, notified, join)
}
RawTask::new
では以下のようにtask
やscheduler
を wrap してpointer
に変換している。
pub(super) fn new<T, S>(task: T, scheduler: S, id: Id) -> RawTask
where
T: Future,
S: Schedule,
{
let ptr = Box::into_raw(Cell::<_, S>::new(task, scheduler, State::new(), id));
let ptr = unsafe { NonNull::new_unchecked(ptr as *mut Header) };
RawTask { ptr }
}
不明点が多くなってきたが、UnownedTask
が更にTask
で wrap され、spawn_task
に渡されるように見える。
spawn_task
はいくつか省略しているがだいたい以下のような感じだ。
shared
からlock
をとって、local_queue
にtask
をpush_back
する。
idle
状態のthread
がなく、かつ、thread
数の上限に到達していなければspawn_thread
でspawn
し成功すればmetrics
のthread
数を加算する。
(WorkerMetrics
についてはスルーしてきたが、idle
状態のthread
数やthread
の数、local_queue
の深さなどをカウントするような責務をもっているように見える。)
その後spawn_thread
でspawn
する。
fn spawn_task(&self, task: Task, rt: &Handle) -> Result<(), SpawnError> {
let mut shared = self.inner.shared.lock();
// ...
shared.queue.push_back(task);
self.inner.metrics.inc_queue_depth();
if self.inner.metrics.num_idle_threads() == 0 {
// No threads are able to process the task.
if self.inner.metrics.num_threads() == self.inner.thread_cap {
// At max number of threads
} else {
// ...
let id = shared.worker_thread_index;
match self.spawn_thread(shutdown_tx, rt, id) {
Ok(handle) => {
self.inner.metrics.inc_num_threads();
shared.worker_thread_index += 1;
shared.worker_threads.insert(id, handle);
}
Err(e) => {
return Err(/*...*/);
}
}
//
}
} else {
// ....
}
Ok(())
}
spawn_thread
は以下のようになっており、ようやくstd::thread
が現れる。
ここでは、シンプルにthread::Builder
でthread
を作りspawn
している。
その後前述したようにCONTEXT
にHandle
を登録し、blocking::Spawner
のinner.run()
を実行する。
fn spawn_thread(
&self,
shutdown_tx: shutdown::Sender,
rt: &Handle,
id: usize,
) -> std::io::Result<thread::JoinHandle<()>> {
let mut builder = thread::Builder::new().name((self.inner.thread_name)());
if let Some(stack_size) = self.inner.stack_size {
builder = builder.stack_size(stack_size);
}
let rt = rt.clone();
builder.spawn(move || {
// Only the reference should be moved into the closure
let _enter = rt.enter();
rt.inner.blocking_spawner().inner.run(id);
drop(shutdown_tx);
})
}
run
は以下。少し長いがポイントになりそうな箇所を絞って読んでいく。
Details
fn run(&self, worker_thread_id: usize) {
if let Some(f) = &self.after_start {
f()
}
let mut shared = self.shared.lock();
let mut join_on_thread = None;
'main: loop {
// BUSY
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);
task.run();
shared = self.shared.lock();
}
// IDLE
self.metrics.inc_num_idle_threads();
while !shared.shutdown {
let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
shared = lock_result.0;
let timeout_result = lock_result.1;
if shared.num_notify != 0 {
// We have received a legitimate wakeup,
// acknowledge it by decrementing the counter
// and transition to the BUSY state.
shared.num_notify -= 1;
break;
}
// Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic.
if !shared.shutdown && timeout_result.timed_out() {
// We'll join the prior timed-out thread's JoinHandle after dropping the lock.
// This isn't done when shutting down, because the thread calling shutdown will
// handle joining everything.
let my_handle = shared.worker_threads.remove(&worker_thread_id);
join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);
break 'main;
}
// Spurious wakeup detected, go back to sleep.
}
if shared.shutdown {
// Drain the queue
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);
task.shutdown_or_run_if_mandatory();
shared = self.shared.lock();
}
// Work was produced, and we "took" it (by decrementing num_notify).
// This means that num_idle was decremented once for our wakeup.
// But, since we are exiting, we need to "undo" that, as we'll stay idle.
self.metrics.inc_num_idle_threads();
// NOTE: Technically we should also do num_notify++ and notify again,
// but since we're shutting down anyway, that won't be necessary.
break;
}
}
// Thread exit
self.metrics.dec_num_threads();
// num_idle should now be tracked exactly, panic
// with a descriptive message if it is not the
// case.
let prev_idle = self.metrics.dec_num_idle_threads();
if prev_idle < self.metrics.num_idle_threads() {
panic!("num_idle_threads underflowed on thread exit")
}
if shared.shutdown && self.metrics.num_threads() == 0 {
self.condvar.notify_one();
}
drop(shared);
if let Some(f) = &self.before_stop {
f()
}
if let Some(handle) = join_on_thread {
let _ = handle.join();
}
}
基本的には以下のloop
となっておりshutdown
flag が立つまで、またはtimeout
するまでloop
から抜けないようだ。 loop
内の処理はBUSY
とIDLE
で構成されている。
'main: loop {
// BUSY
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);
task.run();
shared = self.shared.lock();
}
// IDLE
self.metrics.inc_num_idle_threads();
while !shared.shutdown {
let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
shared = lock_result.0;
let timeout_result = lock_result.1;
if shared.num_notify != 0 {
// We have received a legitimate wakeup,
// acknowledge it by decrementing the counter
// and transition to the BUSY state.
shared.num_notify -= 1;
break;
}
// Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic.
if !shared.shutdown && timeout_result.timed_out() {
// We'll join the prior timed-out thread's JoinHandle after dropping the lock.
// This isn't done when shutting down, because the thread calling shutdown will
// handle joining everything.
let my_handle = shared.worker_threads.remove(&worker_thread_id);
join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);
break 'main;
}
// Spurious wakeup detected, go back to sleep.
}
// ....
}
BUSY
基本的にはlocal_queue
からタスクがなくなるまで実行するようだ。task
の実行前にlock
をdrop
し、task
完了後に取り直している。
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);
task.run();
shared = self.shared.lock();
}
task.run
の実体は以下のように先程作成したRawTask
経由でpoll
している。
pub(crate) fn run(self) {
let raw = self.raw;
mem::forget(self);
// Transfer one ref-count to a Task object.
let task = Task::<S> {
raw,
_p: PhantomData,
};
// Use the other ref-count to poll the task.
raw.poll();
// Decrement our extra ref-count
drop(task);
}
RawTask
のpoll
はvtable
を経由しHarness
を介して呼ばれる。
pub(super) fn poll(self) {
let vtable = self.header().vtable;
unsafe { (vtable.poll)(self.ptr) }
}
unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
harness.poll();
}
その後poll_inner
を介し、更にpoll_future
のguard.core.poll()
にたどり着く。
そしてCore
のpoll
からようやくBlockingTask
のpoll
が呼ばれるようだ。
以下がCore
内のpoll
。
pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> {
let res = {
self.stage.stage.with_mut(|ptr| {
// Safety: The caller ensures mutual exclusion to the field.
let future = match unsafe { &mut *ptr } {
Stage::Running(future) => future,
_ => unreachable!("unexpected stage"),
};
// Safety: The caller ensures the future is pinned.
let future = unsafe { Pin::new_unchecked(future) };
let _guard = TaskIdGuard::enter(self.task_id);
future.poll(&mut cx)
})
};
if res.is_ready() {
self.drop_future_or_output();
}
res
}
ここで現れたCore
は何者かと振り返ってみたが、RawTask::new
から呼ばれている以下で作成されていた。
つまりWorker
を作成するときにでてきたCore
とはまた別物のようだ。
pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box<Cell<T, S>> {
let result = Box::new(Cell {
header: Header {
state,
queue_next: UnsafeCell::new(None),
vtable: raw::vtable::<T, S>(),
owner_id: UnsafeCell::new(0),
},
core: Core {
scheduler,
stage: CoreStage {
stage: UnsafeCell::new(Stage::Running(future)),
},
task_id,
},
trailer: Trailer {
waker: UnsafeCell::new(None),
owned: linked_list::Pointers::new(),
},
});
}
先程のHarness
を介してpoll
している箇所が出てきていたが、RawTask
とHarness
をつないでいるのはこのvtable
のようだ。 raw::vtable
とそこで設定されるpoll
はそれぞれ以下のようになっている。
pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
&Vtable {
poll: poll::<T, S>,
schedule: schedule::<S>,
dealloc: dealloc::<T, S>,
try_read_output: try_read_output::<T, S>,
drop_join_handle_slow: drop_join_handle_slow::<T, S>,
drop_abort_handle: drop_abort_handle::<T, S>,
shutdown: shutdown::<T, S>,
trailer_offset: OffsetHelper::<T, S>::TRAILER_OFFSET,
scheduler_offset: OffsetHelper::<T, S>::SCHEDULER_OFFSET,
id_offset: OffsetHelper::<T, S>::ID_OFFSET,
}
}
unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
harness.poll();
}
次にIDLE
のブロックを見ていく。
IDLE
まずはidle
なthread
数を増加させ、shutdown
になるまでloop
する。
とはいえ、基本的にはCondvar
で待ちつつtimeout
したらshutdown
状態じゃなくともworker_thread
を’掃除するように見える。
// IDLE
self.metrics.inc_num_idle_threads();
while !shared.shutdown {
let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
shared = lock_result.0;
let timeout_result = lock_result.1;
if shared.num_notify != 0 {
// We have received a legitimate wakeup,
// acknowledge it by decrementing the counter
// and transition to the BUSY state.
shared.num_notify -= 1;
break;
}
// Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic.
if !shared.shutdown && timeout_result.timed_out() {
// We'll join the prior timed-out thread's JoinHandle after dropping the lock.
// This isn't done when shutting down, because the thread calling shutdown will
// handle joining everything.
let my_handle = shared.worker_threads.remove(&worker_thread_id);
join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);
break 'main;
}
// Spurious wakeup detected, go back to sleep.
}
不明点も増えてきたが、少なくともworker
がBlockingTask
としてpoll
されるまでの流れは見えてきた。 なので、launch.launch()
まで戻ってrun(worker)
を見ていく。
fn run(worker: Arc<Worker>)
run
のは以下のようになっている。worker.core
の取得、worker.handle
からHandle
を作成し、enter_runtime
でCONTEXT
にHandle
を登録する。
さらにworker::Context
を作成し、それをCURRENT.set
でCURRENT
というthread_local
にセットしつつ、cx.run(core)
で実行する。
fn run(worker: Arc<Worker>) {
// ...
let core = match worker.core.take() {
Some(core) => core,
None => return,
};
let handle = scheduler::Handle::MultiThread(worker.handle.clone());
let _enter = crate::runtime::context::enter_runtime(&handle, true);
// Set the worker context.
let cx = Context {
worker,
core: RefCell::new(None),
};
CURRENT.set(&cx, || {
assert!(cx.run(core).is_err());
wake_deferred_tasks();
});
}
CURRENT
は以下のような定義になっており、型ごとにscope
を切ってthread_local
への参照を保持しているように見える。
CURRENT
には今回、Arc<Worker>
,RefCell<Option<Box<Core>>>
を格納しており、これをどう使っていくかは読み進めていけば次回以降わかるかもしれない。
// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);
/// Sets a reference as a thread-local.
macro_rules! scoped_thread_local {
($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty) => (
$(#[$attrs])*
$vis static $name: $crate::macros::scoped_tls::ScopedKey<$ty>
= $crate::macros::scoped_tls::ScopedKey {
inner: {
tokio_thread_local!(static FOO: ::std::cell::Cell<*const ()> = const {
std::cell::Cell::new(::std::ptr::null())
});
&FOO
},
_marker: ::std::marker::PhantomData,
};
)
}
fn run(&self, mut core: Box<Core>) -> RunResult
これでようやくworker
のメイン処理まで到達した。
Details
fn run(&self, mut core: Box<Core>) -> RunResult {
while !core.is_shutdown {
// Increment the tick
core.tick();
// Run maintenance, if needed
core = self.maintenance(core);
// First, check work available to the current worker.
if let Some(task) = core.next_task(&self.worker) {
core = self.run_task(task, core)?;
continue;
}
// There is no more **local** work to process, try to steal work
// from other workers.
if let Some(task) = core.steal_work(&self.worker) {
core = self.run_task(task, core)?;
} else {
// Wait for work
core = if did_defer_tasks() {
self.park_timeout(core, Some(Duration::from_millis(0)))
} else {
self.park(core)
};
}
}
core.pre_shutdown(&self.worker);
// Signal shutdown
self.worker.handle.shutdown_core(core);
Err(())
}
// ...
}
重要そうな部分をピックアップして以下に記載しておく。
ざっと見た感じ、shutdown
されるまで以下を行っているように見える。
tick
maintenance
local_queue
から次のタスクを探して実行
local_queue
が無くなったら steal を試みる
steal
できるものがなければpark
本当はmaintenance
やsteal_work
などの挙動に興味があるので読み進めたいが、ここまでで結構な分量になっているのと、今回目的としているコードではtask
がworker
で処理されることはなく、立ち上げられたworker
はみんなすぐpark
してしまうので今回はスルーし次回以降の課題としておく。
fn run(&self, mut core: Box<Core>) -> RunResult {
while !core.is_shutdown {
core.tick();
core = self.maintenance(core);
// First, check work available to the current worker.
if let Some(task) = core.next_task(&self.worker) {
core = self.run_task(task, core)?;
continue;
}
// There is no more **local** work to process, try to steal work
// from other workers.
if let Some(task) = core.steal_work(&self.worker) {
core = self.run_task(task, core)?;
} else {
// Wait for work
core = if did_defer_tasks() {
self.park_timeout(core, Some(Duration::from_millis(0)))
} else {
self.park(core)
};
}
}
// ...
}
これでlaunch
の流れは分かったのでRuntime::new
の大枠の流れもつかめたように思う。worker
のunpark
などは次回の課題としつつmain
まで戻りblock_on
を読んでいく。
rt.block_on(Hello::new())
Runtime
のblock_on
はenter
し、Scheduler
を選択しているだけのようだ。
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
let _enter = self.enter();
match &self.scheduler {
Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
}
}
exec.block_on(&self.handle.inner, future)
は前述した以下のMultiThread
のblock_on
を呼んでいる。 ここではcontext::enter_runtime
のちblocking.block_on
を実行している。
pub(crate) fn block_on<F>(&self, handle: &scheduler::Handle, future: F) -> F::Output
where
F: Future,
{
let mut enter = crate::runtime::context::enter_runtime(handle, true);
enter
.blocking
.block_on(future)
.expect("failed to park thread")
}
fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError>
さらにblock_on
を掘っていくとruntime/park.rs
のCachedParkThread
のblocK_on
にたどり着く。以下だ。
coop::budget
やdefer.wake
など不明な箇所もあるが、大筋は教科書とおりだ。waker
を取得しContext
を作る。Future
をPin
止めし、Ready
を返すまでpoll
とpark
をloop
で回す。ということをやっている。
pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError> {
use std::task::Context;
use std::task::Poll::Ready;
// `get_unpark()` should not return a Result
let waker = self.waker()?;
let mut cx = Context::from_waker(&waker);
pin!(f);
loop {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}
// Wake any yielded tasks before parking in order to avoid blocking.
crate::runtime::context::with_defer(|defer| defer.wake());
self.park();
}
}
まずはself.waker
を見てみる。self.unpark
経由でunpark
を取得しそこからWaker
に変換している。
pub(crate) fn waker(&self) -> Result<Waker, AccessError> {
self.unpark().map(|unpark| unpark.into_waker())
}
unpark
はCURRENT_PARKER
経由で取得している。
fn unpark(&self) -> Result<UnparkThread, AccessError> {
self.with_current(|park_thread| park_thread.unpark())
}
fn with_current<F, R>(&self, f: F) -> Result<R, AccessError>
where
F: FnOnce(&ParkThread) -> R,
{
CURRENT_PARKER.try_with(|inner| f(inner))
}
CURRENT_PARKER
は以下のように作成されており、AtomicUsize
,Mutex
,Condvar
のセットとなっておりAtomicUsize
はEMPTY
,PARKED
とNOTIFIED
の 3 つの状態を表しているようだ。
tokio_thread_local! {
static CURRENT_PARKER: ParkThread = ParkThread::new();
}
impl ParkThread {
pub(crate) fn new() -> Self {
Self {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
mutex: Mutex::new(()),
condvar: Condvar::new(),
}),
}
}
}
unpark
はこのInner
をUnparkThread
でwrap
し、into_waker
でWaker
に変換される。
これでself.waker()
からWaker
をどうやって取得しているのかが分かった。
pub(crate) fn unpark(&self) -> UnparkThread {
let inner = self.inner.clone();
UnparkThread { inner }
}
impl UnparkThread {
pub(crate) fn into_waker(self) -> Waker {
unsafe {
let raw = unparker_to_raw_waker(self.inner);
Waker::from_raw(raw)
}
}
}
unparker_to_raw_waker
では以下のようにRawWaker
を作成している。
RawWaker::new
の第一引数はwake
などが受けとるデータへのポインタなので、wake
などはInner
へのポインタを受け取ることになる。
unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
RawWaker::new(
Inner::into_raw(unparker),
&RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
)
}
RawWakerVTable
が出てきたのでwake_by_ref
の実装だけ見ておくこととする。
前述したように第一引数にInner
へのポインタが渡されるので、そこからInner
に変換しunpark
を呼ぶ。
最後にmem::forget
しているのは、実際にはclone
されたInner
(ここではunparker
)をReactor
が所持しているにも関わらずdrop
が走り、参照カウントが意図せずデクリメントされるのを防ぐためだと理解している。これはRawWaker
のdata
が*const ()
なのでArc::into_raw
で変換する必要があり、この際にArc
がforget
されるためだ。(つまりArc::into_raw
した場合はleak
を許容するか、自分で参照カウントを管理するかのいずれかになるという理解。)
unsafe fn wake_by_ref(raw: *const ()) {
let unparker = Inner::from_raw(raw);
unparker.unpark();
// We don't actually own a reference to the unparker
mem::forget(unparker);
}
これでwaker
を作るまでが分かった。
取得したWaker
はContext
で wrap され、poll
にわたす準備ができる。
次にPin
止めしているが、基本的にはpin_utils
とやっていることは大差なさそう。
その後loop
に入っていく。
crate::runtime::coop::budget
というのはpoll
の予算のようでpoll
ごとにdecrement
していくような仕組みに見える。が今回はdecrement
するようなパスが見当たらなかったので、次回以降の課題としておく。
そしてようやくpoll
だ。
loop {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}
// Wake any yielded tasks before parking in order to avoid
// blocking.
#[cfg(feature = "rt")]
crate::runtime::context::with_defer(|defer| defer.wake());
self.park();
}
先程用意したContext
を渡し実行することで今回のターゲットとしている以下のコードが呼び出される。
これによりHello
が表示され、状態遷移する。
また、渡されたContext
から取り出したwaker
経由でwake_by_ref
を呼ぶことでRawWakerVTable
に設定したwake_by_ref
が発火し、その後Poll::Pending
が返る。
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
match (*self).state {
StateHello::Hello => {
println!("Hello ");
(*self).state = StateHello::World;
cx.waker().wake_by_ref();
Poll::Pending
}
StateHello::World => {
println!("World!");
Poll::Ready(())
}
}
}
wake_by_ref
とは、すなわち前述した以下のコードだ。
unsafe fn wake_by_ref(raw: *const ()) {
let unparker = Inner::from_raw(raw);
unparker.unpark();
// We don't actually own a reference to the unparker
mem::forget(unparker);
}
unparker.unpark()
を掘ってもいいが、先にpark
を見ておいたほうが分かり易そうなので先回りして見ておく。
loop {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}
// Wake any yielded tasks before parking in order to avoid
// blocking.
#[cfg(feature = "rt")]
crate::runtime::context::with_defer(|defer| defer.wake());
self.park();
}
self.park
は以下のようになっており、更に掘っていくとInner
のpark
にたどり着く。
pub(crate) fn park(&mut self) {
self.with_current(|park_thread| park_thread.inner.park())
.unwrap();
}
fn park(&self)
コードは以下に畳んで掲載し、小分けにして見ていく。
Details
fn park(&self) {
// If we were previously notified then we consume this notification and
// return quickly.
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
return;
}
// Otherwise we need to coordinate going to sleep
let mut m = self.mutex.lock();
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFIED) => {
// We must read here, even though we know it will be `NOTIFIED`.
// This is because `unpark` may have been called again since we read
// `NOTIFIED` in the `compare_exchange` above. We must perform an
// acquire operation that synchronizes with that `unpark` to observe
// any writes it made before the call to unpark. To do that we must
// read from the write it made to `state`.
let old = self.state.swap(EMPTY, SeqCst);
debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
}
Err(actual) => panic!("inconsistent park state; actual = {}", actual),
}
loop {
m = self.condvar.wait(m).unwrap();
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
// got a notification
return;
}
// spurious wakeup, go back to sleep
}
}
Inner
のAtomicUsize
は 3 つの状態を表現していると前述したが、以下のように通知状態を検査している。
ここはコメントにある通り、Inner
のpark
に到達したがすでに通知済である場合にwait
することなく再度poll
するため、早期 return
しているようだ。
今回ターゲットにしているコードではpoll
の中で即座にwake_by_ref
が呼ばれているので恐らくここが真になり早期return
になるものと推測される。
// If we were previously notified then we consume this notification and
// return quickly.
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
return;
}
上記においてreturn
しない想定で読みすすめると、次にlock
をとり、state
をPARKED
にし、park
の準備にはいる。
ここでコメントにあるようにunpark
が再度実行されているケースを考慮し、再度state
がNOTIFIED
ではないかチェックし、通知済であればstate
をEMPTY
にし早期return
する。
このあたりはGuard run_executor of local_pool.rs against use of park / unpark in user-code. #2010と同様の話のように見える。
このチェックを怠るとunpark
の呼び出し回数とstate
が食い違い二度と起きないケースが発生するように思う。
// Otherwise we need to coordinate going to sleep
let mut m = self.mutex.lock();
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFIED) => {
// We must read here, even though we know it will be `NOTIFIED`.
// This is because `unpark` may have been called again since we read
// `NOTIFIED` in the `compare_exchange` above. We must perform an
// acquire operation that synchronizes with that `unpark` to observe
// any writes it made before the call to unpark. To do that we must
// read from the write it made to `state`.
let old = self.state.swap(EMPTY, SeqCst);
debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
}
Err(actual) => panic!("inconsistent park state; actual = {}", actual),
}
あとは定番のloop
でspurious wakeup
をケアしつつCondvar.wait
するパターンだ。wait
を抜けたあとstate
をEMPTY
へのcompare_exchange
を行いreturn
することにより、再度poll
が実行されるはずだ。
loop {
m = self.condvar.wait(m).unwrap();
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
// got a notification
return;
}
// spurious wakeup, go back to sleep
}
park
は以上なのでunpark
側に戻るとする。
fn unpark(&self)
unparker.unpark()
はすなわちInner
のunpark
だ。park
を先に読んだことである程度想像はつくかと思うが、以下のようになっている。
fn unpark(&self) {
// To ensure the unparked thread will observe any writes we made before
// this call, we must perform a release operation that `park` can
// synchronize with. To do that we must write `NOTIFIED` even if `state`
// is already `NOTIFIED`. That is why this must be a swap rather than a
// compare-and-swap that returns if it reads `NOTIFIED` on failure.
match self.state.swap(NOTIFIED, SeqCst) {
EMPTY => return, // no one was waiting
NOTIFIED => return, // already unparked
PARKED => {} // gotta go wake someone up
_ => panic!("inconsistent state in unpark"),
}
// There is a period between when the parked thread sets `state` to
// `PARKED` (or last checked `state` in the case of a spurious wake
// up) and when it actually waits on `cvar`. If we were to notify
// during this period it would be ignored and then when the parked
// thread went to sleep it would never wake up. Fortunately, it has
// `lock` locked at this stage so we can acquire `lock` to wait until
// it is ready to receive the notification.
//
// Releasing `lock` before the call to `notify_one` means that when the
// parked thread wakes it doesn't get woken only to have to wait for us
// to release `lock`.
drop(self.mutex.lock());
self.condvar.notify_one()
}
まずはstate
をNOTIFIED
にswap
する。
すでにEMPTY
やNOTIFIED
の場合は何もせずreturn
し、PARKED
の場合だけ次に通知処理に移る。
// To ensure the unparked thread will observe any writes we made before
// this call, we must perform a release operation that `park` can
// synchronize with. To do that we must write `NOTIFIED` even if `state`
// is already `NOTIFIED`. That is why this must be a swap rather than a
// compare-and-swap that returns if it reads `NOTIFIED` on failure.
match self.state.swap(NOTIFIED, SeqCst) {
EMPTY => return, // no one was waiting
NOTIFIED => return, // already unparked
PARKED => {} // gotta go wake someone up
_ => panic!("inconsistent state in unpark"),
}
まずコメントの冒頭には、state
がPARKED
になる隙間で通知が飛んでしまうことを防ぐ旨が書いてある。 これはpark
側のlet mut m = self.mutex.lock(); match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst)
の部分を指しているように見える。
後半はlock
をdrop
してから通知を出さないと無駄なlock
が解放されるまでの無駄な待ちが入ってしまうから。という旨が書いてあるように見える。確かにこのlock
はPARKED
になるまで待つ意図なので通知までにdrop
してしまってもいいのか。
lock
を解放したらnotify_one
で通知して終了だ。
// There is a period between when the parked thread sets `state` to
// `PARKED` (or last checked `state` in the case of a spurious wake
// up) and when it actually waits on `cvar`. If we were to notify
// during this period it would be ignored and then when the parked
// thread went to sleep it would never wake up. Fortunately, it has
// `lock` locked at this stage so we can acquire `lock` to wait until
// it is ready to receive the notification.
//
// Releasing `lock` before the call to `notify_one` means that when the
// parked thread wakes it doesn't get woken only to have to wait for us
// to release `lock`.
drop(self.mutex.lock());
self.condvar.notify_one()
これでpark
,unpark
は読めた。
もう一度block_on
まで戻って眺めてみると全体像が分かる。budget
やcontext::with_defer
は今後見ていくとして今回ターゲットとしているコードの大枠は以下の流れだ。
waker
を取得する
Context
でwaker
を wrap する
Future
をPin
止めする
- 3 の
as_mut
をとってpoll
する。
- 3 の
Hello
が表示される
wake_by_ref
すなわちunpark
が呼ばれる
park
するがすでにNOTIFIED
なので再度poll
する
World!
が表示され、Ready
が返る
- 終了
pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError> {
use std::task::Context;
use std::task::Poll::Ready;
// `get_unpark()` should not return a Result
let waker = self.waker()?;
let mut cx = Context::from_waker(&waker);
pin!(f);
loop {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}
// Wake any yielded tasks before parking in order to avoid
// blocking.
#[cfg(feature = "rt")]
crate::runtime::context::with_defer(|defer| defer.wake());
self.park();
}
}
おそらくこのあと、shutdown
状態となり各thread
の後片付けが行われていくのだろうが、今回は skip する。
まとめ
ひとまず今回の goal はある程度達成できたが、不明点も多くでてきたので継続して見ていきたい。
特に今回はworker
が仕事をしていなかったり、mio
までたどり着いたりしていなかったり、肝心のsteal
周りをスキップしたりしたので今後、理解を深めたい。次のステップとしてはsmol
を読んでみたり、spawn_blocking
を読んでみたりsleep
を読んでみたりを試そうかと思っている。
以上。