Skip to content

Commit 52115d4

Browse files
committed
Process all closed uTP streams in UtpListener and handle the payload in overlay service
1 parent e7ece03 commit 52115d4

File tree

4 files changed

+103
-41
lines changed

4 files changed

+103
-41
lines changed

newsfragments/325.added.md

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Process all closed uTP streams in UtpListener and pass the payload to overlay service.

trin-core/src/portalnet/overlay_service.rs

+44-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::{
1919
utp::stream::UtpListenerRequest,
2020
};
2121

22+
use crate::utp::{stream::UtpPayload, trin_helpers::UtpStreamId};
2223
use delay_map::HashSetDelay;
2324
use discv5::{
2425
enr::NodeId,
@@ -45,6 +46,10 @@ pub const FIND_CONTENT_MAX_NODES: usize = 32;
4546
/// With even distribution assumptions, 2**17 is enough to put each node (estimating 100k nodes,
4647
/// which is more than 10x the ethereum mainnet node count) into a unique bucket by the 17th bucket index.
4748
const EXPECTED_NON_EMPTY_BUCKETS: usize = 17;
49+
/// Bucket refresh lookup interval in seconds
50+
const BUCKET_REFRESH_INTERVAL: u64 = 60;
51+
/// Process uTP streams interval in milliseconds
52+
const PROCESS_UTP_STREAMS_INTERVAL: u64 = 20;
4853

4954
/// An overlay request error.
5055
#[derive(Clone, Error, Debug)]
@@ -377,7 +382,10 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
377382
/// Bucket maintenance: Maintain the routing table (more info documented above function).
378383
async fn start(&mut self) {
379384
// Construct bucket refresh interval
380-
let mut bucket_refresh_interval = tokio::time::interval(Duration::from_secs(60));
385+
let mut bucket_refresh_interval =
386+
tokio::time::interval(Duration::from_secs(BUCKET_REFRESH_INTERVAL));
387+
let mut process_utp_streams_interval =
388+
tokio::time::interval(Duration::from_millis(PROCESS_UTP_STREAMS_INTERVAL));
381389

382390
loop {
383391
tokio::select! {
@@ -408,6 +416,22 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
408416
self.peers_to_ping.insert(node_id);
409417
}
410418
}
419+
_ = process_utp_streams_interval.tick() => {
420+
let (tx, rx) = tokio::sync::oneshot::channel::<Vec<(UtpPayload, UtpStreamId)>>();
421+
422+
// Send request to uTP listener to process all closed uTP streams and wait for response
423+
if let Err(err) = self.utp_listener_tx.send(UtpListenerRequest::ProcessClosedStreams(tx)) {
424+
error!("Unable to send ProcessClosedStreams request to uTP listener: {err}");
425+
continue
426+
}
427+
428+
match rx.await {
429+
Ok(streams) => {
430+
self.handle_utp_payload(streams);
431+
}
432+
Err(err) => error!("Unable to receive ProcessClosedStreams response from uTP listener: {err}")
433+
}
434+
}
411435
_ = OverlayService::<TContentKey, TMetric>::bucket_maintenance_poll(self.protocol.clone(), &self.kbuckets) => {}
412436
_ = bucket_refresh_interval.tick() => {
413437
debug!("[{:?}] Overlay bucket refresh lookup", self.protocol);
@@ -703,6 +727,19 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
703727
Ok(accept)
704728
}
705729

730+
/// Handle all closed uTP streams, currently we process only AcceptStream here.
731+
/// FindContent payload is processed explicitly when we send FindContent request.
732+
fn handle_utp_payload(&self, streams: Vec<(UtpPayload, UtpStreamId)>) {
733+
for stream in streams {
734+
match stream {
735+
(payload, UtpStreamId::AcceptStream(content_keys)) => {
736+
self.process_accept_utp_payload(content_keys, payload);
737+
}
738+
_ => {}
739+
}
740+
}
741+
}
742+
706743
/// Sends a TALK request via Discovery v5 to some destination node.
707744
fn send_talk_req(&self, request: Request, request_id: OverlayRequestId, destination: Enr) {
708745
let discovery = Arc::clone(&self.discovery);
@@ -734,6 +771,12 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
734771
});
735772
}
736773

774+
/// Process accepted uTP payload of the OFFER?ACCEPT stream
775+
fn process_accept_utp_payload(&self, content_keys: Vec<Vec<u8>>, payload: UtpPayload) {
776+
// TODO: Verify the payload, store the content and propagate gossip.
777+
debug!("DEBUG: Processing content keys: {content_keys:?}, with payload: {payload:?}");
778+
}
779+
737780
/// Processes an incoming request from some source node.
738781
fn process_incoming_request(&mut self, request: Request, _id: RequestId, source: NodeId) {
739782
// Look up the node in the routing table.

trin-core/src/utp/stream.rs

+55-36
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::{
2525
utp::{
2626
packets::{ExtensionType, Packet, PacketType, HEADER_SIZE},
2727
time::{now_microseconds, Delay, Timestamp},
28-
trin_helpers::{UtpMessage, UtpMessageId},
28+
trin_helpers::{UtpMessage, UtpStreamId},
2929
util::{abs_diff, ewma, generate_sequential_identifiers},
3030
},
3131
};
@@ -62,6 +62,9 @@ const DISCV5_SOCKET_TIMEOUT: u64 = 25;
6262
/// uTP connection id
6363
type ConnId = u16;
6464

65+
/// uTP payload data
66+
pub type UtpPayload = Vec<u8>;
67+
6568
pub fn rand() -> u16 {
6669
rand::thread_rng().gen()
6770
}
@@ -111,6 +114,8 @@ pub enum UtpListenerRequest {
111114
FindContentData(ConnId, ByteList),
112115
/// Request to listen for FindContent stream
113116
FindContentStream(ConnId),
117+
/// Process all streams where uTP socket state is "Closed"
118+
ProcessClosedStreams(oneshot::Sender<Vec<(UtpPayload, UtpStreamId)>>),
114119
/// Request to listen for Offer stream
115120
OfferStream(ConnId),
116121
}
@@ -122,7 +127,7 @@ pub struct UtpListener {
122127
/// Store all active connections
123128
utp_connections: HashMap<ConnectionKey, UtpSocket>,
124129
/// uTP connection ids to listen for
125-
listening: HashMap<ConnId, UtpMessageId>,
130+
listening: HashMap<ConnId, UtpStreamId>,
126131
/// Receiver for uTP events sent from the main portal event handler
127132
utp_event_rx: UnboundedReceiver<TalkRequest>,
128133
/// Receiver for uTP requests sent from the overlay layer
@@ -221,9 +226,9 @@ impl UtpListener {
221226
// TODO: Probably there is a better way with lifetimes to pass the HashMap value to a
222227
// different thread without removing the key and re-adding it.
223228
self.listening
224-
.insert(conn.sender_connection_id, UtpMessageId::FindContentStream);
229+
.insert(conn.sender_connection_id, UtpStreamId::FindContentStream);
225230

226-
if let Some(UtpMessageId::FindContentData(Content(content_data))) =
231+
if let Some(UtpStreamId::FindContentData(Content(content_data))) =
227232
utp_message_id
228233
{
229234
// We want to send uTP data only if the content is Content(ByteList)
@@ -264,12 +269,17 @@ impl UtpListener {
264269
return;
265270
}
266271

272+
let mut result = Vec::new();
273+
267274
let mut buf = [0; BUF_SIZE];
268-
if let Err(msg) = conn.recv(&mut buf).await {
269-
error!("Unable to receive uTP DATA packet: {msg}")
270-
} else {
271-
conn.recv_data_stream
272-
.append(&mut Vec::from(packet.payload()));
275+
match conn.recv(&mut buf).await {
276+
Ok(bytes_read) => {
277+
if let Some(bytes) = bytes_read {
278+
result.extend_from_slice(&buf[..bytes]);
279+
conn.recv_data_stream.append(&mut result);
280+
}
281+
}
282+
Err(err) => error!("Unable to receive uTP DATA packet: {err}"),
273283
}
274284
}
275285
}
@@ -314,24 +324,29 @@ impl UtpListener {
314324
match request {
315325
UtpListenerRequest::FindContentStream(conn_id) => {
316326
self.listening
317-
.insert(conn_id, UtpMessageId::FindContentStream);
327+
.insert(conn_id, UtpStreamId::FindContentStream);
318328
}
319329
UtpListenerRequest::Connect(conn_id, node_id, tx) => {
320330
let conn = self.connect(conn_id, node_id).await;
321331
if tx.send(conn).is_err() {
322-
warn!("Unable to send uTP socket to requester")
332+
error!("Unable to send uTP socket to requester")
323333
};
324334
}
325335
UtpListenerRequest::OfferStream(conn_id) => {
326-
self.listening.insert(conn_id, UtpMessageId::OfferStream);
336+
self.listening.insert(conn_id, UtpStreamId::OfferStream);
327337
}
328338
UtpListenerRequest::FindContentData(conn_id, content) => {
329339
self.listening
330-
.insert(conn_id, UtpMessageId::FindContentData(Content(content)));
340+
.insert(conn_id, UtpStreamId::FindContentData(Content(content)));
331341
}
332342
UtpListenerRequest::AcceptStream(conn_id, accepted_keys) => {
333343
self.listening
334-
.insert(conn_id, UtpMessageId::AcceptStream(accepted_keys));
344+
.insert(conn_id, UtpStreamId::AcceptStream(accepted_keys));
345+
}
346+
UtpListenerRequest::ProcessClosedStreams(tx) => {
347+
if tx.send(self.process_closed_streams()).is_err() {
348+
error!("Unable to send closed uTP streams to requester")
349+
};
335350
}
336351
}
337352
}
@@ -355,28 +370,32 @@ impl UtpListener {
355370
}
356371
}
357372

358-
// https://github.com/ethereum/portal-network-specs/pull/98\
359-
// Currently the way to handle data over uTP isn't finalized yet, so we are going to use the
360-
// handle data on connection closed method, as that seems to be the accepted method for now.
361-
pub async fn process_utp_byte_stream(&mut self) {
362-
let mut utp_connections = self.utp_connections.clone();
363-
for (conn_key, conn) in self.utp_connections.iter_mut() {
364-
if conn.state == SocketState::Closed {
365-
let received_stream = conn.recv_data_stream.clone();
366-
debug!("Received data: with len: {}", received_stream.len());
367-
368-
match self.listening.get(&conn.receiver_connection_id) {
369-
Some(message_type) => {
370-
if let UtpMessageId::AcceptStream(content_keys) = message_type {
371-
// TODO: Implement this with overlay store and decode receiver stream if multiple content values are send
372-
debug!("Store {content_keys:?}, {received_stream:?}");
373-
}
374-
}
375-
_ => warn!("uTP listening HashMap doesn't have uTP stream message type"),
376-
}
377-
utp_connections.remove(conn_key);
378-
}
379-
}
373+
/// Return and cleanup all active uTP streams where socket state is "Closed"
374+
pub fn process_closed_streams(&mut self) -> Vec<(UtpPayload, UtpStreamId)> {
375+
// This seems to be a hot loop, we may need to optimise it and find a better way to filter by closed
376+
// connections without cloning all records. One reasonable way is to use some data-oriented
377+
// design principles like Struct of Arrays vs. Array of Structs.
378+
self.utp_connections
379+
.clone()
380+
.iter()
381+
.filter(|conn| conn.1.state == SocketState::Closed)
382+
.map(|conn| {
383+
// Remove the closed connections from active connections
384+
let receiver_stream_id = self
385+
.listening
386+
.remove(&conn.1.receiver_connection_id)
387+
.expect("Receiver connection id should match active listening connections.");
388+
self.listening
389+
.remove(&conn.1.sender_connection_id)
390+
.expect("Sender connection id should match active listening connections.");
391+
let utp_socket = self
392+
.utp_connections
393+
.remove(conn.0)
394+
.expect("uTP socket should match asctive utp connections.");
395+
396+
(utp_socket.recv_data_stream, receiver_stream_id)
397+
})
398+
.collect()
380399
}
381400
}
382401

trin-core/src/utp/trin_helpers.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,9 @@ pub struct UtpAccept {
5151
pub message: Vec<(Vec<u8>, Vec<u8>)>,
5252
}
5353

54-
// This is not in a spec, this is just for internally tracking for what portal message
55-
// negotiated the uTP stream
56-
#[derive(Debug, Clone)]
57-
pub enum UtpMessageId {
54+
/// Used to track which stream to which overlay request correspond
55+
#[derive(Debug, Clone, PartialEq)]
56+
pub enum UtpStreamId {
5857
FindContentStream,
5958
FindContentData(Content),
6059
OfferStream,

0 commit comments

Comments
 (0)