Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: add task builder API #7180

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ time = ["tokio/time","slab"]
io = []
io-util = ["io", "tokio/rt", "tokio/io-util"]
rt = ["tokio/rt", "tokio/sync", "futures-util", "hashbrown"]
tracing = ["dep:tracing", "tokio/tracing"]

__docs_rs = ["futures-util"]

Expand Down
270 changes: 270 additions & 0 deletions tokio-util/src/task/join_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,20 @@ pub struct JoinMap<K, V, S = RandomState> {
tasks: JoinSet<V>,
}

/// A variant of [`task::Builder`] that spawns tasks on a [`JoinMap`] rather than on the current
/// default runtime.
///
/// [`task::Builder`]: tokio::task::Builder
#[cfg(feature = "tracing")]
#[cfg_attr(
docsrs,
doc(cfg(all(feature = "rt", feature = "tracing", tokio_unstable)))
)]
pub struct Builder<'a, K, V, S> {
joinmap: &'a mut JoinMap<K, V, S>,
name: Option<&'a str>,
}

/// A [`JoinMap`] key.
///
/// This holds both a `K`-typed key (the actual key as seen by the user), _and_
Expand Down Expand Up @@ -273,6 +287,38 @@ where
V: 'static,
S: BuildHasher,
{
/// Returns a [`Builder`] that can be used to configure a task prior to spawning it on this
/// [`JoinMap`].
///
/// # Examples
///
/// ```
/// use tokio_util::task::JoinMap;
///
/// #[tokio::main]
/// async fn main() -> std::io::Result<()> {
/// let mut map = JoinMap::new();
///
/// // Use the builder to configure the task's name before spawning it.
/// map.build_task()
/// .name("my_task")
/// .spawn(42, async { /* ... */ });
///
/// Ok(())
/// }
/// ```
#[cfg(feature = "tracing")]
#[cfg_attr(
docsrs,
doc(cfg(all(feature = "rt", feature = "tracing", tokio_unstable)))
)]
pub fn build_task(&mut self) -> Builder<'_, K, V, S> {
Builder {
joinmap: self,
name: None,
}
}

/// Spawn the provided task and store it in this `JoinMap` with the provided
/// key.
///
Expand Down Expand Up @@ -853,6 +899,230 @@ impl<K, V> Default for JoinMap<K, V> {
}
}

// === impl Builder ===

#[cfg(feature = "tracing")]
#[cfg_attr(
docsrs,
doc(cfg(all(feature = "rt", feature = "tracing", tokio_unstable)))
)]
impl<'a, K, V, S> Builder<'a, K, V, S>
where
K: Hash + Eq,
V: 'static,
S: BuildHasher,
{
/// Assigns a name to the task which will be spawned.
pub fn name(mut self, name: &'a str) -> Self {
self.name = Some(name);
self
}

/// Spawn the provided task with this builder's settings and store it in this `JoinMap` with
/// the provided key.
///
/// If a task previously existed in the `JoinMap` for this key, that task
/// will be cancelled and replaced with the new one. The previous task will
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
/// *not* return a cancelled [`JoinError`] for that task.
///
/// # Panics
///
/// This method panics if called outside of a Tokio runtime.
///
/// [`join_next`]: JoinMap::join_next
#[track_caller]
pub fn spawn<F>(self, key: K, task: F) -> std::io::Result<()>
where
F: Future<Output = V>,
F: Send + 'static,
V: Send,
{
let builder = self.joinmap.tasks.build_task();
let builder = if let Some(name) = self.name {
builder.name(name)
} else {
builder
};
let abort = builder.spawn(task)?;

Ok(self.joinmap.insert(key, abort))
}

/// Spawn the provided task on the provided runtime and store it in this
/// `JoinMap` with the provided key.
///
/// If a task previously existed in the `JoinMap` for this key, that task
/// will be cancelled and replaced with the new one. The previous task will
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
/// *not* return a cancelled [`JoinError`] for that task.
///
/// [`join_next`]: JoinMap::join_next
#[track_caller]
pub fn spawn_on<F>(&mut self, key: K, task: F, handle: &Handle) -> std::io::Result<()>
where
F: Future<Output = V>,
F: Send + 'static,
V: Send,
{
let builder = self.joinmap.tasks.build_task();
let builder = if let Some(name) = self.name {
builder.name(name)
} else {
builder
};
let abort = builder.spawn_on(task, handle)?;

Ok(self.joinmap.insert(key, abort))
}

/// Spawn the blocking code on the blocking threadpool and store it in this `JoinMap` with the provided
/// key.
///
/// If a task previously existed in the `JoinMap` for this key, that task
/// will be cancelled and replaced with the new one. The previous task will
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
/// *not* return a cancelled [`JoinError`] for that task.
///
/// Note that blocking tasks cannot be cancelled after execution starts.
/// Replaced blocking tasks will still run to completion if the task has begun
/// to execute when it is replaced. A blocking task which is replaced before
/// it has been scheduled on a blocking worker thread will be cancelled.
///
/// # Panics
///
/// This method panics if called outside of a Tokio runtime.
///
/// [`join_next`]: JoinMap::join_next
#[track_caller]
pub fn spawn_blocking<F>(&mut self, key: K, f: F) -> std::io::Result<()>
where
F: FnOnce() -> V,
F: Send + 'static,
V: Send,
{
let builder = self.joinmap.tasks.build_task();
let builder = if let Some(name) = self.name {
builder.name(name)
} else {
builder
};
let abort = builder.spawn_blocking(f)?;

Ok(self.joinmap.insert(key, abort))
}

/// Spawn the blocking code on the blocking threadpool of the provided runtime and store it in this
/// `JoinMap` with the provided key.
///
/// If a task previously existed in the `JoinMap` for this key, that task
/// will be cancelled and replaced with the new one. The previous task will
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
/// *not* return a cancelled [`JoinError`] for that task.
///
/// Note that blocking tasks cannot be cancelled after execution starts.
/// Replaced blocking tasks will still run to completion if the task has begun
/// to execute when it is replaced. A blocking task which is replaced before
/// it has been scheduled on a blocking worker thread will be cancelled.
///
/// [`join_next`]: JoinMap::join_next
#[track_caller]
pub fn spawn_blocking_on<F>(&mut self, key: K, f: F, handle: &Handle) -> std::io::Result<()>
where
F: FnOnce() -> V,
F: Send + 'static,
V: Send,
{
let builder = self.joinmap.tasks.build_task();
let builder = if let Some(name) = self.name {
builder.name(name)
} else {
builder
};
let abort = builder.spawn_blocking_on(f, handle)?;

Ok(self.joinmap.insert(key, abort))
}

/// Spawn the provided task on the current [`LocalSet`] and store it in this
/// `JoinMap` with the provided key.
///
/// If a task previously existed in the `JoinMap` for this key, that task
/// will be cancelled and replaced with the new one. The previous task will
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
/// *not* return a cancelled [`JoinError`] for that task.
///
/// # Panics
///
/// This method panics if it is called outside of a `LocalSet`.
///
/// [`LocalSet`]: tokio::task::LocalSet
/// [`join_next`]: JoinMap::join_next
#[track_caller]
pub fn spawn_local<F>(&mut self, key: K, task: F) -> std::io::Result<()>
where
F: Future<Output = V>,
F: 'static,
{
let builder = self.joinmap.tasks.build_task();
let builder = if let Some(name) = self.name {
builder.name(name)
} else {
builder
};
let abort = builder.spawn_local(task)?;

Ok(self.joinmap.insert(key, abort))
}

/// Spawn the provided task on the provided [`LocalSet`] and store it in
/// this `JoinMap` with the provided key.
///
/// If a task previously existed in the `JoinMap` for this key, that task
/// will be cancelled and replaced with the new one. The previous task will
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
/// *not* return a cancelled [`JoinError`] for that task.
///
/// [`LocalSet`]: tokio::task::LocalSet
/// [`join_next`]: JoinMap::join_next
#[track_caller]
pub fn spawn_local_on<F>(
&mut self,
key: K,
task: F,
local_set: &LocalSet,
) -> std::io::Result<()>
where
F: Future<Output = V>,
F: 'static,
{
let builder = self.joinmap.tasks.build_task();
let builder = if let Some(name) = self.name {
builder.name(name)
} else {
builder
};
let abort = builder.spawn_local_on(task, local_set)?;

Ok(self.joinmap.insert(key, abort))
}
}

// Manual `Debug` impl so that `Builder` is `Debug` regardless of whether `V` and `S` are `Debug`.
#[cfg(feature = "tracing")]
#[cfg_attr(
docsrs,
doc(cfg(all(feature = "rt", feature = "tracing", tokio_unstable)))
)]
impl<'a, K: fmt::Debug, V, S> fmt::Debug for Builder<'a, K, V, S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("join_map::Builder")
.field("joinmap", &self.joinmap)
.field("name", &self.name)
.finish()
}
}

// === impl Key ===

impl<K: Hash> Hash for Key<K> {
Expand Down
7 changes: 7 additions & 0 deletions tokio-util/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ pub use spawn_pinned::LocalPoolHandle;
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "rt"))))]
pub use join_map::{JoinMap, JoinMapKeys};

#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(
docsrs,
doc(cfg(all(tokio_unstable, feature = "rt", feature = "tracing")))
)]
pub use join_map::Builder as JoinMapBuilder;

pub mod task_tracker;
pub use task_tracker::TaskTracker;

Expand Down
Loading