diff --git a/.cargo/config.toml b/.cargo/config.toml index ddbe09f3..e3c29ee0 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,2 +1,5 @@ [build] rustdocflags = ["-D", "warnings"] + +[target.wasm32-unknown-unknown] +rustflags = '--cfg getrandom_backend="wasm_js"' diff --git a/.github/workflows/ci_linux.yaml b/.github/workflows/ci_linux.yaml index 660d6933..da631df2 100644 --- a/.github/workflows/ci_linux.yaml +++ b/.github/workflows/ci_linux.yaml @@ -18,7 +18,7 @@ jobs: build: strategy: matrix: - rust-version: [stable, 1.75] + rust-version: [stable] runs-on: ubuntu-latest @@ -28,14 +28,6 @@ jobs: - name: Setup rust run: rustup default ${{ matrix.rust-version }} - # As new versions of our dependencies come out, they might depend on newer - # versions of the Rust compiler. When that happens, we'll use this step to - # lock down the dependency to a version that is known to be compatible with - # compiler version 1.75. - - name: Patch dependencies - if: ${{ matrix.rust-version == 1.75 }} - run: ./scripts/patch-versions-msrv-1_75.sh - - name: Build default features run: cargo build --workspace - name: Test default features diff --git a/Cargo.toml b/Cargo.toml index 8a05bd05..080d049c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bevy_impulse" -version = "0.0.2" +version = "0.4.0" edition = "2021" authors = ["Grey "] license = "Apache-2.0" @@ -17,18 +17,17 @@ categories = [ [dependencies] bevy_impulse_derive = { path = "macros", version = "0.0.2" } -bevy_ecs = "0.12" -bevy_utils = "0.12" -bevy_hierarchy = "0.12" -bevy_derive = "0.12" -bevy_app = "0.12" +bevy_ecs = "0.16" +bevy_utils = "0.16" +bevy_derive = "0.16" +bevy_app = "0.16" async-task = { version = "4.7.1", optional = true } # TODO(@mxgrey) We could probably remove bevy_tasks when the single_threaded_async # feature is active, but we'd have to refactor some internal usage of # bevy_tasks::Task, so we're leaving it as a mandatory dependency for now. -bevy_tasks = { version = "0.12", features = ["multi-threaded"] } +bevy_tasks = { version = "0.16", features = ["multi_threaded"] } itertools = "0.13" smallvec = "1.13" @@ -37,14 +36,15 @@ futures = "0.3" backtrace = "0.3" anyhow = "1.0" thiserror = "1.0" +variadics_please = "1.1" # These dependencies are only used by the testing module. # We may want to consider feature-gating them, but we use # the testing module for doctests, and doctests can only # make use of default features, so we're a bit stuck with # these for now. -bevy_core = "0.12" -bevy_time = "0.12" +bevy_diagnostic = "0.16" +bevy_time = "0.16" schemars = { version = "0.8.21", optional = true } serde = { version = "1.0.210", features = ["derive", "rc"], optional = true } @@ -54,8 +54,9 @@ tracing = "0.1.41" strum = { version = "0.26.3", optional = true, features = ["derive"] } semver = { version = "1.0.24", optional = true } -[target.'cfg(target_arch = "wasm32")'.dependencies] +[target.wasm32-unknown-unknown.dependencies] uuid = { version = "1.13.1", default-features = false, features = ["js"] } +getrandom = { version = "0.3.3", features = ["wasm_js"] } [features] single_threaded_async = ["dep:async-task"] diff --git a/diagram.schema.json b/diagram.schema.json index 40e04d27..0dbfa57a 100644 --- a/diagram.schema.json +++ b/diagram.schema.json @@ -145,8 +145,7 @@ "builder": { "type": "string" } - }, - "additionalProperties": false + } }, { "type": "object", @@ -157,8 +156,7 @@ "template": { "type": "string" } - }, - "additionalProperties": false + } } ], "required": [ @@ -506,11 +504,6 @@ "title": "NamespacedOperation", "description": "Refer to an operation inside of a namespace, e.g. { \"\": \"\"", "type": "object", - "allOf": [ - { - "$ref": "NamespacedOperation" - } - ], "maxProperties": 1, "minProperties": 1, "additionalProperties": { diff --git a/examples/diagram/calculator/Cargo.toml b/examples/diagram/calculator/Cargo.toml index 8e89632b..fe07260a 100644 --- a/examples/diagram/calculator/Cargo.toml +++ b/examples/diagram/calculator/Cargo.toml @@ -1,13 +1,13 @@ [package] name = "calculator" -version = "0.1.0" +version = "0.4.0" edition = "2021" [dependencies] -bevy_app = "0.12" -bevy_core = "0.12" -bevy_impulse = { version = "0.0.2", path = "../../..", features = ["diagram"] } -bevy_time = "0.12" +bevy_app = "0.16" +bevy_core = "0.15" +bevy_impulse = { version = "0.4.0", path = "../../..", features = ["diagram"] } +bevy_time = "0.16" clap = { version = "4.5.23", features = ["derive"] } serde_json = "1.0.128" tracing-subscriber = "0.3.19" diff --git a/examples/diagram/calculator/src/main.rs b/examples/diagram/calculator/src/main.rs index 2321f0cf..dc743d13 100644 --- a/examples/diagram/calculator/src/main.rs +++ b/examples/diagram/calculator/src/main.rs @@ -46,7 +46,7 @@ fn main() -> Result<(), Box> { let request = serde_json::Value::from_str(&args.request)?; let mut promise = - app.world + app.world_mut() .command(|cmds| -> Result, DiagramError> { let workflow = diagram.spawn_io_workflow(cmds, ®istry)?; Ok(cmds.request(request, workflow).take_response()) diff --git a/scripts/patch-versions-msrv-1_75.sh b/scripts/patch-versions-msrv-1_75.sh deleted file mode 100755 index 292036de..00000000 --- a/scripts/patch-versions-msrv-1_75.sh +++ /dev/null @@ -1,4 +0,0 @@ -# This script is useful for forcing dependencies to be compatible with Rust v1.75 -# Run this script in the root directory of the package. - -cargo add home@=0.5.9 diff --git a/src/async_execution/single_threaded_execution.rs b/src/async_execution/single_threaded_execution.rs index ffd8ebc4..f2354f7c 100644 --- a/src/async_execution/single_threaded_execution.rs +++ b/src/async_execution/single_threaded_execution.rs @@ -18,7 +18,7 @@ use bevy_ecs::prelude::World; use async_task::Runnable; -pub(crate) use bevy_tasks::Task as TaskHandle; +pub(crate) use bevy_tasks::{Task as TaskHandle, TaskPool}; use tokio::sync::mpsc::{ unbounded_channel, UnboundedReceiver as TokioReceiver, UnboundedSender as TokioSender, }; @@ -99,12 +99,7 @@ impl SingleThreadedExecution { where T: Send + 'static, { - let sender = self.runnable_sender.clone(); - let (runnable, task) = async_task::spawn_local(future, move |runnable| { - sender.send(runnable).ok(); - }); - let _ = self.runnable_sender.send(runnable); - TaskHandle::new(task) + TaskPool::new().spawn_local(future) } pub(crate) fn cancel_sender(&self) -> SingleThreadedExecutionSender { diff --git a/src/buffer.rs b/src/buffer.rs index 1c962681..5a0910f9 100644 --- a/src/buffer.rs +++ b/src/buffer.rs @@ -78,7 +78,7 @@ impl Buffer { let target = builder.commands.spawn(UnusedTarget).id(); builder .commands - .add(OnNewBufferValue::new(self.id(), target)); + .queue(OnNewBufferValue::new(self.id(), target)); Chain::new(target, builder) } @@ -442,7 +442,7 @@ impl BufferWorldAccess for World { { let buffer_ref = self .get_entity(key.tag.buffer) - .ok_or(BufferError::BufferMissing)?; + .map_err(|_| BufferError::BufferMissing)?; let storage = buffer_ref .get::>() .ok_or(BufferError::BufferMissing)?; @@ -734,7 +734,7 @@ where { fn drop(&mut self) { if self.modified { - self.commands.add(NotifyBufferUpdate::new( + self.commands.queue(NotifyBufferUpdate::new( self.buffer, self.session, self.accessor, diff --git a/src/buffer/any_buffer.rs b/src/buffer/any_buffer.rs index ebfd8943..5276a101 100644 --- a/src/buffer/any_buffer.rs +++ b/src/buffer/any_buffer.rs @@ -520,7 +520,7 @@ impl<'w, 's, 'a> AnyBufferMut<'w, 's, 'a> { impl<'w, 's, 'a> Drop for AnyBufferMut<'w, 's, 'a> { fn drop(&mut self) { if self.modified { - self.commands.add(NotifyBufferUpdate::new( + self.commands.queue(NotifyBufferUpdate::new( self.buffer, self.session, self.accessor, @@ -1003,7 +1003,7 @@ impl AnyBufferAccessInterface for AnyBufferAcces ) -> Result, BufferError> { let buffer_ref = world .get_entity(key.tag.buffer) - .ok_or(BufferError::BufferMissing)?; + .map_err(|_| BufferError::BufferMissing)?; let storage = buffer_ref .get::>() .ok_or(BufferError::BufferMissing)?; diff --git a/src/buffer/bufferable.rs b/src/buffer/bufferable.rs index daf55738..54ca911f 100644 --- a/src/buffer/bufferable.rs +++ b/src/buffer/bufferable.rs @@ -15,8 +15,8 @@ * */ -use bevy_utils::all_tuples; use smallvec::SmallVec; +use variadics_please::all_tuples; use crate::{ Accessing, AddOperation, Buffer, BufferSettings, Buffering, Builder, Chain, CloneFromBuffer, @@ -186,7 +186,7 @@ pub trait IterBufferable { let buffers = self.into_buffer_vec::(builder); let join = builder.commands.spawn(()).id(); let target = builder.commands.spawn(UnusedTarget).id(); - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( Some(builder.scope()), join, Join::new(buffers, target), diff --git a/src/buffer/buffering.rs b/src/buffer/buffering.rs index 81eea5f1..28ce94a6 100644 --- a/src/buffer/buffering.rs +++ b/src/buffer/buffering.rs @@ -15,9 +15,11 @@ * */ -use bevy_ecs::prelude::{Entity, World}; -use bevy_hierarchy::BuildChildren; -use bevy_utils::all_tuples; +use bevy_ecs::{ + hierarchy::ChildOf, + prelude::{Entity, World}, +}; +use variadics_please::all_tuples; use smallvec::SmallVec; @@ -68,7 +70,7 @@ pub trait Joining: Buffering { let join = builder.commands.spawn(()).id(); let target = builder.commands.spawn(UnusedTarget).id(); - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( Some(scope), join, Join::new(self, target), @@ -101,7 +103,7 @@ pub trait Accessing: Buffering { let listen = builder.commands.spawn(()).id(); let target = builder.commands.spawn(UnusedTarget).id(); - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( Some(scope), listen, Listen::new(self, target), @@ -113,7 +115,7 @@ pub trait Accessing: Buffering { fn access(self, builder: &mut Builder) -> Node { let source = builder.commands.spawn(()).id(); let target = builder.commands.spawn(UnusedTarget).id(); - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( Some(builder.scope), source, OperateBufferAccess::::new(self, target), @@ -187,9 +189,13 @@ pub trait Accessing: Buffering { build, ); - let begin_cancel = builder.commands.spawn(()).set_parent(builder.scope).id(); + let begin_cancel = builder + .commands + .spawn(()) + .insert(ChildOf(builder.scope)) + .id(); self.verify_scope(builder.scope); - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( None, begin_cancel, BeginCleanupWorkflow::::new( diff --git a/src/buffer/json_buffer.rs b/src/buffer/json_buffer.rs index 68cb1c95..248aff85 100644 --- a/src/buffer/json_buffer.rs +++ b/src/buffer/json_buffer.rs @@ -458,7 +458,7 @@ impl<'w, 's, 'a> JsonBufferMut<'w, 's, 'a> { impl<'w, 's, 'a> Drop for JsonBufferMut<'w, 's, 'a> { fn drop(&mut self) { if self.modified { - self.commands.add(NotifyBufferUpdate::new( + self.commands.queue(NotifyBufferUpdate::new( self.buffer, self.session, self.accessor, @@ -922,7 +922,7 @@ impl JsonBufferAccessIn ) -> Result, BufferError> { let buffer_ref = world .get_entity(key.tag.buffer) - .ok_or(BufferError::BufferMissing)?; + .map_err(|_| BufferError::BufferMissing)?; let storage = buffer_ref .get::>() .ok_or(BufferError::BufferMissing)?; diff --git a/src/builder.rs b/src/builder.rs index a4009aab..4615d2c6 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -145,7 +145,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { /// Connect the output of one into the input slot of another node. pub fn connect(&mut self, output: Output, input: InputSlot) { assert_eq!(output.scope(), input.scope()); - self.commands.add(Connect { + self.commands.queue(Connect { original_target: output.id(), new_target: input.id(), }); @@ -159,7 +159,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { settings: BufferSettings, ) -> Buffer { let source = self.commands.spawn(()).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, OperateBuffer::::new(settings), @@ -222,7 +222,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { T: Clone + 'static + Send + Sync, { let source = self.commands.spawn(()).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, ForkClone::::new(ForkTargetStorage::new()), @@ -258,7 +258,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { let target_ok = self.commands.spawn(UnusedTarget).id(); let target_err = self.commands.spawn(UnusedTarget).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, make_result_branching::(ForkTargetStorage::from_iter([target_ok, target_err])), @@ -287,7 +287,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { let target_some = self.commands.spawn(UnusedTarget).id(); let target_none = self.commands.spawn(UnusedTarget).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, make_option_branching::(ForkTargetStorage::from_iter([target_some, target_none])), @@ -390,7 +390,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { let source = self.commands.spawn(()).id(); let target = self.commands.spawn(UnusedTarget).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, Collect::::new(target, min, max), @@ -427,7 +427,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { T: 'static + Send + Sync + Splittable, { let source = self.commands.spawn(()).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, OperateSplit::::default(), @@ -450,7 +450,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { T: 'static + Send + Sync + ToString, { let source = self.commands.spawn(()).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, OperateCancel::::new(), @@ -466,7 +466,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { /// input value that triggered it, use [`Self::create_cancel`]. pub fn create_quiet_cancel(&mut self) -> InputSlot<()> { let source = self.commands.spawn(()).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, OperateQuietCancel, @@ -582,7 +582,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { let source = self.commands.spawn(()).id(); let target = self.commands.spawn(UnusedTarget).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, Trim::::new(branches, target), @@ -613,7 +613,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { let source = self.commands.spawn(()).id(); let target = self.commands.spawn(UnusedTarget).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, OperateDynamicGate::::new(buffers, target), @@ -642,7 +642,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { let source = self.commands.spawn(()).id(); let target = self.commands.spawn(UnusedTarget).id(); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, OperateStaticGate::::new(buffers, target, action), @@ -752,7 +752,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> { let mut map = StreamTargetMap::default(); let (bundle, streams) = Streams::spawn_node_streams(source, &mut map, self); self.commands.entity(source).insert((bundle, map)); - self.commands.add(AddOperation::new( + self.commands.queue(AddOperation::new( Some(self.scope), source, Injection::::new(target), diff --git a/src/builder/connect.rs b/src/builder/connect.rs index 80c8819c..5288fd8f 100644 --- a/src/builder/connect.rs +++ b/src/builder/connect.rs @@ -15,11 +15,7 @@ * */ -use bevy_ecs::{ - prelude::{Entity, World}, - system::Command, -}; -use bevy_hierarchy::prelude::DespawnRecursiveExt; +use bevy_ecs::prelude::{Command, Entity, World}; use backtrace::Backtrace; @@ -74,7 +70,7 @@ fn try_connect(connect: Connect, world: &mut World) -> OperationResult { world .get_entity_mut(connect.original_target) .or_broken()? - .despawn_recursive(); + .despawn(); return Ok(()); } @@ -140,7 +136,7 @@ fn try_connect(connect: Connect, world: &mut World) -> OperationResult { world .get_entity_mut(connect.original_target) .or_broken()? - .despawn_recursive(); + .despawn(); Ok(()) } diff --git a/src/callback.rs b/src/callback.rs index 7d52b0b6..2c15e95d 100644 --- a/src/callback.rs +++ b/src/callback.rs @@ -267,7 +267,7 @@ pub trait CallbackTrait { pub struct BlockingCallbackMarker(std::marker::PhantomData); struct BlockingCallbackSystem { - system: BoxedSystem, Response>, + system: BoxedSystem>, Response>, initialized: bool, } @@ -319,7 +319,7 @@ where pub struct AsyncCallbackMarker(std::marker::PhantomData); struct AsyncCallbackSystem { - system: BoxedSystem, Task>, + system: BoxedSystem>, Task>, initialized: bool, } @@ -389,7 +389,7 @@ pub trait AsCallback { impl AsCallback> for Sys where - Sys: IntoSystem, Response, M>, + Sys: IntoSystem>, Response, M>, Request: 'static + Send + Sync, Response: 'static + Send + Sync, Streams: StreamPack, @@ -409,7 +409,7 @@ where impl AsCallback> for Sys where - Sys: IntoSystem, Task, M>, + Sys: IntoSystem>, Task, M>, Task: Future + 'static + Sendish, Request: 'static + Send + Sync, Task::Output: 'static + Send + Sync, @@ -510,7 +510,7 @@ pub trait IntoBlockingCallback { impl IntoBlockingCallback> for Sys where - Sys: IntoSystem, + Sys: IntoSystem, Response, M>, Request: 'static + Send + Sync, Response: 'static + Send + Sync, { @@ -551,7 +551,7 @@ pub trait IntoAsyncCallback { impl IntoAsyncCallback> for Sys where - Sys: IntoSystem, + Sys: IntoSystem, Task, M>, Task: Future + 'static + Sendish, Request: 'static + Send + Sync, Task::Output: 'static + Send + Sync, @@ -602,7 +602,7 @@ where target: Entity, commands: &mut Commands, ) { - commands.add(AddOperation::new( + commands.queue(AddOperation::new( scope, source, OperateCallback::new(self, target), diff --git a/src/cancel.rs b/src/cancel.rs index b255b545..35023ebf 100644 --- a/src/cancel.rs +++ b/src/cancel.rs @@ -379,7 +379,7 @@ pub fn try_emit_broken( world: &mut World, roster: &mut OperationRoster, ) { - if let Some(mut source_mut) = world.get_entity_mut(source) { + if let Ok(mut source_mut) = world.get_entity_mut(source) { source_mut.emit_broken(backtrace, roster); } else { world diff --git a/src/chain.rs b/src/chain.rs index ccc8f7a4..9d8ce81a 100644 --- a/src/chain.rs +++ b/src/chain.rs @@ -310,7 +310,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> { let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.builder.scope), source, OperateBufferAccess::::new(buffers, target), @@ -361,7 +361,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> { where T: ToString, { - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), self.target, OperateCancel::::new(), @@ -460,7 +460,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> { { let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.builder.scope), source, Spread::::new(target), @@ -496,7 +496,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> { let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.builder.scope), source, Collect::::new(target, min, max), @@ -533,7 +533,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> { let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.builder.scope), source, Trim::::new(branches, target), @@ -580,7 +580,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> { let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.builder.scope), source, OperateStaticGate::::new(buffers, target, action), @@ -627,7 +627,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> { T: Splittable, { let source = self.target; - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.builder.scope), source, OperateSplit::::default(), @@ -698,7 +698,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> { let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, Noop::::new(target), @@ -773,7 +773,7 @@ where let target_ok = self.builder.commands.spawn(UnusedTarget).id(); let target_err = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, make_result_branching::(ForkTargetStorage::from_iter([target_ok, target_err])), @@ -837,7 +837,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, CreateCancelFilter::on_err::(target), @@ -854,7 +854,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, CreateCancelFilter::on_quiet_err::(target), @@ -876,7 +876,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, CreateDisposalFilter::on_err::(target), @@ -893,7 +893,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, CreateDisposalFilter::on_quiet_err::(target), @@ -909,7 +909,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, CreateDisposalFilter::on_ok::(target), @@ -951,7 +951,7 @@ where let target_some = self.builder.commands.spawn(UnusedTarget).id(); let target_none = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, make_option_branching::(ForkTargetStorage::from_iter([target_some, target_none])), @@ -983,7 +983,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, CreateCancelFilter::on_none::(target), @@ -1002,7 +1002,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, CreateDisposalFilter::on_none::(target), @@ -1019,7 +1019,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), source, CreateDisposalFilter::on_some::(target), @@ -1121,7 +1121,7 @@ where let source = self.target; let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.builder.scope), source, OperateDynamicGate::::new(buffers, target), @@ -1176,7 +1176,7 @@ impl<'w, 's, 'a, 'b> Chain<'w, 's, 'a, 'b, ()> { /// If you want to include information about the value that triggered the /// cancellation, use [`Self::then_cancel`]. pub fn then_quiet_cancel(self) { - self.builder.commands.add(AddOperation::new( + self.builder.commands.queue(AddOperation::new( Some(self.scope()), self.target, OperateQuietCancel, diff --git a/src/chain/fork_clone_builder.rs b/src/chain/fork_clone_builder.rs index 2dc279c1..9464c5e0 100644 --- a/src/chain/fork_clone_builder.rs +++ b/src/chain/fork_clone_builder.rs @@ -15,7 +15,7 @@ * */ -use bevy_utils::all_tuples; +use variadics_please::all_tuples; use crate::{AddOperation, Builder, Chain, ForkClone, ForkTargetStorage, Output, UnusedTarget}; @@ -50,7 +50,7 @@ macro_rules! impl_forkclonebuilder_for_tuple { )* ]; - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( Some(source.scope()), source.id(), ForkClone::::new( diff --git a/src/chain/premade.rs b/src/chain/premade.rs index e1a0d6f3..b2117793 100644 --- a/src/chain/premade.rs +++ b/src/chain/premade.rs @@ -15,12 +15,26 @@ * */ -use bevy_ecs::{prelude::In, query::QueryEntityError}; +use bevy_ecs::{ + prelude::{Entity, In}, + query::QueryEntityError, +}; use smallvec::SmallVec; +use thiserror::Error; use crate::{BufferAccessMut, BufferKey}; +#[derive(Debug, Error)] +pub enum BufferAccessError { + #[error("The query does not match the entity {0}")] + QueryDoesNotMatch(Entity), + #[error("The entity {0} does not exist")] + EntityDoesNotExist(Entity), + #[error("The entity {0} was requested mutably more than once")] + AliasedMutability(Entity), +} + pub(super) fn consume_buffer( In(key): In>, mut access: BufferAccessMut, @@ -38,7 +52,16 @@ where pub fn push_into_buffer( In((input, key)): In<(T, BufferKey)>, mut access: BufferAccessMut, -) -> Result<(), QueryEntityError> { - access.get_mut(&key)?.push(input); +) -> Result<(), BufferAccessError> { + access + .get_mut(&key) + .map_err(|err| match err { + QueryEntityError::QueryDoesNotMatch(e, _) => BufferAccessError::QueryDoesNotMatch(e), + QueryEntityError::EntityDoesNotExist(e) => { + BufferAccessError::EntityDoesNotExist(e.entity) + } + QueryEntityError::AliasedMutability(e) => BufferAccessError::AliasedMutability(e), + })? + .push(input); Ok(()) } diff --git a/src/chain/split.rs b/src/chain/split.rs index beb1c83e..48dbbe08 100644 --- a/src/chain/split.rs +++ b/src/chain/split.rs @@ -168,7 +168,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Splittable> SplitBuilder<'w, 's, 'a, 'b, T> { } let target = self.builder.commands.spawn(UnusedTarget).id(); - self.builder.commands.add(ConnectToSplit:: { + self.builder.commands.queue(ConnectToSplit:: { source: self.outputs.source, target, key, diff --git a/src/chain/unzip.rs b/src/chain/unzip.rs index 96a7bbc7..4abc012f 100644 --- a/src/chain/unzip.rs +++ b/src/chain/unzip.rs @@ -15,7 +15,7 @@ * */ -use bevy_utils::all_tuples; +use variadics_please::all_tuples; use itertools::Itertools; use smallvec::SmallVec; @@ -61,7 +61,7 @@ macro_rules! impl_unzippable_for_tuple { )* ); - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( Some(output.scope()), output.id(), ForkUnzip::::new(ForkTargetStorage(targets)), @@ -79,7 +79,7 @@ macro_rules! impl_unzippable_for_tuple { let ($($D,)*) = world.get::(source).or_broken()?.0.iter().copied().next_tuple().or_broken()?; let ($($T,)*) = inputs; $( - if let Some(mut t_mut) = world.get_entity_mut($D) { + if let Ok(mut t_mut) = world.get_entity_mut($D) { t_mut.give_input(session, $T, roster)?; } )* diff --git a/src/channel.rs b/src/channel.rs index 758605d4..1abacf12 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -17,7 +17,8 @@ use bevy_ecs::{ prelude::{Entity, Resource, World}, - system::{CommandQueue, Commands}, + system::Commands, + world::CommandQueue, }; use tokio::sync::mpsc::{ @@ -223,7 +224,7 @@ mod tests { let count = context .app - .world + .world() .get::(hello.provider()) .unwrap() .0; @@ -231,7 +232,7 @@ mod tests { let count = context .app - .world + .world() .get::(repeat.provider()) .unwrap() .0; diff --git a/src/diagram.rs b/src/diagram.rs index 493fe735..e4cff255 100644 --- a/src/diagram.rs +++ b/src/diagram.rs @@ -181,7 +181,7 @@ impl<'de> Deserialize<'de> for NamespacedOperation { impl JsonSchema for NamespacedOperation { fn json_schema(generator: &mut SchemaGenerator) -> Schema { - let mut schema = SchemaObject::new_ref(Self::schema_name()); + let mut schema = SchemaObject::default(); schema.instance_type = Some(SingleOrVec::Single(Box::new(InstanceType::Object))); schema.object = Some(Box::new(ObjectValidation { max_properties: Some(1), @@ -969,7 +969,7 @@ impl Diagram { /// "#; /// /// let diagram = Diagram::from_json_str(json_str)?; - /// let workflow = app.world.command(|cmds| diagram.spawn_io_workflow::(cmds, ®istry))?; + /// let workflow = app.world_mut().command(|cmds| diagram.spawn_io_workflow::(cmds, ®istry))?; /// # Ok::<_, Box>(()) /// ``` // TODO(koonpeng): Support streams other than `()` #43. @@ -1045,7 +1045,7 @@ impl Diagram { /// "#; /// /// let diagram = Diagram::from_json_str(json_str)?; - /// let workflow = app.world.command(|cmds| diagram.spawn_io_workflow::(cmds, ®istry))?; + /// let workflow = app.world_mut().command(|cmds| diagram.spawn_io_workflow::(cmds, ®istry))?; /// # Ok::<_, Box>(()) /// ``` pub fn spawn_io_workflow( diff --git a/src/diagram/registration.rs b/src/diagram/registration.rs index b52e1aa6..fe66cff2 100644 --- a/src/diagram/registration.rs +++ b/src/diagram/registration.rs @@ -153,7 +153,7 @@ impl DynOutput { }); } - builder.commands().add(Connect { + builder.commands().queue(Connect { original_target: self.id(), new_target: input.id(), }); diff --git a/src/diagram/section_schema.rs b/src/diagram/section_schema.rs index 798bac5e..a262fffd 100644 --- a/src/diagram/section_schema.rs +++ b/src/diagram/section_schema.rs @@ -17,7 +17,7 @@ use std::{collections::HashMap, sync::Arc}; -use schemars::JsonSchema; +use schemars::{schema::Schema, JsonSchema}; use serde::{Deserialize, Serialize}; use crate::{ @@ -39,10 +39,89 @@ pub enum SectionProvider { Template(OperationName), } +/// schemars generates schemas with `additionalProperties: false` for enums. +/// When the enum is flatten, that `additionalProperties: false` is inherited by the parent +/// struct, which leads to schemas that can never be valid. +/// +/// Example: +/// +/// ```json +/// { +/// "description": "Connect the request to a registered section.\n\n``` # bevy_impulse::Diagram::from_json_str(r#\" { \"version\": \"0.1.0\", \"start\": \"section_op\", \"ops\": { \"section_op\": { \"type\": \"section\", \"builder\": \"my_section_builder\", \"connect\": { \"my_section_output\": { \"builtin\": \"terminate\" } } } } } # \"#)?; # Ok::<_, serde_json::Error>(()) ```\n\nCustom sections can also be created via templates ``` # bevy_impulse::Diagram::from_json_str(r#\" { \"version\": \"0.1.0\", \"templates\": { \"my_template\": { \"inputs\": [\"section_input\"], \"outputs\": [\"section_output\"], \"buffers\": [], \"ops\": { \"section_input\": { \"type\": \"node\", \"builder\": \"my_node\", \"next\": \"section_output\" } } } }, \"start\": \"section_op\", \"ops\": { \"section_op\": { \"type\": \"section\", \"template\": \"my_template\", \"connect\": { \"section_output\": { \"builtin\": \"terminate\" } } } } } # \"#)?; # Ok::<_, serde_json::Error>(()) ```", +/// "type": "object", +/// "oneOf": [ +/// { +/// "type": "object", +/// "required": [ +/// "builder" +/// ], +/// "properties": { +/// "builder": { +/// "type": "string" +/// } +/// }, +/// "additionalProperties": false +/// }, +/// { +/// "type": "object", +/// "required": [ +/// "template" +/// ], +/// "properties": { +/// "template": { +/// "type": "string" +/// } +/// }, +/// "additionalProperties": false +/// } +/// ], +/// "required": [ +/// "type" +/// ], +/// "properties": { +/// "config": { +/// "default": null +/// }, +/// "connect": { +/// "default": {}, +/// "type": "object", +/// "additionalProperties": { +/// "$ref": "#/definitions/NextOperation" +/// } +/// }, +/// "type": { +/// "type": "string", +/// "enum": [ +/// "section" +/// ] +/// } +/// } +/// }, +/// ``` +/// +/// Here the section schema needs to have a `builder` or `template` with no additional properties. +/// Which includes other properties like `type`, `config` etc, but `type` is also required which +/// breaks the schema. +fn fix_additional_properties(generator: &mut schemars::gen::SchemaGenerator) -> Schema { + let mut schema = generator.root_schema_for::().schema; + schema.metadata.as_mut().unwrap().title = None; + let one_ofs = schema.subschemas.as_mut().unwrap().one_of.as_mut().unwrap(); + for subschema in one_ofs { + match subschema { + Schema::Object(schema) => schema.object.as_mut().unwrap().additional_properties = None, + _ => { + panic!("expected object schema") + } + } + } + Schema::Object(schema) +} + #[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "snake_case")] pub struct SectionSchema { #[serde(flatten)] + #[schemars(schema_with = "fix_additional_properties")] pub(super) provider: SectionProvider, #[serde(default)] pub(super) config: serde_json::Value, @@ -626,7 +705,7 @@ mod tests { .unwrap(); let mut context = TestingContext::minimal_plugins(); - let mut promise = context.app.world.command(|cmds| { + let mut promise = context.app.world_mut().command(|cmds| { let workflow = diagram .spawn_io_workflow::(cmds, ®istry) .unwrap(); @@ -662,7 +741,7 @@ mod tests { let mut context = TestingContext::minimal_plugins(); let err = context .app - .world + .world_mut() .command(|cmds| diagram.spawn_io_workflow::(cmds, ®istry)) .unwrap_err(); let section_err = match err.code { @@ -707,7 +786,7 @@ mod tests { let mut context = TestingContext::minimal_plugins(); let err = context .app - .world + .world_mut() .command(|cmds| { diagram.spawn_io_workflow::(cmds, &fixture.registry) }) diff --git a/src/diagram/testing.rs b/src/diagram/testing.rs index 84f4b904..996777a2 100644 --- a/src/diagram/testing.rs +++ b/src/diagram/testing.rs @@ -40,7 +40,7 @@ impl DiagramTestFixture { { self.context .app - .world + .world_mut() .command(|cmds| diagram.spawn_workflow(cmds, &self.registry)) } diff --git a/src/diagram/unzip_schema.rs b/src/diagram/unzip_schema.rs index 89fad930..af9aa692 100644 --- a/src/diagram/unzip_schema.rs +++ b/src/diagram/unzip_schema.rs @@ -15,9 +15,9 @@ * */ -use bevy_utils::all_tuples_with_size; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use variadics_please::all_tuples_with_size; use crate::Builder; diff --git a/src/disposal.rs b/src/disposal.rs index c27e7ebd..fe9cdb86 100644 --- a/src/disposal.rs +++ b/src/disposal.rs @@ -545,7 +545,7 @@ pub fn emit_disposal( world: &mut World, roster: &mut OperationRoster, ) { - if let Some(mut source_mut) = world.get_entity_mut(source) { + if let Ok(mut source_mut) = world.get_entity_mut(source) { source_mut.emit_disposal(session, disposal, roster); } else { world diff --git a/src/flush.rs b/src/flush.rs index 622c3113..87b131b5 100644 --- a/src/flush.rs +++ b/src/flush.rs @@ -17,11 +17,10 @@ use bevy_derive::{Deref, DerefMut}; use bevy_ecs::{ - prelude::{Added, Entity, Query, QueryState, Resource, With, World}, - schedule::{IntoSystemConfigs, SystemConfigs}, - system::{Command, SystemState}, + prelude::*, + schedule::{IntoScheduleConfigs, ScheduleConfigs}, + system::{ScheduleSystem, SystemState}, }; -use bevy_hierarchy::{BuildWorldChildren, Children, DespawnRecursiveExt}; use smallvec::SmallVec; @@ -67,7 +66,7 @@ pub struct FlushParameters { pub single_threaded_poll_limit: Option, } -pub fn flush_impulses() -> SystemConfigs { +pub fn flush_impulses() -> ScheduleConfigs { flush_impulses_impl.into_configs() } @@ -88,8 +87,8 @@ fn flush_impulses_impl( let mut loop_count = 0; while !roster.is_empty() { for e in roster.deferred_despawn.drain(..) { - if let Some(e_mut) = world.get_entity_mut(e) { - e_mut.despawn_recursive(); + if let Ok(e_mut) = world.get_entity_mut(e) { + e_mut.despawn(); } } @@ -329,13 +328,13 @@ fn drop_target(target: Entity, world: &mut World, roster: &mut OperationRoster, } if let Some(detached_impulse) = detached_impulse { - if let Some(mut detached_impulse_mut) = world.get_entity_mut(detached_impulse) { - detached_impulse_mut.remove_parent(); + if let Ok(mut detached_impulse_mut) = world.get_entity_mut(detached_impulse) { + detached_impulse_mut.remove::(); } } - if let Some(unused_target_mut) = world.get_entity_mut(target) { - unused_target_mut.despawn_recursive(); + if let Ok(unused_target_mut) = world.get_entity_mut(target) { + unused_target_mut.despawn(); } if unused { diff --git a/src/impulse.rs b/src/impulse.rs index 77765ecd..b5c71ae6 100644 --- a/src/impulse.rs +++ b/src/impulse.rs @@ -15,8 +15,10 @@ * */ -use bevy_ecs::prelude::{Bundle, Commands, Component, Entity, Event}; -use bevy_hierarchy::BuildChildren; +use bevy_ecs::{ + hierarchy::ChildOf, + prelude::{Bundle, Commands, Component, Entity, Event}, +}; use std::future::Future; @@ -79,7 +81,7 @@ where /// | [`Self::detach`]
[`Self::send_event`] | This will never be dropped | /// | Using none of the above | The impulse will immediately be dropped during a flush, so it will never be run at all.
This will also push an error into [`UnhandledErrors`](crate::UnhandledErrors). | pub fn detach(self) -> Impulse<'w, 's, 'a, Response, Streams> { - self.commands.add(Detach { + self.commands.queue(Detach { target: self.target, }); self @@ -90,7 +92,7 @@ where #[must_use] pub fn take(self) -> Recipient { let (response_sender, response_promise) = Promise::::new(); - self.commands.add(AddImpulse::new( + self.commands.queue(AddImpulse::new( self.target, TakenResponse::::new(response_sender), )); @@ -108,7 +110,7 @@ where /// Take only the response data that comes out of the request. pub fn take_response(self) -> Promise { let (response_sender, response_promise) = Promise::::new(); - self.commands.add(AddImpulse::new( + self.commands.queue(AddImpulse::new( self.target, TakenResponse::::new(response_sender), )); @@ -133,7 +135,7 @@ where .entity(source) .insert((Cancellable::new(cancel_impulse), ImpulseMarker)) .remove::() - .set_parent(target); + .insert(ChildOf(target)); provider.connect(None, source, target, self.commands); Impulse { source, @@ -212,7 +214,7 @@ where /// [`Self::detach`] before calling this. pub fn store(self, target: Entity) { self.commands - .add(AddImpulse::new(self.target, Store::::new(target))); + .queue(AddImpulse::new(self.target, Store::::new(target))); let mut map = StreamTargetMap::default(); let stream_targets = Streams::collect_streams(self.source, target, &mut map, self.commands); @@ -249,7 +251,7 @@ where /// If the entity despawns then the request gets cancelled unless you used /// [`Self::detach`] before calling this. pub fn push(self, target: Entity) { - self.commands.add(AddImpulse::new( + self.commands.queue(AddImpulse::new( self.target, Push::::new(target, false), )); @@ -282,7 +284,7 @@ where /// [`Self::store`] or [`Self::push`]. Alternatively you can transform it /// into a bundle using [`Self::map_block`] or [`Self::map_async`]. pub fn insert(self, target: Entity) { - self.commands.add(AddImpulse::new( + self.commands.queue(AddImpulse::new( self.target, Insert::::new(target), )); @@ -299,7 +301,7 @@ where /// Using this will also effectively [detach](Self::detach) the impulse. pub fn send_event(self) { self.commands - .add(AddImpulse::new(self.target, SendEvent::::new())); + .queue(AddImpulse::new(self.target, SendEvent::::new())); } } @@ -337,7 +339,7 @@ impl Default for Collection { #[cfg(test)] mod tests { use crate::{prelude::*, testing::*, ContinuousQueueView}; - use bevy_utils::label::DynEq; + use bevy_ecs::label::DynEq; use smallvec::SmallVec; use std::{ sync::{Arc, Mutex}, diff --git a/src/impulse/detach.rs b/src/impulse/detach.rs index 88b01080..225c98a3 100644 --- a/src/impulse/detach.rs +++ b/src/impulse/detach.rs @@ -15,10 +15,7 @@ * */ -use bevy_ecs::{ - prelude::{Component, Entity, World}, - system::Command, -}; +use bevy_ecs::prelude::{Command, Component, Entity, World}; use anyhow::anyhow; @@ -44,7 +41,7 @@ pub(crate) struct Detach { impl Command for Detach { fn apply(self, world: &mut World) { let backtrace; - if let Some(mut session_mut) = world.get_entity_mut(self.target) { + if let Ok(mut session_mut) = world.get_entity_mut(self.target) { if let Some(mut detached) = session_mut.get_mut::() { detached.0 = true; session_mut.remove::(); diff --git a/src/impulse/finished.rs b/src/impulse/finished.rs index e068608e..6e2a8be1 100644 --- a/src/impulse/finished.rs +++ b/src/impulse/finished.rs @@ -15,8 +15,6 @@ * */ -use bevy_hierarchy::prelude::DespawnRecursiveExt; - use crate::{Impulsive, OperationRequest, OperationResult, OperationSetup, OrBroken}; /// During an impulse flush, this impulse gets automatically added to the end of @@ -32,10 +30,7 @@ impl Impulsive for Finished { fn execute(OperationRequest { source, world, .. }: OperationRequest) -> OperationResult { // If this gets triggered that means the impulse chain is finished - world - .get_entity_mut(source) - .or_broken()? - .despawn_recursive(); + world.get_entity_mut(source).or_broken()?.despawn(); Ok(()) } } diff --git a/src/impulse/insert.rs b/src/impulse/insert.rs index 6adb7474..d2ea2bd4 100644 --- a/src/impulse/insert.rs +++ b/src/impulse/insert.rs @@ -16,7 +16,6 @@ */ use bevy_ecs::prelude::{Bundle, Component, Entity}; -use bevy_hierarchy::DespawnRecursiveExt; use crate::{ add_lifecycle_dependency, Impulsive, Input, InputBundle, ManageInput, OperationRequest, @@ -51,11 +50,11 @@ impl Impulsive for Insert { let mut source_mut = world.get_entity_mut(source).or_broken()?; let Input { data, .. } = source_mut.take_input::()?; let target = source_mut.get::>().or_broken()?.target; - if let Some(mut target_mut) = world.get_entity_mut(target) { + if let Ok(mut target_mut) = world.get_entity_mut(target) { target_mut.insert(data); } - world.entity_mut(source).despawn_recursive(); + world.entity_mut(source).despawn(); Ok(()) } } diff --git a/src/impulse/internal.rs b/src/impulse/internal.rs index 6fd26fd1..baac55f3 100644 --- a/src/impulse/internal.rs +++ b/src/impulse/internal.rs @@ -15,11 +15,7 @@ * */ -use bevy_ecs::{ - prelude::{Component, Entity, Resource, World}, - system::Command, -}; -use bevy_hierarchy::DespawnRecursiveExt; +use bevy_ecs::prelude::{Command, Component, Entity, Resource, World}; use backtrace::Backtrace; @@ -102,7 +98,7 @@ fn perform_impulse( // Do nothing } Err(OperationError::Broken(backtrace)) => { - if let Some(mut source_mut) = world.get_entity_mut(source) { + if let Ok(mut source_mut) = world.get_entity_mut(source) { source_mut.emit_broken(backtrace, roster); } else { world @@ -165,8 +161,8 @@ pub(crate) fn cancel_impulse( } } - if let Some(terminal_mut) = world.get_entity_mut(terminal) { - terminal_mut.despawn_recursive(); + if let Ok(terminal_mut) = world.get_entity_mut(terminal) { + terminal_mut.despawn(); } Ok(()) @@ -230,7 +226,7 @@ pub(crate) fn add_lifecycle_dependency(source: Entity, target: Entity, world: &m if let Some(mut lifecycle) = world.get_mut::(target) { lifecycle.sources.push(source); - } else if let Some(mut target_mut) = world.get_entity_mut(target) { + } else if let Ok(mut target_mut) = world.get_entity_mut(target) { target_mut.insert(ImpulseLifecycle::new(source, sender)); } else { // The target is already despawned diff --git a/src/impulse/push.rs b/src/impulse/push.rs index fa2d5729..6f053104 100644 --- a/src/impulse/push.rs +++ b/src/impulse/push.rs @@ -16,7 +16,6 @@ */ use bevy_ecs::prelude::{Component, Entity}; -use bevy_hierarchy::DespawnRecursiveExt; use crate::{ add_lifecycle_dependency, Collection, Impulsive, Input, InputBundle, ManageInput, @@ -63,7 +62,7 @@ impl Impulsive for Push { collection.items.push(Storage { session, data }); target_mut.insert(collection); } - world.entity_mut(source).despawn_recursive(); + world.entity_mut(source).despawn(); Ok(()) } } diff --git a/src/impulse/send_event.rs b/src/impulse/send_event.rs index 4a2454e0..babd7e6c 100644 --- a/src/impulse/send_event.rs +++ b/src/impulse/send_event.rs @@ -15,13 +15,11 @@ * */ -use bevy_ecs::prelude::Event; -use bevy_hierarchy::DespawnRecursiveExt; - use crate::{ Impulsive, Input, InputBundle, ManageInput, OperationRequest, OperationResult, OperationSetup, OrBroken, }; +use bevy_ecs::prelude::Event; pub(crate) struct SendEvent { _ignore: std::marker::PhantomData, @@ -44,7 +42,7 @@ impl Impulsive for SendEvent { fn execute(OperationRequest { source, world, .. }: OperationRequest) -> OperationResult { let mut source_mut = world.get_entity_mut(source).or_broken()?; let Input { data, .. } = source_mut.take_input::()?; - source_mut.despawn_recursive(); + source_mut.despawn(); world.send_event(data); Ok(()) } diff --git a/src/impulse/store.rs b/src/impulse/store.rs index d1c0e634..4c4e63e8 100644 --- a/src/impulse/store.rs +++ b/src/impulse/store.rs @@ -16,7 +16,6 @@ */ use bevy_ecs::prelude::{Component, Entity}; -use bevy_hierarchy::DespawnRecursiveExt; use crate::{ add_lifecycle_dependency, Impulsive, Input, InputBundle, ManageInput, OperationRequest, @@ -51,10 +50,10 @@ impl Impulsive for Store { let mut source_mut = world.get_entity_mut(source).or_broken()?; let Input { session, data } = source_mut.take_input::()?; let target = source_mut.get::>().or_broken()?.target; - if let Some(mut target_mut) = world.get_entity_mut(target) { + if let Ok(mut target_mut) = world.get_entity_mut(target) { target_mut.insert(Storage { data, session }); } - world.entity_mut(source).despawn_recursive(); + world.entity_mut(source).despawn(); Ok(()) } } diff --git a/src/impulse/taken.rs b/src/impulse/taken.rs index 9c8c3142..fac035cc 100644 --- a/src/impulse/taken.rs +++ b/src/impulse/taken.rs @@ -16,7 +16,6 @@ */ use bevy_ecs::prelude::Component; -use bevy_hierarchy::DespawnRecursiveExt; use tokio::sync::mpsc::UnboundedSender as Sender; @@ -60,7 +59,7 @@ impl Impulsive for TakenResponse { let Input { data, .. } = source_mut.take_input::()?; let sender = source_mut.take::>().or_broken()?.sender; sender.send(data).ok(); - source_mut.despawn_recursive(); + source_mut.despawn(); Ok(()) } @@ -101,7 +100,7 @@ where let mut target_mut = world.get_entity_mut(cancel.target).or_broken()?; let taken = target_mut.take::>().or_broken()?; taken.sender.cancel(cancel.cancellation).ok(); - target_mut.despawn_recursive(); + target_mut.despawn(); Ok(()) } diff --git a/src/input.rs b/src/input.rs index b5b1ba9a..5354122b 100644 --- a/src/input.rs +++ b/src/input.rs @@ -16,8 +16,7 @@ */ use bevy_ecs::{ - prelude::{Bundle, Component, Entity}, - system::Command, + prelude::{Bundle, Command, Component, Entity}, world::{EntityRef, EntityWorldMut, World}, }; diff --git a/src/map.rs b/src/map.rs index ff9adf30..645786ef 100644 --- a/src/map.rs +++ b/src/map.rs @@ -137,7 +137,7 @@ where target: Entity, commands: &mut Commands, ) { - commands.add(AddOperation::new( + commands.queue(AddOperation::new( scope, source, OperateBlockingMap::new(target, self.def), @@ -280,7 +280,7 @@ where target: Entity, commands: &mut Commands, ) { - commands.add(AddOperation::new( + commands.queue(AddOperation::new( scope, source, OperateAsyncMap::new(target, self.def), diff --git a/src/map_once.rs b/src/map_once.rs index bd34d213..ea06f4ab 100644 --- a/src/map_once.rs +++ b/src/map_once.rs @@ -74,7 +74,7 @@ where type Streams = (); fn connect(self, _: Option, source: Entity, target: Entity, commands: &mut Commands) { - commands.add(AddImpulse::new( + commands.queue(AddImpulse::new( source, ImpulseBlockingMap::new(target, self.def), )); @@ -183,7 +183,7 @@ where type Streams = Streams; fn connect(self, _: Option, source: Entity, target: Entity, commands: &mut Commands) { - commands.add(AddImpulse::new( + commands.queue(AddImpulse::new( source, ImpulseAsyncMap::new(target, self.def), )); diff --git a/src/node.rs b/src/node.rs index 764aec23..a55f7aec 100644 --- a/src/node.rs +++ b/src/node.rs @@ -126,7 +126,7 @@ impl Output { Response: Clone, { assert_eq!(self.scope, builder.scope); - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( Some(self.scope), self.target, ForkClone::::new(ForkTargetStorage::new()), @@ -169,7 +169,7 @@ impl ForkCloneOutput { .commands .spawn((SingleInputStorage::new(self.id()), UnusedTarget)) .id(); - builder.commands.add(AddBranchToForkClone { + builder.commands.queue(AddBranchToForkClone { source: self.source, target, }); diff --git a/src/operation.rs b/src/operation.rs index 58953238..7fd287e9 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -22,10 +22,9 @@ use crate::{ use bevy_derive::Deref; use bevy_ecs::{ - prelude::{Component, Entity, World}, - system::Command, + hierarchy::ChildOf, + prelude::{Command, Component, Entity, World}, }; -use bevy_hierarchy::prelude::BuildWorldChildren; use backtrace::Backtrace; @@ -621,7 +620,7 @@ impl Command for AddOperation { if let Some(scope) = self.scope { source_mut .insert(ScopeStorage::new(scope)) - .set_parent(scope); + .insert(ChildOf(scope)); match world.get_mut::(scope).or_broken() { Ok(mut contents) => { contents.add_node(self.source); @@ -658,7 +657,7 @@ pub fn execute_operation(request: OperationRequest) { // which end up getting dropped during a cleanup. In that case, the // source entity will be totally despawned, so check for that before // concluding that this is broken. - if request.world.get_entity(request.source).is_some() { + if request.world.get_entity(request.source).is_ok() { // The node does not have an operation and is not an unused target, // so this is broken somehow. request diff --git a/src/operation/fork_clone.rs b/src/operation/fork_clone.rs index 8b113eb0..9a9f6870 100644 --- a/src/operation/fork_clone.rs +++ b/src/operation/fork_clone.rs @@ -15,10 +15,7 @@ * */ -use bevy_ecs::{ - prelude::{Entity, World}, - system::Command, -}; +use bevy_ecs::prelude::{Command, Entity, World}; use anyhow::anyhow; diff --git a/src/operation/injection.rs b/src/operation/injection.rs index 20a3f3a0..136de540 100644 --- a/src/operation/injection.rs +++ b/src/operation/injection.rs @@ -24,11 +24,7 @@ use crate::{ SingleTargetStorage, StreamPack, StreamTargetMap, }; -use bevy_ecs::{ - prelude::{Component, Entity}, - system::Command, -}; -use bevy_hierarchy::prelude::DespawnRecursiveExt; +use bevy_ecs::prelude::{Command, Component, Entity}; use smallvec::SmallVec; @@ -338,7 +334,7 @@ where let mut source_mut = world.get_entity_mut(source).or_broken()?; let Input { session, data } = source_mut.take_input::()?; let injector = source_mut.get::().or_broken()?.0; - source_mut.despawn_recursive(); + source_mut.despawn(); let mut injector_mut = world.get_entity_mut(injector).or_broken()?; let target = injector_mut.get::().or_broken()?.get(); let mut storage = injector_mut.get_mut::().or_broken()?; @@ -350,7 +346,7 @@ where storage.list.retain(|injected| injected.finish != source); let mut task_mut = world.get_entity_mut(injected.task).or_broken()?; task_mut.transfer_disposals(injector)?; - task_mut.despawn_recursive(); + task_mut.despawn(); world .get_entity_mut(target) diff --git a/src/operation/operate_buffer.rs b/src/operation/operate_buffer.rs index 95d1d467..c3196355 100644 --- a/src/operation/operate_buffer.rs +++ b/src/operation/operate_buffer.rs @@ -15,10 +15,7 @@ * */ -use bevy_ecs::{ - prelude::{Bundle, Component, Entity, World}, - system::Command, -}; +use bevy_ecs::prelude::{Bundle, Command, Component, Entity, World}; use std::{collections::HashMap, sync::Arc}; @@ -229,7 +226,7 @@ impl Command for OnNewBufferValue { buffer_targets.0.push(self.buffer); - let Some(mut target_mut) = world.get_entity_mut(self.target) else { + let Ok(mut target_mut) = world.get_entity_mut(self.target) else { self.on_failure(world); return; }; diff --git a/src/operation/operate_service.rs b/src/operation/operate_service.rs index 9437fa0c..d4b8f19e 100644 --- a/src/operation/operate_service.rs +++ b/src/operation/operate_service.rs @@ -198,7 +198,7 @@ fn dispose_for_unavailable_service( roster: &mut OperationRoster, ) { let disposal = Disposal::service_unavailable(service, source); - if let Some(mut source_mut) = world.get_entity_mut(source) { + if let Ok(mut source_mut) = world.get_entity_mut(source) { while let Ok(Input { session, .. }) = source_mut.take_input::() { source_mut.emit_disposal(session, disposal.clone(), roster); } diff --git a/src/operation/operate_split.rs b/src/operation/operate_split.rs index da2dead8..4b1a5cc3 100644 --- a/src/operation/operate_split.rs +++ b/src/operation/operate_split.rs @@ -15,10 +15,7 @@ * */ -use bevy_ecs::{ - prelude::{Component, Entity, World}, - system::Command, -}; +use bevy_ecs::prelude::{Command, Component, Entity, World}; use smallvec::SmallVec; use std::{collections::HashMap, sync::Arc}; diff --git a/src/operation/operate_task.rs b/src/operation/operate_task.rs index 222a2b8a..2f1c2bc5 100644 --- a/src/operation/operate_task.rs +++ b/src/operation/operate_task.rs @@ -16,10 +16,9 @@ */ use bevy_ecs::{ - prelude::{Component, Entity, Resource, World}, - system::Command, + hierarchy::ChildOf, + prelude::{Command, Component, Entity, Resource, World}, }; -use bevy_hierarchy::{BuildWorldChildren, DespawnRecursiveExt}; use std::{future::Future, pin::Pin, sync::Arc, task::Context, task::Poll}; @@ -114,7 +113,7 @@ impl OperateTask(self.node).map(|s| s.get()); let mut source_mut = world.entity_mut(source); - source_mut.set_parent(self.node); + source_mut.insert(ChildOf(self.node)); if let Some(scope) = scope { source_mut.insert(ScopeStorage::new(scope)); } @@ -188,7 +187,7 @@ where JobWakerStorage(waker), StopTask(stop_task::), )) - .set_parent(node); + .insert(ChildOf(node)); let mut node_mut = world.get_entity_mut(node).or_broken()?; let mut tasks = node_mut.get_mut::().or_broken()?; @@ -382,7 +381,7 @@ fn cleanup_task( roster.unblock(unblock); } - if let Some(mut node_mut) = world.get_entity_mut(node) { + if let Ok(mut node_mut) = world.get_entity_mut(node) { if let Some(mut active_tasks) = node_mut.get_mut::() { let mut cleanup_ready = true; active_tasks.list.retain( @@ -420,8 +419,8 @@ fn cleanup_task( }; }; - if let Some(source_mut) = world.get_entity_mut(source) { - source_mut.despawn_recursive(); + if let Ok(source_mut) = world.get_entity_mut(source) { + source_mut.despawn(); } roster.purge(source); diff --git a/src/operation/scope.rs b/src/operation/scope.rs index f757653b..192a8e33 100644 --- a/src/operation/scope.rs +++ b/src/operation/scope.rs @@ -28,8 +28,10 @@ use crate::{ use backtrace::Backtrace; -use bevy_ecs::prelude::{Commands, Component, Entity, World}; -use bevy_hierarchy::{BuildChildren, DespawnRecursiveExt}; +use bevy_ecs::{ + hierarchy::ChildOf, + prelude::{Commands, Component, Entity, World}, +}; use smallvec::SmallVec; @@ -370,8 +372,8 @@ where if result.is_err() { // We won't be executing this scope after all, so despawn the scoped // session that we created. - if let Some(scoped_session_mut) = world.get_entity_mut(scoped_session) { - scoped_session_mut.despawn_recursive(); + if let Ok(scoped_session_mut) = world.get_entity_mut(scoped_session) { + scoped_session_mut.despawn(); } return result; } @@ -520,10 +522,10 @@ where ) -> ScopeEndpoints { let enter_scope = commands.spawn((EntryForScope(scope_id), UnusedTarget)).id(); - let terminal = commands.spawn(()).set_parent(scope_id).id(); + let terminal = commands.spawn(()).insert(ChildOf(scope_id)).id(); let finish_scope_cancel = commands .spawn(FinishCleanupForScope(scope_id)) - .set_parent(scope_id) + .insert(ChildOf(scope_id)) .id(); let scope = OperateScope:: { @@ -537,9 +539,9 @@ where // Note: We need to make sure the scope object gets set up before any of // its endpoints, otherwise the ScopeContents component will be missing // during setup. - commands.add(AddOperation::new(parent_scope, scope_id, scope)); + commands.queue(AddOperation::new(parent_scope, scope_id, scope)); - commands.add(AddOperation::new( + commands.queue(AddOperation::new( // We do not consider the terminal node to be "inside" the scope, // otherwise it will get cleaned up prematurely None, @@ -547,7 +549,7 @@ where Terminate::::new(scope_id), )); - commands.add(AddOperation::new( + commands.queue(AddOperation::new( // We do not consider the finish cancel node to be "inside" the // scope, otherwise it will get cleaned up prematurely None, @@ -1543,9 +1545,9 @@ impl FinishCleanup { clear_scope_buffers(scope, scoped_session, world)?; - if world.get_entity(scoped_session).is_some() { - if let Some(scoped_session_mut) = world.get_entity_mut(scoped_session) { - scoped_session_mut.despawn_recursive(); + if world.get_entity(scoped_session).is_ok() { + if let Ok(scoped_session_mut) = world.get_entity_mut(scoped_session) { + scoped_session_mut.despawn(); } } diff --git a/src/request.rs b/src/request.rs index 1fcab2e7..93e561d5 100644 --- a/src/request.rs +++ b/src/request.rs @@ -16,10 +16,10 @@ */ use bevy_ecs::{ + hierarchy::ChildOf, prelude::{Commands, World}, - system::CommandQueue, + world::CommandQueue, }; -use bevy_hierarchy::BuildChildren; use std::future::Future; @@ -106,11 +106,11 @@ impl<'w, 's> RequestExt<'w, 's> for Commands<'w, 's> { )) // We set the parent of this source to the target so that when the // target gets despawned, this will also be despawned. - .set_parent(target) + .insert(ChildOf(target)) .id(); provider.connect(None, source, target, self); - self.add(InputCommand { + self.queue(InputCommand { session: source, target: source, data: request, diff --git a/src/service.rs b/src/service.rs index f4b0a72c..e5de88e4 100644 --- a/src/service.rs +++ b/src/service.rs @@ -23,11 +23,12 @@ use crate::{ use bevy_app::prelude::App; use bevy_derive::{Deref, DerefMut}; use bevy_ecs::{ + define_label, + intern::Interned, prelude::{Commands, Component, Entity, Event, World}, schedule::ScheduleLabel, }; pub use bevy_impulse_derive::DeliveryLabel; -use bevy_utils::{define_label, intern::Interned}; use std::{any::TypeId, collections::HashSet, sync::OnceLock}; use thiserror::Error as ThisError; @@ -218,7 +219,7 @@ define_label!( pub mod utils { /// Used by the procedural macro for DeliveryLabel - pub use bevy_utils::label::DynEq; + pub use bevy_ecs::label::DynEq; } /// When using a service, you can bundle in delivery instructions that affect @@ -586,7 +587,7 @@ where target: Entity, commands: &mut Commands, ) { - commands.add(AddOperation::new( + commands.queue(AddOperation::new( scope, source, OperateService::new(self, target), @@ -637,7 +638,7 @@ mod tests { .add_systems(Update, sys_find_service); app.update(); - assert!(app.world.resource::().0); + assert!(app.world().resource::().0); } #[test] @@ -648,7 +649,7 @@ mod tests { .add_systems(Update, sys_find_service); app.update(); - assert!(app.world.resource::().0); + assert!(app.world().resource::().0); } #[test] @@ -659,7 +660,7 @@ mod tests { .add_systems(Update, sys_find_service); app.update(); - assert!(app.world.resource::().0); + assert!(app.world().resource::().0); } #[test] @@ -672,7 +673,7 @@ mod tests { .add_systems(Update, sys_use_my_service_provider); app.update(); - assert!(app.world.resource::().0); + assert!(app.world().resource::().0); } #[test] @@ -683,7 +684,7 @@ mod tests { .add_systems(Update, sys_find_service); app.update(); - assert!(app.world.resource::().0); + assert!(app.world().resource::().0); } #[test] @@ -694,7 +695,7 @@ mod tests { .add_systems(Update, sys_find_service); app.update(); - assert!(app.world.resource::().0); + assert!(app.world().resource::().0); } #[test] @@ -707,7 +708,7 @@ mod tests { .add_systems(Update, sys_find_service); app.update(); - assert!(app.world.resource::().0); + assert!(app.world().resource::().0); } fn sys_async_service( @@ -807,9 +808,9 @@ mod tests { let mut recipient = context.command(|commands| commands.request((), event_streamer).take()); - context.app.world.send_event(CustomEvent(0)); - context.app.world.send_event(CustomEvent(1)); - context.app.world.send_event(CustomEvent(2)); + context.app.world_mut().send_event(CustomEvent(0)); + context.app.world_mut().send_event(CustomEvent(1)); + context.app.world_mut().send_event(CustomEvent(2)); context.run_with_conditions(&mut recipient.response, 1); diff --git a/src/service/async_srv.rs b/src/service/async_srv.rs index 3cdce5f2..463164c8 100644 --- a/src/service/async_srv.rs +++ b/src/service/async_srv.rs @@ -31,7 +31,6 @@ use bevy_ecs::{ system::{BoxedSystem, EntityCommands, IntoSystem}, world::EntityWorldMut, }; -use bevy_hierarchy::prelude::DespawnRecursiveExt; use std::future::Future; @@ -39,17 +38,17 @@ pub trait IsAsyncService {} #[derive(Component)] struct AsyncServiceStorage( - Option, Task>>, + Option>, Task>>, ); #[derive(Component)] struct UninitAsyncServiceStorage( - BoxedSystem, Task>, + BoxedSystem>, Task>, ); impl IntoService<(Request, Streams, Task, M)> for Sys where - Sys: IntoSystem, Task, M>, + Sys: IntoSystem>, Task, M>, Task: Future + 'static + Sendish, Request: 'static + Send + Sync, Task::Output: 'static + Send + Sync, @@ -77,7 +76,7 @@ where impl IsAsyncService<(Request, Streams, Task, M)> for Sys where - Sys: IntoSystem, Task, M>, + Sys: IntoSystem>, Task, M>, Task: Future + 'static + Sendish, Request: 'static + Send + Sync, Task::Output: 'static + Send + Sync, @@ -151,8 +150,8 @@ where for cancelled in cancelled { let disposal = Disposal::supplanted(cancelled.source, source, session); emit_disposal(cancelled.source, cancelled.session, disposal, world, roster); - if let Some(task_mut) = world.get_entity_mut(cancelled.task_id) { - task_mut.despawn_recursive(); + if let Ok(task_mut) = world.get_entity_mut(cancelled.task_id) { + task_mut.despawn(); } } if let Some(stop) = stop { @@ -244,7 +243,7 @@ where roster, }, } = cmd; - let mut service = if let Some(mut provider_mut) = world.get_entity_mut(provider) { + let mut service = if let Ok(mut provider_mut) = world.get_entity_mut(provider) { if let Some(mut storage) = provider_mut.get_mut::>() { @@ -406,7 +405,7 @@ pub trait IntoAsyncService { impl IntoAsyncService> for Sys where - Sys: IntoSystem, + Sys: IntoSystem, Response, M>, Request: 'static + Send, Response: 'static + Send, { @@ -418,7 +417,7 @@ where impl IntoService<(Request, Task, M)> for AsAsyncService where - Sys: IntoSystem, + Sys: IntoSystem, Task, M>, Task: Future + 'static + Sendish, Request: 'static + Send + Sync, Task::Output: 'static + Send + Sync, @@ -441,7 +440,7 @@ where impl IsAsyncService<(Request, Task, M)> for AsAsyncService where - Sys: IntoSystem, + Sys: IntoSystem, Task, M>, Task: Future + 'static + Sendish, Request: 'static + Send + Sync, Task::Output: 'static + Send + Sync, diff --git a/src/service/blocking.rs b/src/service/blocking.rs index 1f10bcf0..ab342f88 100644 --- a/src/service/blocking.rs +++ b/src/service/blocking.rs @@ -32,18 +32,18 @@ pub struct Blocking(std::marker::PhantomData); #[derive(Component)] struct BlockingServiceStorage( - Option, Response>>, + Option>, Response>>, ); #[derive(Component)] struct UninitBlockingServiceStorage( - BoxedSystem, Response>, + BoxedSystem>, Response>, ); impl IntoService> for Sys where - Sys: IntoSystem, Response, M>, + Sys: IntoSystem>, Response, M>, Request: 'static + Send + Sync, Response: 'static + Send + Sync, Streams: StreamPack, @@ -97,7 +97,7 @@ where .or_broken()? .take_input::()?; - let mut service = if let Some(mut provider_mut) = world.get_entity_mut(provider) { + let mut service = if let Ok(mut provider_mut) = world.get_entity_mut(provider) { if let Some(mut storage) = provider_mut.get_mut::>() { @@ -146,7 +146,7 @@ where let mut unused_streams = UnusedStreams::new(source); Streams::process_buffer(streams, source, session, &mut unused_streams, world, roster)?; - if let Some(mut provider_mut) = world.get_entity_mut(provider) { + if let Ok(mut provider_mut) = world.get_entity_mut(provider) { if let Some(mut storage) = provider_mut.get_mut::>() { @@ -191,7 +191,7 @@ pub trait IntoBlockingService { impl IntoBlockingService> for Sys where - Sys: IntoSystem, + Sys: IntoSystem, Response, M>, Request: 'static, Response: 'static, { @@ -204,7 +204,7 @@ where impl IntoService> for AsBlockingService where - Sys: IntoSystem, + Sys: IntoSystem, Response, M>, Request: 'static + Send + Sync, Response: 'static + Send + Sync, { diff --git a/src/service/continuous.rs b/src/service/continuous.rs index af394f2e..894bc34b 100644 --- a/src/service/continuous.rs +++ b/src/service/continuous.rs @@ -16,12 +16,12 @@ */ use bevy_ecs::{ - prelude::{Commands, Component, Entity, Event, EventReader, In, Local, Query, World}, - schedule::IntoSystemConfigs, - system::{Command, IntoSystem, SystemParam}, + hierarchy::ChildOf, + prelude::{Command, Commands, Component, Entity, Event, EventReader, In, Local, Query, World}, + schedule::IntoScheduleConfigs, + system::{IntoSystem, ScheduleSystem, SystemParam}, world::EntityWorldMut, }; -use bevy_hierarchy::prelude::{BuildWorldChildren, DespawnRecursiveExt}; use smallvec::SmallVec; @@ -37,7 +37,7 @@ use crate::{ StreamPack, StreamTargetMap, UnhandledErrors, }; -pub use bevy_ecs::schedule::SystemConfigs; +pub use bevy_ecs::schedule::ScheduleConfigs; pub struct ContinuousServiceKey { provider: Entity, @@ -476,7 +476,7 @@ where if !responses.is_empty() { self.commands - .add(DeliverResponses:: { + .queue(DeliverResponses:: { responses, _ignore: Default::default(), }); @@ -648,8 +648,8 @@ where } } - if let Some(task_mut) = world.get_entity_mut(task_id) { - task_mut.despawn_recursive(); + if let Ok(task_mut) = world.get_entity_mut(task_id) { + task_mut.despawn(); } } @@ -740,7 +740,7 @@ where session, data: request, } = source_mut.take_input::()?; - let task_id = world.spawn(()).set_parent(source).id(); + let task_id = world.spawn(()).insert(ChildOf(source)).id(); let Some(mut delivery) = world.get_mut::>(provider) else { dispose_for_despawned_service(provider, world, roster); @@ -778,8 +778,8 @@ where for cancelled in cancelled { let disposal = Disposal::supplanted(cancelled.source, source, session); emit_disposal(cancelled.source, cancelled.session, disposal, world, roster); - if let Some(task_mut) = world.get_entity_mut(cancelled.task_id) { - task_mut.despawn_recursive(); + if let Ok(task_mut) = world.get_entity_mut(cancelled.task_id) { + task_mut.despawn(); } } if let Some(stop) = stop { @@ -815,8 +815,8 @@ where let disposal = Disposal::supplanted(stop.source, source, session); emit_disposal(stop.source, stop.session, disposal, world, roster); - if let Some(task_mut) = world.get_entity_mut(stop.task_id) { - task_mut.despawn_recursive(); + if let Ok(task_mut) = world.get_entity_mut(stop.task_id) { + task_mut.despawn(); } } @@ -942,7 +942,7 @@ fn serve_next_continuous_request( impl IntoContinuousService<(Request, Response, Streams, M)> for Sys where - Sys: IntoSystem, (), M>, + Sys: IntoSystem>, (), M>, Request: 'static + Send + Sync, Response: 'static + Send + Sync, Streams: StreamPack, @@ -951,7 +951,10 @@ where type Response = Response; type Streams = Streams; - fn into_system_config(self, entity_mut: &mut EntityWorldMut) -> SystemConfigs { + fn into_system_config( + self, + entity_mut: &mut EntityWorldMut, + ) -> ScheduleConfigs { let provider = entity_mut .insert(( ContinuousQueueStorage::::new(), diff --git a/src/service/discovery.rs b/src/service/discovery.rs index bb2262c2..4d804868 100644 --- a/src/service/discovery.rs +++ b/src/service/discovery.rs @@ -17,7 +17,7 @@ use bevy_ecs::{ prelude::{Entity, Query, With}, - query::{QueryEntityError, QueryIter, ReadOnlyWorldQuery}, + query::{QueryEntityError, QueryFilter, QueryIter}, system::SystemParam, }; @@ -36,7 +36,7 @@ where Request: 'static + Send + Sync, Response: 'static + Send + Sync, Streams: StreamFilter + 'static, - Filter: ReadOnlyWorldQuery + 'static, + Filter: QueryFilter + 'static, { query: Query< 'w, @@ -56,7 +56,7 @@ where Request: 'static + Send + Sync, Response: 'static + Send + Sync, Streams: StreamFilter, - Filter: ReadOnlyWorldQuery + 'static, + Filter: QueryFilter + 'static, { pub fn iter(&self) -> IterServiceDiscovery<'_, 's, Request, Response, Streams, Filter> { IterServiceDiscovery { @@ -78,7 +78,7 @@ where Request: 'static + Send + Sync, Response: 'static + Send + Sync, Streams: StreamFilter, - Filter: ReadOnlyWorldQuery + 'static, + Filter: QueryFilter + 'static, { inner: QueryIter< 'w, @@ -98,7 +98,7 @@ where Request: 'static + Send + Sync, Response: 'static + Send + Sync, Streams: StreamFilter, - Filter: ReadOnlyWorldQuery + 'static, + Filter: QueryFilter + 'static, { type Item = Service; diff --git a/src/service/service_builder.rs b/src/service/service_builder.rs index 8d3a4297..93ce67e0 100644 --- a/src/service/service_builder.rs +++ b/src/service/service_builder.rs @@ -19,8 +19,8 @@ use crate::{stream::*, Delivery, IntoContinuousService, IntoService, Service}; use bevy_app::prelude::App; use bevy_ecs::{ - schedule::{ScheduleLabel, SystemConfigs}, - system::{Commands, EntityCommands}, + schedule::{ScheduleConfigs, ScheduleLabel}, + system::{Commands, EntityCommands, ScheduleSystem}, world::EntityWorldMut, }; @@ -130,7 +130,7 @@ impl ServiceBuilder { Srv::Response: 'static + Send + Sync, Srv::Streams: StreamPack, { - let mut entity_mut = app.world.spawn(()); + let mut entity_mut = app.world_mut().spawn(()); self.service.insert_service_mut(&mut entity_mut); let service = Service::::new(entity_mut.id()); entity_mut.insert(::StreamAvailableBundle::default()); @@ -161,7 +161,7 @@ where Srv::Response: 'static + Send + Sync, Srv::Streams: StreamPack, { - let mut entity_mut = app.world.spawn(()); + let mut entity_mut = app.world_mut().spawn(()); let provider = entity_mut.id(); let config = self.service.into_system_config(&mut entity_mut); let config = self.configure.apply(config); @@ -393,15 +393,15 @@ impl AlsoAdd for () { impl ConfigureContinuousService for T where - T: FnOnce(SystemConfigs) -> SystemConfigs, + T: FnOnce(ScheduleConfigs) -> ScheduleConfigs, { - fn apply(self, config: SystemConfigs) -> SystemConfigs { + fn apply(self, config: ScheduleConfigs) -> ScheduleConfigs { (self)(config) } } impl ConfigureContinuousService for () { - fn apply(self, config: SystemConfigs) -> SystemConfigs { + fn apply(self, config: ScheduleConfigs) -> ScheduleConfigs { config } } diff --git a/src/service/traits.rs b/src/service/traits.rs index 0cabf930..82db4ad2 100644 --- a/src/service/traits.rs +++ b/src/service/traits.rs @@ -21,7 +21,11 @@ use crate::{ }; use bevy_app::prelude::App; -use bevy_ecs::{schedule::SystemConfigs, system::EntityCommands, world::EntityWorldMut}; +use bevy_ecs::{ + schedule::ScheduleConfigs, + system::{EntityCommands, ScheduleSystem}, + world::EntityWorldMut, +}; pub trait ServiceTrait { // TODO(@mxgrey): Are we using these associated types anymore? @@ -45,7 +49,8 @@ pub trait IntoContinuousService { type Response; type Streams; - fn into_system_config(self, entity_mut: &mut EntityWorldMut) -> SystemConfigs; + fn into_system_config(self, entity_mut: &mut EntityWorldMut) + -> ScheduleConfigs; } /// This trait allows service systems to be converted into a builder that @@ -117,5 +122,5 @@ pub trait AlsoAdd { } pub trait ConfigureContinuousService { - fn apply(self, config: SystemConfigs) -> SystemConfigs; + fn apply(self, config: ScheduleConfigs) -> ScheduleConfigs; } diff --git a/src/service/workflow.rs b/src/service/workflow.rs index 647c6820..5045d94a 100644 --- a/src/service/workflow.rs +++ b/src/service/workflow.rs @@ -25,7 +25,6 @@ use crate::{ }; use bevy_ecs::prelude::{Component, Entity, World}; -use bevy_hierarchy::prelude::DespawnRecursiveExt; pub(crate) struct WorkflowHooks {} @@ -139,8 +138,8 @@ where ); if result.is_err() { - if let Some(scoped_session_mut) = world.get_entity_mut(scoped_session) { - scoped_session_mut.despawn_recursive(); + if let Ok(scoped_session_mut) = world.get_entity_mut(scoped_session) { + scoped_session_mut.despawn(); } } @@ -336,8 +335,8 @@ fn serve_next_workflow_request( .is_err() { // The workflow will not run, so we should despawn the scoped session - if let Some(scoped_session_mut) = world.get_entity_mut(scoped_session) { - scoped_session_mut.despawn_recursive(); + if let Ok(scoped_session_mut) = world.get_entity_mut(scoped_session) { + scoped_session_mut.despawn(); } // The service did not launch so we should move onto the next item diff --git a/src/stream.rs b/src/stream.rs index 5fe97060..03c063c0 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -17,14 +17,13 @@ use bevy_derive::{Deref, DerefMut}; use bevy_ecs::{ - prelude::{Bundle, Commands, Component, Entity, With, World}, - query::{ReadOnlyWorldQuery, WorldQuery}, - system::Command, + hierarchy::ChildOf, + prelude::{Bundle, Command, Commands, Component, Entity, With, World}, + query::{QueryData, QueryFilter, ReadOnlyQueryData}, }; -use bevy_hierarchy::BuildChildren; pub use bevy_impulse_derive::Stream; -use bevy_utils::all_tuples; use futures::{future::BoxFuture, join}; +use variadics_please::all_tuples; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver as Receiver}; @@ -78,7 +77,7 @@ pub trait Stream: 'static + Send + Sync + Sized { ) -> (InputSlot, Output) { let source = commands.spawn(()).id(); let target = commands.spawn(UnusedTarget).id(); - commands.add(AddOperation::new( + commands.queue(AddOperation::new( Some(in_scope), source, RedirectScopeStream::::new(target), @@ -92,7 +91,7 @@ pub trait Stream: 'static + Send + Sync + Sized { fn spawn_workflow_stream(builder: &mut Builder) -> InputSlot { let source = builder.commands.spawn(()).id(); - builder.commands.add(AddOperation::new( + builder.commands.queue(AddOperation::new( Some(builder.scope()), source, RedirectWorkflowStream::::new(), @@ -137,12 +136,12 @@ pub trait Stream: 'static + Send + Sync + Sized { .spawn(()) // Set the parent of this stream to be the session so it can be // recursively despawned together. - .set_parent(source) + .insert(ChildOf(source)) .id(); let index = map.add(target); - commands.add(AddImpulse::new(target, TakenStream::new(sender))); + commands.queue(AddImpulse::new(target, TakenStream::new(sender))); (StreamTargetStorage::new(index), receiver) } @@ -153,8 +152,8 @@ pub trait Stream: 'static + Send + Sync + Sized { map: &mut StreamTargetMap, commands: &mut Commands, ) -> StreamTargetStorage { - let redirect = commands.spawn(()).set_parent(source).id(); - commands.add(AddImpulse::new(redirect, Push::::new(target, true))); + let redirect = commands.spawn(()).insert(ChildOf(source)).id(); + commands.queue(AddImpulse::new(redirect, Push::::new(target, true))); let index = map.add(redirect); StreamTargetStorage::new(index) } @@ -346,7 +345,7 @@ impl StreamTargetMap { /// streams to be packed together as one generic argument. pub trait StreamPack: 'static + Send + Sync { type StreamAvailableBundle: Bundle + Default; - type StreamFilter: ReadOnlyWorldQuery; + type StreamFilter: QueryFilter; type StreamStorageBundle: Bundle + Clone; type StreamInputPack; type StreamOutputPack; @@ -354,7 +353,7 @@ pub trait StreamPack: 'static + Send + Sync { type Channel: Send; type Forward: Future + Send; type Buffer: Clone; - type TargetIndexQuery: ReadOnlyWorldQuery; + type TargetIndexQuery: ReadOnlyQueryData; fn spawn_scope_streams( in_scope: Entity, @@ -391,7 +390,7 @@ pub trait StreamPack: 'static + Send + Sync { fn make_channel(inner: &Arc, world: &World) -> Self::Channel; fn make_buffer( - target_index: ::Item<'_>, + target_index: ::Item<'_>, target_map: Option<&StreamTargetMap>, ) -> Self::Buffer; @@ -535,7 +534,7 @@ impl StreamPack for T { session: Entity, commands: &mut Commands, ) { - commands.add(SendStreams:: { + commands.queue(SendStreams:: { source, session, container: buffer.container.take(), @@ -782,7 +781,7 @@ macro_rules! impl_streampack_for_tuple { } fn make_buffer( - target_index: ::Item<'_>, + target_index: ::Item<'_>, target_map: Option<&StreamTargetMap>, ) -> Self::Buffer { let ($($T,)*) = target_index; @@ -936,7 +935,7 @@ impl Command for SendStreams { /// } /// ``` pub trait StreamFilter { - type Filter: ReadOnlyWorldQuery; + type Filter: QueryFilter; type Pack: StreamPack; } diff --git a/src/testing.rs b/src/testing.rs index f7ba4664..86798142 100644 --- a/src/testing.rs +++ b/src/testing.rs @@ -16,11 +16,12 @@ */ use bevy_app::ScheduleRunnerPlugin; -pub use bevy_app::{App, Update}; -use bevy_core::{FrameCountPlugin, TaskPoolPlugin, TypeRegistrationPlugin}; +pub use bevy_app::{App, TaskPoolPlugin, Update}; +use bevy_diagnostic::FrameCountPlugin; pub use bevy_ecs::{ prelude::{Commands, Component, Entity, In, Local, Query, ResMut, Resource, World}, - system::{CommandQueue, IntoSystem}, + system::IntoSystem, + world::CommandQueue, }; use bevy_time::TimePlugin; @@ -51,7 +52,6 @@ impl TestingContext { let mut app = App::new(); app.add_plugins(( TaskPoolPlugin::default(), - TypeRegistrationPlugin, FrameCountPlugin, TimePlugin, ScheduleRunnerPlugin::default(), @@ -63,13 +63,13 @@ impl TestingContext { pub fn set_flush_loop_limit(&mut self, limit: Option) { self.app - .world + .world_mut() .get_resource_or_insert_with(FlushParameters::default) .flush_loop_limit = limit; } pub fn command(&mut self, f: impl FnOnce(&mut Commands) -> U) -> U { - self.app.world.command(f) + self.app.world_mut().command(f) } /// Build a simple workflow with a single input and output, and no streams @@ -148,7 +148,7 @@ impl TestingContext { } pub fn no_unhandled_errors(&self) -> bool { - let Some(errors) = self.app.world.get_resource::() else { + let Some(errors) = self.app.world().get_resource::() else { return true; }; @@ -156,20 +156,23 @@ impl TestingContext { } pub fn get_unhandled_errors(&self) -> Option<&UnhandledErrors> { - self.app.world.get_resource::() + self.app.world().get_resource::() } // Check that all buffers in the world are empty pub fn confirm_buffers_empty(&mut self) -> Result<(), Vec> { - let mut query = self.app.world.query::<(Entity, &GetBufferedSessionsFn)>(); + let mut query = self + .app + .world_mut() + .query::<(Entity, &GetBufferedSessionsFn)>(); let buffers: Vec<_> = query - .iter(&self.app.world) + .iter(self.app.world()) .map(|(e, get_sessions)| (e, get_sessions.0)) .collect(); let mut non_empty_buffers = Vec::new(); for (e, get_sessions) in buffers { - if !get_sessions(e, &self.app.world).is_ok_and(|s| s.is_empty()) { + if !get_sessions(e, self.app.world()).is_ok_and(|s| s.is_empty()) { non_empty_buffers.push(e); } } diff --git a/src/workflow.rs b/src/workflow.rs index 29cd3581..e46a8044 100644 --- a/src/workflow.rs +++ b/src/workflow.rs @@ -16,10 +16,10 @@ */ use bevy_ecs::{ + hierarchy::ChildOf, prelude::{Commands, World}, - system::CommandQueue, + world::CommandQueue, }; -use bevy_hierarchy::BuildChildren; use crate::{ Builder, DeliveryChoice, InputSlot, OperateScope, Output, ScopeEndpoints, ScopeSettingsStorage, @@ -262,7 +262,7 @@ impl<'w, 's> SpawnWorkflowExt for Commands<'w, 's> { let service = service.id(); self.entity(scope_id) .insert(ScopeSettingsStorage(settings.scope)) - .set_parent(service); + .insert(ChildOf(service)); WorkflowService::::cast(service) }