Skip to content

[Merged by Bors] - Add ParallelCommands system parameter #4749

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
1 change: 1 addition & 0 deletions crates/bevy_ecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ bevy_utils = { path = "../bevy_utils", version = "0.8.0-dev" }
bevy_ecs_macros = { path = "macros", version = "0.8.0-dev" }

async-channel = "1.4"
thread_local = "1.1.4"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this already in our tree?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, but it is a nice crate that another PR (#4663) depends on as well.

fixedbitset = "0.4"
fxhash = "0.2"
downcast-rs = "1.2"
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_ecs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub mod prelude {
},
system::{
Commands, In, IntoChainSystem, IntoExclusiveSystem, IntoSystem, Local, NonSend,
NonSendMut, ParamSet, Query, RemovedComponents, Res, ResMut, System,
NonSendMut, ParallelCommands, ParamSet, Query, RemovedComponents, Res, ResMut, System,
SystemParamFunction,
},
world::{FromWorld, Mut, World},
Expand Down
2 changes: 2 additions & 0 deletions crates/bevy_ecs/src/system/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod command_queue;
mod parallel_scope;

use crate::{
bundle::Bundle,
Expand All @@ -8,6 +9,7 @@ use crate::{
};
use bevy_utils::tracing::{error, warn};
pub use command_queue::CommandQueue;
pub use parallel_scope::*;
use std::marker::PhantomData;

use super::Resource;
Expand Down
98 changes: 98 additions & 0 deletions crates/bevy_ecs/src/system/commands/parallel_scope.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::cell::Cell;

use thread_local::ThreadLocal;

use crate::{
entity::Entities,
prelude::World,
system::{SystemParam, SystemParamFetch, SystemParamState},
};

use super::{CommandQueue, Commands};

#[doc(hidden)]
#[derive(Default)]
/// The internal [`SystemParamState`] of the [`ParallelCommands`] type
pub struct ParallelCommandsState {
thread_local_storage: ThreadLocal<Cell<CommandQueue>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should generalize this and add it to bevy_utils. This is seemingly a very useful abstraction that showed up in #4899 too.

}

/// An alternative to [`Commands`] that can be used in parallel contexts, such as those in [`Query::par_for_each`](crate::system::Query::par_for_each)
///
/// Note: Because command application order will depend on how many threads are ran, non-commutative commands may result in non-deterministic results.
///
/// Example:
/// ```
/// # use bevy_ecs::prelude::*;
/// # use bevy_tasks::ComputeTaskPool;
/// #
/// # #[derive(Component)]
/// # struct Velocity;
/// # impl Velocity { fn magnitude(&self) -> f32 { 42.0 } }
/// fn parallel_command_system(
/// mut query: Query<(Entity, &Velocity)>,
/// par_commands: ParallelCommands
/// ) {
/// query.par_for_each(32, |(entity, velocity)| {
/// if velocity.magnitude() > 10.0 {
/// par_commands.command_scope(|mut commands| {
/// commands.entity(entity).despawn();
/// });
/// }
/// });
/// }
/// # bevy_ecs::system::assert_is_system(parallel_command_system);
///```
pub struct ParallelCommands<'w, 's> {
state: &'s mut ParallelCommandsState,
entities: &'w Entities,
}

impl SystemParam for ParallelCommands<'_, '_> {
type Fetch = ParallelCommandsState;
}

impl<'w, 's> SystemParamFetch<'w, 's> for ParallelCommandsState {
type Item = ParallelCommands<'w, 's>;

unsafe fn get_param(
state: &'s mut Self,
_: &crate::system::SystemMeta,
world: &'w World,
_: u32,
) -> Self::Item {
ParallelCommands {
state,
entities: world.entities(),
}
}
}

// SAFE: no component or resource access to report
unsafe impl SystemParamState for ParallelCommandsState {
fn init(_: &mut World, _: &mut crate::system::SystemMeta) -> Self {
Self::default()
}

fn apply(&mut self, world: &mut World) {
for cq in self.thread_local_storage.iter_mut() {
cq.get_mut().apply(world);
}
}
}

impl<'w, 's> ParallelCommands<'w, 's> {
pub fn command_scope<R>(&self, f: impl FnOnce(Commands) -> R) -> R {
let store = &self.state.thread_local_storage;
let command_queue_cell = store.get_or_default();
let mut command_queue = command_queue_cell.take();

let r = f(Commands::new_from_entities(
&mut command_queue,
self.entities,
));

command_queue_cell.set(command_queue);
r
}
}