Skip to content

Commit 82b79f3

Browse files
committed
sync: oneshot::Receiver::is_empty()
this commit introduces a new method to `tokio::sync::oneshot::Receiver<T>`. this method returns true if the channel has no messages waiting to be received. this is similar to the existing `tokio::sync::mpsc::Receiver::is_empty()` and `tokio::sync::broadcast::Receiver::is_empty()` methods. see: * https://docs.rs/tokio/latest/tokio/sync/broadcast/struct.Receiver.html#method.is_empty * https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.is_empty * tokio-rs#7137 (comment) Signed-off-by: katelyn martin <[email protected]>
1 parent 0a15768 commit 82b79f3

File tree

2 files changed

+101
-0
lines changed

2 files changed

+101
-0
lines changed

tokio/src/sync/oneshot.rs

+63
Original file line numberDiff line numberDiff line change
@@ -931,6 +931,56 @@ impl<T> Receiver<T> {
931931
}
932932
}
933933

934+
/// Checks if a channel is empty.
935+
///
936+
/// This method returns `true` if the channel has no messages.
937+
///
938+
/// # Examples
939+
///
940+
/// ```
941+
/// use futures::task::noop_waker_ref;
942+
/// use tokio::sync::oneshot;
943+
///
944+
/// use std::future::Future;
945+
/// use std::pin::Pin;
946+
/// use std::task::{Context, Poll};
947+
///
948+
/// #[tokio::main]
949+
/// async fn main() {
950+
/// let (tx, mut rx) = oneshot::channel();
951+
/// assert!(rx.is_empty());
952+
///
953+
/// tx.send(0).unwrap();
954+
/// assert!(!rx.is_empty());
955+
///
956+
/// let _ = (&mut rx).await;
957+
/// assert!(rx.is_empty());
958+
/// }
959+
/// ```
960+
pub fn is_empty(&self) -> bool {
961+
if let Some(inner) = self.inner.as_ref() {
962+
let state = State::load(&inner.state, Acquire);
963+
if state.is_complete() {
964+
// SAFETY: If `state.is_complete()` returns true, then the
965+
// `VALUE_SENT` bit has been set and the sender side of the
966+
// channel will no longer attempt to access the inner
967+
// `UnsafeCell`. Therefore, it is now safe for us to access the
968+
// cell.
969+
//
970+
// The channel is empty if it does not have a value.
971+
unsafe { !inner.has_value() }
972+
} else if state.is_closed() {
973+
// The receiver closed the channel...
974+
true
975+
} else {
976+
// No value has been sent yet.
977+
true
978+
}
979+
} else {
980+
true
981+
}
982+
}
983+
934984
/// Attempts to receive a value.
935985
///
936986
/// If a pending value exists in the channel, it is returned. If no value
@@ -1233,6 +1283,19 @@ impl<T> Inner<T> {
12331283
unsafe fn consume_value(&self) -> Option<T> {
12341284
self.value.with_mut(|ptr| (*ptr).take())
12351285
}
1286+
1287+
/// Returns true if there is a value. This function does not check `state`.
1288+
///
1289+
/// # Safety
1290+
///
1291+
/// Calling this method concurrently on multiple threads will result in a
1292+
/// data race. The `VALUE_SENT` state bit is used to ensure that only the
1293+
/// sender *or* the receiver will call this method at a given point in time.
1294+
/// If `VALUE_SENT` is not set, then only the sender may call this method;
1295+
/// if it is set, then only the receiver may call this method.
1296+
unsafe fn has_value(&self) -> bool {
1297+
self.value.with(|ptr| (*ptr).is_some())
1298+
}
12361299
}
12371300

12381301
unsafe impl<T: Send> Send for Inner<T> {}

tokio/tests/sync_oneshot.rs

+38
Original file line numberDiff line numberDiff line change
@@ -292,3 +292,41 @@ fn sender_changes_task() {
292292

293293
assert_ready!(task2.enter(|cx, _| tx.poll_closed(cx)));
294294
}
295+
296+
#[test]
297+
fn receiver_is_empty_send() {
298+
let (tx, mut rx) = oneshot::channel::<i32>();
299+
300+
assert!(rx.is_empty(), "channel IS empty before value is sent");
301+
tx.send(17).unwrap();
302+
assert!(!rx.is_empty(), "channel is NOT empty after value is sent");
303+
304+
let mut task = task::spawn(());
305+
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
306+
assert_ready_eq!(poll, Ok(17));
307+
308+
assert!(rx.is_empty(), "channel IS empty after value is read");
309+
}
310+
311+
#[test]
312+
fn receiver_is_empty_drop() {
313+
let (tx, mut rx) = oneshot::channel::<i32>();
314+
315+
assert!(rx.is_empty(), "channel IS empty before sender is dropped");
316+
drop(tx);
317+
assert!(rx.is_empty(), "channel IS empty after sender is dropped");
318+
319+
let mut task = task::spawn(());
320+
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
321+
assert_ready_err!(poll);
322+
323+
assert!(rx.is_empty(), "channel IS empty after value is read");
324+
}
325+
326+
#[test]
327+
fn receiver_is_empty_rx_close() {
328+
let (_tx, mut rx) = oneshot::channel::<i32>();
329+
assert!(rx.is_empty());
330+
rx.close();
331+
assert!(rx.is_empty());
332+
}

0 commit comments

Comments
 (0)