From c1183ed035f0c722bcb9b866d310e91d23e48af6 Mon Sep 17 00:00:00 2001 From: MiniaczQ Date: Sun, 27 Mar 2022 01:24:13 +0100 Subject: [PATCH 1/8] put a mutex on `spawned` vec (non-wasm) --- crates/bevy_tasks/src/task_pool.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 597ebc334c872..c19418fe109f1 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -2,7 +2,7 @@ use std::{ future::Future, mem, pin::Pin, - sync::Arc, + sync::{Arc, Mutex}, thread::{self, JoinHandle}, }; @@ -179,19 +179,20 @@ impl TaskPool { let mut scope = Scope { executor, local_executor, - spawned: Vec::new(), + spawned: Mutex::new(Vec::new()), }; f(&mut scope); - if scope.spawned.is_empty() { + let mut spawned = scope.spawned.lock().unwrap(); + if spawned.is_empty() { Vec::default() - } else if scope.spawned.len() == 1 { - vec![future::block_on(&mut scope.spawned[0])] + } else if spawned.len() == 1 { + vec![future::block_on(&mut spawned[0])] } else { let fut = async move { - let mut results = Vec::with_capacity(scope.spawned.len()); - for task in scope.spawned { + let mut results = Vec::with_capacity(spawned.len()); + for task in spawned.iter_mut() { results.push(task.await); } @@ -265,7 +266,7 @@ impl Default for TaskPool { pub struct Scope<'scope, T> { executor: &'scope async_executor::Executor<'scope>, local_executor: &'scope async_executor::LocalExecutor<'scope>, - spawned: Vec>, + spawned: Mutex>>, } impl<'scope, T: Send + 'scope> Scope<'scope, T> { @@ -279,7 +280,7 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// For more information, see [`TaskPool::scope`]. pub fn spawn + 'scope + Send>(&mut self, f: Fut) { let task = self.executor.spawn(f); - self.spawned.push(task); + self.spawned.lock().unwrap().push(task); } /// Spawns a scoped future onto the thread-local executor. The scope *must* outlive @@ -290,7 +291,7 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// For more information, see [`TaskPool::scope`]. pub fn spawn_local + 'scope>(&mut self, f: Fut) { let task = self.local_executor.spawn(f); - self.spawned.push(task); + self.spawned.lock().unwrap().push(task); } } From 30f5dc0199e88114dd7abe0d86a4f5eef8bde72f Mon Sep 17 00:00:00 2001 From: MiniaczQ Date: Sun, 27 Mar 2022 01:45:47 +0100 Subject: [PATCH 2/8] add the arc --- crates/bevy_tasks/src/task_pool.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index c19418fe109f1..ffe84c8795e55 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -179,7 +179,7 @@ impl TaskPool { let mut scope = Scope { executor, local_executor, - spawned: Mutex::new(Vec::new()), + spawned: Arc::new(Mutex::new(Vec::new())), }; f(&mut scope); @@ -266,7 +266,7 @@ impl Default for TaskPool { pub struct Scope<'scope, T> { executor: &'scope async_executor::Executor<'scope>, local_executor: &'scope async_executor::LocalExecutor<'scope>, - spawned: Mutex>>, + spawned: Arc>>>, } impl<'scope, T: Send + 'scope> Scope<'scope, T> { From f92a145d55ff66592d24c98a79656806050712fe Mon Sep 17 00:00:00 2001 From: MiniaczQ Date: Sun, 27 Mar 2022 02:04:57 +0100 Subject: [PATCH 3/8] remove ref mut requirement for spawn functions --- crates/bevy_ecs/src/schedule/executor_parallel.rs | 2 +- crates/bevy_tasks/src/task_pool.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index b2ab15c7a528c..9262b232dff99 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -179,7 +179,7 @@ impl ParallelExecutor { /// queues systems with no dependencies to run (or skip) at next opportunity. fn prepare_systems<'scope>( &mut self, - scope: &mut Scope<'scope, ()>, + scope: &Scope<'scope, ()>, systems: &'scope mut [ParallelSystemContainer], world: &'scope World, ) { diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index ffe84c8795e55..197e27e169559 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -164,7 +164,7 @@ impl TaskPool { /// This is similar to `rayon::scope` and `crossbeam::scope` pub fn scope<'scope, F, T>(&self, f: F) -> Vec where - F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, + F: FnOnce(&Scope<'scope, T>) + 'scope + Send, T: Send + 'static, { TaskPool::LOCAL_EXECUTOR.with(|local_executor| { @@ -278,7 +278,7 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// instead. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn + 'scope + Send>(&mut self, f: Fut) { + pub fn spawn + 'scope + Send>(&self, f: Fut) { let task = self.executor.spawn(f); self.spawned.lock().unwrap().push(task); } @@ -289,7 +289,7 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// [`Scope::spawn`] instead, unless the provided future is not `Send`. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn_local + 'scope>(&mut self, f: Fut) { + pub fn spawn_local + 'scope>(&self, f: Fut) { let task = self.local_executor.spawn(f); self.spawned.lock().unwrap().push(task); } From 910260b773840b45bd207c0618e2aa60d5459465 Mon Sep 17 00:00:00 2001 From: MiniaczQ Date: Sun, 27 Mar 2022 02:23:23 +0100 Subject: [PATCH 4/8] wrap results in arc mutex for wasm --- .../bevy_tasks/src/single_threaded_task_pool.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 8f248e1005c38..b091e669df16f 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -68,7 +68,7 @@ impl TaskPool { let mut scope = Scope { executor, - results: Vec::new(), + results: Arc::new(Mutex::new(Vec::new())), }; f(&mut scope); @@ -76,11 +76,12 @@ impl TaskPool { // Loop until all tasks are done while executor.try_tick() {} - scope - .results + let result = scope + .results.lock().unwrap() .iter() .map(|result| result.lock().unwrap().take().unwrap()) - .collect() + .collect(); + result } // Spawns a static future onto the JS event loop. For now it is returning FakeTask @@ -122,17 +123,17 @@ impl FakeTask { pub struct Scope<'scope, T> { executor: &'scope async_executor::LocalExecutor<'scope>, // Vector to gather results of all futures spawned during scope run - results: Vec>>>, + results: Arc>>>>>, } impl<'scope, T: Send + 'scope> Scope<'scope, T> { - pub fn spawn + 'scope + Send>(&mut self, f: Fut) { + pub fn spawn + 'scope + Send>(&self, f: Fut) { self.spawn_local(f); } - pub fn spawn_local + 'scope>(&mut self, f: Fut) { + pub fn spawn_local + 'scope>(&self, f: Fut) { let result = Arc::new(Mutex::new(None)); - self.results.push(result.clone()); + self.results.lock().unwrap().push(result.clone()); let f = async move { result.lock().unwrap().replace(f.await); }; From 34075e50ff97f950391abf72aa803ec461a4f129 Mon Sep 17 00:00:00 2001 From: MiniaczQ Date: Sun, 27 Mar 2022 02:26:22 +0100 Subject: [PATCH 5/8] make fmt happy --- crates/bevy_tasks/src/single_threaded_task_pool.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index b091e669df16f..f55f3119801f3 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -77,7 +77,9 @@ impl TaskPool { while executor.try_tick() {} let result = scope - .results.lock().unwrap() + .results + .lock() + .unwrap() .iter() .map(|result| result.lock().unwrap().take().unwrap()) .collect(); From 545292639107862fafe046d767988b4b2c81f092 Mon Sep 17 00:00:00 2001 From: MiniaczQ Date: Sun, 27 Mar 2022 03:15:39 +0100 Subject: [PATCH 6/8] naive test --- crates/bevy_tasks/src/task_pool.rs | 37 ++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 197e27e169559..d9b372609c361 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -335,6 +335,43 @@ mod tests { assert_eq!(count.load(Ordering::Relaxed), 100); } + #[test] + fn test_nested_spawn() { + let pool = TaskPool::new(); + + let foo = Box::new(42); + let foo = &*foo; + + let count = Arc::new(AtomicI32::new(0)); + + let outputs = pool.scope(|scope| { + for _ in 0..10 { + let count_clone = count.clone(); + scope.spawn(async move { + for _ in 0..10 { + let count_clone_clone = count_clone.clone(); + let nested_outputs = scope.spawn(async move { + if *foo != 42 { + panic!("not 42!?!?") + } else { + count_clone_clone.fetch_add(1, Ordering::Relaxed); + *foo + } + }); + } + *foo + }); + } + }); + + for output in &outputs { + assert_eq!(*output, 42); + } + + assert_eq!(outputs.len(), 100); + assert_eq!(count.load(Ordering::Relaxed), 100); + } + #[test] fn test_mixed_spawn_local_and_spawn() { let pool = TaskPool::new(); From a876d81e997a29f21358aa9cdd353c9a29a3ebc6 Mon Sep 17 00:00:00 2001 From: MiniaczQ Date: Sun, 27 Mar 2022 03:17:30 +0100 Subject: [PATCH 7/8] naive test, err no.2 --- crates/bevy_tasks/src/task_pool.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index d9b372609c361..4f188d003d8cd 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -347,10 +347,10 @@ mod tests { let outputs = pool.scope(|scope| { for _ in 0..10 { let count_clone = count.clone(); - scope.spawn(async move { + scope.spawn_local(async move { for _ in 0..10 { let count_clone_clone = count_clone.clone(); - let nested_outputs = scope.spawn(async move { + scope.spawn(async move { if *foo != 42 { panic!("not 42!?!?") } else { From 0c3e9dab9277f37a485bec122fb6fce793323aca Mon Sep 17 00:00:00 2001 From: MiniaczQ Date: Sun, 27 Mar 2022 12:57:46 +0100 Subject: [PATCH 8/8] go back to the og nesting error --- crates/bevy_tasks/src/task_pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 4f188d003d8cd..5ab71be6224da 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -347,7 +347,7 @@ mod tests { let outputs = pool.scope(|scope| { for _ in 0..10 { let count_clone = count.clone(); - scope.spawn_local(async move { + scope.spawn(async move { for _ in 0..10 { let count_clone_clone = count_clone.clone(); scope.spawn(async move {