tokio-rs/tokio を読む(1)
2023-03-07
最近tokio-rs/tokioをちゃんと理解したいと思いちょっとずつ読んでいる。
一気に理解するのは難しく色々と発散してしまいそうなので、ちょっとずつ本記事ような形態でまとめていくことにした。
読むのは以下の version とする。
目次
Goals
今回は以下のコードで何が起こっているか理解することを目標とする。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct Hello {
state: StateHello,
}
enum StateHello {
Hello,
World,
}
impl Hello {
fn new() -> Self {
Hello {
state: StateHello::Hello,
}
}
}
impl Future for Hello {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
match (*self).state {
StateHello::Hello => {
println!("Hello ");
(*self).state = StateHello::World;
cx.waker().wake_by_ref();
Poll::Pending
}
StateHello::World => {
println!("World!");
Poll::Ready(())
}
}
}
}
fn main() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(Hello::new());
}
tokio::runtime::Runtime::new()
まずはRuntime作成を一旦流す。Runtime自身は以下のような構造体で default すなわちrt-multi-threadであればBuilderを介してbuild_threaded_runtimeで作成される。
pub fn new() -> std::io::Result<Runtime> {
Builder::new_multi_thread().enable_all().build()
}
pub struct Runtime {
/// Task scheduler
scheduler: Scheduler,
/// Handle to runtime, also contains driver handles
handle: Handle,
/// Blocking pool handle, used to signal shutdown
blocking_pool: BlockingPool,
}
fn new_multi_thread() -> Builder
Builderは以下のように初期化される。
pub fn new_multi_thread() -> Builder {
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThread, 61, 61)
}
この61という数字はglobal_queueのタスクを確認する周期のようでgolang由来のようだ。golangの該当箇所は以下のようになっていた。
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
ただ、この数字の根拠は分からず調べて見ると以下のスライドや動画が見つかった。
スライドには以下のように書いてある。
Why 61?
It is not even 42! ¯\_(ツ)_/¯
Want something:
● not too small
● not too large
● prime to break any patterns
global queueを覗きにいくタイミングなので小さすぎるとlocal_queueよりglobal_queueのほうが優先されすぎてlocal_queueが捌けていかないし、大きすぎるとglobal_queueのタスクがなかなか実行されない。
そのへんのバランスを考えると64あたりになるが、2の倍数などで設定しまうとアプリケーション/CPUの周波数と同期し、意図しない偏りを産む懸念があり、素数にしている。と解釈した。
たとえば64にした場合、具体的にどのような問題として健在化するのかはわからないが、ちょうど読んでいた「詳解システムパフォーマンス 6.5.4 プロファイリング」にもサンプリング周波数は99Hzにする話が載っていた。これと似たような話なのかと思っている。
頻度を 99Hz にしているのは、CPU と歩調が揃ってプロファイルに偏りができるのを防ぐためである
実際にglobal_queueを覗く処理には今回はたどり着かないので次回以降で見ていこうと思う。
fn build_threaded_runtime(&mut self) -> io::Result<Runtime>
Details
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, runtime::Scheduler};
use crate::runtime::scheduler::{self, MultiThread};
let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
// Create the blocking pool
let blocking_pool =
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();
// Generate a rng seed for this runtime.
let seed_generator_1 = self.seed_generator.next_generator();
let seed_generator_2 = self.seed_generator.next_generator();
let (scheduler, handle, launch) = MultiThread::new(
core_threads,
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: seed_generator_1,
},
);
let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
// Spawn the thread pool workers
let _enter = handle.enter();
launch.launch();
Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
}
まずは指定された数もしくはcpuのコア数からworkerの数を算出している。
let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
次にDriverを作成するが、今回対象としているコードではDriverは登場しないように見えたので scope 外にしておく。
DriverというのはIO,time,signal等を駆動する君に見える。最終的にはRuntime.handleの中にdriverとしてdriver::Handleが保持されるようだ。
let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
次にblocking_poolを作成し、spawnerをcloneしておく。
引数はthread数の上限のようで、ここではdefaultの512+cpu数が設定されている。
let blocking_pool =
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();
fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool
BlockingPool自体は以下のようになっており更にSpawnerなどを見ていく。
pub(crate) struct BlockingPool {
spawner: Spawner,
shutdown_rx: shutdown::Receiver,
}
Spawner自身はArcで包んだInnerを持っており、Innerは以下のようになっている。thread_nameやstack_sizeを持っており、まさにspawnをするための情報を持っているようだ。after_start,before_stopはコメント(Call after a thread starts)からthread開始後、停止前の hook のようだ。
#[derive(Clone)]
pub(crate) struct Spawner {
inner: Arc<Inner>,
}
struct Inner {
/// State shared between worker threads.
shared: Mutex<Shared>,
/// Pool threads wait on this.
condvar: Condvar,
/// Spawned threads use this name.
thread_name: ThreadNameFn,
/// Spawned thread stack size.
stack_size: Option<usize>,
/// Call after a thread starts.
after_start: Option<Callback>,
/// Call before a thread stops.
before_stop: Option<Callback>,
// Maximum number of threads.
thread_cap: usize,
// Customizable wait timeout.
keep_alive: Duration,
// Metrics about the pool.
metrics: SpawnerMetrics,
}
Mutex<Shared>を見てみると以下のようになっている。 Taskの実体はBlockingScheduleとなっておりblockingタスクを共有するキューがqueue: VecDeque<Task>だろうか。
今回は scope 外とするが、spawn_blockingを読んだときの流れを読んでいくとわかりそうだ。
あとはshutdown_txやworker_threads: HashMap<usize, thread::JoinHandle<()>>,など shutdown 処理に必要なものが多くありそうだ。
struct Shared {
queue: VecDeque<Task>,
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
/// Prior to shutdown, we clean up JoinHandles by having each timed-out
/// thread join on the previous timed-out thread. This is not strictly
/// necessary but helps avoid Valgrind false positives, see
/// <https://github.com/tokio-rs/tokio/commit/646fbae76535e397ef79dbcaacb945d4c829f666>
/// for more information.
last_exiting_thread: Option<thread::JoinHandle<()>>,
/// This holds the JoinHandles for all running threads; on shutdown, the thread
/// calling shutdown handles joining on these.
worker_threads: HashMap<usize, thread::JoinHandle<()>>,
/// This is a counter used to iterate worker_threads in a consistent order (for loom's
/// benefit).
worker_thread_index: usize,
}
pub(crate) struct Task {
task: task::UnownedTask<BlockingSchedule>,
mandatory: Mandatory,
}
さて、肝心のblocking::create_blocking_poolまで遡ると実体は以下のようだ。
つまり先まで見てきた構造を初期値で埋めているだけだ。
またblocking_pool.spawner().clone()はつまりArc::cloneであることがわかる。
pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
let (shutdown_tx, shutdown_rx) = shutdown::channel();
let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE);
BlockingPool {
spawner: Spawner {
inner: Arc::new(Inner {
shared: Mutex::new(Shared {
queue: VecDeque::new(),
num_notify: 0,
shutdown: false,
shutdown_tx: Some(shutdown_tx),
last_exiting_thread: None,
worker_threads: HashMap::new(),
worker_thread_index: 0,
}),
condvar: Condvar::new(),
thread_name: builder.thread_name.clone(),
stack_size: builder.thread_stack_size,
after_start: builder.after_start.clone(),
before_stop: builder.before_stop.clone(),
thread_cap,
keep_alive,
metrics: Default::default(),
}),
},
shutdown_rx,
}
}
build_threaded_runtimeに戻りMultiThread::newを見ていく。
let (scheduler, handle, launch) = MultiThread::new(
core_threads,
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: seed_generator_1,
},
);
MultiThread::newはほぼworker::createに移譲しているようなのでそちらを見ていく。
fn create(size: usize, park: Parker, driver_handle: driver::Handle, blocking_spawner: blocking::Spawner, seed_generator: RngSeedGenerator, config: Config) -> (Arc<Handle>, Launch)
長いので以下に畳んで記載しておく。
Details
pub(super) fn create(
size: usize,
park: Parker,
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
config: Config,
) -> (Arc<Handle>, Launch) {
let mut cores = Vec::with_capacity(size);
let mut remotes = Vec::with_capacity(size);
let mut worker_metrics = Vec::with_capacity(size);
// Create the local queues
for _ in 0..size {
let (steal, run_queue) = queue::local();
let park = park.clone();
let unpark = park.unpark();
cores.push(Box::new(Core {
tick: 0,
lifo_slot: None,
run_queue,
is_searching: false,
is_shutdown: false,
park: Some(park),
metrics: MetricsBatch::new(),
rand: FastRand::new(config.seed_generator.next_seed()),
}));
remotes.push(Remote { steal, unpark });
worker_metrics.push(WorkerMetrics::new());
}
let handle = Arc::new(Handle {
shared: Shared {
remotes: remotes.into_boxed_slice(),
inject: Inject::new(),
idle: Idle::new(size),
owned: OwnedTasks::new(),
shutdown_cores: Mutex::new(vec![]),
config,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: worker_metrics.into_boxed_slice(),
},
driver: driver_handle,
blocking_spawner,
seed_generator,
});
let mut launch = Launch(vec![]);
for (index, core) in cores.drain(..).enumerate() {
launch.0.push(Arc::new(Worker {
handle: handle.clone(),
index,
core: AtomicCell::new(Some(core)),
}));
}
(handle, launch)
}
まずはworkerの数だけcore,remote,metricsを格納するためのVecを作る。
これからが何かは後でみる。
let mut cores = Vec::with_capacity(size);
let mut remotes = Vec::with_capacity(size);
let mut worker_metrics = Vec::with_capacity(size);
次にloopでそれらを詰めていくようだ。 まずはlocalキューを作っているのでここを見ていく。
for _ in 0..size {
let (steal, run_queue) = queue::local();
// ...
}
以下のような関数になっている。
LOCAL_QUEUE_CAPACITYは256になっていて、bufferを作ったあと、UnsafeCell<MaybuUninit>で埋めている。
pub(crate) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY);
for _ in 0..LOCAL_QUEUE_CAPACITY {
buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
}
let inner = Arc::new(Inner {
head: AtomicUnsignedLong::new(0),
tail: AtomicUnsignedShort::new(0),
buffer: make_fixed_size(buffer.into_boxed_slice()),
});
let local = Local {
inner: inner.clone(),
};
let remote = Steal(inner);
(remote, local)
}
その後Arcで wrap したInnerを作成し、Local,Stealで wrap して返す。Innerは以下のようになっている。
pub(crate) struct Inner<T: 'static> {
/// Concurrently updated by many threads.
///
/// Contains two `UnsignedShort` values. The LSB byte is the "real" head of
/// the queue. The `UnsignedShort` in the MSB is set by a stealer in process
/// of stealing values. It represents the first value being stolen in the
/// batch. The `UnsignedShort` indices are intentionally wider than strictly
/// required for buffer indexing in order to provide ABA mitigation and make
/// it possible to distinguish between full and empty buffers.
///
/// When both `UnsignedShort` values are the same, there is no active
/// stealer.
///
/// Tracking an in-progress stealer prevents a wrapping scenario.
head: AtomicUnsignedLong,
/// Only updated by producer thread but read by many threads.
tail: AtomicUnsignedShort,
/// Elements
buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
}
forに戻ると、あとはParker,Unparkerを作成し、parkやrun_queueからCoreと呼ばれるデータを作成する。
queue::localの返り値は(Steal<Arc<T>>, Local<Arc<T>>)であったがCoreのrun_queueはLocal<Arc<Handle>>なのでTがHandleと確定する。
あとはWorkerMetricsとstealとunparkからRemoteを作成する。
for _ in 0..size {
let (steal, run_queue) = queue::local();
let park = park.clone();
let unpark = park.unpark();
cores.push(Box::new(Core {
tick: 0,
lifo_slot: None,
run_queue,
is_searching: false,
is_shutdown: false,
park: Some(park),
metrics: MetricsBatch::new(),
rand: FastRand::new(config.seed_generator.next_seed()),
}));
remotes.push(Remote { steal, unpark });
worker_metrics.push(WorkerMetrics::new());
}
次に先に作ったremotesやworker_metrics、引数で渡されたdriver_handle,blocking_spawner,seed_generatorなどからHandleを作成する。
InjectやIdleなどが登場しているが一旦見送る。
このHandleがscheduler::Handleで wrap され、さらにstruct Handle { pub(crate) inner: scheduler::Handle }として wrap したものをRuntimeでは保持している。
let handle = Arc::new(Handle {
shared: Shared {
remotes: remotes.into_boxed_slice(),
inject: Inject::new(),
idle: Idle::new(size),
owned: OwnedTasks::new(),
shutdown_cores: Mutex::new(vec![]),
config,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: worker_metrics.into_boxed_slice(),
},
driver: driver_handle,
blocking_spawner,
seed_generator,
});
次にCoreとHandleからLaunch,Workerを作成し、(Arc<Handle>, Launch)を返しcreateは完了する。
let mut launch = Launch(vec![]);
for (index, core) in cores.drain(..).enumerate() {
launch.0.push(Arc::new(Worker {
handle: handle.clone(),
index,
core: AtomicCell::new(Some(core)),
}));
}
(handle, launch)
MultiThread::newは上記のArc<Handle>,LaunchとMultiThreadを返して終了する。MultiThreadの定義は以下のようになっており、block_onとshutdownが生えている。
/// Work-stealing based thread pool for executing futures.
pub(crate) struct MultiThread;
pub(crate) fn block_on<F>(&self, handle: &scheduler::Handle, future: F) -> F::Output
where
F: Future,
{
let mut enter = crate::runtime::context::enter_runtime(handle, true);
enter
.blocking
.block_on(future)
.expect("failed to park thread")
}
pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
match handle {
scheduler::Handle::MultiThread(handle) => handle.shutdown(),
_ => panic!("expected MultiThread scheduler"),
}
}
再度build_threaded_runtimeまで戻り続きを見る。
先も述べたようにHandleを wrap し、enterを実行。
その後、launch.launch()を実行し、blocking_poolやhandleからfrom_partsで作ったRuntimeを返し終了だ。
let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
// Spawn the thread pool workers
let _enter = handle.enter();
launch.launch();
Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
handle.enterを追っていくと以下のようにスレッドローカルにHandleをセットしている。
pub(crate) fn try_set_current(handle: &scheduler::Handle) -> Option<SetCurrentGuard> {
CONTEXT.try_with(|ctx| ctx.set_current(handle)).ok()
}
CONTEXTは以下のようになっており、handleをspawingやdriversにアクセスするため、現在のruntime handleを保持するような仕組みのようだ。
static CONTEXT: Context = {
Context {
#[cfg(feature = "rt")]
thread_id: Cell::new(None),
/// Tracks the current runtime handle to use when spawning,
/// accessing drivers, etc...
#[cfg(feature = "rt")]
handle: RefCell::new(None),
#[cfg(feature = "rt")]
current_task_id: Cell::new(None),
// ....
}
}
fn launch(mut self)
次にlaunch.launch()を見ていく。
launchは以下のようにruntime::spawn_blockingでworkerをblocking処理として立ち上げていっている。
launch内のWorkerがデフォルトではcpu分生成されていたので、ここでcpu数分のworkerが立ち上がるということになる。
ちなみに手元のM2 MacBook Airでは8個立ち上がった。
pub(crate) fn launch(mut self) {
for worker in self.0.drain(..) {
runtime::spawn_blocking(move || run(worker));
}
}
runtime::spawn_blockingは以下のようになっている。
先程保存したCONTEXT内のHandleを取り出し、そこからspawn_blockingを呼んでいる。
pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let rt = Handle::current();
rt.spawn_blocking(func)
}
更に追っていくと、まずspawn_blocking_innerにてfuncがBlockingSheduleで wrap される。task::unownedでUnownedTaskとなり、spawn_taskに渡されるようだ。
pub(crate) fn spawn_blocking_inner<F, R>(
&self,
func: F,
is_mandatory: Mandatory,
name: Option<&str>,
rt: &Handle,
) -> (JoinHandle<R>, Result<(), SpawnError>)
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let fut = BlockingTask::new(func);
let id = task::Id::next();
let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);
let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);
(handle, spawned)
}
BlockingTask<T>
まずはfuncをBlockingTaskで wrap している。確認のために書くが現時点でfuncの実体は|| run(worker)のはず。
そして、こいつはFutureを実装している。coop::stop()を無視すれば、基本的にはtakeしてPoll::Readyで返しているだけだ。
つまりpollされるがPendingが返ることはない。
impl<T, R> Future for BlockingTask<T>
where
T: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
type Output = R;
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<R> {
let me = &mut *self;
let func = me
.func
.take()
.expect("[internal exception] blocking task ran twice.");
// This is a little subtle:
// For convenience, we'd like _every_ call tokio ever makes to Task::poll() to be budgeted
// using coop. However, the way things are currently modeled, even running a blocking task
// currently goes through Task::poll(), and so is subject to budgeting. That isn't really
// what we want; a blocking task may itself want to run tasks (it might be a Worker!), so
// we want it to start without any budgeting.
crate::runtime::coop::stop();
Poll::Ready(func())
}
}
次にtask::unownedを覗いてみる。 コメントにはOwnedTasksに格納されないタスクを使うときのみ使用され、blocking_taskのみがこれを使用するとある。
pub(crate) fn unowned<T, S>(task: T, scheduler: S, id: Id) -> (UnownedTask<S>, JoinHandle<T::Output>)
where
S: Schedule,
T: Send + Future + 'static,
T::Output: Send + 'static,
{
let (task, notified, join) = new_task(task, scheduler, id);
// This transfers the ref-count of task and notified into an UnownedTask.
// This is valid because an UnownedTask holds two ref-counts.
let unowned = UnownedTask {
raw: task.raw,
_p: PhantomData,
};
std::mem::forget(task);
std::mem::forget(notified);
(unowned, join)
}
RawTaskまわりやNotified、またそれらをmem::forgetしてる理由などがいまいちわからなかった。
おそらくRawTaskが持っているHeaderをUnownedTaskと同期間生存させようとしているのかもしれない。
ちなみにUnownedTaskのDropは以下のようになっているので、この処理を紐解いていくと分かるのかもしれない。確かにvtable経由でRawTaskのdealllocを呼んでいる。
このあたりは再度理解を試みたい。
impl<S: 'static> Drop for UnownedTask<S> {
fn drop(&mut self) {
// Decrement the ref count
if self.raw.header().state.ref_dec_twice() {
// Deallocate if this is the final ref count
self.raw.dealloc();
}
}
}
new_taskは以下のようになっておりRawTask,Notified,JoinHandleを作成するだけのようだ。
fn new_task<T, S>(
task: T,
scheduler: S,
id: Id,
) -> (Task<S>, Notified<S>, JoinHandle<T::Output>)
where
S: Schedule,
T: Future + 'static,
T::Output: 'static,
{
let raw = RawTask::new::<T, S>(task, scheduler, id);
let task = Task {
raw,
_p: PhantomData,
};
let notified = Notified(Task {
raw,
_p: PhantomData,
});
let join = JoinHandle::new(raw);
(task, notified, join)
}
RawTask::newでは以下のようにtaskやschedulerを wrap してpointerに変換している。
pub(super) fn new<T, S>(task: T, scheduler: S, id: Id) -> RawTask
where
T: Future,
S: Schedule,
{
let ptr = Box::into_raw(Cell::<_, S>::new(task, scheduler, State::new(), id));
let ptr = unsafe { NonNull::new_unchecked(ptr as *mut Header) };
RawTask { ptr }
}
不明点が多くなってきたが、UnownedTaskが更にTaskで wrap され、spawn_taskに渡されるように見える。
spawn_taskはいくつか省略しているがだいたい以下のような感じだ。
sharedからlockをとって、local_queueにtaskをpush_backする。
idle状態のthreadがなく、かつ、thread数の上限に到達していなければspawn_threadでspawnし成功すればmetricsのthread数を加算する。
(WorkerMetricsについてはスルーしてきたが、idle状態のthread数やthreadの数、local_queueの深さなどをカウントするような責務をもっているように見える。)
その後spawn_threadでspawnする。
fn spawn_task(&self, task: Task, rt: &Handle) -> Result<(), SpawnError> {
let mut shared = self.inner.shared.lock();
// ...
shared.queue.push_back(task);
self.inner.metrics.inc_queue_depth();
if self.inner.metrics.num_idle_threads() == 0 {
// No threads are able to process the task.
if self.inner.metrics.num_threads() == self.inner.thread_cap {
// At max number of threads
} else {
// ...
let id = shared.worker_thread_index;
match self.spawn_thread(shutdown_tx, rt, id) {
Ok(handle) => {
self.inner.metrics.inc_num_threads();
shared.worker_thread_index += 1;
shared.worker_threads.insert(id, handle);
}
Err(e) => {
return Err(/*...*/);
}
}
//
}
} else {
// ....
}
Ok(())
}
spawn_threadは以下のようになっており、ようやくstd::threadが現れる。
ここでは、シンプルにthread::Builderでthreadを作りspawnしている。
その後前述したようにCONTEXTにHandleを登録し、blocking::Spawnerのinner.run()を実行する。
fn spawn_thread(
&self,
shutdown_tx: shutdown::Sender,
rt: &Handle,
id: usize,
) -> std::io::Result<thread::JoinHandle<()>> {
let mut builder = thread::Builder::new().name((self.inner.thread_name)());
if let Some(stack_size) = self.inner.stack_size {
builder = builder.stack_size(stack_size);
}
let rt = rt.clone();
builder.spawn(move || {
// Only the reference should be moved into the closure
let _enter = rt.enter();
rt.inner.blocking_spawner().inner.run(id);
drop(shutdown_tx);
})
}
runは以下。少し長いがポイントになりそうな箇所を絞って読んでいく。
Details
fn run(&self, worker_thread_id: usize) {
if let Some(f) = &self.after_start {
f()
}
let mut shared = self.shared.lock();
let mut join_on_thread = None;
'main: loop {
// BUSY
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);
task.run();
shared = self.shared.lock();
}
// IDLE
self.metrics.inc_num_idle_threads();
while !shared.shutdown {
let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
shared = lock_result.0;
let timeout_result = lock_result.1;
if shared.num_notify != 0 {
// We have received a legitimate wakeup,
// acknowledge it by decrementing the counter
// and transition to the BUSY state.
shared.num_notify -= 1;
break;
}
// Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic.
if !shared.shutdown && timeout_result.timed_out() {
// We'll join the prior timed-out thread's JoinHandle after dropping the lock.
// This isn't done when shutting down, because the thread calling shutdown will
// handle joining everything.
let my_handle = shared.worker_threads.remove(&worker_thread_id);
join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);
break 'main;
}
// Spurious wakeup detected, go back to sleep.
}
if shared.shutdown {
// Drain the queue
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);
task.shutdown_or_run_if_mandatory();
shared = self.shared.lock();
}
// Work was produced, and we "took" it (by decrementing num_notify).
// This means that num_idle was decremented once for our wakeup.
// But, since we are exiting, we need to "undo" that, as we'll stay idle.
self.metrics.inc_num_idle_threads();
// NOTE: Technically we should also do num_notify++ and notify again,
// but since we're shutting down anyway, that won't be necessary.
break;
}
}
// Thread exit
self.metrics.dec_num_threads();
// num_idle should now be tracked exactly, panic
// with a descriptive message if it is not the
// case.
let prev_idle = self.metrics.dec_num_idle_threads();
if prev_idle < self.metrics.num_idle_threads() {
panic!("num_idle_threads underflowed on thread exit")
}
if shared.shutdown && self.metrics.num_threads() == 0 {
self.condvar.notify_one();
}
drop(shared);
if let Some(f) = &self.before_stop {
f()
}
if let Some(handle) = join_on_thread {
let _ = handle.join();
}
}
基本的には以下のloopとなっておりshutdownflag が立つまで、またはtimeoutするまでloopから抜けないようだ。 loop内の処理はBUSYとIDLEで構成されている。
'main: loop {
// BUSY
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);
task.run();
shared = self.shared.lock();
}
// IDLE
self.metrics.inc_num_idle_threads();
while !shared.shutdown {
let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
shared = lock_result.0;
let timeout_result = lock_result.1;
if shared.num_notify != 0 {
// We have received a legitimate wakeup,
// acknowledge it by decrementing the counter
// and transition to the BUSY state.
shared.num_notify -= 1;
break;
}
// Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic.
if !shared.shutdown && timeout_result.timed_out() {
// We'll join the prior timed-out thread's JoinHandle after dropping the lock.
// This isn't done when shutting down, because the thread calling shutdown will
// handle joining everything.
let my_handle = shared.worker_threads.remove(&worker_thread_id);
join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);
break 'main;
}
// Spurious wakeup detected, go back to sleep.
}
// ....
}
BUSY
基本的にはlocal_queueからタスクがなくなるまで実行するようだ。taskの実行前にlockをdropし、task完了後に取り直している。
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);
task.run();
shared = self.shared.lock();
}
task.runの実体は以下のように先程作成したRawTask経由でpollしている。
pub(crate) fn run(self) {
let raw = self.raw;
mem::forget(self);
// Transfer one ref-count to a Task object.
let task = Task::<S> {
raw,
_p: PhantomData,
};
// Use the other ref-count to poll the task.
raw.poll();
// Decrement our extra ref-count
drop(task);
}
RawTaskのpollはvtableを経由しHarnessを介して呼ばれる。
pub(super) fn poll(self) {
let vtable = self.header().vtable;
unsafe { (vtable.poll)(self.ptr) }
}
unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
harness.poll();
}
その後poll_innerを介し、更にpoll_futureのguard.core.poll()にたどり着く。
そしてCoreのpollからようやくBlockingTaskのpollが呼ばれるようだ。
以下がCore内のpoll。
pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> {
let res = {
self.stage.stage.with_mut(|ptr| {
// Safety: The caller ensures mutual exclusion to the field.
let future = match unsafe { &mut *ptr } {
Stage::Running(future) => future,
_ => unreachable!("unexpected stage"),
};
// Safety: The caller ensures the future is pinned.
let future = unsafe { Pin::new_unchecked(future) };
let _guard = TaskIdGuard::enter(self.task_id);
future.poll(&mut cx)
})
};
if res.is_ready() {
self.drop_future_or_output();
}
res
}
ここで現れたCoreは何者かと振り返ってみたが、RawTask::newから呼ばれている以下で作成されていた。
つまりWorkerを作成するときにでてきたCoreとはまた別物のようだ。
pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box<Cell<T, S>> {
let result = Box::new(Cell {
header: Header {
state,
queue_next: UnsafeCell::new(None),
vtable: raw::vtable::<T, S>(),
owner_id: UnsafeCell::new(0),
},
core: Core {
scheduler,
stage: CoreStage {
stage: UnsafeCell::new(Stage::Running(future)),
},
task_id,
},
trailer: Trailer {
waker: UnsafeCell::new(None),
owned: linked_list::Pointers::new(),
},
});
}
先程のHarnessを介してpollしている箇所が出てきていたが、RawTaskとHarnessをつないでいるのはこのvtableのようだ。 raw::vtableとそこで設定されるpollはそれぞれ以下のようになっている。
pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
&Vtable {
poll: poll::<T, S>,
schedule: schedule::<S>,
dealloc: dealloc::<T, S>,
try_read_output: try_read_output::<T, S>,
drop_join_handle_slow: drop_join_handle_slow::<T, S>,
drop_abort_handle: drop_abort_handle::<T, S>,
shutdown: shutdown::<T, S>,
trailer_offset: OffsetHelper::<T, S>::TRAILER_OFFSET,
scheduler_offset: OffsetHelper::<T, S>::SCHEDULER_OFFSET,
id_offset: OffsetHelper::<T, S>::ID_OFFSET,
}
}
unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
harness.poll();
}
次にIDLEのブロックを見ていく。
IDLE
まずはidleなthread数を増加させ、shutdownになるまでloopする。
とはいえ、基本的にはCondvarで待ちつつtimeoutしたらshutdown状態じゃなくともworker_threadを’掃除するように見える。
// IDLE
self.metrics.inc_num_idle_threads();
while !shared.shutdown {
let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
shared = lock_result.0;
let timeout_result = lock_result.1;
if shared.num_notify != 0 {
// We have received a legitimate wakeup,
// acknowledge it by decrementing the counter
// and transition to the BUSY state.
shared.num_notify -= 1;
break;
}
// Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic.
if !shared.shutdown && timeout_result.timed_out() {
// We'll join the prior timed-out thread's JoinHandle after dropping the lock.
// This isn't done when shutting down, because the thread calling shutdown will
// handle joining everything.
let my_handle = shared.worker_threads.remove(&worker_thread_id);
join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);
break 'main;
}
// Spurious wakeup detected, go back to sleep.
}
不明点も増えてきたが、少なくともworkerがBlockingTaskとしてpollされるまでの流れは見えてきた。 なので、launch.launch()まで戻ってrun(worker)を見ていく。
fn run(worker: Arc<Worker>)
runのは以下のようになっている。worker.coreの取得、worker.handleからHandleを作成し、enter_runtimeでCONTEXTにHandleを登録する。
さらにworker::Contextを作成し、それをCURRENT.setでCURRENTというthread_localにセットしつつ、cx.run(core)で実行する。
fn run(worker: Arc<Worker>) {
// ...
let core = match worker.core.take() {
Some(core) => core,
None => return,
};
let handle = scheduler::Handle::MultiThread(worker.handle.clone());
let _enter = crate::runtime::context::enter_runtime(&handle, true);
// Set the worker context.
let cx = Context {
worker,
core: RefCell::new(None),
};
CURRENT.set(&cx, || {
assert!(cx.run(core).is_err());
wake_deferred_tasks();
});
}
CURRENTは以下のような定義になっており、型ごとにscopeを切ってthread_localへの参照を保持しているように見える。
CURRENTには今回、Arc<Worker>,RefCell<Option<Box<Core>>>を格納しており、これをどう使っていくかは読み進めていけば次回以降わかるかもしれない。
// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);
/// Sets a reference as a thread-local.
macro_rules! scoped_thread_local {
($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty) => (
$(#[$attrs])*
$vis static $name: $crate::macros::scoped_tls::ScopedKey<$ty>
= $crate::macros::scoped_tls::ScopedKey {
inner: {
tokio_thread_local!(static FOO: ::std::cell::Cell<*const ()> = const {
std::cell::Cell::new(::std::ptr::null())
});
&FOO
},
_marker: ::std::marker::PhantomData,
};
)
}
fn run(&self, mut core: Box<Core>) -> RunResult
これでようやくworkerのメイン処理まで到達した。
Details
fn run(&self, mut core: Box<Core>) -> RunResult {
while !core.is_shutdown {
// Increment the tick
core.tick();
// Run maintenance, if needed
core = self.maintenance(core);
// First, check work available to the current worker.
if let Some(task) = core.next_task(&self.worker) {
core = self.run_task(task, core)?;
continue;
}
// There is no more **local** work to process, try to steal work
// from other workers.
if let Some(task) = core.steal_work(&self.worker) {
core = self.run_task(task, core)?;
} else {
// Wait for work
core = if did_defer_tasks() {
self.park_timeout(core, Some(Duration::from_millis(0)))
} else {
self.park(core)
};
}
}
core.pre_shutdown(&self.worker);
// Signal shutdown
self.worker.handle.shutdown_core(core);
Err(())
}
// ...
}
重要そうな部分をピックアップして以下に記載しておく。
ざっと見た感じ、shutdownされるまで以下を行っているように見える。
tick
maintenance
local_queueから次のタスクを探して実行
local_queueが無くなったら steal を試みる
stealできるものがなければpark
本当はmaintenanceやsteal_workなどの挙動に興味があるので読み進めたいが、ここまでで結構な分量になっているのと、今回目的としているコードではtaskがworkerで処理されることはなく、立ち上げられたworkerはみんなすぐparkしてしまうので今回はスルーし次回以降の課題としておく。
fn run(&self, mut core: Box<Core>) -> RunResult {
while !core.is_shutdown {
core.tick();
core = self.maintenance(core);
// First, check work available to the current worker.
if let Some(task) = core.next_task(&self.worker) {
core = self.run_task(task, core)?;
continue;
}
// There is no more **local** work to process, try to steal work
// from other workers.
if let Some(task) = core.steal_work(&self.worker) {
core = self.run_task(task, core)?;
} else {
// Wait for work
core = if did_defer_tasks() {
self.park_timeout(core, Some(Duration::from_millis(0)))
} else {
self.park(core)
};
}
}
// ...
}
これでlaunchの流れは分かったのでRuntime::newの大枠の流れもつかめたように思う。workerのunparkなどは次回の課題としつつmainまで戻りblock_onを読んでいく。
rt.block_on(Hello::new())
Runtimeのblock_onはenterし、Schedulerを選択しているだけのようだ。
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
let _enter = self.enter();
match &self.scheduler {
Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
}
}
exec.block_on(&self.handle.inner, future)は前述した以下のMultiThreadのblock_onを呼んでいる。 ここではcontext::enter_runtimeのちblocking.block_onを実行している。
pub(crate) fn block_on<F>(&self, handle: &scheduler::Handle, future: F) -> F::Output
where
F: Future,
{
let mut enter = crate::runtime::context::enter_runtime(handle, true);
enter
.blocking
.block_on(future)
.expect("failed to park thread")
}
fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError>
さらにblock_onを掘っていくとruntime/park.rsのCachedParkThreadのblocK_onにたどり着く。以下だ。
coop::budgetやdefer.wakeなど不明な箇所もあるが、大筋は教科書とおりだ。wakerを取得しContextを作る。FutureをPin止めし、Readyを返すまでpollとparkをloopで回す。ということをやっている。
pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError> {
use std::task::Context;
use std::task::Poll::Ready;
// `get_unpark()` should not return a Result
let waker = self.waker()?;
let mut cx = Context::from_waker(&waker);
pin!(f);
loop {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}
// Wake any yielded tasks before parking in order to avoid blocking.
crate::runtime::context::with_defer(|defer| defer.wake());
self.park();
}
}
まずはself.wakerを見てみる。self.unpark経由でunparkを取得しそこからWakerに変換している。
pub(crate) fn waker(&self) -> Result<Waker, AccessError> {
self.unpark().map(|unpark| unpark.into_waker())
}
unparkはCURRENT_PARKER経由で取得している。
fn unpark(&self) -> Result<UnparkThread, AccessError> {
self.with_current(|park_thread| park_thread.unpark())
}
fn with_current<F, R>(&self, f: F) -> Result<R, AccessError>
where
F: FnOnce(&ParkThread) -> R,
{
CURRENT_PARKER.try_with(|inner| f(inner))
}
CURRENT_PARKERは以下のように作成されており、AtomicUsize,Mutex,CondvarのセットとなっておりAtomicUsizeはEMPTY,PARKEDとNOTIFIEDの 3 つの状態を表しているようだ。
tokio_thread_local! {
static CURRENT_PARKER: ParkThread = ParkThread::new();
}
impl ParkThread {
pub(crate) fn new() -> Self {
Self {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
mutex: Mutex::new(()),
condvar: Condvar::new(),
}),
}
}
}
unparkはこのInnerをUnparkThreadでwrapし、into_wakerでWakerに変換される。
これでself.waker()からWakerをどうやって取得しているのかが分かった。
pub(crate) fn unpark(&self) -> UnparkThread {
let inner = self.inner.clone();
UnparkThread { inner }
}
impl UnparkThread {
pub(crate) fn into_waker(self) -> Waker {
unsafe {
let raw = unparker_to_raw_waker(self.inner);
Waker::from_raw(raw)
}
}
}
unparker_to_raw_wakerでは以下のようにRawWakerを作成している。
RawWaker::newの第一引数はwakeなどが受けとるデータへのポインタなので、wakeなどはInnerへのポインタを受け取ることになる。
unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
RawWaker::new(
Inner::into_raw(unparker),
&RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
)
}
RawWakerVTableが出てきたのでwake_by_refの実装だけ見ておくこととする。
前述したように第一引数にInnerへのポインタが渡されるので、そこからInnerに変換しunparkを呼ぶ。
最後にmem::forgetしているのは、実際にはcloneされたInner(ここではunparker)をReactorが所持しているにも関わらずdropが走り、参照カウントが意図せずデクリメントされるのを防ぐためだと理解している。これはRawWakerのdataが*const ()なのでArc::into_rawで変換する必要があり、この際にArcがforgetされるためだ。(つまりArc::into_rawした場合はleakを許容するか、自分で参照カウントを管理するかのいずれかになるという理解。)
unsafe fn wake_by_ref(raw: *const ()) {
let unparker = Inner::from_raw(raw);
unparker.unpark();
// We don't actually own a reference to the unparker
mem::forget(unparker);
}
これでwakerを作るまでが分かった。
取得したWakerはContextで wrap され、pollにわたす準備ができる。
次にPin止めしているが、基本的にはpin_utilsとやっていることは大差なさそう。
その後loopに入っていく。
crate::runtime::coop::budgetというのはpollの予算のようでpollごとにdecrementしていくような仕組みに見える。が今回はdecrementするようなパスが見当たらなかったので、次回以降の課題としておく。
そしてようやくpollだ。
loop {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}
// Wake any yielded tasks before parking in order to avoid
// blocking.
#[cfg(feature = "rt")]
crate::runtime::context::with_defer(|defer| defer.wake());
self.park();
}
先程用意したContextを渡し実行することで今回のターゲットとしている以下のコードが呼び出される。
これによりHello が表示され、状態遷移する。
また、渡されたContextから取り出したwaker経由でwake_by_refを呼ぶことでRawWakerVTableに設定したwake_by_refが発火し、その後Poll::Pendingが返る。
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
match (*self).state {
StateHello::Hello => {
println!("Hello ");
(*self).state = StateHello::World;
cx.waker().wake_by_ref();
Poll::Pending
}
StateHello::World => {
println!("World!");
Poll::Ready(())
}
}
}
wake_by_refとは、すなわち前述した以下のコードだ。
unsafe fn wake_by_ref(raw: *const ()) {
let unparker = Inner::from_raw(raw);
unparker.unpark();
// We don't actually own a reference to the unparker
mem::forget(unparker);
}
unparker.unpark()を掘ってもいいが、先にparkを見ておいたほうが分かり易そうなので先回りして見ておく。
loop {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}
// Wake any yielded tasks before parking in order to avoid
// blocking.
#[cfg(feature = "rt")]
crate::runtime::context::with_defer(|defer| defer.wake());
self.park();
}
self.parkは以下のようになっており、更に掘っていくとInnerのparkにたどり着く。
pub(crate) fn park(&mut self) {
self.with_current(|park_thread| park_thread.inner.park())
.unwrap();
}
fn park(&self)
コードは以下に畳んで掲載し、小分けにして見ていく。
Details
fn park(&self) {
// If we were previously notified then we consume this notification and
// return quickly.
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
return;
}
// Otherwise we need to coordinate going to sleep
let mut m = self.mutex.lock();
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFIED) => {
// We must read here, even though we know it will be `NOTIFIED`.
// This is because `unpark` may have been called again since we read
// `NOTIFIED` in the `compare_exchange` above. We must perform an
// acquire operation that synchronizes with that `unpark` to observe
// any writes it made before the call to unpark. To do that we must
// read from the write it made to `state`.
let old = self.state.swap(EMPTY, SeqCst);
debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
}
Err(actual) => panic!("inconsistent park state; actual = {}", actual),
}
loop {
m = self.condvar.wait(m).unwrap();
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
// got a notification
return;
}
// spurious wakeup, go back to sleep
}
}
InnerのAtomicUsizeは 3 つの状態を表現していると前述したが、以下のように通知状態を検査している。
ここはコメントにある通り、Innerのparkに到達したがすでに通知済である場合にwaitすることなく再度pollするため、早期 returnしているようだ。
今回ターゲットにしているコードではpollの中で即座にwake_by_refが呼ばれているので恐らくここが真になり早期returnになるものと推測される。
// If we were previously notified then we consume this notification and
// return quickly.
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
return;
}
上記においてreturnしない想定で読みすすめると、次にlockをとり、stateをPARKEDにし、parkの準備にはいる。
ここでコメントにあるようにunparkが再度実行されているケースを考慮し、再度stateがNOTIFIEDではないかチェックし、通知済であればstateをEMPTYにし早期returnする。
このあたりはGuard run_executor of local_pool.rs against use of park / unpark in user-code. #2010と同様の話のように見える。
このチェックを怠るとunparkの呼び出し回数とstateが食い違い二度と起きないケースが発生するように思う。
// Otherwise we need to coordinate going to sleep
let mut m = self.mutex.lock();
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFIED) => {
// We must read here, even though we know it will be `NOTIFIED`.
// This is because `unpark` may have been called again since we read
// `NOTIFIED` in the `compare_exchange` above. We must perform an
// acquire operation that synchronizes with that `unpark` to observe
// any writes it made before the call to unpark. To do that we must
// read from the write it made to `state`.
let old = self.state.swap(EMPTY, SeqCst);
debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
}
Err(actual) => panic!("inconsistent park state; actual = {}", actual),
}
あとは定番のloopでspurious wakeupをケアしつつCondvar.waitするパターンだ。waitを抜けたあとstateをEMPTYへのcompare_exchangeを行いreturnすることにより、再度pollが実行されるはずだ。
loop {
m = self.condvar.wait(m).unwrap();
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
// got a notification
return;
}
// spurious wakeup, go back to sleep
}
parkは以上なのでunpark側に戻るとする。
fn unpark(&self)
unparker.unpark()はすなわちInnerのunparkだ。parkを先に読んだことである程度想像はつくかと思うが、以下のようになっている。
fn unpark(&self) {
// To ensure the unparked thread will observe any writes we made before
// this call, we must perform a release operation that `park` can
// synchronize with. To do that we must write `NOTIFIED` even if `state`
// is already `NOTIFIED`. That is why this must be a swap rather than a
// compare-and-swap that returns if it reads `NOTIFIED` on failure.
match self.state.swap(NOTIFIED, SeqCst) {
EMPTY => return, // no one was waiting
NOTIFIED => return, // already unparked
PARKED => {} // gotta go wake someone up
_ => panic!("inconsistent state in unpark"),
}
// There is a period between when the parked thread sets `state` to
// `PARKED` (or last checked `state` in the case of a spurious wake
// up) and when it actually waits on `cvar`. If we were to notify
// during this period it would be ignored and then when the parked
// thread went to sleep it would never wake up. Fortunately, it has
// `lock` locked at this stage so we can acquire `lock` to wait until
// it is ready to receive the notification.
//
// Releasing `lock` before the call to `notify_one` means that when the
// parked thread wakes it doesn't get woken only to have to wait for us
// to release `lock`.
drop(self.mutex.lock());
self.condvar.notify_one()
}
まずはstateをNOTIFIEDにswapする。
すでにEMPTYやNOTIFIEDの場合は何もせずreturnし、PARKEDの場合だけ次に通知処理に移る。
// To ensure the unparked thread will observe any writes we made before
// this call, we must perform a release operation that `park` can
// synchronize with. To do that we must write `NOTIFIED` even if `state`
// is already `NOTIFIED`. That is why this must be a swap rather than a
// compare-and-swap that returns if it reads `NOTIFIED` on failure.
match self.state.swap(NOTIFIED, SeqCst) {
EMPTY => return, // no one was waiting
NOTIFIED => return, // already unparked
PARKED => {} // gotta go wake someone up
_ => panic!("inconsistent state in unpark"),
}
まずコメントの冒頭には、stateがPARKEDになる隙間で通知が飛んでしまうことを防ぐ旨が書いてある。 これはpark側のlet mut m = self.mutex.lock(); match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst)の部分を指しているように見える。
後半はlockをdropしてから通知を出さないと無駄なlockが解放されるまでの無駄な待ちが入ってしまうから。という旨が書いてあるように見える。確かにこのlockはPARKEDになるまで待つ意図なので通知までにdropしてしまってもいいのか。
lockを解放したらnotify_oneで通知して終了だ。
// There is a period between when the parked thread sets `state` to
// `PARKED` (or last checked `state` in the case of a spurious wake
// up) and when it actually waits on `cvar`. If we were to notify
// during this period it would be ignored and then when the parked
// thread went to sleep it would never wake up. Fortunately, it has
// `lock` locked at this stage so we can acquire `lock` to wait until
// it is ready to receive the notification.
//
// Releasing `lock` before the call to `notify_one` means that when the
// parked thread wakes it doesn't get woken only to have to wait for us
// to release `lock`.
drop(self.mutex.lock());
self.condvar.notify_one()
これでpark,unparkは読めた。
もう一度block_onまで戻って眺めてみると全体像が分かる。budgetやcontext::with_deferは今後見ていくとして今回ターゲットとしているコードの大枠は以下の流れだ。
wakerを取得する
Contextでwakerを wrap する
FutureをPin止めする
- 3 の
as_mutをとってpollする。
- 3 の
Helloが表示される
wake_by_refすなわちunparkが呼ばれる
parkするがすでにNOTIFIEDなので再度pollする
World!が表示され、Readyが返る
- 終了
pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError> {
use std::task::Context;
use std::task::Poll::Ready;
// `get_unpark()` should not return a Result
let waker = self.waker()?;
let mut cx = Context::from_waker(&waker);
pin!(f);
loop {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}
// Wake any yielded tasks before parking in order to avoid
// blocking.
#[cfg(feature = "rt")]
crate::runtime::context::with_defer(|defer| defer.wake());
self.park();
}
}
おそらくこのあと、shutdown状態となり各threadの後片付けが行われていくのだろうが、今回は skip する。
まとめ
ひとまず今回の goal はある程度達成できたが、不明点も多くでてきたので継続して見ていきたい。
特に今回はworkerが仕事をしていなかったり、mioまでたどり着いたりしていなかったり、肝心のsteal周りをスキップしたりしたので今後、理解を深めたい。次のステップとしてはsmolを読んでみたり、spawn_blockingを読んでみたりsleepを読んでみたりを試そうかと思っている。
以上。