Skip to content

Commit 3bf824f

Browse files
committed
sync: oneshot::Receiver::is_empty()
this commit introduces a new method to `tokio::sync::oneshot::Receiver<T>`. this method returns true if the channel has no messages waiting to be received. this is similar to the existing `tokio::sync::mpsc::Receiver::is_empty()` and `tokio::sync::broadcast::Receiver::is_empty()` methods. see: * https://docs.rs/tokio/latest/tokio/sync/broadcast/struct.Receiver.html#method.is_empty * https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.is_empty * tokio-rs#7137 (comment) Signed-off-by: katelyn martin <[email protected]>
1 parent 8fc0759 commit 3bf824f

File tree

2 files changed

+96
-0
lines changed

2 files changed

+96
-0
lines changed

tokio/src/sync/oneshot.rs

+58
Original file line numberDiff line numberDiff line change
@@ -992,6 +992,51 @@ impl<T> Receiver<T> {
992992
self.inner.is_none()
993993
}
994994

995+
/// Checks if a channel is empty.
996+
///
997+
/// This method returns `true` if the channel has no messages.
998+
///
999+
/// # Examples
1000+
///
1001+
/// ```
1002+
/// use tokio::sync::oneshot;
1003+
///
1004+
/// #[tokio::main]
1005+
/// async fn main() {
1006+
/// let (tx, mut rx) = oneshot::channel();
1007+
/// assert!(rx.is_empty());
1008+
///
1009+
/// tx.send(0).unwrap();
1010+
/// assert!(!rx.is_empty());
1011+
///
1012+
/// let _ = (&mut rx).await;
1013+
/// assert!(rx.is_empty());
1014+
/// }
1015+
/// ```
1016+
pub fn is_empty(&self) -> bool {
1017+
if let Some(inner) = self.inner.as_ref() {
1018+
let state = State::load(&inner.state, Acquire);
1019+
if state.is_complete() {
1020+
// SAFETY: If `state.is_complete()` returns true, then the
1021+
// `VALUE_SENT` bit has been set and the sender side of the
1022+
// channel will no longer attempt to access the inner
1023+
// `UnsafeCell`. Therefore, it is now safe for us to access the
1024+
// cell.
1025+
//
1026+
// The channel is empty if it does not have a value.
1027+
unsafe { !inner.has_value() }
1028+
} else if state.is_closed() {
1029+
// The receiver closed the channel...
1030+
true
1031+
} else {
1032+
// No value has been sent yet.
1033+
true
1034+
}
1035+
} else {
1036+
true
1037+
}
1038+
}
1039+
9951040
/// Attempts to receive a value.
9961041
///
9971042
/// If a pending value exists in the channel, it is returned. If no value
@@ -1294,6 +1339,19 @@ impl<T> Inner<T> {
12941339
unsafe fn consume_value(&self) -> Option<T> {
12951340
self.value.with_mut(|ptr| (*ptr).take())
12961341
}
1342+
1343+
/// Returns true if there is a value. This function does not check `state`.
1344+
///
1345+
/// # Safety
1346+
///
1347+
/// Calling this method concurrently on multiple threads will result in a
1348+
/// data race. The `VALUE_SENT` state bit is used to ensure that only the
1349+
/// sender *or* the receiver will call this method at a given point in time.
1350+
/// If `VALUE_SENT` is not set, then only the sender may call this method;
1351+
/// if it is set, then only the receiver may call this method.
1352+
unsafe fn has_value(&self) -> bool {
1353+
self.value.with(|ptr| (*ptr).is_some())
1354+
}
12971355
}
12981356

12991357
unsafe impl<T: Send> Send for Inner<T> {}

tokio/tests/sync_oneshot.rs

+38
Original file line numberDiff line numberDiff line change
@@ -386,3 +386,41 @@ fn receiver_is_terminated_rx_close() {
386386
"channel IS terminated after value is read"
387387
);
388388
}
389+
390+
#[test]
391+
fn receiver_is_empty_send() {
392+
let (tx, mut rx) = oneshot::channel::<i32>();
393+
394+
assert!(rx.is_empty(), "channel IS empty before value is sent");
395+
tx.send(17).unwrap();
396+
assert!(!rx.is_empty(), "channel is NOT empty after value is sent");
397+
398+
let mut task = task::spawn(());
399+
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
400+
assert_ready_eq!(poll, Ok(17));
401+
402+
assert!(rx.is_empty(), "channel IS empty after value is read");
403+
}
404+
405+
#[test]
406+
fn receiver_is_empty_drop() {
407+
let (tx, mut rx) = oneshot::channel::<i32>();
408+
409+
assert!(rx.is_empty(), "channel IS empty before sender is dropped");
410+
drop(tx);
411+
assert!(rx.is_empty(), "channel IS empty after sender is dropped");
412+
413+
let mut task = task::spawn(());
414+
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
415+
assert_ready_err!(poll);
416+
417+
assert!(rx.is_empty(), "channel IS empty after value is read");
418+
}
419+
420+
#[test]
421+
fn receiver_is_empty_rx_close() {
422+
let (_tx, mut rx) = oneshot::channel::<i32>();
423+
assert!(rx.is_empty());
424+
rx.close();
425+
assert!(rx.is_empty());
426+
}

0 commit comments

Comments
 (0)