From c85c7cc771800581093af1f8e91154f4535ce7c8 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 10 Apr 2025 14:45:14 +1000 Subject: [PATCH 1/3] Add a kad SUBSTREAM_TIMEOUT constant --- protocols/kad/src/handler.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 384ebc3f2b1..6dcb83c8627 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -41,6 +41,7 @@ use std::time::Duration; use std::{error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll}; const MAX_NUM_STREAMS: usize = 32; +const SUBSTREAM_TIMEOUT: Duration = Duration::from_secs(10); /// Protocol handler that manages substreams for the Kademlia protocol /// on a single connection with a peer. @@ -455,7 +456,7 @@ impl Handler { next_connec_unique_id: UniqueConnecId(0), inbound_substreams: Default::default(), outbound_substreams: futures_bounded::FuturesTupleSet::new( - Duration::from_secs(10), + SUBSTREAM_TIMEOUT, MAX_NUM_STREAMS, ), pending_streams: Default::default(), From 782a4ce6bfb33ddf814dd092d18f75d8913a122c Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 10 Apr 2025 14:52:52 +1000 Subject: [PATCH 2/3] Add request, response, and closing timeouts on inbound streams --- protocols/kad/src/handler.rs | 282 +++++++++++++++++++++++++++-------- 1 file changed, 220 insertions(+), 62 deletions(-) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 6dcb83c8627..d0938566ff6 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -28,6 +28,7 @@ use either::Either; use futures::channel::oneshot; use futures::prelude::*; use futures::stream::SelectAll; +use futures_timer::Delay; use libp2p_core::{upgrade, ConnectedPoint}; use libp2p_identity::PeerId; use libp2p_swarm::handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound}; @@ -106,15 +107,39 @@ enum InboundSubstreamState { first: bool, connection_id: UniqueConnecId, substream: KadInStreamSink, + /// How long before we give up on waiting for a request. + request_timeout: Delay, }, /// Waiting for the behaviour to send a [`HandlerIn`] event containing the response. - WaitingBehaviour(UniqueConnecId, KadInStreamSink, Option), + WaitingBehaviour( + UniqueConnecId, + KadInStreamSink, + Option, + /// How long before we give up on sending a response (because the remote will have given up + /// anyway). + Delay, + ), /// Waiting to send an answer back to the remote. - PendingSend(UniqueConnecId, KadInStreamSink, KadResponseMsg), + PendingSend( + UniqueConnecId, + KadInStreamSink, + KadResponseMsg, + /// How long before we give up on sending a response. + Delay, + ), /// Waiting to flush an answer back to the remote. - PendingFlush(UniqueConnecId, KadInStreamSink), + PendingFlush( + UniqueConnecId, + KadInStreamSink, + /// How long before we give up on sending a response. + Delay, + ), /// The substream is being closed. - Closing(KadInStreamSink), + Closing( + KadInStreamSink, + /// How long before we give up on closing the substream. + Delay, + ), /// The substream was cancelled in favor of a new one. Cancelled, @@ -135,10 +160,14 @@ impl InboundSubstreamState { phantom: PhantomData, }, ) { - InboundSubstreamState::WaitingBehaviour(conn_id, substream, mut waker) - if conn_id == id.connec_unique_id => - { - *self = InboundSubstreamState::PendingSend(conn_id, substream, msg); + InboundSubstreamState::WaitingBehaviour( + conn_id, + substream, + mut waker, + response_timeout, + ) if conn_id == id.connec_unique_id => { + *self = + InboundSubstreamState::PendingSend(conn_id, substream, msg, response_timeout); if let Some(waker) = waker.take() { waker.wake(); @@ -161,12 +190,20 @@ impl InboundSubstreamState { phantom: PhantomData, }, ) { - InboundSubstreamState::WaitingMessage { substream, .. } - | InboundSubstreamState::WaitingBehaviour(_, substream, _) - | InboundSubstreamState::PendingSend(_, substream, _) - | InboundSubstreamState::PendingFlush(_, substream) - | InboundSubstreamState::Closing(substream) => { - *self = InboundSubstreamState::Closing(substream); + InboundSubstreamState::WaitingMessage { + substream, + first: _, + connection_id: _, + request_timeout: _, + } + | InboundSubstreamState::WaitingBehaviour(_, substream, _, _) + | InboundSubstreamState::PendingSend(_, substream, _, _) + | InboundSubstreamState::PendingFlush(_, substream, _) => { + let closing_timeout = Delay::new(SUBSTREAM_TIMEOUT); + *self = InboundSubstreamState::Closing(substream, closing_timeout); + } + InboundSubstreamState::Closing(substream, closing_timeout) => { + *self = InboundSubstreamState::Closing(substream, closing_timeout); } InboundSubstreamState::Cancelled => { *self = InboundSubstreamState::Cancelled; @@ -548,6 +585,7 @@ impl Handler { first: true, connection_id: connec_unique_id, substream: protocol, + request_timeout: Delay::new(SUBSTREAM_TIMEOUT), }); } @@ -616,7 +654,7 @@ impl ConnectionHandler for Handler { .inbound_substreams .iter_mut() .find(|state| match state { - InboundSubstreamState::WaitingBehaviour(conn_id, _, _) => { + InboundSubstreamState::WaitingBehaviour(conn_id, _, _, _) => { conn_id == &request_id.connec_unique_id } _ => false, @@ -886,15 +924,26 @@ impl futures::Stream for InboundSubstreamState { first, connection_id, mut substream, - } => match substream.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(KadRequestMsg::Ping))) => { + mut request_timeout, + } => match ( + substream.poll_next_unpin(cx), + request_timeout.poll_unpin(cx), + ) { + // Prefer ready requests over ready timeouts + (Poll::Ready(Some(Ok(KadRequestMsg::Ping))), _) => { tracing::warn!("Kademlia PING messages are unsupported"); - *this = InboundSubstreamState::Closing(substream); + let closing_timeout = Delay::new(SUBSTREAM_TIMEOUT); + *this = InboundSubstreamState::Closing(substream, closing_timeout); } - Poll::Ready(Some(Ok(KadRequestMsg::FindNode { key }))) => { - *this = - InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); + (Poll::Ready(Some(Ok(KadRequestMsg::FindNode { key }))), _) => { + let response_timeout = Delay::new(SUBSTREAM_TIMEOUT); + *this = InboundSubstreamState::WaitingBehaviour( + connection_id, + substream, + None, + response_timeout, + ); return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( HandlerEvent::FindNodeReq { key, @@ -904,9 +953,14 @@ impl futures::Stream for InboundSubstreamState { }, ))); } - Poll::Ready(Some(Ok(KadRequestMsg::GetProviders { key }))) => { - *this = - InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); + (Poll::Ready(Some(Ok(KadRequestMsg::GetProviders { key }))), _) => { + let response_timeout = Delay::new(SUBSTREAM_TIMEOUT); + *this = InboundSubstreamState::WaitingBehaviour( + connection_id, + substream, + None, + response_timeout, + ); return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( HandlerEvent::GetProvidersReq { key, @@ -916,19 +970,27 @@ impl futures::Stream for InboundSubstreamState { }, ))); } - Poll::Ready(Some(Ok(KadRequestMsg::AddProvider { key, provider }))) => { + (Poll::Ready(Some(Ok(KadRequestMsg::AddProvider { key, provider }))), _) => { + // The request has finished, so renew the request timeout + let request_timeout = Delay::new(SUBSTREAM_TIMEOUT); *this = InboundSubstreamState::WaitingMessage { first: false, connection_id, substream, + request_timeout, }; return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( HandlerEvent::AddProvider { key, provider }, ))); } - Poll::Ready(Some(Ok(KadRequestMsg::GetValue { key }))) => { - *this = - InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); + (Poll::Ready(Some(Ok(KadRequestMsg::GetValue { key }))), _) => { + let response_timeout = Delay::new(SUBSTREAM_TIMEOUT); + *this = InboundSubstreamState::WaitingBehaviour( + connection_id, + substream, + None, + response_timeout, + ); return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( HandlerEvent::GetRecord { key, @@ -938,9 +1000,14 @@ impl futures::Stream for InboundSubstreamState { }, ))); } - Poll::Ready(Some(Ok(KadRequestMsg::PutValue { record }))) => { - *this = - InboundSubstreamState::WaitingBehaviour(connection_id, substream, None); + (Poll::Ready(Some(Ok(KadRequestMsg::PutValue { record }))), _) => { + let response_timeout = Delay::new(SUBSTREAM_TIMEOUT); + *this = InboundSubstreamState::WaitingBehaviour( + connection_id, + substream, + None, + response_timeout, + ); return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( HandlerEvent::PutRecord { record, @@ -950,69 +1017,160 @@ impl futures::Stream for InboundSubstreamState { }, ))); } - Poll::Pending => { + (Poll::Pending, Poll::Pending) => { + // Keep the original request timeout *this = InboundSubstreamState::WaitingMessage { first, connection_id, substream, + request_timeout, }; return Poll::Pending; } - Poll::Ready(None) => { + (Poll::Pending, Poll::Ready(())) => { + tracing::debug!( + ?first, + ?connection_id, + "Inbound substream timed out waiting for request", + ); + + // Renew the request timeout, but mark this substream as available for re-use + let request_timeout = Delay::new(SUBSTREAM_TIMEOUT); + *this = InboundSubstreamState::WaitingMessage { + first: false, + connection_id, + substream, + request_timeout, + }; + return Poll::Pending; + } + (Poll::Ready(None), _) => { return Poll::Ready(None); } - Poll::Ready(Some(Err(e))) => { + (Poll::Ready(Some(Err(e))), _) => { tracing::trace!("Inbound substream error: {:?}", e); return Poll::Ready(None); } }, - InboundSubstreamState::WaitingBehaviour(id, substream, _) => { - *this = InboundSubstreamState::WaitingBehaviour( - id, - substream, - Some(cx.waker().clone()), - ); - - return Poll::Pending; - } - InboundSubstreamState::PendingSend(id, mut substream, msg) => { - match substream.poll_ready_unpin(cx) { - Poll::Ready(Ok(())) => match substream.start_send_unpin(msg) { + InboundSubstreamState::WaitingBehaviour( + id, + substream, + mut maybe_waker, + mut response_timeout, + ) => match response_timeout.poll_unpin(cx) { + Poll::Ready(()) => { + tracing::debug!( + connection_id = ?id, + has_waker=?maybe_waker.is_some(), + "Inbound substream timed out waiting for behaviour response", + ); + + *this = InboundSubstreamState::Closing(substream, response_timeout); + return Poll::Ready(None); + } + Poll::Pending => { + // Keep the original response timeout, but update the waker if needed + if let Some(waker) = maybe_waker.as_mut() { + waker.clone_from(cx.waker()) + } else { + maybe_waker = Some(cx.waker().clone()); + } + + *this = InboundSubstreamState::WaitingBehaviour( + id, + substream, + maybe_waker, + response_timeout, + ); + + return Poll::Pending; + } + }, + InboundSubstreamState::PendingSend(id, mut substream, msg, mut response_timeout) => { + match ( + substream.poll_ready_unpin(cx), + response_timeout.poll_unpin(cx), + ) { + (Poll::Ready(Ok(())), _) => match substream.start_send_unpin(msg) { Ok(()) => { - *this = InboundSubstreamState::PendingFlush(id, substream); + *this = InboundSubstreamState::PendingFlush( + id, + substream, + response_timeout, + ); } + // TODO: close here? Err(_) => return Poll::Ready(None), }, - Poll::Pending => { - *this = InboundSubstreamState::PendingSend(id, substream, msg); + (Poll::Pending, Poll::Pending) => { + *this = InboundSubstreamState::PendingSend( + id, + substream, + msg, + response_timeout, + ); return Poll::Pending; } - Poll::Ready(Err(_)) => return Poll::Ready(None), + // TODO: close here? (x2) + (Poll::Pending, Poll::Ready(())) => { + tracing::debug!( + connection_id = ?id, + "Inbound substream timed out waiting for response send", + ); + + return Poll::Ready(None); + } + (Poll::Ready(Err(_)), _) => return Poll::Ready(None), } } - InboundSubstreamState::PendingFlush(id, mut substream) => { - match substream.poll_flush_unpin(cx) { - Poll::Ready(Ok(())) => { + InboundSubstreamState::PendingFlush(id, mut substream, mut response_timeout) => { + match ( + substream.poll_flush_unpin(cx), + response_timeout.poll_unpin(cx), + ) { + (Poll::Ready(Ok(())), _) => { *this = InboundSubstreamState::WaitingMessage { first: false, connection_id: id, substream, + request_timeout: Delay::new(SUBSTREAM_TIMEOUT), }; } - Poll::Pending => { - *this = InboundSubstreamState::PendingFlush(id, substream); + (Poll::Pending, Poll::Pending) => { + *this = InboundSubstreamState::PendingFlush( + id, + substream, + response_timeout, + ); return Poll::Pending; } - Poll::Ready(Err(_)) => return Poll::Ready(None), + // TODO: close here? (x2) + (Poll::Pending, Poll::Ready(())) => { + tracing::debug!( + connection_id = ?id, + "Inbound substream timed out waiting for response flush", + ); + + return Poll::Ready(None); + } + (Poll::Ready(Err(_)), _) => return Poll::Ready(None), } } - InboundSubstreamState::Closing(mut stream) => match stream.poll_close_unpin(cx) { - Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => return Poll::Ready(None), - Poll::Pending => { - *this = InboundSubstreamState::Closing(stream); - return Poll::Pending; + InboundSubstreamState::Closing(mut stream, mut closing_timeout) => { + match (stream.poll_close_unpin(cx), closing_timeout.poll_unpin(cx)) { + (Poll::Ready(Ok(())), _) | (Poll::Ready(Err(_)), _) => { + return Poll::Ready(None) + } + (_, Poll::Ready(())) => { + tracing::debug!("Inbound substream timed out waiting for close"); + return Poll::Ready(None); + } + (Poll::Pending, Poll::Pending) => { + *this = InboundSubstreamState::Closing(stream, closing_timeout); + return Poll::Pending; + } } - }, + } InboundSubstreamState::Poisoned { .. } => unreachable!(), InboundSubstreamState::Cancelled => return Poll::Ready(None), } From 8053eb9d67664d8770b9ea5e7d458dacab675599 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 10 Apr 2025 15:10:08 +1000 Subject: [PATCH 3/3] Only use a timeout for the first inbound kad message --- protocols/kad/src/handler.rs | 56 ++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index d0938566ff6..29fcc772452 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -103,12 +103,12 @@ struct ProtocolStatus { enum InboundSubstreamState { /// Waiting for a request from the remote. WaitingMessage { - /// Whether it is the first message to be awaited on this stream. - first: bool, + /// How long before we give up on waiting for the first request on this stream. + /// `None` if there has already been a request on this stream, or it has timed out and can + /// be re-used. + first_request_timeout: Option, connection_id: UniqueConnecId, substream: KadInStreamSink, - /// How long before we give up on waiting for a request. - request_timeout: Delay, }, /// Waiting for the behaviour to send a [`HandlerIn`] event containing the response. WaitingBehaviour( @@ -191,10 +191,9 @@ impl InboundSubstreamState { }, ) { InboundSubstreamState::WaitingMessage { + first_request_timeout: _, substream, - first: _, connection_id: _, - request_timeout: _, } | InboundSubstreamState::WaitingBehaviour(_, substream, _, _) | InboundSubstreamState::PendingSend(_, substream, _, _) @@ -559,7 +558,10 @@ impl Handler { matches!( s, // An inbound substream waiting to be reused. - InboundSubstreamState::WaitingMessage { first: false, .. } + InboundSubstreamState::WaitingMessage { + first_request_timeout: None, + .. + } ) }) { *s = InboundSubstreamState::Cancelled; @@ -582,10 +584,9 @@ impl Handler { self.next_connec_unique_id.0 += 1; self.inbound_substreams .push(InboundSubstreamState::WaitingMessage { - first: true, + first_request_timeout: Some(Delay::new(SUBSTREAM_TIMEOUT)), connection_id: connec_unique_id, substream: protocol, - request_timeout: Delay::new(SUBSTREAM_TIMEOUT), }); } @@ -921,13 +922,12 @@ impl futures::Stream for InboundSubstreamState { }, ) { InboundSubstreamState::WaitingMessage { - first, + mut first_request_timeout, connection_id, mut substream, - mut request_timeout, } => match ( substream.poll_next_unpin(cx), - request_timeout.poll_unpin(cx), + first_request_timeout.as_mut().map(|t| t.poll_unpin(cx)), ) { // Prefer ready requests over ready timeouts (Poll::Ready(Some(Ok(KadRequestMsg::Ping))), _) => { @@ -971,13 +971,11 @@ impl futures::Stream for InboundSubstreamState { ))); } (Poll::Ready(Some(Ok(KadRequestMsg::AddProvider { key, provider }))), _) => { - // The request has finished, so renew the request timeout - let request_timeout = Delay::new(SUBSTREAM_TIMEOUT); + // This request type requires no response *this = InboundSubstreamState::WaitingMessage { - first: false, + first_request_timeout: None, connection_id, substream, - request_timeout, }; return Poll::Ready(Some(ConnectionHandlerEvent::NotifyBehaviour( HandlerEvent::AddProvider { key, provider }, @@ -1017,30 +1015,28 @@ impl futures::Stream for InboundSubstreamState { }, ))); } - (Poll::Pending, Poll::Pending) => { + (Poll::Pending, Some(Poll::Pending)) | (Poll::Pending, None) => { // Keep the original request timeout *this = InboundSubstreamState::WaitingMessage { - first, + first_request_timeout, connection_id, substream, - request_timeout, }; return Poll::Pending; } - (Poll::Pending, Poll::Ready(())) => { + (Poll::Pending, Some(Poll::Ready(()))) => { tracing::debug!( - ?first, + first = ?first_request_timeout.is_some(), ?connection_id, "Inbound substream timed out waiting for request", ); - // Renew the request timeout, but mark this substream as available for re-use - let request_timeout = Delay::new(SUBSTREAM_TIMEOUT); + // Drop the first request timeout, and mark this substream as available for + // re-use *this = InboundSubstreamState::WaitingMessage { - first: false, + first_request_timeout: None, connection_id, substream, - request_timeout, }; return Poll::Pending; } @@ -1086,7 +1082,12 @@ impl futures::Stream for InboundSubstreamState { return Poll::Pending; } }, - InboundSubstreamState::PendingSend(id, mut substream, msg, mut response_timeout) => { + InboundSubstreamState::PendingSend( + id, + mut substream, + msg, + mut response_timeout, + ) => { match ( substream.poll_ready_unpin(cx), response_timeout.poll_unpin(cx), @@ -1130,10 +1131,9 @@ impl futures::Stream for InboundSubstreamState { ) { (Poll::Ready(Ok(())), _) => { *this = InboundSubstreamState::WaitingMessage { - first: false, + first_request_timeout: None, connection_id: id, substream, - request_timeout: Delay::new(SUBSTREAM_TIMEOUT), }; } (Poll::Pending, Poll::Pending) => {