diff --git a/ci/dictionary.txt b/ci/dictionary.txt index 98490d3f..13c52ec2 100644 --- a/ci/dictionary.txt +++ b/ci/dictionary.txt @@ -24,6 +24,8 @@ FutOne FutTwo FuturesUnordered GenFuture +gRPC +html http Hyper's impl @@ -33,8 +35,13 @@ IoBlocker IOCP IoObject kqueue +localhost +LocalExecutor metadata +MockTcpStream +multi multithreaded +multithreading Mutex MyError MyFut @@ -51,15 +58,18 @@ proxying pseudocode ReadIntoBuf recognise +refactor RefCell repurposed requeue ResponseFuture reusability runtime +runtimes rustc rustup SimpleFuture +smol SocketRead SomeType spawner @@ -69,6 +79,8 @@ struct subfuture subfutures subpar +TcpListener +TcpStream threadpool TimerFuture TODO diff --git a/examples/01_05_http_server/Cargo.toml b/examples/01_05_http_server/Cargo.toml deleted file mode 100644 index 7c1d7f0e..00000000 --- a/examples/01_05_http_server/Cargo.toml +++ /dev/null @@ -1,18 +0,0 @@ -[package] -name = "example_01_05_http_server" -version = "0.2.0" -authors = ["Taylor Cramer "] -edition = "2018" - -[lib] - -[dependencies] -# Hyper is an asynchronous HTTP library. We'll use it to power our HTTP -# server and to make HTTP requests. -hyper = "0.13" -# To setup some sort of runtime needed by Hyper, we will use the Tokio runtime. -tokio = { version = "0.2", features = ["full"] } - -# (only for testing) -anyhow = "1.0.31" -reqwest = { version = "0.10.4", features = ["blocking"] } diff --git a/examples/01_05_http_server/src/lib.rs b/examples/01_05_http_server/src/lib.rs deleted file mode 100644 index c3a9a949..00000000 --- a/examples/01_05_http_server/src/lib.rs +++ /dev/null @@ -1,91 +0,0 @@ -#![cfg(test)] - -// ANCHOR: imports -use { - hyper::{ - // Following functions are used by Hyper to handle a `Request` - // and returning a `Response` in an asynchronous manner by using a Future - service::{make_service_fn, service_fn}, - // Miscellaneous types from Hyper for working with HTTP. - Body, - Client, - Request, - Response, - Server, - Uri, - }, - std::net::SocketAddr, -}; -// ANCHOR_END: imports - -// ANCHOR: boilerplate -async fn serve_req(_req: Request) -> Result, hyper::Error> { - // Always return successfully with a response containing a body with - // a friendly greeting ;) - Ok(Response::new(Body::from("hello, world!"))) -} - -async fn run_server(addr: SocketAddr) { - println!("Listening on http://{}", addr); - - // Create a server bound on the provided address - let serve_future = Server::bind(&addr) - // Serve requests using our `async serve_req` function. - // `serve` takes a type which implements the `MakeService` trait. - // `make_service_fn` converts a closure into a type which - // implements the `MakeService` trait. That closure must return a - // type that implements the `Service` trait, and `service_fn` - // converts a request-response function into a type that implements - // the `Service` trait. - .serve(make_service_fn(|_| async { - Ok::<_, hyper::Error>(service_fn(serve_req)) - })); - - // Wait for the server to complete serving or exit with an error. - // If an error occurred, print it to stderr. - if let Err(e) = serve_future.await { - eprintln!("server error: {}", e); - } -} - -#[tokio::main] -async fn main() { - // Set the address to run our socket on. - let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); - - // Call our `run_server` function, which returns a future. - // As with every `async fn`, for `run_server` to do anything, - // the returned future needs to be run using `await`; - run_server(addr).await; -} -// ANCHOR_END: boilerplate - -#[test] -fn run_main_and_query_http() -> Result<(), anyhow::Error> { - std::thread::spawn(main); - // Unfortunately, there's no good way for us to detect when the server - // has come up, so we sleep for an amount that should hopefully be - // sufficient :( - std::thread::sleep(std::time::Duration::from_secs(5)); - let response = reqwest::blocking::get("http://localhost:3000")?.text()?; - assert_eq!(response, "hello, world!"); - Ok(()) -} - -mod proxy { - use super::*; - #[allow(unused)] - async fn serve_req(_req: Request) -> Result, hyper::Error> { - // ANCHOR: parse_url - let url_str = "http://www.rust-lang.org/en-US/"; - let url = url_str.parse::().expect("failed to parse URL"); - // ANCHOR_END: parse_url - - // ANCHOR: get_request - let res = Client::new().get(url).await?; - // Return the result of the request directly to the user - println!("request finished-- returning response"); - Ok(res) - // ANCHOR_END: get_request - } -} diff --git a/examples/02_04_executor/src/lib.rs b/examples/02_04_executor/src/lib.rs index 58ed7b7a..e5120cd9 100644 --- a/examples/02_04_executor/src/lib.rs +++ b/examples/02_04_executor/src/lib.rs @@ -3,13 +3,13 @@ // ANCHOR: imports use { futures::{ - future::{FutureExt, BoxFuture}, - task::{ArcWake, waker_ref}, + future::{BoxFuture, FutureExt}, + task::{waker_ref, ArcWake}, }, std::{ future::Future, + sync::mpsc::{sync_channel, Receiver, SyncSender}, sync::{Arc, Mutex}, - sync::mpsc::{sync_channel, SyncSender, Receiver}, task::{Context, Poll}, time::Duration, }, @@ -74,7 +74,10 @@ impl ArcWake for Task { // Implement `wake` by sending this task back onto the task channel // so that it will be polled again by the executor. let cloned = arc_self.clone(); - arc_self.task_sender.send(cloned).expect("too many tasks queued"); + arc_self + .task_sender + .send(cloned) + .expect("too many tasks queued"); } } // ANCHOR_END: arcwake_for_task @@ -128,4 +131,6 @@ fn main() { // ANCHOR_END: main #[test] -fn run_main() { main() } +fn run_main() { + main() +} diff --git a/examples/08_01_sync_tcp_server/404.html b/examples/08_01_sync_tcp_server/404.html new file mode 100644 index 00000000..88d8e915 --- /dev/null +++ b/examples/08_01_sync_tcp_server/404.html @@ -0,0 +1,11 @@ + + + + + Hello! + + +

Oops!

+

Sorry, I don't know what you're asking for.

+ + diff --git a/examples/08_01_sync_tcp_server/Cargo.toml b/examples/08_01_sync_tcp_server/Cargo.toml new file mode 100644 index 00000000..3048a1e7 --- /dev/null +++ b/examples/08_01_sync_tcp_server/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "sync_tcp_server" +version = "0.1.0" +authors = ["Your Name + + + + Hello! + + +

Hello!

+

Hi from Rust

+ + diff --git a/examples/08_01_sync_tcp_server/src/main.rs b/examples/08_01_sync_tcp_server/src/main.rs new file mode 100644 index 00000000..93bd410d --- /dev/null +++ b/examples/08_01_sync_tcp_server/src/main.rs @@ -0,0 +1,39 @@ +use std::fs; +use std::io::prelude::*; +use std::net::TcpListener; +use std::net::TcpStream; + +fn main() { + // Listen for incoming TCP connections on localhost port 7878 + let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); + + // Block forever, handling each request that arrives at this IP address + for stream in listener.incoming() { + let stream = stream.unwrap(); + + handle_connection(stream); + } +} + +fn handle_connection(mut stream: TcpStream) { + // Read the first 1024 bytes of data from the stream + let mut buffer = [0; 1024]; + stream.read(&mut buffer).unwrap(); + + let get = b"GET / HTTP/1.1\r\n"; + + // Respond with greetings or a 404, + // depending on the data in the request + let (status_line, filename) = if buffer.starts_with(get) { + ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") + } else { + ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") + }; + let contents = fs::read_to_string(filename).unwrap(); + + // Write response back to the stream, + // and flush the stream to ensure the response is sent back to the client + let response = format!("{}{}", status_line, contents); + stream.write(response.as_bytes()).unwrap(); + stream.flush().unwrap(); +} diff --git a/examples/08_02_async_tcp_server/Cargo.toml b/examples/08_02_async_tcp_server/Cargo.toml new file mode 100644 index 00000000..196d1a58 --- /dev/null +++ b/examples/08_02_async_tcp_server/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "async_tcp_server" +version = "0.1.0" +authors = ["Your Name +} +// ANCHOR_END: handle_connection_async diff --git a/examples/08_03_slow_request/Cargo.toml b/examples/08_03_slow_request/Cargo.toml new file mode 100644 index 00000000..aff14020 --- /dev/null +++ b/examples/08_03_slow_request/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "slow_request" +version = "0.1.0" +authors = ["Your Name + stream.write(response.as_bytes()).await.unwrap(); + stream.flush().await.unwrap(); +} +// ANCHOR_END: handle_connection diff --git a/examples/08_05_final_tcp_server/Cargo.toml b/examples/08_05_final_tcp_server/Cargo.toml new file mode 100644 index 00000000..cb84f91f --- /dev/null +++ b/examples/08_05_final_tcp_server/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "final_tcp_server" +version = "0.1.0" +authors = ["Your Name + + + + Hello! + + +

Hello!

+

Hi from Rust

+ + diff --git a/examples/08_05_final_tcp_server/src/main.rs b/examples/08_05_final_tcp_server/src/main.rs new file mode 100644 index 00000000..81f98c8c --- /dev/null +++ b/examples/08_05_final_tcp_server/src/main.rs @@ -0,0 +1,116 @@ +use std::fs; + +use futures::stream::StreamExt; + +use async_std::net::TcpListener; +use async_std::prelude::*; +// ANCHOR: main_func +use async_std::task::spawn; + +#[async_std::main] +async fn main() { + let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap(); + listener + .incoming() + .for_each_concurrent(/* limit */ None, |stream| async move { + let stream = stream.unwrap(); + spawn(handle_connection(stream)); + }) + .await; +} +// ANCHOR_END: main_func + +use async_std::io::{Read, Write}; +use std::marker::Unpin; + +async fn handle_connection(mut stream: impl Read + Write + Unpin) { + let mut buffer = [0; 1024]; + stream.read(&mut buffer).await.unwrap(); + let get = b"GET / HTTP/1.1\r\n"; + let (status_line, filename) = if buffer.starts_with(get) { + ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") + } else { + ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") + }; + let contents = fs::read_to_string(filename).unwrap(); + let response = format!("{}{}", status_line, contents); + stream.write(response.as_bytes()).await.unwrap(); + stream.flush().await.unwrap(); +} + +#[cfg(test)] + +mod tests { + // ANCHOR: mock_read + use super::*; + use futures::io::Error; + use futures::task::{Context, Poll}; + + use std::cmp::min; + use std::pin::Pin; + + struct MockTcpStream { + read_data: Vec, + write_data: Vec, + } + + impl Read for MockTcpStream { + fn poll_read( + self: Pin<&mut Self>, + _: &mut Context, + buf: &mut [u8], + ) -> Poll> { + let size: usize = min(self.read_data.len(), buf.len()); + buf.copy_from_slice(&self.read_data[..size]); + Poll::Ready(Ok(size)) + } + } + // ANCHOR_END: mock_read + + // ANCHOR: mock_write + impl Write for MockTcpStream { + fn poll_write( + mut self: Pin<&mut Self>, + _: &mut Context, + buf: &[u8], + ) -> Poll> { + self.write_data = Vec::from(buf); + return Poll::Ready(Ok(buf.len())); + } + fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + } + // ANCHOR_END: mock_write + + // ANCHOR: unpin + use std::marker::Unpin; + impl Unpin for MockTcpStream {} + // ANCHOR_END: unpin + + // ANCHOR: test + use std::fs; + + #[async_std::test] + async fn test_handle_connection() { + let input_bytes = b"GET / HTTP/1.1\r\n"; + let mut contents = vec![0u8; 1024]; + contents[..input_bytes.len()].clone_from_slice(input_bytes); + let mut stream = MockTcpStream { + read_data: contents, + write_data: Vec::new(), + }; + + handle_connection(&mut stream).await; + let mut buf = [0u8; 1024]; + stream.read(&mut buf).await.unwrap(); + + let expected_contents = fs::read_to_string("hello.html").unwrap(); + let expected_response = format!("HTTP/1.1 200 OK\r\n\r\n{}", expected_contents); + assert!(stream.write_data.starts_with(expected_response.as_bytes())); + } + // ANCHOR_END: test +} diff --git a/examples/Cargo.toml b/examples/Cargo.toml index b89b3115..939a0cb1 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -2,7 +2,6 @@ members = [ "01_02_why_async", "01_04_async_await_primer", - "01_05_http_server", "02_02_future_trait", "02_03_timer", "02_04_executor", @@ -12,4 +11,9 @@ members = [ "06_02_join", "06_03_select", "07_05_recursion", + "08_01_sync_tcp_server", + "08_02_async_tcp_server", + "08_03_slow_request", + "08_04_concurrent_tcp_server", + "08_05_final_tcp_server", ] diff --git a/src/01_getting_started/04_async_await_primer.md b/src/01_getting_started/04_async_await_primer.md index 5a887746..c2eaeff0 100644 --- a/src/01_getting_started/04_async_await_primer.md +++ b/src/01_getting_started/04_async_await_primer.md @@ -66,6 +66,3 @@ This would make it impossible to dance at the same time. By `.await`-ing the `learn_song` future, we allow other tasks to take over the current thread if `learn_song` is blocked. This makes it possible to run multiple futures to completion concurrently on the same thread. - -Now that you've learned the basics of `async`/`await`, let's try out an -example. diff --git a/src/01_getting_started/05_http_server_example.md b/src/01_getting_started/05_http_server_example.md deleted file mode 100644 index 521012f2..00000000 --- a/src/01_getting_started/05_http_server_example.md +++ /dev/null @@ -1,78 +0,0 @@ -# Applied: Simple HTTP Server - -Let's use `async`/`.await` to build an echo server! - -To start, run `rustup update stable` to make sure you've got stable Rust 1.39 or newer. Once you've done that, run -`cargo new async-await-echo` to create a new project, and open up -the resulting `async-await-echo` folder. - -Let's add some dependencies to the `Cargo.toml` file: - -```toml -{{#include ../../examples/01_05_http_server/Cargo.toml:9:18}} -``` - -Now that we've got our dependencies out of the way, let's start writing some -code. We have some imports to add: - -```rust,ignore -{{#include ../../examples/01_05_http_server/src/lib.rs:imports}} -``` - -Once the imports are out of the way, we can start putting together the -boilerplate to allow us to serve requests: - -```rust,ignore -{{#include ../../examples/01_05_http_server/src/lib.rs:boilerplate}} -``` - -If you `cargo run` now, you should see the message "Listening on -http://127.0.0.1:3000" printed on your terminal. If you open that URL in your -browser of choice, you'll see "hello, world!" appear in your browser. -Congratulations! You just wrote your first asynchronous webserver in Rust. - -You can also inspect the request itself, which contains information such as -the request URI, HTTP version, headers, and other metadata. For example, we -can print out the URI of the request like this: - -```rust,ignore -println!("Got request at {:?}", _req.uri()); -``` - -You may have noticed that we're not yet doing -anything asynchronous when handling the request-- we just respond immediately, -so we're not taking advantage of the flexibility that `async fn` gives us. -Rather than just returning a static message, let's try proxying the user's -request to another website using Hyper's HTTP client. - -We start by parsing out the URL we want to request: - -```rust,ignore -{{#include ../../examples/01_05_http_server/src/lib.rs:parse_url}} -``` - -Then we can create a new `hyper::Client` and use it to make a `GET` request, -returning the response to the user: - -```rust,ignore -{{#include ../../examples/01_05_http_server/src/lib.rs:get_request}} -``` - -`Client::get` returns a `hyper::client::ResponseFuture`, which implements -`Future>>` -(or `Future, Error = Error>` in futures 0.1 terms). -When we `.await` that future, an HTTP request is sent out, the current task -is suspended, and the task is queued to be continued once a response has -become available. - -Now, if you `cargo run` and open `http://127.0.0.1:3000/foo` in your browser, -you'll see the Rust homepage, and the following terminal output: - -``` -Listening on http://127.0.0.1:3000 -Got request at /foo -making request to http://www.rust-lang.org/en-US/ -request finished-- returning response -``` - -Congratulations! You just proxied an HTTP request. diff --git a/src/03_async_await/01_chapter.md b/src/03_async_await/01_chapter.md index 3d4d72f5..c4f6cfe7 100644 --- a/src/03_async_await/01_chapter.md +++ b/src/03_async_await/01_chapter.md @@ -1,7 +1,7 @@ # `async`/`.await` -In [the first chapter], we took a brief look at `async`/`.await` and used -it to build a simple server. This chapter will discuss `async`/`.await` in +In [the first chapter], we took a brief look at `async`/`.await`. +This chapter will discuss `async`/`.await` in greater detail, explaining how it works and how `async` code differs from traditional Rust programs. diff --git a/src/08_example/00_intro.md b/src/08_example/00_intro.md new file mode 100644 index 00000000..f7edba53 --- /dev/null +++ b/src/08_example/00_intro.md @@ -0,0 +1,24 @@ +# Final Project: Building a Concurrent Web Server with Async Rust +In this chapter, we'll use asynchronous Rust to modify the Rust book's +[single-threaded web server](https://doc.rust-lang.org/book/ch20-01-single-threaded.html) +to serve requests concurrently. +## Recap +Here's what the code looked like at the end of the lesson. + +`src/main.rs`: +```rust +{{#include ../../examples/08_01_sync_tcp_server/src/main.rs}} +``` + +`hello.html`: +```html +{{#include ../../examples/08_01_sync_tcp_server/hello.html}} +``` + +`404.html`: +```html +{{#include ../../examples/08_01_sync_tcp_server/404.html}} +``` + +If you run the server with `cargo run` and visit `127.0.0.1:7878` in your browser, +you'll be greeted with a friendly message from Ferris! \ No newline at end of file diff --git a/src/08_example/01_running_async_code.md b/src/08_example/01_running_async_code.md new file mode 100644 index 00000000..ad05ab56 --- /dev/null +++ b/src/08_example/01_running_async_code.md @@ -0,0 +1,80 @@ +# Running Asynchronous Code +An HTTP server should be able to serve multiple clients concurrently; +that is, it should not wait for previous requests to complete before handling the current request. +The book +[solves this problem](https://doc.rust-lang.org/book/ch20-02-multithreaded.html#turning-our-single-threaded-server-into-a-multithreaded-server) +by creating a thread pool where each connection is handled on its own thread. +Here, instead of improving throughput by adding threads, we'll achieve the same effect using asynchronous code. + +Let's modify `handle_connection` to return a future by declaring it an `async fn`: +```rust,ignore +{{#include ../../examples/08_02_async_tcp_server/src/main.rs:handle_connection_async}} +``` + +Adding `async` to the function declaration changes its return type +from the unit type `()` to a type that implements `Future`. + +If we try to compile this, the compiler warns us that it will not work: +```console +$ cargo check + Checking async-rust v0.1.0 (file:///projects/async-rust) +warning: unused implementer of `std::future::Future` that must be used + --> src/main.rs:12:9 + | +12 | handle_connection(stream); + | ^^^^^^^^^^^^^^^^^^^^^^^^^^ + | + = note: `#[warn(unused_must_use)]` on by default + = note: futures do nothing unless you `.await` or poll them +``` + +Because we haven't `await`ed or `poll`ed the result of `handle_connection`, +it'll never run. If you run the server and visit `127.0.0.1:7878` in a browser, +you'll see that the connection is refused; our server is not handling requests. + +We can't `await` or `poll` futures within synchronous code by itself. +We'll need an asynchronous runtime to handle scheduling and running futures to completion. +Please consult the section on choosing a runtime for more information on asynchronous runtimes, executors, and reactors. + +[//]: <> (TODO: Link to section on runtimes once complete.) + +## Adding an Async Runtime +Here, we'll use an executor from the `async-std` crate. +The `#[async_std::main]` attribute from `async-std` allows us to write an asynchronous main function. +To use it, enable the `attributes` feature of `async-std` in `Cargo.toml`: +```toml +[dependencies.async-std] +version = "1.6" +features = ["attributes"] +``` + +As a first step, we'll switch to an asynchronous main function, +and `await` the future returned by the async version of `handle_connection`. +Then, we'll test how the server responds. +Here's what that would look like: +```rust +{{#include ../../examples/08_02_async_tcp_server/src/main.rs:main_func}} +``` +Now, let's test to see if our server can handle connections concurrently. +Simply making `handle_connection` asynchronous doesn't mean that the server +can handle multiple connections at the same time, and we'll soon see why. + +To illustrate this, let's simulate a slow request. +When a client makes a request to `127.0.0.1:7878/sleep`, +our server will sleep for 5 seconds: + +```rust,ignore +{{#include ../../examples/08_03_slow_request/src/main.rs:handle_connection}} +``` +This is very similar to the +[simulation of a slow request](https://doc.rust-lang.org/book/ch20-02-multithreaded.html#simulating-a-slow-request-in-the-current-server-implementation) +from the Book, but with one important difference: +we're using the non-blocking function `async_std::task::sleep` instead of the blocking function `std::thread::sleep`. +It's important to remember that even if a piece of code is run within an `async fn` and `await`ed, it may still block. +To test whether our server handles connections concurrently, we'll need to ensure that `handle_connection` is non-blocking. + +If you run the server, you'll see that a request to `127.0.0.1:7878/sleep` +will block any other incoming requests for 5 seconds! +This is because there are no other concurrent tasks that can make progress +while we are `await`ing the result of `handle_connection`. +In the next section, we'll see how to use async code to handle connections concurrently. diff --git a/src/08_example/02_handling_connections_concurrently.md b/src/08_example/02_handling_connections_concurrently.md new file mode 100644 index 00000000..2fc9aa50 --- /dev/null +++ b/src/08_example/02_handling_connections_concurrently.md @@ -0,0 +1,40 @@ +# Handling Connections Concurrently +The problem with our code so far is that `listener.incoming()` is a blocking iterator. +The executor can't run other futures while `listener` waits on incoming connections, +and we can't handle a new connection until we're done with the previous one. + +In order to fix this, we'll transform `listener.incoming()` from a blocking Iterator +to a non-blocking Stream. Streams are similar to Iterators, but can be consumed asynchronously. +For more information, see the [chapter on Streams](../05_streams/01_chapter.md). + +Let's replace our blocking `std::net::TcpListener` with the non-blocking `async_std::net::TcpListener`, +and update our connection handler to accept an `async_std::net::TcpStream`: +```rust,ignore +{{#include ../../examples/08_04_concurrent_tcp_server/src/main.rs:handle_connection}} +``` + +The asynchronous version of `TcpListener` implements the `Stream` trait for `listener.incoming()`, +a change which provides two benefits. +The first is that `listener.incoming()` no longer blocks the executor. +The executor can now yield to other pending futures +while there are no incoming TCP connections to be processed. + +The second benefit is that elements from the Stream can optionally be processed concurrently, +using a Stream's `for_each_concurrent` method. +Here, we'll take advantage of this method to handle each incoming request concurrently. +We'll need to import the `Stream` trait from the `futures` crate, so our Cargo.toml now looks like this: +```diff ++[dependencies] ++futures = "0.3" + + [dependencies.async-std] + version = "1.6" + features = ["attributes"] +``` + +Now, we can handle each connection concurrently by passing `handle_connection` in through a closure function. +The closure function takes ownership of each `TcpStream`, and is run as soon as a new `TcpStream` becomes available. +As long as `handle_connection` does not block, a slow request will no longer prevent other requests from completing. +```rust,ignore +{{#include ../../examples/08_04_concurrent_tcp_server/src/main.rs:main_func}} +``` \ No newline at end of file diff --git a/src/08_example/03_multithreading.md b/src/08_example/03_multithreading.md new file mode 100644 index 00000000..8097f1a1 --- /dev/null +++ b/src/08_example/03_multithreading.md @@ -0,0 +1,25 @@ +# Serving Requests in Parallel +Our example so far has largely presented concurrency (using async code) +as an alternative to parallelism (using threads). +However, async code and threads are not mutually exclusive. +Async executors can be either single-threaded or multithreaded. +For example, the [`async-executor` crate](https://docs.rs/async-executor) used by `async-std` +has both a single-threaded `LocalExecutor` and a multi-threaded `Executor`. + +Tasks can either be run on the thread that created them or on a separate thread. +Async runtimes often provide functionality for spawning tasks onto separate threads. +Even if tasks are executed on separate threads, they should still be non-blocking. + +Some runtimes provide functions for spawning blocking tasks onto dedicated threads, +which is useful for running synchronous code from other libraries. +Tasks are usually required to be `Send`, so they can be moved to separate threads. +Some runtimes also provide functions for spawning non-`Send` tasks onto a thread-local executor. + +In our example, `for_each_concurrent` processes each connection concurrently on the same thread as the `main` function. +Here, `handle_connection` is both `Send` and non-blocking, +so we could have instead spawned new tasks to run `handle_connection`. +We can use `async_std::task::spawn` for this purpose: +```rust +{{#include ../../examples/08_05_final_tcp_server/src/main.rs:main_func}} +``` +Now we are using both concurrency and parallelism to handle multiple requests at the same time. diff --git a/src/08_example/04_tests.md b/src/08_example/04_tests.md new file mode 100644 index 00000000..e40eb1e5 --- /dev/null +++ b/src/08_example/04_tests.md @@ -0,0 +1,56 @@ +# Testing the TCP Server +Let's move on to testing our `handle_connection` function. + +First, we need a `TcpStream` to work with. +In an end-to-end or integration test, we might want to make a real TCP connection +to test our code. +One strategy for doing this is to start a listener on `localhost` port 0. +Port 0 isn't a valid UNIX port, but it'll work for testing. +The operating system will pick an open TCP port for us. + +Instead, in this example we'll write a unit test for the connection handler, +to check that the correct responses are returned for the respective inputs. +To keep our unit test isolated and deterministic, we'll replace the `TcpStream` with a mock. + +First, we'll change the signature of `handle_connection` to make it easier to test. +`handle_connection` doesn't actually require an `async_std::net::TcpStream`; +it requires any struct that implements `async_std::io::Read`, `async_std::io::Write`, and `marker::Unpin`. +Changing the type signature to reflect this allows us to pass a mock for testing. +```rust,ignore +use std::marker::Unpin; +use async_std::io::{Read, Write}; + +async fn handle_connection(mut stream: impl Read + Write + Unpin) { +``` + +Next, let's build a mock `TcpStream` that implements these traits. +First, let's implement the `Read` trait, with one method, `poll_read`. +Our mock `TcpStream` will contain some data that is copied into the read buffer, +and we'll return `Poll::Ready` to signify that the read is complete. +```rust,ignore +{{#include ../../examples/08_05_final_tcp_server/src/main.rs:mock_read}} +``` + +Our implementation of `Write` is very similar, +although we'll need to write three methods: `poll_write`, `poll_flush`, and `poll_close`. +`poll_write` will copy any input data into the mock `TcpStream`, and return `Poll::Ready` when complete. +No work needs to be done to flush or close the mock `TcpStream`, so `poll_flush` and `poll_close` +can just return `Poll::Ready`. +```rust,ignore +{{#include ../../examples/08_05_final_tcp_server/src/main.rs:mock_write}} +``` + +Lastly, our mock will need to implement `Unpin`, signifying that its location in memory can safely be moved. +For more information on pinning and the `Unpin` trait, see the [section on pinning](../04_pinning/01_chapter.md). +```rust,ignore +{{#include ../../examples/08_05_final_tcp_server/src/main.rs:unpin}} +``` + +Now we're ready to test the `handle_connection` function. +After setting up the `MockTcpStream` containing some initial data, +we can run `handle_connection` using the attribute `#[async_std::test]`, similarly to how we used `#[async_std::main]`. +To ensure that `handle_connection` works as intended, we'll check that the correct data +was written to the `MockTcpStream` based on its initial contents. +```rust,ignore +{{#include ../../examples/08_05_final_tcp_server/src/main.rs:test}} +``` diff --git a/src/SUMMARY.md b/src/SUMMARY.md index 92f22f0f..725b96aa 100644 --- a/src/SUMMARY.md +++ b/src/SUMMARY.md @@ -4,7 +4,6 @@ - [Why Async?](01_getting_started/02_why_async.md) - [The State of Asynchronous Rust](01_getting_started/03_state_of_async_rust.md) - [`async`/`.await` Primer](01_getting_started/04_async_await_primer.md) - - [Applied: HTTP Server](01_getting_started/05_http_server_example.md) - [Under the Hood: Executing `Future`s and Tasks](02_execution/01_chapter.md) - [The `Future` Trait](02_execution/02_future.md) - [Task Wakeups with `Waker`](02_execution/03_wakeups.md) @@ -26,6 +25,11 @@ - [`Send` Approximation](07_workarounds/04_send_approximation.md) - [Recursion](07_workarounds/05_recursion.md) - [`async` in Traits](07_workarounds/06_async_in_traits.md) +- [Final Project: HTTP Server](08_example/00_intro.md) + - [Running Asynchronous Code](08_example/01_running_async_code.md) + - [Handling Connections Concurrently](08_example/02_handling_connections_concurrently.md) + - [Serving Requests in Parallel](08_example/03_multithreading.md) + - [Testing the Server](08_example/04_tests.md) - [TODO: I/O](404.md) - [TODO: `AsyncRead` and `AsyncWrite`](404.md) - [TODO: Asynchronous Design Patterns: Solutions and Suggestions](404.md)