Skip to content

Async deallocations #366

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ pub struct AudioBufferOptions {
///
#[derive(Clone, Debug)]
pub struct AudioBuffer {
pub(crate) channels: Vec<ChannelData>,
pub(crate) sample_rate: f32,
channels: Vec<ChannelData>,
sample_rate: f32,
}

impl AudioBuffer {
Expand All @@ -91,6 +91,14 @@ impl AudioBuffer {
}
}

/// Creates an invalid, but non-allocating AudioBuffer to be used as placeholder
pub(crate) fn tombstone() -> Self {
Self {
channels: Default::default(),
sample_rate: Default::default(),
}
}

/// Convert raw samples to an AudioBuffer
///
/// The outer Vec determine the channels. The inner Vecs should have the same length.
Expand Down
46 changes: 26 additions & 20 deletions src/node/audio_buffer_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ impl AudioBufferSourceNode {
loop_state: loop_state.clone(),
render_state: AudioBufferRendererState::default(),
ended_triggered: false,
buffer_deallocator: Some(llq::Node::new(Box::new(AudioBuffer::tombstone()))),
};

let inner_state = InnerState {
Expand Down Expand Up @@ -379,6 +380,7 @@ struct AudioBufferSourceRenderer {
loop_state: LoopState,
render_state: AudioBufferRendererState,
ended_triggered: bool,
buffer_deallocator: Option<llq::Node<Box<dyn Any + Send>>>,
}

impl AudioBufferSourceRenderer {
Expand All @@ -395,6 +397,26 @@ impl AudioBufferSourceRenderer {
ControlMessage::LoopEnd(loop_end) => self.loop_state.end = *loop_end,
}
}

fn on_end(&mut self, scope: &RenderScope) {
if self.ended_triggered {
return; // only run once
}
self.ended_triggered = true;

// notify the control thread
scope.send_ended_event();

// deallocate the AudioBuffer asynchronously - not in the render thread
if let Some(buffer) = self.buffer.take() {
// the holder contains a tombstone AudioBuffer that can be dropped without deallocation
let mut holder = self.buffer_deallocator.take().unwrap();
// replace the contents of the holder with the actual buffer
*holder.downcast_mut().unwrap() = buffer;
// ship the buffer to the deallocator thread
scope.deallocate_async(holder);
}
}
}

impl AudioProcessor for AudioBufferSourceRenderer {
Expand Down Expand Up @@ -461,13 +483,7 @@ impl AudioProcessor for AudioBufferSourceRenderer {
|| self.render_state.buffer_time_elapsed >= self.duration
{
output.make_silent(); // also converts to mono

// @note: we need this check because this is called a until the program
// ends, such as if the node was never removed from the graph
if !self.ended_triggered {
scope.send_ended_event();
self.ended_triggered = true;
}
self.on_end(scope);
return false;
}

Expand All @@ -479,19 +495,13 @@ impl AudioProcessor for AudioBufferSourceRenderer {
if !is_looping {
if computed_playback_rate > 0. && buffer_time >= buffer_duration {
output.make_silent(); // also converts to mono
if !self.ended_triggered {
scope.send_ended_event();
self.ended_triggered = true;
}
self.on_end(scope);
return false;
}

if computed_playback_rate < 0. && buffer_time < 0. {
output.make_silent(); // also converts to mono
if !self.ended_triggered {
scope.send_ended_event();
self.ended_triggered = true;
}
self.on_end(scope);
return false;
}
}
Expand Down Expand Up @@ -772,11 +782,7 @@ impl AudioProcessor for AudioBufferSourceRenderer {
std::mem::swap(current_buffer, buffer);
} else {
// Creating the tombstone buffer does not cause allocations.
let tombstone_buffer = AudioBuffer {
channels: Default::default(),
sample_rate: Default::default(),
};
self.buffer = Some(std::mem::replace(buffer, tombstone_buffer));
self.buffer = Some(std::mem::replace(buffer, AudioBuffer::tombstone()));
}
return;
};
Expand Down
7 changes: 7 additions & 0 deletions src/render/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::{graph::Node, AudioRenderQuantum};
use crossbeam_channel::Sender;
use rustc_hash::FxHashMap;
use std::cell::{Cell, RefCell};
use std::rc::Rc;

use std::any::Any;
use std::ops::Deref;
Expand All @@ -25,6 +26,7 @@ pub struct RenderScope {

pub(crate) node_id: Cell<AudioNodeId>,
pub(crate) event_sender: Option<Sender<EventDispatch>>,
pub(crate) garbage_collector: Rc<RefCell<llq::Producer<Box<dyn Any + Send>>>>,
}

impl RenderScope {
Expand Down Expand Up @@ -62,6 +64,11 @@ impl RenderScope {
let _ = sender.try_send(EventDispatch::processor_error(self.node_id.get(), event));
}
}

#[allow(dead_code)]
pub(crate) fn deallocate_async(&self, value: llq::Node<Box<dyn Any + Send>>) {
self.garbage_collector.borrow_mut().push(value);
}
}

/// Interface for audio processing code that runs on the audio rendering thread.
Expand Down
14 changes: 9 additions & 5 deletions src/render/thread.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! Communicates with the control thread and ships audio samples to the hardware

use std::any::Any;
use std::cell::Cell;
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -32,7 +33,7 @@ pub(crate) struct RenderThread {
buffer_offset: Option<(usize, AudioRenderQuantum)>,
load_value_sender: Option<Sender<AudioRenderCapacityLoad>>,
event_sender: Option<Sender<EventDispatch>>,
garbage_collector: llq::Producer<Box<dyn Any + Send>>,
garbage_collector: Rc<RefCell<llq::Producer<Box<dyn Any + Send>>>>,
}

// SAFETY:
Expand Down Expand Up @@ -66,7 +67,7 @@ impl RenderThread {
buffer_offset: None,
load_value_sender,
event_sender,
garbage_collector: gc_producer,
garbage_collector: Rc::new(RefCell::new(gc_producer)),
}
}

Expand Down Expand Up @@ -129,7 +130,7 @@ impl RenderThread {
}
NodeMessage { id, mut msg } => {
self.graph.as_mut().unwrap().route_message(id, msg.as_mut());
self.garbage_collector.push(msg);
self.garbage_collector.borrow_mut().push(msg);
}
}
}
Expand Down Expand Up @@ -166,6 +167,7 @@ impl RenderThread {
sample_rate: self.sample_rate,
event_sender: self.event_sender.clone(),
node_id: Cell::new(AudioNodeId(0)), // placeholder value
garbage_collector: Rc::clone(&self.garbage_collector),
};

// render audio graph
Expand Down Expand Up @@ -264,6 +266,7 @@ impl RenderThread {
sample_rate: self.sample_rate,
event_sender: self.event_sender.clone(),
node_id: Cell::new(AudioNodeId(0)), // placeholder value
garbage_collector: Rc::clone(&self.garbage_collector),
};

// render audio graph, clone it in case we need to mutate/store the value later
Expand Down Expand Up @@ -302,6 +305,7 @@ impl RenderThread {
impl Drop for RenderThread {
fn drop(&mut self) {
self.garbage_collector
.borrow_mut()
.push(llq::Node::new(Box::new(TerminateGarbageCollectorThread)));
log::info!("Audio render thread has been dropped");
}
Expand All @@ -320,7 +324,7 @@ fn spawn_garbage_collector_thread(consumer: llq::Consumer<Box<dyn Any + Send>>)
}

fn run_garbage_collector_thread(mut consumer: llq::Consumer<Box<dyn Any + Send>>) {
log::info!("Entering garbage collector thread");
log::debug!("Entering garbage collector thread");
loop {
if let Some(node) = consumer.pop() {
if node
Expand Down