Skip to content

Commit 03ec7e3

Browse files
committed
use ad-hoc runtime in tokio on last resort
It closes the gap between tokio and async-std, bringing similar functionality to tokio.
1 parent 4bb4069 commit 03ec7e3

File tree

1 file changed

+64
-8
lines changed

1 file changed

+64
-8
lines changed

src/spawner.rs

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,20 +53,69 @@ pub mod use_async_std {
5353
#[cfg(feature = "use-tokio")]
5454
pub mod use_tokio {
5555
use super::*;
56-
use tokio::{runtime::Handle, task as tokio_task};
56+
use tokio::{
57+
runtime::{Handle, Runtime},
58+
task::{self as tokio_task, block_in_place},
59+
};
60+
61+
pub struct TokioSpawner(Option<TokioRuntime>);
62+
63+
impl Clone for TokioSpawner {
64+
fn clone(&self) -> Self {
65+
Self(self.0.as_ref().map(|rt| match rt {
66+
TokioRuntime::ByHandle(handle) => TokioRuntime::ByHandle(handle.clone()),
67+
TokioRuntime::Owned(runtime) => TokioRuntime::ByHandle(runtime.handle().clone()),
68+
}))
69+
}
70+
}
71+
72+
const RUNTIME_INVARIANT_ERR: &str =
73+
"invariant: runtime must be available during the spawner's lifetime";
5774

58-
pub struct TokioSpawner(Handle);
75+
impl Drop for TokioSpawner {
76+
/// Graceful shutdown owned runtime.
77+
fn drop(&mut self) {
78+
if let TokioRuntime::Owned(rt) = self.0.take().expect(RUNTIME_INVARIANT_ERR) {
79+
rt.shutdown_background()
80+
}
81+
}
82+
}
5983

6084
impl TokioSpawner {
6185
pub fn new(rt_handle: Handle) -> Self {
62-
Self(rt_handle)
86+
Self(Some(TokioRuntime::ByHandle(rt_handle)))
6387
}
88+
89+
fn handle(&self) -> &Handle {
90+
match &self.0.as_ref().expect(RUNTIME_INVARIANT_ERR) {
91+
TokioRuntime::ByHandle(handle) => handle,
92+
TokioRuntime::Owned(runtime) => runtime.handle(),
93+
}
94+
}
95+
}
96+
97+
/// Variants of supplied tokio runtime.
98+
/// Is needed because runtime can be either passed or created.
99+
enum TokioRuntime {
100+
/// User provides its own runtime, we'll refer to it by handle.
101+
ByHandle(Handle),
102+
/// We've created our own ad-hoc runtime, so we'll own it.
103+
Owned(Runtime),
64104
}
65105

66-
// By default, `TokioSpawner` operates on global runtime.
106+
// By default, `TokioSpawner` operates on globally available runtime.
107+
// Ad-hoc runtime would only be created if it is not available globally.
108+
// Newly created runtime would be destroyed when spawner is gone.
109+
// That is needed because running runtime would prevent program from stopping.
67110
impl Default for TokioSpawner {
68111
fn default() -> Self {
69-
Self(Handle::current())
112+
if let Ok(handle) = Handle::try_current() {
113+
return Self(Some(TokioRuntime::ByHandle(handle)));
114+
}
115+
let runtime = tokio::runtime::Builder::new_current_thread()
116+
.build()
117+
.unwrap();
118+
Self(Some(TokioRuntime::Owned(runtime)))
70119
}
71120
}
72121

@@ -75,7 +124,7 @@ pub mod use_tokio {
75124
type SpawnHandle = tokio_task::JoinHandle<T>;
76125

77126
fn spawn<F: Future<Output = T> + Send + 'static>(&self, f: F) -> Self::SpawnHandle {
78-
self.0.spawn(f)
127+
self.handle().spawn(f)
79128
}
80129
}
81130

@@ -84,13 +133,20 @@ pub mod use_tokio {
84133
type SpawnHandle = tokio_task::JoinHandle<T>;
85134

86135
fn spawn_func<F: FnOnce() -> T + Send + 'static>(&self, f: F) -> Self::SpawnHandle {
87-
self.0.spawn_blocking(f)
136+
self.handle().spawn_blocking(f)
88137
}
89138
}
90139

91140
unsafe impl Blocker for TokioSpawner {
92141
fn block_on<T, F: Future<Output = T>>(&self, f: F) -> T {
93-
tokio_task::block_in_place(|| self.0.block_on(f))
142+
let result = block_in_place(|| match self.0.as_ref().expect(RUNTIME_INVARIANT_ERR) {
143+
TokioRuntime::ByHandle(handle) => handle.block_on(f),
144+
// if runtime is owned, `block_on` must be called directly on it,
145+
// not via it's handle. Otherwise, future won't be able to run IO-tasks.
146+
// see `block_on` docs for more info.
147+
TokioRuntime::Owned(runtime) => runtime.block_on(f),
148+
});
149+
result
94150
}
95151
}
96152
}

0 commit comments

Comments
 (0)