From b89618facd705d44956fd5418f86ba3dff5ea9c7 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 21 Apr 2024 01:12:09 -0700 Subject: [PATCH 01/12] Remove crossbeam-channel as a runtime dependency --- crates/bevy_asset/Cargo.toml | 2 +- crates/bevy_asset/src/assets.rs | 28 ++++++--------- crates/bevy_asset/src/handle.rs | 17 ++++------ .../src/io/embedded/embedded_watcher.rs | 4 +-- crates/bevy_asset/src/io/file/file_watcher.rs | 1 - crates/bevy_asset/src/io/gated.rs | 19 +++++------ crates/bevy_asset/src/io/source.rs | 34 +++++++++---------- crates/bevy_asset/src/server/info.rs | 16 ++++----- crates/bevy_asset/src/server/mod.rs | 18 +++++----- crates/bevy_render/src/renderer/mod.rs | 12 ++----- crates/bevy_time/Cargo.toml | 2 +- crates/bevy_time/src/lib.rs | 15 ++++---- crates/bevy_winit/Cargo.toml | 2 -- 13 files changed, 76 insertions(+), 94 deletions(-) diff --git a/crates/bevy_asset/Cargo.toml b/crates/bevy_asset/Cargo.toml index 6d5e3f5fcd56b..0a54f6e7ab451 100644 --- a/crates/bevy_asset/Cargo.toml +++ b/crates/bevy_asset/Cargo.toml @@ -31,7 +31,7 @@ bevy_utils = { path = "../bevy_utils", version = "0.14.0-dev" } async-broadcast = "0.5" async-fs = "2.0" async-lock = "3.0" -crossbeam-channel = "0.5" +concurrent-queue = "2.4" downcast-rs = "1.2" futures-io = "0.3" futures-lite = "2.0.1" diff --git a/crates/bevy_asset/src/assets.rs b/crates/bevy_asset/src/assets.rs index b162f3f21b9ea..6fe17ab153947 100644 --- a/crates/bevy_asset/src/assets.rs +++ b/crates/bevy_asset/src/assets.rs @@ -8,7 +8,7 @@ use bevy_ecs::{ }; use bevy_reflect::{Reflect, TypePath}; use bevy_utils::HashMap; -use crossbeam_channel::{Receiver, Sender}; +use concurrent_queue::ConcurrentQueue; use serde::{Deserialize, Serialize}; use std::{ any::TypeId, @@ -50,22 +50,16 @@ impl AssetIndex { pub(crate) struct AssetIndexAllocator { /// A monotonically increasing index. next_index: AtomicU32, - recycled_queue_sender: Sender, + recycled_queue: ConcurrentQueue, /// This receives every recycled [`AssetIndex`]. It serves as a buffer/queue to store indices ready for reuse. - recycled_queue_receiver: Receiver, - recycled_sender: Sender, - recycled_receiver: Receiver, + recycled: ConcurrentQueue, } impl Default for AssetIndexAllocator { fn default() -> Self { - let (recycled_queue_sender, recycled_queue_receiver) = crossbeam_channel::unbounded(); - let (recycled_sender, recycled_receiver) = crossbeam_channel::unbounded(); Self { - recycled_queue_sender, - recycled_queue_receiver, - recycled_sender, - recycled_receiver, + recycled_queue: ConcurrentQueue::unbounded(), + recycled: ConcurrentQueue::unbounded(), next_index: Default::default(), } } @@ -75,9 +69,9 @@ impl AssetIndexAllocator { /// Reserves a new [`AssetIndex`], either by reusing a recycled index (with an incremented generation), or by creating a new index /// by incrementing the index counter for a given asset type `A`. pub fn reserve(&self) -> AssetIndex { - if let Ok(mut recycled) = self.recycled_queue_receiver.try_recv() { + if let Ok(mut recycled) = self.recycled_queue.pop() { recycled.generation += 1; - self.recycled_sender.send(recycled).unwrap(); + self.recycled.push(recycled).unwrap(); recycled } else { AssetIndex { @@ -91,7 +85,7 @@ impl AssetIndexAllocator { /// Queues the given `index` for reuse. This should only be done if the `index` is no longer being used. pub fn recycle(&self, index: AssetIndex) { - self.recycled_queue_sender.send(index).unwrap(); + self.recycled_queue.push(index).unwrap(); } } @@ -246,7 +240,7 @@ impl DenseAssetStorage { value: None, generation: 0, }); - while let Ok(recycled) = self.allocator.recycled_receiver.try_recv() { + while let Ok(recycled) = self.allocator.recycled.pop() { let entry = &mut self.storage[recycled.index as usize]; *entry = Entry::Some { value: None, @@ -546,7 +540,7 @@ impl Assets { // to other asset info operations let mut infos = asset_server.data.infos.write(); let mut not_ready = Vec::new(); - while let Ok(drop_event) = assets.handle_provider.drop_receiver.try_recv() { + while let Ok(drop_event) = assets.handle_provider.drop_queue.pop() { let id = drop_event.id.typed(); if drop_event.asset_server_managed { @@ -572,7 +566,7 @@ impl Assets { // TODO: this is _extremely_ inefficient find a better fix // This will also loop failed assets indefinitely. Is that ok? for event in not_ready { - assets.handle_provider.drop_sender.send(event).unwrap(); + assets.handle_provider.drop_queue.push(event).unwrap(); } } diff --git a/crates/bevy_asset/src/handle.rs b/crates/bevy_asset/src/handle.rs index b2ebea2af9024..92cab1a39f83c 100644 --- a/crates/bevy_asset/src/handle.rs +++ b/crates/bevy_asset/src/handle.rs @@ -5,7 +5,7 @@ use crate::{ use bevy_ecs::prelude::*; use bevy_reflect::{std_traits::ReflectDefault, Reflect, TypePath}; use bevy_utils::get_short_name; -use crossbeam_channel::{Receiver, Sender}; +use concurrent_queue::ConcurrentQueue; use std::{ any::TypeId, hash::{Hash, Hasher}, @@ -19,8 +19,7 @@ use uuid::Uuid; #[derive(Clone)] pub struct AssetHandleProvider { pub(crate) allocator: Arc, - pub(crate) drop_sender: Sender, - pub(crate) drop_receiver: Receiver, + pub(crate) drop_queue: Arc>, pub(crate) type_id: TypeId, } @@ -32,12 +31,10 @@ pub(crate) struct DropEvent { impl AssetHandleProvider { pub(crate) fn new(type_id: TypeId, allocator: Arc) -> Self { - let (drop_sender, drop_receiver) = crossbeam_channel::unbounded(); Self { type_id, allocator, - drop_sender, - drop_receiver, + drop_queue: Arc::new(ConcurrentQueue::unbounded()), } } @@ -57,7 +54,7 @@ impl AssetHandleProvider { ) -> Arc { Arc::new(StrongHandle { id: id.untyped(self.type_id), - drop_sender: self.drop_sender.clone(), + drop: self.drop_queue.clone(), meta_transform, path, asset_server_managed, @@ -91,12 +88,12 @@ pub struct StrongHandle { /// 1. configuration tied to the lifetime of a specific asset load /// 2. configuration that must be repeatable when the asset is hot-reloaded pub(crate) meta_transform: Option, - pub(crate) drop_sender: Sender, + pub(crate) drop: Arc>, } impl Drop for StrongHandle { fn drop(&mut self) { - let _ = self.drop_sender.send(DropEvent { + let _ = self.drop.push(DropEvent { id: self.id.internal(), asset_server_managed: self.asset_server_managed, }); @@ -109,7 +106,7 @@ impl std::fmt::Debug for StrongHandle { .field("id", &self.id) .field("asset_server_managed", &self.asset_server_managed) .field("path", &self.path) - .field("drop_sender", &self.drop_sender) + .field("drop_sender", &self.drop) .finish() } } diff --git a/crates/bevy_asset/src/io/embedded/embedded_watcher.rs b/crates/bevy_asset/src/io/embedded/embedded_watcher.rs index 485593599c489..b60cd4d6a6c02 100644 --- a/crates/bevy_asset/src/io/embedded/embedded_watcher.rs +++ b/crates/bevy_asset/src/io/embedded/embedded_watcher.rs @@ -26,7 +26,7 @@ impl EmbeddedWatcher { pub fn new( dir: Dir, root_paths: Arc, PathBuf>>>, - sender: crossbeam_channel::Sender, + sender: Arc>, debounce_wait_time: Duration, ) -> Self { let root = get_base_path(); @@ -48,7 +48,7 @@ impl AssetWatcher for EmbeddedWatcher {} /// binary-embedded Rust source files. This will read the contents of changed files from the file system and overwrite /// the initial static bytes from the file embedded in the binary. pub(crate) struct EmbeddedEventHandler { - sender: crossbeam_channel::Sender, + sender: Arc>, root_paths: Arc, PathBuf>>>, root: PathBuf, dir: Dir, diff --git a/crates/bevy_asset/src/io/file/file_watcher.rs b/crates/bevy_asset/src/io/file/file_watcher.rs index fbed3f8a0ecbe..7391bc2bd2dc3 100644 --- a/crates/bevy_asset/src/io/file/file_watcher.rs +++ b/crates/bevy_asset/src/io/file/file_watcher.rs @@ -2,7 +2,6 @@ use crate::io::{AssetSourceEvent, AssetWatcher}; use crate::path::normalize_path; use bevy_utils::tracing::error; use bevy_utils::Duration; -use crossbeam_channel::Sender; use notify_debouncer_full::{ new_debouncer, notify::{ diff --git a/crates/bevy_asset/src/io/gated.rs b/crates/bevy_asset/src/io/gated.rs index d3d2b35f1f066..7e67c3c461651 100644 --- a/crates/bevy_asset/src/io/gated.rs +++ b/crates/bevy_asset/src/io/gated.rs @@ -1,6 +1,6 @@ use crate::io::{AssetReader, AssetReaderError, PathStream, Reader}; use bevy_utils::HashMap; -use crossbeam_channel::{Receiver, Sender}; +use concurrent_queue::ConcurrentQueue; use parking_lot::RwLock; use std::{path::Path, sync::Arc}; @@ -10,7 +10,7 @@ use std::{path::Path, sync::Arc}; /// This is built primarily for unit tests. pub struct GatedReader { reader: R, - gates: Arc, (Sender<()>, Receiver<()>)>>>, + gates: Arc, ConcurrentQueue<()>>>>, } impl Clone for GatedReader { @@ -24,7 +24,7 @@ impl Clone for GatedReader { /// Opens path "gates" for a [`GatedReader`]. pub struct GateOpener { - gates: Arc, (Sender<()>, Receiver<()>)>>>, + gates: Arc, ConcurrentQueue<()>>>>, } impl GateOpener { @@ -34,8 +34,8 @@ impl GateOpener { let mut gates = self.gates.write(); let gates = gates .entry_ref(path.as_ref()) - .or_insert_with(crossbeam_channel::unbounded); - gates.0.send(()).unwrap(); + .or_insert_with(ConcurrentQueue::unbounded); + gates.push(()).unwrap(); } } @@ -56,14 +56,13 @@ impl GatedReader { impl AssetReader for GatedReader { async fn read<'a>(&'a self, path: &'a Path) -> Result>, AssetReaderError> { - let receiver = { + { let mut gates = self.gates.write(); let gates = gates .entry_ref(path.as_ref()) - .or_insert_with(crossbeam_channel::unbounded); - gates.1.clone() - }; - receiver.recv().unwrap(); + .or_insert_with(ConcurrentQueue::unbounded); + gates.pop().unwrap(); + } let result = self.reader.read(path).await?; Ok(result) } diff --git a/crates/bevy_asset/src/io/source.rs b/crates/bevy_asset/src/io/source.rs index cd42d31f01de1..809fc0c8e7c58 100644 --- a/crates/bevy_asset/src/io/source.rs +++ b/crates/bevy_asset/src/io/source.rs @@ -5,6 +5,7 @@ use crate::{ use bevy_ecs::system::Resource; use bevy_utils::tracing::{error, warn}; use bevy_utils::{CowArc, Duration, HashMap}; +use concurrent_queue::ConcurrentQueue; use std::{fmt::Display, hash::Hash, sync::Arc}; use thiserror::Error; @@ -117,7 +118,7 @@ pub struct AssetSourceBuilder { pub writer: Option Option> + Send + Sync>>, pub watcher: Option< Box< - dyn FnMut(crossbeam_channel::Sender) -> Option> + dyn FnMut(Arc>) -> Option> + Send + Sync, >, @@ -127,7 +128,7 @@ pub struct AssetSourceBuilder { Option Option> + Send + Sync>>, pub processed_watcher: Option< Box< - dyn FnMut(crossbeam_channel::Sender) -> Option> + dyn FnMut(Arc>) -> Option> + Send + Sync, >, @@ -161,8 +162,9 @@ impl AssetSourceBuilder { }; if watch { - let (sender, receiver) = crossbeam_channel::unbounded(); - match self.watcher.as_mut().and_then(|w| w(sender)) { + let receiver = Arc::new(ConcurrentQueue::unbounded()); + let sender = receiver.clone(); + match self.watcher.as_mut().and_then(|w| w(sender.clone())) { Some(w) => { source.watcher = Some(w); source.event_receiver = Some(receiver); @@ -176,7 +178,8 @@ impl AssetSourceBuilder { } if watch_processed { - let (sender, receiver) = crossbeam_channel::unbounded(); + let receiver = Arc::new(ConcurrentQueue::unbounded()); + let sender = receiver.clone(); match self.processed_watcher.as_mut().and_then(|w| w(sender)) { Some(w) => { source.processed_watcher = Some(w); @@ -213,7 +216,7 @@ impl AssetSourceBuilder { /// Will use the given `watcher` function to construct unprocessed [`AssetWatcher`] instances. pub fn with_watcher( mut self, - watcher: impl FnMut(crossbeam_channel::Sender) -> Option> + watcher: impl FnMut(Arc>) -> Option> + Send + Sync + 'static, @@ -243,7 +246,7 @@ impl AssetSourceBuilder { /// Will use the given `watcher` function to construct processed [`AssetWatcher`] instances. pub fn with_processed_watcher( mut self, - watcher: impl FnMut(crossbeam_channel::Sender) -> Option> + watcher: impl FnMut(Arc>) -> Option> + Send + Sync + 'static, @@ -364,8 +367,8 @@ pub struct AssetSource { processed_writer: Option>, watcher: Option>, processed_watcher: Option>, - event_receiver: Option>, - processed_event_receiver: Option>, + event_receiver: Option>>, + processed_event_receiver: Option>>, } impl AssetSource { @@ -416,15 +419,13 @@ impl AssetSource { /// Return's this source's unprocessed event receiver, if the source is currently watching for changes. #[inline] - pub fn event_receiver(&self) -> Option<&crossbeam_channel::Receiver> { + pub fn event_receiver(&self) -> Option<&Arc>> { self.event_receiver.as_ref() } /// Return's this source's processed event receiver, if the source is currently watching for changes. #[inline] - pub fn processed_event_receiver( - &self, - ) -> Option<&crossbeam_channel::Receiver> { + pub fn processed_event_receiver(&self) -> Option<&Arc>> { self.processed_event_receiver.as_ref() } @@ -484,10 +485,9 @@ impl AssetSource { pub fn get_default_watcher( path: String, file_debounce_wait_time: Duration, - ) -> impl FnMut(crossbeam_channel::Sender) -> Option> - + Send - + Sync { - move |sender: crossbeam_channel::Sender| { + ) -> impl FnMut(Arc>) -> Option> + Send + Sync + { + move |sender: Arc>| { #[cfg(all( feature = "file_watcher", not(target_arch = "wasm32"), diff --git a/crates/bevy_asset/src/server/info.rs b/crates/bevy_asset/src/server/info.rs index 79cdce9721f10..98df753a4259a 100644 --- a/crates/bevy_asset/src/server/info.rs +++ b/crates/bevy_asset/src/server/info.rs @@ -7,7 +7,7 @@ use crate::{ use bevy_ecs::world::World; use bevy_utils::tracing::warn; use bevy_utils::{Entry, HashMap, HashSet, TypeIdMap}; -use crossbeam_channel::Sender; +use concurrent_queue::ConcurrentQueue; use std::{ any::TypeId, sync::{Arc, Weak}, @@ -375,7 +375,7 @@ impl AssetInfos { loaded_asset_id: UntypedAssetId, loaded_asset: ErasedLoadedAsset, world: &mut World, - sender: &Sender, + sender: &ConcurrentQueue, ) { loaded_asset.value.insert(loaded_asset_id, world); let mut loading_deps = loaded_asset.dependencies; @@ -435,10 +435,10 @@ impl AssetInfos { let rec_dep_load_state = match (loading_rec_deps.len(), failed_rec_deps.len()) { (0, 0) => { sender - .send(InternalAssetEvent::LoadedWithDependencies { + .push(InternalAssetEvent::LoadedWithDependencies { id: loaded_asset_id, }) - .unwrap(); + .unwrap_or_else(|_| panic!("Failed to push internal asset event.")); RecursiveDependencyLoadState::Loaded } (_loading, 0) => RecursiveDependencyLoadState::Loading, @@ -529,7 +529,7 @@ impl AssetInfos { infos: &mut AssetInfos, loaded_id: UntypedAssetId, waiting_id: UntypedAssetId, - sender: &Sender, + sender: &ConcurrentQueue, ) { let dependants_waiting_on_rec_load = if let Some(info) = infos.get_mut(waiting_id) { info.loading_rec_dependencies.remove(&loaded_id); @@ -537,8 +537,8 @@ impl AssetInfos { info.rec_dep_load_state = RecursiveDependencyLoadState::Loaded; if info.load_state == LoadState::Loaded { sender - .send(InternalAssetEvent::LoadedWithDependencies { id: waiting_id }) - .unwrap(); + .push(InternalAssetEvent::LoadedWithDependencies { id: waiting_id }) + .unwrap_or_else(|_| panic!("Failed to push internal asset event.")); } Some(std::mem::take( &mut info.dependants_waiting_on_recursive_dep_load, @@ -690,7 +690,7 @@ impl AssetInfos { /// [`Assets`]: crate::Assets pub(crate) fn consume_handle_drop_events(&mut self) { for provider in self.handle_providers.values() { - while let Ok(drop_event) = provider.drop_receiver.try_recv() { + while let Ok(drop_event) = provider.drop_queue.pop() { let id = drop_event.id; if drop_event.asset_server_managed { Self::process_handle_drop_internal( diff --git a/crates/bevy_asset/src/server/mod.rs b/crates/bevy_asset/src/server/mod.rs index 0e4ae6aab1f78..eeb664bbc1ee6 100644 --- a/crates/bevy_asset/src/server/mod.rs +++ b/crates/bevy_asset/src/server/mod.rs @@ -21,7 +21,7 @@ use bevy_ecs::prelude::*; use bevy_tasks::IoTaskPool; use bevy_utils::tracing::{error, info}; use bevy_utils::{CowArc, HashSet}; -use crossbeam_channel::{Receiver, Sender}; +use concurrent_queue::ConcurrentQueue; use futures_lite::StreamExt; use info::*; use loaders::*; @@ -57,8 +57,7 @@ pub struct AssetServer { pub(crate) struct AssetServerData { pub(crate) infos: RwLock, pub(crate) loaders: Arc>, - asset_event_sender: Sender, - asset_event_receiver: Receiver, + asset_event_queue: ConcurrentQueue, sources: AssetSources, mode: AssetServerMode, meta_check: AssetMetaCheck, @@ -110,7 +109,6 @@ impl AssetServer { meta_check: AssetMetaCheck, watching_for_changes: bool, ) -> Self { - let (asset_event_sender, asset_event_receiver) = crossbeam_channel::unbounded(); let mut infos = AssetInfos::default(); infos.watching_for_changes = watching_for_changes; Self { @@ -118,8 +116,7 @@ impl AssetServer { sources, mode, meta_check, - asset_event_sender, - asset_event_receiver, + asset_event_queue: ConcurrentQueue::unbounded(), loaders, infos: RwLock::new(infos), }), @@ -742,7 +739,10 @@ impl AssetServer { } fn send_asset_event(&self, event: InternalAssetEvent) { - self.data.asset_event_sender.send(event).unwrap(); + self.data + .asset_event_queue + .push(event) + .unwrap_or_else(|_| panic!("Failed to push internal asset event.")); } /// Retrieves all loads states for the given asset id. @@ -1059,14 +1059,14 @@ pub fn handle_internal_asset_events(world: &mut World) { world.resource_scope(|world, server: Mut| { let mut infos = server.data.infos.write(); let mut untyped_failures = vec![]; - for event in server.data.asset_event_receiver.try_iter() { + for event in server.data.asset_event_queue.try_iter() { match event { InternalAssetEvent::Loaded { id, loaded_asset } => { infos.process_asset_load( id, loaded_asset, world, - &server.data.asset_event_sender, + &server.data.asset_event_queue, ); } InternalAssetEvent::LoadedWithDependencies { id } => { diff --git a/crates/bevy_render/src/renderer/mod.rs b/crates/bevy_render/src/renderer/mod.rs index 23ad14c136df5..28ef26e201476 100644 --- a/crates/bevy_render/src/renderer/mod.rs +++ b/crates/bevy_render/src/renderer/mod.rs @@ -105,15 +105,9 @@ pub fn render_system(world: &mut World, state: &mut SystemState(); - if let Err(error) = time_sender.0.try_send(Instant::now()) { - match error { - bevy_time::TrySendError::Full(_) => { - panic!("The TimeSender channel should always be empty during render. You might need to add the bevy::core::time_system to your app.",); - } - bevy_time::TrySendError::Disconnected(_) => { - // ignore disconnected errors, the main world probably just got dropped during shutdown - } - } + // ignore disconnected errors, the main world probably just got dropped during shutdown + if let Err(bevy_time::PushError::Full(_)) = time_sender.0.push(Instant::now()) { + panic!("The TimeSender channel should always be empty during render. You might need to add the bevy::core::time_system to your app.",); } } diff --git a/crates/bevy_time/Cargo.toml b/crates/bevy_time/Cargo.toml index a2ea432d86e1f..74c6398adf718 100644 --- a/crates/bevy_time/Cargo.toml +++ b/crates/bevy_time/Cargo.toml @@ -24,7 +24,7 @@ bevy_reflect = { path = "../bevy_reflect", version = "0.14.0-dev", features = [ bevy_utils = { path = "../bevy_utils", version = "0.14.0-dev" } # other -crossbeam-channel = "0.5.0" +concurrent-queue = "2.4" serde = { version = "1", features = ["derive"], optional = true } thiserror = "1.0" diff --git a/crates/bevy_time/src/lib.rs b/crates/bevy_time/src/lib.rs index 912a600bb237e..f1a0637147f65 100644 --- a/crates/bevy_time/src/lib.rs +++ b/crates/bevy_time/src/lib.rs @@ -33,8 +33,9 @@ use bevy_app::{prelude::*, RunFixedMainLoop}; use bevy_ecs::event::signal_event_update_system; use bevy_ecs::prelude::*; use bevy_utils::{tracing::warn, Duration, Instant}; -pub use crossbeam_channel::TrySendError; -use crossbeam_channel::{Receiver, Sender}; +use concurrent_queue::ConcurrentQueue; +pub use concurrent_queue::PushError; +use std::sync::Arc; /// Adds time functionality to Apps. #[derive(Default)] @@ -86,18 +87,18 @@ pub enum TimeUpdateStrategy { /// Channel resource used to receive time from the render world. #[derive(Resource)] -pub struct TimeReceiver(pub Receiver); +pub struct TimeReceiver(pub Arc>); /// Channel resource used to send time from the render world. #[derive(Resource)] -pub struct TimeSender(pub Sender); +pub struct TimeSender(pub Arc>); /// Creates channels used for sending time between the render world and the main world. pub fn create_time_channels() -> (TimeSender, TimeReceiver) { // bound the channel to 2 since when pipelined the render phase can finish before // the time system runs. - let (s, r) = crossbeam_channel::bounded::(2); - (TimeSender(s), TimeReceiver(r)) + let queue = Arc::new(ConcurrentQueue::bounded(2)); + (TimeSender(queue.clone()), TimeReceiver(queue)) } /// The system used to update the [`Time`] used by app logic. If there is a render world the time is @@ -112,7 +113,7 @@ fn time_system( ) { let new_time = if let Some(time_recv) = time_recv { // TODO: Figure out how to handle this when using pipelined rendering. - if let Ok(new_time) = time_recv.0.try_recv() { + if let Ok(new_time) = time_recv.0.pop() { *has_received_time = true; new_time } else { diff --git a/crates/bevy_winit/Cargo.toml b/crates/bevy_winit/Cargo.toml index d617ba32970e8..157ebb77b5699 100644 --- a/crates/bevy_winit/Cargo.toml +++ b/crates/bevy_winit/Cargo.toml @@ -48,8 +48,6 @@ winit = { version = "0.29", default-features = false, features = [ [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen = { version = "0.2" } web-sys = "0.3" -crossbeam-channel = "0.5" - [lints] workspace = true From 24c186ad4223c381306b6f0e349f1ca0779c7797 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 21 Apr 2024 01:37:25 -0700 Subject: [PATCH 02/12] Try fixing file watcher --- crates/bevy_asset/src/io/file/file_watcher.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/bevy_asset/src/io/file/file_watcher.rs b/crates/bevy_asset/src/io/file/file_watcher.rs index 7391bc2bd2dc3..d5ebe3d487814 100644 --- a/crates/bevy_asset/src/io/file/file_watcher.rs +++ b/crates/bevy_asset/src/io/file/file_watcher.rs @@ -11,6 +11,7 @@ use notify_debouncer_full::{ }, DebounceEventResult, Debouncer, FileIdMap, }; +use concurrent_queue::ConcurrentQueue; use std::path::{Path, PathBuf}; /// An [`AssetWatcher`] that watches the filesystem for changes to asset files in a given root folder and emits [`AssetSourceEvent`] @@ -25,7 +26,7 @@ pub struct FileWatcher { impl FileWatcher { pub fn new( root: PathBuf, - sender: Sender, + sender: Arc>, debounce_wait_time: Duration, ) -> Result { let root = normalize_path(super::get_base_path().join(root).as_path()); @@ -243,7 +244,7 @@ pub(crate) fn new_asset_event_debouncer( } pub(crate) struct FileEventHandler { - sender: Sender, + sender: Arc>, root: PathBuf, last_event: Option, } @@ -259,7 +260,7 @@ impl FilesystemEventHandler for FileEventHandler { fn handle(&mut self, _absolute_paths: &[PathBuf], event: AssetSourceEvent) { if self.last_event.as_ref() != Some(&event) { self.last_event = Some(event.clone()); - self.sender.send(event).unwrap(); + self.sender.push(event).unwrap(); } } } From 45397dd9fd71b4991da6bad09a86ad8c65d1cd23 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 21 Apr 2024 01:54:47 -0700 Subject: [PATCH 03/12] Formatting --- crates/bevy_asset/src/io/file/file_watcher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_asset/src/io/file/file_watcher.rs b/crates/bevy_asset/src/io/file/file_watcher.rs index d5ebe3d487814..05f391a485af9 100644 --- a/crates/bevy_asset/src/io/file/file_watcher.rs +++ b/crates/bevy_asset/src/io/file/file_watcher.rs @@ -2,6 +2,7 @@ use crate::io::{AssetSourceEvent, AssetWatcher}; use crate::path::normalize_path; use bevy_utils::tracing::error; use bevy_utils::Duration; +use concurrent_queue::ConcurrentQueue; use notify_debouncer_full::{ new_debouncer, notify::{ @@ -11,7 +12,6 @@ use notify_debouncer_full::{ }, DebounceEventResult, Debouncer, FileIdMap, }; -use concurrent_queue::ConcurrentQueue; use std::path::{Path, PathBuf}; /// An [`AssetWatcher`] that watches the filesystem for changes to asset files in a given root folder and emits [`AssetSourceEvent`] From 8cbd88ce4a58f87a5ce1bebefaf27c005795a16e Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 21 Apr 2024 02:05:05 -0700 Subject: [PATCH 04/12] Fix up imports --- crates/bevy_asset/src/io/embedded/embedded_watcher.rs | 1 + crates/bevy_asset/src/io/file/file_watcher.rs | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/bevy_asset/src/io/embedded/embedded_watcher.rs b/crates/bevy_asset/src/io/embedded/embedded_watcher.rs index b60cd4d6a6c02..5f26e522c8549 100644 --- a/crates/bevy_asset/src/io/embedded/embedded_watcher.rs +++ b/crates/bevy_asset/src/io/embedded/embedded_watcher.rs @@ -5,6 +5,7 @@ use crate::io::{ }; use bevy_utils::tracing::warn; use bevy_utils::{Duration, HashMap}; +use concurrent_queue::ConcurrentQueue; use notify_debouncer_full::{notify::RecommendedWatcher, Debouncer, FileIdMap}; use parking_lot::RwLock; use std::{ diff --git a/crates/bevy_asset/src/io/file/file_watcher.rs b/crates/bevy_asset/src/io/file/file_watcher.rs index 05f391a485af9..a63042bcae861 100644 --- a/crates/bevy_asset/src/io/file/file_watcher.rs +++ b/crates/bevy_asset/src/io/file/file_watcher.rs @@ -12,7 +12,10 @@ use notify_debouncer_full::{ }, DebounceEventResult, Debouncer, FileIdMap, }; -use std::path::{Path, PathBuf}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; /// An [`AssetWatcher`] that watches the filesystem for changes to asset files in a given root folder and emits [`AssetSourceEvent`] /// for each relevant change. This uses [`notify_debouncer_full`] to retrieve "debounced" filesystem events. From 842906451844c03c4c18e5f36601e1150be933d3 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 21 Apr 2024 02:18:13 -0700 Subject: [PATCH 05/12] Last send --- crates/bevy_asset/src/io/embedded/embedded_watcher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_asset/src/io/embedded/embedded_watcher.rs b/crates/bevy_asset/src/io/embedded/embedded_watcher.rs index 5f26e522c8549..47916394c68ee 100644 --- a/crates/bevy_asset/src/io/embedded/embedded_watcher.rs +++ b/crates/bevy_asset/src/io/embedded/embedded_watcher.rs @@ -83,7 +83,7 @@ impl FilesystemEventHandler for EmbeddedEventHandler { } } self.last_event = Some(event.clone()); - self.sender.send(event).unwrap(); + self.sender.push(event).unwrap(); } } } From 3542d1ff30cfd676b924db418e70f176e3285b20 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 21 Apr 2024 02:24:57 -0700 Subject: [PATCH 06/12] Use async-channel for GatedReader --- crates/bevy_asset/Cargo.toml | 1 + crates/bevy_asset/src/io/gated.rs | 21 ++++++++++++--------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/crates/bevy_asset/Cargo.toml b/crates/bevy_asset/Cargo.toml index 0a54f6e7ab451..9350dabbba93f 100644 --- a/crates/bevy_asset/Cargo.toml +++ b/crates/bevy_asset/Cargo.toml @@ -31,6 +31,7 @@ bevy_utils = { path = "../bevy_utils", version = "0.14.0-dev" } async-broadcast = "0.5" async-fs = "2.0" async-lock = "3.0" +async-channel = "2.2" concurrent-queue = "2.4" downcast-rs = "1.2" futures-io = "0.3" diff --git a/crates/bevy_asset/src/io/gated.rs b/crates/bevy_asset/src/io/gated.rs index 7e67c3c461651..c6cef733bedcd 100644 --- a/crates/bevy_asset/src/io/gated.rs +++ b/crates/bevy_asset/src/io/gated.rs @@ -1,6 +1,6 @@ use crate::io::{AssetReader, AssetReaderError, PathStream, Reader}; +use async_channel::{Sender, Receiver}; use bevy_utils::HashMap; -use concurrent_queue::ConcurrentQueue; use parking_lot::RwLock; use std::{path::Path, sync::Arc}; @@ -10,7 +10,7 @@ use std::{path::Path, sync::Arc}; /// This is built primarily for unit tests. pub struct GatedReader { reader: R, - gates: Arc, ConcurrentQueue<()>>>>, + gates: Arc, (Sender<()>, Receiver<()>)>>>, } impl Clone for GatedReader { @@ -24,7 +24,7 @@ impl Clone for GatedReader { /// Opens path "gates" for a [`GatedReader`]. pub struct GateOpener { - gates: Arc, ConcurrentQueue<()>>>>, + gates: Arc, (Sender<()>, Receiver<()>)>>>, } impl GateOpener { @@ -34,8 +34,9 @@ impl GateOpener { let mut gates = self.gates.write(); let gates = gates .entry_ref(path.as_ref()) - .or_insert_with(ConcurrentQueue::unbounded); - gates.push(()).unwrap(); + .or_insert_with(async_channel::unbounded); + // Should never fail as these channels are always initialized as unbounded. + gates.0.try_send(()).unwrap(); } } @@ -56,13 +57,15 @@ impl GatedReader { impl AssetReader for GatedReader { async fn read<'a>(&'a self, path: &'a Path) -> Result>, AssetReaderError> { - { + let receiver = { let mut gates = self.gates.write(); let gates = gates .entry_ref(path.as_ref()) - .or_insert_with(ConcurrentQueue::unbounded); - gates.pop().unwrap(); - } + .or_insert_with(async_channel::unbounded); + gates.1.clone() + }; + // Should never fail as these channels are always initialized as unbounded and never closed. + receiver.recv().await.unwrap(); let result = self.reader.read(path).await?; Ok(result) } From c2470395625b354f232372e8f58fdb7b9f7d8dbe Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 21 Apr 2024 02:30:32 -0700 Subject: [PATCH 07/12] Formatting --- crates/bevy_asset/src/io/gated.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_asset/src/io/gated.rs b/crates/bevy_asset/src/io/gated.rs index c6cef733bedcd..b03c9d4f4b6ca 100644 --- a/crates/bevy_asset/src/io/gated.rs +++ b/crates/bevy_asset/src/io/gated.rs @@ -1,5 +1,5 @@ use crate::io::{AssetReader, AssetReaderError, PathStream, Reader}; -use async_channel::{Sender, Receiver}; +use async_channel::{Receiver, Sender}; use bevy_utils::HashMap; use parking_lot::RwLock; use std::{path::Path, sync::Arc}; From ad9bdb36141957895448548013ec15b1cfbc3852 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 21 Apr 2024 03:14:09 -0700 Subject: [PATCH 08/12] Add EventQueue replacement for Arc> --- crates/bevy_asset/src/assets.rs | 6 +- crates/bevy_asset/src/handle.rs | 16 ++--- .../src/io/embedded/embedded_watcher.rs | 18 ++--- crates/bevy_asset/src/io/file/file_watcher.rs | 7 +- crates/bevy_asset/src/io/source.rs | 48 ++++++------- crates/bevy_asset/src/lib.rs | 2 + crates/bevy_asset/src/server/info.rs | 2 +- crates/bevy_asset/src/utils.rs | 72 +++++++++++++++++++ 8 files changed, 122 insertions(+), 49 deletions(-) create mode 100644 crates/bevy_asset/src/utils.rs diff --git a/crates/bevy_asset/src/assets.rs b/crates/bevy_asset/src/assets.rs index 6fe17ab153947..65f6fca125fb4 100644 --- a/crates/bevy_asset/src/assets.rs +++ b/crates/bevy_asset/src/assets.rs @@ -240,7 +240,7 @@ impl DenseAssetStorage { value: None, generation: 0, }); - while let Ok(recycled) = self.allocator.recycled.pop() { + for recycled in self.allocator.recycled.try_iter() { let entry = &mut self.storage[recycled.index as usize]; *entry = Entry::Some { value: None, @@ -540,7 +540,7 @@ impl Assets { // to other asset info operations let mut infos = asset_server.data.infos.write(); let mut not_ready = Vec::new(); - while let Ok(drop_event) = assets.handle_provider.drop_queue.pop() { + while let Ok(drop_event) = assets.handle_provider.drop_queue.try_recv() { let id = drop_event.id.typed(); if drop_event.asset_server_managed { @@ -566,7 +566,7 @@ impl Assets { // TODO: this is _extremely_ inefficient find a better fix // This will also loop failed assets indefinitely. Is that ok? for event in not_ready { - assets.handle_provider.drop_queue.push(event).unwrap(); + assets.handle_provider.drop_queue.send(event); } } diff --git a/crates/bevy_asset/src/handle.rs b/crates/bevy_asset/src/handle.rs index 92cab1a39f83c..3ecd0f0dae2bb 100644 --- a/crates/bevy_asset/src/handle.rs +++ b/crates/bevy_asset/src/handle.rs @@ -1,11 +1,11 @@ use crate::{ - meta::MetaTransform, Asset, AssetId, AssetIndexAllocator, AssetPath, InternalAssetId, - UntypedAssetId, + meta::MetaTransform, + utils::{EventQueue, EventSender}, + Asset, AssetId, AssetIndexAllocator, AssetPath, InternalAssetId, UntypedAssetId, }; use bevy_ecs::prelude::*; use bevy_reflect::{std_traits::ReflectDefault, Reflect, TypePath}; use bevy_utils::get_short_name; -use concurrent_queue::ConcurrentQueue; use std::{ any::TypeId, hash::{Hash, Hasher}, @@ -19,7 +19,7 @@ use uuid::Uuid; #[derive(Clone)] pub struct AssetHandleProvider { pub(crate) allocator: Arc, - pub(crate) drop_queue: Arc>, + pub(crate) drop_queue: EventQueue, pub(crate) type_id: TypeId, } @@ -34,7 +34,7 @@ impl AssetHandleProvider { Self { type_id, allocator, - drop_queue: Arc::new(ConcurrentQueue::unbounded()), + drop_queue: EventQueue::new(), } } @@ -54,7 +54,7 @@ impl AssetHandleProvider { ) -> Arc { Arc::new(StrongHandle { id: id.untyped(self.type_id), - drop: self.drop_queue.clone(), + drop: self.drop_queue.sender(), meta_transform, path, asset_server_managed, @@ -88,12 +88,12 @@ pub struct StrongHandle { /// 1. configuration tied to the lifetime of a specific asset load /// 2. configuration that must be repeatable when the asset is hot-reloaded pub(crate) meta_transform: Option, - pub(crate) drop: Arc>, + pub(crate) drop: EventSender, } impl Drop for StrongHandle { fn drop(&mut self) { - let _ = self.drop.push(DropEvent { + self.drop.send(DropEvent { id: self.id.internal(), asset_server_managed: self.asset_server_managed, }); diff --git a/crates/bevy_asset/src/io/embedded/embedded_watcher.rs b/crates/bevy_asset/src/io/embedded/embedded_watcher.rs index 47916394c68ee..61e7152833e41 100644 --- a/crates/bevy_asset/src/io/embedded/embedded_watcher.rs +++ b/crates/bevy_asset/src/io/embedded/embedded_watcher.rs @@ -1,11 +1,13 @@ -use crate::io::{ - file::{get_asset_path, get_base_path, new_asset_event_debouncer, FilesystemEventHandler}, - memory::Dir, - AssetSourceEvent, AssetWatcher, +use crate::{ + io::{ + file::{get_asset_path, get_base_path, new_asset_event_debouncer, FilesystemEventHandler}, + memory::Dir, + AssetSourceEvent, AssetWatcher, + }, + EventSender, }; use bevy_utils::tracing::warn; use bevy_utils::{Duration, HashMap}; -use concurrent_queue::ConcurrentQueue; use notify_debouncer_full::{notify::RecommendedWatcher, Debouncer, FileIdMap}; use parking_lot::RwLock; use std::{ @@ -27,7 +29,7 @@ impl EmbeddedWatcher { pub fn new( dir: Dir, root_paths: Arc, PathBuf>>>, - sender: Arc>, + sender: EventSender, debounce_wait_time: Duration, ) -> Self { let root = get_base_path(); @@ -49,7 +51,7 @@ impl AssetWatcher for EmbeddedWatcher {} /// binary-embedded Rust source files. This will read the contents of changed files from the file system and overwrite /// the initial static bytes from the file embedded in the binary. pub(crate) struct EmbeddedEventHandler { - sender: Arc>, + sender: EventSender, root_paths: Arc, PathBuf>>>, root: PathBuf, dir: Dir, @@ -83,7 +85,7 @@ impl FilesystemEventHandler for EmbeddedEventHandler { } } self.last_event = Some(event.clone()); - self.sender.push(event).unwrap(); + self.sender.send(event); } } } diff --git a/crates/bevy_asset/src/io/file/file_watcher.rs b/crates/bevy_asset/src/io/file/file_watcher.rs index a63042bcae861..82591e07e4da3 100644 --- a/crates/bevy_asset/src/io/file/file_watcher.rs +++ b/crates/bevy_asset/src/io/file/file_watcher.rs @@ -2,7 +2,6 @@ use crate::io::{AssetSourceEvent, AssetWatcher}; use crate::path::normalize_path; use bevy_utils::tracing::error; use bevy_utils::Duration; -use concurrent_queue::ConcurrentQueue; use notify_debouncer_full::{ new_debouncer, notify::{ @@ -29,7 +28,7 @@ pub struct FileWatcher { impl FileWatcher { pub fn new( root: PathBuf, - sender: Arc>, + sender: EventSender, debounce_wait_time: Duration, ) -> Result { let root = normalize_path(super::get_base_path().join(root).as_path()); @@ -247,7 +246,7 @@ pub(crate) fn new_asset_event_debouncer( } pub(crate) struct FileEventHandler { - sender: Arc>, + sender: EventSender, root: PathBuf, last_event: Option, } @@ -263,7 +262,7 @@ impl FilesystemEventHandler for FileEventHandler { fn handle(&mut self, _absolute_paths: &[PathBuf], event: AssetSourceEvent) { if self.last_event.as_ref() != Some(&event) { self.last_event = Some(event.clone()); - self.sender.push(event).unwrap(); + self.sender.send(event); } } } diff --git a/crates/bevy_asset/src/io/source.rs b/crates/bevy_asset/src/io/source.rs index 809fc0c8e7c58..70aa7b468126d 100644 --- a/crates/bevy_asset/src/io/source.rs +++ b/crates/bevy_asset/src/io/source.rs @@ -1,11 +1,11 @@ use crate::{ io::{processor_gated::ProcessorGatedReader, AssetSourceEvent, AssetWatcher}, processor::AssetProcessorData, + utils::{EventQueue, EventReceiver, EventSender}, }; use bevy_ecs::system::Resource; use bevy_utils::tracing::{error, warn}; use bevy_utils::{CowArc, Duration, HashMap}; -use concurrent_queue::ConcurrentQueue; use std::{fmt::Display, hash::Hash, sync::Arc}; use thiserror::Error; @@ -118,9 +118,7 @@ pub struct AssetSourceBuilder { pub writer: Option Option> + Send + Sync>>, pub watcher: Option< Box< - dyn FnMut(Arc>) -> Option> - + Send - + Sync, + dyn FnMut(EventSender) -> Option> + Send + Sync, >, >, pub processed_reader: Option Box + Send + Sync>>, @@ -128,9 +126,7 @@ pub struct AssetSourceBuilder { Option Option> + Send + Sync>>, pub processed_watcher: Option< Box< - dyn FnMut(Arc>) -> Option> - + Send - + Sync, + dyn FnMut(EventSender) -> Option> + Send + Sync, >, >, pub watch_warning: Option<&'static str>, @@ -162,12 +158,11 @@ impl AssetSourceBuilder { }; if watch { - let receiver = Arc::new(ConcurrentQueue::unbounded()); - let sender = receiver.clone(); - match self.watcher.as_mut().and_then(|w| w(sender.clone())) { + let queue = EventQueue::new(); + match self.watcher.as_mut().and_then(|w| w(queue.sender())) { Some(w) => { source.watcher = Some(w); - source.event_receiver = Some(receiver); + source.event_receiver = Some(queue.receiver()); } None => { if let Some(warning) = self.watch_warning { @@ -178,12 +173,15 @@ impl AssetSourceBuilder { } if watch_processed { - let receiver = Arc::new(ConcurrentQueue::unbounded()); - let sender = receiver.clone(); - match self.processed_watcher.as_mut().and_then(|w| w(sender)) { + let queue = EventQueue::new(); + match self + .processed_watcher + .as_mut() + .and_then(|w| w(queue.sender())) + { Some(w) => { source.processed_watcher = Some(w); - source.processed_event_receiver = Some(receiver); + source.processed_event_receiver = Some(queue.receiver()); } None => { if let Some(warning) = self.processed_watch_warning { @@ -216,7 +214,7 @@ impl AssetSourceBuilder { /// Will use the given `watcher` function to construct unprocessed [`AssetWatcher`] instances. pub fn with_watcher( mut self, - watcher: impl FnMut(Arc>) -> Option> + watcher: impl FnMut(EventSender) -> Option> + Send + Sync + 'static, @@ -246,7 +244,7 @@ impl AssetSourceBuilder { /// Will use the given `watcher` function to construct processed [`AssetWatcher`] instances. pub fn with_processed_watcher( mut self, - watcher: impl FnMut(Arc>) -> Option> + watcher: impl FnMut(EventSender) -> Option> + Send + Sync + 'static, @@ -367,8 +365,8 @@ pub struct AssetSource { processed_writer: Option>, watcher: Option>, processed_watcher: Option>, - event_receiver: Option>>, - processed_event_receiver: Option>>, + event_receiver: Option>, + processed_event_receiver: Option>, } impl AssetSource { @@ -419,14 +417,14 @@ impl AssetSource { /// Return's this source's unprocessed event receiver, if the source is currently watching for changes. #[inline] - pub fn event_receiver(&self) -> Option<&Arc>> { - self.event_receiver.as_ref() + pub fn event_receiver(&self) -> Option> { + self.event_receiver.as_ref().cloned() } /// Return's this source's processed event receiver, if the source is currently watching for changes. #[inline] - pub fn processed_event_receiver(&self) -> Option<&Arc>> { - self.processed_event_receiver.as_ref() + pub fn processed_event_receiver(&self) -> Option> { + self.processed_event_receiver.as_ref().cloned() } /// Returns true if the assets in this source should be processed. @@ -485,9 +483,9 @@ impl AssetSource { pub fn get_default_watcher( path: String, file_debounce_wait_time: Duration, - ) -> impl FnMut(Arc>) -> Option> + Send + Sync + ) -> impl FnMut(EventSender) -> Option> + Send + Sync { - move |sender: Arc>| { + move |sender: EventSender| { #[cfg(all( feature = "file_watcher", not(target_arch = "wasm32"), diff --git a/crates/bevy_asset/src/lib.rs b/crates/bevy_asset/src/lib.rs index 4b3be75ecd80c..62b9d9e3d8a11 100644 --- a/crates/bevy_asset/src/lib.rs +++ b/crates/bevy_asset/src/lib.rs @@ -30,6 +30,7 @@ mod loader; mod path; mod reflect; mod server; +mod utils; pub use assets::*; pub use bevy_asset_macros::Asset; @@ -43,6 +44,7 @@ pub use loader::*; pub use path::*; pub use reflect::*; pub use server::*; +pub use utils::*; /// Rusty Object Notation, a crate used to serialize and deserialize bevy assets. pub use ron; diff --git a/crates/bevy_asset/src/server/info.rs b/crates/bevy_asset/src/server/info.rs index 98df753a4259a..f63a72654b6f3 100644 --- a/crates/bevy_asset/src/server/info.rs +++ b/crates/bevy_asset/src/server/info.rs @@ -690,7 +690,7 @@ impl AssetInfos { /// [`Assets`]: crate::Assets pub(crate) fn consume_handle_drop_events(&mut self) { for provider in self.handle_providers.values() { - while let Ok(drop_event) = provider.drop_queue.pop() { + for drop_event in provider.drop_queue.try_iter() { let id = drop_event.id; if drop_event.asset_server_managed { Self::process_handle_drop_internal( diff --git a/crates/bevy_asset/src/utils.rs b/crates/bevy_asset/src/utils.rs new file mode 100644 index 0000000000000..3700ce959dc02 --- /dev/null +++ b/crates/bevy_asset/src/utils.rs @@ -0,0 +1,72 @@ +use concurrent_queue::ConcurrentQueue; +use std::sync::Arc; + +pub(crate) struct EventQueue(Arc>); + +impl EventQueue { + pub fn new() -> Self { + Self(Arc::new(ConcurrentQueue::unbounded())) + } + + pub fn sender(&self) -> EventSender { + EventSender(self.0.clone()) + } + + pub fn receiver(&self) -> EventReceiver { + EventReceiver(self.0.clone()) + } + + pub fn send(&self, value: T) { + self.0 + .push(value) + .unwrap_or_else(|_| panic!("Failed to send value.")); + } + + pub fn try_iter(&self) -> concurrent_queue::TryIter { + self.0.try_iter() + } +} + +impl Clone for EventQueue { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +/// A strictly non-blocking sender for a multi-producer multi-consumer channel. +#[derive(Debug)] +pub struct EventSender(Arc>); + +impl EventSender { + pub fn send(&self, value: T) { + self.0 + .push(value) + .unwrap_or_else(|_| panic!("Failed to send value.")); + } +} + +impl Clone for EventSender { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +/// A strictly non-blocking reciever for a multi-producer multi-consumer channel. +#[derive(Debug)] +pub struct EventReceiver(Arc>); + +impl EventReceiver { + pub fn try_recv(&self) -> Result { + self.0.pop() + } + + pub fn try_iter(&self) -> concurrent_queue::TryIter { + self.0.try_iter() + } +} + +impl Clone for EventReceiver { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} From a1ce6fd64e779fcf105ee90f6fc9d3ea86301193 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 21 Apr 2024 03:23:21 -0700 Subject: [PATCH 09/12] Cleanup time's use of the queues. --- crates/bevy_asset/src/io/file/file_watcher.rs | 6 ++---- crates/bevy_asset/src/utils.rs | 4 ++++ crates/bevy_render/src/lib.rs | 6 +++--- crates/bevy_render/src/renderer/mod.rs | 4 ++-- crates/bevy_time/src/lib.rs | 21 +++++++------------ 5 files changed, 19 insertions(+), 22 deletions(-) diff --git a/crates/bevy_asset/src/io/file/file_watcher.rs b/crates/bevy_asset/src/io/file/file_watcher.rs index 82591e07e4da3..c43df82835c84 100644 --- a/crates/bevy_asset/src/io/file/file_watcher.rs +++ b/crates/bevy_asset/src/io/file/file_watcher.rs @@ -1,5 +1,6 @@ use crate::io::{AssetSourceEvent, AssetWatcher}; use crate::path::normalize_path; +use crate::EventSender; use bevy_utils::tracing::error; use bevy_utils::Duration; use notify_debouncer_full::{ @@ -11,10 +12,7 @@ use notify_debouncer_full::{ }, DebounceEventResult, Debouncer, FileIdMap, }; -use std::{ - path::{Path, PathBuf}, - sync::Arc, -}; +use std::path::{Path, PathBuf}; /// An [`AssetWatcher`] that watches the filesystem for changes to asset files in a given root folder and emits [`AssetSourceEvent`] /// for each relevant change. This uses [`notify_debouncer_full`] to retrieve "debounced" filesystem events. diff --git a/crates/bevy_asset/src/utils.rs b/crates/bevy_asset/src/utils.rs index 3700ce959dc02..a303bd6d69028 100644 --- a/crates/bevy_asset/src/utils.rs +++ b/crates/bevy_asset/src/utils.rs @@ -22,6 +22,10 @@ impl EventQueue { .unwrap_or_else(|_| panic!("Failed to send value.")); } + pub fn try_recv(&self) -> Result { + self.0.pop() + } + pub fn try_iter(&self) -> concurrent_queue::TryIter { self.0.try_iter() } diff --git a/crates/bevy_render/src/lib.rs b/crates/bevy_render/src/lib.rs index 822a77b2e6278..ddbc8cb4d4406 100644 --- a/crates/bevy_render/src/lib.rs +++ b/crates/bevy_render/src/lib.rs @@ -468,9 +468,9 @@ unsafe fn initialize_render_app(app: &mut App) { extract(main_world, render_world); }); - let (sender, receiver) = bevy_time::create_time_channels(); - render_app.insert_resource(sender); - app.insert_resource(receiver); + let event_queue = bevy_time::create_time_event_queue(); + render_app.insert_resource(event_queue.clone()); + app.insert_resource(event_queue); app.insert_sub_app(RenderApp, render_app); } diff --git a/crates/bevy_render/src/renderer/mod.rs b/crates/bevy_render/src/renderer/mod.rs index 28ef26e201476..fc1dbb43e2415 100644 --- a/crates/bevy_render/src/renderer/mod.rs +++ b/crates/bevy_render/src/renderer/mod.rs @@ -16,7 +16,7 @@ use crate::{ view::{ExtractedWindows, ViewTarget}, }; use bevy_ecs::{prelude::*, system::SystemState}; -use bevy_time::TimeSender; +use bevy_time::TimeEventQueue; use bevy_utils::Instant; use std::sync::Arc; use wgpu::{ @@ -104,7 +104,7 @@ pub fn render_system(world: &mut World, state: &mut SystemState(); + let time_sender = world.resource::(); // ignore disconnected errors, the main world probably just got dropped during shutdown if let Err(bevy_time::PushError::Full(_)) = time_sender.0.push(Instant::now()) { panic!("The TimeSender channel should always be empty during render. You might need to add the bevy::core::time_system to your app.",); diff --git a/crates/bevy_time/src/lib.rs b/crates/bevy_time/src/lib.rs index f1a0637147f65..7c721d5ff30cf 100644 --- a/crates/bevy_time/src/lib.rs +++ b/crates/bevy_time/src/lib.rs @@ -85,20 +85,15 @@ pub enum TimeUpdateStrategy { ManualDuration(Duration), } -/// Channel resource used to receive time from the render world. -#[derive(Resource)] -pub struct TimeReceiver(pub Arc>); - -/// Channel resource used to send time from the render world. -#[derive(Resource)] -pub struct TimeSender(pub Arc>); +/// Queue resource used to receive time from the render world. +#[derive(Resource, Clone)] +pub struct TimeEventQueue(pub Arc>); /// Creates channels used for sending time between the render world and the main world. -pub fn create_time_channels() -> (TimeSender, TimeReceiver) { +pub fn create_time_event_queue() -> TimeEventQueue { // bound the channel to 2 since when pipelined the render phase can finish before // the time system runs. - let queue = Arc::new(ConcurrentQueue::bounded(2)); - (TimeSender(queue.clone()), TimeReceiver(queue)) + TimeEventQueue(Arc::new(ConcurrentQueue::bounded(2))) } /// The system used to update the [`Time`] used by app logic. If there is a render world the time is @@ -108,12 +103,12 @@ fn time_system( mut virtual_time: ResMut>, mut time: ResMut