Commit dfea88c

james7132 committed:

Basic adaptive batching for parallel query iteration (#4777)
# Objective

Fixes #3184. Fixes #6640. Fixes #4798.

Using `Query::par_for_each(_mut)` currently requires a `batch_size` parameter, which affects how it chunks up large archetypes and tables into smaller chunks to run in parallel. Tuning this value is difficult, as the performance characteristics depend entirely on the state of the `World` it's being run on. Typically, users just pick a flat constant and tune it by hand until it performs well in some benchmark. However, this is both error prone and risks overfitting the tuning to that benchmark.

This PR proposes a naive automatic batch-size computation based on the current state of the `World`.

## Background

`Query::par_for_each(_mut)` schedules a new task for every archetype or table that it matches. Archetypes/tables larger than the batch size are chunked into smaller tasks. Assuming every entity matched by the query has an identical workload, the worst case scenario is a batch size equal to the size of the largest matched archetype or table. Conversely, a batch size of `max {archetype, table} size / (thread count * COUNT_PER_THREAD)` is likely the sweet spot where the overhead of scheduling tasks is minimized, at least without grouping small archetypes/tables together. There is also likely a strict minimum batch size below which the overhead of scheduling these tasks is heavier than running the entire iteration single-threaded.

## Solution

- [x] Remove the `batch_size` from `Query(State)::par_for_each` and friends.
- [x] Add a check to compute `batch_size = max {archetype/table} size / (thread count * COUNT_PER_THREAD)`.
- [x] ~~Panic if thread count is 0.~~ Defer to `for_each` if the thread count is 1 or less.
- [x] Early return if there is no matched table/archetype.
- [x] Add an override option for users who have queries that strongly violate the initial assumption that all iterated entities have an equal workload.
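The heuristic above can be sketched as a small standalone function. This is an illustrative reduction of the PR's formula, not Bevy's internal API: `auto_batch_size`, `largest_group`, and `limits` are hypothetical names, and `largest_group` stands in for the entity count of the largest matched archetype or table.

```rust
use std::ops::Range;

/// Sketch of the automatic batch-size heuristic described above.
/// `largest_group` is the entity count of the largest matched
/// archetype or table; `limits` are the user-configurable bounds.
fn auto_batch_size(
    largest_group: usize,
    thread_count: usize,
    batches_per_thread: usize,
    limits: Range<usize>,
) -> usize {
    assert!(thread_count > 0 && batches_per_thread > 0);
    // Divide the largest group across (threads * batches per thread).
    let batch_size = largest_group / (thread_count * batches_per_thread);
    // Clamp to the configured bounds, and never go below 1.
    batch_size.clamp(limits.start, limits.end).max(1)
}

fn main() {
    // 10_000 entities across 8 threads, 1 batch per thread -> 1250.
    assert_eq!(auto_batch_size(10_000, 8, 1, 1..usize::MAX), 1250);
    // Tiny archetypes still get a batch size of at least 1.
    assert_eq!(auto_batch_size(3, 8, 1, 1..usize::MAX), 1);
    // A user-supplied minimum bound takes precedence over the heuristic.
    assert_eq!(auto_batch_size(100, 4, 2, 16..32), 16);
}
```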
---

## Changelog

Changed: `Query::par_for_each(_mut)` has been changed to `Query::par_iter(_mut)` and will now automatically try to produce a batch size for callers based on the current `World` state.

## Migration Guide

The `batch_size` parameter for `Query(State)::par_for_each(_mut)` has been removed. These calls will automatically compute a batch size for you. Remove these parameters from all calls to these functions.

Before:

```rust
fn parallel_system(query: Query<&MyComponent>) {
    query.par_for_each(32, |comp| { ... });
}
```

After:

```rust
fn parallel_system(query: Query<&MyComponent>) {
    query.par_iter().for_each(|comp| { ... });
}
```

Co-authored-by: Arnav Choubey <[email protected]>
Co-authored-by: Robert Swain <[email protected]>
Co-authored-by: François <[email protected]>
Co-authored-by: Corey Farwell <[email protected]>
Co-authored-by: Aevyrie <[email protected]>
1 parent cab065b commit dfea88c

File tree

11 files changed: +294 −188 lines changed

benches/benches/bevy_ecs/iteration/heavy_compute.rs

Lines changed: 1 addition & 1 deletion

```diff
@@ -34,7 +34,7 @@ pub fn heavy_compute(c: &mut Criterion) {
     }));

     fn sys(mut query: Query<(&mut Position, &mut Transform)>) {
-        query.par_for_each_mut(128, |(mut pos, mut mat)| {
+        query.par_iter_mut().for_each_mut(|(mut pos, mut mat)| {
             for _ in 0..100 {
                 mat.0 = mat.0.inverse();
             }
```

crates/bevy_animation/src/lib.rs

Lines changed: 16 additions & 14 deletions

```diff
@@ -352,20 +352,22 @@ pub fn animation_player(
     parents: Query<(Option<With<AnimationPlayer>>, Option<&Parent>)>,
     mut animation_players: Query<(Entity, Option<&Parent>, &mut AnimationPlayer)>,
 ) {
-    animation_players.par_for_each_mut(10, |(root, maybe_parent, mut player)| {
-        update_transitions(&mut player, &time);
-        run_animation_player(
-            root,
-            player,
-            &time,
-            &animations,
-            &names,
-            &transforms,
-            maybe_parent,
-            &parents,
-            &children,
-        );
-    });
+    animation_players
+        .par_iter_mut()
+        .for_each_mut(|(root, maybe_parent, mut player)| {
+            update_transitions(&mut player, &time);
+            run_animation_player(
+                root,
+                player,
+                &time,
+                &animations,
+                &names,
+                &transforms,
+                maybe_parent,
+                &parents,
+                &children,
+            );
+        });
 }

 #[allow(clippy::too_many_arguments)]
```

crates/bevy_ecs/src/lib.rs

Lines changed: 6 additions & 6 deletions

```diff
@@ -400,7 +400,8 @@ mod tests {
         let results = Arc::new(Mutex::new(Vec::new()));
         world
             .query::<(Entity, &A)>()
-            .par_for_each(&world, 2, |(e, &A(i))| {
+            .par_iter(&world)
+            .for_each(|(e, &A(i))| {
                 results.lock().unwrap().push((e, i));
             });
         results.lock().unwrap().sort();
@@ -420,11 +421,10 @@ mod tests {
         let e4 = world.spawn((SparseStored(4), A(1))).id();
         let e5 = world.spawn((SparseStored(5), A(1))).id();
         let results = Arc::new(Mutex::new(Vec::new()));
-        world.query::<(Entity, &SparseStored)>().par_for_each(
-            &world,
-            2,
-            |(e, &SparseStored(i))| results.lock().unwrap().push((e, i)),
-        );
+        world
+            .query::<(Entity, &SparseStored)>()
+            .par_iter(&world)
+            .for_each(|(e, &SparseStored(i))| results.lock().unwrap().push((e, i)));
         results.lock().unwrap().sort();
         assert_eq!(
             &*results.lock().unwrap(),
```
crates/bevy_ecs/src/query/mod.rs

Lines changed: 2 additions & 0 deletions

```diff
@@ -2,12 +2,14 @@ mod access;
 mod fetch;
 mod filter;
 mod iter;
+mod par_iter;
 mod state;

 pub use access::*;
 pub use fetch::*;
 pub use filter::*;
 pub use iter::*;
+pub use par_iter::*;
 pub use state::*;

 /// A debug checked version of [`Option::unwrap_unchecked`]. Will panic in
```

crates/bevy_ecs/src/query/par_iter.rs

Lines changed: 202 additions & 0 deletions (new file)

```rust
use crate::world::World;
use bevy_tasks::ComputeTaskPool;
use std::ops::Range;

use super::{QueryItem, QueryState, ROQueryItem, ReadOnlyWorldQuery, WorldQuery};

/// Dictates how a parallel query chunks up large tables/archetypes
/// during iteration.
///
/// A parallel query will chunk up large tables and archetypes into
/// chunks of at most a certain batch size.
///
/// By default, this batch size is automatically determined by dividing
/// the size of the largest matched archetype by the number
/// of threads. This attempts to minimize the overhead of scheduling
/// tasks onto multiple threads, but assumes each entity has roughly the
/// same amount of work to be done, which may not hold true in every
/// workload.
///
/// See [`Query::par_iter`] for more information.
///
/// [`Query::par_iter`]: crate::system::Query::par_iter
#[derive(Clone)]
pub struct BatchingStrategy {
    /// The upper and lower limits for how large a batch of entities can be.
    ///
    /// Setting the bounds to the same value will result in a fixed
    /// batch size.
    ///
    /// Defaults to `[1, usize::MAX]`.
    pub batch_size_limits: Range<usize>,
    /// The number of batches per thread in the [`ComputeTaskPool`].
    /// Increasing this value will decrease the batch size, which may
    /// increase the scheduling overhead for the iteration.
    ///
    /// Defaults to 1.
    pub batches_per_thread: usize,
}

impl BatchingStrategy {
    /// Creates a new unconstrained default batching strategy.
    pub const fn new() -> Self {
        Self {
            batch_size_limits: 1..usize::MAX,
            batches_per_thread: 1,
        }
    }

    /// Declares a batching strategy with a fixed batch size.
    pub const fn fixed(batch_size: usize) -> Self {
        Self {
            batch_size_limits: batch_size..batch_size,
            batches_per_thread: 1,
        }
    }

    /// Configures the minimum allowed batch size of this instance.
    pub const fn min_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size_limits.start = batch_size;
        self
    }

    /// Configures the maximum allowed batch size of this instance.
    pub const fn max_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size_limits.end = batch_size;
        self
    }

    /// Configures the number of batches to assign to each thread.
    ///
    /// # Panics
    /// Panics if `batches_per_thread` is zero.
    pub fn batches_per_thread(mut self, batches_per_thread: usize) -> Self {
        assert!(
            batches_per_thread > 0,
            "The number of batches per thread must be non-zero."
        );
        self.batches_per_thread = batches_per_thread;
        self
    }
}

/// A parallel iterator over query results of a [`Query`](crate::system::Query).
///
/// This struct is created by the [`Query::par_iter`](crate::system::Query::par_iter) and
/// [`Query::par_iter_mut`](crate::system::Query::par_iter_mut) methods.
pub struct QueryParIter<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> {
    pub(crate) world: &'w World,
    pub(crate) state: &'s QueryState<Q, F>,
    pub(crate) batching_strategy: BatchingStrategy,
}

impl<'w, 's, Q: ReadOnlyWorldQuery, F: ReadOnlyWorldQuery> QueryParIter<'w, 's, Q, F> {
    /// Runs `func` on each query result in parallel.
    ///
    /// This can only be called for read-only queries; see [`Self::for_each_mut`] for
    /// write-queries.
    ///
    /// # Panics
    /// Panics if the [`ComputeTaskPool`] is not initialized. If using this from a query
    /// that is being initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[inline]
    pub fn for_each<FN: Fn(ROQueryItem<'w, Q>) + Send + Sync + Clone>(&self, func: FN) {
        // SAFETY: query is read only
        unsafe {
            self.for_each_unchecked(func);
        }
    }
}

impl<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> QueryParIter<'w, 's, Q, F> {
    /// Changes the batching strategy used when iterating.
    ///
    /// For more information on how this affects the resultant iteration, see
    /// [`BatchingStrategy`].
    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
        self.batching_strategy = strategy;
        self
    }

    /// Runs `func` on each query result in parallel.
    ///
    /// # Panics
    /// Panics if the [`ComputeTaskPool`] is not initialized. If using this from a query
    /// that is being initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[inline]
    pub fn for_each_mut<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(&mut self, func: FN) {
        // SAFETY: query has unique world access
        unsafe {
            self.for_each_unchecked(func);
        }
    }

    /// Runs `func` on each query result in parallel.
    ///
    /// # Panics
    /// Panics if the [`ComputeTaskPool`] is not initialized. If using this from a query
    /// that is being initialized and run from the ECS scheduler, this should never panic.
    ///
    /// # Safety
    ///
    /// This does not check for mutable query correctness. To be safe, make sure mutable queries
    /// have unique access to the components they query.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[inline]
    pub unsafe fn for_each_unchecked<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(
        &self,
        func: FN,
    ) {
        let thread_count = ComputeTaskPool::get().thread_num();
        if thread_count <= 1 {
            // With one thread (or none), parallelism buys nothing: fall
            // back to plain single-threaded iteration.
            self.state.for_each_unchecked_manual(
                self.world,
                func,
                self.world.last_change_tick(),
                self.world.read_change_tick(),
            );
        } else {
            // Need a batch size of at least 1.
            let batch_size = self.get_batch_size(thread_count).max(1);
            self.state.par_for_each_unchecked_manual(
                self.world,
                batch_size,
                func,
                self.world.last_change_tick(),
                self.world.read_change_tick(),
            );
        }
    }

    fn get_batch_size(&self, thread_count: usize) -> usize {
        if self.batching_strategy.batch_size_limits.is_empty() {
            // An empty range means a fixed batch size was requested.
            return self.batching_strategy.batch_size_limits.start;
        }

        assert!(
            thread_count > 0,
            "Attempted to run parallel iteration over a query with an empty TaskPool"
        );
        let max_size = if Q::IS_DENSE && F::IS_DENSE {
            let tables = &self.world.storages().tables;
            self.state
                .matched_table_ids
                .iter()
                .map(|id| tables[*id].entity_count())
                .max()
                .unwrap_or(0)
        } else {
            let archetypes = &self.world.archetypes();
            self.state
                .matched_archetype_ids
                .iter()
                .map(|id| archetypes[*id].len())
                .max()
                .unwrap_or(0)
        };
        let batch_size = max_size / (thread_count * self.batching_strategy.batches_per_thread);
        batch_size.clamp(
            self.batching_strategy.batch_size_limits.start,
            self.batching_strategy.batch_size_limits.end,
        )
    }
}
```
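The file above delegates the actual chunked dispatch to `par_for_each_unchecked_manual`. As a rough illustration of the mechanism it relies on, here is a stdlib-only sketch that splits a slice into batches and processes each batch on a scoped thread. This is a simplification under stated assumptions: Bevy instead schedules each chunk as a task on its `ComputeTaskPool`, and `par_for_each_batched` is a hypothetical name, not part of the crate.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Stdlib-only sketch: split `items` into chunks of at most `batch_size`
/// and run `func` on every item, one scoped thread per chunk.
/// (Bevy spawns ComputeTaskPool tasks rather than OS threads.)
fn par_for_each_batched<T: Sync>(items: &[T], batch_size: usize, func: impl Fn(&T) + Sync) {
    std::thread::scope(|scope| {
        // `max(1)` mirrors the "need a batch size of at least 1" rule above.
        for chunk in items.chunks(batch_size.max(1)) {
            let func = &func;
            scope.spawn(move || {
                for item in chunk {
                    func(item);
                }
            });
        }
        // All spawned threads are joined when the scope ends.
    });
}

fn main() {
    let data: Vec<u64> = (1..=10).collect();
    let sum = AtomicUsize::new(0);
    // Three batches: [1..4], [5..8], [9..10].
    par_for_each_batched(&data, 4, |&n| {
        sum.fetch_add(n as usize, Ordering::Relaxed);
    });
    assert_eq!(sum.load(Ordering::Relaxed), 55);
}
```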
