From 6ab5b97a1389d0a2c048d8a2087b988265c74204 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 17 Mar 2025 19:17:29 +0100 Subject: [PATCH] hack: Prioritise UDP disco messages This should help with disco messages being dropped: the AsyncUdpSocket hogs the socket's internal lock because it is being asked to send so much data. The unbounded queue is essentially bounded by the concurrency of the writers to the socket, so it will stay small. --- Cargo.lock | 34 ++++++++++++++++++ iroh/Cargo.toml | 1 + iroh/src/magicsock.rs | 83 +++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 115 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5a0798e5ad..269903312e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -185,6 +185,18 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-recursion" version = "1.1.1" @@ -1151,6 +1163,27 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "event-listener" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -2233,6 +2266,7 @@ version = "0.33.0" dependencies = [ "aead", "anyhow", + "async-channel", "atomic-waker", "axum", "backon", diff --git 
a/iroh/Cargo.toml b/iroh/Cargo.toml index 89d7275738..65b76ddb2b 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -23,6 +23,7 @@ workspace = true [dependencies] aead = { version = "0.5.2", features = ["bytes"] } anyhow = { version = "1" } +async-channel = "2.3.1" atomic-waker = "1.1.2" concurrent-queue = "2.5" backon = { version = "1.4" } diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index c23e5cf08b..f620121fd7 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -228,7 +228,9 @@ pub(crate) struct MagicSock { disco_secrets: DiscoSecrets, /// UDP disco (ping) queue - udp_disco_sender: mpsc::Sender<(SocketAddr, PublicKey, disco::Message)>, + udp_disco_sender: async_channel::Sender<(SocketAddr, PublicKey, disco::Message)>, + udp_disco_receiver: async_channel::Receiver<(SocketAddr, PublicKey, disco::Message)>, + udp_disco_blocked: std::sync::Mutex<ConcurrentQueue<(SocketAddr, PublicKey, disco::Message)>>, /// Optional discovery service discovery: Option>, @@ -474,6 +476,79 @@ impl MagicSock { "connection closed", )); } + + // First empty out any disco messages, these have priority. Best-effort though, do + // not propagate any errors. 
+ let mut pop_udp_disco_receiver = true; + while let Ok((dst, dst_key, msg)) = { let next = self.udp_disco_blocked.lock().unwrap().pop(); next } { // pop in its own statement so the MutexGuard drops before the body, which re-locks to requeue on WouldBlock + let span = info_span!("udp-disco", %dst, node = %dst_key.fmt_short(), ?msg); + let _guard = span.enter(); + let packet = self.encode_disco_message(dst_key, &msg); + inc!(MagicsockMetrics, send_disco_udp); + let transmit = quinn_udp::Transmit { + destination: dst, + ecn: None, + contents: &packet, + segment_size: None, + src_ip: None, + }; + match self.try_send_udp(dst, &transmit) { + Ok(_) => { + inc!(MagicsockMetrics, send_disco_udp); + trace!("sent message"); + } + Err(err) if err.kind() != io::ErrorKind::WouldBlock => { + warn!(?err, "failed to send"); + pop_udp_disco_receiver = false; + break; + } + _ => { + trace!("would block"); + self.udp_disco_blocked + .lock() + .unwrap() + .push((dst, dst_key, msg)) + .ok(); + pop_udp_disco_receiver = false; + break; + } + } + } + if pop_udp_disco_receiver { + while let Ok((dst, dst_key, msg)) = self.udp_disco_receiver.try_recv() { + let span = info_span!("udp-disco", %dst, node = %dst_key.fmt_short(), ?msg); + let _guard = span.enter(); + let packet = self.encode_disco_message(dst_key, &msg); + inc!(MagicsockMetrics, send_disco_udp); + let transmit = quinn_udp::Transmit { + destination: dst, + ecn: None, + contents: &packet, + segment_size: None, + src_ip: None, + }; + match self.try_send_udp(dst, &transmit) { + Ok(_) => { + inc!(MagicsockMetrics, send_disco_udp); + trace!("sent message"); + } + Err(err) if err.kind() != io::ErrorKind::WouldBlock => { + warn!(?err, "failed to send"); + break; + } + _ => { + trace!("would block"); + self.udp_disco_blocked + .lock() + .unwrap() + .push((dst, dst_key, msg)) + .ok(); + break; + } + } + } + } + match MappedAddr::from(transmit.destination) { MappedAddr::None(dest) => { error!(%dest, "Cannot convert to a mapped address, voiding transmit."); @@ -1704,7 +1779,7 @@ impl Handle { let (relay_actor_sender, relay_actor_receiver) = mpsc::channel(256); let 
(relay_datagram_send_tx, relay_datagram_send_rx) = relay_datagram_send_channel(); let relay_datagram_recv_queue = Arc::new(RelayDatagramRecvQueue::new()); - let (udp_disco_sender, mut udp_disco_receiver) = mpsc::channel(256); + let (udp_disco_sender, udp_disco_receiver) = async_channel::bounded(256); // load the node data let node_map = node_map.unwrap_or_default(); @@ -1736,6 +1811,8 @@ impl Handle { node_map, ip_mapped_addrs, udp_disco_sender, + udp_disco_receiver: udp_disco_receiver.clone(), + udp_disco_blocked: std::sync::Mutex::new(ConcurrentQueue::unbounded()), discovery, discovery_user_data: RwLock::new(discovery_user_data), direct_addrs: Default::default(), @@ -1783,7 +1860,7 @@ impl Handle { let _ = actor_tasks.spawn({ let msock = msock.clone(); async move { - while let Some((dst, dst_key, msg)) = udp_disco_receiver.recv().await { + while let Ok((dst, dst_key, msg)) = udp_disco_receiver.recv().await { if let Err(err) = msock.send_disco_message_udp(dst, dst_key, &msg).await { warn!(%dst, node = %dst_key.fmt_short(), ?err, "failed to send disco message (UDP)"); }