diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 6201464c546..d1b2616c3fa 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -931,6 +931,64 @@ impl Receiver { } } + /// Checks if this receiver is terminated. + /// + /// This function returns true if this receiver has already yielded a [`Poll::Ready`] result. + /// If so, this receiver should no longer be polled. + /// + /// # Examples + /// + /// Sending a value and polling it. + /// + /// ``` + /// use tokio::sync::oneshot; + /// + /// use std::task::Poll; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = oneshot::channel(); + /// + /// // A receiver is not terminated when it is initialized. + /// assert!(!rx.is_terminated()); + /// + /// // A receiver is not terminated it is polled and is still pending. + /// let poll = futures::poll!(&mut rx); + /// assert_eq!(poll, Poll::Pending); + /// assert!(!rx.is_terminated()); + /// + /// // A receiver is not terminated if a value has been sent, but not yet read. + /// tx.send(0).unwrap(); + /// assert!(!rx.is_terminated()); + /// + /// // A receiver *is* terminated after it has been polled and yielded a value. + /// assert_eq!((&mut rx).await, Ok(0)); + /// assert!(rx.is_terminated()); + /// } + /// ``` + /// + /// Dropping the sender. + /// + /// ``` + /// use tokio::sync::oneshot; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = oneshot::channel::<()>(); + /// + /// // A receiver is not immediately terminated when the sender is dropped. + /// drop(tx); + /// assert!(!rx.is_terminated()); + /// + /// // A receiver *is* terminated after it has been polled and yielded an error. + /// let _ = (&mut rx).await.unwrap_err(); + /// assert!(rx.is_terminated()); + /// } + /// ``` + pub fn is_terminated(&self) -> bool { + self.inner.is_none() + } + /// Attempts to receive a value. /// /// If a pending value exists in the channel, it is returned. If no value @@ -1106,10 +1164,10 @@ impl Future for Receiver { let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() { #[cfg(all(tokio_unstable, feature = "tracing"))] - let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?; + let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx))).map_err(Into::into); #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] - let res = ready!(inner.poll_recv(cx))?; + let res = ready!(inner.poll_recv(cx)).map_err(Into::into); res } else { @@ -1117,7 +1175,7 @@ impl Future for Receiver { }; self.inner = None; - Ready(Ok(ret)) + Ready(ret) } } diff --git a/tokio/tests/sync_oneshot.rs b/tokio/tests/sync_oneshot.rs index 127f7cb61b8..9443be36d04 100644 --- a/tokio/tests/sync_oneshot.rs +++ b/tokio/tests/sync_oneshot.rs @@ -292,3 +292,97 @@ fn sender_changes_task() { assert_ready!(task2.enter(|cx, _| tx.poll_closed(cx))); } + +#[test] +fn receiver_is_terminated_send() { + let (tx, mut rx) = oneshot::channel::(); + + assert!( + !rx.is_terminated(), + "channel is NOT terminated before value is sent" + ); + tx.send(17).unwrap(); + assert!( + !rx.is_terminated(), + "channel is NOT terminated after value is sent" + ); + + let mut task = task::spawn(()); + let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx)); + assert_ready_eq!(poll, Ok(17)); + + assert!( + rx.is_terminated(), + "channel IS terminated after value is read" + ); +} + +#[test] +fn receiver_is_terminated_try_recv() { + let (tx, mut rx) = oneshot::channel::(); + + assert!( + !rx.is_terminated(), + "channel is NOT terminated before value is sent" + ); + tx.send(17).unwrap(); + assert!( + !rx.is_terminated(), + "channel is NOT terminated after value is sent" + ); + + let value = rx.try_recv().expect("value is waiting"); + assert_eq!(value, 17); + + assert!( + rx.is_terminated(), + "channel IS terminated after value is read" + ); +} + +#[test] +fn receiver_is_terminated_drop() { + let (tx, mut rx) = oneshot::channel::(); + + assert!( + !rx.is_terminated(), + "channel is NOT terminated before sender is dropped" + ); + drop(tx); + assert!( + !rx.is_terminated(), + "channel is NOT terminated after sender is dropped" + ); + + let mut task = task::spawn(()); + let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx)); + assert_ready_err!(poll); + + assert!( + rx.is_terminated(), + "channel IS terminated after value is read" + ); +} + +#[test] +fn receiver_is_terminated_rx_close() { + let (_tx, mut rx) = oneshot::channel::(); + assert!( + !rx.is_terminated(), + "channel is NOT terminated before closing" + ); + rx.close(); + assert!( + !rx.is_terminated(), + "channel is NOT terminated before closing" + ); + + let mut task = task::spawn(()); + let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx)); + assert_ready_err!(poll); + + assert!( + rx.is_terminated(), + "channel IS terminated after value is read" + ); +}