diff --git a/Cargo.toml b/Cargo.toml index 8325fe1..aadb95d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,8 +17,8 @@ exclude = ["/.*"] [dependencies] async-lock = "3.0.0" async-task = "4.4.0" -atomic-waker = "1.0" concurrent-queue = "2.0.0" +event-listener = { version = "5.2.0", default-features = false, features = ["std"] } fastrand = "2.0.0" futures-lite = { version = "2.0.0", default-features = false } slab = "0.4.4" diff --git a/src/lib.rs b/src/lib.rs index c13f67f..483bc41 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,14 +37,14 @@ use std::fmt; use std::marker::PhantomData; use std::panic::{RefUnwindSafe, UnwindSafe}; use std::rc::Rc; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, TryLockError}; -use std::task::{Poll, Waker}; +use std::task::Waker; use async_lock::OnceCell; use async_task::{Builder, Runnable}; -use atomic_waker::AtomicWaker; use concurrent_queue::ConcurrentQueue; +use event_listener::{listener, Event}; use futures_lite::{future, prelude::*}; use slab::Slab; use thread_local::ThreadLocal; @@ -225,7 +225,15 @@ impl<'a> Executor<'a> { /// ``` pub async fn tick(&self) { let state = self.state(); - let runnable = Ticker::new(state).runnable().await; + let runnable = state + .tick_with(|local, steal| { + local + .queue + .pop() + .ok() + .or_else(|| if steal { state.queue.pop().ok() } else { None }) + }) + .await; runnable.run(); } @@ -270,22 +278,23 @@ impl<'a> Executor<'a> { move |mut runnable| { // If possible, push into the current local queue and notify the ticker. if let Some(local) = state.local_queue.get() { - runnable = if let Err(err) = local.queue.push(runnable) { - err.into_inner() - } else { - // Wake up this thread if it's asleep, otherwise notify another - // thread to try to have the task stolen. - if let Some(waker) = local.waker.take() { - waker.wake(); + // Don't push into the local queue if no one is ticking it. + if local.tickers.load(Ordering::Acquire) > 0 { + runnable = if let Err(err) = local.queue.push(runnable) { + err.into_inner() } else { - state.notify(); + // Try to notify threads waiting on this queue. If there are + // none, notify another thread. + if local.waiters.notify_additional(1) == 0 { + state.new_tasks.notify_additional(1); + } + return; } - return; } } // If the local queue is full, fallback to pushing onto the global injector queue. state.queue.push(runnable).unwrap(); - state.notify(); + state.new_tasks.notify_additional(1); } } @@ -315,6 +324,9 @@ impl Drop for Executor<'_> { } drop(active); + for local_queue in state.local_queue.iter() { + while local_queue.queue.pop().is_ok() {} + } while state.queue.pop().is_ok() {} } } @@ -537,11 +549,8 @@ struct State { /// thread steals the task. local_queue: ThreadLocal, - /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. - notified: AtomicBool, - - /// A list of sleeping tickers. - sleepers: Mutex, + /// Tickers waiting on new tasks from the global queue. + new_tasks: Event, /// Currently active tasks. active: Mutex>, @@ -553,12 +562,7 @@ impl State { State { queue: ConcurrentQueue::unbounded(), local_queue: ThreadLocal::new(), - notified: AtomicBool::new(true), - sleepers: Mutex::new(Sleepers { - count: 0, - wakers: Vec::new(), - free_ids: Vec::new(), - }), + new_tasks: Event::new(), active: Mutex::new(Slab::new()), } } @@ -566,218 +570,40 @@ impl State { /// Notifies a sleeping ticker. #[inline] fn notify(&self) { - if self - .notified - .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) - .is_ok() - { - let waker = self.sleepers.lock().unwrap().notify(); - if let Some(w) = waker { - w.wake(); - } - } + self.new_tasks.notify(1); } -} - -/// A list of sleeping tickers. -struct Sleepers { - /// Number of sleeping tickers (both notified and unnotified). - count: usize, - /// IDs and wakers of sleeping unnotified tickers. - /// - /// A sleeping ticker is notified when its waker is missing from this list. - wakers: Vec<(usize, Waker)>, + /// Run a tick using the provided function to get the next task. + async fn tick_with( + &self, + mut local_ticker: impl FnMut(&LocalQueue, bool) -> Option, + ) -> Runnable { + let local = self.local_queue.get_or_default(); - /// Reclaimed IDs. - free_ids: Vec, -} - -impl Sleepers { - /// Inserts a new sleeping ticker. - fn insert(&mut self, waker: &Waker) -> usize { - let id = match self.free_ids.pop() { - Some(id) => id, - None => self.count + 1, - }; - self.count += 1; - self.wakers.push((id, waker.clone())); - id - } - - /// Re-inserts a sleeping ticker's waker if it was notified. - /// - /// Returns `true` if the ticker was notified. - fn update(&mut self, id: usize, waker: &Waker) -> bool { - for item in &mut self.wakers { - if item.0 == id { - if !item.1.will_wake(waker) { - item.1 = waker.clone(); - } - return false; + loop { + // Try to get a runnable from the local queue. + if let Some(runnable) = local_ticker(local, false) { + return runnable; } - } - self.wakers.push((id, waker.clone())); - true - } + // Register a local waiter. + listener!(local.waiters => local_listener); - /// Removes a previously inserted sleeping ticker. - /// - /// Returns `true` if the ticker was notified. - fn remove(&mut self, id: usize) -> bool { - self.count -= 1; - self.free_ids.push(id); - - for i in (0..self.wakers.len()).rev() { - if self.wakers[i].0 == id { - self.wakers.remove(i); - return false; + // Try for a global runner. + if let Ok(runnable) = self.queue.pop() { + return runnable; } - } - true - } - /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping. - fn is_notified(&self) -> bool { - self.count == 0 || self.count > self.wakers.len() - } - - /// Returns notification waker for a sleeping ticker. - /// - /// If a ticker was notified already or there are no tickers, `None` will be returned. - fn notify(&mut self) -> Option { - if self.wakers.len() == self.count { - self.wakers.pop().map(|item| item.1) - } else { - None - } - } -} - -/// Runs task one by one. -struct Ticker<'a> { - /// The executor state. - state: &'a State, - - /// Set to a non-zero sleeper ID when in sleeping state. - /// - /// States a ticker can be in: - /// 1) Woken. - /// 2a) Sleeping and unnotified. - /// 2b) Sleeping and notified. - sleeping: usize, -} + // Register a global waiter. + listener!(self.new_tasks => global_listener); -impl Ticker<'_> { - /// Creates a ticker. - fn new(state: &State) -> Ticker<'_> { - Ticker { state, sleeping: 0 } - } - - /// Moves the ticker into sleeping and unnotified state. - /// - /// Returns `false` if the ticker was already sleeping and unnotified. - fn sleep(&mut self, waker: &Waker) -> bool { - self.state - .local_queue - .get_or_default() - .waker - .register(waker); - - let mut sleepers = self.state.sleepers.lock().unwrap(); - - match self.sleeping { - // Move to sleeping state. - 0 => { - self.sleeping = sleepers.insert(waker); - } - - // Already sleeping, check if notified. - id => { - if !sleepers.update(id, waker) { - return false; - } + // Try for both again. + if let Some(runnable) = local_ticker(local, true) { + return runnable; } - } - - self.state - .notified - .store(sleepers.is_notified(), Ordering::Release); - - true - } - - /// Moves the ticker into woken state. - fn wake(&mut self) { - if self.sleeping != 0 { - let mut sleepers = self.state.sleepers.lock().unwrap(); - sleepers.remove(self.sleeping); - self.state - .notified - .store(sleepers.is_notified(), Ordering::Release); - } - self.sleeping = 0; - } - - /// Waits for the next runnable task to run. - async fn runnable(&mut self) -> Runnable { - self.runnable_with(|| { - self.state - .local_queue - .get() - .and_then(|local| local.queue.pop().ok()) - .or_else(|| self.state.queue.pop().ok()) - }) - .await - } - - /// Waits for the next runnable task to run, given a function that searches for a task. - async fn runnable_with(&mut self, mut search: impl FnMut() -> Option) -> Runnable { - future::poll_fn(|cx| { - loop { - match search() { - None => { - // Move to sleeping and unnotified state. - if !self.sleep(cx.waker()) { - // If already sleeping and unnotified, return. - return Poll::Pending; - } - } - Some(r) => { - // Wake up. - self.wake(); - - // Notify another ticker now to pick up where this ticker left off, just in - // case running the task takes a long time. - self.state.notify(); - - return Poll::Ready(r); - } - } - } - }) - .await - } -} - -impl Drop for Ticker<'_> { - fn drop(&mut self) { - // If this ticker is in sleeping state, it must be removed from the sleepers list. - if self.sleeping != 0 { - let mut sleepers = self.state.sleepers.lock().unwrap(); - let notified = sleepers.remove(self.sleeping); - - self.state - .notified - .store(sleepers.is_notified(), Ordering::Release); - - // If this ticker was notified, then notify another ticker. - if notified { - drop(sleepers); - self.state.notify(); - } + // Wait on both listeners in parallel. + local_listener.or(global_listener).await; } } } @@ -789,9 +615,6 @@ struct Runner<'a> { /// The executor state. state: &'a State, - /// Inner ticker. - ticker: Ticker<'a>, - /// Bumped every time a runnable task is found. ticks: usize, } @@ -799,12 +622,9 @@ struct Runner<'a> { impl Runner<'_> { /// Creates a runner and registers it in the executor state. fn new(state: &State) -> Runner<'_> { - let runner = Runner { - state, - ticker: Ticker::new(state), - ticks: 0, - }; - runner + state.local_queue.get_or_default().start_ticking(); + + Runner { state, ticks: 0 } } /// Waits for the next runnable task to run. @@ -812,13 +632,18 @@ impl Runner<'_> { let local = self.state.local_queue.get_or_default(); let runnable = self - .ticker - .runnable_with(|| { + .state + .tick_with(|_, try_stealing| { // Try the local queue. if let Ok(r) = local.queue.pop() { return Some(r); } + // Remaining work involves stealing. + if !try_stealing { + return None; + } + // Try stealing from the global queue. if let Ok(r) = self.state.queue.pop() { steal(&self.state.queue, &local.queue); @@ -868,12 +693,7 @@ impl Drop for Runner<'_> { fn drop(&mut self) { // Remove the local queue. if let Some(local) = self.state.local_queue.get() { - // Re-schedule remaining tasks in the local queue. - for r in local.queue.try_iter() { - // Explicitly reschedule the runnable back onto the global - // queue to avoid rescheduling onto the local one. - self.state.queue.push(r).unwrap(); - } + local.stop_ticking(self.state); } } } @@ -943,24 +763,10 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_ } } - /// Debug wrapper for the sleepers. - struct SleepCount<'a>(&'a Mutex); - - impl fmt::Debug for SleepCount<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self.0.try_lock() { - Ok(lock) => fmt::Debug::fmt(&lock.count, f), - Err(TryLockError::WouldBlock) => f.write_str(""), - Err(TryLockError::Poisoned(_)) => f.write_str(""), - } - } - } - f.debug_struct(name) .field("active", &ActiveTasks(&state.active)) .field("global_tasks", &state.queue.len()) .field("local_runners", &LocalRunners(&state.local_queue)) - .field("sleepers", &SleepCount(&state.sleepers)) .finish() } @@ -972,15 +778,54 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_ /// The local queue *must* be flushed, and all pending runnables /// rescheduled onto the global queue when a runner is dropped. struct LocalQueue { + /// Queue of concurrent tasks. queue: ConcurrentQueue, - waker: AtomicWaker, + + /// Tickers waiting on an event from this queue. + waiters: Event, + + /// Number of tickers waiting on this queue. + tickers: AtomicUsize, } impl Default for LocalQueue { fn default() -> Self { Self { queue: ConcurrentQueue::bounded(512), - waker: AtomicWaker::new(), + waiters: Event::new(), + tickers: AtomicUsize::new(0), + } + } +} + +impl LocalQueue { + /// Indicate that we are now waiting on this queue. + fn start_ticking(&self) { + // Relaxed ordering is fine here. + let old_tickers = self.tickers.fetch_add(1, Ordering::Relaxed); + if old_tickers > isize::MAX as usize { + panic!("too many tickers waiting on one thread"); + } + } + + /// Indicate that we are no longer waiting on this queue. + #[inline] + fn stop_ticking(&self, state: &State) { + if self.tickers.fetch_sub(1, Ordering::Release) == 1 { + // Make sure everyone knows we're about to release tasks. + std::sync::atomic::fence(Ordering::Acquire); + + // Drain any tasks. + self.drain_tasks(state); + } + } + + /// Drain all tasks from this queue. + #[cold] + fn drain_tasks(&self, state: &State) { + while let Ok(task) = self.queue.pop() { + state.queue.push(task).ok(); + state.notify(); } } } diff --git a/tests/larger_tasks.rs b/tests/larger_tasks.rs new file mode 100644 index 0000000..ee6cc1e --- /dev/null +++ b/tests/larger_tasks.rs @@ -0,0 +1,101 @@ +//! Test for larger tasks. + +use async_executor::Executor; +use futures_lite::future::{self, block_on}; +use futures_lite::prelude::*; + +use std::sync::Arc; +use std::thread; + +#[cfg(not(miri))] +use std::time::Duration; + +fn do_run>(mut f: impl FnMut(Arc>) -> Fut) { + // This should not run for longer than two minutes. + #[cfg(not(miri))] + let (_stop_timeout, stopper) = async_channel::bounded::<()>(1); + #[cfg(not(miri))] + thread::spawn(move || { + block_on(async move { + let timeout = async { + async_io::Timer::after(Duration::from_secs(2 * 60)).await; + eprintln!("test timed out after 2m"); + std::process::exit(1) + }; + + let _ = stopper.recv().or(timeout).await; + }) + }); + + let ex = Arc::new(Executor::new()); + + // Test 1: Use the `run` command. + block_on(ex.run(f(ex.clone()))); + + // Test 2: Loop on `tick`. + block_on(async { + let ticker = async { + loop { + ex.tick().await; + } + }; + + f(ex.clone()).or(ticker).await + }); + + // Test 3: Run on many threads. + thread::scope(|scope| { + let (_signal, shutdown) = async_channel::bounded::<()>(1); + + for _ in 0..16 { + let shutdown = shutdown.clone(); + let ex = &ex; + scope.spawn(move || block_on(ex.run(shutdown.recv()))); + } + + block_on(f(ex.clone())); + }); + + // Test 4: Tick loop on many threads. + thread::scope(|scope| { + let (_signal, shutdown) = async_channel::bounded::<()>(1); + + for _ in 0..16 { + let shutdown = shutdown.clone(); + let ex = &ex; + scope.spawn(move || { + block_on(async move { + let ticker = async { + loop { + ex.tick().await; + } + }; + + shutdown.recv().or(ticker).await + }) + }); + } + + block_on(f(ex.clone())); + }); +} + +#[test] +fn smoke() { + do_run(|ex| async move { ex.spawn(async {}).await }); +} + +#[test] +fn yield_now() { + do_run(|ex| async move { ex.spawn(future::yield_now()).await }) +} + +// Miri does not support timers. +#[cfg(not(miri))] +#[test] +fn timer() { + do_run(|ex| async move { + ex.spawn(async_io::Timer::after(Duration::from_millis(5))) + .await; + }) +}