Skip to content

Commit d22d310

Browse files
hymmMichael Hsucart
committed
Nested spawns on scope (#4466)
# Objective - Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor. - Fixes #4301 ## Solution - Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools. - Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope. - Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it. - removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower. --- ## Changelog Add ability to nest spawns ```rust fn main() { let pool = TaskPool::new(); pool.scope(|scope| { scope.spawn(async move { // calling scope.spawn from an spawn task was not possible before scope.spawn(async move { // do something }); }); }) } ``` ## Migration Guide If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now. ```rust fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {} // should become fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {} ``` `scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures. ## TODO * [x] think real hard about all the lifetimes * [x] add doc about what 'env and 'scope mean. * [x] manually check that the single threaded task pool still works * [x] Get updated perf numbers * [x] check and make sure all the transmutes are necessary * [x] move commented out test into a compile fail test * [x] look through the tests for scope on std and see if I should add any more tests Co-authored-by: Michael Hsu <[email protected]> Co-authored-by: Carter Anderson <[email protected]>
1 parent 92c90a9 commit d22d310

File tree

4 files changed

+265
-91
lines changed

4 files changed

+265
-91
lines changed

crates/bevy_ecs/src/schedule/executor_parallel.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ impl ParallelExecutor {
166166
/// queues systems with no dependencies to run (or skip) at next opportunity.
167167
fn prepare_systems<'scope>(
168168
&mut self,
169-
scope: &mut Scope<'scope, ()>,
169+
scope: &Scope<'_, 'scope, ()>,
170170
systems: &'scope mut [SystemContainer],
171171
world: &'scope World,
172172
) {
@@ -236,7 +236,7 @@ impl ParallelExecutor {
236236
if system_data.is_send {
237237
scope.spawn(task);
238238
} else {
239-
scope.spawn_local(task);
239+
scope.spawn_on_scope(task);
240240
}
241241

242242
#[cfg(test)]
@@ -271,7 +271,7 @@ impl ParallelExecutor {
271271
if system_data.is_send {
272272
scope.spawn(task);
273273
} else {
274-
scope.spawn_local(task);
274+
scope.spawn_on_scope(task);
275275
}
276276
}
277277
}

crates/bevy_tasks/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ futures-lite = "1.4.0"
1313
async-executor = "1.3.0"
1414
async-channel = "1.4.2"
1515
once_cell = "1.7"
16+
concurrent-queue = "1.2.2"
1617

1718
[target.'cfg(target_arch = "wasm32")'.dependencies]
1819
wasm-bindgen-futures = "0.4"

crates/bevy_tasks/src/single_threaded_task_pool.rs

+30-18
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{
22
future::Future,
3+
marker::PhantomData,
34
mem,
45
sync::{Arc, Mutex},
56
};
@@ -61,27 +62,34 @@ impl TaskPool {
6162
/// to spawn tasks. This function will await the completion of all tasks before returning.
6263
///
6364
/// This is similar to `rayon::scope` and `crossbeam::scope`
64-
pub fn scope<'scope, F, T>(&self, f: F) -> Vec<T>
65+
pub fn scope<'env, F, T>(&self, f: F) -> Vec<T>
6566
where
66-
F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send,
67+
F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>),
6768
T: Send + 'static,
6869
{
6970
let executor = &async_executor::LocalExecutor::new();
70-
let executor: &'scope async_executor::LocalExecutor<'scope> =
71+
let executor: &'env async_executor::LocalExecutor<'env> =
7172
unsafe { mem::transmute(executor) };
7273

74+
let results: Mutex<Vec<Arc<Mutex<Option<T>>>>> = Mutex::new(Vec::new());
75+
let results: &'env Mutex<Vec<Arc<Mutex<Option<T>>>>> = unsafe { mem::transmute(&results) };
76+
7377
let mut scope = Scope {
7478
executor,
75-
results: Vec::new(),
79+
results,
80+
scope: PhantomData,
81+
env: PhantomData,
7682
};
7783

78-
f(&mut scope);
84+
let scope_ref: &'env mut Scope<'_, 'env, T> = unsafe { mem::transmute(&mut scope) };
85+
86+
f(scope_ref);
7987

8088
// Loop until all tasks are done
8189
while executor.try_tick() {}
8290

83-
scope
84-
.results
91+
let results = scope.results.lock().unwrap();
92+
results
8593
.iter()
8694
.map(|result| result.lock().unwrap().take().unwrap())
8795
.collect()
@@ -127,32 +135,36 @@ impl FakeTask {
127135
///
128136
/// For more information, see [`TaskPool::scope`].
129137
#[derive(Debug)]
130-
pub struct Scope<'scope, T> {
131-
executor: &'scope async_executor::LocalExecutor<'scope>,
138+
pub struct Scope<'scope, 'env: 'scope, T> {
139+
executor: &'env async_executor::LocalExecutor<'env>,
132140
// Vector to gather results of all futures spawned during scope run
133-
results: Vec<Arc<Mutex<Option<T>>>>,
141+
results: &'env Mutex<Vec<Arc<Mutex<Option<T>>>>>,
142+
143+
// make `Scope` invariant over 'scope and 'env
144+
scope: PhantomData<&'scope mut &'scope ()>,
145+
env: PhantomData<&'env mut &'env ()>,
134146
}
135147

136-
impl<'scope, T: Send + 'scope> Scope<'scope, T> {
148+
impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
137149
/// Spawns a scoped future onto the thread-local executor. The scope *must* outlive
138150
/// the provided future. The results of the future will be returned as a part of
139151
/// [`TaskPool::scope`]'s return value.
140152
///
141153
/// On the single threaded task pool, it just calls [`Scope::spawn_local`].
142154
///
143155
/// For more information, see [`TaskPool::scope`].
144-
pub fn spawn<Fut: Future<Output = T> + 'scope + Send>(&mut self, f: Fut) {
145-
self.spawn_local(f);
156+
pub fn spawn<Fut: Future<Output = T> + 'env>(&self, f: Fut) {
157+
self.spawn_on_scope(f);
146158
}
147159

148-
/// Spawns a scoped future onto the thread-local executor. The scope *must* outlive
149-
/// the provided future. The results of the future will be returned as a part of
150-
/// [`TaskPool::scope`]'s return value.
160+
/// Spawns a scoped future that runs on the thread the scope called from. The
161+
/// scope *must* outlive the provided future. The results of the future will be
162+
/// returned as a part of [`TaskPool::scope`]'s return value.
151163
///
152164
/// For more information, see [`TaskPool::scope`].
153-
pub fn spawn_local<Fut: Future<Output = T> + 'scope>(&mut self, f: Fut) {
165+
pub fn spawn_on_scope<Fut: Future<Output = T> + 'env>(&self, f: Fut) {
154166
let result = Arc::new(Mutex::new(None));
155-
self.results.push(result.clone());
167+
self.results.lock().unwrap().push(result.clone());
156168
let f = async move {
157169
result.lock().unwrap().replace(f.await);
158170
};

0 commit comments

Comments
 (0)