Skip to content

Update to new futures_api #1514

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ matrix:

# When updating this, the reminder to update the minimum required version in README.md.
- name: cargo test (minimum required version)
rust: nightly-2019-02-15
rust: nightly-2019-04-13

- name: cargo clippy
rust: nightly
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = [
"futures-channel",
"futures-executor",
"futures-io",
"futures-select-macro",
"futures-sink",
"futures-util",
"futures-test",
Expand Down
40 changes: 20 additions & 20 deletions futures-channel/benches/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
ready,
stream::{Stream, StreamExt},
sink::Sink,
task::{Waker, Poll},
task::{Context, Poll},
},
futures_test::task::noop_waker_ref,
std::pin::Pin,
Expand All @@ -18,7 +18,7 @@ use {
/// Single producer, single consumer
#[bench]
fn unbounded_1_tx(b: &mut Bencher) {
let waker = noop_waker_ref();
let mut cx = Context::from_waker(noop_waker_ref());
b.iter(|| {
let (tx, mut rx) = mpsc::unbounded();

Expand All @@ -27,20 +27,20 @@ fn unbounded_1_tx(b: &mut Bencher) {
for i in 0..1000 {

// Poll, not ready, park
assert_eq!(Poll::Pending, rx.poll_next_unpin(waker));
assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));

UnboundedSender::unbounded_send(&tx, i).unwrap();

// Now poll ready
assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(waker));
assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
}
})
}

/// 100 producers, single consumer
#[bench]
fn unbounded_100_tx(b: &mut Bencher) {
let waker = noop_waker_ref();
let mut cx = Context::from_waker(noop_waker_ref());
b.iter(|| {
let (tx, mut rx) = mpsc::unbounded();

Expand All @@ -49,26 +49,26 @@ fn unbounded_100_tx(b: &mut Bencher) {
// 1000 send/recv operations total, result should be divided by 1000
for _ in 0..10 {
for i in 0..tx.len() {
assert_eq!(Poll::Pending, rx.poll_next_unpin(waker));
assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));

UnboundedSender::unbounded_send(&tx[i], i).unwrap();

assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(waker));
assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
}
}
})
}

#[bench]
fn unbounded_uncontended(b: &mut Bencher) {
let waker = noop_waker_ref();
let mut cx = Context::from_waker(noop_waker_ref());
b.iter(|| {
let (tx, mut rx) = mpsc::unbounded();

for i in 0..1000 {
UnboundedSender::unbounded_send(&tx, i).expect("send");
// No need to create a task, because poll is not going to park.
assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(waker));
assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
}
})
}
Expand All @@ -84,41 +84,41 @@ struct TestSender {
impl Stream for TestSender {
type Item = u32;

fn poll_next(mut self: Pin<&mut Self>, waker: &Waker)
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>
{
let this = &mut *self;
let mut tx = Pin::new(&mut this.tx);

ready!(tx.as_mut().poll_ready(waker)).unwrap();
ready!(tx.as_mut().poll_ready(cx)).unwrap();
tx.as_mut().start_send(this.last + 1).unwrap();
this.last += 1;
assert_eq!(Poll::Ready(Ok(())), tx.as_mut().poll_flush(waker));
assert_eq!(Poll::Ready(Ok(())), tx.as_mut().poll_flush(cx));
Poll::Ready(Some(this.last))
}
}

/// Single producers, single consumer
#[bench]
fn bounded_1_tx(b: &mut Bencher) {
let waker = noop_waker_ref();
let mut cx = Context::from_waker(noop_waker_ref());
b.iter(|| {
let (tx, mut rx) = mpsc::channel(0);

let mut tx = TestSender { tx, last: 0 };

for i in 0..1000 {
assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(waker));
assert_eq!(Poll::Pending, tx.poll_next_unpin(waker));
assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(waker));
assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(&mut cx));
assert_eq!(Poll::Pending, tx.poll_next_unpin(&mut cx));
assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
}
})
}

/// 100 producers, single consumer
#[bench]
fn bounded_100_tx(b: &mut Bencher) {
let waker = noop_waker_ref();
let mut cx = Context::from_waker(noop_waker_ref());
b.iter(|| {
// Each sender can send one item after specified capacity
let (tx, mut rx) = mpsc::channel(0);
Expand All @@ -133,11 +133,11 @@ fn bounded_100_tx(b: &mut Bencher) {
for i in 0..10 {
for j in 0..tx.len() {
// Send an item
assert_eq!(Poll::Ready(Some(i + 1)), tx[j].poll_next_unpin(waker));
assert_eq!(Poll::Ready(Some(i + 1)), tx[j].poll_next_unpin(&mut cx));
// Then block
assert_eq!(Poll::Pending, tx[j].poll_next_unpin(waker));
assert_eq!(Poll::Pending, tx[j].poll_next_unpin(&mut cx));
// Recv the item
assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(waker));
assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
}
}
})
Expand Down
24 changes: 12 additions & 12 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
// by the queue structure.

use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Waker, Poll};
use futures_core::task::{Context, Poll, Waker};
use futures_core::task::__internal::AtomicWaker;
use std::any::Any;
use std::error::Error;
Expand Down Expand Up @@ -555,7 +555,7 @@ impl<T> SenderInner<T> {
/// - `Err(SendError)` if the receiver has been dropped.
fn poll_ready(
&mut self,
waker: &Waker
cx: &mut Context<'_>,
) -> Poll<Result<(), SendError>> {
let state = decode_state(self.inner.state.load(SeqCst));
if !state.is_open {
Expand All @@ -564,7 +564,7 @@ impl<T> SenderInner<T> {
}));
}

self.poll_unparked(Some(waker)).map(Ok)
self.poll_unparked(Some(cx)).map(Ok)
}

/// Returns whether this channel is closed without needing a context.
Expand All @@ -582,7 +582,7 @@ impl<T> SenderInner<T> {
self.inner.recv_task.wake();
}

fn poll_unparked(&mut self, waker: Option<&Waker>) -> Poll<()> {
fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
// First check the `maybe_parked` variable. This avoids acquiring the
// lock in most cases
if self.maybe_parked {
Expand All @@ -600,7 +600,7 @@ impl<T> SenderInner<T> {
//
// Update the task in case the `Sender` has been moved to another
// task
task.task = waker.cloned();
task.task = cx.map(|cx| cx.waker().clone());

Poll::Pending
} else {
Expand Down Expand Up @@ -649,12 +649,12 @@ impl<T> Sender<T> {
/// - `Err(SendError)` if the receiver has been dropped.
pub fn poll_ready(
&mut self,
waker: &Waker,
cx: &mut Context<'_>,
) -> Poll<Result<(), SendError>> {
let inner = self.0.as_mut().ok_or(SendError {
kind: SendErrorKind::Disconnected,
})?;
inner.poll_ready(waker)
inner.poll_ready(cx)
}

/// Returns whether this channel is closed without needing a context.
Expand All @@ -679,7 +679,7 @@ impl<T> UnboundedSender<T> {
/// Check if the channel is ready to receive a message.
pub fn poll_ready(
&self,
_: &Waker,
_: &mut Context<'_>,
) -> Poll<Result<(), SendError>> {
let inner = self.0.as_ref().ok_or(SendError {
kind: SendErrorKind::Disconnected,
Expand Down Expand Up @@ -904,7 +904,7 @@ impl<T> Stream for Receiver<T> {

fn poll_next(
mut self: Pin<&mut Self>,
waker: &Waker,
cx: &mut Context<'_>,
) -> Poll<Option<T>> {
// Try to read a message off of the message queue.
match self.next_message() {
Expand All @@ -916,7 +916,7 @@ impl<T> Stream for Receiver<T> {
},
Poll::Pending => {
// There are no messages to read, in this case, park.
self.inner.as_ref().unwrap().recv_task.register(waker);
self.inner.as_ref().unwrap().recv_task.register(cx.waker());
// Check queue again after parking to prevent race condition:
// a message could be added to the queue after previous `next_message`
// before `register` call.
Expand Down Expand Up @@ -971,9 +971,9 @@ impl<T> Stream for UnboundedReceiver<T> {

fn poll_next(
mut self: Pin<&mut Self>,
waker: &Waker,
cx: &mut Context<'_>,
) -> Poll<Option<T>> {
Pin::new(&mut self.0).poll_next(waker)
Pin::new(&mut self.0).poll_next(cx)
}
}

Expand Down
18 changes: 9 additions & 9 deletions futures-channel/src/oneshot.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! A channel for sending a single message between asynchronous tasks.

use futures_core::future::Future;
use futures_core::task::{Waker, Poll};
use futures_core::task::{Context, Poll, Waker};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
Expand Down Expand Up @@ -154,7 +154,7 @@ impl<T> Inner<T> {
}
}

fn poll_cancel(&self, waker: &Waker) -> Poll<()> {
fn poll_cancel(&self, cx: &mut Context<'_>) -> Poll<()> {
// Fast path up first, just read the flag and see if our other half is
// gone. This flag is set both in our destructor and the oneshot
// destructor, but our destructor hasn't run yet so if it's set then the
Expand All @@ -176,7 +176,7 @@ impl<T> Inner<T> {
// `Receiver` may have been dropped. The first thing it does is set the
// flag, and if it fails to acquire the lock it assumes that we'll see
// the flag later on. So... we then try to see the flag later on!
let handle = waker.clone();
let handle = cx.waker().clone();
match self.tx_task.try_lock() {
Some(mut p) => *p = Some(handle),
None => return Poll::Ready(()),
Expand Down Expand Up @@ -249,7 +249,7 @@ impl<T> Inner<T> {
}
}

fn recv(&self, waker: &Waker) -> Poll<Result<T, Canceled>> {
fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
// Check to see if some data has arrived. If it hasn't then we need to
// block our task.
//
Expand All @@ -260,7 +260,7 @@ impl<T> Inner<T> {
let done = if self.complete.load(SeqCst) {
true
} else {
let task = waker.clone();
let task = cx.waker().clone();
match self.rx_task.try_lock() {
Some(mut slot) => { *slot = Some(task); false },
None => true,
Expand Down Expand Up @@ -348,8 +348,8 @@ impl<T> Sender<T> {
/// alive and may be able to receive a message if sent. The current task,
/// however, is scheduled to receive a notification if the corresponding
/// `Receiver` goes away.
pub fn poll_cancel(&mut self, waker: &Waker) -> Poll<()> {
self.inner.poll_cancel(waker)
pub fn poll_cancel(&mut self, cx: &mut Context<'_>) -> Poll<()> {
self.inner.poll_cancel(cx)
}

/// Tests to see whether this `Sender`'s corresponding `Receiver`
Expand Down Expand Up @@ -416,9 +416,9 @@ impl<T> Future for Receiver<T> {

fn poll(
self: Pin<&mut Self>,
waker: &Waker,
cx: &mut Context<'_>,
) -> Poll<Result<T, Canceled>> {
self.inner.recv(waker)
self.inner.recv(cx)
}
}

Expand Down
4 changes: 2 additions & 2 deletions futures-channel/tests/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) {
fn drop_sender() {
let (tx, mut rx) = mpsc::channel::<u32>(1);
drop(tx);
let f = poll_fn(|lw| {
rx.poll_next_unpin(lw)
let f = poll_fn(|cx| {
rx.poll_next_unpin(cx)
});
assert_eq!(block_on(f), None)
}
Expand Down
Loading