diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index eb5f80f56c8..10e8ca77cb1 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -30,6 +30,7 @@ time = ["tokio/time","slab"] io = [] io-util = ["io", "tokio/rt", "tokio/io-util"] rt = ["tokio/rt", "tokio/sync", "futures-util", "hashbrown"] +tracing = ["dep:tracing", "tokio/tracing"] __docs_rs = ["futures-util"] diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index 13e27bb670b..46960ffc885 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -125,6 +125,20 @@ pub struct JoinMap<K, V, S = RandomState> { tasks: JoinSet<V>, } +/// A variant of [`task::Builder`] that spawns tasks on a [`JoinMap`] rather than on the current +/// default runtime. +/// +/// [`task::Builder`]: tokio::task::Builder +#[cfg(feature = "tracing")] +#[cfg_attr( + docsrs, + doc(cfg(all(feature = "rt", feature = "tracing", tokio_unstable))) +)] +pub struct Builder<'a, K, V, S> { + joinmap: &'a mut JoinMap<K, V, S>, + name: Option<&'a str>, +} + /// A [`JoinMap`] key. /// /// This holds both a `K`-typed key (the actual key as seen by the user), _and_ @@ -273,6 +287,38 @@ where V: 'static, S: BuildHasher, { + /// Returns a [`Builder`] that can be used to configure a task prior to spawning it on this + /// [`JoinMap`]. + /// + /// # Examples + /// + /// ``` + /// use tokio_util::task::JoinMap; + /// + /// #[tokio::main] + /// async fn main() -> std::io::Result<()> { + /// let mut map = JoinMap::new(); + /// + /// // Use the builder to configure the task's name before spawning it. + /// map.build_task() + /// .name("my_task") + /// .spawn(42, async { /* ... */ }); + /// + /// Ok(()) + /// } + /// ``` + #[cfg(feature = "tracing")] + #[cfg_attr( + docsrs, + doc(cfg(all(feature = "rt", feature = "tracing", tokio_unstable))) + )] + pub fn build_task(&mut self) -> Builder<'_, K, V, S> { + Builder { + joinmap: self, + name: None, + } + } + /// Spawn the provided task and store it in this `JoinMap` with the provided /// key. /// @@ -853,6 +899,230 @@ impl<K, V> Default for JoinMap<K, V> { } } +// === impl Builder === + +#[cfg(feature = "tracing")] +#[cfg_attr( + docsrs, + doc(cfg(all(feature = "rt", feature = "tracing", tokio_unstable))) +)] +impl<'a, K, V, S> Builder<'a, K, V, S> +where + K: Hash + Eq, + V: 'static, + S: BuildHasher, +{ + /// Assigns a name to the task which will be spawned. + pub fn name(mut self, name: &'a str) -> Self { + self.name = Some(name); + self + } + + /// Spawn the provided task with this builder's settings and store it in this `JoinMap` with + /// the provided key. + /// + /// If a task previously existed in the `JoinMap` for this key, that task + /// will be cancelled and replaced with the new one. The previous task will + /// be removed from the `JoinMap`; a subsequent call to [`join_next`] will + /// *not* return a cancelled [`JoinError`] for that task. + /// + /// # Panics + /// + /// This method panics if called outside of a Tokio runtime. + /// + /// [`join_next`]: JoinMap::join_next + #[track_caller] + pub fn spawn<F>(self, key: K, task: F) -> std::io::Result<()> + where + F: Future<Output = V>, + F: Send + 'static, + V: Send, + { + let builder = self.joinmap.tasks.build_task(); + let builder = if let Some(name) = self.name { + builder.name(name) + } else { + builder + }; + let abort = builder.spawn(task)?; + + Ok(self.joinmap.insert(key, abort)) + } + + /// Spawn the provided task on the provided runtime and store it in this + /// `JoinMap` with the provided key. + /// + /// If a task previously existed in the `JoinMap` for this key, that task + /// will be cancelled and replaced with the new one. The previous task will + /// be removed from the `JoinMap`; a subsequent call to [`join_next`] will + /// *not* return a cancelled [`JoinError`] for that task. + /// + /// [`join_next`]: JoinMap::join_next + #[track_caller] + pub fn spawn_on<F>(&mut self, key: K, task: F, handle: &Handle) -> std::io::Result<()> + where + F: Future<Output = V>, + F: Send + 'static, + V: Send, + { + let builder = self.joinmap.tasks.build_task(); + let builder = if let Some(name) = self.name { + builder.name(name) + } else { + builder + }; + let abort = builder.spawn_on(task, handle)?; + + Ok(self.joinmap.insert(key, abort)) + } + + /// Spawn the blocking code on the blocking threadpool and store it in this `JoinMap` with the provided + /// key. + /// + /// If a task previously existed in the `JoinMap` for this key, that task + /// will be cancelled and replaced with the new one. The previous task will + /// be removed from the `JoinMap`; a subsequent call to [`join_next`] will + /// *not* return a cancelled [`JoinError`] for that task. + /// + /// Note that blocking tasks cannot be cancelled after execution starts. + /// Replaced blocking tasks will still run to completion if the task has begun + /// to execute when it is replaced. A blocking task which is replaced before + /// it has been scheduled on a blocking worker thread will be cancelled. + /// + /// # Panics + /// + /// This method panics if called outside of a Tokio runtime. + /// + /// [`join_next`]: JoinMap::join_next + #[track_caller] + pub fn spawn_blocking<F>(&mut self, key: K, f: F) -> std::io::Result<()> + where + F: FnOnce() -> V, + F: Send + 'static, + V: Send, + { + let builder = self.joinmap.tasks.build_task(); + let builder = if let Some(name) = self.name { + builder.name(name) + } else { + builder + }; + let abort = builder.spawn_blocking(f)?; + + Ok(self.joinmap.insert(key, abort)) + } + + /// Spawn the blocking code on the blocking threadpool of the provided runtime and store it in this + /// `JoinMap` with the provided key. + /// + /// If a task previously existed in the `JoinMap` for this key, that task + /// will be cancelled and replaced with the new one. The previous task will + /// be removed from the `JoinMap`; a subsequent call to [`join_next`] will + /// *not* return a cancelled [`JoinError`] for that task. + /// + /// Note that blocking tasks cannot be cancelled after execution starts. + /// Replaced blocking tasks will still run to completion if the task has begun + /// to execute when it is replaced. A blocking task which is replaced before + /// it has been scheduled on a blocking worker thread will be cancelled. + /// + /// [`join_next`]: JoinMap::join_next + #[track_caller] + pub fn spawn_blocking_on<F>(&mut self, key: K, f: F, handle: &Handle) -> std::io::Result<()> + where + F: FnOnce() -> V, + F: Send + 'static, + V: Send, + { + let builder = self.joinmap.tasks.build_task(); + let builder = if let Some(name) = self.name { + builder.name(name) + } else { + builder + }; + let abort = builder.spawn_blocking_on(f, handle)?; + + Ok(self.joinmap.insert(key, abort)) + } + + /// Spawn the provided task on the current [`LocalSet`] and store it in this + /// `JoinMap` with the provided key. + /// + /// If a task previously existed in the `JoinMap` for this key, that task + /// will be cancelled and replaced with the new one. The previous task will + /// be removed from the `JoinMap`; a subsequent call to [`join_next`] will + /// *not* return a cancelled [`JoinError`] for that task. + /// + /// # Panics + /// + /// This method panics if it is called outside of a `LocalSet`. + /// + /// [`LocalSet`]: tokio::task::LocalSet + /// [`join_next`]: JoinMap::join_next + #[track_caller] + pub fn spawn_local<F>(&mut self, key: K, task: F) -> std::io::Result<()> + where + F: Future<Output = V>, + F: 'static, + { + let builder = self.joinmap.tasks.build_task(); + let builder = if let Some(name) = self.name { + builder.name(name) + } else { + builder + }; + let abort = builder.spawn_local(task)?; + + Ok(self.joinmap.insert(key, abort)) + } + + /// Spawn the provided task on the provided [`LocalSet`] and store it in + /// this `JoinMap` with the provided key. + /// + /// If a task previously existed in the `JoinMap` for this key, that task + /// will be cancelled and replaced with the new one. The previous task will + /// be removed from the `JoinMap`; a subsequent call to [`join_next`] will + /// *not* return a cancelled [`JoinError`] for that task. + /// + /// [`LocalSet`]: tokio::task::LocalSet + /// [`join_next`]: JoinMap::join_next + #[track_caller] + pub fn spawn_local_on<F>( + &mut self, + key: K, + task: F, + local_set: &LocalSet, + ) -> std::io::Result<()> + where + F: Future<Output = V>, + F: 'static, + { + let builder = self.joinmap.tasks.build_task(); + let builder = if let Some(name) = self.name { + builder.name(name) + } else { + builder + }; + let abort = builder.spawn_local_on(task, local_set)?; + + Ok(self.joinmap.insert(key, abort)) + } +} + +// Manual `Debug` impl so that `Builder` is `Debug` regardless of whether `V` and `S` are `Debug`. +#[cfg(feature = "tracing")] +#[cfg_attr( + docsrs, + doc(cfg(all(feature = "rt", feature = "tracing", tokio_unstable))) +)] +impl<'a, K: fmt::Debug, V, S> fmt::Debug for Builder<'a, K, V, S> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("join_map::Builder") + .field("joinmap", &self.joinmap) + .field("name", &self.name) + .finish() + } +} + // === impl Key === impl<K: Hash> Hash for Key<K> { diff --git a/tokio-util/src/task/mod.rs b/tokio-util/src/task/mod.rs index 6d0c379fe20..829b761b563 100644 --- a/tokio-util/src/task/mod.rs +++ b/tokio-util/src/task/mod.rs @@ -9,6 +9,13 @@ pub use spawn_pinned::LocalPoolHandle; #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "rt"))))] pub use join_map::{JoinMap, JoinMapKeys}; +#[cfg(all(tokio_unstable, feature = "tracing"))] +#[cfg_attr( + docsrs, + doc(cfg(all(tokio_unstable, feature = "rt", feature = "tracing"))) +)] +pub use join_map::Builder as JoinMapBuilder; + pub mod task_tracker; pub use task_tracker::TaskTracker;