From 8c8f74017524ca9870ff43cd38ae6041bdca0a38 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 2 Feb 2025 00:00:00 +0000 Subject: [PATCH] sync: `oneshot::Receiver::is_closed()` this commit introduces a new method to `tokio::sync::oneshot::Receiver`. this method returns true if the channel is closed. this is similar to the existing `tokio::sync::mpsc::UnboundedReceiver::is_closed()` and `tokio::sync::mpsc::Receiver::is_closed()` methods. see: * https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.is_closed * https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedReceiver.html#method.is_closed * https://github.com/tokio-rs/tokio/pull/7137#discussion_r1940242052 Signed-off-by: katelyn martin --- tokio/src/sync/oneshot.rs | 58 +++++++++++++++++++++++++++++++++++++ tokio/tests/sync_oneshot.rs | 30 +++++++++++++++++++ 2 files changed, 88 insertions(+) diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 2b346eae81c..886de80496a 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -931,6 +931,64 @@ impl Receiver { } } + /// Checks if the channel has been closed. + /// + /// This happens when the corresponding sender is either dropped or sends a + /// value, or when this receiver has closed the channel. + /// + /// # Examples + /// + /// Sending a value. + /// + /// ``` + /// use tokio::sync::oneshot; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = oneshot::channel(); + /// assert!(!rx.is_closed()); + /// + /// tx.send(0).unwrap(); + /// assert!(rx.is_closed()); + /// } + /// ``` + /// + /// Dropping the sender. + /// + /// ``` + /// use tokio::sync::oneshot; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = oneshot::channel::<()>(); + /// assert!(!rx.is_closed()); + /// drop(tx); + /// assert!(rx.is_closed()); + /// } + /// ``` + /// + /// Closing the receiver. + /// + /// ``` + /// use tokio::sync::oneshot; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = oneshot::channel::<()>(); + /// assert!(!rx.is_closed()); + /// rx.close(); + /// assert!(rx.is_closed()); + /// } + /// ``` + pub fn is_closed(&self) -> bool { + if let Some(inner) = self.inner.as_ref() { + let state = State::load(&inner.state, Acquire); + state.is_closed() || state.is_complete() + } else { + true + } + } + /// Attempts to receive a value. /// /// If a pending value exists in the channel, it is returned. If no value diff --git a/tokio/tests/sync_oneshot.rs b/tokio/tests/sync_oneshot.rs index 127f7cb61b8..1acd43707d6 100644 --- a/tokio/tests/sync_oneshot.rs +++ b/tokio/tests/sync_oneshot.rs @@ -292,3 +292,33 @@ fn sender_changes_task() { assert_ready!(task2.enter(|cx, _| tx.poll_closed(cx))); } + +#[test] +fn receiver_is_closed_send() { + let (tx, rx) = oneshot::channel::(); + assert!( + !rx.is_closed(), + "channel is NOT closed before value is sent" + ); + tx.send(17).unwrap(); + assert!(rx.is_closed(), "channel IS closed after value is sent"); +} + +#[test] +fn receiver_is_closed_drop() { + let (tx, rx) = oneshot::channel::(); + assert!( + !rx.is_closed(), + "channel is NOT closed before sender is dropped" + ); + drop(tx); + assert!(rx.is_closed(), "channel IS closed after sender is dropped"); +} + +#[test] +fn receiver_is_closed_rx_close() { + let (_tx, mut rx) = oneshot::channel::(); + assert!(!rx.is_closed()); + rx.close(); + assert!(rx.is_closed()); +}