diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 0ba712fdc..d5ebd29dc 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1,6 +1,6 @@ use std::{ cmp, - collections::VecDeque, + collections::{BTreeMap, VecDeque}, convert::TryFrom, fmt, io, mem, net::{IpAddr, SocketAddr}, @@ -61,7 +61,7 @@ mod packet_crypto; use packet_crypto::{PrevCrypto, ZeroRttCrypto}; mod paths; -use paths::{PathData, PathResponses}; +use paths::PathData; pub use paths::{PathId, RttEstimator}; mod send_buffer; @@ -143,7 +143,13 @@ pub struct Connection { /// The "real" local IP address which was was used to receive the initial packet. /// This is only populated for the server case, and if known local_ip: Option, - paths: FxHashMap, + /// The [`PathData`] for each path + /// + /// This needs to be ordered because [`Connection::poll_transmit`] needs to + /// deterministically select the next PathId to send on. + /// + /// TODO(flub): well does it really? But deterministic is nice for now. + paths: BTreeMap, /// Whether MTU detection is supported in this environment allow_mtud: bool, state: State, @@ -174,7 +180,7 @@ pub struct Connection { spin: bool, /// Packet number spaces: initial, handshake, 1-RTT spaces: [PacketSpace; 3], - /// Highest usable packet number space + /// Highest usable [`SpaceId`] highest_space: SpaceId, /// 1-RTT keys used prior to a key update prev_crypto: Option, @@ -197,8 +203,7 @@ pub struct Connection { // // Queued non-retransmittable 1-RTT data // - /// Responses to PATH_CHALLENGE frames - path_responses: PathResponses, + /// If the CONNECTION_CLOSE frame needs to be sent close: bool, // @@ -303,8 +308,8 @@ impl Connection { handshake_cid: loc_cid, rem_handshake_cid: rem_cid, local_cid_state, - paths: FxHashMap::from_iter([( - PathId::default(), + paths: BTreeMap::from_iter([( + PathId(0), PathMigrationData { path, prev_path: None, @@ -346,7 +351,6 @@ impl Connection { timers: TimerTable::default(), authentication_failures: 0, error: None, - path_responses: PathResponses::default(), close: false, ack_frequency: AckFrequencyState::new(get_max_ack_delay( @@ -495,6 +499,23 @@ impl Connection { true => max_datagrams, }; + // Each call to poll_transmit can only send datagrams to one destination, because + // all datagrams in a GSO batch are for the same destination. Therefore only + // datagrams for one Path ID are produced for each poll_transmit call. + + // First, if we have to send a close, select a path for that. + // Next, all paths that have a PATH_CHALLENGE or PATH_RESPONSE pending. + + // For all AVAILABLE paths: + // - Is the path congestion blocked or pacing blocked? + // - call maybe_queue_ to ensure a tail-loss probe would be sent? + // - do we need to send a close message? + // - call can_send + // Once there's nothing more to send on the AVAILABLE paths, do the same for BACKUP paths + + // What about PATH_CHALLENGE or PATH_RESPONSE? We need to check if we need to send + // any of those. + // TODO(flub): We only have PathId(0) for now. For multipath we need to figure // out which path we want to send the packet on before we start building it. let path_id = PathId(0); @@ -505,18 +526,10 @@ impl Connection { self.path_data(path_id).current_mtu().into(), ); - if let Some(challenge) = self.send_path_challenge(now, &mut buf, path_id) { + if let Some(challenge) = self.send_prev_path_challenge(now, &mut buf, path_id) { return Some(challenge); } - // If we need to send a probe, make sure we have something to send. - // TODO(flub): We need to populate each path_id. - for space in SpaceId::iter() { - let request_immediate_ack = - space == SpaceId::Data && self.peer_supports_ack_frequency(); - self.spaces[space].maybe_queue_probe(path_id, request_immediate_ack, &self.streams); - } - // Check whether we need to send a close message let close = match self.state { State::Drained => { @@ -536,13 +549,13 @@ impl Connection { }; // Check whether we need to send an ACK_FREQUENCY frame - let rtt = self - .paths - .values() - .map(|p| p.path.rtt.get()) - .min() - .expect("one path exists"); if let Some(config) = &self.config.ack_frequency_config { + let rtt = self + .paths + .values() + .map(|p| p.path.rtt.get()) + .min() + .expect("one path exists"); self.spaces[SpaceId::Data].pending.ack_frequency = self .ack_frequency .should_send_ack_frequency(rtt, config, &self.peer_params) @@ -565,103 +578,87 @@ impl Connection { // The packet number of the last built packet. let mut last_packet_number = None; - // Iterate over all spaces and find data to send - // - // Each loop builds one packet, which is finished before the next iteration of the - // loop. When packets are coalesced a datagram is filled over multiple loops. - let mut next_space_id = self.next_send_space(SpaceId::Initial, path_id, &buf, close); - while let Some(space_id) = next_space_id { - // Whether the next packet will contain ack-eliciting frames. - let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams) - || self.spaces[space_id].for_path(path_id).ping_pending - || self.spaces[space_id] - .for_path(path_id) - .immediate_ack_pending; - if space_id == SpaceId::Data { - let pn = self.spaces[SpaceId::Data] - .for_path(path_id) - .peek_tx_number(); - let frame_space_1rtt = buf - .segment_size() - .saturating_sub(self.predict_1rtt_overhead(pn, path_id)); - ack_eliciting |= self.can_send_1rtt(frame_space_1rtt); - } - - // If the datagram is full, we need to start a new one. - if buf.datagram_remaining_mut() == 0 { - // Is 1 more datagram allowed? - if buf.num_datagrams() >= buf.max_datagrams() { - // No more datagrams allowed - break; + let mut path_id = *self.paths.first_key_value().expect("one path must exist").0; + let mut space_id = match path_id { + PathId(0) => SpaceId::Initial, + _ => SpaceId::Data, + }; + loop { + // Can anything be sent on this packet number space? If not advance either the + // SpaceId or PathId and try again. + let send_ready = self.space_ready_to_send(path_id, space_id, &buf, close, now); + let can_send = match send_ready { + SendReady::Frames(can_send) if !can_send.is_empty() => can_send, + SendReady::Frames(_) if space_id < SpaceId::Data => { + space_id = space_id.next(); + continue; } - - // Anti-amplification is only based on `total_sent`, which gets - // updated at the end of this method. Therefore we pass the amount - // of bytes for datagrams that are already created, as well as 1 byte - // for starting another datagram. If there is any anti-amplification - // budget left, we always allow a full MTU to be sent - // (see https://github.com/quinn-rs/quinn/issues/1082) - if self.path_data(path_id).anti_amplification_blocked( - (buf.segment_size() * buf.num_datagrams()) as u64 + 1, - ) { - trace!("blocked by anti-amplification"); - break; + SendReady::CongestionBlocked if space_id < SpaceId::Data => { + // Higher spaces might still have tail-loss probes to send, which are + // not congestion blocked. + congestion_blocked = true; + space_id = space_id.next(); + continue; } - - // Congestion control and pacing checks - // Tail loss probes must not be blocked by congestion, or a deadlock could arise - if ack_eliciting && self.spaces[space_id].for_path(path_id).loss_probes == 0 { - let bytes_to_send = buf.segment_size() as u64; - if self.path_data(path_id).in_flight.bytes + bytes_to_send - >= self.path_data(path_id).congestion.window() - { - next_space_id = self.next_send_space(space_id.next(), path_id, &buf, close); + _ => { + // Nothing more to send on this path. + if !matches!(send_ready, SendReady::Frames(_)) { congestion_blocked = true; - trace!("blocked by congestion control"); - if next_space_id == Some(space_id) { - // We are in the highest space, nothing more to do. - break; - } else { - // We continue looking for packets in higher spaces because we - // might still have to send loss probes in them, which are not - // congestion controlled. - continue; - } } - // Check whether the next datagram is blocked by pacing - if let Some(delay) = - self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) - { - // TODO(@divma): this needs fixing asap - self.timers.set(Timer::Pacing, delay); - congestion_blocked = true; - // Loss probes should be subject to pacing, even though - // they are not congestion controlled. - trace!("blocked by pacing"); + // If there are any datagrams in the transmit, packets for another path + // can not be built anymore. + if buf.num_datagrams() > 0 { break; } + + // TODO(flub): We want to prioritise active paths and those needing path + // challenges/responses. + match self.paths.keys().find(|&&path| path > path_id) { + Some(new_path_id) => { + // See if this next path can send anything. + path_id = *new_path_id; + space_id = SpaceId::Data; + continue; + } + None => { + // Nothing more to send. + break; + } + } + } + }; + + // If the datagram is full, we need to start a new one. + if buf.datagram_remaining_mut() == 0 { + if buf.num_datagrams() >= buf.max_datagrams() { + // No more datagrams allowed + break; } - // Start the next datagram match self.spaces[space_id].for_path(path_id).loss_probes { 0 => buf.start_new_datagram(), _ => { self.spaces[space_id].for_path(path_id).loss_probes -= 1; - // Clamp the datagram to at most the minimum MTU to ensure that loss probes - // can get through and enable recovery even if the path MTU has shrank - // unexpectedly. + // Clamp the datagram to at most the minimum MTU to ensure that loss + // probes can get through and enable recovery even if the path MTU + // has shrank unexpectedly. buf.start_new_datagram_with_size(std::cmp::min( usize::from(INITIAL_MTU), buf.segment_size(), )); } - }; + } + trace!(count = buf.num_datagrams(), "new datagram started"); coalesce = true; pad_datagram = false; } - debug_assert!(buf.datagram_remaining_mut() >= MIN_PACKET_SPACE); + // If coalescing another packet into the existing datagram, there should + // still be enough space for a whole packet. + if buf.datagram_start_offset() < buf.len() { + debug_assert!(buf.datagram_remaining_mut() >= MIN_PACKET_SPACE); + } // // From here on, we've determined that a packet will definitely be sent. @@ -688,17 +685,17 @@ impl Connection { path_id, self.rem_cids.get(&path_id).unwrap().active(), &mut buf, - ack_eliciting, + can_send.other, self, )?; last_packet_number = Some(builder.exact_number); coalesce = coalesce && !builder.short_header; - // https://tools.ietf.org/html/draft-ietf-quic-transport-34#section-14.1 + // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1 pad_datagram |= - space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting); + space_id == SpaceId::Initial && (self.side.is_client() || can_send.other); - if close { + if can_send.close { trace!("sending CONNECTION_CLOSE"); // Encode ACKs before the ConnectionClose message, to give the receiver // a better approximate on what data has been processed. This is @@ -758,23 +755,27 @@ impl Connection { builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram); if space_id == self.highest_space { // Don't send another close packet + // TODO(flub): Is it worth sending CONNECTION_CLOSE on all paths? self.close = false; // `CONNECTION_CLOSE` is the final packet break; } else { - // Send a close frame in every possible space for robustness, per RFC9000 - // "Immediate Close during the Handshake". Don't bother trying to send anything - // else. - next_space_id = self.next_send_space(space_id.next(), path_id, &buf, close); + // Send a close frame in every possible space for robustness, per + // RFC9000 "Immediate Close during the Handshake". Don't bother trying + // to send anything else. + space_id = space_id.next(); continue; } } - // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that path - // validation can occur while the link is saturated. + // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that + // path validation can occur while the link is saturated. if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 { - let remote = self.path_data(path_id).remote; - if let Some((token, remote)) = self.path_responses.pop_off_path(remote) { + let path = self.path_data_mut(path_id); + if let Some((token, remote)) = path.path_responses.pop_off_path(path.remote) { + // TODO(flub): We need to use the right CID! We shouldn't use the same + // CID as the current active one for the path. Though see also + // https://github.com/quinn-rs/quinn/issues/2184 trace!("PATH_RESPONSE {:08x} (off-path)", token); builder .frame_space_mut() @@ -814,29 +815,15 @@ impl Connection { // only checked if the full MTU is available and when potentially large fixed-size // frames aren't queued, so that lack of space in the datagram isn't the reason for just // writing ACKs. - { - let pn = if builder.space == SpaceId::Data { - builder.exact_number - } else { - self.spaces[SpaceId::Data] - .for_path(path_id) - .peek_tx_number() - }; - let frame_space_1rtt = builder - .buf - .segment_size() - .saturating_sub(self.predict_1rtt_overhead(pn, path_id)); - let can_send = self.space_can_send(space_id, path_id, frame_space_1rtt); - debug_assert!( - !(sent_frames.is_ack_only(&self.streams) - && !can_send.acks - && can_send.other - && builder.buf.segment_size() - == self.path_data(path_id).current_mtu() as usize - && self.datagrams.outgoing.is_empty()), - "SendableFrames was {can_send:?}, but only ACKs have been written" - ); - } + debug_assert!( + !(sent_frames.is_ack_only(&self.streams) + && !can_send.acks + && can_send.other + && builder.buf.segment_size() + == self.path_data(path_id).current_mtu() as usize + && self.datagrams.outgoing.is_empty()), + "SendableFrames was {can_send:?}, but only ACKs have been written" + ); pad_datagram |= sent_frames.requires_padding; if sent_frames.largest_acked.is_some() { @@ -848,86 +835,66 @@ impl Connection { // be coalescing the next packet into this one, or will be ending the datagram // as well. Because if this is the last packet in the datagram more padding // might be needed because of the packet type, or to fill the GSO segment size. - next_space_id = self.next_send_space(space_id, path_id, builder.buf, close); - if let Some(next_space_id) = next_space_id { - // Are we allowed to coalesce AND is there enough space for another *packet* - // in this datagram? - if coalesce - && builder - .buf - .datagram_remaining_mut() - .saturating_sub(builder.predict_packet_end()) - > MIN_PACKET_SPACE - { - // We can append/coalesce the next packet into the current - // datagram. Finish the current packet without adding extra padding. - builder.finish_and_track(now, self, path_id, sent_frames, false); - } else { - // We need a new datagram for the next packet. Finish the current - // packet with padding. - if builder.buf.num_datagrams() > 1 { - // If too many padding bytes would be required to continue the - // GSO batch after this packet, end the GSO batch here. Ensures - // that fixed-size frames with heterogeneous sizes - // (e.g. application datagrams) won't inadvertently waste large - // amounts of bandwidth. The exact threshold is a bit arbitrary - // and might benefit from further tuning, though there's no - // universally optimal value. - // - // Additionally, if this datagram is a loss probe and - // `segment_size` is larger than `INITIAL_MTU`, then padding it - // to `segment_size` to continue the GSO batch would risk - // failure to recover from a reduction in path MTU. Loss probes - // are the only packets for which we might grow `buf_capacity` - // by less than `segment_size`. - const MAX_PADDING: usize = 16; - if builder.buf.datagram_remaining_mut() - > builder.predict_packet_end() + MAX_PADDING - { - trace!( - "GSO truncated by demand for {} padding bytes", - builder.buf.datagram_remaining_mut() - builder.predict_packet_end() - ); - builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram); - break; - } - // Pad the current datagram to GSO segment size so it can be - // included in the GSO batch. - builder.pad_to(builder.buf.segment_size() as u16); + // Are we allowed to coalesce AND is there enough space for another *packet* in + // this datagram AND is there another packet to send in this or the next space? + if coalesce + && builder + .buf + .datagram_remaining_mut() + .saturating_sub(builder.predict_packet_end()) + > MIN_PACKET_SPACE + && (matches!( + self.space_ready_to_send(path_id, space_id, builder.buf, close, now), + SendReady::Frames(can_send) if !can_send.is_empty(), + ) || matches!( + self.space_ready_to_send(path_id, space_id.next(), builder.buf, close, now), + SendReady::Frames(can_send) if !can_send.is_empty(), + )) + { + // We can append/coalesce the next packet into the current + // datagram. Finish the current packet without adding extra padding. + builder.finish_and_track(now, self, path_id, sent_frames, false); + } else { + // We need a new datagram for the next packet. Finish the current + // packet with padding. + if builder.buf.num_datagrams() > 1 { + // If too many padding bytes would be required to continue the + // GSO batch after this packet, end the GSO batch here. Ensures + // that fixed-size frames with heterogeneous sizes + // (e.g. application datagrams) won't inadvertently waste large + // amounts of bandwidth. The exact threshold is a bit arbitrary + // and might benefit from further tuning, though there's no + // universally optimal value. + // + // Additionally, if this datagram is a loss probe and + // `segment_size` is larger than `INITIAL_MTU`, then padding it + // to `segment_size` to continue the GSO batch would risk + // failure to recover from a reduction in path MTU. Loss probes + // are the only packets for which we might grow `buf_capacity` + // by less than `segment_size`. + const MAX_PADDING: usize = 16; + if builder.buf.datagram_remaining_mut() + > builder.predict_packet_end() + MAX_PADDING + { + trace!( + "GSO truncated by demand for {} padding bytes", + builder.buf.datagram_remaining_mut() - builder.predict_packet_end() + ); + builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram); + break; } - builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram); - - if buf.num_datagrams() == 1 { - buf.clip_datagram_size(); - if next_space_id == SpaceId::Data { - // Now that we know the size of the first datagram, check whether - // the data we planned to send will fit in the next segment. If - // not, bail out and leave it for the next GSO batch. We can't - // easily compute the right segment size before the original call to - // `space_can_send`, because at that time we haven't determined - // whether we're going to coalesce with the first datagram or - // potentially pad it to `MIN_INITIAL_SIZE`. - let pn = self.spaces[SpaceId::Data] - .for_path(path_id) - .peek_tx_number(); - let frame_space_1rtt = buf - .segment_size() - .saturating_sub(self.predict_1rtt_overhead(pn, path_id)); - if self - .space_can_send(next_space_id, path_id, frame_space_1rtt) - .is_empty() - { - break; - } - } - } + // Pad the current datagram to GSO segment size so it can be + // included in the GSO batch. + builder.pad_to(builder.buf.segment_size() as u16); } - } else { - // Nothing more to send. This was the last packet. + builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram); - break; + + if buf.num_datagrams() == 1 { + buf.clip_datagram_size(); + } } } @@ -1024,16 +991,67 @@ impl Connection { }) } - /// Returns the [`SpaceId`] of the next packet space which has data to send + // /// Returns the next path on which data is ready to be sent + // fn next_send_path(&mut self) -> Option { + // if !self.is_multipath_enabled() { + // return Some(PathId(0)); + // } + + // // If we still have initial or handshake spaces we prefer those. + // if self.highest_space < SpaceId::Data { + // for space_id in SpaceId::iter() { + // let space = &mut self.spaces[space_id]; + // if !space.can_send(PathId(0), &self.streams).is_empty() + // || space.for_path(PathId(0)).loss_probes > 0 + // { + // // TODO(flub): Check the path is not anti-aplification, congestion or + // // pacing blocked. + // return Some(PathId(0)); + // } + // } + // } + + // // From now on we only need to check SpaceId::Data. + // let space = &mut self.spaces[SpaceId::Data]; + + // // First look for pending PATH_CHALLENGE or PATH_RESPONSE frames. + + // // PATH_CHALLENGE probes for new paths so is not anti-amplification blocked. + // // PATH_RESPONSE is anti-amplification blocked. However we can send a PATH_RESPONSE that is + // for (path_id, path) in self.paths.iter() { + // let challenge_pending = path.path.challenge_pending + // || path + // .prev_path + // .as_ref() + // .is_some_and(|(_cid, path_data)| path_data.challenge_pending); + + // let response_pending = !path.path.path_responses.is_empty(); + // if challenge_pending || response_pending { + // // PATH_CHALLENGE and PATH_RESPONSE frames are not anti-amplification blocked. The + + // // TODO(flub): Check the path is not congestion or pacing blocked + // return Some(*path_id); + // } + // } + + // // + + // todo!(); + // None + // } + + /// Whether anything needs to be sent in this packet number space /// - /// This takes into account the space available to frames in the next datagram. - fn next_send_space( + /// This checks whether there is anything to send on the `(PathId, SpaceId)` tuple and + /// whether sending is allowed by the congestion controler etc. + fn space_ready_to_send( &mut self, - current_space_id: SpaceId, path_id: PathId, - buf: &TransmitBuf<'_>, + space_id: SpaceId, + transmit: &TransmitBuf<'_>, close: bool, - ) -> Option { + now: Instant, + ) -> SendReady { // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed // to be able to send an individual frame at least this large in the next 1-RTT // packet. This could be generalized to support every space, but it's only needed to @@ -1043,26 +1061,70 @@ impl Connection { let pn = self.spaces[SpaceId::Data] .for_path(path_id) .peek_tx_number(); - let frame_space_1rtt = buf + let frame_space_1rtt = transmit .segment_size() .saturating_sub(self.predict_1rtt_overhead(pn, path_id)); - let mut space_id = current_space_id; - loop { - let can_send = self.space_can_send(space_id, path_id, frame_space_1rtt); - if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) { - return Some(space_id); - } - space_id = match space_id { - SpaceId::Initial => SpaceId::Handshake, - SpaceId::Handshake => SpaceId::Data, - SpaceId::Data => break, + let mut can_send = self.space_can_send(space_id, path_id, frame_space_1rtt); + can_send.close = close && self.spaces[space_id].crypto.is_some(); + let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0; + + if can_send.is_empty() && !need_loss_probe { + return SendReady::Frames(can_send); + } + + // Anti-amplification is only based on `total_sent`, which gets updated after the + // transmit is sent. Therefore we pass the amount of bytes for datagrams that are + // already created, as well as 1 byte for starting another datagram. If there is any + // anti-amplification budget left, we always allow a full MTU to be sent (see + // https://github.com/quinn-rs/quinn/issues/1082). + if self + .path_data(path_id) + .anti_amplification_blocked(transmit.len() as u64 + 1) + { + return SendReady::AntiAmplificationBlocked; + } + + // Congestion control check. + // Tail loss probes must not be blocked by congestion, or a deadlock could arise. + let bytes_to_send = transmit.segment_size() as u64; + if can_send.other && !need_loss_probe && !can_send.close { + let path = self.path_data(path_id); + if path.in_flight.bytes + bytes_to_send >= path.congestion.window() { + trace!("blocked by congestion control"); + return SendReady::CongestionBlocked; } } - None + + // Pacing check. + if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) { + // TODO(@divma): this needs fixing asap + self.timers.set(Timer::Pacing, delay); + // Loss probes and CONNECTION_CLOSE should be subject to pacing, even though + // they are not congestion controlled. + trace!("blocked by pacing"); + return SendReady::PacingBlocked; + } + + // Ensure there is something to send if a loss probe is needed. + if need_loss_probe && can_send.is_empty() { + let request_immediate_ack = + space_id == SpaceId::Data && self.peer_supports_ack_frequency(); + self.spaces[space_id].maybe_queue_probe(path_id, request_immediate_ack, &self.streams); + can_send = self.space_can_send(space_id, path_id, frame_space_1rtt); + debug_assert!( + can_send.other, + "tail-loss probe must have something to send" + ); + } + + SendReady::Frames(can_send) } /// Send PATH_CHALLENGE for a previous path if necessary - fn send_path_challenge( + /// + /// QUIC-TRANSPORT section 9.3.3 + /// https://www.rfc-editor.org/rfc/rfc9000.html#name-off-path-packet-forwarding + fn send_prev_path_challenge( &mut self, now: Instant, buf: &mut TransmitBuf<'_>, @@ -1134,7 +1196,7 @@ impl Connection { } let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams); if space_id == SpaceId::Data { - can_send.other |= self.can_send_1rtt(frame_space_1rtt); + can_send.other |= self.can_send_1rtt(path_id, frame_space_1rtt); } can_send } @@ -1513,6 +1575,8 @@ impl Connection { /// Multipath is only enabled after the handshake is completed and if it was negotiated /// by both peers. pub fn is_multipath_enabled(&self) -> bool { + // TODO(flub): I believe it might be a TRANSPORT_ERROR if multipath is enabled but + // there's a zero-lenth CID. !self.is_handshaking() && !self.handshake_cid.is_empty() && !self.rem_handshake_cid.is_empty() @@ -2066,6 +2130,7 @@ impl Connection { } /// Probe Timeout + // TODO(flub): This needs a PathId parameter fn pto(&self, space: SpaceId) -> Duration { let max_ack_delay = match space { SpaceId::Initial | SpaceId::Handshake => Duration::ZERO, @@ -3025,18 +3090,45 @@ impl Connection { close = Some(reason); } Frame::PathChallenge(token) => { - self.path_responses.push(number, token, remote); - if remote == self.path_data(path_id).remote { + // A PATH_CHALLENGE can create a new path. + let path = &mut self + .paths + .entry(path_id) + .or_insert_with(|| { + let peer_max_udp_payload_size = + u16::try_from(self.peer_params.max_udp_payload_size.into_inner()) + .unwrap_or(u16::MAX); + let data = PathData::new( + remote, + self.allow_mtud, + Some(peer_max_udp_payload_size), + now, + &self.config, + ); + PathMigrationData { + path: data, + prev_path: None, + } + }) + .path; + path.path_responses.push(number, token, remote); + if remote == path.remote { // PATH_CHALLENGE on active path, possible off-path packet forwarding // attack. Send a non-probing packet to recover the active path. match self.peer_supports_ack_frequency() { true => self.immediate_ack(path_id), - false => self.ping(), + // TODO(flub): switch back to using self.ping(path_id) again if + // that function gains a PathId parameter? + false => { + self.spaces[self.highest_space] + .for_path(path_id) + .ping_pending = true; + } } } } Frame::PathResponse(token) => { - // TODO(@divma): make an effort ot move to path + // TODO(@divma): make an effort to move to path let path_data = self.paths.get_mut(&path_id).expect("known path"); if path_data.path.challenge == Some(token) && remote == path_data.path.remote { trace!("new path validated"); @@ -3487,7 +3579,6 @@ impl Connection { ) -> SentFrames { let mut sent = SentFrames::default(); let is_multipath_enabled = self.is_multipath_enabled(); - let path_data_remote = self.path_data(path_id).remote; let space = &mut self.spaces[space_id]; let path = &mut self.paths.get_mut(&path_id).expect("known path").path; let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none(); @@ -3626,7 +3717,7 @@ impl Connection { // PATH_RESPONSE if buf.remaining_mut() > 9 && space_id == SpaceId::Data { - if let Some(token) = self.path_responses.pop_on_path(path.remote) { + if let Some(token) = path.path_responses.pop_on_path(path.remote) { sent.non_retransmits = true; sent.requires_padding = true; trace!("PATH_RESPONSE {:08x}", token); @@ -3804,7 +3895,7 @@ impl Connection { panic!("NEW_TOKEN frames should not be enqueued by clients"); }; - if remote_addr != path_data_remote { + if remote_addr != path.remote { // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only // useful if used from the same IP address. Thus, we abandon enqueued NEW_TOKEN // frames upon an path change. Instead, when the new path becomes validated, @@ -4162,20 +4253,23 @@ impl Connection { /// Whether we have 1-RTT data to send /// - /// See also `self.space(SpaceId::Data).can_send()` - fn can_send_1rtt(&self, max_size: usize) -> bool { - // TODO(@divma): needs work for multipath - let chanllenge_pending = { - let path_mig_data = self.paths.get(&PathId(0)).expect("known path"); - path_mig_data.path.challenge_pending - || path_mig_data - .prev_path + /// This checks for frames that can only be sent in the data space (1-RTT): + /// - Pending PATH_CHALLENGE frames on the active and previous path if just migrated. + /// - Pending data to send in STREAM frames. + /// - Pending DATAGRAM frames to send. + /// + /// See also [`PacketSpace::can_send`] which keeps track of all other frame types that + /// may need to be sent. + fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> bool { + let challenge_pending = self.paths.get(&path_id).is_some_and(|p| { + p.path.challenge_pending + || p.prev_path .as_ref() - .is_some_and(|(_, x)| x.challenge_pending) - }; + .is_some_and(|(_, path)| path.challenge_pending) + }); self.streams.can_send_stream_data() - || chanllenge_pending - || !self.path_responses.is_empty() + || challenge_pending + || !self.path_data(path_id).path_responses.is_empty() || self .datagrams .outgoing @@ -4288,6 +4382,13 @@ impl fmt::Debug for Connection { } } +enum SendReady { + Frames(SendableFrames), + AntiAmplificationBlocked, + CongestionBlocked, + PacingBlocked, +} + /// Fields of `Connection` specific to it being client-side or server-side enum ConnectionSide { Client { diff --git a/quinn-proto/src/connection/paths.rs b/quinn-proto/src/connection/paths.rs index 69313c790..ea138c7fe 100644 --- a/quinn-proto/src/connection/paths.rs +++ b/quinn-proto/src/connection/paths.rs @@ -70,6 +70,8 @@ pub(super) struct PathData { pub(super) pacing: Pacer, pub(super) challenge: Option, pub(super) challenge_pending: bool, + /// Pending responses to PATH_CHALLENGE frames + pub(super) path_responses: PathResponses, /// Whether we're certain the peer can both send and receive on this address /// /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every @@ -123,6 +125,7 @@ impl PathData { congestion, challenge: None, challenge_pending: false, + path_responses: PathResponses::default(), validated: false, total_sent: 0, total_recvd: 0, @@ -163,6 +166,7 @@ impl PathData { congestion, challenge: None, challenge_pending: false, + path_responses: PathResponses::default(), validated: false, total_sent: 0, total_recvd: 0, @@ -414,6 +418,7 @@ impl PathResponses { struct PathResponse { /// The packet number the corresponding PATH_CHALLENGE was received in packet: u64, + /// The token of the PATH_CHALLENGE token: u64, /// The address the corresponding PATH_CHALLENGE was received from remote: SocketAddr, diff --git a/quinn-proto/src/connection/spaces.rs b/quinn-proto/src/connection/spaces.rs index a2fbc8cb5..4c6b103c8 100644 --- a/quinn-proto/src/connection/spaces.rs +++ b/quinn-proto/src/connection/spaces.rs @@ -105,9 +105,6 @@ impl PacketSpace { /// waiting to be sent, then we retransmit in-flight data to reduce odds of loss. If there's no /// in-flight data either, we're probably a client guarding against a handshake /// anti-amplification deadlock and we just make something up. - // TODO(flub): This is still wrong! The probe needs to be sent for each path - // separately, and ON the path so that it gets a higher packet number than those that - // might be lost. pub(super) fn maybe_queue_probe( &mut self, path_id: PathId, @@ -147,34 +144,29 @@ impl PacketSpace { // Nothing new to send and nothing to retransmit, so fall back on a ping. This should only // happen in rare cases during the handshake when the server becomes blocked by // anti-amplification. - // TODO(flub): Sending a ping on all paths is wasteful, but we also need per-path - // pings so doing this is easier for now. Maybe later introduce a - // connection-level ping again. if !self.for_path(path_id).immediate_ack_pending { - self.number_spaces - .values_mut() - .for_each(|s| s.ping_pending = true); + self.for_path(path_id).ping_pending = true; } } - /// Whether there is anything to send. + /// Whether there is anything to send in this space + /// + /// For the data space [`Connection::can_send_1rtt`] also needs to be consulted. pub(super) fn can_send(&self, path_id: PathId, streams: &StreamsState) -> SendableFrames { - let immediate_ack_pending = self - .number_spaces - .get(&path_id) - .map(|pns| pns.immediate_ack_pending) - .unwrap_or_default(); let acks = self.pending_acks.can_send(); let other = !self.pending.is_empty(streams) || self .number_spaces - .values() - .any(|s| s.ping_pending || immediate_ack_pending); - - SendableFrames { acks, other } + .get(&path_id) + .is_some_and(|s| s.ping_pending || s.immediate_ack_pending); + SendableFrames { + acks, + other, + close: false, + } } - /// The number of packets sent with the current crypto keys. + /// The number of packets sent with the current crypto keys /// /// Used to know if a key update is needed. pub(super) fn sent_with_keys(&self) -> u64 { @@ -755,8 +747,12 @@ impl Dedup { /// Indicates which data is available for sending #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub(super) struct SendableFrames { + /// Whether there ACK frames to send pub(super) acks: bool, + /// Whether there are any other frames to send, these are ack-eliciting pub(super) other: bool, + /// Whether there is a CONNECTION_CLOSE to send + pub(super) close: bool, } impl SendableFrames { @@ -765,12 +761,13 @@ impl SendableFrames { Self { acks: false, other: false, + close: false, } } /// Whether no data is sendable pub(super) fn is_empty(&self) -> bool { - !self.acks && !self.other + !self.acks && !self.other && !self.close } }