Skip to content

Commit 13d22f6

Browse files
authored
Merge pull request #325 from ogenev/handle-utp-accept-payload
Emit global uTP payload event in uTP listener when an uTP stream is closed/reset
2 parents c47b688 + 92f7a2e commit 13d22f6

File tree

17 files changed

+773
-391
lines changed

17 files changed

+773
-391
lines changed

Cargo.lock

Lines changed: 48 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

newsfragments/325.added.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
- Rename `UtpSocket` to `UtpStream`.
2+
- Refactor the way we are storing the received payload (DATA packets) in the uTP stream.
3+
- Add a new AddActiveConnection UtpListener request and move the initialization of a uTP stream inside UtpListener.
4+
- Add UtpStream -> UtpListener event channel and emit event inside UtpStream when stream state changes to Closed or Reset.
5+
- Emit a global uTP listener event containing a uTP payload when a stream is closed.
6+
- Remove redundant and dead code.

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ pub async fn run_trin(
5252
};
5353

5454
// Initialize and spawn UTP listener
55-
let (utp_events_tx, utp_listener_tx, mut utp_listener) =
55+
let (utp_events_tx, utp_listener_tx, _, mut utp_listener) =
5656
UtpListener::new(Arc::clone(&discovery));
5757
tokio::spawn(async move { utp_listener.start().await });
5858

trin-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,4 @@ features = ["bundled"]
6969

7070
[dev-dependencies]
7171
quickcheck = "1.0.3"
72+
ntest = "0.8.0"

trin-core/src/jsonrpc/types.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use validator::{Validate, ValidationError};
1010
use crate::{
1111
jsonrpc::endpoints::{HistoryEndpoint, StateEndpoint, TrinEndpoint},
1212
portalnet::types::{
13-
content_key::OverlayContentKey,
13+
content_key::{OverlayContentKey, RawContentKey},
1414
messages::{ByteList, CustomPayload, SszEnr},
1515
},
1616
utils::bytes::hex_decode,
@@ -275,7 +275,7 @@ impl TryFrom<[&Value; 2]> for OfferParams {
275275
.collect();
276276

277277
if let Ok(content_keys) = content_keys {
278-
let content_keys: Result<Vec<Vec<u8>>, _> = content_keys
278+
let content_keys: Result<Vec<RawContentKey>, _> = content_keys
279279
.iter()
280280
.map(|s| hex_decode(s.as_str()))
281281
.collect();

trin-core/src/portalnet/discovery.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use rand::seq::SliceRandom;
1414
use serde_json::{json, Value};
1515
use std::{
1616
convert::TryFrom,
17+
fmt,
1718
net::{IpAddr, SocketAddr},
1819
sync::Arc,
1920
time::Duration,
@@ -54,6 +55,18 @@ pub struct Discovery {
5455
pub listen_socket: SocketAddr,
5556
}
5657

58+
impl fmt::Debug for Discovery {
59+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
60+
write!(
61+
f,
62+
"Discovery: ( enr: {}, started: {}, listen_socket: {} )",
63+
self.discv5.local_enr(),
64+
self.started,
65+
self.listen_socket
66+
)
67+
}
68+
}
69+
5770
impl Discovery {
5871
pub fn new(portal_config: PortalnetConfig) -> Result<Self, String> {
5972
let listen_all_ips = SocketAddr::new("0.0.0.0".parse().unwrap(), portal_config.listen_port);

trin-core/src/portalnet/overlay.rs

Lines changed: 73 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@ use crate::portalnet::{
1515
},
1616
};
1717

18-
use crate::utp::{
19-
stream::{UtpListenerRequest, UtpSocket, BUF_SIZE},
20-
trin_helpers::{UtpAccept, UtpMessage},
18+
use crate::{
19+
portalnet::types::content_key::RawContentKey,
20+
utp::{
21+
stream::{UtpListenerRequest, UtpStream, BUF_SIZE},
22+
trin_helpers::{UtpAccept, UtpMessage, UtpStreamId},
23+
},
2124
};
2225
use discv5::{
2326
enr::NodeId,
@@ -277,48 +280,42 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
277280
enr: Enr,
278281
conn_id: u16,
279282
) -> Result<Content, OverlayRequestError> {
280-
let utp_request = UtpListenerRequest::FindContentStream(conn_id);
281-
if let Err(err) = self.utp_listener_tx.send(utp_request) {
282-
return Err(OverlayRequestError::UtpError(format!(
283-
"Unable to send uTP FindContent stream request: {err}"
284-
)));
285-
}
286-
287283
// initiate the connection to the acceptor
288-
let (tx, rx) = tokio::sync::oneshot::channel::<anyhow::Result<UtpSocket>>();
289-
290-
let _ = self
291-
.utp_listener_tx
292-
.send(UtpListenerRequest::Connect(conn_id, enr.node_id(), tx));
284+
let (tx, rx) = tokio::sync::oneshot::channel::<UtpStream>();
285+
let utp_request = UtpListenerRequest::Connect(
286+
conn_id,
287+
enr,
288+
self.protocol.clone(),
289+
UtpStreamId::FindContentStream,
290+
tx,
291+
);
292+
293+
self.utp_listener_tx.send(utp_request).map_err(|err| {
294+
OverlayRequestError::UtpError(format!(
295+
"Unable to send Connect request with FindContent stream to UtpListener: {err}"
296+
))
297+
})?;
293298

294299
match rx.await {
295-
Ok(conn) => {
296-
match conn {
297-
Ok(mut conn) => {
298-
let mut result = Vec::new();
299-
// Loop and receive all DATA packets, similar to `read_to_end`
300-
loop {
301-
let mut buf = [0; BUF_SIZE];
302-
match conn.recv_from(&mut buf).await {
303-
Ok((0, _)) => {
304-
break;
305-
}
306-
Ok((bytes, _)) => {
307-
result.extend_from_slice(&mut buf[..bytes]);
308-
}
309-
Err(err) => {
310-
warn!("Unable to receive content via uTP: {err}");
311-
return Err(OverlayRequestError::UtpError(err.to_string()));
312-
}
313-
}
300+
Ok(mut conn) => {
301+
let mut result = Vec::new();
302+
// Loop and receive all DATA packets, similar to `read_to_end`
303+
loop {
304+
let mut buf = [0; BUF_SIZE];
305+
match conn.recv_from(&mut buf).await {
306+
Ok((0, _)) => {
307+
break;
308+
}
309+
Ok((bytes, _)) => {
310+
result.extend_from_slice(&mut buf[..bytes]);
311+
}
312+
Err(err) => {
313+
warn!("Unable to receive content via uTP: {err}");
314+
return Err(OverlayRequestError::UtpError(err.to_string()));
314315
}
315-
Ok(Content::Content(VariableList::from(result)))
316-
}
317-
Err(err) => {
318-
warn!("Unable to initiate uTP stream with remote node. Error initializing uTP socket: {err}");
319-
Err(OverlayRequestError::UtpError(err.to_string()))
320316
}
321317
}
318+
Ok(Content::Content(VariableList::from(result)))
322319
}
323320
Err(err) => {
324321
warn!("Unable to receive from uTP listener channel: {err}");
@@ -331,7 +328,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
331328
/// Offer is also sent to nodes after FindContent (POKE)
332329
pub async fn send_offer(
333330
&self,
334-
content_keys: Vec<Vec<u8>>,
331+
content_keys: Vec<RawContentKey>,
335332
enr: Enr,
336333
) -> Result<Accept, OverlayRequestError> {
337334
// Construct the request.
@@ -384,49 +381,44 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
384381
return Ok(response);
385382
}
386383

387-
let utp_request = UtpListenerRequest::OfferStream(conn_id);
388-
if let Err(err) = self.utp_listener_tx.send(utp_request) {
389-
return Err(anyhow!("Unable to send uTP Offer stream request: {err}"));
390-
}
391-
392384
// initiate the connection to the acceptor
393-
let (tx, rx) = tokio::sync::oneshot::channel::<anyhow::Result<UtpSocket>>();
394-
395-
let _ = self
396-
.utp_listener_tx
397-
.send(UtpListenerRequest::Connect(conn_id, enr.node_id(), tx));
385+
let (tx, rx) = tokio::sync::oneshot::channel::<UtpStream>();
386+
let utp_request = UtpListenerRequest::Connect(
387+
conn_id,
388+
enr,
389+
self.protocol.clone(),
390+
UtpStreamId::OfferStream,
391+
tx,
392+
);
393+
394+
self.utp_listener_tx
395+
.send(utp_request).map_err(|err| anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?;
396+
397+
let mut conn = rx.await?;
398+
// Handle STATE packet for SYN
399+
let mut buf = [0; BUF_SIZE];
400+
conn.recv(&mut buf).await?;
401+
402+
let content_items = self.provide_requested_content(&response, content_keys_offered);
403+
404+
let content_message = UtpAccept {
405+
message: content_items,
406+
};
398407

399-
match rx.await? {
400-
Ok(mut conn) => {
401-
// Handle STATE packet for SYN
402-
let mut buf = [0; BUF_SIZE];
403-
conn.recv(&mut buf).await?;
404-
405-
let content_items = self.provide_requested_content(&response, content_keys_offered);
406-
407-
let content_message = UtpAccept {
408-
message: content_items,
409-
};
410-
411-
tokio::spawn(async move {
412-
// send the content to the acceptor over a uTP stream
413-
if let Err(err) = conn
414-
.send_to(&UtpMessage::new(content_message.as_ssz_bytes()).encode()[..])
415-
.await
416-
{
417-
warn!("Error sending content {err}");
418-
};
419-
// Close uTP connection
420-
if let Err(err) = conn.close().await {
421-
warn!("Unable to close uTP connection!: {err}")
422-
};
423-
});
424-
Ok(response)
425-
}
426-
Err(err) => Err(anyhow!(
427-
"Unable to initialize Offer uTP stream with remote node: {err}"
428-
)),
429-
}
408+
tokio::spawn(async move {
409+
// send the content to the acceptor over a uTP stream
410+
if let Err(err) = conn
411+
.send_to(&UtpMessage::new(content_message.as_ssz_bytes()).encode()[..])
412+
.await
413+
{
414+
warn!("Error sending content {err}");
415+
};
416+
// Close uTP connection
417+
if let Err(err) = conn.close().await {
418+
warn!("Unable to close uTP connection!: {err}")
419+
};
420+
});
421+
Ok(response)
430422
}
431423

432424
/// Provide the requested content key and content value for the acceptor

0 commit comments

Comments
 (0)