Skip to content

Commit 1622629

Browse files
committed
Add poll_send
1 parent 3cc7962 commit 1622629

File tree

7 files changed

+313
-52
lines changed

7 files changed

+313
-52
lines changed

.github/workflows/build-and-test.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ jobs:
1919

2020
- name: Set current week of the year in environnement
2121
if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macOS')
22-
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
22+
run: echo "CURRENT_WEEK=$(date +%V)" >> $GITHUB_ENV
2323

2424
- name: Set current week of the year in environnement
2525
if: startsWith(matrix.os, 'windows')
26-
run: echo "::set-env name=CURRENT_WEEK::$(Get-Date -UFormat %V)"
26+
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
2727

2828
- name: Install latest ${{ matrix.rust }}
2929
uses: actions-rs/toolchain@v1

.github/workflows/lint.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ jobs:
1313
- uses: actions/checkout@v2
1414

1515
- name: Set current week of the year in environnement
16-
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
16+
run: echo "CURRENT_WEEK=$(date +%V)" >> $GITHUB_ENV
1717

1818
- uses: actions-rs/toolchain@v1
1919
with:

.github/workflows/security.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ jobs:
1313
- uses: actions/checkout@v2
1414

1515
- name: Set current week of the year in environnement
16-
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
16+
run: echo "CURRENT_WEEK=$(date +%V)" >> $GITHUB_ENV
1717

1818
- uses: actions-rs/audit-check@v1
1919
with:

Cargo.toml

+1-2
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@ readme = "README.md"
1616
concurrent-queue = "1.2.2"
1717
event-listener = "2.5.1"
1818
futures-core = "0.3.8"
19-
futures-sink = "0.3.8"
2019

2120
[dev-dependencies]
2221
blocking = "0.6.0"
2322
easy-parallel = "3.1.0"
24-
futures-lite = "1.11.0"
23+
futures-lite = "1.11.2"

src/lib.rs

+98-46
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ use std::usize;
4141
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
4242
use event_listener::{Event, EventListener};
4343
use futures_core::stream::Stream;
44-
use futures_sink::Sink;
4544

4645
struct Channel<T> {
4746
/// Inner message queue.
@@ -56,6 +55,9 @@ struct Channel<T> {
5655
/// Stream operations while the channel is empty and not closed.
5756
stream_ops: Event,
5857

58+
/// Sink operations while the channel is empty and not closed.
59+
sink_ops: Event,
60+
5961
/// The number of currently active `Sender`s.
6062
sender_count: AtomicUsize,
6163

@@ -113,13 +115,15 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
113115
send_ops: Event::new(),
114116
recv_ops: Event::new(),
115117
stream_ops: Event::new(),
118+
sink_ops: Event::new(),
116119
sender_count: AtomicUsize::new(1),
117120
receiver_count: AtomicUsize::new(1),
118121
});
119122

120123
let s = Sender {
121124
channel: channel.clone(),
122125
listener: None,
126+
sending_msg: None,
123127
};
124128
let r = Receiver {
125129
channel,
@@ -153,13 +157,15 @@ pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
153157
send_ops: Event::new(),
154158
recv_ops: Event::new(),
155159
stream_ops: Event::new(),
160+
sink_ops: Event::new(),
156161
sender_count: AtomicUsize::new(1),
157162
receiver_count: AtomicUsize::new(1),
158163
});
159164

160165
let s = Sender {
161166
channel: channel.clone(),
162167
listener: None,
168+
sending_msg: None,
163169
};
164170
let r = Receiver {
165171
channel,
@@ -180,6 +186,8 @@ pub struct Sender<T> {
180186

181187
/// Listens for a recv or close event to unblock this stream.
182188
listener: Option<EventListener>,
189+
190+
sending_msg: Option<T>,
183191
}
184192

185193
impl<T> Sender<T> {
@@ -427,6 +435,91 @@ impl<T> Sender<T> {
427435
pub fn sender_count(&self) -> usize {
428436
self.channel.sender_count.load(Ordering::SeqCst)
429437
}
438+
439+
/// Attempts to send a message into the channel.
440+
/// This method takes the message inside the `message` argument and buffer it if the channel is full.
441+
/// This method returns `Pending` if the channel is full and `Ready(SendError<T>)` if it is closed.
442+
/// # Panics
443+
/// Panics if call this method with `None` message in the first call.
444+
/// # Examples
445+
///
446+
/// ```
447+
/// use async_channel::{bounded, SendError};
448+
/// use futures_lite::future;
449+
/// use std::task::Poll;
450+
451+
/// future::block_on(async {
452+
/// future::poll_fn(|cx| -> Poll<()> {
453+
/// let (mut s, r) = bounded::<u32>(1);
454+
/// assert_eq!(s.poll_send(cx, &mut Some(1)), Poll::Ready(Ok(())));
455+
/// assert_eq!(s.poll_send(cx, &mut Some(2)), Poll::Pending);
456+
/// drop(r);
457+
/// assert_eq!(
458+
/// s.poll_send(cx, &mut Some(3)),
459+
/// Poll::Ready(Err(SendError(3)))
460+
/// );
461+
/// Poll::Ready(())
462+
/// })
463+
/// .await;
464+
/// });
465+
/// ```
466+
pub fn poll_send(
467+
&mut self,
468+
cx: &mut Context<'_>,
469+
msg: &mut Option<T>,
470+
) -> Poll<Result<(), SendError<T>>> {
471+
// take() the message when calling this function for the first time.
472+
473+
if let Some(msg) = msg.take() {
474+
self.sending_msg = Some(msg);
475+
}
476+
477+
loop {
478+
// If this sink is listening for events, first wait for a notification.
479+
if let Some(listener) = &mut self.listener {
480+
futures_core::ready!(Pin::new(listener).poll(cx));
481+
self.listener = None;
482+
}
483+
484+
loop {
485+
let message = self.sending_msg.take().unwrap();
486+
// Attempt to send the item immediately
487+
match self.try_send(message) {
488+
Ok(_) => {
489+
// Great! The item has been sent sucessfully.
490+
// The stream is not blocked on an event - drop the listener.
491+
self.listener = None;
492+
return Poll::Ready(Ok(()));
493+
}
494+
Err(e) => match e {
495+
TrySendError::Full(item) => {
496+
// The channel is full now.
497+
// Store the item back to the struct for the next loop or polling.
498+
self.sending_msg = Some(item);
499+
}
500+
TrySendError::Closed(item) => {
501+
// The channel is closed.
502+
// The stream is not blocked on an event - drop the listener.
503+
self.listener = None;
504+
return Poll::Ready(Err(SendError(item)));
505+
}
506+
},
507+
}
508+
509+
// Receiving failed - now start listening for notifications or wait for one.
510+
match &mut self.listener {
511+
Some(_) => {
512+
// Create a listener and try sending the message again.
513+
break;
514+
}
515+
None => {
516+
// Go back to the outer loop to poll the listener.
517+
self.listener = Some(self.channel.sink_ops.listen());
518+
}
519+
}
520+
}
521+
}
522+
}
430523
}
431524

432525
impl<T> Drop for Sender<T> {
@@ -456,55 +549,11 @@ impl<T> Clone for Sender<T> {
456549
Sender {
457550
channel: self.channel.clone(),
458551
listener: None,
552+
sending_msg: None,
459553
}
460554
}
461555
}
462556

463-
impl<T> Sink<T> for Sender<T> {
464-
type Error = TrySendError<T>;
465-
466-
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
467-
loop {
468-
if let Some(l) = &mut self.listener {
469-
futures_core::ready!(Pin::new(l).poll(cx));
470-
self.listener = None;
471-
}
472-
473-
if self.is_full() {
474-
let listener = self.channel.recv_ops.listen();
475-
self.listener = Some(listener);
476-
} else {
477-
return Poll::Ready(Ok(()));
478-
}
479-
}
480-
}
481-
482-
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
483-
self.close();
484-
Poll::Ready(Ok(()))
485-
}
486-
487-
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
488-
loop {
489-
if let Some(l) = &mut self.listener {
490-
futures_core::ready!(Pin::new(l).poll(cx));
491-
self.listener = None;
492-
}
493-
494-
if self.is_empty() {
495-
return Poll::Ready(Ok(()));
496-
} else {
497-
let listener = self.channel.recv_ops.listen();
498-
self.listener = Some(listener);
499-
}
500-
}
501-
}
502-
503-
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
504-
self.try_send(item)
505-
}
506-
}
507-
508557
/// The receiving side of a channel.
509558
///
510559
/// Receivers can be cloned and shared among threads. When all receivers associated with a channel
@@ -549,6 +598,8 @@ impl<T> Receiver<T> {
549598
// message or gets canceled, it will notify another blocked send operation.
550599
self.channel.send_ops.notify(1);
551600

601+
self.channel.sink_ops.notify(usize::MAX);
602+
552603
Ok(msg)
553604
}
554605
Err(PopError::Empty) => Err(TryRecvError::Empty),
@@ -777,6 +828,7 @@ impl<T> Drop for Receiver<T> {
777828
if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
778829
self.channel.close();
779830
}
831+
self.channel.sink_ops.notify(usize::MAX);
780832
}
781833
}
782834

tests/bounded.rs

+105
Original file line numberDiff line numberDiff line change
@@ -383,3 +383,108 @@ fn mpmc_stream() {
383383
assert_eq!(c.load(Ordering::SeqCst), THREADS);
384384
}
385385
}
386+
387+
#[test]
388+
fn poll_send() {
389+
let (mut s, r) = bounded::<u32>(1);
390+
391+
Parallel::new()
392+
.add(|| {
393+
future::block_on(async {
394+
future::poll_fn(|cx| s.poll_send(cx, &mut Some(7u32)))
395+
.await
396+
.unwrap();
397+
});
398+
sleep(ms(1000));
399+
future::block_on(async {
400+
future::poll_fn(|cx| s.poll_send(cx, &mut Some(8u32)))
401+
.await
402+
.unwrap();
403+
});
404+
sleep(ms(1000));
405+
future::block_on(async {
406+
future::poll_fn(|cx| s.poll_send(cx, &mut Some(9u32)))
407+
.await
408+
.unwrap();
409+
});
410+
sleep(ms(1000));
411+
future::block_on(async {
412+
future::poll_fn(|cx| s.poll_send(cx, &mut Some(10u32)))
413+
.await
414+
.unwrap();
415+
});
416+
})
417+
.add(|| {
418+
sleep(ms(1500));
419+
assert_eq!(future::block_on(r.recv()), Ok(7));
420+
assert_eq!(future::block_on(r.recv()), Ok(8));
421+
assert_eq!(future::block_on(r.recv()), Ok(9));
422+
})
423+
.run();
424+
}
425+
426+
#[test]
427+
fn spsc_poll_send() {
428+
const COUNT: usize = 25_000;
429+
430+
let (s, r) = bounded::<usize>(3);
431+
432+
Parallel::new()
433+
.add({
434+
let mut r = r.clone();
435+
move || {
436+
for _ in 0..COUNT {
437+
future::block_on(r.next()).unwrap();
438+
}
439+
}
440+
})
441+
.add(|| {
442+
let s = s.clone();
443+
for i in 0..COUNT {
444+
let mut s = s.clone();
445+
future::block_on(async {
446+
future::poll_fn(|cx| s.poll_send(cx, &mut Some(i)))
447+
.await
448+
.unwrap();
449+
});
450+
}
451+
})
452+
.run();
453+
}
454+
455+
#[test]
456+
fn mpmc_poll_send() {
457+
const COUNT: usize = 25_000;
458+
const THREADS: usize = 4;
459+
460+
let (s, r) = bounded::<usize>(3);
461+
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
462+
let v = &v;
463+
464+
Parallel::new()
465+
.each(0..THREADS, {
466+
let mut r = r.clone();
467+
move |_| {
468+
for _ in 0..COUNT {
469+
let n = future::block_on(r.next()).unwrap();
470+
v[n].fetch_add(1, Ordering::SeqCst);
471+
}
472+
}
473+
})
474+
.each(0..THREADS, |_| {
475+
let s = s.clone();
476+
for i in 0..COUNT {
477+
let mut s = s.clone();
478+
future::block_on(async {
479+
future::poll_fn(|cx| s.poll_send(cx, &mut Some(i)))
480+
.await
481+
.unwrap();
482+
});
483+
}
484+
})
485+
.run();
486+
487+
for c in v {
488+
assert_eq!(c.load(Ordering::SeqCst), THREADS);
489+
}
490+
}

0 commit comments

Comments
 (0)