diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 2b346eae81c..481c97e95b8 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -931,6 +931,164 @@ impl Receiver { } } + /// Checks if a channel is empty. + /// + /// This method returns `true` if the channel has no messages. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::oneshot; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = oneshot::channel(); + /// assert!(rx.is_empty()); + /// + /// tx.send(0).unwrap(); + /// assert!(!rx.is_empty()); + /// + /// assert_eq!((&mut rx).await, Ok(0)); + /// assert!(rx.is_empty()); + /// } + /// ``` + pub fn is_empty(&self) -> bool { + if let Some(inner) = self.inner.as_ref() { + let state = State::load(&inner.state, Acquire); + if state.is_complete() { + // SAFETY: If `state.is_complete()` returns true, then the + // `VALUE_SENT` bit has been set and the sender side of the + // channel will no longer attempt to access the inner + // `UnsafeCell`. Therefore, it is now safe for us to access the + // cell. + // + // The channel is empty if it does not have a value. + unsafe { !inner.has_value() } + } else if state.is_closed() { + // The receiver closed the channel... + true + } else { + // No value has been sent yet. + true + } + } else { + true + } + } + + /// Checks if the channel has been closed. + /// + /// This happens when the corresponding sender is either dropped or sends a + /// value, or when this receiver has closed the channel. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::oneshot; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = oneshot::channel::<()>(); + /// assert!(!rx.is_closed()); + /// drop(tx); + /// assert!(rx.is_closed()); + /// } + /// ``` + /// + /// ``` + /// use tokio::sync::oneshot; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = oneshot::channel::<()>(); + /// assert!(!rx.is_closed()); + /// rx.close(); + /// assert!(rx.is_closed()); + /// } + /// ``` + /// + /// ``` + /// use tokio::sync::oneshot; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = oneshot::channel(); + /// assert!(!rx.is_closed()); + /// + /// tx.send(0).unwrap(); + /// assert!(rx.is_closed()); + /// } + /// ``` + pub fn is_closed(&self) -> bool { + if let Some(inner) = self.inner.as_ref() { + let state = State::load(&inner.state, Acquire); + state.is_closed() || state.is_complete() + } else { + true + } + } + + /// Checks if this receiver is finished. + /// + /// This returns true if this receiver has already yielded a [`Poll::Ready`] result, whether + /// that was a value `T`, or a [`RecvError`]. + /// + /// # Examples + /// + /// Sending a value and polling it. + /// + /// ``` + /// use futures::task::noop_waker_ref; + /// use tokio::sync::oneshot; + /// + /// use std::future::Future; + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = oneshot::channel(); + /// + /// // A receiver is not finished when it is initialized. + /// assert!(!rx.is_finished()); + /// + /// // A receiver is not finished it is polled and is still pending. + /// let poll = Pin::new(&mut rx).poll(&mut Context::from_waker(noop_waker_ref())); + /// assert_eq!(poll, Poll::Pending); + /// assert!(!rx.is_finished()); + /// + /// // A receiver is not finished if a value has been sent, but not yet read. + /// tx.send(0).unwrap(); + /// assert!(!rx.is_finished()); + /// + /// // A receiver *is* finished after it has been polled and yielded a value. + /// assert_eq!((&mut rx).await, Ok(0)); + /// assert!(rx.is_finished()); + /// } + /// ``` + /// + /// Dropping the sender. + /// + /// ``` + /// use tokio::sync::oneshot; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = oneshot::channel::<()>(); + /// + /// // A receiver is not immediately finished when the sender is dropped. + /// drop(tx); + /// assert!(!rx.is_finished()); + /// + /// // A receiver *is* finished after it has been polled and yielded an error. + /// let _ = (&mut rx).await.unwrap_err(); + /// assert!(rx.is_finished()); + /// } + /// ``` + pub fn is_finished(&self) -> bool { + self.inner.is_none() + } + /// Attempts to receive a value. /// /// If a pending value exists in the channel, it is returned. If no value @@ -1106,10 +1264,10 @@ impl Future for Receiver { let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() { #[cfg(all(tokio_unstable, feature = "tracing"))] - let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?; + let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx))).map_err(Into::into); #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - let res = ready!(inner.poll_recv(cx))?; + let res = ready!(inner.poll_recv(cx)).map_err(Into::into); res } else { @@ -1117,7 +1275,7 @@ impl Future for Receiver { }; self.inner = None; - Ready(Ok(ret)) + Ready(ret) } } @@ -1233,6 +1391,19 @@ impl Inner { unsafe fn consume_value(&self) -> Option { self.value.with_mut(|ptr| (*ptr).take()) } + + /// Returns true if there is a value. This function does not check `state`. + /// + /// # Safety + /// + /// Calling this method concurrently on multiple threads will result in a + /// data race. The `VALUE_SENT` state bit is used to ensure that only the + /// sender *or* the receiver will call this method at a given point in time. + /// If `VALUE_SENT` is not set, then only the sender may call this method; + /// if it is set, then only the receiver may call this method. + unsafe fn has_value(&self) -> bool { + self.value.with(|ptr| (*ptr).is_some()) + } } unsafe impl Send for Inner {} diff --git a/tokio/tests/sync_oneshot.rs b/tokio/tests/sync_oneshot.rs index 127f7cb61b8..cc3546f887f 100644 --- a/tokio/tests/sync_oneshot.rs +++ b/tokio/tests/sync_oneshot.rs @@ -292,3 +292,127 @@ fn sender_changes_task() { assert_ready!(task2.enter(|cx, _| tx.poll_closed(cx))); } + +#[test] +fn receiver_is_closed_send() { + let (tx, rx) = oneshot::channel::(); + assert!( + !rx.is_closed(), + "channel is NOT closed before value is sent" + ); + tx.send(17).unwrap(); + assert!(rx.is_closed(), "channel IS closed after value is sent"); +} + +#[test] +fn receiver_is_closed_drop() { + let (tx, rx) = oneshot::channel::(); + assert!( + !rx.is_closed(), + "channel is NOT closed before sender is dropped" + ); + drop(tx); + assert!(rx.is_closed(), "channel IS closed after sender is dropped"); +} + +#[test] +fn receiver_is_closed_rx_close() { + let (_tx, mut rx) = oneshot::channel::(); + assert!(!rx.is_closed()); + rx.close(); + assert!(rx.is_closed()); +} + +#[test] +fn receiver_is_empty_send() { + let (tx, mut rx) = oneshot::channel::(); + + assert!(rx.is_empty(), "channel IS empty before value is sent"); + tx.send(17).unwrap(); + assert!(!rx.is_empty(), "channel is NOT empty after value is sent"); + + let mut task = task::spawn(()); + let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx)); + assert_ready_eq!(poll, Ok(17)); + + assert!(rx.is_empty(), "channel IS empty after value is read"); +} + +#[test] +fn receiver_is_empty_drop() { + let (tx, mut rx) = oneshot::channel::(); + + assert!(rx.is_empty(), "channel IS empty before sender is dropped"); + drop(tx); + assert!(rx.is_empty(), "channel IS empty after sender is dropped"); + + let mut task = task::spawn(()); + let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx)); + assert_ready_err!(poll); + + assert!(rx.is_empty(), "channel IS empty after value is read"); +} + +#[test] +fn receiver_is_empty_rx_close() { + let (_tx, mut rx) = oneshot::channel::(); + assert!(rx.is_empty()); + rx.close(); + assert!(rx.is_empty()); +} + +#[test] +fn receiver_is_finished_send() { + let (tx, mut rx) = oneshot::channel::(); + + assert!( + !rx.is_finished(), + "channel is NOT finished before value is sent" + ); + tx.send(17).unwrap(); + assert!( + !rx.is_finished(), + "channel is NOT finished after value is sent" + ); + + let mut task = task::spawn(()); + let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx)); + assert_ready_eq!(poll, Ok(17)); + + assert!(rx.is_finished(), "channel IS finished after value is read"); +} + +#[test] +fn receiver_is_finished_drop() { + let (tx, mut rx) = oneshot::channel::(); + + assert!( + !rx.is_finished(), + "channel is NOT finished before sender is dropped" + ); + drop(tx); + assert!( + !rx.is_finished(), + "channel is NOT finished after sender is dropped" + ); + + let mut task = task::spawn(()); + let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx)); + assert_ready_err!(poll); + + assert!(rx.is_finished(), "channel IS finished after value is read"); +} + +#[test] +fn receiver_is_finished_rx_close() { + let (_tx, mut rx) = oneshot::channel::(); + assert!(!rx.is_finished(), "channel is NOT finished before closing"); + rx.close(); + assert!(!rx.is_finished(), "channel is NOT finished before closing"); + + let mut task = task::spawn(()); + let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx)); + assert_ready_err!(poll); + + assert!(rx.is_finished(), "channel IS finished after value is read"); +}