Skip to content

Commit 6cce703

Browse files
committed
Add poll_send method for Sender
1 parent 15c78f9 commit 6cce703

File tree

7 files changed

+321
-7
lines changed

7 files changed

+321
-7
lines changed

.github/workflows/build-and-test.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ jobs:
1919

2020
- name: Set current week of the year in environnement
2121
if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macOS')
22-
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
22+
run: echo "CURRENT_WEEK=$(date +%V)" >> $GITHUB_ENV
2323

2424
- name: Set current week of the year in environnement
2525
if: startsWith(matrix.os, 'windows')
26-
run: echo "::set-env name=CURRENT_WEEK::$(Get-Date -UFormat %V)"
26+
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
2727

2828
- name: Install latest ${{ matrix.rust }}
2929
uses: actions-rs/toolchain@v1

.github/workflows/lint.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ jobs:
1313
- uses: actions/checkout@v2
1414

1515
- name: Set current week of the year in environnement
16-
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
16+
run: echo "CURRENT_WEEK=$(date +%V)" >> $GITHUB_ENV
1717

1818
- uses: actions-rs/toolchain@v1
1919
with:

.github/workflows/security.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ jobs:
1313
- uses: actions/checkout@v2
1414

1515
- name: Set current week of the year in environnement
16-
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
16+
run: echo "CURRENT_WEEK=$(date +%V)" >> $GITHUB_ENV
1717

1818
- uses: actions-rs/audit-check@v1
1919
with:

Cargo.toml

+3-3
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ readme = "README.md"
1414

1515
[dependencies]
1616
concurrent-queue = "1.2.2"
17-
event-listener = "2.4.0"
18-
futures-core = "0.3.5"
17+
event-listener = "2.5.1"
18+
futures-core = "0.3.8"
1919

2020
[dev-dependencies]
2121
blocking = "0.6.0"
2222
easy-parallel = "3.1.0"
23-
futures-lite = "1.11.0"
23+
futures-lite = "1.11.2"

src/lib.rs

+104
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ struct Channel<T> {
5555
/// Stream operations while the channel is empty and not closed.
5656
stream_ops: Event,
5757

58+
/// Sink operations while the channel is empty and not closed.
59+
sink_ops: Event,
60+
5861
/// The number of currently active `Sender`s.
5962
sender_count: AtomicUsize,
6063

@@ -112,12 +115,15 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
112115
send_ops: Event::new(),
113116
recv_ops: Event::new(),
114117
stream_ops: Event::new(),
118+
sink_ops: Event::new(),
115119
sender_count: AtomicUsize::new(1),
116120
receiver_count: AtomicUsize::new(1),
117121
});
118122

119123
let s = Sender {
120124
channel: channel.clone(),
125+
listener: None,
126+
sending_msg: None,
121127
};
122128
let r = Receiver {
123129
channel,
@@ -151,12 +157,15 @@ pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
151157
send_ops: Event::new(),
152158
recv_ops: Event::new(),
153159
stream_ops: Event::new(),
160+
sink_ops: Event::new(),
154161
sender_count: AtomicUsize::new(1),
155162
receiver_count: AtomicUsize::new(1),
156163
});
157164

158165
let s = Sender {
159166
channel: channel.clone(),
167+
listener: None,
168+
sending_msg: None,
160169
};
161170
let r = Receiver {
162171
channel,
@@ -174,6 +183,11 @@ pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
174183
pub struct Sender<T> {
175184
/// Inner channel state.
176185
channel: Arc<Channel<T>>,
186+
187+
/// Listens for a recv or close event to unblock this stream.
188+
listener: Option<EventListener>,
189+
190+
sending_msg: Option<T>,
177191
}
178192

179193
impl<T> Sender<T> {
@@ -421,6 +435,91 @@ impl<T> Sender<T> {
421435
pub fn sender_count(&self) -> usize {
422436
self.channel.sender_count.load(Ordering::SeqCst)
423437
}
438+
439+
/// Attempts to send a message into the channel.
440+
/// This method takes the message inside the `message` argument and buffer it if the channel is full.
441+
/// This method returns `Pending` if the channel is full and `Ready(SendError<T>)` if it is closed.
442+
/// # Panics
443+
/// Panics if call this method with `None` message in the first call.
444+
/// # Examples
445+
///
446+
/// ```
447+
/// use async_channel::{bounded, SendError};
448+
/// use futures_lite::future;
449+
/// use std::task::Poll;
450+
451+
/// future::block_on(async {
452+
/// future::poll_fn(|cx| -> Poll<()> {
453+
/// let (mut s, r) = bounded::<u32>(1);
454+
/// assert_eq!(s.poll_send(cx, &mut Some(1)), Poll::Ready(Ok(())));
455+
/// assert_eq!(s.poll_send(cx, &mut Some(2)), Poll::Pending);
456+
/// drop(r);
457+
/// assert_eq!(
458+
/// s.poll_send(cx, &mut Some(3)),
459+
/// Poll::Ready(Err(SendError(3)))
460+
/// );
461+
/// Poll::Ready(())
462+
/// })
463+
/// .await;
464+
/// });
465+
/// ```
466+
pub fn poll_send(
467+
&mut self,
468+
cx: &mut Context<'_>,
469+
msg: &mut Option<T>,
470+
) -> Poll<Result<(), SendError<T>>> {
471+
// take() the message when calling this function for the first time.
472+
473+
if let Some(msg) = msg.take() {
474+
self.sending_msg = Some(msg);
475+
}
476+
477+
loop {
478+
// If this sink is listening for events, first wait for a notification.
479+
if let Some(listener) = &mut self.listener {
480+
futures_core::ready!(Pin::new(listener).poll(cx));
481+
self.listener = None;
482+
}
483+
484+
loop {
485+
let message = self.sending_msg.take().unwrap();
486+
// Attempt to send the item immediately
487+
match self.try_send(message) {
488+
Ok(_) => {
489+
// Great! The item has been sent sucessfully.
490+
// The stream is not blocked on an event - drop the listener.
491+
self.listener = None;
492+
return Poll::Ready(Ok(()));
493+
}
494+
Err(e) => match e {
495+
TrySendError::Full(item) => {
496+
// The channel is full now.
497+
// Store the item back to the struct for the next loop or polling.
498+
self.sending_msg = Some(item);
499+
}
500+
TrySendError::Closed(item) => {
501+
// The channel is closed.
502+
// The stream is not blocked on an event - drop the listener.
503+
self.listener = None;
504+
return Poll::Ready(Err(SendError(item)));
505+
}
506+
},
507+
}
508+
509+
// Receiving failed - now start listening for notifications or wait for one.
510+
match &mut self.listener {
511+
Some(_) => {
512+
// Create a listener and try sending the message again.
513+
break;
514+
}
515+
None => {
516+
// Go back to the outer loop to poll the listener.
517+
self.listener = Some(self.channel.sink_ops.listen());
518+
}
519+
}
520+
}
521+
}
522+
}
424523
}
425524

426525
impl<T> Drop for Sender<T> {
@@ -449,6 +548,8 @@ impl<T> Clone for Sender<T> {
449548

450549
Sender {
451550
channel: self.channel.clone(),
551+
listener: None,
552+
sending_msg: None,
452553
}
453554
}
454555
}
@@ -497,6 +598,8 @@ impl<T> Receiver<T> {
497598
// message or gets canceled, it will notify another blocked send operation.
498599
self.channel.send_ops.notify(1);
499600

601+
self.channel.sink_ops.notify(usize::MAX);
602+
500603
Ok(msg)
501604
}
502605
Err(PopError::Empty) => Err(TryRecvError::Empty),
@@ -725,6 +828,7 @@ impl<T> Drop for Receiver<T> {
725828
if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
726829
self.channel.close();
727830
}
831+
self.channel.sink_ops.notify(usize::MAX);
728832
}
729833
}
730834

tests/bounded.rs

+105
Original file line numberDiff line numberDiff line change
@@ -383,3 +383,108 @@ fn mpmc_stream() {
383383
assert_eq!(c.load(Ordering::SeqCst), THREADS);
384384
}
385385
}
386+
387+
#[test]
388+
fn poll_send() {
389+
let (mut s, r) = bounded::<u32>(1);
390+
391+
Parallel::new()
392+
.add(|| {
393+
future::block_on(async {
394+
future::poll_fn(|cx| s.poll_send(cx, &mut Some(7u32)))
395+
.await
396+
.unwrap();
397+
});
398+
sleep(ms(1000));
399+
future::block_on(async {
400+
future::poll_fn(|cx| s.poll_send(cx, &mut Some(8u32)))
401+
.await
402+
.unwrap();
403+
});
404+
sleep(ms(1000));
405+
future::block_on(async {
406+
future::poll_fn(|cx| s.poll_send(cx, &mut Some(9u32)))
407+
.await
408+
.unwrap();
409+
});
410+
sleep(ms(1000));
411+
future::block_on(async {
412+
future::poll_fn(|cx| s.poll_send(cx, &mut Some(10u32)))
413+
.await
414+
.unwrap();
415+
});
416+
})
417+
.add(|| {
418+
sleep(ms(1500));
419+
assert_eq!(future::block_on(r.recv()), Ok(7));
420+
assert_eq!(future::block_on(r.recv()), Ok(8));
421+
assert_eq!(future::block_on(r.recv()), Ok(9));
422+
})
423+
.run();
424+
}
425+
426+
#[test]
427+
fn spsc_poll_send() {
428+
const COUNT: usize = 25_000;
429+
430+
let (s, r) = bounded::<usize>(3);
431+
432+
Parallel::new()
433+
.add({
434+
let mut r = r.clone();
435+
move || {
436+
for _ in 0..COUNT {
437+
future::block_on(r.next()).unwrap();
438+
}
439+
}
440+
})
441+
.add(|| {
442+
let s = s.clone();
443+
for i in 0..COUNT {
444+
let mut s = s.clone();
445+
future::block_on(async {
446+
future::poll_fn(|cx| s.poll_send(cx, &mut Some(i)))
447+
.await
448+
.unwrap();
449+
});
450+
}
451+
})
452+
.run();
453+
}
454+
455+
#[test]
456+
fn mpmc_poll_send() {
457+
const COUNT: usize = 25_000;
458+
const THREADS: usize = 4;
459+
460+
let (s, r) = bounded::<usize>(3);
461+
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
462+
let v = &v;
463+
464+
Parallel::new()
465+
.each(0..THREADS, {
466+
let mut r = r.clone();
467+
move |_| {
468+
for _ in 0..COUNT {
469+
let n = future::block_on(r.next()).unwrap();
470+
v[n].fetch_add(1, Ordering::SeqCst);
471+
}
472+
}
473+
})
474+
.each(0..THREADS, |_| {
475+
let s = s.clone();
476+
for i in 0..COUNT {
477+
let mut s = s.clone();
478+
future::block_on(async {
479+
future::poll_fn(|cx| s.poll_send(cx, &mut Some(i)))
480+
.await
481+
.unwrap();
482+
});
483+
}
484+
})
485+
.run();
486+
487+
for c in v {
488+
assert_eq!(c.load(Ordering::SeqCst), THREADS);
489+
}
490+
}

0 commit comments

Comments
 (0)