Skip to content

Commit e18318b

Browse files
committed
sync: oneshot::Receiver::is_terminated()
this commit introduces a new method to `tokio::sync::oneshot::Receiver<T>`. broadly speaking, users of the oneshot channel are encouraged to `.await` the `Receiver<T>` directly, as it will only yield a single value. users implementing their own `std::future::Future`s directly may instead poll the receiver via `<Receiver<T> as Future<Output = Result<T, RecvError>::poll(..)`. note that the contract of `Future::poll()` states that clients should not poll a future after it has yielded `Poll::Ready(value)`. this commit provides a way to inspect the state of a receiver, to avoid violating the contact of `Future::poll(..)`, or requiring that a oneshot channel users track this state themselves externally via mechanisms like `futures::future::FusedFuture`, or wrapping the receiver in an `Option<T>`. NB: this makes a small behavioral change to the implementation of `<Receiver<T> as Future<Output = Result<T, RecvError>::poll(..)`. this change is acceptable, per the `Future::poll()` documentation regarding panics: > Once a future has completed (returned Ready from poll), calling its poll method again may panic, block forever, or cause other kinds of problems; the Future trait places no requirements on the effects of such a call. the upside of this is that it means a broken or closed channel, e.g. when the sender is dropped, will settle as "finished" after it yields an error. see: * tokio-rs#7137 (comment) * https://doc.rust-lang.org/stable/std/future/trait.Future.html#panics Signed-off-by: katelyn martin <[email protected]>
1 parent 8e13417 commit e18318b

File tree

2 files changed

+135
-3
lines changed

2 files changed

+135
-3
lines changed

tokio/src/sync/oneshot.rs

+64-3
Original file line numberDiff line numberDiff line change
@@ -931,6 +931,67 @@ impl<T> Receiver<T> {
931931
}
932932
}
933933

934+
/// Checks if this receiver is terminated.
935+
///
936+
/// This function returns true if this receiver has already yielded a [`Poll::Ready`] result.
937+
/// If so, this receiver should no longer be polled.
938+
///
939+
/// # Examples
940+
///
941+
/// Sending a value and polling it.
942+
///
943+
/// ```
944+
/// use futures::task::noop_waker_ref;
945+
/// use tokio::sync::oneshot;
946+
///
947+
/// use std::future::Future;
948+
/// use std::pin::Pin;
949+
/// use std::task::{Context, Poll};
950+
///
951+
/// #[tokio::main]
952+
/// async fn main() {
953+
/// let (tx, mut rx) = oneshot::channel();
954+
///
955+
/// // A receiver is not terminated when it is initialized.
956+
/// assert!(!rx.is_terminated());
957+
///
958+
/// // A receiver is not terminated it is polled and is still pending.
959+
/// let poll = Pin::new(&mut rx).poll(&mut Context::from_waker(noop_waker_ref()));
960+
/// assert_eq!(poll, Poll::Pending);
961+
/// assert!(!rx.is_terminated());
962+
///
963+
/// // A receiver is not terminated if a value has been sent, but not yet read.
964+
/// tx.send(0).unwrap();
965+
/// assert!(!rx.is_terminated());
966+
///
967+
/// // A receiver *is* terminated after it has been polled and yielded a value.
968+
/// assert_eq!((&mut rx).await, Ok(0));
969+
/// assert!(rx.is_terminated());
970+
/// }
971+
/// ```
972+
///
973+
/// Dropping the sender.
974+
///
975+
/// ```
976+
/// use tokio::sync::oneshot;
977+
///
978+
/// #[tokio::main]
979+
/// async fn main() {
980+
/// let (tx, mut rx) = oneshot::channel::<()>();
981+
///
982+
/// // A receiver is not immediately terminated when the sender is dropped.
983+
/// drop(tx);
984+
/// assert!(!rx.is_terminated());
985+
///
986+
/// // A receiver *is* terminated after it has been polled and yielded an error.
987+
/// let _ = (&mut rx).await.unwrap_err();
988+
/// assert!(rx.is_terminated());
989+
/// }
990+
/// ```
991+
pub fn is_terminated(&self) -> bool {
992+
self.inner.is_none()
993+
}
994+
934995
/// Attempts to receive a value.
935996
///
936997
/// If a pending value exists in the channel, it is returned. If no value
@@ -1106,18 +1167,18 @@ impl<T> Future for Receiver<T> {
11061167

11071168
let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
11081169
#[cfg(all(tokio_unstable, feature = "tracing"))]
1109-
let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?;
1170+
let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx))).map_err(Into::into);
11101171

11111172
#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
1112-
let res = ready!(inner.poll_recv(cx))?;
1173+
let res = ready!(inner.poll_recv(cx)).map_err(Into::into);
11131174

11141175
res
11151176
} else {
11161177
panic!("called after complete");
11171178
};
11181179

11191180
self.inner = None;
1120-
Ready(Ok(ret))
1181+
Ready(ret)
11211182
}
11221183
}
11231184

tokio/tests/sync_oneshot.rs

+71
Original file line numberDiff line numberDiff line change
@@ -292,3 +292,74 @@ fn sender_changes_task() {
292292

293293
assert_ready!(task2.enter(|cx, _| tx.poll_closed(cx)));
294294
}
295+
296+
#[test]
297+
fn receiver_is_terminated_send() {
298+
let (tx, mut rx) = oneshot::channel::<i32>();
299+
300+
assert!(
301+
!rx.is_terminated(),
302+
"channel is NOT terminated before value is sent"
303+
);
304+
tx.send(17).unwrap();
305+
assert!(
306+
!rx.is_terminated(),
307+
"channel is NOT terminated after value is sent"
308+
);
309+
310+
let mut task = task::spawn(());
311+
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
312+
assert_ready_eq!(poll, Ok(17));
313+
314+
assert!(
315+
rx.is_terminated(),
316+
"channel IS terminated after value is read"
317+
);
318+
}
319+
320+
#[test]
321+
fn receiver_is_terminated_drop() {
322+
let (tx, mut rx) = oneshot::channel::<i32>();
323+
324+
assert!(
325+
!rx.is_terminated(),
326+
"channel is NOT terminated before sender is dropped"
327+
);
328+
drop(tx);
329+
assert!(
330+
!rx.is_terminated(),
331+
"channel is NOT terminated after sender is dropped"
332+
);
333+
334+
let mut task = task::spawn(());
335+
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
336+
assert_ready_err!(poll);
337+
338+
assert!(
339+
rx.is_terminated(),
340+
"channel IS terminated after value is read"
341+
);
342+
}
343+
344+
#[test]
345+
fn receiver_is_terminated_rx_close() {
346+
let (_tx, mut rx) = oneshot::channel::<i32>();
347+
assert!(
348+
!rx.is_terminated(),
349+
"channel is NOT terminated before closing"
350+
);
351+
rx.close();
352+
assert!(
353+
!rx.is_terminated(),
354+
"channel is NOT terminated before closing"
355+
);
356+
357+
let mut task = task::spawn(());
358+
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
359+
assert_ready_err!(poll);
360+
361+
assert!(
362+
rx.is_terminated(),
363+
"channel IS terminated after value is read"
364+
);
365+
}

0 commit comments

Comments
 (0)