From 1784abfb4f00d3f7c64e94fe887b9093c8c64dbc Mon Sep 17 00:00:00 2001 From: Darin Morrison Date: Sat, 17 Aug 2019 08:48:14 -0700 Subject: [PATCH 01/18] Pin the toolchain [ci skip] --- rust-toolchain | 1 + 1 file changed, 1 insertion(+) create mode 100644 rust-toolchain diff --git a/rust-toolchain b/rust-toolchain new file mode 100644 index 000000000..524b40da4 --- /dev/null +++ b/rust-toolchain @@ -0,0 +1 @@ +nightly-2019-08-16 From 513f9a6cc5c69cff232ad8d50394345e467df4b8 Mon Sep 17 00:00:00 2001 From: Darin Morrison Date: Sun, 18 Aug 2019 10:11:39 -0700 Subject: [PATCH 02/18] Enable "async-await" for futures-preview (select!) [ci skip] --- Cargo.toml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 70ae25416..1b4f8aeb3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,6 @@ docs = [] async-task = "1.0.0" cfg-if = "0.1.9" crossbeam-channel = "0.3.9" -futures-preview = "0.3.0-alpha.17" futures-timer = "0.3.0" lazy_static = "1.3.0" log = { version = "0.4.8", features = ["kv_unstable"] } @@ -38,6 +37,10 @@ pin-utils = "0.1.0-alpha.4" slab = "0.4.2" surf = "1.0.1" +[dependencies.futures-preview] +version = "0.3.0-alpha.17" +features = ["async-await", "nightly"] + [dev-dependencies] femme = "1.1.0" tempdir = "0.3.7" From 69854354fb3e348311e72f16bcae004959e6b703 Mon Sep 17 00:00:00 2001 From: Darin Morrison Date: Sat, 17 Aug 2019 08:48:39 -0700 Subject: [PATCH 03/18] Fix tests for futures.md [ci skip] --- docs/src/concepts/futures.md | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/docs/src/concepts/futures.md b/docs/src/concepts/futures.md index 2eea14695..cf20e3f31 100644 --- a/docs/src/concepts/futures.md +++ b/docs/src/concepts/futures.md @@ -50,12 +50,14 @@ Remember the talk about "deferred computation" in the intro? That's all it is. I Let's have a look at a simple function, specifically the return value: -```rust +```rust,edition2018 +# use std::{fs::File, io::{self, Read}}; +# fn read_file(path: &str) -> Result { - let mut file = File.open(path)?; + let mut file = File::open(path)?; let mut contents = String::new(); file.read_to_string(&mut contents)?; - contents + Ok(contents) } ``` @@ -64,12 +66,14 @@ Note that this return value talks about the past. The past has a drawback: all d But we wanted to abstract over *computation* and let someone else choose how to run it. That's fundamentally incompatible with looking at the results of previous computation all the time. So, let's find a type that *describes* a computation without running it. Let's look at the function again: -```rust +```rust,edition2018 +# use std::{fs::File, io::{self, Read}}; +# fn read_file(path: &str) -> Result { - let mut file = File.open(path)?; + let mut file = File::open(path)?; let mut contents = String::new(); file.read_to_string(&mut contents)?; - contents + Ok(contents) } ``` @@ -79,10 +83,11 @@ This is the moment where we could reach for [threads](https://en.wikipedia.org/w What we are searching for is something that represents ongoing work towards a result in the future. Whenever we say "something" in Rust, we almost always mean a trait. Let's start with an incomplete definition of the `Future` trait: -```rust +```rust,edition2018 +# use std::{pin::Pin, task::{Context, Poll}}; +# trait Future { type Output; - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll; } ``` @@ -105,14 +110,17 @@ Note that calling `poll` again after case 1 happened may result in confusing beh While the `Future` trait has existed in Rust for a while, it was inconvenient to build and describe them. For this, Rust now has a special syntax: `async`. The example from above, implemented with `async-std`, would look like this: -```rust -use async_std::fs::File; - +```rust,edition2018 +# #![feature(async_await)] +# extern crate async_std; +# use async_std::{fs::File, io::Read}; +# use std::io; +# async fn read_file(path: &str) -> Result { - let mut file = File.open(path).await?; + let mut file = File::open(path).await?; let mut contents = String::new(); file.read_to_string(&mut contents).await?; - contents + Ok(contents) } ``` From 825dc891707f80103420d8db22a4ff85061dd998 Mon Sep 17 00:00:00 2001 From: Darin Morrison Date: Sun, 18 Aug 2019 13:49:52 -0700 Subject: [PATCH 04/18] Add empty data.csv for tasks.md tests [ci skip] --- docs/src/concepts/data.csv | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 docs/src/concepts/data.csv diff --git a/docs/src/concepts/data.csv b/docs/src/concepts/data.csv new file mode 100644 index 000000000..e69de29bb From b682847cbafc56f62f11f23c26f8a7754639f675 Mon Sep 17 00:00:00 2001 From: Darin Morrison Date: Sat, 17 Aug 2019 09:03:05 -0700 Subject: [PATCH 05/18] Fix tests for tasks.md [ci skip] --- docs/src/concepts/tasks.md | 53 ++++++++++++++++++++++++++++---------- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/docs/src/concepts/tasks.md b/docs/src/concepts/tasks.md index 22921530a..124fb1ab4 100644 --- a/docs/src/concepts/tasks.md +++ b/docs/src/concepts/tasks.md @@ -4,15 +4,17 @@ Now that we know what Futures are, we want to run them! In `async-std`, the [`tasks`][tasks] module is responsible for this. The simplest way is using the `block_on` function: -```rust -use async_std::fs::File; -use async_std::task; - +```rust,edition2018 +# #![feature(async_await)] +# extern crate async_std; +# use async_std::{fs::File, io::Read, task}; +# use std::io; +# async fn read_file(path: &str) -> Result { let mut file = File::open(path).await?; let mut contents = String::new(); file.read_to_string(&mut contents).await?; - contents + Ok(contents) } fn main() { @@ -31,24 +33,37 @@ fn main() { This asks the runtime baked into `async_std` to execute the code that reads a file. Let's go one by one, though, inside to outside. -```rust +```rust,edition2018 +# #![feature(async_await)] +# extern crate async_std; +# use async_std::{fs::File, io::Read, task}; +# use std::io; +# +# async fn read_file(path: &str) -> Result { +# let mut file = File::open(path).await?; +# let mut contents = String::new(); +# file.read_to_string(&mut contents).await?; +# Ok(contents) +# } +# async { let result = read_file("data.csv").await; match result { Ok(s) => println!("{}", s), Err(e) => println!("Error reading file: {:?}", e) } -} +}; ``` This is an `async` *block*. Async blocks are necessary to call `async` functions, and will instruct the compiler to include all the relevant instructions to do so. In Rust, all blocks return a value and `async` blocks happen to return a value of the kind `Future`. But let's get to the interesting part: -```rust - -task::spawn(async { }) - +```rust,edition2018 +# #![feature(async_await)] +# extern crate async_std; +# use async_std::task; +task::spawn(async { }); ``` `spawn` takes a `Future` and starts running it on a `Task`. It returns a `JoinHandle`. Futures in Rust are sometimes called *cold* Futures. You need something that starts running them. To run a Future, there may be some additional bookkeeping required, e.g. whether it's running or finished, where it is being placed in memory and what the current state is. This bookkeeping part is abstracted away in a `Task`. @@ -72,7 +87,10 @@ Tasks in `async_std` are one of the core abstractions. Much like Rust's `thread` `Task`s are assumed to run _concurrently_, potentially by sharing a thread of execution. This means that operations blocking an _operating system thread_, such as `std::thread::sleep` or io function from Rust's `std` library will _stop execution of all tasks sharing this thread_. Other libraries (such as database drivers) have similar behaviour. Note that _blocking the current thread_ is not in and by itself bad behaviour, just something that does not mix well with the concurrent execution model of `async-std`. Essentially, never do this: -```rust +```rust,edition2018 +# #![feature(async_await)] +# extern crate async_std; +# use async_std::task; fn main() { task::block_on(async { // this is std::fs, which blocks @@ -91,7 +109,10 @@ In case of `panic`, behaviour differs depending on whether there's a reasonable In practice, that means that `block_on` propagates panics to the blocking component: -```rust +```rust,edition2018,should_panic +# #![feature(async_await)] +# extern crate async_std; +# use async_std::task; fn main() { task::block_on(async { panic!("test"); @@ -106,7 +127,11 @@ note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace. While panicing a spawned task will abort: -```rust +```rust,edition2018,should_panic +# #![feature(async_await)] +# extern crate async_std; +# use async_std::task; +# use std::time::Duration; task::spawn(async { panic!("test"); }); From ff2f1d4122638e2eca89e647d804bbd69072337b Mon Sep 17 00:00:00 2001 From: Darin Morrison Date: Sat, 17 Aug 2019 09:27:22 -0700 Subject: [PATCH 06/18] Fix tests for accept_loop.md [ci skip] --- docs/src/tutorial/accept_loop.md | 63 +++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 18 deletions(-) diff --git a/docs/src/tutorial/accept_loop.md b/docs/src/tutorial/accept_loop.md index 54b4187f3..30eaab343 100644 --- a/docs/src/tutorial/accept_loop.md +++ b/docs/src/tutorial/accept_loop.md @@ -2,19 +2,17 @@ Let's implement the scaffold of the server: a loop that binds a TCP socket to an address and starts accepting connections. - First of all, let's add required import boilerplate: -```rust +```rust,edition2018 #![feature(async_await)] - -use std::net::ToSocketAddrs; // 1 - +# extern crate async_std; use async_std::{ prelude::*, // 2 - task, // 3 + task, // 3 net::TcpListener, // 4 }; +use std::net::ToSocketAddrs; // 1 type Result = std::result::Result>; // 5 ``` @@ -29,10 +27,19 @@ type Result = std::result::Result To propagate the errors, we will use a boxed error trait object. Do you know that there's `From<&'_ str> for Box` implementation in stdlib, which allows you to use strings with `?` operator? - Now we can write the server's accept loop: -```rust +```rust,edition2018 +# #![feature(async_await)] +# extern crate async_std; +# use async_std::{ +# net::TcpListener, +# prelude::Stream, +# }; +# use std::net::ToSocketAddrs; +# +# type Result = std::result::Result>; +# async fn server(addr: impl ToSocketAddrs) -> Result<()> { // 1 let listener = TcpListener::bind(addr).await?; // 2 let mut incoming = listener.incoming(); @@ -50,20 +57,40 @@ async fn server(addr: impl ToSocketAddrs) -> Result<()> { // 1 Mirroring API of `std` is an explicit design goal of `async_std`. 3. Here, we would like to iterate incoming sockets, just how one would do in `std`: - ```rust - let listener: std::net::TcpListener = unimplemented!(); - for stream in listener.incoming() { - - } - ``` +```rust,edition2018,should_panic +let listener: std::net::TcpListener = unimplemented!(); +for stream in listener.incoming() { +} +``` - Unfortunately this doesn't quite work with `async` yet, because there's no support for `async` for-loops in the language yet. - For this reason we have to implement the loop manually, by using `while let Some(item) = iter.next().await` pattern. +Unfortunately this doesn't quite work with `async` yet, because there's no support for `async` for-loops in the language yet. +For this reason we have to implement the loop manually, by using `while let Some(item) = iter.next().await` pattern. Finally, let's add main: -```rust -fn main() -> Result<()> { +```rust,edition2018 +# #![feature(async_await)] +# extern crate async_std; +# use async_std::{ +# net::TcpListener, +# prelude::Stream, +# task, +# }; +# use std::net::ToSocketAddrs; +# +# type Result = std::result::Result>; +# +# async fn server(addr: impl ToSocketAddrs) -> Result<()> { // 1 +# let listener = TcpListener::bind(addr).await?; // 2 +# let mut incoming = listener.incoming(); +# while let Some(stream) = incoming.next().await { // 3 +# // TODO +# } +# Ok(()) +# } +# +// main +fn run() -> Result<()> { let fut = server("127.0.0.1:8080"); task::block_on(fut) } From 5774f59a9e9b6e9194b04c73b8b65cbd4e09d11d Mon Sep 17 00:00:00 2001 From: Darin Morrison Date: Sat, 17 Aug 2019 09:57:30 -0700 Subject: [PATCH 07/18] Fix tests for receiving_messages.md [ci skip] --- docs/src/tutorial/receiving_messages.md | 68 ++++++++++++++++++++++--- 1 file changed, 61 insertions(+), 7 deletions(-) diff --git a/docs/src/tutorial/receiving_messages.md b/docs/src/tutorial/receiving_messages.md index 09266674b..40d0ebc69 100644 --- a/docs/src/tutorial/receiving_messages.md +++ b/docs/src/tutorial/receiving_messages.md @@ -7,9 +7,19 @@ We need to: 2. interpret the first line as a login 3. parse the rest of the lines as a `login: message` -```rust -use async_std::net::TcpStream; - +```rust,edition2018 +# #![feature(async_await)] +# extern crate async_std; +# use async_std::{ +# io::{BufRead, BufReader}, +# net::{TcpListener, TcpStream}, +# prelude::Stream, +# task, +# }; +# use std::net::ToSocketAddrs; +# +# type Result = std::result::Result>; +# async fn server(addr: impl ToSocketAddrs) -> Result<()> { let listener = TcpListener::bind(addr).await?; let mut incoming = listener.incoming(); @@ -63,9 +73,46 @@ One serious problem in the above solution is that, while we correctly propagate That is, `task::spawn` does not return an error immediately (it can't, it needs to run the future to completion first), only after it is joined. We can "fix" it by waiting for the task to be joined, like this: -```rust +```rust,edition2018 +# #![feature(async_await)] +# #![feature(async_closure)] +# extern crate async_std; +# use async_std::{ +# io::{BufRead, BufReader}, +# net::{TcpListener, TcpStream}, +# prelude::Stream, +# task, +# }; +# use std::net::ToSocketAddrs; +# +# type Result = std::result::Result>; +# +# async fn client(stream: TcpStream) -> Result<()> { +# let reader = BufReader::new(&stream); // 2 +# let mut lines = reader.lines(); +# +# let name = match lines.next().await { // 3 +# None => Err("peer disconnected immediately")?, +# Some(line) => line?, +# }; +# println!("name = {}", name); +# +# while let Some(line) = lines.next().await { // 4 +# let line = line?; +# let (dest, msg) = match line.find(':') { // 5 +# None => continue, +# Some(idx) => (&line[..idx], line[idx + 1 ..].trim()), +# }; +# let dest: Vec = dest.split(',').map(|name| name.trim().to_string()).collect(); +# let msg: String = msg.trim().to_string(); +# } +# Ok(()) +# } +# +# async move |stream| { let handle = task::spawn(client(stream)); -handle.await? +handle.await +# }; ``` The `.await` waits until the client finishes, and `?` propagates the result. @@ -78,10 +125,17 @@ That is, a flaky internet connection of one peer brings down the whole chat room A correct way to handle client errors in this case is log them, and continue serving other clients. So let's use a helper function for this: -```rust +```rust,edition2018 +# #![feature(async_await)] +# extern crate async_std; +# use async_std::{ +# io, +# prelude::Future, +# task, +# }; fn spawn_and_log_error(fut: F) -> task::JoinHandle<()> where - F: Future> + Send + 'static, + F: Future> + Send + 'static, { task::spawn(async move { if let Err(e) = fut.await { From c189e3dd727f371559109c516cfbf5e3a2464a50 Mon Sep 17 00:00:00 2001 From: Darin Morrison Date: Sat, 17 Aug 2019 10:10:55 -0700 Subject: [PATCH 08/18] Fix tests for sending_messages.md [ci skip] --- docs/src/tutorial/sending_messages.md | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/docs/src/tutorial/sending_messages.md b/docs/src/tutorial/sending_messages.md index da62ad544..a4f412ff0 100644 --- a/docs/src/tutorial/sending_messages.md +++ b/docs/src/tutorial/sending_messages.md @@ -11,10 +11,20 @@ So let's create a `client_writer` task which receives messages over a channel an This task would be the point of serialization of messages. if Alice and Charley send two messages to Bob at the same time, Bob will see the messages in the same order as they arrive in the channel. -```rust +```rust,edition2018 +# #![feature(async_await)] +# extern crate async_std; +# extern crate futures; +# use async_std::{ +# io::Write, +# net::TcpStream, +# prelude::Stream, +# }; +# use std::sync::Arc; use futures::channel::mpsc; // 1 use futures::SinkExt; +# type Result = std::result::Result>; type Sender = mpsc::UnboundedSender; // 2 type Receiver = mpsc::UnboundedReceiver; From 3c3f95ee96c0e201976f34b753b7aabf18f43b64 Mon Sep 17 00:00:00 2001 From: Darin Morrison Date: Sat, 17 Aug 2019 10:37:58 -0700 Subject: [PATCH 09/18] Fix tests for connecting_readers_and_writers.md [ci skip] --- .../connecting_readers_and_writers.md | 44 ++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/docs/src/tutorial/connecting_readers_and_writers.md b/docs/src/tutorial/connecting_readers_and_writers.md index 531656b38..43235929f 100644 --- a/docs/src/tutorial/connecting_readers_and_writers.md +++ b/docs/src/tutorial/connecting_readers_and_writers.md @@ -10,7 +10,49 @@ We can create a dedicated broker tasks which owns the `peers` map and communicat By hiding `peers` inside such an "actor" task, we remove the need for mutxes and also make serialization point explicit. The order of events "Bob sends message to Alice" and "Alice joins" is determined by the order of the corresponding events in the broker's event queue. -```rust +```rust,edition2018 +# #![feature(async_await)] +# extern crate async_std; +# extern crate futures; +# use async_std::{ +# io::{Write}, +# net::TcpStream, +# prelude::{Future, Stream}, +# task, +# }; +# use futures::channel::mpsc; +# use futures::SinkExt; +# use std::{ +# collections::hash_map::{Entry, HashMap}, +# sync::Arc, +# }; +# +# type Result = std::result::Result>; +# type Sender = mpsc::UnboundedSender; +# type Receiver = mpsc::UnboundedReceiver; +# +# async fn client_writer( +# mut messages: Receiver, +# stream: Arc, +# ) -> Result<()> { +# let mut stream = &*stream; +# while let Some(msg) = messages.next().await { +# stream.write_all(msg.as_bytes()).await?; +# } +# Ok(()) +# } +# +# fn spawn_and_log_error(fut: F) -> task::JoinHandle<()> +# where +# F: Future> + Send + 'static, +# { +# task::spawn(async move { +# if let Err(e) = fut.await { +# eprintln!("{}", e) +# } +# }) +# } +# #[derive(Debug)] enum Event { // 1 NewPeer { From 0e210ef8e2ebcc9ac42334fd278f0c917e4690f0 Mon Sep 17 00:00:00 2001 From: Darin Morrison Date: Sat, 17 Aug 2019 11:37:12 -0700 Subject: [PATCH 10/18] Fix tests for all_together.md [ci skip] --- docs/src/tutorial/all_together.md | 40 +++++++++++++++++++------------ 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/docs/src/tutorial/all_together.md b/docs/src/tutorial/all_together.md index 91101bd82..f82496feb 100644 --- a/docs/src/tutorial/all_together.md +++ b/docs/src/tutorial/all_together.md @@ -3,36 +3,46 @@ At this point, we only need to start the broker to get a fully-functioning (in the happy case!) chat: -```rust +```rust,edition2018 #![feature(async_await)] - -use std::{ - net::ToSocketAddrs, - sync::Arc, - collections::hash_map::{HashMap, Entry}, +# extern crate async_std; +# extern crate futures; +use async_std::{ + io::{self, BufReader}, + net::{TcpListener, TcpStream}, + prelude::*, + task, }; - use futures::{ channel::mpsc, SinkExt, }; - -use async_std::{ - io::BufReader, - prelude::*, - task, - net::{TcpListener, TcpStream}, +use std::{ + collections::hash_map::{HashMap, Entry}, + net::ToSocketAddrs, + sync::Arc, }; type Result = std::result::Result>; type Sender = mpsc::UnboundedSender; type Receiver = mpsc::UnboundedReceiver; - -fn main() -> Result<()> { +// main +fn run() -> Result<()> { task::block_on(server("127.0.0.1:8080")) } +fn spawn_and_log_error(fut: F) -> task::JoinHandle<()> +where + F: Future> + Send + 'static, +{ + task::spawn(async move { + if let Err(e) = fut.await { + eprintln!("{}", e) + } + }) +} + async fn server(addr: impl ToSocketAddrs) -> Result<()> { let listener = TcpListener::bind(addr).await?; From 6f42cf35f1fcdca382cde9e5730752964ce22117 Mon Sep 17 00:00:00 2001 From: Darin Morrison Date: Sat, 17 Aug 2019 11:37:19 -0700 Subject: [PATCH 11/18] Fix tests for clean_shutdown.md [ci skip] --- docs/src/tutorial/clean_shutdown.md | 181 +++++++++++++++++++++++++++- 1 file changed, 177 insertions(+), 4 deletions(-) diff --git a/docs/src/tutorial/clean_shutdown.md b/docs/src/tutorial/clean_shutdown.md index 714a92640..685363c00 100644 --- a/docs/src/tutorial/clean_shutdown.md +++ b/docs/src/tutorial/clean_shutdown.md @@ -20,7 +20,123 @@ In `a-chat`, we already have an unidirectional flow of messages: `reader -> brok However, we never wait for broker and writers, which might cause some messages to get dropped. Let's add waiting to the server: -```rust +```rust,edition2018 +# #![feature(async_await)] +# extern crate async_std; +# extern crate futures; +# use async_std::{ +# io::{self, BufReader}, +# net::{TcpListener, TcpStream}, +# prelude::*, +# task, +# }; +# use futures::{ +# channel::mpsc, +# SinkExt, +# }; +# use std::{ +# collections::hash_map::{HashMap, Entry}, +# net::ToSocketAddrs, +# sync::Arc, +# }; +# +# type Result = std::result::Result>; +# type Sender = mpsc::UnboundedSender; +# type Receiver = mpsc::UnboundedReceiver; +# +# fn spawn_and_log_error(fut: F) -> task::JoinHandle<()> +# where +# F: Future> + Send + 'static, +# { +# task::spawn(async move { +# if let Err(e) = fut.await { +# eprintln!("{}", e) +# } +# }) +# } +# +# +# async fn client(mut broker: Sender, stream: TcpStream) -> Result<()> { +# let stream = Arc::new(stream); // 2 +# let reader = BufReader::new(&*stream); +# let mut lines = reader.lines(); +# +# let name = match lines.next().await { +# None => Err("peer disconnected immediately")?, +# Some(line) => line?, +# }; +# broker.send(Event::NewPeer { name: name.clone(), stream: Arc::clone(&stream) }).await // 3 +# .unwrap(); +# +# while let Some(line) = lines.next().await { +# let line = line?; +# let (dest, msg) = match line.find(':') { +# None => continue, +# Some(idx) => (&line[..idx], line[idx + 1 ..].trim()), +# }; +# let dest: Vec = dest.split(',').map(|name| name.trim().to_string()).collect(); +# let msg: String = msg.trim().to_string(); +# +# broker.send(Event::Message { // 4 +# from: name.clone(), +# to: dest, +# msg, +# }).await.unwrap(); +# } +# Ok(()) +# } +# +# async fn client_writer( +# mut messages: Receiver, +# stream: Arc, +# ) -> Result<()> { +# let mut stream = &*stream; +# while let Some(msg) = messages.next().await { +# stream.write_all(msg.as_bytes()).await?; +# } +# Ok(()) +# } +# +# #[derive(Debug)] +# enum Event { +# NewPeer { +# name: String, +# stream: Arc, +# }, +# Message { +# from: String, +# to: Vec, +# msg: String, +# }, +# } +# +# async fn broker(mut events: Receiver) -> Result<()> { +# let mut peers: HashMap> = HashMap::new(); +# +# while let Some(event) = events.next().await { +# match event { +# Event::Message { from, to, msg } => { +# for addr in to { +# if let Some(peer) = peers.get_mut(&addr) { +# peer.send(format!("from {}: {}\n", from, msg)).await? +# } +# } +# } +# Event::NewPeer { name, stream} => { +# match peers.entry(name) { +# Entry::Occupied(..) => (), +# Entry::Vacant(entry) => { +# let (client_sender, client_receiver) = mpsc::unbounded(); +# entry.insert(client_sender); // 4 +# spawn_and_log_error(client_writer(client_receiver, stream)); // 5 +# } +# } +# } +# } +# } +# Ok(()) +# } +# async fn server(addr: impl ToSocketAddrs) -> Result<()> { let listener = TcpListener::bind(addr).await?; @@ -40,11 +156,68 @@ async fn server(addr: impl ToSocketAddrs) -> Result<()> { And to the broker: -```rust +```rust,edition2018 +# #![feature(async_await)] +# extern crate async_std; +# extern crate futures; +# use async_std::{ +# io::{self, BufReader}, +# net::{TcpListener, TcpStream}, +# prelude::*, +# task, +# }; +# use futures::{ +# channel::mpsc, +# SinkExt, +# }; +# use std::{ +# collections::hash_map::{HashMap, Entry}, +# net::ToSocketAddrs, +# sync::Arc, +# }; +# +# type Result = std::result::Result>; +# type Sender = mpsc::UnboundedSender; +# type Receiver = mpsc::UnboundedReceiver; +# +# async fn client_writer( +# mut messages: Receiver, +# stream: Arc, +# ) -> Result<()> { +# let mut stream = &*stream; +# while let Some(msg) = messages.next().await { +# stream.write_all(msg.as_bytes()).await?; +# } +# Ok(()) +# } +# +# fn spawn_and_log_error(fut: F) -> task::JoinHandle<()> +# where +# F: Future> + Send + 'static, +# { +# task::spawn(async move { +# if let Err(e) = fut.await { +# eprintln!("{}", e) +# } +# }) +# } +# +# #[derive(Debug)] +# enum Event { +# NewPeer { +# name: String, +# stream: Arc, +# }, +# Message { +# from: String, +# to: Vec, +# msg: String, +# }, +# } +# async fn broker(mut events: Receiver) -> Result<()> { let mut writers = Vec::new(); let mut peers: HashMap> = HashMap::new(); - while let Some(event) = events.next().await { // 2 match event { Event::Message { from, to, msg } => { @@ -69,7 +242,7 @@ async fn broker(mut events: Receiver) -> Result<()> { } drop(peers); // 3 for writer in writers { // 4 - writer.await?; + writer.await; } Ok(()) } From 8f919cd5a854550d3e4661d0ed7dd18c1982e817 Mon Sep 17 00:00:00 2001 From: Darin Morrison Date: Sun, 18 Aug 2019 10:48:04 -0700 Subject: [PATCH 12/18] Fix tests for handling_disconnection.md [ci skip] --- docs/src/tutorial/handling_disconnection.md | 87 +++++++++++++-------- 1 file changed, 53 insertions(+), 34 deletions(-) diff --git a/docs/src/tutorial/handling_disconnection.md b/docs/src/tutorial/handling_disconnection.md index 865bf958a..cad86961b 100644 --- a/docs/src/tutorial/handling_disconnection.md +++ b/docs/src/tutorial/handling_disconnection.md @@ -17,7 +17,18 @@ This way, we statically guarantee that we issue shutdown exactly once, even if w First, let's add a shutdown channel to the `client`: -```rust +```rust,edition2018 +# #![feature(async_await)] +# extern crate async_std; +# extern crate futures; +# use async_std::net::TcpStream; +# use futures::{channel::mpsc, SinkExt}; +# use std::sync::Arc; +# +# type Result = std::result::Result>; +# type Sender = mpsc::UnboundedSender; +# type Receiver = mpsc::UnboundedReceiver; +# #[derive(Debug)] enum Void {} // 1 @@ -35,17 +46,17 @@ enum Event { }, } -async fn client(mut broker: Sender, stream: TcpStream) -> Result<()> { +async fn client(mut broker: Sender, stream: Arc) -> Result<()> { // ... - +# let name: String = unimplemented!(); let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::(); // 3 broker.send(Event::NewPeer { name: name.clone(), stream: Arc::clone(&stream), shutdown: shutdown_receiver, }).await.unwrap(); - // ... +# unimplemented!() } ``` @@ -56,23 +67,36 @@ async fn client(mut broker: Sender, stream: TcpStream) -> Result<()> { In the `client_writer`, we now need to choose between shutdown and message channels. We use the `select` macro for this purpose: -```rust -use futures::select; -use futures::FutureExt; +```rust,edition2018 +#![feature(async_await)] +# extern crate async_std; +# extern crate futures; +# use async_std::{io::Write, net::TcpStream}; +use futures::{channel::mpsc, select, FutureExt, StreamExt}; +# use std::sync::Arc; + +# type Receiver = mpsc::UnboundedReceiver; +# type Result = std::result::Result>; +# type Sender = mpsc::UnboundedSender; + +# #[derive(Debug)] +# enum Void {} // 1 async fn client_writer( messages: &mut Receiver, stream: Arc, - mut shutdown: Receiver, // 1 + shutdown: Receiver, // 1 ) -> Result<()> { let mut stream = &*stream; + let mut messages = messages.fuse(); + let mut shutdown = shutdown.fuse(); loop { // 2 select! { - msg = messages.next().fuse() => match msg { + msg = messages.next() => match msg { Some(msg) => stream.write_all(msg.as_bytes()).await?, None => break, }, - void = shutdown.next().fuse() => match void { + void = shutdown.next() => match void { Some(void) => match void {}, // 3 None => break, } @@ -94,27 +118,20 @@ This also allows us to establish a useful invariant that the message channel str The final code looks like this: -```rust +```rust,edition2018 #![feature(async_await)] - +# extern crate async_std; +# extern crate futures; +use async_std::{ + io::{BufReader, BufRead, Write}, + net::{TcpListener, TcpStream}, + task, +}; +use futures::{channel::mpsc, future::Future, select, FutureExt, SinkExt, StreamExt}; use std::{ + collections::hash_map::{Entry, HashMap}, net::ToSocketAddrs, sync::Arc, - collections::hash_map::{HashMap, Entry}, -}; - -use futures::{ - channel::mpsc, - SinkExt, - FutureExt, - select, -}; - -use async_std::{ - io::BufReader, - prelude::*, - task, - net::{TcpListener, TcpStream}, }; type Result = std::result::Result>; @@ -124,13 +141,13 @@ type Receiver = mpsc::UnboundedReceiver; #[derive(Debug)] enum Void {} -fn main() -> Result<()> { +// main +fn run() -> Result<()> { task::block_on(server("127.0.0.1:8080")) } async fn server(addr: impl ToSocketAddrs) -> Result<()> { let listener = TcpListener::bind(addr).await?; - let (broker_sender, broker_receiver) = mpsc::unbounded(); let broker = task::spawn(broker(broker_receiver)); let mut incoming = listener.incoming(); @@ -182,16 +199,18 @@ async fn client(mut broker: Sender, stream: TcpStream) -> Result<()> { async fn client_writer( messages: &mut Receiver, stream: Arc, - mut shutdown: Receiver, + shutdown: Receiver, ) -> Result<()> { let mut stream = &*stream; + let mut messages = messages.fuse(); + let mut shutdown = shutdown.fuse(); loop { select! { - msg = messages.next().fuse() => match msg { + msg = messages.next() => match msg { Some(msg) => stream.write_all(msg.as_bytes()).await?, None => break, }, - void = shutdown.next().fuse() => match void { + void = shutdown.next() => match void { Some(void) => match void {}, None => break, } @@ -214,11 +233,11 @@ enum Event { }, } -async fn broker(mut events: Receiver) { +async fn broker(events: Receiver) { let (disconnect_sender, mut disconnect_receiver) = // 1 mpsc::unbounded::<(String, Receiver)>(); let mut peers: HashMap> = HashMap::new(); - + let mut events = events.fuse(); loop { let event = select! { event = events.next() => match event { From cf14330b1b6572d446d821039dabaf8df36610ac Mon Sep 17 00:00:00 2001 From: Darin Morrison Date: Sun, 18 Aug 2019 11:46:36 -0700 Subject: [PATCH 13/18] Fix tests for small-patterns.md [ci skip] --- docs/src/patterns/small-patterns.md | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/docs/src/patterns/small-patterns.md b/docs/src/patterns/small-patterns.md index 2250d1964..6a55dfaf2 100644 --- a/docs/src/patterns/small-patterns.md +++ b/docs/src/patterns/small-patterns.md @@ -6,11 +6,12 @@ A collection of small, useful patterns. `async-std` doesn't provide a `split()` method on `io` handles. Instead, splitting a stream into a read and write half can be done like this: -```rust -use async_std::io; - -async fn echo(stream: io::TcpStream) { +```rust,edition2018 +#![feature(async_await)] +# extern crate async_std; +use async_std::{io, net::TcpStream}; +async fn echo(stream: TcpStream) { let (reader, writer) = &mut (&stream, &stream); - io::copy(reader, writer).await?; + io::copy(reader, writer).await; } -``` \ No newline at end of file +``` From 57a1356154bfc8637250f8a0aadf68a7e2727695 Mon Sep 17 00:00:00 2001 From: Darin Morrison Date: Sun, 18 Aug 2019 11:47:26 -0700 Subject: [PATCH 14/18] Fix tests for implementing_a_client.md [ci skip] --- docs/src/tutorial/implementing_a_client.md | 34 +++++++++------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/docs/src/tutorial/implementing_a_client.md b/docs/src/tutorial/implementing_a_client.md index 509bd9a2a..73392b963 100644 --- a/docs/src/tutorial/implementing_a_client.md +++ b/docs/src/tutorial/implementing_a_client.md @@ -14,46 +14,40 @@ Specifically, the client should *simultaneously* read from stdin and from the so Programming this with threads is cumbersome, especially when implementing clean shutdown. With async, we can just use the `select!` macro. -```rust +```rust,edition2018 #![feature(async_await)] - -use std::net::ToSocketAddrs; - -use futures::select; -use futures::FutureExt; - +# extern crate async_std; +# extern crate futures; use async_std::{ - prelude::*, + io::{stdin, BufRead, BufReader, Write}, net::TcpStream, task, - io::{stdin, BufReader}, }; +use futures::{select, FutureExt, StreamExt}; +use std::net::ToSocketAddrs; type Result = std::result::Result>; - -fn main() -> Result<()> { - task::block_on(try_main("127.0.0.1:8080")) +// main +fn run() -> Result<()> { + task::block_on(try_run("127.0.0.1:8080")) } -async fn try_main(addr: impl ToSocketAddrs) -> Result<()> { +async fn try_run(addr: impl ToSocketAddrs) -> Result<()> { let stream = TcpStream::connect(addr).await?; let (reader, mut writer) = (&stream, &stream); // 1 - let reader = BufReader::new(reader); - let mut lines_from_server = futures::StreamExt::fuse(reader.lines()); // 2 - - let stdin = BufReader::new(stdin()); - let mut lines_from_stdin = futures::StreamExt::fuse(stdin.lines()); // 2 + let mut lines_from_server = BufReader::new(reader).lines().fuse(); // 2 + let mut lines_from_stdin = BufReader::new(stdin()).lines().fuse(); // 2 loop { select! { // 3 - line = lines_from_server.next().fuse() => match line { + line = lines_from_server.next() => match line { Some(line) => { let line = line?; println!("{}", line); }, None => break, }, - line = lines_from_stdin.next().fuse() => match line { + line = lines_from_stdin.next() => match line { Some(line) => { let line = line?; writer.write_all(line.as_bytes()).await?; From 9095388d996a7e166f92d9ece076b9a9f252892a Mon Sep 17 00:00:00 2001 From: Darin Morrison Date: Sun, 18 Aug 2019 11:52:27 -0700 Subject: [PATCH 15/18] Fix policy.md [ci skip] --- docs/src/security/policy.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/src/security/policy.md b/docs/src/security/policy.md index 990d81144..06a08b484 100644 --- a/docs/src/security/policy.md +++ b/docs/src/security/policy.md @@ -32,7 +32,7 @@ This policy is adapted from the [Rust project](https://www.rust-lang.org/policie ## PGP Key -``` +```text -----BEGIN PGP PUBLIC KEY BLOCK----- mQENBF1Wu/ABCADJaGt4HwSlqKB9BGHWYKZj/6mTMbmc29vsEOcCSQKo6myCf9zc From a037f38de709bbfb8a2dbed372e07fb9ddd5ea8c Mon Sep 17 00:00:00 2001 From: Darin Morrison Date: Sun, 18 Aug 2019 16:35:54 -0700 Subject: [PATCH 16/18] Test and build the book on travis [ci skip] --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index bc25dc67c..bcfb3eb7e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,3 +24,4 @@ script: - cargo test --all - cargo fmt --all -- --check - if [[ -n "$BUILD_DOCS" ]]; then cargo doc --features docs; fi + - if [[ -n "$BUILD_BOOK" ]]; then cargo build --all && mdbook test -L ./target/debug/deps docs && mdbook build docs; fi From cc063ac25ffd29a99a3b5e6cd529175911de7636 Mon Sep 17 00:00:00 2001 From: Florian Gilcher Date: Mon, 19 Aug 2019 12:40:24 +0200 Subject: [PATCH 17/18] Remove rust-toolchain file --- rust-toolchain | 1 - 1 file changed, 1 deletion(-) delete mode 100644 rust-toolchain diff --git a/rust-toolchain b/rust-toolchain deleted file mode 100644 index 524b40da4..000000000 --- a/rust-toolchain +++ /dev/null @@ -1 +0,0 @@ -nightly-2019-08-16 From 27b226c2260a3772fd908f4e863929ef5e3ac7c1 Mon Sep 17 00:00:00 2001 From: Florian Gilcher Date: Mon, 19 Aug 2019 12:41:16 +0200 Subject: [PATCH 18/18] Build book on travis, but only for Linux --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index bcfb3eb7e..fc2c83837 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,7 +11,7 @@ matrix: include: - rust: nightly os: linux - env: BUILD_DOCS=1 + env: BUILD_DOCS=1 BUILD_BOOK=1 - rust: nightly os: osx osx_image: xcode9.2