Skip to content

Commit b9d4ea1

Browse files
committed
sync: oneshot::Receiver::is_finished()
this commit introduces a method that can be used to inquire whether a oneshot channel's receiver has or has not yielded a value. this is useful for callers that may be polling the receiver as a future, to avoid inducing a panic. the receiver panics if polled after yielding a `Poll::Ready<T>`. note that this is acceptable, per the `Future::poll()` documentation regarding panics: > Once a future has completed (returned Ready from poll), calling its poll method again may panic, block forever, or cause other kinds of problems; the Future trait places no requirements on the effects of such a call. NB: this commit makes one somewhat noteworthy change to the implementation of `<Receiver<T> as Future>::poll()`. the inner state is now taken when an error is yielded. this also follows the rules proscribed by `std::future::Future::poll()`, to be clear! the upside of this is that it means a broken or closed channel, e.g. when the sender is dropped, will settle as "finished" after it yields an error. see: <https://doc.rust-lang.org/stable/std/future/trait.Future.html#panics>. Signed-off-by: katelyn martin <[email protected]>
1 parent 4d90bf4 commit b9d4ea1

File tree

2 files changed

+120
-3
lines changed

2 files changed

+120
-3
lines changed

tokio/src/sync/oneshot.rs

+64-3
Original file line numberDiff line numberDiff line change
@@ -1028,6 +1028,67 @@ impl<T> Receiver<T> {
10281028
}
10291029
}
10301030

1031+
/// Checks if this receiver is finished.
1032+
///
1033+
/// This returns true if this receiver has already yielded a [`Poll::Ready`] result, whether
1034+
/// that was a value `T`, or a [`RecvError`].
1035+
///
1036+
/// # Examples
1037+
///
1038+
/// Sending a value and polling it.
1039+
///
1040+
/// ```
1041+
/// use futures::task::noop_waker_ref;
1042+
/// use tokio::sync::oneshot;
1043+
///
1044+
/// use std::future::Future;
1045+
/// use std::pin::Pin;
1046+
/// use std::task::{Context, Poll};
1047+
///
1048+
/// #[tokio::main]
1049+
/// async fn main() {
1050+
/// let (tx, mut rx) = oneshot::channel();
1051+
///
1052+
/// // A receiver is not finished when it is initialized.
1053+
/// assert!(!rx.is_finished());
1054+
///
1055+
/// // A receiver is not finished it is polled and is still pending.
1056+
/// let poll = Pin::new(&mut rx).poll(&mut Context::from_waker(noop_waker_ref()));
1057+
/// assert_eq!(poll, Poll::Pending);
1058+
///
1059+
/// // A receiver is not finished if a value has been sent, but not yet read.
1060+
/// tx.send(0).unwrap();
1061+
/// assert!(!rx.is_finished());
1062+
///
1063+
/// // A receiver *is* finished after it has been polled and yielded a value.
1064+
/// let poll = Pin::new(&mut rx).poll(&mut Context::from_waker(noop_waker_ref()));
1065+
/// assert_eq!(poll, Poll::Ready(Ok(0)));
1066+
/// assert!(rx.is_finished());
1067+
/// }
1068+
/// ```
1069+
///
1070+
/// Dropping the sender.
1071+
///
1072+
/// ```
1073+
/// use tokio::sync::oneshot;
1074+
///
1075+
/// #[tokio::main]
1076+
/// async fn main() {
1077+
/// let (tx, mut rx) = oneshot::channel::<()>();
1078+
///
1079+
/// // A receiver is not immediately finished when the sender is dropped.
1080+
/// drop(tx);
1081+
/// assert!(!rx.is_finished());
1082+
///
1083+
/// // A receiver *is* finished after it has been polled and yielded an error.
1084+
/// let _ = (&mut rx).await.unwrap_err();
1085+
/// assert!(rx.is_finished());
1086+
/// }
1087+
/// ```
1088+
pub fn is_finished(&self) -> bool {
1089+
self.inner.is_none()
1090+
}
1091+
10311092
/// Attempts to receive a value.
10321093
///
10331094
/// If a pending value exists in the channel, it is returned. If no value
@@ -1203,18 +1264,18 @@ impl<T> Future for Receiver<T> {
12031264

12041265
let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
12051266
#[cfg(all(tokio_unstable, feature = "tracing"))]
1206-
let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?;
1267+
let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx))).map_err(Into::into);
12071268

12081269
#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
1209-
let res = ready!(inner.poll_recv(cx))?;
1270+
let res = ready!(inner.poll_recv(cx)).map_err(Into::into);
12101271

12111272
res
12121273
} else {
12131274
panic!("called after complete");
12141275
};
12151276

12161277
self.inner = None;
1217-
Ready(Ok(ret))
1278+
Ready(ret)
12181279
}
12191280
}
12201281

tokio/tests/sync_oneshot.rs

+56
Original file line numberDiff line numberDiff line change
@@ -360,3 +360,59 @@ fn receiver_is_empty_rx_close() {
360360
rx.close();
361361
assert!(rx.is_empty());
362362
}
363+
364+
#[test]
365+
fn receiver_is_finished_send() {
366+
let (tx, mut rx) = oneshot::channel::<i32>();
367+
368+
assert!(
369+
!rx.is_finished(),
370+
"channel is NOT finished before value is sent"
371+
);
372+
tx.send(17).unwrap();
373+
assert!(
374+
!rx.is_finished(),
375+
"channel is NOT finished after value is sent"
376+
);
377+
378+
let mut task = task::spawn(());
379+
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
380+
assert_ready_eq!(poll, Ok(17));
381+
382+
assert!(rx.is_finished(), "channel IS finished after value is read");
383+
}
384+
385+
#[test]
386+
fn receiver_is_finished_drop() {
387+
let (tx, mut rx) = oneshot::channel::<i32>();
388+
389+
assert!(
390+
!rx.is_finished(),
391+
"channel is NOT finished before sender is dropped"
392+
);
393+
drop(tx);
394+
assert!(
395+
!rx.is_finished(),
396+
"channel is NOT finished after sender is dropped"
397+
);
398+
399+
let mut task = task::spawn(());
400+
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
401+
assert_ready_err!(poll);
402+
403+
assert!(rx.is_finished(), "channel IS finished after value is read");
404+
}
405+
406+
#[test]
407+
fn receiver_is_finished_rx_close() {
408+
let (_tx, mut rx) = oneshot::channel::<i32>();
409+
assert!(!rx.is_finished(), "channel is NOT finished before closing");
410+
rx.close();
411+
assert!(!rx.is_finished(), "channel is NOT finished before closing");
412+
413+
let mut task = task::spawn(());
414+
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
415+
assert_ready_err!(poll);
416+
417+
assert!(rx.is_finished(), "channel IS finished after value is read");
418+
}

0 commit comments

Comments
 (0)