[Merged by Bors] - Basic adaptive batching for parallel query iteration #4777
Closed
Changes from all commits (61 commits)
65bd41f Remove task_pool parameter from par_for_each(_mut) (james7132)
b110039 Fix benchmarks (james7132)
cf10758 Embed task pool into QueryState (james7132)
076db46 Remove the clone (james7132)
12eefa1 Update docs (james7132)
3c76af9 Merge branch 'main' into parallel-ergonomics (james7132)
332e851 Update docs. (james7132)
186fd50 Update docs. (james7132)
88044d1 Update docs. (james7132)
704cf61 Merge branch 'main' into parallel-ergonomics (james7132)
cedd756 Merge branch 'main' into parallel-ergonomics (james7132)
1f94913 Basic adaptive batching (james7132)
5b3a730 Fix CI (james7132)
8b365f4 Add par_iter impls (james7132)
21d95cb Merge branch 'parallel-ergonomics' into adaptive-batching (james7132)
92af1f4 Fix CI (james7132)
2309ab7 Add default batches per thread (james7132)
f788a7a Merge branch 'main' into adaptive-batching (james7132)
6fd0cf1 Formatting (james7132)
10904c1 Merge branch 'main' into adaptive-batching (james7132)
ac4c524 Merge branch 'main' into adaptive-batching (james7132)
0b8c1eb Update example comments (james7132)
3674f10 Add documentation comments to `bevy_window` (#4333) (arnavc52)
08bf88b bevy_render: Fix KTX2 UASTC format mapping (#4569) (superdump)
3674e19 update hashbrown to 0.12 (#5035) (mockersf)
619bdb9 WGSL: use correct syntax for matrix access (#5039) (mockersf)
65a6c9a Implement `Eq` and `PartialEq` for `MouseScrollUnit` (#5048) (frewsxcv)
8959c2d enable optional dependencies to stay optional (#5023) (mockersf)
9a8e5fd gltf: do not import IoTaskPool in wasm (#5038) (mockersf)
03ffbe8 Physical viewport calculation fix (#5055) (aevyrie)
b89d878 Cleanups in diagnostics (#3871) (mockersf)
d3b997b `bevy_reflect`: put `serialize` into external `ReflectSerialize` type… (jakobhellermann)
c5df0d6 Add benchmarks for schedule dependency resolution (#4961) (joseph-gio)
d5a5993 change panicking test to not run on global task pool (#4998) (hymm)
606635f Add a `release_all` function to `Input`. (#5011) (Hoidigan)
5f6a290 Update `clap` to 3.2 in tools using `value_parser` (#5031) (mlodato517)
ce10028 Fix redundant "have" in CONTRIBUTING (#5036) (mlodato517)
ec9a481 Add `Input::reset_all` (#5015) (Hoidigan)
d74a318 Fix Nix section of linux_dependencies.md (#5050) (fluunke)
2381ba2 Fixed bevy_ui touch input (#4099) (ManevilleF)
d025d03 Improve entity and component API docs (#4767) (Nilirad)
4132b60 Change check_visibility to use thread-local queues instead of a chann… (james7132)
34ae6ba Mark mutable APIs under ECS storage as pub(crate) (#5065) (james7132)
1eaee67 Callable PBR functions (#4939) (superdump)
f3eef7f depend on dioxus(and bevy)-maintained fork of stretch (taffy) (#4716) (colepoirier)
4ab1465 Make the batch size more configurable (james7132)
7bd1617 Allow reusing the same ParIter (james7132)
8bc37a0 More complete docs (james7132)
bc2c649 Merge branch 'main' into adaptive-batching (james7132)
b57d547 More CI fixes (james7132)
5556377 Merge branch 'main' into adaptive-batching (james7132)
a82ff07 Fix CI (james7132)
fd8fefa Defer to for_each if there is zero or one threads (james7132)
a751055 Merge branch 'main' into adaptive-batching (james7132)
9111a00 Fix CI (james7132)
266bfce Merge branch 'main' into adaptive-batching (james7132)
2cfcb16 Fix build (james7132)
73e5dfc Merge branch 'main' into adaptive-batching (james7132)
acf2f5b Formatting (james7132)
015e201 Add documentation for BatchingStrategy (james7132)
c6363ea Merge branch 'main' into adaptive-batching (james7132)
@@ -0,0 +1,202 @@ (new file)

```rust
use crate::world::World;
use bevy_tasks::ComputeTaskPool;
use std::ops::Range;

use super::{QueryItem, QueryState, ROQueryItem, ReadOnlyWorldQuery, WorldQuery};

/// Dictates how a parallel query chunks up large tables/archetypes
/// during iteration.
///
/// A parallel query will chunk up large tables and archetypes into
/// chunks of at most a certain batch size.
///
/// By default, this batch size is automatically determined by dividing
/// the size of the largest matched archetype by the number
/// of threads. This attempts to minimize the overhead of scheduling
/// tasks onto multiple threads, but assumes each entity has roughly the
/// same amount of work to be done, which may not hold true in every
/// workload.
///
/// See [`Query::par_iter`] for more information.
///
/// [`Query::par_iter`]: crate::system::Query::par_iter
#[derive(Clone)]
pub struct BatchingStrategy {
    /// The upper and lower limits for how large a batch of entities can be.
    ///
    /// Setting the bounds to the same value will result in a fixed
    /// batch size.
    ///
    /// Defaults to `[1, usize::MAX]`.
    pub batch_size_limits: Range<usize>,
    /// The number of batches per thread in the [`ComputeTaskPool`].
    /// Increasing this value will decrease the batch size, which may
    /// increase the scheduling overhead for the iteration.
    ///
    /// Defaults to 1.
    pub batches_per_thread: usize,
}
```
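One subtlety worth noting: because the limits are stored as a `std::ops::Range`, a fixed batch size (`start == end`) is represented by an *empty* range, which the batch-size calculation in `get_batch_size` special-cases. A quick standalone check of that `Range` behavior:

```rust
use std::ops::Range;

fn main() {
    // `BatchingStrategy::fixed(n)` stores `n..n`. With `Range`,
    // start == end means the range is empty, so `is_empty()` doubles
    // as the "fixed batch size" flag.
    let fixed: Range<usize> = 32..32;
    assert!(fixed.is_empty());

    // The unconstrained default, `1..usize::MAX`, is non-empty, so the
    // adaptive division-and-clamp path is taken instead.
    let adaptive: Range<usize> = 1..usize::MAX;
    assert!(!adaptive.is_empty());
}
```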
```rust
impl BatchingStrategy {
    /// Creates a new unconstrained default batching strategy.
    pub const fn new() -> Self {
        Self {
            batch_size_limits: 1..usize::MAX,
            batches_per_thread: 1,
        }
    }

    /// Declares a batching strategy with a fixed batch size.
    pub const fn fixed(batch_size: usize) -> Self {
        Self {
            batch_size_limits: batch_size..batch_size,
            batches_per_thread: 1,
        }
    }

    /// Configures the minimum allowed batch size of this instance.
    pub const fn min_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size_limits.start = batch_size;
        self
    }

    /// Configures the maximum allowed batch size of this instance.
    pub const fn max_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size_limits.end = batch_size;
        self
    }

    /// Configures the number of batches to assign to each thread.
    ///
    /// # Panics
    /// Panics if `batches_per_thread` is zero.
    pub fn batches_per_thread(mut self, batches_per_thread: usize) -> Self {
        assert!(
            batches_per_thread > 0,
            "The number of batches per thread must be non-zero."
        );
        self.batches_per_thread = batches_per_thread;
        self
    }
}
```
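The effect of these knobs on the computed batch size can be sketched numerically. The helper below reproduces the divide-and-clamp formula from `get_batch_size` as a free function (the function itself is illustrative, not part of the Bevy API):

```rust
use std::ops::Range;

/// Reproduces `get_batch_size`'s formula: spread the largest matched
/// table/archetype over `threads * batches_per_thread` batches, then
/// clamp the result into the configured limits.
fn batch_size(
    max_size: usize,
    threads: usize,
    batches_per_thread: usize,
    limits: Range<usize>,
) -> usize {
    if limits.is_empty() {
        // `BatchingStrategy::fixed` produces an empty range
        // (start == end); use the fixed value directly.
        return limits.start;
    }
    (max_size / (threads * batches_per_thread)).clamp(limits.start, limits.end)
}

fn main() {
    // 100_000 entities on 8 threads, 1 batch per thread.
    assert_eq!(batch_size(100_000, 8, 1, 1..usize::MAX), 12_500);
    // Doubling batches_per_thread halves the batch size.
    assert_eq!(batch_size(100_000, 8, 2, 1..usize::MAX), 6_250);
    // A fixed strategy bypasses the division entirely.
    assert_eq!(batch_size(100_000, 8, 1, 64..64), 64);
    // Tiny workloads are clamped up to the minimum batch size.
    assert_eq!(batch_size(3, 8, 1, 1..usize::MAX), 1);
}
```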
```rust
/// A parallel iterator over query results of a [`Query`](crate::system::Query).
///
/// This struct is created by the [`Query::par_iter`](crate::system::Query::par_iter) and
/// [`Query::par_iter_mut`](crate::system::Query::par_iter_mut) methods.
pub struct QueryParIter<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> {
    pub(crate) world: &'w World,
    pub(crate) state: &'s QueryState<Q, F>,
    pub(crate) batching_strategy: BatchingStrategy,
}

impl<'w, 's, Q: ReadOnlyWorldQuery, F: ReadOnlyWorldQuery> QueryParIter<'w, 's, Q, F> {
    /// Runs `func` on each query result in parallel.
    ///
    /// This can only be called for read-only queries; see [`Self::for_each_mut`] for
    /// write queries.
    ///
    /// # Panics
    /// Panics if the [`ComputeTaskPool`] is not initialized. If using this from a query
    /// that is being initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[inline]
    pub fn for_each<FN: Fn(ROQueryItem<'w, Q>) + Send + Sync + Clone>(&self, func: FN) {
        // SAFETY: query is read only
        unsafe {
            self.for_each_unchecked(func);
        }
    }
}

impl<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> QueryParIter<'w, 's, Q, F> {
    /// Changes the batching strategy used when iterating.
    ///
    /// For more information on how this affects the resultant iteration, see
    /// [`BatchingStrategy`].
    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
        self.batching_strategy = strategy;
        self
    }

    /// Runs `func` on each query result in parallel.
    ///
    /// # Panics
    /// Panics if the [`ComputeTaskPool`] is not initialized. If using this from a query
    /// that is being initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[inline]
    pub fn for_each_mut<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(&mut self, func: FN) {
        // SAFETY: query has unique world access
        unsafe {
            self.for_each_unchecked(func);
        }
    }

    /// Runs `func` on each query result in parallel.
    ///
    /// # Panics
    /// Panics if the [`ComputeTaskPool`] is not initialized. If using this from a query
    /// that is being initialized and run from the ECS scheduler, this should never panic.
    ///
    /// # Safety
    ///
    /// This does not check for mutable query correctness. To be safe, make sure mutable queries
    /// have unique access to the components they query.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[inline]
    pub unsafe fn for_each_unchecked<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(
        &self,
        func: FN,
    ) {
        let thread_count = ComputeTaskPool::get().thread_num();
        if thread_count <= 1 {
            self.state.for_each_unchecked_manual(
                self.world,
                func,
                self.world.last_change_tick(),
                self.world.read_change_tick(),
            );
        } else {
            // Need a batch size of at least 1.
            let batch_size = self.get_batch_size(thread_count).max(1);
            self.state.par_for_each_unchecked_manual(
                self.world,
                batch_size,
                func,
                self.world.last_change_tick(),
                self.world.read_change_tick(),
            );
        }
    }

    fn get_batch_size(&self, thread_count: usize) -> usize {
        if self.batching_strategy.batch_size_limits.is_empty() {
            return self.batching_strategy.batch_size_limits.start;
        }

        assert!(
            thread_count > 0,
            "Attempted to run parallel iteration over a query with an empty TaskPool"
        );
        let max_size = if Q::IS_DENSE && F::IS_DENSE {
            let tables = &self.world.storages().tables;
            self.state
                .matched_table_ids
                .iter()
                .map(|id| tables[*id].entity_count())
                .max()
                .unwrap_or(0)
        } else {
            let archetypes = &self.world.archetypes();
            self.state
                .matched_archetype_ids
                .iter()
                .map(|id| archetypes[*id].len())
                .max()
                .unwrap_or(0)
        };
        let batch_size = max_size / (thread_count * self.batching_strategy.batches_per_thread);
        batch_size.clamp(
            self.batching_strategy.batch_size_limits.start,
            self.batching_strategy.batch_size_limits.end,
        )
    }
}
```
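Conceptually, once the batch size is chosen, `par_for_each_unchecked_manual` splits the matched storage into chunks of that size and runs each chunk as a task on the `ComputeTaskPool`. A rough standalone sketch of that chunked dispatch, using `std::thread::scope` in place of Bevy's task pool (the `par_for_each` free function here is illustrative only):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Splits `data` into `batch_size` chunks and processes each chunk on
/// its own scoped thread, analogous to one task per batch.
fn par_for_each(data: &[u32], batch_size: usize, func: impl Fn(u32) + Send + Sync) {
    // Mirror the `.max(1)` guard in `for_each_unchecked`: a zero batch
    // size is never valid (`chunks(0)` would panic).
    let batch_size = batch_size.max(1);
    let func = &func;
    std::thread::scope(|scope| {
        for chunk in data.chunks(batch_size) {
            // Each batch becomes one task; smaller batches mean more
            // tasks and therefore more scheduling overhead.
            scope.spawn(move || chunk.iter().copied().for_each(|x| func(x)));
        }
    });
}

fn main() {
    let data: Vec<u32> = (1..=100).collect();
    let sum = AtomicU64::new(0);
    // Batch size 25 -> four concurrent batches; completion order is
    // arbitrary but the accumulated total is deterministic.
    par_for_each(&data, 25, |x| {
        sum.fetch_add(u64::from(x), Ordering::Relaxed);
    });
    assert_eq!(sum.load(Ordering::Relaxed), 5050);
}
```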