From dbaa3a42856d23c6bdebb6125c2205d965532178 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 22 Nov 2021 17:04:24 -0800 Subject: [PATCH 01/26] fix: properly reject prioritized HEADERS with stream ID of zero --- src/frame/headers.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/frame/headers.rs b/src/frame/headers.rs index 0851d7660..2fc9561cf 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -146,6 +146,10 @@ impl Headers { tracing::trace!("loading headers; flags={:?}", flags); + if head.stream_id().is_zero() { + return Err(Error::InvalidStreamId); + } + // Read the padding length if flags.is_padded() { if src.is_empty() { From 87969c1f296173a7838956165014b4828dc5d5db Mon Sep 17 00:00:00 2001 From: Anthony Ramine <123095+nox@users.noreply.github.com> Date: Wed, 24 Nov 2021 10:05:10 +0100 Subject: [PATCH 02/26] Implement the extended CONNECT protocol from RFC 8441 (#565) --- src/client.rs | 20 ++- src/ext.rs | 55 ++++++++ src/frame/headers.rs | 20 ++- src/frame/settings.rs | 27 ++++ src/hpack/header.rs | 18 +++ src/hpack/table.rs | 1 + src/hpack/test/fixture.rs | 2 + src/lib.rs | 1 + src/proto/connection.rs | 11 ++ src/proto/settings.rs | 4 +- src/proto/streams/mod.rs | 3 + src/proto/streams/recv.rs | 110 ++++++++------- src/proto/streams/send.rs | 18 ++- src/proto/streams/store.rs | 16 ++- src/proto/streams/streams.rs | 66 ++++----- src/server.rs | 36 ++++- tests/h2-support/src/assert.rs | 11 ++ tests/h2-support/src/frames.rs | 28 +++- tests/h2-support/src/mock.rs | 21 +-- tests/h2-support/src/prelude.rs | 5 +- tests/h2-tests/tests/client_request.rs | 147 +++++++++++++++++++ tests/h2-tests/tests/server.rs | 188 +++++++++++++++++++++++++ 22 files changed, 691 insertions(+), 117 deletions(-) create mode 100644 src/ext.rs diff --git a/src/client.rs b/src/client.rs index 9cd0b8f46..3a818a582 100644 --- a/src/client.rs +++ b/src/client.rs @@ -136,6 +136,7 @@ //! [`Error`]: ../struct.Error.html use crate::codec::{Codec, SendError, UserError}; +use crate::ext::Protocol; use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId}; use crate::proto::{self, Error}; use crate::{FlowControl, PingPong, RecvStream, SendStream}; @@ -517,6 +518,19 @@ where (response, stream) }) } + + /// Returns whether the [extended CONNECT protocol][1] is enabled or not. + /// + /// This setting is configured by the server peer by sending the + /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame. + /// This method returns the currently acknowledged value recieved from the + /// remote. + /// + /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 + /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3 + pub fn is_extended_connect_protocol_enabled(&self) -> bool { + self.inner.is_extended_connect_protocol_enabled() + } } impl fmt::Debug for SendRequest @@ -1246,11 +1260,10 @@ where /// This method returns the currently acknowledged value recieved from the /// remote. /// - /// [settings]: https://tools.ietf.org/html/rfc7540#section-5.1.2 + /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2 pub fn max_concurrent_send_streams(&self) -> usize { self.inner.max_send_streams() } - /// Returns the maximum number of concurrent streams that may be initiated /// by the server on this connection. /// @@ -1416,6 +1429,7 @@ impl Peer { pub fn convert_send_message( id: StreamId, request: Request<()>, + protocol: Option, end_of_stream: bool, ) -> Result { use http::request::Parts; @@ -1435,7 +1449,7 @@ impl Peer { // Build the set pseudo header set. All requests will include `method` // and `path`. - let mut pseudo = Pseudo::request(method, uri); + let mut pseudo = Pseudo::request(method, uri, protocol); if pseudo.scheme.is_none() { // If the scheme is not set, then there are a two options. diff --git a/src/ext.rs b/src/ext.rs new file mode 100644 index 000000000..cf383a495 --- /dev/null +++ b/src/ext.rs @@ -0,0 +1,55 @@ +//! Extensions specific to the HTTP/2 protocol. + +use crate::hpack::BytesStr; + +use bytes::Bytes; +use std::fmt; + +/// Represents the `:protocol` pseudo-header used by +/// the [Extended CONNECT Protocol]. +/// +/// [Extended CONNECT Protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 +#[derive(Clone, Eq, PartialEq)] +pub struct Protocol { + value: BytesStr, +} + +impl Protocol { + /// Converts a static string to a protocol name. + pub const fn from_static(value: &'static str) -> Self { + Self { + value: BytesStr::from_static(value), + } + } + + /// Returns a str representation of the header. + pub fn as_str(&self) -> &str { + self.value.as_str() + } + + pub(crate) fn try_from(bytes: Bytes) -> Result { + Ok(Self { + value: BytesStr::try_from(bytes)?, + }) + } +} + +impl<'a> From<&'a str> for Protocol { + fn from(value: &'a str) -> Self { + Self { + value: BytesStr::from(value), + } + } +} + +impl AsRef<[u8]> for Protocol { + fn as_ref(&self) -> &[u8] { + self.value.as_ref() + } +} + +impl fmt::Debug for Protocol { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.value.fmt(f) + } +} diff --git a/src/frame/headers.rs b/src/frame/headers.rs index 2fc9561cf..05d77234f 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -1,4 +1,5 @@ use super::{util, StreamDependency, StreamId}; +use crate::ext::Protocol; use crate::frame::{Error, Frame, Head, Kind}; use crate::hpack::{self, BytesStr}; @@ -66,6 +67,7 @@ pub struct Pseudo { pub scheme: Option, pub authority: Option, pub path: Option, + pub protocol: Option, // Response pub status: Option, @@ -292,6 +294,10 @@ impl fmt::Debug for Headers { .field("stream_id", &self.stream_id) .field("flags", &self.flags); + if let Some(ref protocol) = self.header_block.pseudo.protocol { + builder.field("protocol", protocol); + } + if let Some(ref dep) = self.stream_dep { builder.field("stream_dep", dep); } @@ -529,7 +535,7 @@ impl Continuation { // ===== impl Pseudo ===== impl Pseudo { - pub fn request(method: Method, uri: Uri) -> Self { + pub fn request(method: Method, uri: Uri, protocol: Option) -> Self { let parts = uri::Parts::from(uri); let mut path = parts @@ -550,6 +556,7 @@ impl Pseudo { scheme: None, authority: None, path: Some(path).filter(|p| !p.is_empty()), + protocol, status: None, }; @@ -575,6 +582,7 @@ impl Pseudo { scheme: None, authority: None, path: None, + protocol: None, status: Some(status), } } @@ -593,6 +601,11 @@ impl Pseudo { self.scheme = Some(bytes_str); } + #[cfg(feature = "unstable")] + pub fn set_protocol(&mut self, protocol: Protocol) { + self.protocol = Some(protocol); + } + pub fn set_authority(&mut self, authority: BytesStr) { self.authority = Some(authority); } @@ -681,6 +694,10 @@ impl Iterator for Iter { return Some(Path(path)); } + if let Some(protocol) = pseudo.protocol.take() { + return Some(Protocol(protocol)); + } + if let Some(status) = pseudo.status.take() { return Some(Status(status)); } @@ -879,6 +896,7 @@ impl HeaderBlock { Method(v) => set_pseudo!(method, v), Scheme(v) => set_pseudo!(scheme, v), Path(v) => set_pseudo!(path, v), + Protocol(v) => set_pseudo!(protocol, v), Status(v) => set_pseudo!(status, v), } }); diff --git a/src/frame/settings.rs b/src/frame/settings.rs index 523f20b06..080d0f4e5 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -13,6 +13,7 @@ pub struct Settings { initial_window_size: Option, max_frame_size: Option, max_header_list_size: Option, + enable_connect_protocol: Option, } /// An enum that lists all valid settings that can be sent in a SETTINGS @@ -27,6 +28,7 @@ pub enum Setting { InitialWindowSize(u32), MaxFrameSize(u32), MaxHeaderListSize(u32), + EnableConnectProtocol(u32), } #[derive(Copy, Clone, Eq, PartialEq, Default)] @@ -107,6 +109,14 @@ impl Settings { self.enable_push = Some(enable as u32); } + pub fn is_extended_connect_protocol_enabled(&self) -> Option { + self.enable_connect_protocol.map(|val| val != 0) + } + + pub fn set_enable_connect_protocol(&mut self, val: Option) { + self.enable_connect_protocol = val; + } + pub fn header_table_size(&self) -> Option { self.header_table_size } @@ -181,6 +191,14 @@ impl Settings { Some(MaxHeaderListSize(val)) => { settings.max_header_list_size = Some(val); } + Some(EnableConnectProtocol(val)) => match val { + 0 | 1 => { + settings.enable_connect_protocol = Some(val); + } + _ => { + return Err(Error::InvalidSettingValue); + } + }, None => {} } } @@ -236,6 +254,10 @@ impl Settings { if let Some(v) = self.max_header_list_size { f(MaxHeaderListSize(v)); } + + if let Some(v) = self.enable_connect_protocol { + f(EnableConnectProtocol(v)); + } } } @@ -269,6 +291,9 @@ impl fmt::Debug for Settings { Setting::MaxHeaderListSize(v) => { builder.field("max_header_list_size", &v); } + Setting::EnableConnectProtocol(v) => { + builder.field("enable_connect_protocol", &v); + } }); builder.finish() @@ -291,6 +316,7 @@ impl Setting { 4 => Some(InitialWindowSize(val)), 5 => Some(MaxFrameSize(val)), 6 => Some(MaxHeaderListSize(val)), + 8 => Some(EnableConnectProtocol(val)), _ => None, } } @@ -322,6 +348,7 @@ impl Setting { InitialWindowSize(v) => (4, v), MaxFrameSize(v) => (5, v), MaxHeaderListSize(v) => (6, v), + EnableConnectProtocol(v) => (8, v), }; dst.put_u16(kind); diff --git a/src/hpack/header.rs b/src/hpack/header.rs index 8d6136e16..e6df555ab 100644 --- a/src/hpack/header.rs +++ b/src/hpack/header.rs @@ -1,4 +1,5 @@ use super::{DecoderError, NeedMore}; +use crate::ext::Protocol; use bytes::Bytes; use http::header::{HeaderName, HeaderValue}; @@ -14,6 +15,7 @@ pub enum Header { Method(Method), Scheme(BytesStr), Path(BytesStr), + Protocol(Protocol), Status(StatusCode), } @@ -25,6 +27,7 @@ pub enum Name<'a> { Method, Scheme, Path, + Protocol, Status, } @@ -51,6 +54,7 @@ impl Header> { Method(v) => Method(v), Scheme(v) => Scheme(v), Path(v) => Path(v), + Protocol(v) => Protocol(v), Status(v) => Status(v), }) } @@ -79,6 +83,10 @@ impl Header { let value = BytesStr::try_from(value)?; Ok(Header::Path(value)) } + b"protocol" => { + let value = Protocol::try_from(value)?; + Ok(Header::Protocol(value)) + } b"status" => { let status = StatusCode::from_bytes(&value)?; Ok(Header::Status(status)) @@ -104,6 +112,7 @@ impl Header { Header::Method(ref v) => 32 + 7 + v.as_ref().len(), Header::Scheme(ref v) => 32 + 7 + v.len(), Header::Path(ref v) => 32 + 5 + v.len(), + Header::Protocol(ref v) => 32 + 9 + v.as_str().len(), Header::Status(_) => 32 + 7 + 3, } } @@ -116,6 +125,7 @@ impl Header { Header::Method(..) => Name::Method, Header::Scheme(..) => Name::Scheme, Header::Path(..) => Name::Path, + Header::Protocol(..) => Name::Protocol, Header::Status(..) => Name::Status, } } @@ -127,6 +137,7 @@ impl Header { Header::Method(ref v) => v.as_ref().as_ref(), Header::Scheme(ref v) => v.as_ref(), Header::Path(ref v) => v.as_ref(), + Header::Protocol(ref v) => v.as_ref(), Header::Status(ref v) => v.as_str().as_ref(), } } @@ -156,6 +167,10 @@ impl Header { Header::Path(ref b) => a == b, _ => false, }, + Header::Protocol(ref a) => match *other { + Header::Protocol(ref b) => a == b, + _ => false, + }, Header::Status(ref a) => match *other { Header::Status(ref b) => a == b, _ => false, @@ -205,6 +220,7 @@ impl From
for Header> { Header::Method(v) => Header::Method(v), Header::Scheme(v) => Header::Scheme(v), Header::Path(v) => Header::Path(v), + Header::Protocol(v) => Header::Protocol(v), Header::Status(v) => Header::Status(v), } } @@ -221,6 +237,7 @@ impl<'a> Name<'a> { Name::Method => Ok(Header::Method(Method::from_bytes(&*value)?)), Name::Scheme => Ok(Header::Scheme(BytesStr::try_from(value)?)), Name::Path => Ok(Header::Path(BytesStr::try_from(value)?)), + Name::Protocol => Ok(Header::Protocol(Protocol::try_from(value)?)), Name::Status => { match StatusCode::from_bytes(&value) { Ok(status) => Ok(Header::Status(status)), @@ -238,6 +255,7 @@ impl<'a> Name<'a> { Name::Method => b":method", Name::Scheme => b":scheme", Name::Path => b":path", + Name::Protocol => b":protocol", Name::Status => b":status", } } diff --git a/src/hpack/table.rs b/src/hpack/table.rs index 2328743a8..0124f216d 100644 --- a/src/hpack/table.rs +++ b/src/hpack/table.rs @@ -751,6 +751,7 @@ fn index_static(header: &Header) -> Option<(usize, bool)> { "/index.html" => Some((5, true)), _ => Some((4, false)), }, + Header::Protocol(..) => None, Header::Status(ref v) => match u16::from(*v) { 200 => Some((8, true)), 204 => Some((9, true)), diff --git a/src/hpack/test/fixture.rs b/src/hpack/test/fixture.rs index 6d0448425..3428c3958 100644 --- a/src/hpack/test/fixture.rs +++ b/src/hpack/test/fixture.rs @@ -134,6 +134,7 @@ fn key_str(e: &Header) -> &str { Header::Method(..) => ":method", Header::Scheme(..) => ":scheme", Header::Path(..) => ":path", + Header::Protocol(..) => ":protocol", Header::Status(..) => ":status", } } @@ -145,6 +146,7 @@ fn value_str(e: &Header) -> &str { Header::Method(ref m) => m.as_str(), Header::Scheme(ref v) => &**v, Header::Path(ref v) => &**v, + Header::Protocol(ref v) => v.as_str(), Header::Status(ref v) => v.as_str(), } } diff --git a/src/lib.rs b/src/lib.rs index cb02acaff..db6b4888c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -120,6 +120,7 @@ mod frame; pub mod frame; pub mod client; +pub mod ext; pub mod server; mod share; diff --git a/src/proto/connection.rs b/src/proto/connection.rs index a75df4369..d1b8b5125 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -110,6 +110,10 @@ where initial_max_send_streams: config.initial_max_send_streams, local_next_stream_id: config.next_stream_id, local_push_enabled: config.settings.is_push_enabled().unwrap_or(true), + extended_connect_protocol_enabled: config + .settings + .is_extended_connect_protocol_enabled() + .unwrap_or(false), local_reset_duration: config.reset_stream_duration, local_reset_max: config.reset_stream_max, remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, @@ -147,6 +151,13 @@ where self.inner.settings.send_settings(settings) } + /// Send a new SETTINGS frame with extended CONNECT protocol enabled. + pub(crate) fn set_enable_connect_protocol(&mut self) -> Result<(), UserError> { + let mut settings = frame::Settings::default(); + settings.set_enable_connect_protocol(Some(1)); + self.inner.settings.send_settings(settings) + } + /// Returns the maximum number of concurrent streams that may be initiated /// by this peer. pub(crate) fn max_send_streams(&self) -> usize { diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 44f4c2df4..6cc617209 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -117,6 +117,8 @@ impl Settings { tracing::trace!("ACK sent; applying settings"); + streams.apply_remote_settings(settings)?; + if let Some(val) = settings.header_table_size() { dst.set_send_header_table_size(val as usize); } @@ -124,8 +126,6 @@ impl Settings { if let Some(val) = settings.max_frame_size() { dst.set_max_send_frame_size(val as usize); } - - streams.apply_remote_settings(settings)?; } self.remote = None; diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 608395c0f..0fd61a29a 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -47,6 +47,9 @@ pub struct Config { /// If the local peer is willing to receive push promises pub local_push_enabled: bool, + /// If extended connect protocol is enabled. + pub extended_connect_protocol_enabled: bool, + /// How long a locally reset stream should ignore frames pub local_reset_duration: Duration, diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index be996b963..e613c26b3 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -56,6 +56,9 @@ pub(super) struct Recv { /// If push promises are allowed to be received. is_push_enabled: bool, + + /// If extended connect protocol is enabled. + is_extended_connect_protocol_enabled: bool, } #[derive(Debug)] @@ -103,6 +106,7 @@ impl Recv { buffer: Buffer::new(), refused: None, is_push_enabled: config.local_push_enabled, + is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled, } } @@ -216,6 +220,14 @@ impl Recv { let stream_id = frame.stream_id(); let (pseudo, fields) = frame.into_parts(); + + if pseudo.protocol.is_some() { + if counts.peer().is_server() && !self.is_extended_connect_protocol_enabled { + proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into()); + } + } + if !pseudo.is_informational() { let message = counts .peer() @@ -449,60 +461,58 @@ impl Recv { settings: &frame::Settings, store: &mut Store, ) -> Result<(), proto::Error> { - let target = if let Some(val) = settings.initial_window_size() { - val - } else { - return Ok(()); - }; + if let Some(val) = settings.is_extended_connect_protocol_enabled() { + self.is_extended_connect_protocol_enabled = val; + } - let old_sz = self.init_window_sz; - self.init_window_sz = target; + if let Some(target) = settings.initial_window_size() { + let old_sz = self.init_window_sz; + self.init_window_sz = target; - tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,); + tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,); - // Per RFC 7540 §6.9.2: - // - // In addition to changing the flow-control window for streams that are - // not yet active, a SETTINGS frame can alter the initial flow-control - // window size for streams with active flow-control windows (that is, - // streams in the "open" or "half-closed (remote)" state). When the - // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust - // the size of all stream flow-control windows that it maintains by the - // difference between the new value and the old value. - // - // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available - // space in a flow-control window to become negative. A sender MUST - // track the negative flow-control window and MUST NOT send new - // flow-controlled frames until it receives WINDOW_UPDATE frames that - // cause the flow-control window to become positive. - - if target < old_sz { - // We must decrease the (local) window on every open stream. - let dec = old_sz - target; - tracing::trace!("decrementing all windows; dec={}", dec); - - store.for_each(|mut stream| { - stream.recv_flow.dec_recv_window(dec); - Ok(()) - }) - } else if target > old_sz { - // We must increase the (local) window on every open stream. - let inc = target - old_sz; - tracing::trace!("incrementing all windows; inc={}", inc); - store.for_each(|mut stream| { - // XXX: Shouldn't the peer have already noticed our - // overflow and sent us a GOAWAY? - stream - .recv_flow - .inc_window(inc) - .map_err(proto::Error::library_go_away)?; - stream.recv_flow.assign_capacity(inc); - Ok(()) - }) - } else { - // size is the same... so do nothing - Ok(()) + // Per RFC 7540 §6.9.2: + // + // In addition to changing the flow-control window for streams that are + // not yet active, a SETTINGS frame can alter the initial flow-control + // window size for streams with active flow-control windows (that is, + // streams in the "open" or "half-closed (remote)" state). When the + // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust + // the size of all stream flow-control windows that it maintains by the + // difference between the new value and the old value. + // + // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available + // space in a flow-control window to become negative. A sender MUST + // track the negative flow-control window and MUST NOT send new + // flow-controlled frames until it receives WINDOW_UPDATE frames that + // cause the flow-control window to become positive. + + if target < old_sz { + // We must decrease the (local) window on every open stream. + let dec = old_sz - target; + tracing::trace!("decrementing all windows; dec={}", dec); + + store.for_each(|mut stream| { + stream.recv_flow.dec_recv_window(dec); + }) + } else if target > old_sz { + // We must increase the (local) window on every open stream. + let inc = target - old_sz; + tracing::trace!("incrementing all windows; inc={}", inc); + store.try_for_each(|mut stream| { + // XXX: Shouldn't the peer have already noticed our + // overflow and sent us a GOAWAY? + stream + .recv_flow + .inc_window(inc) + .map_err(proto::Error::library_go_away)?; + stream.recv_flow.assign_capacity(inc); + Ok::<_, proto::Error>(()) + })?; + } } + + Ok(()) } pub fn is_end_stream(&self, stream: &store::Ptr) -> bool { diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 3735d13dd..e3fcf6d32 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -35,6 +35,9 @@ pub(super) struct Send { prioritize: Prioritize, is_push_enabled: bool, + + /// If extended connect protocol is enabled. + is_extended_connect_protocol_enabled: bool, } /// A value to detect which public API has called `poll_reset`. @@ -53,6 +56,7 @@ impl Send { next_stream_id: Ok(config.local_next_stream_id), prioritize: Prioritize::new(config), is_push_enabled: true, + is_extended_connect_protocol_enabled: false, } } @@ -429,6 +433,10 @@ impl Send { counts: &mut Counts, task: &mut Option, ) -> Result<(), Error> { + if let Some(val) = settings.is_extended_connect_protocol_enabled() { + self.is_extended_connect_protocol_enabled = val; + } + // Applies an update to the remote endpoint's initial window size. // // Per RFC 7540 §6.9.2: @@ -490,16 +498,14 @@ impl Send { // TODO: Should this notify the producer when the capacity // of a stream is reduced? Maybe it should if the capacity // is reduced to zero, allowing the producer to stop work. - - Ok::<_, Error>(()) - })?; + }); self.prioritize .assign_connection_capacity(total_reclaimed, store, counts); } else if val > old_val { let inc = val - old_val; - store.for_each(|mut stream| { + store.try_for_each(|mut stream| { self.recv_stream_window_update(inc, buffer, &mut stream, counts, task) .map_err(Error::library_go_away) })?; @@ -554,4 +560,8 @@ impl Send { } } } + + pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { + self.is_extended_connect_protocol_enabled + } } diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index ac58f43ac..3e34b7cb2 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -4,6 +4,7 @@ use slab; use indexmap::{self, IndexMap}; +use std::convert::Infallible; use std::fmt; use std::marker::PhantomData; use std::ops; @@ -128,7 +129,20 @@ impl Store { } } - pub fn for_each(&mut self, mut f: F) -> Result<(), E> + pub(crate) fn for_each(&mut self, mut f: F) + where + F: FnMut(Ptr), + { + match self.try_for_each(|ptr| { + f(ptr); + Ok::<_, Infallible>(()) + }) { + Ok(()) => (), + Err(infallible) => match infallible {}, + } + } + + pub fn try_for_each(&mut self, mut f: F) -> Result<(), E> where F: FnMut(Ptr) -> Result<(), E>, { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 4962db8d2..5c235c15c 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -2,6 +2,7 @@ use super::recv::RecvHeaderBlockError; use super::store::{self, Entry, Resolve, Store}; use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId}; use crate::codec::{Codec, SendError, UserError}; +use crate::ext::Protocol; use crate::frame::{self, Frame, Reason}; use crate::proto::{peer, Error, Initiator, Open, Peer, WindowSize}; use crate::{client, proto, server}; @@ -214,6 +215,8 @@ where use super::stream::ContentLength; use http::Method; + let protocol = request.extensions_mut().remove::(); + // Clear before taking lock, incase extensions contain a StreamRef. request.extensions_mut().clear(); @@ -261,7 +264,8 @@ where } // Convert the message - let headers = client::Peer::convert_send_message(stream_id, request, end_of_stream)?; + let headers = + client::Peer::convert_send_message(stream_id, request, protocol, end_of_stream)?; let mut stream = me.store.insert(stream.id, stream); @@ -294,6 +298,15 @@ where send_buffer: self.send_buffer.clone(), }) } + + pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { + self.inner + .lock() + .unwrap() + .actions + .send + .is_extended_connect_protocol_enabled() + } } impl DynStreams<'_, B> { @@ -643,15 +656,12 @@ impl Inner { let last_processed_id = actions.recv.last_processed_id(); - self.store - .for_each(|stream| { - counts.transition(stream, |counts, stream| { - actions.recv.handle_error(&err, &mut *stream); - actions.send.handle_error(send_buffer, stream, counts); - Ok::<_, ()>(()) - }) + self.store.for_each(|stream| { + counts.transition(stream, |counts, stream| { + actions.recv.handle_error(&err, &mut *stream); + actions.send.handle_error(send_buffer, stream, counts); }) - .unwrap(); + }); actions.conn_error = Some(err); @@ -674,19 +684,14 @@ impl Inner { let err = Error::remote_go_away(frame.debug_data().clone(), frame.reason()); - self.store - .for_each(|stream| { - if stream.id > last_stream_id { - counts.transition(stream, |counts, stream| { - actions.recv.handle_error(&err, &mut *stream); - actions.send.handle_error(send_buffer, stream, counts); - Ok::<_, ()>(()) - }) - } else { - Ok::<_, ()>(()) - } - }) - .unwrap(); + self.store.for_each(|stream| { + if stream.id > last_stream_id { + counts.transition(stream, |counts, stream| { + actions.recv.handle_error(&err, &mut *stream); + actions.send.handle_error(send_buffer, stream, counts); + }) + } + }); actions.conn_error = Some(err); @@ -807,18 +812,15 @@ impl Inner { tracing::trace!("Streams::recv_eof"); - self.store - .for_each(|stream| { - counts.transition(stream, |counts, stream| { - actions.recv.recv_eof(stream); + self.store.for_each(|stream| { + counts.transition(stream, |counts, stream| { + actions.recv.recv_eof(stream); - // This handles resetting send state associated with the - // stream - actions.send.handle_error(send_buffer, stream, counts); - Ok::<_, ()>(()) - }) + // This handles resetting send state associated with the + // stream + actions.send.handle_error(send_buffer, stream, counts); }) - .expect("recv_eof"); + }); actions.clear_queues(clear_pending_accept, &mut self.store, counts); Ok(()) diff --git a/src/server.rs b/src/server.rs index 491446460..1eb40312c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -470,6 +470,19 @@ where Ok(()) } + /// Enables the [extended CONNECT protocol]. + /// + /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 + /// + /// # Errors + /// + /// Returns an error if a previous call is still pending acknowledgement + /// from the remote endpoint. + pub fn enable_connect_protocol(&mut self) -> Result<(), crate::Error> { + self.connection.set_enable_connect_protocol()?; + Ok(()) + } + /// Returns `Ready` when the underlying connection has closed. /// /// If any new inbound streams are received during a call to `poll_closed`, @@ -904,6 +917,14 @@ impl Builder { self } + /// Enables the [extended CONNECT protocol]. + /// + /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 + pub fn enable_connect_protocol(&mut self) -> &mut Self { + self.settings.set_enable_connect_protocol(Some(1)); + self + } + /// Creates a new configured HTTP/2 server backed by `io`. /// /// It is expected that `io` already be in an appropriate state to commence @@ -1360,7 +1381,7 @@ impl Peer { _, ) = request.into_parts(); - let pseudo = Pseudo::request(method, uri); + let pseudo = Pseudo::request(method, uri, None); Ok(frame::PushPromise::new( stream_id, @@ -1410,6 +1431,11 @@ impl proto::Peer for Peer { malformed!("malformed headers: missing method"); } + let has_protocol = pseudo.protocol.is_some(); + if !is_connect && has_protocol { + malformed!("malformed headers: :protocol on non-CONNECT request"); + } + if pseudo.status.is_some() { malformed!("malformed headers: :status field on request"); } @@ -1432,7 +1458,7 @@ impl proto::Peer for Peer { // A :scheme is required, except CONNECT. if let Some(scheme) = pseudo.scheme { - if is_connect { + if is_connect && !has_protocol { malformed!(":scheme in CONNECT"); } let maybe_scheme = scheme.parse(); @@ -1450,12 +1476,12 @@ impl proto::Peer for Peer { if parts.authority.is_some() { parts.scheme = Some(scheme); } - } else if !is_connect { + } else if !is_connect || has_protocol { malformed!("malformed headers: missing scheme"); } if let Some(path) = pseudo.path { - if is_connect { + if is_connect && !has_protocol { malformed!(":path in CONNECT"); } @@ -1468,6 +1494,8 @@ impl proto::Peer for Peer { parts.path_and_query = Some(maybe_path.or_else(|why| { malformed!("malformed headers: malformed path ({:?}): {}", path, why,) })?); + } else if is_connect && has_protocol { + malformed!("malformed headers: missing path in extended CONNECT"); } b = b.uri(parts); diff --git a/tests/h2-support/src/assert.rs b/tests/h2-support/src/assert.rs index 8bc6d25c7..88e3d4f7c 100644 --- a/tests/h2-support/src/assert.rs +++ b/tests/h2-support/src/assert.rs @@ -47,6 +47,16 @@ macro_rules! assert_settings { }}; } +#[macro_export] +macro_rules! assert_go_away { + ($frame:expr) => {{ + match $frame { + h2::frame::Frame::GoAway(v) => v, + f => panic!("expected GO_AWAY; actual={:?}", f), + } + }}; +} + #[macro_export] macro_rules! poll_err { ($transport:expr) => {{ @@ -80,6 +90,7 @@ macro_rules! assert_default_settings { use h2::frame::Frame; +#[track_caller] pub fn assert_frame_eq, U: Into>(t: T, u: U) { let actual: Frame = t.into(); let expected: Frame = u.into(); diff --git a/tests/h2-support/src/frames.rs b/tests/h2-support/src/frames.rs index 824bc5c19..f2c07bacb 100644 --- a/tests/h2-support/src/frames.rs +++ b/tests/h2-support/src/frames.rs @@ -4,7 +4,10 @@ use std::fmt; use bytes::Bytes; use http::{self, HeaderMap, StatusCode}; -use h2::frame::{self, Frame, StreamId}; +use h2::{ + ext::Protocol, + frame::{self, Frame, StreamId}, +}; pub const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; pub const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; @@ -109,7 +112,9 @@ impl Mock { let method = method.try_into().unwrap(); let uri = uri.try_into().unwrap(); let (id, _, fields) = self.into_parts(); - let frame = frame::Headers::new(id, frame::Pseudo::request(method, uri), fields); + let extensions = Default::default(); + let pseudo = frame::Pseudo::request(method, uri, extensions); + let frame = frame::Headers::new(id, pseudo, fields); Mock(frame) } @@ -179,6 +184,15 @@ impl Mock { Mock(frame::Headers::new(id, pseudo, fields)) } + pub fn protocol(self, value: &str) -> Self { + let (id, mut pseudo, fields) = self.into_parts(); + let value = Protocol::from(value); + + pseudo.set_protocol(value); + + Mock(frame::Headers::new(id, pseudo, fields)) + } + pub fn eos(mut self) -> Self { self.0.set_end_stream(); self @@ -230,8 +244,9 @@ impl Mock { let method = method.try_into().unwrap(); let uri = uri.try_into().unwrap(); let (id, promised, _, fields) = self.into_parts(); - let frame = - frame::PushPromise::new(id, promised, frame::Pseudo::request(method, uri), fields); + let extensions = Default::default(); + let pseudo = frame::Pseudo::request(method, uri, extensions); + let frame = frame::PushPromise::new(id, promised, pseudo, fields); Mock(frame) } @@ -352,6 +367,11 @@ impl Mock { self.0.set_enable_push(false); self } + + pub fn enable_connect_protocol(mut self, val: u32) -> Self { + self.0.set_enable_connect_protocol(Some(val)); + self + } } impl From> for frame::Settings { diff --git a/tests/h2-support/src/mock.rs b/tests/h2-support/src/mock.rs index b5df9ad9b..cc314cd06 100644 --- a/tests/h2-support/src/mock.rs +++ b/tests/h2-support/src/mock.rs @@ -221,22 +221,15 @@ impl Handle { let settings = settings.into(); self.send(settings.into()).await.unwrap(); - let frame = self.next().await; - let settings = match frame { - Some(frame) => match frame.unwrap() { - Frame::Settings(settings) => { - // Send the ACK - let ack = frame::Settings::ack(); + let frame = self.next().await.expect("unexpected EOF").unwrap(); + let settings = assert_settings!(frame); - // TODO: Don't unwrap? - self.send(ack.into()).await.unwrap(); + // Send the ACK + let ack = frame::Settings::ack(); + + // TODO: Don't unwrap? + self.send(ack.into()).await.unwrap(); - settings - } - frame => panic!("unexpected frame; frame={:?}", frame), - }, - None => panic!("unexpected EOF"), - }; let frame = self.next().await; let f = assert_settings!(frame.unwrap().unwrap()); diff --git a/tests/h2-support/src/prelude.rs b/tests/h2-support/src/prelude.rs index 1fcb0dcc4..86ef3249e 100644 --- a/tests/h2-support/src/prelude.rs +++ b/tests/h2-support/src/prelude.rs @@ -2,6 +2,7 @@ pub use h2; pub use h2::client; +pub use h2::ext::Protocol; pub use h2::frame::StreamId; pub use h2::server; pub use h2::*; @@ -20,8 +21,8 @@ pub use super::{Codec, SendFrame}; // Re-export macros pub use super::{ - assert_closed, assert_data, assert_default_settings, assert_headers, assert_ping, poll_err, - poll_frame, raw_codec, + assert_closed, assert_data, assert_default_settings, assert_go_away, assert_headers, + assert_ping, assert_settings, poll_err, poll_frame, raw_codec, }; pub use super::assert::assert_frame_eq; diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index 2af0bdeec..9635bcc6c 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -1305,6 +1305,153 @@ async fn informational_while_local_streaming() { join(srv, h2).await; } +#[tokio::test] +async fn extended_connect_protocol_disabled_by_default() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + + srv.recv_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + }; + + let h2 = async move { + let (mut client, mut h2) = client::handshake(io).await.unwrap(); + + // we send a simple req here just to drive the connection so we can + // receive the server settings. + let request = Request::get("https://example.com/").body(()).unwrap(); + // first request is allowed + let (response, _) = client.send_request(request, true).unwrap(); + h2.drive(response).await.unwrap(); + + assert!(!client.is_extended_connect_protocol_enabled()); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn extended_connect_protocol_enabled_during_handshake() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv + .assert_client_handshake_with_settings(frames::settings().enable_connect_protocol(1)) + .await; + assert_default_settings!(settings); + + srv.recv_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + }; + + let h2 = async move { + let (mut client, mut h2) = client::handshake(io).await.unwrap(); + + // we send a simple req here just to drive the connection so we can + // receive the server settings. + let request = Request::get("https://example.com/").body(()).unwrap(); + let (response, _) = client.send_request(request, true).unwrap(); + h2.drive(response).await.unwrap(); + + assert!(client.is_extended_connect_protocol_enabled()); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn invalid_connect_protocol_enabled_setting() { + h2_support::trace_init!(); + + let (io, mut srv) = mock::new(); + + let srv = async move { + // Send a settings frame + srv.send(frames::settings().enable_connect_protocol(2).into()) + .await + .unwrap(); + srv.read_preface().await.unwrap(); + + let settings = assert_settings!(srv.next().await.expect("unexpected EOF").unwrap()); + assert_default_settings!(settings); + + // Send the ACK + let ack = frame::Settings::ack(); + + // TODO: Don't unwrap? + srv.send(ack.into()).await.unwrap(); + + let frame = srv.next().await.unwrap().unwrap(); + let go_away = assert_go_away!(frame); + assert_eq!(go_away.reason(), Reason::PROTOCOL_ERROR); + }; + + let h2 = async move { + let (mut client, mut h2) = client::handshake(io).await.unwrap(); + + // we send a simple req here just to drive the connection so we can + // receive the server settings. + let request = Request::get("https://example.com/").body(()).unwrap(); + let (response, _) = client.send_request(request, true).unwrap(); + + let error = h2.drive(response).await.unwrap_err(); + assert_eq!(error.reason(), Some(Reason::PROTOCOL_ERROR)); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn extended_connect_request() { + h2_support::trace_init!(); + + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv + .assert_client_handshake_with_settings(frames::settings().enable_connect_protocol(1)) + .await; + assert_default_settings!(settings); + + srv.recv_frame( + frames::headers(1) + .request("CONNECT", "http://bread/baguette") + .protocol("the-bread-protocol") + .eos(), + ) + .await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + }; + + let h2 = async move { + let (mut client, mut h2) = client::handshake(io).await.unwrap(); + + let request = Request::connect("http://bread/baguette") + .extension(Protocol::from("the-bread-protocol")) + .body(()) + .unwrap(); + let (response, _) = client.send_request(request, true).unwrap(); + h2.drive(response).await.unwrap(); + }; + + join(srv, h2).await; +} + const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; diff --git a/tests/h2-tests/tests/server.rs b/tests/h2-tests/tests/server.rs index e60483d0d..b3bf1a286 100644 --- a/tests/h2-tests/tests/server.rs +++ b/tests/h2-tests/tests/server.rs @@ -1149,3 +1149,191 @@ async fn send_reset_explicitly() { join(client, srv).await; } + +#[tokio::test] +async fn extended_connect_protocol_disabled_by_default() { + h2_support::trace_init!(); + + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + + assert_eq!(settings.is_extended_connect_protocol_enabled(), None); + + client + .send_frame( + frames::headers(1) + .request("CONNECT", "http://bread/baguette") + .protocol("the-bread-protocol"), + ) + .await; + + client.recv_frame(frames::reset(1).protocol_error()).await; + }; + + let srv = async move { + let mut srv = server::handshake(io).await.expect("handshake"); + + poll_fn(move |cx| srv.poll_closed(cx)) + .await + .expect("server"); + }; + + join(client, srv).await; +} + +#[tokio::test] +async fn extended_connect_protocol_enabled_during_handshake() { + h2_support::trace_init!(); + + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + + assert_eq!(settings.is_extended_connect_protocol_enabled(), Some(true)); + + client + .send_frame( + frames::headers(1) + .request("CONNECT", "http://bread/baguette") + .protocol("the-bread-protocol"), + ) + .await; + + client.recv_frame(frames::headers(1).response(200)).await; + }; + + let srv = async move { + let mut builder = server::Builder::new(); + + builder.enable_connect_protocol(); + + let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake"); + + let (_req, mut stream) = srv.next().await.unwrap().unwrap(); + + let rsp = Response::new(()); + stream.send_response(rsp, false).unwrap(); + + poll_fn(move |cx| srv.poll_closed(cx)) + .await + .expect("server"); + }; + + join(client, srv).await; +} + +#[tokio::test] +async fn reject_pseudo_protocol_on_non_connect_request() { + h2_support::trace_init!(); + + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + + assert_eq!(settings.is_extended_connect_protocol_enabled(), Some(true)); + + client + .send_frame( + frames::headers(1) + .request("GET", "http://bread/baguette") + .protocol("the-bread-protocol"), + ) + .await; + + client.recv_frame(frames::reset(1).protocol_error()).await; + }; + + let srv = async move { + let mut builder = server::Builder::new(); + + builder.enable_connect_protocol(); + + let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake"); + + assert!(srv.next().await.is_none()); + + poll_fn(move |cx| srv.poll_closed(cx)) + .await + .expect("server"); + }; + + join(client, srv).await; +} + +#[tokio::test] +async fn reject_authority_target_on_extended_connect_request() { + h2_support::trace_init!(); + + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + + assert_eq!(settings.is_extended_connect_protocol_enabled(), Some(true)); + + client + .send_frame( + frames::headers(1) + .request("CONNECT", "bread:80") + .protocol("the-bread-protocol"), + ) + .await; + + client.recv_frame(frames::reset(1).protocol_error()).await; + }; + + let srv = async move { + let mut builder = server::Builder::new(); + + builder.enable_connect_protocol(); + + let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake"); + + assert!(srv.next().await.is_none()); + + poll_fn(move |cx| srv.poll_closed(cx)) + .await + .expect("server"); + }; + + join(client, srv).await; +} + +#[tokio::test] +async fn reject_non_authority_target_on_connect_request() { + h2_support::trace_init!(); + + let (io, mut client) = mock::new(); + + let client = async move { + let settings = client.assert_server_handshake().await; + + assert_eq!(settings.is_extended_connect_protocol_enabled(), Some(true)); + + client + .send_frame(frames::headers(1).request("CONNECT", "https://bread/baguette")) + .await; + + client.recv_frame(frames::reset(1).protocol_error()).await; + }; + + let srv = async move { + let mut builder = server::Builder::new(); + + builder.enable_connect_protocol(); + + let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake"); + + assert!(srv.next().await.is_none()); + + poll_fn(move |cx| srv.poll_closed(cx)) + .await + .expect("server"); + }; + + join(client, srv).await; +} From e9e0f27b8013b61862e2e0aa81175ab08244d629 Mon Sep 17 00:00:00 2001 From: Anthony Ramine Date: Thu, 2 Dec 2021 12:46:50 +0100 Subject: [PATCH 03/26] Add test that would make wait_for_capacity hang if it doesn't loop --- tests/h2-support/src/util.rs | 12 +++---- tests/h2-tests/tests/flow_control.rs | 50 ++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 6 deletions(-) diff --git a/tests/h2-support/src/util.rs b/tests/h2-support/src/util.rs index ec768badc..b3322c4d2 100644 --- a/tests/h2-support/src/util.rs +++ b/tests/h2-support/src/util.rs @@ -54,14 +54,14 @@ impl Future for WaitForCapacity { type Output = h2::SendStream; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let _ = ready!(self.stream().poll_capacity(cx)).unwrap(); + loop { + let _ = ready!(self.stream().poll_capacity(cx)).unwrap(); - let act = self.stream().capacity(); + let act = self.stream().capacity(); - if act >= self.target { - return Poll::Ready(self.stream.take().unwrap().into()); + if act >= self.target { + return Poll::Ready(self.stream.take().unwrap().into()); + } } - - Poll::Pending } } diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index be04a61b7..e7d630808 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -1561,3 +1561,53 @@ async fn data_padding() { join(srv, h2).await; } + +#[tokio::test] +async fn poll_capacity_after_send_data_and_reserve() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv + .assert_client_handshake_with_settings(frames::settings().initial_window_size(5)) + .await; + assert_default_settings!(settings); + srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/")) + .await; + srv.send_frame(frames::headers(1).response(200)).await; + srv.recv_frame(frames::data(1, &b"abcde"[..])).await; + srv.send_frame(frames::window_update(1, 5)).await; + srv.recv_frame(frames::data(1, &b""[..]).eos()).await; + }; + + let h2 = async move { + let (mut client, mut h2) = client::handshake(io).await.unwrap(); + let request = Request::builder() + .method(Method::POST) + .uri("https://www.example.com/") + .body(()) + .unwrap(); + + let (response, mut stream) = client.send_request(request, false).unwrap(); + + let response = h2.drive(response).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + stream.send_data("abcde".into(), false).unwrap(); + + stream.reserve_capacity(5); + + // Initial window size was 5 so current capacity is 0 even if we just reserved. + assert_eq!(stream.capacity(), 0); + + // The first call to `poll_capacity` in `wait_for_capacity` will return 0. + let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await; + + stream.send_data("".into(), true).unwrap(); + + // Wait for the connection to close + h2.await.unwrap(); + }; + + join(srv, h2).await; +} From efa113bac6252104cd65284091814f8d13cd36dc Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 8 Dec 2021 10:03:15 -0800 Subject: [PATCH 04/26] Add max send buffer per stream option (#580) --- src/client.rs | 23 +++++++++++ src/proto/connection.rs | 2 + src/proto/mod.rs | 1 + src/proto/streams/mod.rs | 3 ++ src/proto/streams/send.rs | 12 +++--- src/server.rs | 23 +++++++++++ tests/h2-tests/tests/flow_control.rs | 57 ++++++++++++++++++++++++++++ 7 files changed, 115 insertions(+), 6 deletions(-) diff --git a/src/client.rs b/src/client.rs index 3a818a582..d4ec3b906 100644 --- a/src/client.rs +++ b/src/client.rs @@ -320,6 +320,9 @@ pub struct Builder { /// Initial target window size for new connections. initial_target_connection_window_size: Option, + /// Maximum amount of bytes to "buffer" for writing per stream. + max_send_buffer_size: usize, + /// Maximum number of locally reset streams to keep at a time. reset_stream_max: usize, @@ -628,6 +631,7 @@ impl Builder { /// ``` pub fn new() -> Builder { Builder { + max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, initial_target_connection_window_size: None, @@ -962,6 +966,24 @@ impl Builder { self } + /// Sets the maximum send buffer size per stream. + /// + /// Once a stream has buffered up to (or over) the maximum, the stream's + /// flow control will not "poll" additional capacity. Once bytes for the + /// stream have been written to the connection, the send buffer capacity + /// will be freed up again. + /// + /// The default is currently ~400MB, but may change. + /// + /// # Panics + /// + /// This function panics if `max` is larger than `u32::MAX`. + pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { + assert!(max <= std::u32::MAX as usize); + self.max_send_buffer_size = max; + self + } + /// Enables or disables server push promises. /// /// This value is included in the initial SETTINGS handshake. When set, the @@ -1184,6 +1206,7 @@ where proto::Config { next_stream_id: builder.stream_id, initial_max_send_streams: builder.initial_max_send_streams, + max_send_buffer_size: builder.max_send_buffer_size, reset_stream_duration: builder.reset_stream_duration, reset_stream_max: builder.reset_stream_max, settings: builder.settings.clone(), diff --git a/src/proto/connection.rs b/src/proto/connection.rs index d1b8b5125..cd011a1d5 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -77,6 +77,7 @@ struct DynConnection<'a, B: Buf = Bytes> { pub(crate) struct Config { pub next_stream_id: StreamId, pub initial_max_send_streams: usize, + pub max_send_buffer_size: usize, pub reset_stream_duration: Duration, pub reset_stream_max: usize, pub settings: frame::Settings, @@ -108,6 +109,7 @@ where .initial_window_size() .unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE), initial_max_send_streams: config.initial_max_send_streams, + local_max_buffer_size: config.max_send_buffer_size, local_next_stream_id: config.next_stream_id, local_push_enabled: config.settings.is_push_enabled().unwrap_or(true), extended_connect_protocol_enabled: config diff --git a/src/proto/mod.rs b/src/proto/mod.rs index d505e77f3..5ec7bf992 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -33,3 +33,4 @@ pub type WindowSize = u32; pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; pub const DEFAULT_RESET_STREAM_MAX: usize = 10; pub const DEFAULT_RESET_STREAM_SECS: u64 = 30; +pub const DEFAULT_MAX_SEND_BUFFER_SIZE: usize = 1024 * 400; diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 0fd61a29a..de2a2c85a 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -41,6 +41,9 @@ pub struct Config { /// MAX_CONCURRENT_STREAMS specified in the frame. pub initial_max_send_streams: usize, + /// Max amount of DATA bytes to buffer per stream. + pub local_max_buffer_size: usize, + /// The stream ID to start the next local stream with pub local_next_stream_id: StreamId, diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index e3fcf6d32..b7230030e 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -28,6 +28,9 @@ pub(super) struct Send { /// > the identified last stream. max_stream_id: StreamId, + /// The maximum amount of bytes a stream should buffer. + max_buffer_size: usize, + /// Initial window size of locally initiated streams init_window_sz: WindowSize, @@ -52,6 +55,7 @@ impl Send { pub fn new(config: &Config) -> Self { Send { init_window_sz: config.remote_init_window_sz, + max_buffer_size: config.local_max_buffer_size, max_stream_id: StreamId::MAX, next_stream_id: Ok(config.local_next_stream_id), prioritize: Prioritize::new(config), @@ -333,14 +337,10 @@ impl Send { /// Current available stream send capacity pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize { - let available = stream.send_flow.available().as_size(); + let available = stream.send_flow.available().as_size() as usize; let buffered = stream.buffered_send_data; - if available as usize <= buffered { - 0 - } else { - available - buffered as WindowSize - } + available.min(self.max_buffer_size).saturating_sub(buffered) as WindowSize } pub fn poll_reset( diff --git a/src/server.rs b/src/server.rs index 1eb40312c..87c300083 100644 --- a/src/server.rs +++ b/src/server.rs @@ -245,6 +245,9 @@ pub struct Builder { /// Initial target window size for new connections. initial_target_connection_window_size: Option, + + /// Maximum amount of bytes to "buffer" for writing per stream. + max_send_buffer_size: usize, } /// Send a response back to the client @@ -633,6 +636,7 @@ impl Builder { reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, settings: Settings::default(), initial_target_connection_window_size: None, + max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, } } @@ -870,6 +874,24 @@ impl Builder { self } + /// Sets the maximum send buffer size per stream. + /// + /// Once a stream has buffered up to (or over) the maximum, the stream's + /// flow control will not "poll" additional capacity. Once bytes for the + /// stream have been written to the connection, the send buffer capacity + /// will be freed up again. + /// + /// The default is currently ~400MB, but may change. + /// + /// # Panics + /// + /// This function panics if `max` is larger than `u32::MAX`. + pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { + assert!(max <= std::u32::MAX as usize); + self.max_send_buffer_size = max; + self + } + /// Sets the maximum number of concurrent locally reset streams. /// /// When a stream is explicitly reset by either calling @@ -1290,6 +1312,7 @@ where next_stream_id: 2.into(), // Server does not need to locally initiate any streams initial_max_send_streams: 0, + max_send_buffer_size: self.builder.max_send_buffer_size, reset_stream_duration: self.builder.reset_stream_duration, reset_stream_max: self.builder.reset_stream_max, settings: self.builder.settings.clone(), diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index e7d630808..1a6018f73 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -1611,3 +1611,60 @@ async fn poll_capacity_after_send_data_and_reserve() { join(srv, h2).await; } + +#[tokio::test] +async fn max_send_buffer_size_overflow() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/")) + .await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + srv.recv_frame(frames::data(1, &[0; 10][..])).await; + srv.recv_frame(frames::data(1, &[][..]).eos()).await; + }; + + let client = async move { + let (mut client, mut conn) = client::Builder::new() + .max_send_buffer_size(5) + .handshake::<_, Bytes>(io) + .await + .unwrap(); + let request = Request::builder() + .method(Method::POST) + .uri("https://www.example.com/") + .body(()) + .unwrap(); + + let (response, mut stream) = client.send_request(request, false).unwrap(); + + let response = conn.drive(response).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + assert_eq!(stream.capacity(), 0); + stream.reserve_capacity(10); + assert_eq!( + stream.capacity(), + 5, + "polled capacity not over max buffer size" + ); + + stream.send_data([0; 10][..].into(), false).unwrap(); + + stream.reserve_capacity(15); + assert_eq!( + stream.capacity(), + 0, + "now with buffered over the max, don't overflow" + ); + stream.send_data([0; 0][..].into(), true).unwrap(); + + // Wait for the connection to close + conn.await.unwrap(); + }; + + join(srv, client).await; +} From 88037ae0abb63eb1e25e02a2419136eaca34046b Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 8 Dec 2021 10:08:35 -0800 Subject: [PATCH 05/26] v0.3.8 --- CHANGELOG.md | 6 ++++++ Cargo.toml | 2 +- src/lib.rs | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 14fa00350..dc8424624 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.3.8 (December 8, 2021) + +* Add "extended CONNECT support". Adds `h2::ext::Protocol`, which is used for request and response extensions to connect new protocols over an HTTP/2 stream. +* Add `max_send_buffer_size` options to client and server builders, and a default of ~400MB. This acts like a high-water mark for the `poll_capacity()` method. +* Fix panic if receiving malformed HEADERS with stream ID of 0. + # 0.3.7 (October 22, 2021) * Fix panic if server sends a malformed frame on a stream client was about to open. diff --git a/Cargo.toml b/Cargo.toml index 7bbf1647e..a92350242 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ name = "h2" # - html_root_url. # - Update CHANGELOG.md. # - Create git tag -version = "0.3.7" +version = "0.3.8" license = "MIT" authors = [ "Carl Lerche ", diff --git a/src/lib.rs b/src/lib.rs index db6b4888c..d407341ac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,7 +78,7 @@ //! [`server::handshake`]: server/fn.handshake.html //! [`client::handshake`]: client/fn.handshake.html -#![doc(html_root_url = "https://docs.rs/h2/0.3.7")] +#![doc(html_root_url = "https://docs.rs/h2/0.3.8")] #![deny(missing_debug_implementations, missing_docs)] #![cfg_attr(test, deny(warnings))] From a5c60b24dec550ab65e26b948960a421d2e60b3d Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 8 Dec 2021 17:38:41 -0800 Subject: [PATCH 06/26] Fix poll_capacity to wake in combination with max_send_buffer_size --- src/proto/streams/prioritize.rs | 5 ++ src/proto/streams/stream.rs | 11 +++++ tests/h2-tests/tests/flow_control.rs | 74 ++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+) diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index eaaee162b..2347f6f0b 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -741,6 +741,11 @@ impl Prioritize { stream.buffered_send_data -= len as usize; stream.requested_send_capacity -= len; + // If the capacity was limited because of the + // max_send_buffer_size, then consider waking + // the send task again... + stream.notify_if_can_buffer_more(); + // Assign the capacity back to the connection that // was just consumed from the stream in the previous // line. diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 79de47a9a..f67cc3642 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -279,6 +279,17 @@ impl Stream { } } + /// If the capacity was limited because of the max_send_buffer_size, + /// then consider waking the send task again... + pub fn notify_if_can_buffer_more(&mut self) { + // Only notify if the capacity exceeds the amount of buffered data + if self.send_flow.available() > self.buffered_send_data { + self.send_capacity_inc = true; + tracing::trace!(" notifying task"); + self.notify_send(); + } + } + /// Returns `Err` when the decrement cannot be completed due to overflow. pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { match self.content_length { diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index 1a6018f73..7adb3d730 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -1668,3 +1668,77 @@ async fn max_send_buffer_size_overflow() { join(srv, client).await; } + +#[tokio::test] +async fn max_send_buffer_size_poll_capacity_wakes_task() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/")) + .await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + srv.recv_frame(frames::data(1, &[0; 5][..])).await; + srv.recv_frame(frames::data(1, &[0; 5][..])).await; + srv.recv_frame(frames::data(1, &[0; 5][..])).await; + srv.recv_frame(frames::data(1, &[0; 5][..])).await; + srv.recv_frame(frames::data(1, &[][..]).eos()).await; + }; + + let client = async move { + let (mut client, mut conn) = client::Builder::new() + .max_send_buffer_size(5) + .handshake::<_, Bytes>(io) + .await + .unwrap(); + let request = Request::builder() + .method(Method::POST) + .uri("https://www.example.com/") + .body(()) + .unwrap(); + + let (response, mut stream) = client.send_request(request, false).unwrap(); + + let response = conn.drive(response).await.unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + assert_eq!(stream.capacity(), 0); + const TO_SEND: usize = 20; + stream.reserve_capacity(TO_SEND); + assert_eq!( + stream.capacity(), + 5, + "polled capacity not over max buffer size" + ); + + let t1 = tokio::spawn(async move { + let mut sent = 0; + let buf = [0; TO_SEND]; + loop { + match poll_fn(|cx| stream.poll_capacity(cx)).await { + None => panic!("no cap"), + Some(Err(e)) => panic!("cap error: {:?}", e), + Some(Ok(cap)) => { + stream + .send_data(buf[sent..(sent + cap)].to_vec().into(), false) + .unwrap(); + sent += cap; + if sent >= TO_SEND { + break; + } + } + } + } + stream.send_data(Bytes::new(), true).unwrap(); + }); + + // Wait for the connection to close + conn.await.unwrap(); + t1.await.unwrap(); + }; + + join(srv, client).await; +} From 308663e71e26fee65d1f2aa691469c456d80cf14 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 9 Dec 2021 09:21:13 -0800 Subject: [PATCH 07/26] v0.3.9 --- CHANGELOG.md | 4 ++++ Cargo.toml | 2 +- src/lib.rs | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dc8424624..971d5f8bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.3.9 (December 9, 2021) + +* Fix hang related to new `max_send_buffer_size`. + # 0.3.8 (December 8, 2021) * Add "extended CONNECT support". Adds `h2::ext::Protocol`, which is used for request and response extensions to connect new protocols over an HTTP/2 stream. diff --git a/Cargo.toml b/Cargo.toml index a92350242..680c74f1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ name = "h2" # - html_root_url. # - Update CHANGELOG.md. # - Create git tag -version = "0.3.8" +version = "0.3.9" license = "MIT" authors = [ "Carl Lerche ", diff --git a/src/lib.rs b/src/lib.rs index d407341ac..cd07f3e8e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,7 +78,7 @@ //! [`server::handshake`]: server/fn.handshake.html //! [`client::handshake`]: client/fn.handshake.html -#![doc(html_root_url = "https://docs.rs/h2/0.3.8")] +#![doc(html_root_url = "https://docs.rs/h2/0.3.9")] #![deny(missing_debug_implementations, missing_docs)] #![cfg_attr(test, deny(warnings))] From c876dda6d0c85665ed7e91ea2936f37764cb63fe Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 9 Dec 2021 14:21:20 -0800 Subject: [PATCH 08/26] Fix panic when receiving malformed push promise with stream id 0 --- src/frame/headers.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/frame/headers.rs b/src/frame/headers.rs index 05d77234f..bcb905013 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -400,6 +400,10 @@ impl PushPromise { let flags = PushPromiseFlag(head.flag()); let mut pad = 0; + if head.stream_id().is_zero() { + return Err(Error::InvalidStreamId); + } + // Read the padding length if flags.is_padded() { if src.is_empty() { From 6336cc3d7b8de71b15ff9ee9e11d537e443519df Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 6 Jan 2022 10:12:29 -0800 Subject: [PATCH 09/26] Add Error::is_go_away() and Error::is_remote() --- src/error.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/error.rs b/src/error.rs index fdbfc0d1b..6c8f6ed80 100644 --- a/src/error.rs +++ b/src/error.rs @@ -57,7 +57,7 @@ impl Error { } } - /// Returns the true if the error is an io::Error + /// Returns true if the error is an io::Error pub fn is_io(&self) -> bool { match self.kind { Kind::Io(_) => true, @@ -86,6 +86,21 @@ impl Error { kind: Kind::Io(err), } } + + /// Returns true if the error is from a `GOAWAY`. + pub fn is_go_away(&self) -> bool { + matches!(self.kind, Kind::GoAway(..)) + } + + /// Returns true if the error was received in a frame from the remote. + /// + /// Such as from a received `RST_STREAM` or `GOAWAY` frame. + pub fn is_remote(&self) -> bool { + matches!( + self.kind, + Kind::GoAway(_, _, Initiator::Remote) | Kind::Reset(_, _, Initiator::Remote) + ) + } } impl From for Error { From b949d6ef998ce3a8bd805a03352b55985cf2fb0b Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 6 Jan 2022 17:02:40 -0800 Subject: [PATCH 10/26] v0.3.10 --- CHANGELOG.md | 5 +++++ Cargo.toml | 2 +- src/lib.rs | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 971d5f8bc..052d7da44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# 0.3.10 (January 6, 2022) + +* Add `Error::is_go_away()` and `Error::is_remote()` methods. +* Fix panic if receiving malformed PUSH_PROMISE with stream ID of 0. + # 0.3.9 (December 9, 2021) * Fix hang related to new `max_send_buffer_size`. diff --git a/Cargo.toml b/Cargo.toml index 680c74f1b..c8f041837 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ name = "h2" # - html_root_url. # - Update CHANGELOG.md. # - Create git tag -version = "0.3.9" +version = "0.3.10" license = "MIT" authors = [ "Carl Lerche ", diff --git a/src/lib.rs b/src/lib.rs index cd07f3e8e..8e8e18a76 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,7 +78,7 @@ //! [`server::handshake`]: server/fn.handshake.html //! [`client::handshake`]: client/fn.handshake.html -#![doc(html_root_url = "https://docs.rs/h2/0.3.9")] +#![doc(html_root_url = "https://docs.rs/h2/0.3.10")] #![deny(missing_debug_implementations, missing_docs)] #![cfg_attr(test, deny(warnings))] From d92ba1c45ba4c68df9bdfb1a9e42f51c882d9c50 Mon Sep 17 00:00:00 2001 From: Anthony Ramine <123095+nox@users.noreply.github.com> Date: Wed, 19 Jan 2022 19:49:53 +0100 Subject: [PATCH 11/26] Make SendStream::poll_capacity never return Ok(Some(0)) (#596) Fixes #270 --- src/proto/streams/prioritize.rs | 12 +++++- src/proto/streams/send.rs | 8 ++-- src/proto/streams/stream.rs | 21 +++++----- tests/h2-support/src/util.rs | 6 +++ tests/h2-tests/tests/flow_control.rs | 57 +++++++++++++++++++++++++++- 5 files changed, 85 insertions(+), 19 deletions(-) diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 2347f6f0b..c2904aca9 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -51,6 +51,9 @@ pub(super) struct Prioritize { /// What `DATA` frame is currently being sent in the codec. in_flight_data_frame: InFlightData, + + /// The maximum amount of bytes a stream should buffer. + max_buffer_size: usize, } #[derive(Debug, Eq, PartialEq)] @@ -93,9 +96,14 @@ impl Prioritize { flow, last_opened_id: StreamId::ZERO, in_flight_data_frame: InFlightData::Nothing, + max_buffer_size: config.local_max_buffer_size, } } + pub(crate) fn max_buffer_size(&self) -> usize { + self.max_buffer_size + } + /// Queue a frame to be sent to the remote pub fn queue_frame( &mut self, @@ -424,7 +432,7 @@ impl Prioritize { tracing::trace!(capacity = assign, "assigning"); // Assign the capacity to the stream - stream.assign_capacity(assign); + stream.assign_capacity(assign, self.max_buffer_size); // Claim the capacity from the connection self.flow.claim_capacity(assign); @@ -744,7 +752,7 @@ impl Prioritize { // If the capacity was limited because of the // max_send_buffer_size, then consider waking // the send task again... - stream.notify_if_can_buffer_more(); + stream.notify_if_can_buffer_more(self.max_buffer_size); // Assign the capacity back to the connection that // was just consumed from the stream in the previous diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index b7230030e..2c5a38c80 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -28,9 +28,6 @@ pub(super) struct Send { /// > the identified last stream. max_stream_id: StreamId, - /// The maximum amount of bytes a stream should buffer. - max_buffer_size: usize, - /// Initial window size of locally initiated streams init_window_sz: WindowSize, @@ -55,7 +52,6 @@ impl Send { pub fn new(config: &Config) -> Self { Send { init_window_sz: config.remote_init_window_sz, - max_buffer_size: config.local_max_buffer_size, max_stream_id: StreamId::MAX, next_stream_id: Ok(config.local_next_stream_id), prioritize: Prioritize::new(config), @@ -340,7 +336,9 @@ impl Send { let available = stream.send_flow.available().as_size() as usize; let buffered = stream.buffered_send_data; - available.min(self.max_buffer_size).saturating_sub(buffered) as WindowSize + available + .min(self.prioritize.max_buffer_size()) + .saturating_sub(buffered) as WindowSize } pub fn poll_reset( diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index f67cc3642..36d515bad 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -260,30 +260,29 @@ impl Stream { self.ref_count == 0 && !self.state.is_closed() } - pub fn assign_capacity(&mut self, capacity: WindowSize) { + pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) { debug_assert!(capacity > 0); - self.send_capacity_inc = true; self.send_flow.assign_capacity(capacity); tracing::trace!( - " assigned capacity to stream; available={}; buffered={}; id={:?}", + " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={}", self.send_flow.available(), self.buffered_send_data, - self.id + self.id, + max_buffer_size ); - // Only notify if the capacity exceeds the amount of buffered data - if self.send_flow.available() > self.buffered_send_data { - tracing::trace!(" notifying task"); - self.notify_send(); - } + self.notify_if_can_buffer_more(max_buffer_size); } /// If the capacity was limited because of the max_send_buffer_size, /// then consider waking the send task again... - pub fn notify_if_can_buffer_more(&mut self) { + pub fn notify_if_can_buffer_more(&mut self, max_buffer_size: usize) { + let available = self.send_flow.available().as_size() as usize; + let buffered = self.buffered_send_data; + // Only notify if the capacity exceeds the amount of buffered data - if self.send_flow.available() > self.buffered_send_data { + if available.min(max_buffer_size) > buffered { self.send_capacity_inc = true; tracing::trace!(" notifying task"); self.notify_send(); diff --git a/tests/h2-support/src/util.rs b/tests/h2-support/src/util.rs index b3322c4d2..1150d5925 100644 --- a/tests/h2-support/src/util.rs +++ b/tests/h2-support/src/util.rs @@ -32,6 +32,7 @@ pub async fn yield_once() { .await; } +/// Should only be called after a non-0 capacity was requested for the stream. pub fn wait_for_capacity(stream: h2::SendStream, target: usize) -> WaitForCapacity { WaitForCapacity { stream: Some(stream), @@ -59,6 +60,11 @@ impl Future for WaitForCapacity { let act = self.stream().capacity(); + // If a non-0 capacity was requested for the stream before calling + // wait_for_capacity, then poll_capacity should return Pending + // until there is a non-0 capacity. + assert_ne!(act, 0); + if act >= self.target { return Poll::Ready(self.stream.take().unwrap().into()); } diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index 7adb3d730..92e7a532f 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -1600,7 +1600,62 @@ async fn poll_capacity_after_send_data_and_reserve() { // Initial window size was 5 so current capacity is 0 even if we just reserved. assert_eq!(stream.capacity(), 0); - // The first call to `poll_capacity` in `wait_for_capacity` will return 0. + // This will panic if there is a bug causing h2 to return Ok(0) from poll_capacity. + let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await; + + stream.send_data("".into(), true).unwrap(); + + // Wait for the connection to close + h2.await.unwrap(); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn poll_capacity_after_send_data_and_reserve_with_max_send_buffer_size() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv + .assert_client_handshake_with_settings(frames::settings().initial_window_size(10)) + .await; + assert_default_settings!(settings); + srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/")) + .await; + srv.send_frame(frames::headers(1).response(200)).await; + srv.recv_frame(frames::data(1, &b"abcde"[..])).await; + srv.send_frame(frames::window_update(1, 10)).await; + srv.recv_frame(frames::data(1, &b""[..]).eos()).await; + }; + + let h2 = async move { + let (mut client, mut h2) = client::Builder::new() + .max_send_buffer_size(5) + .handshake::<_, Bytes>(io) + .await + .unwrap(); + let request = Request::builder() + .method(Method::POST) + .uri("https://www.example.com/") + .body(()) + .unwrap(); + + let (response, mut stream) = client.send_request(request, false).unwrap(); + + let response = h2.drive(response).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + stream.send_data("abcde".into(), false).unwrap(); + + stream.reserve_capacity(5); + + // Initial window size was 10 but with a max send buffer size of 10 in the client, + // so current capacity is 0 even if we just reserved. + assert_eq!(stream.capacity(), 0); + + // This will panic if there is a bug causing h2 to return Ok(0) from poll_capacity. let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await; stream.send_data("".into(), true).unwrap(); From a28a39ca4a489b5337bd60f1c49d3d845f65b50b Mon Sep 17 00:00:00 2001 From: Anthony Ramine <123095+nox@users.noreply.github.com> Date: Fri, 21 Jan 2022 19:59:11 +0100 Subject: [PATCH 12/26] Update tracing-subscriber and use tracing-tree when testing (#586) This makes reading the logs way easier on the eyes. --- tests/h2-support/Cargo.toml | 4 ++- tests/h2-support/src/trace.rs | 46 +++++++++-------------------------- 2 files changed, 14 insertions(+), 36 deletions(-) diff --git a/tests/h2-support/Cargo.toml b/tests/h2-support/Cargo.toml index e97c6b310..f178178eb 100644 --- a/tests/h2-support/Cargo.toml +++ b/tests/h2-support/Cargo.toml @@ -7,9 +7,11 @@ edition = "2018" [dependencies] h2 = { path = "../..", features = ["stream", "unstable"] } +atty = "0.2" bytes = "1" tracing = "0.1" -tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "chrono", "ansi"] } +tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } +tracing-tree = "0.2" futures = { version = "0.3", default-features = false } http = "0.2" tokio = { version = "1", features = ["time"] } diff --git a/tests/h2-support/src/trace.rs b/tests/h2-support/src/trace.rs index 4ac11742c..87038c350 100644 --- a/tests/h2-support/src/trace.rs +++ b/tests/h2-support/src/trace.rs @@ -1,41 +1,17 @@ -use std::{io, str}; pub use tracing; pub use tracing_subscriber; -pub fn init() -> tracing::dispatcher::DefaultGuard { - tracing::subscriber::set_default( - tracing_subscriber::fmt() - .with_max_level(tracing::Level::TRACE) - .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE) - .with_writer(PrintlnWriter { _p: () }) - .finish(), - ) -} - -struct PrintlnWriter { - _p: (), -} +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; -impl tracing_subscriber::fmt::MakeWriter for PrintlnWriter { - type Writer = PrintlnWriter; - fn make_writer(&self) -> Self::Writer { - PrintlnWriter { _p: () } - } -} - -impl io::Write for PrintlnWriter { - fn write(&mut self, buf: &[u8]) -> io::Result { - let s = str::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; - println!("{}", s); - Ok(s.len()) - } - - fn write_fmt(&mut self, fmt: std::fmt::Arguments<'_>) -> io::Result<()> { - println!("{}", fmt); - Ok(()) - } +pub fn init() -> tracing::dispatcher::DefaultGuard { + let use_colors = atty::is(atty::Stream::Stdout); + let layer = tracing_tree::HierarchicalLayer::default() + .with_writer(tracing_subscriber::fmt::writer::TestWriter::default()) + .with_indent_lines(true) + .with_ansi(use_colors) + .with_targets(true) + .with_indent_amount(2); - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } + tracing_subscriber::registry().with(layer).set_default() } From 7de2ccc1a3ca2a1ec265a686dc70a64dbcd56270 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 21 Jan 2022 15:41:39 -0800 Subject: [PATCH 13/26] fix panic when receiving already reset push promise (#597) Found by oss-fuzz --- src/proto/streams/recv.rs | 10 ++++++++++ src/proto/streams/streams.rs | 4 ++++ 2 files changed, 14 insertions(+) diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index e613c26b3..1754ab4dd 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -786,6 +786,16 @@ impl Recv { } } + pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) { + if let Ok(next_id) = self.next_stream_id { + // !Peer::is_local_init should have been called beforehand + debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated()); + if id >= next_id { + self.next_stream_id = id.next_id(); + } + } + } + /// Returns true if the remote peer can reserve a stream with the given ID. pub fn ensure_can_reserve(&self) -> Result<(), Error> { if !self.is_push_enabled { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 5c235c15c..3e7ae97d9 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -883,6 +883,10 @@ impl Inner { // We normally would open this stream, so update our // next-send-id record. self.actions.send.maybe_reset_next_stream_id(id); + } else { + // We normally would recv this stream, so update our + // next-recv-id record. + self.actions.recv.maybe_reset_next_stream_id(id); } let stream = Stream::new(id, 0, 0); From 556447c130dd38e905525c62e895d95dc8b04da3 Mon Sep 17 00:00:00 2001 From: Anthony Ramine <123095+nox@users.noreply.github.com> Date: Wed, 26 Jan 2022 11:18:28 +0100 Subject: [PATCH 14/26] Make use of NLL to clean up handshaking logic (#576) --- .github/workflows/CI.yml | 29 ++++++++ src/server.rs | 151 +++++++++++++++------------------------ 2 files changed, 86 insertions(+), 94 deletions(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 0dab7b3dd..775ce32e2 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -83,3 +83,32 @@ jobs: - name: Check minimal versions run: cargo clean; cargo update -Zminimal-versions; cargo check if: matrix.rust == 'nightly' + + msrv: + name: Check MSRV (${{ matrix.rust }}) + needs: [style] + strategy: + matrix: + rust: + - 1.46 # never go past Hyper's own MSRV + + os: + - ubuntu-latest + + runs-on: ${{ matrix.os }} + + steps: + - name: Checkout + uses: actions/checkout@v1 + + - name: Install Rust (${{ matrix.rust }}) + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: ${{ matrix.rust }} + override: true + + - name: Check + uses: actions-rs/cargo@v1 + with: + command: check diff --git a/src/server.rs b/src/server.rs index 87c300083..f82b05011 100644 --- a/src/server.rs +++ b/src/server.rs @@ -126,7 +126,7 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; -use std::{convert, fmt, io, mem}; +use std::{fmt, io}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tracing::instrument::{Instrument, Instrumented}; @@ -301,8 +301,8 @@ enum Handshaking { Flushing(Instrumented>>), /// State 2. Connection is waiting for the client preface. ReadingPreface(Instrumented>>), - /// Dummy state for `mem::replace`. - Empty, + /// State 3. Handshake is done, polling again would panic. + Done, } /// Flush a Sink @@ -387,7 +387,8 @@ where .expect("invalid SETTINGS frame"); // Create the handshake future. - let state = Handshaking::from(codec); + let state = + Handshaking::Flushing(Flush::new(codec).instrument(tracing::trace_span!("flush"))); drop(entered); @@ -1269,63 +1270,58 @@ where let span = self.span.clone(); // XXX(eliza): T_T let _e = span.enter(); tracing::trace!(state = ?self.state); - use crate::server::Handshaking::*; - - self.state = if let Flushing(ref mut flush) = self.state { - // We're currently flushing a pending SETTINGS frame. Poll the - // flush future, and, if it's completed, advance our state to wait - // for the client preface. - let codec = match Pin::new(flush).poll(cx)? { - Poll::Pending => { - tracing::trace!(flush.poll = %"Pending"); - return Poll::Pending; + + loop { + match &mut self.state { + Handshaking::Flushing(flush) => { + // We're currently flushing a pending SETTINGS frame. Poll the + // flush future, and, if it's completed, advance our state to wait + // for the client preface. + let codec = match Pin::new(flush).poll(cx)? { + Poll::Pending => { + tracing::trace!(flush.poll = %"Pending"); + return Poll::Pending; + } + Poll::Ready(flushed) => { + tracing::trace!(flush.poll = %"Ready"); + flushed + } + }; + self.state = Handshaking::ReadingPreface( + ReadPreface::new(codec).instrument(tracing::trace_span!("read_preface")), + ); } - Poll::Ready(flushed) => { - tracing::trace!(flush.poll = %"Ready"); - flushed + Handshaking::ReadingPreface(read) => { + let codec = ready!(Pin::new(read).poll(cx)?); + + self.state = Handshaking::Done; + + let connection = proto::Connection::new( + codec, + Config { + next_stream_id: 2.into(), + // Server does not need to locally initiate any streams + initial_max_send_streams: 0, + max_send_buffer_size: self.builder.max_send_buffer_size, + reset_stream_duration: self.builder.reset_stream_duration, + reset_stream_max: self.builder.reset_stream_max, + settings: self.builder.settings.clone(), + }, + ); + + tracing::trace!("connection established!"); + let mut c = Connection { connection }; + if let Some(sz) = self.builder.initial_target_connection_window_size { + c.set_target_window_size(sz); + } + + return Poll::Ready(Ok(c)); + } + Handshaking::Done => { + panic!("Handshaking::poll() called again after handshaking was complete") } - }; - Handshaking::from(ReadPreface::new(codec)) - } else { - // Otherwise, we haven't actually advanced the state, but we have - // to replace it with itself, because we have to return a value. - // (note that the assignment to `self.state` has to be outside of - // the `if let` block above in order to placate the borrow checker). - mem::replace(&mut self.state, Handshaking::Empty) - }; - let poll = if let ReadingPreface(ref mut read) = self.state { - // We're now waiting for the client preface. Poll the `ReadPreface` - // future. If it has completed, we will create a `Connection` handle - // for the connection. - Pin::new(read).poll(cx) - // Actually creating the `Connection` has to occur outside of this - // `if let` block, because we've borrowed `self` mutably in order - // to poll the state and won't be able to borrow the SETTINGS frame - // as well until we release the borrow for `poll()`. - } else { - unreachable!("Handshake::poll() state was not advanced completely!") - }; - poll?.map(|codec| { - let connection = proto::Connection::new( - codec, - Config { - next_stream_id: 2.into(), - // Server does not need to locally initiate any streams - initial_max_send_streams: 0, - max_send_buffer_size: self.builder.max_send_buffer_size, - reset_stream_duration: self.builder.reset_stream_duration, - reset_stream_max: self.builder.reset_stream_max, - settings: self.builder.settings.clone(), - }, - ); - - tracing::trace!("connection established!"); - let mut c = Connection { connection }; - if let Some(sz) = self.builder.initial_target_connection_window_size { - c.set_target_window_size(sz); } - Ok(c) - }) + } } } @@ -1548,42 +1544,9 @@ where #[inline] fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { match *self { - Handshaking::Flushing(_) => write!(f, "Handshaking::Flushing(_)"), - Handshaking::ReadingPreface(_) => write!(f, "Handshaking::ReadingPreface(_)"), - Handshaking::Empty => write!(f, "Handshaking::Empty"), + Handshaking::Flushing(_) => f.write_str("Flushing(_)"), + Handshaking::ReadingPreface(_) => f.write_str("ReadingPreface(_)"), + Handshaking::Done => f.write_str("Done"), } } } - -impl convert::From>> for Handshaking -where - T: AsyncRead + AsyncWrite, - B: Buf, -{ - #[inline] - fn from(flush: Flush>) -> Self { - Handshaking::Flushing(flush.instrument(tracing::trace_span!("flush"))) - } -} - -impl convert::From>> for Handshaking -where - T: AsyncRead + AsyncWrite, - B: Buf, -{ - #[inline] - fn from(read: ReadPreface>) -> Self { - Handshaking::ReadingPreface(read.instrument(tracing::trace_span!("read_preface"))) - } -} - -impl convert::From>> for Handshaking -where - T: AsyncRead + AsyncWrite, - B: Buf, -{ - #[inline] - fn from(codec: Codec>) -> Self { - Handshaking::from(Flush::new(codec)) - } -} From b0d01bb200cbc50c81786875fdb0117aeaf0e969 Mon Sep 17 00:00:00 2001 From: Anthony Ramine Date: Wed, 26 Jan 2022 11:23:42 +0100 Subject: [PATCH 15/26] v0.3.11 --- CHANGELOG.md | 5 +++++ Cargo.toml | 2 +- src/lib.rs | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 052d7da44..fdc622d73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# 0.3.11 (January 26, 2022) + +* Make `SendStream::poll_capacity` never return `Ok(Some(0))` (#596) +* Fix panic when receiving already reset push promise (#597) + # 0.3.10 (January 6, 2022) * Add `Error::is_go_away()` and `Error::is_remote()` methods. diff --git a/Cargo.toml b/Cargo.toml index c8f041837..fd693e185 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ name = "h2" # - html_root_url. # - Update CHANGELOG.md. # - Create git tag -version = "0.3.10" +version = "0.3.11" license = "MIT" authors = [ "Carl Lerche ", diff --git a/src/lib.rs b/src/lib.rs index 8e8e18a76..e9e25e3f6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,7 +78,7 @@ //! [`server::handshake`]: server/fn.handshake.html //! [`client::handshake`]: client/fn.handshake.html -#![doc(html_root_url = "https://docs.rs/h2/0.3.10")] +#![doc(html_root_url = "https://docs.rs/h2/0.3.11")] #![deny(missing_debug_implementations, missing_docs)] #![cfg_attr(test, deny(warnings))] From 4dc2b4a16431d3edeaa5394f370b8e48b753a63b Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 31 Jan 2022 23:52:42 +0000 Subject: [PATCH 16/26] Avoid time operations that can panic We have reports of runtime panics (linkerd/linkerd2#7748) that sound a lot like rust-lang/rust#86470. We don't have any evidence that these panics originate in h2, but there is one use of `Instant::sub` that could panic in this way. Even though this is almost definitely a bug in Rust, it seems most prudent to actively avoid the uses of `Instant` that are prone to this bug. These fixes should ultimately be made in the standard library, but this change lets us avoid this problem while we wait for those fixes. This change replaces uses of `Instant::elapsed` and `Instant::sub` with calls to `Instant::saturating_duration_since` to prevent this class of panic. See also hyperium/hyper#2746 --- src/proto/streams/recv.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 1754ab4dd..3af1af3a1 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -860,7 +860,10 @@ impl Recv { let reset_duration = self.reset_duration; while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| { let reset_at = stream.reset_at.expect("reset_at must be set if in queue"); - now - reset_at > reset_duration + // rust-lang/rust#86470 tracks a bug in the standard library where `Instant` + // subtraction can panic (because, on some platforms, `Instant` isn't actually + // monotonic). We use a saturating operation to avoid this panic here. + now.saturating_duration_since(reset_at) > reset_duration }) { counts.transition_after(stream, true); } From 47e9f62fb574ba52800a1a1067a32ef9c8ecd831 Mon Sep 17 00:00:00 2001 From: lucas Date: Sun, 6 Feb 2022 18:06:57 +0000 Subject: [PATCH 17/26] There's no such thing as HTTP/2.0 --- Cargo.toml | 2 +- README.md | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fd693e185..6141dae5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ authors = [ "Carl Lerche ", "Sean McArthur ", ] -description = "An HTTP/2.0 client and server" +description = "An HTTP/2 client and server" documentation = "https://docs.rs/h2" repository = "https://github.com/hyperium/h2" readme = "README.md" diff --git a/README.md b/README.md index 63627b706..2e1599914 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # H2 -A Tokio aware, HTTP/2.0 client & server implementation for Rust. +A Tokio aware, HTTP/2 client & server implementation for Rust. [![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT) [![Crates.io](https://img.shields.io/crates/v/h2.svg)](https://crates.io/crates/h2) @@ -12,21 +12,21 @@ More information about this crate can be found in the [crate documentation][dox] ## Features -* Client and server HTTP/2.0 implementation. -* Implements the full HTTP/2.0 specification. +* Client and server HTTP/2 implementation. +* Implements the full HTTP/2 specification. * Passes [h2spec](https://github.com/summerwind/h2spec). * Focus on performance and correctness. * Built on [Tokio](https://tokio.rs). ## Non goals -This crate is intended to only be an implementation of the HTTP/2.0 +This crate is intended to only be an implementation of the HTTP/2 specification. It does not handle: * Managing TCP connections * HTTP 1.0 upgrade * TLS -* Any feature not described by the HTTP/2.0 specification. +* Any feature not described by the HTTP/2 specification. This crate is now used by [hyper](https://github.com/hyperium/hyper), which will provide all of these features. @@ -55,7 +55,7 @@ fn main() { **How does h2 compare to [solicit] or [rust-http2]?** -The h2 library has implemented more of the details of the HTTP/2.0 specification +The h2 library has implemented more of the details of the HTTP/2 specification than any other Rust library. It also passes the [h2spec] set of tests. The h2 library is rapidly approaching "production ready" quality. From 4c31a320beb01584b55cfcb44b96c0f151324af6 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Fri, 11 Feb 2022 10:22:53 +0100 Subject: [PATCH 18/26] Upgrade dev-dependencies --- Cargo.toml | 16 ++- examples/akamai.rs | 24 +++-- fuzz/Cargo.toml | 2 - src/fuzz_bridge.rs | 2 +- src/hpack/test/fuzz.rs | 208 +++++++++++++++++++------------------ tests/h2-fuzz/Cargo.toml | 2 +- util/genfixture/Cargo.toml | 2 +- 7 files changed, 132 insertions(+), 124 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6141dae5b..cf9d3e437 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,22 +55,20 @@ indexmap = { version = "1.5.2", features = ["std"] } [dev-dependencies] # Fuzzing -quickcheck = { version = "0.4.1", default-features = false } -rand = "0.3.15" +quickcheck = { version = "1.0.3", default-features = false } +rand = "0.8.4" # HPACK fixtures -hex = "0.2.0" -walkdir = "1.0.0" +hex = "0.4.3" +walkdir = "2.3.2" serde = "1.0.0" serde_json = "1.0.0" # Examples tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "net"] } -env_logger = { version = "0.5.3", default-features = false } -rustls = "0.19" -tokio-rustls = "0.22" -webpki = "0.21" -webpki-roots = "0.21" +env_logger = { version = "0.9", default-features = false } +tokio-rustls = "0.23.2" +webpki-roots = "0.22.2" [package.metadata.docs.rs] features = ["stream"] diff --git a/examples/akamai.rs b/examples/akamai.rs index 29d8a9347..e522b37ff 100644 --- a/examples/akamai.rs +++ b/examples/akamai.rs @@ -3,9 +3,9 @@ use http::{Method, Request}; use tokio::net::TcpStream; use tokio_rustls::TlsConnector; -use rustls::Session; -use webpki::DNSNameRef; +use tokio_rustls::rustls::{OwnedTrustAnchor, RootCertStore, ServerName}; +use std::convert::TryFrom; use std::error::Error; use std::net::ToSocketAddrs; @@ -16,9 +16,19 @@ pub async fn main() -> Result<(), Box> { let _ = env_logger::try_init(); let tls_client_config = std::sync::Arc::new({ - let mut c = rustls::ClientConfig::new(); - c.root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + let mut root_store = RootCertStore::empty(); + root_store.add_server_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta| { + OwnedTrustAnchor::from_subject_spki_name_constraints( + ta.subject, + ta.spki, + ta.name_constraints, + ) + })); + + let mut c = tokio_rustls::rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(root_store) + .with_no_client_auth(); c.alpn_protocols.push(ALPN_H2.as_bytes().to_owned()); c }); @@ -33,13 +43,13 @@ pub async fn main() -> Result<(), Box> { println!("ADDR: {:?}", addr); let tcp = TcpStream::connect(&addr).await?; - let dns_name = DNSNameRef::try_from_ascii_str("http2.akamai.com").unwrap(); + let dns_name = ServerName::try_from("http2.akamai.com").unwrap(); let connector = TlsConnector::from(tls_client_config); let res = connector.connect(dns_name, tcp).await; let tls = res.unwrap(); { let (_, session) = tls.get_ref(); - let negotiated_protocol = session.get_alpn_protocol(); + let negotiated_protocol = session.alpn_protocol(); assert_eq!( Some(ALPN_H2.as_bytes()), negotiated_protocol.as_ref().map(|x| &**x) diff --git a/fuzz/Cargo.toml b/fuzz/Cargo.toml index ca32138e2..aafb60ae7 100644 --- a/fuzz/Cargo.toml +++ b/fuzz/Cargo.toml @@ -13,12 +13,10 @@ cargo-fuzz = true arbitrary = { version = "1", features = ["derive"] } libfuzzer-sys = { version = "0.4.0", features = ["arbitrary-derive"] } tokio = { version = "1", features = [ "full" ] } -bytes = "0.5.2" h2 = { path = "../", features = [ "unstable" ] } h2-support = { path = "../tests/h2-support" } futures = { version = "0.3", default-features = false, features = ["std"] } http = "0.2" -env_logger = { version = "0.5.3", default-features = false } # Prevent this from interfering with workspaces [workspace] diff --git a/src/fuzz_bridge.rs b/src/fuzz_bridge.rs index 6132deeb4..3ea8b591c 100644 --- a/src/fuzz_bridge.rs +++ b/src/fuzz_bridge.rs @@ -1,7 +1,7 @@ #[cfg(fuzzing)] pub mod fuzz_logic { use crate::hpack; - use bytes::{BufMut, BytesMut}; + use bytes::BytesMut; use http::header::HeaderName; use std::io::Cursor; diff --git a/src/hpack/test/fuzz.rs b/src/hpack/test/fuzz.rs index 1d05a97c5..ad0d47b6b 100644 --- a/src/hpack/test/fuzz.rs +++ b/src/hpack/test/fuzz.rs @@ -4,7 +4,9 @@ use http::header::{HeaderName, HeaderValue}; use bytes::BytesMut; use quickcheck::{Arbitrary, Gen, QuickCheck, TestResult}; -use rand::{Rng, SeedableRng, StdRng}; +use rand::distributions::Slice; +use rand::rngs::StdRng; +use rand::{thread_rng, Rng, SeedableRng}; use std::io::Cursor; @@ -46,9 +48,9 @@ struct HeaderFrame { } impl FuzzHpack { - fn new(seed: [usize; 4]) -> FuzzHpack { + fn new(seed: [u8; 32]) -> FuzzHpack { // Seed the RNG - let mut rng = StdRng::from_seed(&seed); + let mut rng = StdRng::from_seed(seed); // Generates a bunch of source headers let mut source: Vec>> = vec![]; @@ -58,12 +60,12 @@ impl FuzzHpack { } // Actual test run headers - let num: usize = rng.gen_range(40, 500); + let num: usize = rng.gen_range(40..500); let mut frames: Vec = vec![]; let mut added = 0; - let skew: i32 = rng.gen_range(1, 5); + let skew: i32 = rng.gen_range(1..5); // Rough number of headers to add while added < num { @@ -72,24 +74,24 @@ impl FuzzHpack { headers: vec![], }; - match rng.gen_range(0, 20) { + match rng.gen_range(0..20) { 0 => { // Two resizes - let high = rng.gen_range(128, MAX_CHUNK * 2); - let low = rng.gen_range(0, high); + let high = rng.gen_range(128..MAX_CHUNK * 2); + let low = rng.gen_range(0..high); frame.resizes.extend(&[low, high]); } 1..=3 => { - frame.resizes.push(rng.gen_range(128, MAX_CHUNK * 2)); + frame.resizes.push(rng.gen_range(128..MAX_CHUNK * 2)); } _ => {} } let mut is_name_required = true; - for _ in 0..rng.gen_range(1, (num - added) + 1) { - let x: f64 = rng.gen_range(0.0, 1.0); + for _ in 0..rng.gen_range(1..(num - added) + 1) { + let x: f64 = rng.gen_range(0.0..1.0); let x = x.powi(skew); let i = (x * source.len() as f64) as usize; @@ -177,31 +179,31 @@ impl FuzzHpack { } impl Arbitrary for FuzzHpack { - fn arbitrary(g: &mut G) -> Self { - FuzzHpack::new(quickcheck::Rng::gen(g)) + fn arbitrary(_: &mut Gen) -> Self { + FuzzHpack::new(thread_rng().gen()) } } fn gen_header(g: &mut StdRng) -> Header> { use http::{Method, StatusCode}; - if g.gen_weighted_bool(10) { - match g.next_u32() % 5 { + if g.gen_ratio(1, 10) { + match g.gen_range(0u32..5) { 0 => { let value = gen_string(g, 4, 20); Header::Authority(to_shared(value)) } 1 => { - let method = match g.next_u32() % 6 { + let method = match g.gen_range(0u32..6) { 0 => Method::GET, 1 => Method::POST, 2 => Method::PUT, 3 => Method::PATCH, 4 => Method::DELETE, 5 => { - let n: usize = g.gen_range(3, 7); + let n: usize = g.gen_range(3..7); let bytes: Vec = (0..n) - .map(|_| g.choose(b"ABCDEFGHIJKLMNOPQRSTUVWXYZ").unwrap().clone()) + .map(|_| *g.sample(Slice::new(b"ABCDEFGHIJKLMNOPQRSTUVWXYZ").unwrap())) .collect(); Method::from_bytes(&bytes).unwrap() @@ -212,7 +214,7 @@ fn gen_header(g: &mut StdRng) -> Header> { Header::Method(method) } 2 => { - let value = match g.next_u32() % 2 { + let value = match g.gen_range(0u32..2) { 0 => "http", 1 => "https", _ => unreachable!(), @@ -221,7 +223,7 @@ fn gen_header(g: &mut StdRng) -> Header> { Header::Scheme(to_shared(value.to_string())) } 3 => { - let value = match g.next_u32() % 100 { + let value = match g.gen_range(0u32..100) { 0 => "/".to_string(), 1 => "/index.html".to_string(), _ => gen_string(g, 2, 20), @@ -237,14 +239,14 @@ fn gen_header(g: &mut StdRng) -> Header> { _ => unreachable!(), } } else { - let name = if g.gen_weighted_bool(10) { + let name = if g.gen_ratio(1, 10) { None } else { Some(gen_header_name(g)) }; let mut value = gen_header_value(g); - if g.gen_weighted_bool(30) { + if g.gen_ratio(1, 30) { value.set_sensitive(true); } @@ -255,84 +257,86 @@ fn gen_header(g: &mut StdRng) -> Header> { fn gen_header_name(g: &mut StdRng) -> HeaderName { use http::header; - if g.gen_weighted_bool(2) { - g.choose(&[ - header::ACCEPT, - header::ACCEPT_CHARSET, - header::ACCEPT_ENCODING, - header::ACCEPT_LANGUAGE, - header::ACCEPT_RANGES, - header::ACCESS_CONTROL_ALLOW_CREDENTIALS, - header::ACCESS_CONTROL_ALLOW_HEADERS, - header::ACCESS_CONTROL_ALLOW_METHODS, - header::ACCESS_CONTROL_ALLOW_ORIGIN, - header::ACCESS_CONTROL_EXPOSE_HEADERS, - header::ACCESS_CONTROL_MAX_AGE, - header::ACCESS_CONTROL_REQUEST_HEADERS, - header::ACCESS_CONTROL_REQUEST_METHOD, - header::AGE, - header::ALLOW, - header::ALT_SVC, - header::AUTHORIZATION, - header::CACHE_CONTROL, - header::CONNECTION, - header::CONTENT_DISPOSITION, - header::CONTENT_ENCODING, - header::CONTENT_LANGUAGE, - header::CONTENT_LENGTH, - header::CONTENT_LOCATION, - header::CONTENT_RANGE, - header::CONTENT_SECURITY_POLICY, - header::CONTENT_SECURITY_POLICY_REPORT_ONLY, - header::CONTENT_TYPE, - header::COOKIE, - header::DNT, - header::DATE, - header::ETAG, - header::EXPECT, - header::EXPIRES, - header::FORWARDED, - header::FROM, - header::HOST, - header::IF_MATCH, - header::IF_MODIFIED_SINCE, - header::IF_NONE_MATCH, - header::IF_RANGE, - header::IF_UNMODIFIED_SINCE, - header::LAST_MODIFIED, - header::LINK, - header::LOCATION, - header::MAX_FORWARDS, - header::ORIGIN, - header::PRAGMA, - header::PROXY_AUTHENTICATE, - header::PROXY_AUTHORIZATION, - header::PUBLIC_KEY_PINS, - header::PUBLIC_KEY_PINS_REPORT_ONLY, - header::RANGE, - header::REFERER, - header::REFERRER_POLICY, - header::REFRESH, - header::RETRY_AFTER, - header::SERVER, - header::SET_COOKIE, - header::STRICT_TRANSPORT_SECURITY, - header::TE, - header::TRAILER, - header::TRANSFER_ENCODING, - header::USER_AGENT, - header::UPGRADE, - header::UPGRADE_INSECURE_REQUESTS, - header::VARY, - header::VIA, - header::WARNING, - header::WWW_AUTHENTICATE, - header::X_CONTENT_TYPE_OPTIONS, - header::X_DNS_PREFETCH_CONTROL, - header::X_FRAME_OPTIONS, - header::X_XSS_PROTECTION, - ]) - .unwrap() + if g.gen_ratio(1, 2) { + g.sample( + Slice::new(&[ + header::ACCEPT, + header::ACCEPT_CHARSET, + header::ACCEPT_ENCODING, + header::ACCEPT_LANGUAGE, + header::ACCEPT_RANGES, + header::ACCESS_CONTROL_ALLOW_CREDENTIALS, + header::ACCESS_CONTROL_ALLOW_HEADERS, + header::ACCESS_CONTROL_ALLOW_METHODS, + header::ACCESS_CONTROL_ALLOW_ORIGIN, + header::ACCESS_CONTROL_EXPOSE_HEADERS, + header::ACCESS_CONTROL_MAX_AGE, + header::ACCESS_CONTROL_REQUEST_HEADERS, + header::ACCESS_CONTROL_REQUEST_METHOD, + header::AGE, + header::ALLOW, + header::ALT_SVC, + header::AUTHORIZATION, + header::CACHE_CONTROL, + header::CONNECTION, + header::CONTENT_DISPOSITION, + header::CONTENT_ENCODING, + header::CONTENT_LANGUAGE, + header::CONTENT_LENGTH, + header::CONTENT_LOCATION, + header::CONTENT_RANGE, + header::CONTENT_SECURITY_POLICY, + header::CONTENT_SECURITY_POLICY_REPORT_ONLY, + header::CONTENT_TYPE, + header::COOKIE, + header::DNT, + header::DATE, + header::ETAG, + header::EXPECT, + header::EXPIRES, + header::FORWARDED, + header::FROM, + header::HOST, + header::IF_MATCH, + header::IF_MODIFIED_SINCE, + header::IF_NONE_MATCH, + header::IF_RANGE, + header::IF_UNMODIFIED_SINCE, + header::LAST_MODIFIED, + header::LINK, + header::LOCATION, + header::MAX_FORWARDS, + header::ORIGIN, + header::PRAGMA, + header::PROXY_AUTHENTICATE, + header::PROXY_AUTHORIZATION, + header::PUBLIC_KEY_PINS, + header::PUBLIC_KEY_PINS_REPORT_ONLY, + header::RANGE, + header::REFERER, + header::REFERRER_POLICY, + header::REFRESH, + header::RETRY_AFTER, + header::SERVER, + header::SET_COOKIE, + header::STRICT_TRANSPORT_SECURITY, + header::TE, + header::TRAILER, + header::TRANSFER_ENCODING, + header::USER_AGENT, + header::UPGRADE, + header::UPGRADE_INSECURE_REQUESTS, + header::VARY, + header::VIA, + header::WARNING, + header::WWW_AUTHENTICATE, + header::X_CONTENT_TYPE_OPTIONS, + header::X_DNS_PREFETCH_CONTROL, + header::X_FRAME_OPTIONS, + header::X_XSS_PROTECTION, + ]) + .unwrap(), + ) .clone() } else { let value = gen_string(g, 1, 25); @@ -349,9 +353,7 @@ fn gen_string(g: &mut StdRng, min: usize, max: usize) -> String { let bytes: Vec<_> = (min..max) .map(|_| { // Chars to pick from - g.choose(b"ABCDEFGHIJKLMNOPQRSTUVabcdefghilpqrstuvwxyz----") - .unwrap() - .clone() + *g.sample(Slice::new(b"ABCDEFGHIJKLMNOPQRSTUVabcdefghilpqrstuvwxyz----").unwrap()) }) .collect(); diff --git a/tests/h2-fuzz/Cargo.toml b/tests/h2-fuzz/Cargo.toml index 524627f31..dadb62c92 100644 --- a/tests/h2-fuzz/Cargo.toml +++ b/tests/h2-fuzz/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" [dependencies] h2 = { path = "../.." } -env_logger = { version = "0.5.3", default-features = false } +env_logger = { version = "0.9", default-features = false } futures = { version = "0.3", default-features = false, features = ["std"] } honggfuzz = "0.5" http = "0.2" diff --git a/util/genfixture/Cargo.toml b/util/genfixture/Cargo.toml index 694a99496..cce7eb1b1 100644 --- a/util/genfixture/Cargo.toml +++ b/util/genfixture/Cargo.toml @@ -6,4 +6,4 @@ publish = false edition = "2018" [dependencies] -walkdir = "1.0.0" +walkdir = "2.3.2" From 7bb14625ba7c4450c549ff0c2d06ad76f59170dd Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 23 Feb 2022 16:54:44 -0800 Subject: [PATCH 19/26] Bump MSRV to 1.49, since Tokio uses it --- .github/workflows/CI.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 775ce32e2..874af81fe 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -90,7 +90,7 @@ jobs: strategy: matrix: rust: - - 1.46 # never go past Hyper's own MSRV + - 1.49 # never go past Hyper's own MSRV os: - ubuntu-latest From 85549fca19a24e971009fc11a8477429e86c1fd1 Mon Sep 17 00:00:00 2001 From: hikaricai <13061980190@163.com> Date: Thu, 24 Feb 2022 09:09:04 +0800 Subject: [PATCH 20/26] fix header parsing: consume buf only if header name and value are both decoded Decoding error when processing continuation header which contains normal header name at boundary --- src/hpack/decoder.rs | 101 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 94 insertions(+), 7 deletions(-) diff --git a/src/hpack/decoder.rs b/src/hpack/decoder.rs index e4b34d1fc..988b48db1 100644 --- a/src/hpack/decoder.rs +++ b/src/hpack/decoder.rs @@ -142,6 +142,12 @@ struct Table { max_size: usize, } +struct StringMarker { + offset: usize, + len: usize, + string: Option, +} + // ===== impl Decoder ===== impl Decoder { @@ -279,10 +285,13 @@ impl Decoder { // First, read the header name if table_idx == 0 { + let old_pos = buf.position(); + let name_marker = self.try_decode_string(buf)?; + let value_marker = self.try_decode_string(buf)?; + buf.set_position(old_pos); // Read the name as a literal - let name = self.decode_string(buf)?; - let value = self.decode_string(buf)?; - + let name = name_marker.consume(buf); + let value = value_marker.consume(buf); Header::new(name, value) } else { let e = self.table.get(table_idx)?; @@ -292,7 +301,11 @@ impl Decoder { } } - fn decode_string(&mut self, buf: &mut Cursor<&mut BytesMut>) -> Result { + fn try_decode_string( + &mut self, + buf: &mut Cursor<&mut BytesMut>, + ) -> Result { + let old_pos = buf.position(); const HUFF_FLAG: u8 = 0b1000_0000; // The first bit in the first byte contains the huffman encoded flag. @@ -309,17 +322,34 @@ impl Decoder { return Err(DecoderError::NeedMore(NeedMore::StringUnderflow)); } + let offset = (buf.position() - old_pos) as usize; if huff { let ret = { let raw = &buf.chunk()[..len]; - huffman::decode(raw, &mut self.buffer).map(BytesMut::freeze) + huffman::decode(raw, &mut self.buffer).map(|buf| StringMarker { + offset, + len, + string: Some(BytesMut::freeze(buf)), + }) }; buf.advance(len); - return ret; + ret + } else { + buf.advance(len); + Ok(StringMarker { + offset, + len, + string: None, + }) } + } - Ok(take(buf, len)) + fn decode_string(&mut self, buf: &mut Cursor<&mut BytesMut>) -> Result { + let old_pos = buf.position(); + let marker = self.try_decode_string(buf)?; + buf.set_position(old_pos); + Ok(marker.consume(buf)) } } @@ -433,6 +463,19 @@ fn take(buf: &mut Cursor<&mut BytesMut>, n: usize) -> Bytes { head.freeze() } +impl StringMarker { + fn consume(self, buf: &mut Cursor<&mut BytesMut>) -> Bytes { + buf.advance(self.offset); + match self.string { + Some(string) => { + buf.advance(self.len); + string + } + None => take(buf, self.len), + } + } +} + fn consume(buf: &mut Cursor<&mut BytesMut>) { // remove bytes from the internal BytesMut when they have been successfully // decoded. This is a more permanent cursor position, which will be @@ -850,4 +893,48 @@ mod test { huffman::encode(src, &mut buf); buf } + + #[test] + fn test_decode_continuation_header_with_non_huff_encoded_name() { + let mut de = Decoder::new(0); + let value = huff_encode(b"bar"); + let mut buf = BytesMut::new(); + // header name is non_huff encoded + buf.extend(&[0b01000000, 0x00 | 3]); + buf.extend(b"foo"); + // header value is partial + buf.extend(&[0x80 | 3]); + buf.extend(&value[0..1]); + + let mut res = vec![]; + let e = de + .decode(&mut Cursor::new(&mut buf), |h| { + res.push(h); + }) + .unwrap_err(); + // decode error because the header value is partial + assert_eq!(e, DecoderError::NeedMore(NeedMore::StringUnderflow)); + + // extend buf with the remaining header value + buf.extend(&value[1..]); + let _ = de + .decode(&mut Cursor::new(&mut buf), |h| { + res.push(h); + }) + .unwrap(); + + assert_eq!(res.len(), 1); + assert_eq!(de.table.size(), 0); + + match res[0] { + Header::Field { + ref name, + ref value, + } => { + assert_eq!(name, "foo"); + assert_eq!(value, "bar"); + } + _ => panic!(), + } + } } From b8eab381c053ccf3ebf99d3ef1c0fd27f5e11d89 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 8 Mar 2022 15:54:31 -0800 Subject: [PATCH 21/26] tracing: remove I/O type names from handshake spans (#608) ## Motivation Currently, the `tracing` spans for the client and server handshakes contain the name of the I/O type. In some cases, where nested I/O types are in use, these names can be quite long; for example, in Linkerd, we see log lines like this: ``` 2022-03-07T23:38:15.322506670Z [ 10533.916262s] DEBUG ThreadId(01) inbound:accept{client.addr=192.168.1.9:1227}:server{port=4143}:direct:gateway{dst=server.echo.svc.cluster.local:8080}:server_handshake{io=hyper::common::io::rewind::Rewind, linkerd_io::prefixed::PrefixedIo>>>, linkerd_io::either::EitherIo, linkerd_io::prefixed::PrefixedIo>>>>, linkerd_transport_metrics::sensor::Sensor>, linkerd_io::sensor::SensorIo, linkerd_io::prefixed::PrefixedIo>>>, linkerd_io::either::EitherIo, linkerd_io::prefixed::PrefixedIo>>>, linkerd_transport_metrics::sensor::Sensor>>>}:FramedWrite::buffer{frame=Settings { flags: (0x0), initial_window_size: 65535, max_frame_size: 16384 }}: h2::codec::framed_write: send frame=Settings { flags: (0x0), initial_window_size: 65535, max_frame_size: 16384 } ``` which is kinda not great. ## Solution This branch removes the IO type's type name from the spans for the server and client handshakes. In practice, these are not particularly useful, because a given server or client instance is parameterized over the IO types and will only serve connections of that type. --- src/client.rs | 2 +- src/server.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client.rs b/src/client.rs index d4ec3b906..e75cd3507 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1154,7 +1154,7 @@ where let builder = Builder::new(); builder .handshake(io) - .instrument(tracing::trace_span!("client_handshake", io = %std::any::type_name::())) + .instrument(tracing::trace_span!("client_handshake")) .await } diff --git a/src/server.rs b/src/server.rs index f82b05011..16a50da4b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -367,7 +367,7 @@ where B: Buf + 'static, { fn handshake2(io: T, builder: Builder) -> Handshake { - let span = tracing::trace_span!("server_handshake", io = %std::any::type_name::()); + let span = tracing::trace_span!("server_handshake"); let entered = span.enter(); // Create the codec. From 3383ef71e2dabe27fe979e8b2c82b61b51eca9f5 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 9 Mar 2022 09:22:49 -0800 Subject: [PATCH 22/26] v0.3.12 # 0.3.12 (March 9, 2022) * Avoid time operations that can panic (#599) * Bump MSRV to Rust 1.49 (#606) * Fix header decoding error when a header name is contained at a continuation header boundary (#589) * Remove I/O type names from handshake `tracing` spans (#608) --- CHANGELOG.md | 8 ++++++++ Cargo.toml | 2 +- src/lib.rs | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fdc622d73..7cff904f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +# 0.3.12 (March 9, 2022) + +* Avoid time operations that can panic (#599) +* Bump MSRV to Rust 1.49 (#606) +* Fix header decoding error when a header name is contained at a continuation + header boundary (#589) +* Remove I/O type names from handshake `tracing` spans (#608) + # 0.3.11 (January 26, 2022) * Make `SendStream::poll_capacity` never return `Ok(Some(0))` (#596) diff --git a/Cargo.toml b/Cargo.toml index cf9d3e437..f7d06bf55 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ name = "h2" # - html_root_url. # - Update CHANGELOG.md. # - Create git tag -version = "0.3.11" +version = "0.3.12" license = "MIT" authors = [ "Carl Lerche ", diff --git a/src/lib.rs b/src/lib.rs index e9e25e3f6..925b10c93 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,7 +78,7 @@ //! [`server::handshake`]: server/fn.handshake.html //! [`client::handshake`]: client/fn.handshake.html -#![doc(html_root_url = "https://docs.rs/h2/0.3.11")] +#![doc(html_root_url = "https://docs.rs/h2/0.3.12")] #![deny(missing_debug_implementations, missing_docs)] #![cfg_attr(test, deny(warnings))] From a54d9265b7a6dfc78f67c38db62bfbeae790c1f8 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Fri, 11 Feb 2022 09:43:40 +0100 Subject: [PATCH 23/26] Upgrade tokio-util to 0.7 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index f7d06bf55..0333389ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ members = [ futures-core = { version = "0.3", default-features = false } futures-sink = { version = "0.3", default-features = false } futures-util = { version = "0.3", default-features = false } -tokio-util = { version = "0.6", features = ["codec"] } +tokio-util = { version = "0.7.1", features = ["codec"] } tokio = { version = "1", features = ["io-util"] } bytes = "1" http = "0.2" From 3a0c622f0c9ec8f53df8f6481ba7873dc06be9cf Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 31 Mar 2022 15:12:45 -0700 Subject: [PATCH 24/26] v0.3.13 --- CHANGELOG.md | 4 ++++ Cargo.toml | 2 +- src/lib.rs | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7cff904f9..7b00632f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.3.13 (March 31, 2022) + +* Update private internal `tokio-util` dependency. + # 0.3.12 (March 9, 2022) * Avoid time operations that can panic (#599) diff --git a/Cargo.toml b/Cargo.toml index 0333389ae..bc1388051 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ name = "h2" # - html_root_url. # - Update CHANGELOG.md. # - Create git tag -version = "0.3.12" +version = "0.3.13" license = "MIT" authors = [ "Carl Lerche ", diff --git a/src/lib.rs b/src/lib.rs index 925b10c93..be42b100e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,7 +78,7 @@ //! [`server::handshake`]: server/fn.handshake.html //! [`client::handshake`]: client/fn.handshake.html -#![doc(html_root_url = "https://docs.rs/h2/0.3.12")] +#![doc(html_root_url = "https://docs.rs/h2/0.3.13")] #![deny(missing_debug_implementations, missing_docs)] #![cfg_attr(test, deny(warnings))] From 9aab3620c1f12c5230a927128a4394017e69f309 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Tue, 10 May 2022 10:51:44 +0200 Subject: [PATCH 25/26] Ignore doctest that fails due to unexposed private API --- src/frame/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 5a682b634..8b5351b51 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -11,7 +11,7 @@ use std::fmt; /// /// # Examples /// -/// ```rust +/// ```rust,ignore /// let buf: [u8; 4] = [0, 0, 0, 1]; /// assert_eq!(1u32, unpack_octets_4!(buf, 0, u32)); /// ``` From 4999051cb6f45f8e85637ac6ebf22df1fa566383 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Mon, 22 Aug 2022 10:19:00 +0200 Subject: [PATCH 26/26] Retrigger tests