diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5805d46..063ac0f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,7 +17,7 @@ jobs: matrix: rust: - stable - - 1.56.1 + - 1.77.1 steps: - name: Checkout sources uses: actions/checkout@v3 diff --git a/Cargo.toml b/Cargo.toml index 316d68c..f88c110 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ name = "async-event" version = "0.1.0" authors = ["Serge Barral "] edition = "2021" -rust-version = "1.56" +rust-version = "1.77" license = "MIT OR Apache-2.0" repository = "https://github.com/asynchronics/async-event" readme = "README.md" @@ -18,6 +18,9 @@ An efficient async condition variable for lock-free algorithms. categories = ["asynchronous", "concurrency"] keywords = ["async", "event", "atomic", "futures"] +[dependencies] +pin-project-lite = "0.2" + [dev-dependencies] tokio = { version = "1", features = ["full"] } futures-executor = "0.3" diff --git a/src/lib.rs b/src/lib.rs index 472e126..695e8e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -69,6 +69,12 @@ //! }); //! ``` +// Temporary workaround until the `async_event_loom` flag can be whitelisted +// without a `build.rs` [1]. +// +// [1]: (https://github.com/rust-lang/rust/issues/124800). +#![allow(unexpected_cfgs)] + mod loom_exports; use std::future::Future; @@ -81,6 +87,7 @@ use std::task::{Context, Poll, Waker}; use loom_exports::cell::UnsafeCell; use loom_exports::sync::atomic::{self, AtomicBool}; use loom_exports::sync::Mutex; +use pin_project_lite::pin_project; /// An object that can receive or send notifications. pub struct Event { @@ -130,9 +137,29 @@ impl Event { /// Returns a future that can be `await`ed until the provided predicate is /// satisfied. - pub fn wait_until Option, T>(&self, predicate: F) -> WaitUntil { + pub fn wait_until(&self, predicate: F) -> WaitUntil + where + F: FnMut() -> Option, + { WaitUntil::new(&self.wait_set, predicate) } + + /// Returns a future that can be `await`ed until the provided predicate is + /// satisfied or until the provided future completes. + /// + /// The deadline is specified as a `Future` that is expected to resolves to + /// `()` after some duration, such as a `tokio::time::Sleep` future. + pub fn wait_until_or_timeout( + &self, + predicate: F, + deadline: D, + ) -> WaitUntilOrTimeout + where + F: FnMut() -> Option, + D: Future, + { + WaitUntilOrTimeout::new(&self.wait_set, predicate, deadline) + } } impl Default for Event { @@ -378,6 +405,53 @@ enum WaitUntilState { Completed, } +pin_project! { + /// A future that can be `await`ed until a predicate is satisfied or until a + /// deadline elapses. + pub struct WaitUntilOrTimeout<'a, F: FnMut() -> Option, T, D: Future> { + wait_until: WaitUntil<'a, F, T>, + #[pin] + deadline: D, + } +} + +impl<'a, F, T, D> WaitUntilOrTimeout<'a, F, T, D> +where + F: FnMut() -> Option, + D: Future, +{ + /// Creates a future associated with the specified event sink that can be + /// `await`ed until the specified predicate is satisfied, or until the + /// specified timeout future completes. + fn new(wait_set: &'a WaitSet, predicate: F, deadline: D) -> Self { + Self { + wait_until: WaitUntil::new(wait_set, predicate), + deadline, + } + } +} + +impl<'a, F, T, D> Future for WaitUntilOrTimeout<'a, F, T, D> +where + F: FnMut() -> Option, + D: Future, +{ + type Output = Option; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + if let Poll::Ready(value) = Pin::new(this.wait_until).poll(cx) { + Poll::Ready(Some(value)) + } else if this.deadline.poll(cx).is_ready() { + Poll::Ready(None) + } else { + Poll::Pending + } + } +} + /// A set of notifiers. /// /// The set wraps a Mutex-protected list of notifiers and manages a flag for