Skip to content

Commit 9a930fe

Browse files
committed
Remove redundant listening hashmap from uTP listener and clear dead code
1 parent 5c04aa2 commit 9a930fe

File tree

4 files changed

+14
-187
lines changed

4 files changed

+14
-187
lines changed

trin-core/src/portalnet/overlay.rs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
101101
config: OverlayConfig,
102102
discovery: Arc<Discovery>,
103103
utp_listener_tx: UnboundedSender<UtpListenerRequest>,
104-
utp_listener_rx: Arc<TRwLock<UnboundedReceiver<UtpListenerEvent>>>,
104+
_utp_listener_rx: Arc<TRwLock<UnboundedReceiver<UtpListenerEvent>>>,
105105
storage: Arc<RwLock<PortalStorage>>,
106106
data_radius: U256,
107107
protocol: ProtocolId,
@@ -124,7 +124,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
124124
Arc::clone(&data_radius),
125125
protocol.clone(),
126126
utp_listener_tx.clone(),
127-
utp_listener_rx,
128127
config.enable_metrics,
129128
)
130129
.await
@@ -285,13 +284,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
285284
enr: Enr,
286285
conn_id: u16,
287286
) -> Result<Content, OverlayRequestError> {
288-
let utp_request = UtpListenerRequest::FindContentStream(conn_id);
289-
if let Err(err) = self.utp_listener_tx.send(utp_request) {
290-
return Err(OverlayRequestError::UtpError(format!(
291-
"Unable to send uTP FindContent stream request: {err}"
292-
)));
293-
}
294-
295287
// initiate the connection to the acceptor
296288
let (tx, rx) = tokio::sync::oneshot::channel::<anyhow::Result<UtpSocket>>();
297289
self.utp_listener_tx
@@ -401,11 +393,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
401393
return Ok(response);
402394
}
403395

404-
let utp_request = UtpListenerRequest::OfferStream(conn_id);
405-
if let Err(err) = self.utp_listener_tx.send(utp_request) {
406-
return Err(anyhow!("Unable to send uTP Offer stream request: {err}"));
407-
}
408-
409396
// initiate the connection to the acceptor
410397
let (tx, rx) = tokio::sync::oneshot::channel::<anyhow::Result<UtpSocket>>();
411398

trin-core/src/portalnet/overlay_service.rs

Lines changed: 6 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,7 @@ use crate::{
1919
utp::stream::UtpListenerRequest,
2020
};
2121

22-
use crate::{
23-
portalnet::types::content_key::RawContentKey,
24-
utp::{
25-
stream::{UtpListenerEvent, UtpPayload},
26-
trin_helpers::UtpStreamId,
27-
},
28-
};
22+
use crate::utp::trin_helpers::UtpStreamId;
2923
use delay_map::HashSetDelay;
3024
use discv5::{
3125
enr::NodeId,
@@ -43,10 +37,7 @@ use rand::seq::SliceRandom;
4337
use ssz::Encode;
4438
use ssz_types::{BitList, VariableList};
4539
use thiserror::Error;
46-
use tokio::sync::{
47-
mpsc::{self, UnboundedReceiver, UnboundedSender},
48-
RwLock as TRwLock,
49-
};
40+
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
5041

5142
/// Maximum number of ENRs in response to FindNodes.
5243
pub const FIND_NODES_MAX_NODES: usize = 32;
@@ -265,8 +256,6 @@ pub struct OverlayService<TContentKey, TMetric> {
265256
response_tx: UnboundedSender<OverlayResponse>,
266257
/// The sender half of a channel to send requests to uTP listener
267258
utp_listener_tx: UnboundedSender<UtpListenerRequest>,
268-
/// Receiver for UtpListener emitted events
269-
utp_listener_rx: Arc<TRwLock<UnboundedReceiver<UtpListenerEvent>>>,
270259
/// Phantom content key.
271260
phantom_content_key: PhantomData<TContentKey>,
272261
/// Phantom metric (distance function).
@@ -292,7 +281,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
292281
data_radius: Arc<U256>,
293282
protocol: ProtocolId,
294283
utp_listener_sender: UnboundedSender<UtpListenerRequest>,
295-
utp_listener_receiver: Arc<TRwLock<UnboundedReceiver<UtpListenerEvent>>>,
296284
enable_metrics: bool,
297285
) -> Result<UnboundedSender<OverlayRequest>, String> {
298286
let (request_tx, request_rx) = mpsc::unbounded_channel();
@@ -325,7 +313,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
325313
response_rx,
326314
response_tx,
327315
utp_listener_tx: utp_listener_sender,
328-
utp_listener_rx: utp_listener_receiver,
329316
phantom_content_key: PhantomData,
330317
phantom_metric: PhantomData,
331318
metrics,
@@ -397,9 +384,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
397384
tokio::time::interval(Duration::from_secs(BUCKET_REFRESH_INTERVAL));
398385

399386
loop {
400-
// let utp_listener_rx = self.utp_listener_rx.clone();
401-
// let mut utp_listener_lock = utp_listener_rx.write_with_warn().await;
402-
403387
tokio::select! {
404388
Some(request) = self.request_rx.recv() => self.process_request(request),
405389
Some(response) = self.response_rx.recv() => {
@@ -428,14 +412,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
428412
self.peers_to_ping.insert(node_id);
429413
}
430414
}
431-
// Some(utp_event) = utp_listener_lock.recv() => {
432-
// match utp_event {
433-
// UtpListenerEvent::ProcessedClosedStreams(processed_streams) =>
434-
// {
435-
// self.handle_utp_payload(processed_streams);
436-
// }
437-
// }
438-
// }
439415
_ = OverlayService::<TContentKey, TMetric>::bucket_maintenance_poll(self.protocol.clone(), &self.kbuckets) => {}
440416
_ = bucket_refresh_interval.tick() => {
441417
debug!("[{:?}] Overlay bucket refresh lookup", self.protocol);
@@ -666,26 +642,11 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
666642
} else {
667643
let conn_id: u16 = crate::utp::stream::rand();
668644

669-
// listen for incoming connection request on conn_id, as part of utp handshake and
670-
// temporarily storing content data, so we can send it right after we receive
645+
// Listen for incoming uTP connection request on as part of uTP handshake and
646+
// storing content data, so we can send it inside UtpListener right after we receive
671647
// SYN packet from the requester
672-
let utp_request = UtpListenerRequest::FindContentData(conn_id, content.clone());
673-
if let Err(err) = self.utp_listener_tx.send(utp_request) {
674-
return Err(OverlayRequestError::UtpError(format!(
675-
"Unable to send uTP FindContentData stream request: {err}"
676-
)));
677-
}
678-
679-
// also listen on conn_id + 1 because this is the receive path
680648
let conn_id_recv = conn_id.wrapping_add(1);
681649

682-
let utp_request = UtpListenerRequest::FindContentStream(conn_id_recv);
683-
if let Err(err) = self.utp_listener_tx.send(utp_request) {
684-
return Err(OverlayRequestError::UtpError(format!(
685-
"Unable to send uTP FindContent stream request: {err}"
686-
)));
687-
}
688-
689650
self.add_utp_connection(
690651
source,
691652
conn_id_recv,
@@ -704,8 +665,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
704665
}
705666
}
706667
Err(msg) => Err(OverlayRequestError::Failure(format!(
707-
"Unable to respond to FindContent: {}",
708-
msg
668+
"Unable to respond to FindContent: {msg}",
709669
))),
710670
}
711671
}
@@ -748,24 +708,8 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
748708
})?;
749709
}
750710

751-
// listen for incoming connection request on conn_id, as part of utp handshake
711+
// Listen for incoming connection request on conn_id, as part of utp handshake
752712
let conn_id: u16 = crate::utp::stream::rand();
753-
let utp_request = UtpListenerRequest::OfferStream(conn_id);
754-
if let Err(err) = self.utp_listener_tx.send(utp_request) {
755-
return Err(OverlayRequestError::UtpError(format!(
756-
"Unable to send uTP Offer stream request: {err}"
757-
)));
758-
}
759-
760-
// also listen on conn_id + 1 because this is the actual receive path for acceptor
761-
let conn_id_recv = conn_id.wrapping_add(1);
762-
763-
let utp_request = UtpListenerRequest::AcceptStream(conn_id_recv, accept_keys.clone());
764-
if let Err(err) = self.utp_listener_tx.send(utp_request) {
765-
return Err(OverlayRequestError::UtpError(format!(
766-
"Unable to send uTP Accept stream request: {err}"
767-
)));
768-
}
769713

770714
self.add_utp_connection(source, conn_id, UtpStreamId::AcceptStream(accept_keys))?;
771715

@@ -777,19 +721,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
777721
Ok(accept)
778722
}
779723

780-
/// Handle all closed uTP streams, currently we process only AcceptStream here.
781-
/// FindContent payload is processed explicitly when we send FindContent request.
782-
fn handle_utp_payload(&self, streams: Vec<(UtpPayload, UtpStreamId)>) {
783-
for stream in streams {
784-
match stream {
785-
(payload, UtpStreamId::AcceptStream(content_keys)) => {
786-
self.process_accept_utp_payload(content_keys, payload);
787-
}
788-
_ => {}
789-
}
790-
}
791-
}
792-
793724
/// Sends a TALK request via Discovery v5 to some destination node.
794725
fn send_talk_req(&self, request: Request, request_id: OverlayRequestId, destination: Enr) {
795726
let discovery = Arc::clone(&self.discovery);
@@ -821,12 +752,6 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
821752
});
822753
}
823754

824-
/// Process accepted uTP payload of the OFFER?ACCEPT stream
825-
fn process_accept_utp_payload(&self, content_keys: Vec<RawContentKey>, payload: UtpPayload) {
826-
// TODO: Verify the payload, store the content and propagate gossip.
827-
debug!("DEBUG: Processing content keys: {content_keys:?}, with payload: {payload:?}, protocol: {:?}", self.protocol);
828-
}
829-
830755
/// Processes an incoming request from some source node.
831756
fn process_incoming_request(&mut self, request: Request, _id: RequestId, source: NodeId) {
832757
// Look up the node in the routing table.
@@ -1354,7 +1279,6 @@ mod tests {
13541279
let discovery = Arc::new(Discovery::new(portal_config).unwrap());
13551280

13561281
let (utp_listener_tx, _) = unbounded_channel::<UtpListenerRequest>();
1357-
let (_, utp_listener_rx) = unbounded_channel::<UtpListenerEvent>();
13581282

13591283
// Initialize DB config
13601284
let storage_capacity: u32 = DEFAULT_STORAGE_CAPACITY.parse().unwrap();
@@ -1392,7 +1316,6 @@ mod tests {
13921316
response_tx,
13931317
response_rx,
13941318
utp_listener_tx,
1395-
utp_listener_rx: Arc::new(TRwLock::new(utp_listener_rx)),
13961319
phantom_content_key: PhantomData,
13971320
phantom_metric: PhantomData,
13981321
metrics,

trin-core/src/utp/stream.rs

Lines changed: 6 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@ use tokio::{
2121

2222
use crate::{
2323
locks::RwLoggingExt,
24-
portalnet::types::{
25-
content_key::RawContentKey,
26-
messages::{ByteList, ProtocolId},
27-
},
24+
portalnet::types::messages::ProtocolId,
2825
utp::{
2926
packets::{ExtensionType, Packet, PacketType, HEADER_SIZE},
3027
time::{now_microseconds, Delay, Timestamp},
@@ -114,8 +111,6 @@ struct DelayDifferenceSample {
114111
/// Represent overlay to uTP listener request. It is used as a way to communicate between the overlay protocol
115112
/// and uTP listener
116113
pub enum UtpListenerRequest {
117-
/// Request to listen for Accept stream
118-
AcceptStream(ConnId, Vec<RawContentKey>),
119114
/// Request to initialize uTP stream with remote node
120115
Connect(
121116
ConnId,
@@ -126,12 +121,6 @@ pub enum UtpListenerRequest {
126121
),
127122
/// Request to add uTP stream to the active connections
128123
AddActiveConnection(Enr, ProtocolId, UtpStreamId, ConnId),
129-
/// Request to listen for FindCOntent stream and send content data
130-
FindContentData(ConnId, ByteList),
131-
/// Request to listen for FindContent stream
132-
FindContentStream(ConnId),
133-
/// Request to listen for Offer stream
134-
OfferStream(ConnId),
135124
}
136125

137126
/// Result from processing all closed uTP streams. Includes a tuple with the payload and the stream id.
@@ -152,13 +141,13 @@ pub enum UtpStreamEvent {
152141
}
153142

154143
/// Main uTP service used to listen and handle all uTP connections and streams
144+
// FIXME: Deny dead_code
145+
#[allow(dead_code)]
155146
pub struct UtpListener {
156147
/// Base discv5 layer
157148
discovery: Arc<Discovery>,
158149
/// Store all active connections
159150
utp_connections: HashMap<ConnectionKey, UtpSocket>,
160-
/// uTP connection ids to listen for
161-
listening: HashMap<ConnId, UtpStreamId>,
162151
/// Receiver for uTP events sent from the main portal event handler
163152
utp_event_rx: UnboundedReceiver<TalkRequest>,
164153
/// Sender to overlay layer with processed uTP stream
@@ -196,7 +185,6 @@ impl UtpListener {
196185
UtpListener {
197186
discovery,
198187
utp_connections: HashMap::new(),
199-
listening: HashMap::new(),
200188
utp_event_rx,
201189
overlay_tx,
202190
overlay_rx: utp_listener_rx,
@@ -218,14 +206,6 @@ impl UtpListener {
218206
Some(overlay_request) = self.overlay_rx.recv() => {
219207
self.process_overlay_request(overlay_request).await
220208
},
221-
// _ = process_utp_streams_interval.tick() => {
222-
// let processed_streams = self.process_closed_streams();
223-
//
224-
// if let Err(err) = self.overlay_tx.send(UtpListenerEvent::ProcessedClosedStreams(processed_streams)) {
225-
// error!("Unable to send ProcessClosedStreams event to overlay layer: {err}");
226-
// continue
227-
// }
228-
// }
229209
}
230210
}
231211
}
@@ -270,15 +250,9 @@ impl UtpListener {
270250
return;
271251
}
272252

273-
// Get ownership of FindContentData and re-add the receiver connection
274-
let utp_message_id = self.listening.remove(&conn.sender_connection_id);
275-
276-
// TODO: Probably there is a better way with lifetimes to pass the HashMap value to a
277-
// different thread without removing the key and re-adding it.
278-
self.listening
279-
.insert(conn.sender_connection_id, UtpStreamId::FindContentStream);
280-
281-
if let Some(UtpStreamId::FindContentData(content_data)) = utp_message_id
253+
// Send content data if the stream is listening for FindContent SYN packet
254+
if let UtpStreamId::FindContentData(content_data) =
255+
conn.stream_id.clone()
282256
{
283257
// We want to send uTP data only if the content is Content(ByteList)
284258
debug!(
@@ -390,27 +364,12 @@ impl UtpListener {
390364
let conn_key = ConnectionKey::new(connected_to.node_id(), conn_id_recv);
391365
self.utp_connections.insert(conn_key, conn);
392366
}
393-
UtpListenerRequest::FindContentStream(conn_id) => {
394-
self.listening
395-
.insert(conn_id, UtpStreamId::FindContentStream);
396-
}
397367
UtpListenerRequest::Connect(conn_id, node_id, protocol_id, stream_id, tx) => {
398368
let conn = self.connect(conn_id, node_id, protocol_id, stream_id).await;
399369
if tx.send(conn).is_err() {
400370
error!("Unable to send uTP socket to requester")
401371
};
402372
}
403-
UtpListenerRequest::OfferStream(conn_id) => {
404-
self.listening.insert(conn_id, UtpStreamId::OfferStream);
405-
}
406-
UtpListenerRequest::FindContentData(conn_id, content) => {
407-
self.listening
408-
.insert(conn_id, UtpStreamId::FindContentData(content));
409-
}
410-
UtpListenerRequest::AcceptStream(conn_id, accepted_keys) => {
411-
self.listening
412-
.insert(conn_id, UtpStreamId::AcceptStream(accepted_keys));
413-
}
414373
}
415374
}
416375

@@ -440,34 +399,6 @@ impl UtpListener {
440399
Err(anyhow!("Trying to connect to unknow Enr"))
441400
}
442401
}
443-
444-
/// Return and cleanup all active uTP streams where socket state is "Closed"
445-
pub fn process_closed_streams(&mut self) -> Vec<(UtpPayload, UtpStreamId)> {
446-
// This seems to be a hot loop, we may need to optimise it and find a better way to filter by closed
447-
// connections without cloning all records. One reasonable way is to use some data-oriented
448-
// design principles like Struct of Arrays vs. Array of Structs.
449-
self.utp_connections
450-
.clone()
451-
.iter()
452-
.filter(|conn| conn.1.state == SocketState::Closed)
453-
.map(|conn| {
454-
// Remove the closed connections from active connections
455-
let receiver_stream_id = self
456-
.listening
457-
.remove(&conn.1.receiver_connection_id)
458-
.expect("Receiver connection id should match active listening connections.");
459-
self.listening
460-
.remove(&conn.1.sender_connection_id)
461-
.expect("Sender connection id should match active listening connections.");
462-
let utp_socket = self
463-
.utp_connections
464-
.remove(conn.0)
465-
.expect("uTP socket should match asctive utp connections.");
466-
467-
(utp_socket.recv_data_stream, receiver_stream_id)
468-
})
469-
.collect()
470-
}
471402
}
472403

473404
// Used to be MicroTransportProtocol impl but it is basically just called UtpStream compared to the

0 commit comments

Comments
 (0)