diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 0dab7b3dd..874af81fe 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -83,3 +83,32 @@ jobs: - name: Check minimal versions run: cargo clean; cargo update -Zminimal-versions; cargo check if: matrix.rust == 'nightly' + + msrv: + name: Check MSRV (${{ matrix.rust }}) + needs: [style] + strategy: + matrix: + rust: + - 1.49 # never go past Hyper's own MSRV + + os: + - ubuntu-latest + + runs-on: ${{ matrix.os }} + + steps: + - name: Checkout + uses: actions/checkout@v1 + + - name: Install Rust (${{ matrix.rust }}) + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: ${{ matrix.rust }} + override: true + + - name: Check + uses: actions-rs/cargo@v1 + with: + command: check diff --git a/CHANGELOG.md b/CHANGELOG.md index 14fa00350..7b00632f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,35 @@ +# 0.3.13 (March 31, 2022) + +* Update private internal `tokio-util` dependency. + +# 0.3.12 (March 9, 2022) + +* Avoid time operations that can panic (#599) +* Bump MSRV to Rust 1.49 (#606) +* Fix header decoding error when a header name is contained at a continuation + header boundary (#589) +* Remove I/O type names from handshake `tracing` spans (#608) + +# 0.3.11 (January 26, 2022) + +* Make `SendStream::poll_capacity` never return `Ok(Some(0))` (#596) +* Fix panic when receiving already reset push promise (#597) + +# 0.3.10 (January 6, 2022) + +* Add `Error::is_go_away()` and `Error::is_remote()` methods. +* Fix panic if receiving malformed PUSH_PROMISE with stream ID of 0. + +# 0.3.9 (December 9, 2021) + +* Fix hang related to new `max_send_buffer_size`. + +# 0.3.8 (December 8, 2021) + +* Add "extended CONNECT support". Adds `h2::ext::Protocol`, which is used for request and response extensions to connect new protocols over an HTTP/2 stream. +* Add `max_send_buffer_size` options to client and server builders, and a default of ~400MB. This acts like a high-water mark for the `poll_capacity()` method. +* Fix panic if receiving malformed HEADERS with stream ID of 0. + # 0.3.7 (October 22, 2021) * Fix panic if server sends a malformed frame on a stream client was about to open. diff --git a/Cargo.toml b/Cargo.toml index 7bbf1647e..bc1388051 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,13 +5,13 @@ name = "h2" # - html_root_url. # - Update CHANGELOG.md. # - Create git tag -version = "0.3.7" +version = "0.3.13" license = "MIT" authors = [ "Carl Lerche ", "Sean McArthur ", ] -description = "An HTTP/2.0 client and server" +description = "An HTTP/2 client and server" documentation = "https://docs.rs/h2" repository = "https://github.com/hyperium/h2" readme = "README.md" @@ -43,7 +43,7 @@ members = [ futures-core = { version = "0.3", default-features = false } futures-sink = { version = "0.3", default-features = false } futures-util = { version = "0.3", default-features = false } -tokio-util = { version = "0.6", features = ["codec"] } +tokio-util = { version = "0.7.1", features = ["codec"] } tokio = { version = "1", features = ["io-util"] } bytes = "1" http = "0.2" @@ -55,22 +55,20 @@ indexmap = { version = "1.5.2", features = ["std"] } [dev-dependencies] # Fuzzing -quickcheck = { version = "0.4.1", default-features = false } -rand = "0.3.15" +quickcheck = { version = "1.0.3", default-features = false } +rand = "0.8.4" # HPACK fixtures -hex = "0.2.0" -walkdir = "1.0.0" +hex = "0.4.3" +walkdir = "2.3.2" serde = "1.0.0" serde_json = "1.0.0" # Examples tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "net"] } -env_logger = { version = "0.5.3", default-features = false } -rustls = "0.19" -tokio-rustls = "0.22" -webpki = "0.21" -webpki-roots = "0.21" +env_logger = { version = "0.9", default-features = false } +tokio-rustls = "0.23.2" +webpki-roots = "0.22.2" [package.metadata.docs.rs] features = ["stream"] diff --git a/README.md b/README.md index 63627b706..2e1599914 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # H2 -A Tokio aware, HTTP/2.0 client & server implementation for Rust. +A Tokio aware, HTTP/2 client & server implementation for Rust. [![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT) [![Crates.io](https://img.shields.io/crates/v/h2.svg)](https://crates.io/crates/h2) @@ -12,21 +12,21 @@ More information about this crate can be found in the [crate documentation][dox] ## Features -* Client and server HTTP/2.0 implementation. -* Implements the full HTTP/2.0 specification. +* Client and server HTTP/2 implementation. +* Implements the full HTTP/2 specification. * Passes [h2spec](https://github.com/summerwind/h2spec). * Focus on performance and correctness. * Built on [Tokio](https://tokio.rs). ## Non goals -This crate is intended to only be an implementation of the HTTP/2.0 +This crate is intended to only be an implementation of the HTTP/2 specification. It does not handle: * Managing TCP connections * HTTP 1.0 upgrade * TLS -* Any feature not described by the HTTP/2.0 specification. +* Any feature not described by the HTTP/2 specification. This crate is now used by [hyper](https://github.com/hyperium/hyper), which will provide all of these features. @@ -55,7 +55,7 @@ fn main() { **How does h2 compare to [solicit] or [rust-http2]?** -The h2 library has implemented more of the details of the HTTP/2.0 specification +The h2 library has implemented more of the details of the HTTP/2 specification than any other Rust library. It also passes the [h2spec] set of tests. The h2 library is rapidly approaching "production ready" quality. diff --git a/examples/akamai.rs b/examples/akamai.rs index 29d8a9347..e522b37ff 100644 --- a/examples/akamai.rs +++ b/examples/akamai.rs @@ -3,9 +3,9 @@ use http::{Method, Request}; use tokio::net::TcpStream; use tokio_rustls::TlsConnector; -use rustls::Session; -use webpki::DNSNameRef; +use tokio_rustls::rustls::{OwnedTrustAnchor, RootCertStore, ServerName}; +use std::convert::TryFrom; use std::error::Error; use std::net::ToSocketAddrs; @@ -16,9 +16,19 @@ pub async fn main() -> Result<(), Box> { let _ = env_logger::try_init(); let tls_client_config = std::sync::Arc::new({ - let mut c = rustls::ClientConfig::new(); - c.root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + let mut root_store = RootCertStore::empty(); + root_store.add_server_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta| { + OwnedTrustAnchor::from_subject_spki_name_constraints( + ta.subject, + ta.spki, + ta.name_constraints, + ) + })); + + let mut c = tokio_rustls::rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(root_store) + .with_no_client_auth(); c.alpn_protocols.push(ALPN_H2.as_bytes().to_owned()); c }); @@ -33,13 +43,13 @@ pub async fn main() -> Result<(), Box> { println!("ADDR: {:?}", addr); let tcp = TcpStream::connect(&addr).await?; - let dns_name = DNSNameRef::try_from_ascii_str("http2.akamai.com").unwrap(); + let dns_name = ServerName::try_from("http2.akamai.com").unwrap(); let connector = TlsConnector::from(tls_client_config); let res = connector.connect(dns_name, tcp).await; let tls = res.unwrap(); { let (_, session) = tls.get_ref(); - let negotiated_protocol = session.get_alpn_protocol(); + let negotiated_protocol = session.alpn_protocol(); assert_eq!( Some(ALPN_H2.as_bytes()), negotiated_protocol.as_ref().map(|x| &**x) diff --git a/fuzz/Cargo.toml b/fuzz/Cargo.toml index ca32138e2..aafb60ae7 100644 --- a/fuzz/Cargo.toml +++ b/fuzz/Cargo.toml @@ -13,12 +13,10 @@ cargo-fuzz = true arbitrary = { version = "1", features = ["derive"] } libfuzzer-sys = { version = "0.4.0", features = ["arbitrary-derive"] } tokio = { version = "1", features = [ "full" ] } -bytes = "0.5.2" h2 = { path = "../", features = [ "unstable" ] } h2-support = { path = "../tests/h2-support" } futures = { version = "0.3", default-features = false, features = ["std"] } http = "0.2" -env_logger = { version = "0.5.3", default-features = false } # Prevent this from interfering with workspaces [workspace] diff --git a/src/client.rs b/src/client.rs index 9cd0b8f46..e75cd3507 100644 --- a/src/client.rs +++ b/src/client.rs @@ -136,6 +136,7 @@ //! [`Error`]: ../struct.Error.html use crate::codec::{Codec, SendError, UserError}; +use crate::ext::Protocol; use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId}; use crate::proto::{self, Error}; use crate::{FlowControl, PingPong, RecvStream, SendStream}; @@ -319,6 +320,9 @@ pub struct Builder { /// Initial target window size for new connections. initial_target_connection_window_size: Option, + /// Maximum amount of bytes to "buffer" for writing per stream. + max_send_buffer_size: usize, + /// Maximum number of locally reset streams to keep at a time. reset_stream_max: usize, @@ -517,6 +521,19 @@ where (response, stream) }) } + + /// Returns whether the [extended CONNECT protocol][1] is enabled or not. + /// + /// This setting is configured by the server peer by sending the + /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame. + /// This method returns the currently acknowledged value recieved from the + /// remote. + /// + /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 + /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3 + pub fn is_extended_connect_protocol_enabled(&self) -> bool { + self.inner.is_extended_connect_protocol_enabled() + } } impl fmt::Debug for SendRequest @@ -614,6 +631,7 @@ impl Builder { /// ``` pub fn new() -> Builder { Builder { + max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, initial_target_connection_window_size: None, @@ -948,6 +966,24 @@ impl Builder { self } + /// Sets the maximum send buffer size per stream. + /// + /// Once a stream has buffered up to (or over) the maximum, the stream's + /// flow control will not "poll" additional capacity. Once bytes for the + /// stream have been written to the connection, the send buffer capacity + /// will be freed up again. + /// + /// The default is currently ~400MB, but may change. + /// + /// # Panics + /// + /// This function panics if `max` is larger than `u32::MAX`. + pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { + assert!(max <= std::u32::MAX as usize); + self.max_send_buffer_size = max; + self + } + /// Enables or disables server push promises. /// /// This value is included in the initial SETTINGS handshake. When set, the @@ -1118,7 +1154,7 @@ where let builder = Builder::new(); builder .handshake(io) - .instrument(tracing::trace_span!("client_handshake", io = %std::any::type_name::())) + .instrument(tracing::trace_span!("client_handshake")) .await } @@ -1170,6 +1206,7 @@ where proto::Config { next_stream_id: builder.stream_id, initial_max_send_streams: builder.initial_max_send_streams, + max_send_buffer_size: builder.max_send_buffer_size, reset_stream_duration: builder.reset_stream_duration, reset_stream_max: builder.reset_stream_max, settings: builder.settings.clone(), @@ -1246,11 +1283,10 @@ where /// This method returns the currently acknowledged value recieved from the /// remote. /// - /// [settings]: https://tools.ietf.org/html/rfc7540#section-5.1.2 + /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2 pub fn max_concurrent_send_streams(&self) -> usize { self.inner.max_send_streams() } - /// Returns the maximum number of concurrent streams that may be initiated /// by the server on this connection. /// @@ -1416,6 +1452,7 @@ impl Peer { pub fn convert_send_message( id: StreamId, request: Request<()>, + protocol: Option, end_of_stream: bool, ) -> Result { use http::request::Parts; @@ -1435,7 +1472,7 @@ impl Peer { // Build the set pseudo header set. All requests will include `method` // and `path`. - let mut pseudo = Pseudo::request(method, uri); + let mut pseudo = Pseudo::request(method, uri, protocol); if pseudo.scheme.is_none() { // If the scheme is not set, then there are a two options. diff --git a/src/error.rs b/src/error.rs index fdbfc0d1b..6c8f6ed80 100644 --- a/src/error.rs +++ b/src/error.rs @@ -57,7 +57,7 @@ impl Error { } } - /// Returns the true if the error is an io::Error + /// Returns true if the error is an io::Error pub fn is_io(&self) -> bool { match self.kind { Kind::Io(_) => true, @@ -86,6 +86,21 @@ impl Error { kind: Kind::Io(err), } } + + /// Returns true if the error is from a `GOAWAY`. + pub fn is_go_away(&self) -> bool { + matches!(self.kind, Kind::GoAway(..)) + } + + /// Returns true if the error was received in a frame from the remote. + /// + /// Such as from a received `RST_STREAM` or `GOAWAY` frame. + pub fn is_remote(&self) -> bool { + matches!( + self.kind, + Kind::GoAway(_, _, Initiator::Remote) | Kind::Reset(_, _, Initiator::Remote) + ) + } } impl From for Error { diff --git a/src/ext.rs b/src/ext.rs new file mode 100644 index 000000000..cf383a495 --- /dev/null +++ b/src/ext.rs @@ -0,0 +1,55 @@ +//! Extensions specific to the HTTP/2 protocol. + +use crate::hpack::BytesStr; + +use bytes::Bytes; +use std::fmt; + +/// Represents the `:protocol` pseudo-header used by +/// the [Extended CONNECT Protocol]. +/// +/// [Extended CONNECT Protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 +#[derive(Clone, Eq, PartialEq)] +pub struct Protocol { + value: BytesStr, +} + +impl Protocol { + /// Converts a static string to a protocol name. + pub const fn from_static(value: &'static str) -> Self { + Self { + value: BytesStr::from_static(value), + } + } + + /// Returns a str representation of the header. + pub fn as_str(&self) -> &str { + self.value.as_str() + } + + pub(crate) fn try_from(bytes: Bytes) -> Result { + Ok(Self { + value: BytesStr::try_from(bytes)?, + }) + } +} + +impl<'a> From<&'a str> for Protocol { + fn from(value: &'a str) -> Self { + Self { + value: BytesStr::from(value), + } + } +} + +impl AsRef<[u8]> for Protocol { + fn as_ref(&self) -> &[u8] { + self.value.as_ref() + } +} + +impl fmt::Debug for Protocol { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.value.fmt(f) + } +} diff --git a/src/frame/headers.rs b/src/frame/headers.rs index 0851d7660..bcb905013 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -1,4 +1,5 @@ use super::{util, StreamDependency, StreamId}; +use crate::ext::Protocol; use crate::frame::{Error, Frame, Head, Kind}; use crate::hpack::{self, BytesStr}; @@ -66,6 +67,7 @@ pub struct Pseudo { pub scheme: Option, pub authority: Option, pub path: Option, + pub protocol: Option, // Response pub status: Option, @@ -146,6 +148,10 @@ impl Headers { tracing::trace!("loading headers; flags={:?}", flags); + if head.stream_id().is_zero() { + return Err(Error::InvalidStreamId); + } + // Read the padding length if flags.is_padded() { if src.is_empty() { @@ -288,6 +294,10 @@ impl fmt::Debug for Headers { .field("stream_id", &self.stream_id) .field("flags", &self.flags); + if let Some(ref protocol) = self.header_block.pseudo.protocol { + builder.field("protocol", protocol); + } + if let Some(ref dep) = self.stream_dep { builder.field("stream_dep", dep); } @@ -390,6 +400,10 @@ impl PushPromise { let flags = PushPromiseFlag(head.flag()); let mut pad = 0; + if head.stream_id().is_zero() { + return Err(Error::InvalidStreamId); + } + // Read the padding length if flags.is_padded() { if src.is_empty() { @@ -525,7 +539,7 @@ impl Continuation { // ===== impl Pseudo ===== impl Pseudo { - pub fn request(method: Method, uri: Uri) -> Self { + pub fn request(method: Method, uri: Uri, protocol: Option) -> Self { let parts = uri::Parts::from(uri); let mut path = parts @@ -546,6 +560,7 @@ impl Pseudo { scheme: None, authority: None, path: Some(path).filter(|p| !p.is_empty()), + protocol, status: None, }; @@ -571,6 +586,7 @@ impl Pseudo { scheme: None, authority: None, path: None, + protocol: None, status: Some(status), } } @@ -589,6 +605,11 @@ impl Pseudo { self.scheme = Some(bytes_str); } + #[cfg(feature = "unstable")] + pub fn set_protocol(&mut self, protocol: Protocol) { + self.protocol = Some(protocol); + } + pub fn set_authority(&mut self, authority: BytesStr) { self.authority = Some(authority); } @@ -677,6 +698,10 @@ impl Iterator for Iter { return Some(Path(path)); } + if let Some(protocol) = pseudo.protocol.take() { + return Some(Protocol(protocol)); + } + if let Some(status) = pseudo.status.take() { return Some(Status(status)); } @@ -875,6 +900,7 @@ impl HeaderBlock { Method(v) => set_pseudo!(method, v), Scheme(v) => set_pseudo!(scheme, v), Path(v) => set_pseudo!(path, v), + Protocol(v) => set_pseudo!(protocol, v), Status(v) => set_pseudo!(status, v), } }); diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 5a682b634..8b5351b51 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -11,7 +11,7 @@ use std::fmt; /// /// # Examples /// -/// ```rust +/// ```rust,ignore /// let buf: [u8; 4] = [0, 0, 0, 1]; /// assert_eq!(1u32, unpack_octets_4!(buf, 0, u32)); /// ``` diff --git a/src/frame/settings.rs b/src/frame/settings.rs index 523f20b06..080d0f4e5 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -13,6 +13,7 @@ pub struct Settings { initial_window_size: Option, max_frame_size: Option, max_header_list_size: Option, + enable_connect_protocol: Option, } /// An enum that lists all valid settings that can be sent in a SETTINGS @@ -27,6 +28,7 @@ pub enum Setting { InitialWindowSize(u32), MaxFrameSize(u32), MaxHeaderListSize(u32), + EnableConnectProtocol(u32), } #[derive(Copy, Clone, Eq, PartialEq, Default)] @@ -107,6 +109,14 @@ impl Settings { self.enable_push = Some(enable as u32); } + pub fn is_extended_connect_protocol_enabled(&self) -> Option { + self.enable_connect_protocol.map(|val| val != 0) + } + + pub fn set_enable_connect_protocol(&mut self, val: Option) { + self.enable_connect_protocol = val; + } + pub fn header_table_size(&self) -> Option { self.header_table_size } @@ -181,6 +191,14 @@ impl Settings { Some(MaxHeaderListSize(val)) => { settings.max_header_list_size = Some(val); } + Some(EnableConnectProtocol(val)) => match val { + 0 | 1 => { + settings.enable_connect_protocol = Some(val); + } + _ => { + return Err(Error::InvalidSettingValue); + } + }, None => {} } } @@ -236,6 +254,10 @@ impl Settings { if let Some(v) = self.max_header_list_size { f(MaxHeaderListSize(v)); } + + if let Some(v) = self.enable_connect_protocol { + f(EnableConnectProtocol(v)); + } } } @@ -269,6 +291,9 @@ impl fmt::Debug for Settings { Setting::MaxHeaderListSize(v) => { builder.field("max_header_list_size", &v); } + Setting::EnableConnectProtocol(v) => { + builder.field("enable_connect_protocol", &v); + } }); builder.finish() @@ -291,6 +316,7 @@ impl Setting { 4 => Some(InitialWindowSize(val)), 5 => Some(MaxFrameSize(val)), 6 => Some(MaxHeaderListSize(val)), + 8 => Some(EnableConnectProtocol(val)), _ => None, } } @@ -322,6 +348,7 @@ impl Setting { InitialWindowSize(v) => (4, v), MaxFrameSize(v) => (5, v), MaxHeaderListSize(v) => (6, v), + EnableConnectProtocol(v) => (8, v), }; dst.put_u16(kind); diff --git a/src/fuzz_bridge.rs b/src/fuzz_bridge.rs index 6132deeb4..3ea8b591c 100644 --- a/src/fuzz_bridge.rs +++ b/src/fuzz_bridge.rs @@ -1,7 +1,7 @@ #[cfg(fuzzing)] pub mod fuzz_logic { use crate::hpack; - use bytes::{BufMut, BytesMut}; + use bytes::BytesMut; use http::header::HeaderName; use std::io::Cursor; diff --git a/src/hpack/decoder.rs b/src/hpack/decoder.rs index e4b34d1fc..988b48db1 100644 --- a/src/hpack/decoder.rs +++ b/src/hpack/decoder.rs @@ -142,6 +142,12 @@ struct Table { max_size: usize, } +struct StringMarker { + offset: usize, + len: usize, + string: Option, +} + // ===== impl Decoder ===== impl Decoder { @@ -279,10 +285,13 @@ impl Decoder { // First, read the header name if table_idx == 0 { + let old_pos = buf.position(); + let name_marker = self.try_decode_string(buf)?; + let value_marker = self.try_decode_string(buf)?; + buf.set_position(old_pos); // Read the name as a literal - let name = self.decode_string(buf)?; - let value = self.decode_string(buf)?; - + let name = name_marker.consume(buf); + let value = value_marker.consume(buf); Header::new(name, value) } else { let e = self.table.get(table_idx)?; @@ -292,7 +301,11 @@ impl Decoder { } } - fn decode_string(&mut self, buf: &mut Cursor<&mut BytesMut>) -> Result { + fn try_decode_string( + &mut self, + buf: &mut Cursor<&mut BytesMut>, + ) -> Result { + let old_pos = buf.position(); const HUFF_FLAG: u8 = 0b1000_0000; // The first bit in the first byte contains the huffman encoded flag. @@ -309,17 +322,34 @@ impl Decoder { return Err(DecoderError::NeedMore(NeedMore::StringUnderflow)); } + let offset = (buf.position() - old_pos) as usize; if huff { let ret = { let raw = &buf.chunk()[..len]; - huffman::decode(raw, &mut self.buffer).map(BytesMut::freeze) + huffman::decode(raw, &mut self.buffer).map(|buf| StringMarker { + offset, + len, + string: Some(BytesMut::freeze(buf)), + }) }; buf.advance(len); - return ret; + ret + } else { + buf.advance(len); + Ok(StringMarker { + offset, + len, + string: None, + }) } + } - Ok(take(buf, len)) + fn decode_string(&mut self, buf: &mut Cursor<&mut BytesMut>) -> Result { + let old_pos = buf.position(); + let marker = self.try_decode_string(buf)?; + buf.set_position(old_pos); + Ok(marker.consume(buf)) } } @@ -433,6 +463,19 @@ fn take(buf: &mut Cursor<&mut BytesMut>, n: usize) -> Bytes { head.freeze() } +impl StringMarker { + fn consume(self, buf: &mut Cursor<&mut BytesMut>) -> Bytes { + buf.advance(self.offset); + match self.string { + Some(string) => { + buf.advance(self.len); + string + } + None => take(buf, self.len), + } + } +} + fn consume(buf: &mut Cursor<&mut BytesMut>) { // remove bytes from the internal BytesMut when they have been successfully // decoded. This is a more permanent cursor position, which will be @@ -850,4 +893,48 @@ mod test { huffman::encode(src, &mut buf); buf } + + #[test] + fn test_decode_continuation_header_with_non_huff_encoded_name() { + let mut de = Decoder::new(0); + let value = huff_encode(b"bar"); + let mut buf = BytesMut::new(); + // header name is non_huff encoded + buf.extend(&[0b01000000, 0x00 | 3]); + buf.extend(b"foo"); + // header value is partial + buf.extend(&[0x80 | 3]); + buf.extend(&value[0..1]); + + let mut res = vec![]; + let e = de + .decode(&mut Cursor::new(&mut buf), |h| { + res.push(h); + }) + .unwrap_err(); + // decode error because the header value is partial + assert_eq!(e, DecoderError::NeedMore(NeedMore::StringUnderflow)); + + // extend buf with the remaining header value + buf.extend(&value[1..]); + let _ = de + .decode(&mut Cursor::new(&mut buf), |h| { + res.push(h); + }) + .unwrap(); + + assert_eq!(res.len(), 1); + assert_eq!(de.table.size(), 0); + + match res[0] { + Header::Field { + ref name, + ref value, + } => { + assert_eq!(name, "foo"); + assert_eq!(value, "bar"); + } + _ => panic!(), + } + } } diff --git a/src/hpack/header.rs b/src/hpack/header.rs index 8d6136e16..e6df555ab 100644 --- a/src/hpack/header.rs +++ b/src/hpack/header.rs @@ -1,4 +1,5 @@ use super::{DecoderError, NeedMore}; +use crate::ext::Protocol; use bytes::Bytes; use http::header::{HeaderName, HeaderValue}; @@ -14,6 +15,7 @@ pub enum Header { Method(Method), Scheme(BytesStr), Path(BytesStr), + Protocol(Protocol), Status(StatusCode), } @@ -25,6 +27,7 @@ pub enum Name<'a> { Method, Scheme, Path, + Protocol, Status, } @@ -51,6 +54,7 @@ impl Header> { Method(v) => Method(v), Scheme(v) => Scheme(v), Path(v) => Path(v), + Protocol(v) => Protocol(v), Status(v) => Status(v), }) } @@ -79,6 +83,10 @@ impl Header { let value = BytesStr::try_from(value)?; Ok(Header::Path(value)) } + b"protocol" => { + let value = Protocol::try_from(value)?; + Ok(Header::Protocol(value)) + } b"status" => { let status = StatusCode::from_bytes(&value)?; Ok(Header::Status(status)) @@ -104,6 +112,7 @@ impl Header { Header::Method(ref v) => 32 + 7 + v.as_ref().len(), Header::Scheme(ref v) => 32 + 7 + v.len(), Header::Path(ref v) => 32 + 5 + v.len(), + Header::Protocol(ref v) => 32 + 9 + v.as_str().len(), Header::Status(_) => 32 + 7 + 3, } } @@ -116,6 +125,7 @@ impl Header { Header::Method(..) => Name::Method, Header::Scheme(..) => Name::Scheme, Header::Path(..) => Name::Path, + Header::Protocol(..) => Name::Protocol, Header::Status(..) => Name::Status, } } @@ -127,6 +137,7 @@ impl Header { Header::Method(ref v) => v.as_ref().as_ref(), Header::Scheme(ref v) => v.as_ref(), Header::Path(ref v) => v.as_ref(), + Header::Protocol(ref v) => v.as_ref(), Header::Status(ref v) => v.as_str().as_ref(), } } @@ -156,6 +167,10 @@ impl Header { Header::Path(ref b) => a == b, _ => false, }, + Header::Protocol(ref a) => match *other { + Header::Protocol(ref b) => a == b, + _ => false, + }, Header::Status(ref a) => match *other { Header::Status(ref b) => a == b, _ => false, @@ -205,6 +220,7 @@ impl From
for Header> { Header::Method(v) => Header::Method(v), Header::Scheme(v) => Header::Scheme(v), Header::Path(v) => Header::Path(v), + Header::Protocol(v) => Header::Protocol(v), Header::Status(v) => Header::Status(v), } } @@ -221,6 +237,7 @@ impl<'a> Name<'a> { Name::Method => Ok(Header::Method(Method::from_bytes(&*value)?)), Name::Scheme => Ok(Header::Scheme(BytesStr::try_from(value)?)), Name::Path => Ok(Header::Path(BytesStr::try_from(value)?)), + Name::Protocol => Ok(Header::Protocol(Protocol::try_from(value)?)), Name::Status => { match StatusCode::from_bytes(&value) { Ok(status) => Ok(Header::Status(status)), @@ -238,6 +255,7 @@ impl<'a> Name<'a> { Name::Method => b":method", Name::Scheme => b":scheme", Name::Path => b":path", + Name::Protocol => b":protocol", Name::Status => b":status", } } diff --git a/src/hpack/table.rs b/src/hpack/table.rs index 2328743a8..0124f216d 100644 --- a/src/hpack/table.rs +++ b/src/hpack/table.rs @@ -751,6 +751,7 @@ fn index_static(header: &Header) -> Option<(usize, bool)> { "/index.html" => Some((5, true)), _ => Some((4, false)), }, + Header::Protocol(..) => None, Header::Status(ref v) => match u16::from(*v) { 200 => Some((8, true)), 204 => Some((9, true)), diff --git a/src/hpack/test/fixture.rs b/src/hpack/test/fixture.rs index 6d0448425..3428c3958 100644 --- a/src/hpack/test/fixture.rs +++ b/src/hpack/test/fixture.rs @@ -134,6 +134,7 @@ fn key_str(e: &Header) -> &str { Header::Method(..) => ":method", Header::Scheme(..) => ":scheme", Header::Path(..) => ":path", + Header::Protocol(..) => ":protocol", Header::Status(..) => ":status", } } @@ -145,6 +146,7 @@ fn value_str(e: &Header) -> &str { Header::Method(ref m) => m.as_str(), Header::Scheme(ref v) => &**v, Header::Path(ref v) => &**v, + Header::Protocol(ref v) => v.as_str(), Header::Status(ref v) => v.as_str(), } } diff --git a/src/hpack/test/fuzz.rs b/src/hpack/test/fuzz.rs index 1d05a97c5..ad0d47b6b 100644 --- a/src/hpack/test/fuzz.rs +++ b/src/hpack/test/fuzz.rs @@ -4,7 +4,9 @@ use http::header::{HeaderName, HeaderValue}; use bytes::BytesMut; use quickcheck::{Arbitrary, Gen, QuickCheck, TestResult}; -use rand::{Rng, SeedableRng, StdRng}; +use rand::distributions::Slice; +use rand::rngs::StdRng; +use rand::{thread_rng, Rng, SeedableRng}; use std::io::Cursor; @@ -46,9 +48,9 @@ struct HeaderFrame { } impl FuzzHpack { - fn new(seed: [usize; 4]) -> FuzzHpack { + fn new(seed: [u8; 32]) -> FuzzHpack { // Seed the RNG - let mut rng = StdRng::from_seed(&seed); + let mut rng = StdRng::from_seed(seed); // Generates a bunch of source headers let mut source: Vec>> = vec![]; @@ -58,12 +60,12 @@ impl FuzzHpack { } // Actual test run headers - let num: usize = rng.gen_range(40, 500); + let num: usize = rng.gen_range(40..500); let mut frames: Vec = vec![]; let mut added = 0; - let skew: i32 = rng.gen_range(1, 5); + let skew: i32 = rng.gen_range(1..5); // Rough number of headers to add while added < num { @@ -72,24 +74,24 @@ impl FuzzHpack { headers: vec![], }; - match rng.gen_range(0, 20) { + match rng.gen_range(0..20) { 0 => { // Two resizes - let high = rng.gen_range(128, MAX_CHUNK * 2); - let low = rng.gen_range(0, high); + let high = rng.gen_range(128..MAX_CHUNK * 2); + let low = rng.gen_range(0..high); frame.resizes.extend(&[low, high]); } 1..=3 => { - frame.resizes.push(rng.gen_range(128, MAX_CHUNK * 2)); + frame.resizes.push(rng.gen_range(128..MAX_CHUNK * 2)); } _ => {} } let mut is_name_required = true; - for _ in 0..rng.gen_range(1, (num - added) + 1) { - let x: f64 = rng.gen_range(0.0, 1.0); + for _ in 0..rng.gen_range(1..(num - added) + 1) { + let x: f64 = rng.gen_range(0.0..1.0); let x = x.powi(skew); let i = (x * source.len() as f64) as usize; @@ -177,31 +179,31 @@ impl FuzzHpack { } impl Arbitrary for FuzzHpack { - fn arbitrary(g: &mut G) -> Self { - FuzzHpack::new(quickcheck::Rng::gen(g)) + fn arbitrary(_: &mut Gen) -> Self { + FuzzHpack::new(thread_rng().gen()) } } fn gen_header(g: &mut StdRng) -> Header> { use http::{Method, StatusCode}; - if g.gen_weighted_bool(10) { - match g.next_u32() % 5 { + if g.gen_ratio(1, 10) { + match g.gen_range(0u32..5) { 0 => { let value = gen_string(g, 4, 20); Header::Authority(to_shared(value)) } 1 => { - let method = match g.next_u32() % 6 { + let method = match g.gen_range(0u32..6) { 0 => Method::GET, 1 => Method::POST, 2 => Method::PUT, 3 => Method::PATCH, 4 => Method::DELETE, 5 => { - let n: usize = g.gen_range(3, 7); + let n: usize = g.gen_range(3..7); let bytes: Vec = (0..n) - .map(|_| g.choose(b"ABCDEFGHIJKLMNOPQRSTUVWXYZ").unwrap().clone()) + .map(|_| *g.sample(Slice::new(b"ABCDEFGHIJKLMNOPQRSTUVWXYZ").unwrap())) .collect(); Method::from_bytes(&bytes).unwrap() @@ -212,7 +214,7 @@ fn gen_header(g: &mut StdRng) -> Header> { Header::Method(method) } 2 => { - let value = match g.next_u32() % 2 { + let value = match g.gen_range(0u32..2) { 0 => "http", 1 => "https", _ => unreachable!(), @@ -221,7 +223,7 @@ fn gen_header(g: &mut StdRng) -> Header> { Header::Scheme(to_shared(value.to_string())) } 3 => { - let value = match g.next_u32() % 100 { + let value = match g.gen_range(0u32..100) { 0 => "/".to_string(), 1 => "/index.html".to_string(), _ => gen_string(g, 2, 20), @@ -237,14 +239,14 @@ fn gen_header(g: &mut StdRng) -> Header> { _ => unreachable!(), } } else { - let name = if g.gen_weighted_bool(10) { + let name = if g.gen_ratio(1, 10) { None } else { Some(gen_header_name(g)) }; let mut value = gen_header_value(g); - if g.gen_weighted_bool(30) { + if g.gen_ratio(1, 30) { value.set_sensitive(true); } @@ -255,84 +257,86 @@ fn gen_header(g: &mut StdRng) -> Header> { fn gen_header_name(g: &mut StdRng) -> HeaderName { use http::header; - if g.gen_weighted_bool(2) { - g.choose(&[ - header::ACCEPT, - header::ACCEPT_CHARSET, - header::ACCEPT_ENCODING, - header::ACCEPT_LANGUAGE, - header::ACCEPT_RANGES, - header::ACCESS_CONTROL_ALLOW_CREDENTIALS, - header::ACCESS_CONTROL_ALLOW_HEADERS, - header::ACCESS_CONTROL_ALLOW_METHODS, - header::ACCESS_CONTROL_ALLOW_ORIGIN, - header::ACCESS_CONTROL_EXPOSE_HEADERS, - header::ACCESS_CONTROL_MAX_AGE, - header::ACCESS_CONTROL_REQUEST_HEADERS, - header::ACCESS_CONTROL_REQUEST_METHOD, - header::AGE, - header::ALLOW, - header::ALT_SVC, - header::AUTHORIZATION, - header::CACHE_CONTROL, - header::CONNECTION, - header::CONTENT_DISPOSITION, - header::CONTENT_ENCODING, - header::CONTENT_LANGUAGE, - header::CONTENT_LENGTH, - header::CONTENT_LOCATION, - header::CONTENT_RANGE, - header::CONTENT_SECURITY_POLICY, - header::CONTENT_SECURITY_POLICY_REPORT_ONLY, - header::CONTENT_TYPE, - header::COOKIE, - header::DNT, - header::DATE, - header::ETAG, - header::EXPECT, - header::EXPIRES, - header::FORWARDED, - header::FROM, - header::HOST, - header::IF_MATCH, - header::IF_MODIFIED_SINCE, - header::IF_NONE_MATCH, - header::IF_RANGE, - header::IF_UNMODIFIED_SINCE, - header::LAST_MODIFIED, - header::LINK, - header::LOCATION, - header::MAX_FORWARDS, - header::ORIGIN, - header::PRAGMA, - header::PROXY_AUTHENTICATE, - header::PROXY_AUTHORIZATION, - header::PUBLIC_KEY_PINS, - header::PUBLIC_KEY_PINS_REPORT_ONLY, - header::RANGE, - header::REFERER, - header::REFERRER_POLICY, - header::REFRESH, - header::RETRY_AFTER, - header::SERVER, - header::SET_COOKIE, - header::STRICT_TRANSPORT_SECURITY, - header::TE, - header::TRAILER, - header::TRANSFER_ENCODING, - header::USER_AGENT, - header::UPGRADE, - header::UPGRADE_INSECURE_REQUESTS, - header::VARY, - header::VIA, - header::WARNING, - header::WWW_AUTHENTICATE, - header::X_CONTENT_TYPE_OPTIONS, - header::X_DNS_PREFETCH_CONTROL, - header::X_FRAME_OPTIONS, - header::X_XSS_PROTECTION, - ]) - .unwrap() + if g.gen_ratio(1, 2) { + g.sample( + Slice::new(&[ + header::ACCEPT, + header::ACCEPT_CHARSET, + header::ACCEPT_ENCODING, + header::ACCEPT_LANGUAGE, + header::ACCEPT_RANGES, + header::ACCESS_CONTROL_ALLOW_CREDENTIALS, + header::ACCESS_CONTROL_ALLOW_HEADERS, + header::ACCESS_CONTROL_ALLOW_METHODS, + header::ACCESS_CONTROL_ALLOW_ORIGIN, + header::ACCESS_CONTROL_EXPOSE_HEADERS, + header::ACCESS_CONTROL_MAX_AGE, + header::ACCESS_CONTROL_REQUEST_HEADERS, + header::ACCESS_CONTROL_REQUEST_METHOD, + header::AGE, + header::ALLOW, + header::ALT_SVC, + header::AUTHORIZATION, + header::CACHE_CONTROL, + header::CONNECTION, + header::CONTENT_DISPOSITION, + header::CONTENT_ENCODING, + header::CONTENT_LANGUAGE, + header::CONTENT_LENGTH, + header::CONTENT_LOCATION, + header::CONTENT_RANGE, + header::CONTENT_SECURITY_POLICY, + header::CONTENT_SECURITY_POLICY_REPORT_ONLY, + header::CONTENT_TYPE, + header::COOKIE, + header::DNT, + header::DATE, + header::ETAG, + header::EXPECT, + header::EXPIRES, + header::FORWARDED, + header::FROM, + header::HOST, + header::IF_MATCH, + header::IF_MODIFIED_SINCE, + header::IF_NONE_MATCH, + header::IF_RANGE, + header::IF_UNMODIFIED_SINCE, + header::LAST_MODIFIED, + header::LINK, + header::LOCATION, + header::MAX_FORWARDS, + header::ORIGIN, + header::PRAGMA, + header::PROXY_AUTHENTICATE, + header::PROXY_AUTHORIZATION, + header::PUBLIC_KEY_PINS, + header::PUBLIC_KEY_PINS_REPORT_ONLY, + header::RANGE, + header::REFERER, + header::REFERRER_POLICY, + header::REFRESH, + header::RETRY_AFTER, + header::SERVER, + header::SET_COOKIE, + header::STRICT_TRANSPORT_SECURITY, + header::TE, + header::TRAILER, + header::TRANSFER_ENCODING, + header::USER_AGENT, + header::UPGRADE, + header::UPGRADE_INSECURE_REQUESTS, + header::VARY, + header::VIA, + header::WARNING, + header::WWW_AUTHENTICATE, + header::X_CONTENT_TYPE_OPTIONS, + header::X_DNS_PREFETCH_CONTROL, + header::X_FRAME_OPTIONS, + header::X_XSS_PROTECTION, + ]) + .unwrap(), + ) .clone() } else { let value = gen_string(g, 1, 25); @@ -349,9 +353,7 @@ fn gen_string(g: &mut StdRng, min: usize, max: usize) -> String { let bytes: Vec<_> = (min..max) .map(|_| { // Chars to pick from - g.choose(b"ABCDEFGHIJKLMNOPQRSTUVabcdefghilpqrstuvwxyz----") - .unwrap() - .clone() + *g.sample(Slice::new(b"ABCDEFGHIJKLMNOPQRSTUVabcdefghilpqrstuvwxyz----").unwrap()) }) .collect(); diff --git a/src/lib.rs b/src/lib.rs index cb02acaff..be42b100e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,7 +78,7 @@ //! [`server::handshake`]: server/fn.handshake.html //! [`client::handshake`]: client/fn.handshake.html -#![doc(html_root_url = "https://docs.rs/h2/0.3.7")] +#![doc(html_root_url = "https://docs.rs/h2/0.3.13")] #![deny(missing_debug_implementations, missing_docs)] #![cfg_attr(test, deny(warnings))] @@ -120,6 +120,7 @@ mod frame; pub mod frame; pub mod client; +pub mod ext; pub mod server; mod share; diff --git a/src/proto/connection.rs b/src/proto/connection.rs index a75df4369..cd011a1d5 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -77,6 +77,7 @@ struct DynConnection<'a, B: Buf = Bytes> { pub(crate) struct Config { pub next_stream_id: StreamId, pub initial_max_send_streams: usize, + pub max_send_buffer_size: usize, pub reset_stream_duration: Duration, pub reset_stream_max: usize, pub settings: frame::Settings, @@ -108,8 +109,13 @@ where .initial_window_size() .unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE), initial_max_send_streams: config.initial_max_send_streams, + local_max_buffer_size: config.max_send_buffer_size, local_next_stream_id: config.next_stream_id, local_push_enabled: config.settings.is_push_enabled().unwrap_or(true), + extended_connect_protocol_enabled: config + .settings + .is_extended_connect_protocol_enabled() + .unwrap_or(false), local_reset_duration: config.reset_stream_duration, local_reset_max: config.reset_stream_max, remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, @@ -147,6 +153,13 @@ where self.inner.settings.send_settings(settings) } + /// Send a new SETTINGS frame with extended CONNECT protocol enabled. + pub(crate) fn set_enable_connect_protocol(&mut self) -> Result<(), UserError> { + let mut settings = frame::Settings::default(); + settings.set_enable_connect_protocol(Some(1)); + self.inner.settings.send_settings(settings) + } + /// Returns the maximum number of concurrent streams that may be initiated /// by this peer. pub(crate) fn max_send_streams(&self) -> usize { diff --git a/src/proto/mod.rs b/src/proto/mod.rs index d505e77f3..5ec7bf992 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -33,3 +33,4 @@ pub type WindowSize = u32; pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; pub const DEFAULT_RESET_STREAM_MAX: usize = 10; pub const DEFAULT_RESET_STREAM_SECS: u64 = 30; +pub const DEFAULT_MAX_SEND_BUFFER_SIZE: usize = 1024 * 400; diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 44f4c2df4..6cc617209 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -117,6 +117,8 @@ impl Settings { tracing::trace!("ACK sent; applying settings"); + streams.apply_remote_settings(settings)?; + if let Some(val) = settings.header_table_size() { dst.set_send_header_table_size(val as usize); } @@ -124,8 +126,6 @@ impl Settings { if let Some(val) = settings.max_frame_size() { dst.set_max_send_frame_size(val as usize); } - - streams.apply_remote_settings(settings)?; } self.remote = None; diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 608395c0f..de2a2c85a 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -41,12 +41,18 @@ pub struct Config { /// MAX_CONCURRENT_STREAMS specified in the frame. pub initial_max_send_streams: usize, + /// Max amount of DATA bytes to buffer per stream. + pub local_max_buffer_size: usize, + /// The stream ID to start the next local stream with pub local_next_stream_id: StreamId, /// If the local peer is willing to receive push promises pub local_push_enabled: bool, + /// If extended connect protocol is enabled. + pub extended_connect_protocol_enabled: bool, + /// How long a locally reset stream should ignore frames pub local_reset_duration: Duration, diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index eaaee162b..c2904aca9 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -51,6 +51,9 @@ pub(super) struct Prioritize { /// What `DATA` frame is currently being sent in the codec. in_flight_data_frame: InFlightData, + + /// The maximum amount of bytes a stream should buffer. + max_buffer_size: usize, } #[derive(Debug, Eq, PartialEq)] @@ -93,9 +96,14 @@ impl Prioritize { flow, last_opened_id: StreamId::ZERO, in_flight_data_frame: InFlightData::Nothing, + max_buffer_size: config.local_max_buffer_size, } } + pub(crate) fn max_buffer_size(&self) -> usize { + self.max_buffer_size + } + /// Queue a frame to be sent to the remote pub fn queue_frame( &mut self, @@ -424,7 +432,7 @@ impl Prioritize { tracing::trace!(capacity = assign, "assigning"); // Assign the capacity to the stream - stream.assign_capacity(assign); + stream.assign_capacity(assign, self.max_buffer_size); // Claim the capacity from the connection self.flow.claim_capacity(assign); @@ -741,6 +749,11 @@ impl Prioritize { stream.buffered_send_data -= len as usize; stream.requested_send_capacity -= len; + // If the capacity was limited because of the + // max_send_buffer_size, then consider waking + // the send task again... + stream.notify_if_can_buffer_more(self.max_buffer_size); + // Assign the capacity back to the connection that // was just consumed from the stream in the previous // line. diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index be996b963..3af1af3a1 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -56,6 +56,9 @@ pub(super) struct Recv { /// If push promises are allowed to be received. is_push_enabled: bool, + + /// If extended connect protocol is enabled. + is_extended_connect_protocol_enabled: bool, } #[derive(Debug)] @@ -103,6 +106,7 @@ impl Recv { buffer: Buffer::new(), refused: None, is_push_enabled: config.local_push_enabled, + is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled, } } @@ -216,6 +220,14 @@ impl Recv { let stream_id = frame.stream_id(); let (pseudo, fields) = frame.into_parts(); + + if pseudo.protocol.is_some() { + if counts.peer().is_server() && !self.is_extended_connect_protocol_enabled { + proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into()); + } + } + if !pseudo.is_informational() { let message = counts .peer() @@ -449,60 +461,58 @@ impl Recv { settings: &frame::Settings, store: &mut Store, ) -> Result<(), proto::Error> { - let target = if let Some(val) = settings.initial_window_size() { - val - } else { - return Ok(()); - }; + if let Some(val) = settings.is_extended_connect_protocol_enabled() { + self.is_extended_connect_protocol_enabled = val; + } - let old_sz = self.init_window_sz; - self.init_window_sz = target; + if let Some(target) = settings.initial_window_size() { + let old_sz = self.init_window_sz; + self.init_window_sz = target; - tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,); + tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,); - // Per RFC 7540 §6.9.2: - // - // In addition to changing the flow-control window for streams that are - // not yet active, a SETTINGS frame can alter the initial flow-control - // window size for streams with active flow-control windows (that is, - // streams in the "open" or "half-closed (remote)" state). When the - // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust - // the size of all stream flow-control windows that it maintains by the - // difference between the new value and the old value. - // - // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available - // space in a flow-control window to become negative. A sender MUST - // track the negative flow-control window and MUST NOT send new - // flow-controlled frames until it receives WINDOW_UPDATE frames that - // cause the flow-control window to become positive. - - if target < old_sz { - // We must decrease the (local) window on every open stream. - let dec = old_sz - target; - tracing::trace!("decrementing all windows; dec={}", dec); - - store.for_each(|mut stream| { - stream.recv_flow.dec_recv_window(dec); - Ok(()) - }) - } else if target > old_sz { - // We must increase the (local) window on every open stream. - let inc = target - old_sz; - tracing::trace!("incrementing all windows; inc={}", inc); - store.for_each(|mut stream| { - // XXX: Shouldn't the peer have already noticed our - // overflow and sent us a GOAWAY? - stream - .recv_flow - .inc_window(inc) - .map_err(proto::Error::library_go_away)?; - stream.recv_flow.assign_capacity(inc); - Ok(()) - }) - } else { - // size is the same... so do nothing - Ok(()) + // Per RFC 7540 §6.9.2: + // + // In addition to changing the flow-control window for streams that are + // not yet active, a SETTINGS frame can alter the initial flow-control + // window size for streams with active flow-control windows (that is, + // streams in the "open" or "half-closed (remote)" state). When the + // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust + // the size of all stream flow-control windows that it maintains by the + // difference between the new value and the old value. + // + // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available + // space in a flow-control window to become negative. A sender MUST + // track the negative flow-control window and MUST NOT send new + // flow-controlled frames until it receives WINDOW_UPDATE frames that + // cause the flow-control window to become positive. + + if target < old_sz { + // We must decrease the (local) window on every open stream. + let dec = old_sz - target; + tracing::trace!("decrementing all windows; dec={}", dec); + + store.for_each(|mut stream| { + stream.recv_flow.dec_recv_window(dec); + }) + } else if target > old_sz { + // We must increase the (local) window on every open stream. + let inc = target - old_sz; + tracing::trace!("incrementing all windows; inc={}", inc); + store.try_for_each(|mut stream| { + // XXX: Shouldn't the peer have already noticed our + // overflow and sent us a GOAWAY? + stream + .recv_flow + .inc_window(inc) + .map_err(proto::Error::library_go_away)?; + stream.recv_flow.assign_capacity(inc); + Ok::<_, proto::Error>(()) + })?; + } } + + Ok(()) } pub fn is_end_stream(&self, stream: &store::Ptr) -> bool { @@ -776,6 +786,16 @@ impl Recv { } } + pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) { + if let Ok(next_id) = self.next_stream_id { + // !Peer::is_local_init should have been called beforehand + debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated()); + if id >= next_id { + self.next_stream_id = id.next_id(); + } + } + } + /// Returns true if the remote peer can reserve a stream with the given ID. pub fn ensure_can_reserve(&self) -> Result<(), Error> { if !self.is_push_enabled { @@ -840,7 +860,10 @@ impl Recv { let reset_duration = self.reset_duration; while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| { let reset_at = stream.reset_at.expect("reset_at must be set if in queue"); - now - reset_at > reset_duration + // rust-lang/rust#86470 tracks a bug in the standard library where `Instant` + // subtraction can panic (because, on some platforms, `Instant` isn't actually + // monotonic). We use a saturating operation to avoid this panic here. + now.saturating_duration_since(reset_at) > reset_duration }) { counts.transition_after(stream, true); } diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 3735d13dd..2c5a38c80 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -35,6 +35,9 @@ pub(super) struct Send { prioritize: Prioritize, is_push_enabled: bool, + + /// If extended connect protocol is enabled. + is_extended_connect_protocol_enabled: bool, } /// A value to detect which public API has called `poll_reset`. @@ -53,6 +56,7 @@ impl Send { next_stream_id: Ok(config.local_next_stream_id), prioritize: Prioritize::new(config), is_push_enabled: true, + is_extended_connect_protocol_enabled: false, } } @@ -329,14 +333,12 @@ impl Send { /// Current available stream send capacity pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize { - let available = stream.send_flow.available().as_size(); + let available = stream.send_flow.available().as_size() as usize; let buffered = stream.buffered_send_data; - if available as usize <= buffered { - 0 - } else { - available - buffered as WindowSize - } + available + .min(self.prioritize.max_buffer_size()) + .saturating_sub(buffered) as WindowSize } pub fn poll_reset( @@ -429,6 +431,10 @@ impl Send { counts: &mut Counts, task: &mut Option, ) -> Result<(), Error> { + if let Some(val) = settings.is_extended_connect_protocol_enabled() { + self.is_extended_connect_protocol_enabled = val; + } + // Applies an update to the remote endpoint's initial window size. // // Per RFC 7540 §6.9.2: @@ -490,16 +496,14 @@ impl Send { // TODO: Should this notify the producer when the capacity // of a stream is reduced? Maybe it should if the capacity // is reduced to zero, allowing the producer to stop work. - - Ok::<_, Error>(()) - })?; + }); self.prioritize .assign_connection_capacity(total_reclaimed, store, counts); } else if val > old_val { let inc = val - old_val; - store.for_each(|mut stream| { + store.try_for_each(|mut stream| { self.recv_stream_window_update(inc, buffer, &mut stream, counts, task) .map_err(Error::library_go_away) })?; @@ -554,4 +558,8 @@ impl Send { } } } + + pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { + self.is_extended_connect_protocol_enabled + } } diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index ac58f43ac..3e34b7cb2 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -4,6 +4,7 @@ use slab; use indexmap::{self, IndexMap}; +use std::convert::Infallible; use std::fmt; use std::marker::PhantomData; use std::ops; @@ -128,7 +129,20 @@ impl Store { } } - pub fn for_each(&mut self, mut f: F) -> Result<(), E> + pub(crate) fn for_each(&mut self, mut f: F) + where + F: FnMut(Ptr), + { + match self.try_for_each(|ptr| { + f(ptr); + Ok::<_, Infallible>(()) + }) { + Ok(()) => (), + Err(infallible) => match infallible {}, + } + } + + pub fn try_for_each(&mut self, mut f: F) -> Result<(), E> where F: FnMut(Ptr) -> Result<(), E>, { diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 79de47a9a..36d515bad 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -260,20 +260,30 @@ impl Stream { self.ref_count == 0 && !self.state.is_closed() } - pub fn assign_capacity(&mut self, capacity: WindowSize) { + pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) { debug_assert!(capacity > 0); - self.send_capacity_inc = true; self.send_flow.assign_capacity(capacity); tracing::trace!( - " assigned capacity to stream; available={}; buffered={}; id={:?}", + " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={}", self.send_flow.available(), self.buffered_send_data, - self.id + self.id, + max_buffer_size ); + self.notify_if_can_buffer_more(max_buffer_size); + } + + /// If the capacity was limited because of the max_send_buffer_size, + /// then consider waking the send task again... + pub fn notify_if_can_buffer_more(&mut self, max_buffer_size: usize) { + let available = self.send_flow.available().as_size() as usize; + let buffered = self.buffered_send_data; + // Only notify if the capacity exceeds the amount of buffered data - if self.send_flow.available() > self.buffered_send_data { + if available.min(max_buffer_size) > buffered { + self.send_capacity_inc = true; tracing::trace!(" notifying task"); self.notify_send(); } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 4962db8d2..3e7ae97d9 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -2,6 +2,7 @@ use super::recv::RecvHeaderBlockError; use super::store::{self, Entry, Resolve, Store}; use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId}; use crate::codec::{Codec, SendError, UserError}; +use crate::ext::Protocol; use crate::frame::{self, Frame, Reason}; use crate::proto::{peer, Error, Initiator, Open, Peer, WindowSize}; use crate::{client, proto, server}; @@ -214,6 +215,8 @@ where use super::stream::ContentLength; use http::Method; + let protocol = request.extensions_mut().remove::(); + // Clear before taking lock, incase extensions contain a StreamRef. request.extensions_mut().clear(); @@ -261,7 +264,8 @@ where } // Convert the message - let headers = client::Peer::convert_send_message(stream_id, request, end_of_stream)?; + let headers = + client::Peer::convert_send_message(stream_id, request, protocol, end_of_stream)?; let mut stream = me.store.insert(stream.id, stream); @@ -294,6 +298,15 @@ where send_buffer: self.send_buffer.clone(), }) } + + pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { + self.inner + .lock() + .unwrap() + .actions + .send + .is_extended_connect_protocol_enabled() + } } impl DynStreams<'_, B> { @@ -643,15 +656,12 @@ impl Inner { let last_processed_id = actions.recv.last_processed_id(); - self.store - .for_each(|stream| { - counts.transition(stream, |counts, stream| { - actions.recv.handle_error(&err, &mut *stream); - actions.send.handle_error(send_buffer, stream, counts); - Ok::<_, ()>(()) - }) + self.store.for_each(|stream| { + counts.transition(stream, |counts, stream| { + actions.recv.handle_error(&err, &mut *stream); + actions.send.handle_error(send_buffer, stream, counts); }) - .unwrap(); + }); actions.conn_error = Some(err); @@ -674,19 +684,14 @@ impl Inner { let err = Error::remote_go_away(frame.debug_data().clone(), frame.reason()); - self.store - .for_each(|stream| { - if stream.id > last_stream_id { - counts.transition(stream, |counts, stream| { - actions.recv.handle_error(&err, &mut *stream); - actions.send.handle_error(send_buffer, stream, counts); - Ok::<_, ()>(()) - }) - } else { - Ok::<_, ()>(()) - } - }) - .unwrap(); + self.store.for_each(|stream| { + if stream.id > last_stream_id { + counts.transition(stream, |counts, stream| { + actions.recv.handle_error(&err, &mut *stream); + actions.send.handle_error(send_buffer, stream, counts); + }) + } + }); actions.conn_error = Some(err); @@ -807,18 +812,15 @@ impl Inner { tracing::trace!("Streams::recv_eof"); - self.store - .for_each(|stream| { - counts.transition(stream, |counts, stream| { - actions.recv.recv_eof(stream); + self.store.for_each(|stream| { + counts.transition(stream, |counts, stream| { + actions.recv.recv_eof(stream); - // This handles resetting send state associated with the - // stream - actions.send.handle_error(send_buffer, stream, counts); - Ok::<_, ()>(()) - }) + // This handles resetting send state associated with the + // stream + actions.send.handle_error(send_buffer, stream, counts); }) - .expect("recv_eof"); + }); actions.clear_queues(clear_pending_accept, &mut self.store, counts); Ok(()) @@ -881,6 +883,10 @@ impl Inner { // We normally would open this stream, so update our // next-send-id record. self.actions.send.maybe_reset_next_stream_id(id); + } else { + // We normally would recv this stream, so update our + // next-recv-id record. + self.actions.recv.maybe_reset_next_stream_id(id); } let stream = Stream::new(id, 0, 0); diff --git a/src/server.rs b/src/server.rs index 5e4bba97c..844f8e83b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -126,7 +126,7 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; -use std::{convert, fmt, io, mem}; +use std::{fmt, io}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tracing::instrument::{Instrument, Instrumented}; @@ -245,6 +245,9 @@ pub struct Builder { /// Initial target window size for new connections. initial_target_connection_window_size: Option, + + /// Maximum amount of bytes to "buffer" for writing per stream. + max_send_buffer_size: usize, } /// Send a response back to the client @@ -298,8 +301,8 @@ enum Handshaking { Flushing(Instrumented>>), /// State 2. Connection is waiting for the client preface. ReadingPreface(Instrumented>>), - /// Dummy state for `mem::replace`. - Empty, + /// State 3. Handshake is done, polling again would panic. + Done, } /// Flush a Sink @@ -364,7 +367,7 @@ where B: Buf + 'static, { fn handshake2(io: T, builder: Builder) -> Handshake { - let span = tracing::trace_span!("server_handshake", io = %std::any::type_name::()); + let span = tracing::trace_span!("server_handshake"); let entered = span.enter(); // Create the codec. @@ -384,7 +387,8 @@ where .expect("invalid SETTINGS frame"); // Create the handshake future. - let state = Handshaking::from(codec); + let state = + Handshaking::Flushing(Flush::new(codec).instrument(tracing::trace_span!("flush"))); drop(entered); @@ -470,6 +474,19 @@ where Ok(()) } + /// Enables the [extended CONNECT protocol]. + /// + /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 + /// + /// # Errors + /// + /// Returns an error if a previous call is still pending acknowledgement + /// from the remote endpoint. + pub fn enable_connect_protocol(&mut self) -> Result<(), crate::Error> { + self.connection.set_enable_connect_protocol()?; + Ok(()) + } + /// Returns `Ready` when the underlying connection has closed. /// /// If any new inbound streams are received during a call to `poll_closed`, @@ -620,6 +637,7 @@ impl Builder { reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, settings: Settings::default(), initial_target_connection_window_size: None, + max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, } } @@ -857,6 +875,24 @@ impl Builder { self } + /// Sets the maximum send buffer size per stream. + /// + /// Once a stream has buffered up to (or over) the maximum, the stream's + /// flow control will not "poll" additional capacity. Once bytes for the + /// stream have been written to the connection, the send buffer capacity + /// will be freed up again. + /// + /// The default is currently ~400MB, but may change. + /// + /// # Panics + /// + /// This function panics if `max` is larger than `u32::MAX`. + pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { + assert!(max <= std::u32::MAX as usize); + self.max_send_buffer_size = max; + self + } + /// Sets the maximum number of concurrent locally reset streams. /// /// When a stream is explicitly reset by either calling @@ -904,6 +940,14 @@ impl Builder { self } + /// Enables the [extended CONNECT protocol]. + /// + /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 + pub fn enable_connect_protocol(&mut self) -> &mut Self { + self.settings.set_enable_connect_protocol(Some(1)); + self + } + /// Creates a new configured HTTP/2 server backed by `io`. /// /// It is expected that `io` already be in an appropriate state to commence @@ -1226,62 +1270,58 @@ where let span = self.span.clone(); // XXX(eliza): T_T let _e = span.enter(); tracing::trace!(state = ?self.state); - use crate::server::Handshaking::*; - - self.state = if let Flushing(ref mut flush) = self.state { - // We're currently flushing a pending SETTINGS frame. Poll the - // flush future, and, if it's completed, advance our state to wait - // for the client preface. - let codec = match Pin::new(flush).poll(cx)? { - Poll::Pending => { - tracing::trace!(flush.poll = %"Pending"); - return Poll::Pending; + + loop { + match &mut self.state { + Handshaking::Flushing(flush) => { + // We're currently flushing a pending SETTINGS frame. Poll the + // flush future, and, if it's completed, advance our state to wait + // for the client preface. + let codec = match Pin::new(flush).poll(cx)? { + Poll::Pending => { + tracing::trace!(flush.poll = %"Pending"); + return Poll::Pending; + } + Poll::Ready(flushed) => { + tracing::trace!(flush.poll = %"Ready"); + flushed + } + }; + self.state = Handshaking::ReadingPreface( + ReadPreface::new(codec).instrument(tracing::trace_span!("read_preface")), + ); } - Poll::Ready(flushed) => { - tracing::trace!(flush.poll = %"Ready"); - flushed + Handshaking::ReadingPreface(read) => { + let codec = ready!(Pin::new(read).poll(cx)?); + + self.state = Handshaking::Done; + + let connection = proto::Connection::new( + codec, + Config { + next_stream_id: 2.into(), + // Server does not need to locally initiate any streams + initial_max_send_streams: 0, + max_send_buffer_size: self.builder.max_send_buffer_size, + reset_stream_duration: self.builder.reset_stream_duration, + reset_stream_max: self.builder.reset_stream_max, + settings: self.builder.settings.clone(), + }, + ); + + tracing::trace!("connection established!"); + let mut c = Connection { connection }; + if let Some(sz) = self.builder.initial_target_connection_window_size { + c.set_target_window_size(sz); + } + + return Poll::Ready(Ok(c)); + } + Handshaking::Done => { + panic!("Handshaking::poll() called again after handshaking was complete") } - }; - Handshaking::from(ReadPreface::new(codec)) - } else { - // Otherwise, we haven't actually advanced the state, but we have - // to replace it with itself, because we have to return a value. - // (note that the assignment to `self.state` has to be outside of - // the `if let` block above in order to placate the borrow checker). - mem::replace(&mut self.state, Handshaking::Empty) - }; - let poll = if let ReadingPreface(ref mut read) = self.state { - // We're now waiting for the client preface. Poll the `ReadPreface` - // future. If it has completed, we will create a `Connection` handle - // for the connection. - Pin::new(read).poll(cx) - // Actually creating the `Connection` has to occur outside of this - // `if let` block, because we've borrowed `self` mutably in order - // to poll the state and won't be able to borrow the SETTINGS frame - // as well until we release the borrow for `poll()`. - } else { - unreachable!("Handshake::poll() state was not advanced completely!") - }; - poll?.map(|codec| { - let connection = proto::Connection::new( - codec, - Config { - next_stream_id: 2.into(), - // Server does not need to locally initiate any streams - initial_max_send_streams: 0, - reset_stream_duration: self.builder.reset_stream_duration, - reset_stream_max: self.builder.reset_stream_max, - settings: self.builder.settings.clone(), - }, - ); - - tracing::trace!("connection established!"); - let mut c = Connection { connection }; - if let Some(sz) = self.builder.initial_target_connection_window_size { - c.set_target_window_size(sz); } - Ok(c) - }) + } } } @@ -1360,7 +1400,7 @@ impl Peer { _, ) = request.into_parts(); - let pseudo = Pseudo::request(method, uri); + let pseudo = Pseudo::request(method, uri, None); Ok(frame::PushPromise::new( stream_id, @@ -1410,6 +1450,11 @@ impl proto::Peer for Peer { malformed!("malformed headers: missing method"); } + let has_protocol = pseudo.protocol.is_some(); + if !is_connect && has_protocol { + malformed!("malformed headers: :protocol on non-CONNECT request"); + } + if pseudo.status.is_some() { malformed!("malformed headers: :status field on request"); } @@ -1432,7 +1477,7 @@ impl proto::Peer for Peer { // A :scheme is required, except CONNECT. if let Some(scheme) = pseudo.scheme { - if is_connect { + if is_connect && !has_protocol { malformed!(":scheme in CONNECT"); } let maybe_scheme = scheme.parse(); @@ -1450,12 +1495,12 @@ impl proto::Peer for Peer { if parts.authority.is_some() { parts.scheme = Some(scheme); } - } else if !is_connect { + } else if !is_connect || has_protocol { malformed!("malformed headers: missing scheme"); } if let Some(path) = pseudo.path { - if is_connect { + if is_connect && !has_protocol { malformed!(":path in CONNECT"); } @@ -1468,6 +1513,8 @@ impl proto::Peer for Peer { parts.path_and_query = Some(maybe_path.or_else(|why| { malformed!("malformed headers: malformed path ({:?}): {}", path, why,) })?); + } else if is_connect && has_protocol { + malformed!("malformed headers: missing path in extended CONNECT"); } b = b.uri(parts); @@ -1497,42 +1544,9 @@ where #[inline] fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { match *self { - Handshaking::Flushing(_) => write!(f, "Handshaking::Flushing(_)"), - Handshaking::ReadingPreface(_) => write!(f, "Handshaking::ReadingPreface(_)"), - Handshaking::Empty => write!(f, "Handshaking::Empty"), + Handshaking::Flushing(_) => f.write_str("Flushing(_)"), + Handshaking::ReadingPreface(_) => f.write_str("ReadingPreface(_)"), + Handshaking::Done => f.write_str("Done"), } } } - -impl convert::From>> for Handshaking -where - T: AsyncRead + AsyncWrite, - B: Buf, -{ - #[inline] - fn from(flush: Flush>) -> Self { - Handshaking::Flushing(flush.instrument(tracing::trace_span!("flush"))) - } -} - -impl convert::From>> for Handshaking -where - T: AsyncRead + AsyncWrite, - B: Buf, -{ - #[inline] - fn from(read: ReadPreface>) -> Self { - Handshaking::ReadingPreface(read.instrument(tracing::trace_span!("read_preface"))) - } -} - -impl convert::From>> for Handshaking -where - T: AsyncRead + AsyncWrite, - B: Buf, -{ - #[inline] - fn from(codec: Codec>) -> Self { - Handshaking::from(Flush::new(codec)) - } -} diff --git a/tests/h2-fuzz/Cargo.toml b/tests/h2-fuzz/Cargo.toml index 524627f31..dadb62c92 100644 --- a/tests/h2-fuzz/Cargo.toml +++ b/tests/h2-fuzz/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" [dependencies] h2 = { path = "../.." } -env_logger = { version = "0.5.3", default-features = false } +env_logger = { version = "0.9", default-features = false } futures = { version = "0.3", default-features = false, features = ["std"] } honggfuzz = "0.5" http = "0.2" diff --git a/tests/h2-support/Cargo.toml b/tests/h2-support/Cargo.toml index e97c6b310..f178178eb 100644 --- a/tests/h2-support/Cargo.toml +++ b/tests/h2-support/Cargo.toml @@ -7,9 +7,11 @@ edition = "2018" [dependencies] h2 = { path = "../..", features = ["stream", "unstable"] } +atty = "0.2" bytes = "1" tracing = "0.1" -tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "chrono", "ansi"] } +tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } +tracing-tree = "0.2" futures = { version = "0.3", default-features = false } http = "0.2" tokio = { version = "1", features = ["time"] } diff --git a/tests/h2-support/src/assert.rs b/tests/h2-support/src/assert.rs index 8bc6d25c7..88e3d4f7c 100644 --- a/tests/h2-support/src/assert.rs +++ b/tests/h2-support/src/assert.rs @@ -47,6 +47,16 @@ macro_rules! assert_settings { }}; } +#[macro_export] +macro_rules! assert_go_away { + ($frame:expr) => {{ + match $frame { + h2::frame::Frame::GoAway(v) => v, + f => panic!("expected GO_AWAY; actual={:?}", f), + } + }}; +} + #[macro_export] macro_rules! poll_err { ($transport:expr) => {{ @@ -80,6 +90,7 @@ macro_rules! assert_default_settings { use h2::frame::Frame; +#[track_caller] pub fn assert_frame_eq, U: Into>(t: T, u: U) { let actual: Frame = t.into(); let expected: Frame = u.into(); diff --git a/tests/h2-support/src/frames.rs b/tests/h2-support/src/frames.rs index 824bc5c19..f2c07bacb 100644 --- a/tests/h2-support/src/frames.rs +++ b/tests/h2-support/src/frames.rs @@ -4,7 +4,10 @@ use std::fmt; use bytes::Bytes; use http::{self, HeaderMap, StatusCode}; -use h2::frame::{self, Frame, StreamId}; +use h2::{ + ext::Protocol, + frame::{self, Frame, StreamId}, +}; pub const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; pub const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; @@ -109,7 +112,9 @@ impl Mock { let method = method.try_into().unwrap(); let uri = uri.try_into().unwrap(); let (id, _, fields) = self.into_parts(); - let frame = frame::Headers::new(id, frame::Pseudo::request(method, uri), fields); + let extensions = Default::default(); + let pseudo = frame::Pseudo::request(method, uri, extensions); + let frame = frame::Headers::new(id, pseudo, fields); Mock(frame) } @@ -179,6 +184,15 @@ impl Mock { Mock(frame::Headers::new(id, pseudo, fields)) } + pub fn protocol(self, value: &str) -> Self { + let (id, mut pseudo, fields) = self.into_parts(); + let value = Protocol::from(value); + + pseudo.set_protocol(value); + + Mock(frame::Headers::new(id, pseudo, fields)) + } + pub fn eos(mut self) -> Self { self.0.set_end_stream(); self @@ -230,8 +244,9 @@ impl Mock { let method = method.try_into().unwrap(); let uri = uri.try_into().unwrap(); let (id, promised, _, fields) = self.into_parts(); - let frame = - frame::PushPromise::new(id, promised, frame::Pseudo::request(method, uri), fields); + let extensions = Default::default(); + let pseudo = frame::Pseudo::request(method, uri, extensions); + let frame = frame::PushPromise::new(id, promised, pseudo, fields); Mock(frame) } @@ -352,6 +367,11 @@ impl Mock { self.0.set_enable_push(false); self } + + pub fn enable_connect_protocol(mut self, val: u32) -> Self { + self.0.set_enable_connect_protocol(Some(val)); + self + } } impl From> for frame::Settings { diff --git a/tests/h2-support/src/mock.rs b/tests/h2-support/src/mock.rs index b5df9ad9b..cc314cd06 100644 --- a/tests/h2-support/src/mock.rs +++ b/tests/h2-support/src/mock.rs @@ -221,22 +221,15 @@ impl Handle { let settings = settings.into(); self.send(settings.into()).await.unwrap(); - let frame = self.next().await; - let settings = match frame { - Some(frame) => match frame.unwrap() { - Frame::Settings(settings) => { - // Send the ACK - let ack = frame::Settings::ack(); + let frame = self.next().await.expect("unexpected EOF").unwrap(); + let settings = assert_settings!(frame); - // TODO: Don't unwrap? - self.send(ack.into()).await.unwrap(); + // Send the ACK + let ack = frame::Settings::ack(); + + // TODO: Don't unwrap? + self.send(ack.into()).await.unwrap(); - settings - } - frame => panic!("unexpected frame; frame={:?}", frame), - }, - None => panic!("unexpected EOF"), - }; let frame = self.next().await; let f = assert_settings!(frame.unwrap().unwrap()); diff --git a/tests/h2-support/src/prelude.rs b/tests/h2-support/src/prelude.rs index 1fcb0dcc4..86ef3249e 100644 --- a/tests/h2-support/src/prelude.rs +++ b/tests/h2-support/src/prelude.rs @@ -2,6 +2,7 @@ pub use h2; pub use h2::client; +pub use h2::ext::Protocol; pub use h2::frame::StreamId; pub use h2::server; pub use h2::*; @@ -20,8 +21,8 @@ pub use super::{Codec, SendFrame}; // Re-export macros pub use super::{ - assert_closed, assert_data, assert_default_settings, assert_headers, assert_ping, poll_err, - poll_frame, raw_codec, + assert_closed, assert_data, assert_default_settings, assert_go_away, assert_headers, + assert_ping, assert_settings, poll_err, poll_frame, raw_codec, }; pub use super::assert::assert_frame_eq; diff --git a/tests/h2-support/src/trace.rs b/tests/h2-support/src/trace.rs index 4ac11742c..87038c350 100644 --- a/tests/h2-support/src/trace.rs +++ b/tests/h2-support/src/trace.rs @@ -1,41 +1,17 @@ -use std::{io, str}; pub use tracing; pub use tracing_subscriber; -pub fn init() -> tracing::dispatcher::DefaultGuard { - tracing::subscriber::set_default( - tracing_subscriber::fmt() - .with_max_level(tracing::Level::TRACE) - .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE) - .with_writer(PrintlnWriter { _p: () }) - .finish(), - ) -} - -struct PrintlnWriter { - _p: (), -} +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; -impl tracing_subscriber::fmt::MakeWriter for PrintlnWriter { - type Writer = PrintlnWriter; - fn make_writer(&self) -> Self::Writer { - PrintlnWriter { _p: () } - } -} - -impl io::Write for PrintlnWriter { - fn write(&mut self, buf: &[u8]) -> io::Result { - let s = str::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; - println!("{}", s); - Ok(s.len()) - } - - fn write_fmt(&mut self, fmt: std::fmt::Arguments<'_>) -> io::Result<()> { - println!("{}", fmt); - Ok(()) - } +pub fn init() -> tracing::dispatcher::DefaultGuard { + let use_colors = atty::is(atty::Stream::Stdout); + let layer = tracing_tree::HierarchicalLayer::default() + .with_writer(tracing_subscriber::fmt::writer::TestWriter::default()) + .with_indent_lines(true) + .with_ansi(use_colors) + .with_targets(true) + .with_indent_amount(2); - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } + tracing_subscriber::registry().with(layer).set_default() } diff --git a/tests/h2-support/src/util.rs b/tests/h2-support/src/util.rs index ec768badc..1150d5925 100644 --- a/tests/h2-support/src/util.rs +++ b/tests/h2-support/src/util.rs @@ -32,6 +32,7 @@ pub async fn yield_once() { .await; } +/// Should only be called after a non-0 capacity was requested for the stream. pub fn wait_for_capacity(stream: h2::SendStream, target: usize) -> WaitForCapacity { WaitForCapacity { stream: Some(stream), @@ -54,14 +55,19 @@ impl Future for WaitForCapacity { type Output = h2::SendStream; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let _ = ready!(self.stream().poll_capacity(cx)).unwrap(); + loop { + let _ = ready!(self.stream().poll_capacity(cx)).unwrap(); - let act = self.stream().capacity(); + let act = self.stream().capacity(); - if act >= self.target { - return Poll::Ready(self.stream.take().unwrap().into()); - } + // If a non-0 capacity was requested for the stream before calling + // wait_for_capacity, then poll_capacity should return Pending + // until there is a non-0 capacity. + assert_ne!(act, 0); - Poll::Pending + if act >= self.target { + return Poll::Ready(self.stream.take().unwrap().into()); + } + } } } diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index 2af0bdeec..9635bcc6c 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -1305,6 +1305,153 @@ async fn informational_while_local_streaming() { join(srv, h2).await; } +#[tokio::test] +async fn extended_connect_protocol_disabled_by_default() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + + srv.recv_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + }; + + let h2 = async move { + let (mut client, mut h2) = client::handshake(io).await.unwrap(); + + // we send a simple req here just to drive the connection so we can + // receive the server settings. + let request = Request::get("https://example.com/").body(()).unwrap(); + // first request is allowed + let (response, _) = client.send_request(request, true).unwrap(); + h2.drive(response).await.unwrap(); + + assert!(!client.is_extended_connect_protocol_enabled()); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn extended_connect_protocol_enabled_during_handshake() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv + .assert_client_handshake_with_settings(frames::settings().enable_connect_protocol(1)) + .await; + assert_default_settings!(settings); + + srv.recv_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + }; + + let h2 = async move { + let (mut client, mut h2) = client::handshake(io).await.unwrap(); + + // we send a simple req here just to drive the connection so we can + // receive the server settings. + let request = Request::get("https://example.com/").body(()).unwrap(); + let (response, _) = client.send_request(request, true).unwrap(); + h2.drive(response).await.unwrap(); + + assert!(client.is_extended_connect_protocol_enabled()); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn invalid_connect_protocol_enabled_setting() { + h2_support::trace_init!(); + + let (io, mut srv) = mock::new(); + + let srv = async move { + // Send a settings frame + srv.send(frames::settings().enable_connect_protocol(2).into()) + .await + .unwrap(); + srv.read_preface().await.unwrap(); + + let settings = assert_settings!(srv.next().await.expect("unexpected EOF").unwrap()); + assert_default_settings!(settings); + + // Send the ACK + let ack = frame::Settings::ack(); + + // TODO: Don't unwrap? + srv.send(ack.into()).await.unwrap(); + + let frame = srv.next().await.unwrap().unwrap(); + let go_away = assert_go_away!(frame); + assert_eq!(go_away.reason(), Reason::PROTOCOL_ERROR); + }; + + let h2 = async move { + let (mut client, mut h2) = client::handshake(io).await.unwrap(); + + // we send a simple req here just to drive the connection so we can + // receive the server settings. + let request = Request::get("https://example.com/").body(()).unwrap(); + let (response, _) = client.send_request(request, true).unwrap(); + + let error = h2.drive(response).await.unwrap_err(); + assert_eq!(error.reason(), Some(Reason::PROTOCOL_ERROR)); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn extended_connect_request() { + h2_support::trace_init!(); + + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv + .assert_client_handshake_with_settings(frames::settings().enable_connect_protocol(1)) + .await; + assert_default_settings!(settings); + + srv.recv_frame( + frames::headers(1) + .request("CONNECT", "http://bread/baguette") + .protocol("the-bread-protocol") + .eos(), + ) + .await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + }; + + let h2 = async move { + let (mut client, mut h2) = client::handshake(io).await.unwrap(); + + let request = Request::connect("http://bread/baguette") + .extension(Protocol::from("the-bread-protocol")) + .body(()) + .unwrap(); + let (response, _) = client.send_request(request, true).unwrap(); + h2.drive(response).await.unwrap(); + }; + + join(srv, h2).await; +} + const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index be04a61b7..92e7a532f 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -1561,3 +1561,239 @@ async fn data_padding() { join(srv, h2).await; } + +#[tokio::test] +async fn poll_capacity_after_send_data_and_reserve() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv + .assert_client_handshake_with_settings(frames::settings().initial_window_size(5)) + .await; + assert_default_settings!(settings); + srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/")) + .await; + srv.send_frame(frames::headers(1).response(200)).await; + srv.recv_frame(frames::data(1, &b"abcde"[..])).await; + srv.send_frame(frames::window_update(1, 5)).await; + srv.recv_frame(frames::data(1, &b""[..]).eos()).await; + }; + + let h2 = async move { + let (mut client, mut h2) = client::handshake(io).await.unwrap(); + let request = Request::builder() + .method(Method::POST) + .uri("https://www.example.com/") + .body(()) + .unwrap(); + + let (response, mut stream) = client.send_request(request, false).unwrap(); + + let response = h2.drive(response).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + stream.send_data("abcde".into(), false).unwrap(); + + stream.reserve_capacity(5); + + // Initial window size was 5 so current capacity is 0 even if we just reserved. + assert_eq!(stream.capacity(), 0); + + // This will panic if there is a bug causing h2 to return Ok(0) from poll_capacity. + let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await; + + stream.send_data("".into(), true).unwrap(); + + // Wait for the connection to close + h2.await.unwrap(); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn poll_capacity_after_send_data_and_reserve_with_max_send_buffer_size() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv + .assert_client_handshake_with_settings(frames::settings().initial_window_size(10)) + .await; + assert_default_settings!(settings); + srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/")) + .await; + srv.send_frame(frames::headers(1).response(200)).await; + srv.recv_frame(frames::data(1, &b"abcde"[..])).await; + srv.send_frame(frames::window_update(1, 10)).await; + srv.recv_frame(frames::data(1, &b""[..]).eos()).await; + }; + + let h2 = async move { + let (mut client, mut h2) = client::Builder::new() + .max_send_buffer_size(5) + .handshake::<_, Bytes>(io) + .await + .unwrap(); + let request = Request::builder() + .method(Method::POST) + .uri("https://www.example.com/") + .body(()) + .unwrap(); + + let (response, mut stream) = client.send_request(request, false).unwrap(); + + let response = h2.drive(response).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + stream.send_data("abcde".into(), false).unwrap(); + + stream.reserve_capacity(5); + + // Initial window size was 10 but with a max send buffer size of 10 in the client, + // so current capacity is 0 even if we just reserved. + assert_eq!(stream.capacity(), 0); + + // This will panic if there is a bug causing h2 to return Ok(0) from poll_capacity. + let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await; + + stream.send_data("".into(), true).unwrap(); + + // Wait for the connection to close + h2.await.unwrap(); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn max_send_buffer_size_overflow() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/")) + .await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + srv.recv_frame(frames::data(1, &[0; 10][..])).await; + srv.recv_frame(frames::data(1, &[][..]).eos()).await; + }; + + let client = async move { + let (mut client, mut conn) = client::Builder::new() + .max_send_buffer_size(5) + .handshake::<_, Bytes>(io) + .await + .unwrap(); + let request = Request::builder() + .method(Method::POST) + .uri("https://www.example.com/") + .body(()) + .unwrap(); + + let (response, mut stream) = client.send_request(request, false).unwrap(); + + let response = conn.drive(response).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + assert_eq!(stream.capacity(), 0); + stream.reserve_capacity(10); + assert_eq!( + stream.capacity(), + 5, + "polled capacity not over max buffer size" + ); + + stream.send_data([0; 10][..].into(), false).unwrap(); + + stream.reserve_capacity(15); + assert_eq!( + stream.capacity(), + 0, + "now with buffered over the max, don't overflow" + ); + stream.send_data([0; 0][..].into(), true).unwrap(); + + // Wait for the connection to close + conn.await.unwrap(); + }; + + join(srv, client).await; +} + +#[tokio::test] +async fn max_send_buffer_size_poll_capacity_wakes_task() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/")) + .await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + srv.recv_frame(frames::data(1, &[0; 5][..])).await; + srv.recv_frame(frames::data(1, &[0; 5][..])).await; + srv.recv_frame(frames::data(1, &[0; 5][..])).await; + srv.recv_frame(frames::data(1, &[0; 5][..])).await; + srv.recv_frame(frames::data(1, &[][..]).eos()).await; + }; + + let client = async move { + let (mut client, mut conn) = client::Builder::new() + .max_send_buffer_size(5) + .handshake::<_, Bytes>(io) + .await + .unwrap(); + let request = Request::builder() + .method(Method::POST) + .uri("https://www.example.com/") + .body(()) + .unwrap(); + + let (response, mut stream) = client.send_request(request, false).unwrap(); + + let response = conn.drive(response).await.unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + assert_eq!(stream.capacity(), 0); + const TO_SEND: usize = 20; + stream.reserve_capacity(TO_SEND); + assert_eq!( + stream.capacity(), + 5, + "polled capacity not over max buffer size" + ); + + let t1 = tokio::spawn(async move { + let mut sent = 0; + let buf = [0; TO_SEND]; + loop { + match poll_fn(|cx| stream.poll_capacity(cx)).await { + None => panic!("no cap"), + Some(Err(e)) => panic!("cap error: {:?}", e), + Some(Ok(cap)) => { + stream + .send_data(buf[sent..(sent + cap)].to_vec().into(), false) + .unwrap(); + sent += cap; + if sent >= TO_SEND { + break; + } + } + } + } + stream.send_data(Bytes::new(), true).unwrap(); + }); + + // Wait for the connection to close + conn.await.unwrap(); + t1.await.unwrap(); + }; + + join(srv, client).await; +} diff --git a/tests/h2-tests/tests/server.rs b/tests/h2-tests/tests/server.rs index e60483d0d..b3bf1a286 100644 --- a/tests/h2-tests/tests/server.rs +++ b/tests/h2-tests/tests/server.rs @@ -1149,3 +1149,191 @@ async fn send_reset_explicitly() { join(client, srv).await; } + +#[tokio::test] +async fn extended_connect_protocol_disabled_by_default() { + h2_support::trace_init!(); + + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + + assert_eq!(settings.is_extended_connect_protocol_enabled(), None); + + client + .send_frame( + frames::headers(1) + .request("CONNECT", "http://bread/baguette") + .protocol("the-bread-protocol"), + ) + .await; + + client.recv_frame(frames::reset(1).protocol_error()).await; + }; + + let srv = async move { + let mut srv = server::handshake(io).await.expect("handshake"); + + poll_fn(move |cx| srv.poll_closed(cx)) + .await + .expect("server"); + }; + + join(client, srv).await; +} + +#[tokio::test] +async fn extended_connect_protocol_enabled_during_handshake() { + h2_support::trace_init!(); + + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + + assert_eq!(settings.is_extended_connect_protocol_enabled(), Some(true)); + + client + .send_frame( + frames::headers(1) + .request("CONNECT", "http://bread/baguette") + .protocol("the-bread-protocol"), + ) + .await; + + client.recv_frame(frames::headers(1).response(200)).await; + }; + + let srv = async move { + let mut builder = server::Builder::new(); + + builder.enable_connect_protocol(); + + let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake"); + + let (_req, mut stream) = srv.next().await.unwrap().unwrap(); + + let rsp = Response::new(()); + stream.send_response(rsp, false).unwrap(); + + poll_fn(move |cx| srv.poll_closed(cx)) + .await + .expect("server"); + }; + + join(client, srv).await; +} + +#[tokio::test] +async fn reject_pseudo_protocol_on_non_connect_request() { + h2_support::trace_init!(); + + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + + assert_eq!(settings.is_extended_connect_protocol_enabled(), Some(true)); + + client + .send_frame( + frames::headers(1) + .request("GET", "http://bread/baguette") + .protocol("the-bread-protocol"), + ) + .await; + + client.recv_frame(frames::reset(1).protocol_error()).await; + }; + + let srv = async move { + let mut builder = server::Builder::new(); + + builder.enable_connect_protocol(); + + let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake"); + + assert!(srv.next().await.is_none()); + + poll_fn(move |cx| srv.poll_closed(cx)) + .await + .expect("server"); + }; + + join(client, srv).await; +} + +#[tokio::test] +async fn reject_authority_target_on_extended_connect_request() { + h2_support::trace_init!(); + + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + + assert_eq!(settings.is_extended_connect_protocol_enabled(), Some(true)); + + client + .send_frame( + frames::headers(1) + .request("CONNECT", "bread:80") + .protocol("the-bread-protocol"), + ) + .await; + + client.recv_frame(frames::reset(1).protocol_error()).await; + }; + + let srv = async move { + let mut builder = server::Builder::new(); + + builder.enable_connect_protocol(); + + let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake"); + + assert!(srv.next().await.is_none()); + + poll_fn(move |cx| srv.poll_closed(cx)) + .await + .expect("server"); + }; + + join(client, srv).await; +} + +#[tokio::test] +async fn reject_non_authority_target_on_connect_request() { + h2_support::trace_init!(); + + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + + assert_eq!(settings.is_extended_connect_protocol_enabled(), Some(true)); + + client + .send_frame(frames::headers(1).request("CONNECT", "https://bread/baguette")) + .await; + + client.recv_frame(frames::reset(1).protocol_error()).await; + }; + + let srv = async move { + let mut builder = server::Builder::new(); + + builder.enable_connect_protocol(); + + let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake"); + + assert!(srv.next().await.is_none()); + + poll_fn(move |cx| srv.poll_closed(cx)) + .await + .expect("server"); + }; + + join(client, srv).await; +} diff --git a/util/genfixture/Cargo.toml b/util/genfixture/Cargo.toml index 694a99496..cce7eb1b1 100644 --- a/util/genfixture/Cargo.toml +++ b/util/genfixture/Cargo.toml @@ -6,4 +6,4 @@ publish = false edition = "2018" [dependencies] -walkdir = "1.0.0" +walkdir = "2.3.2"