Skip to content

Commit 68e5c81

Browse files
committed
Add uTP listener events test
1 parent 0b6ddc0 commit 68e5c81

File tree

6 files changed

+220
-5
lines changed

6 files changed

+220
-5
lines changed

Cargo.lock

Lines changed: 48 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

trin-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,4 @@ features = ["bundled"]
6969

7070
[dev-dependencies]
7171
quickcheck = "1.0.3"
72+
ntest = "0.8.0"

trin-core/src/portalnet/discovery.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use rand::seq::SliceRandom;
1414
use serde_json::{json, Value};
1515
use std::{
1616
convert::TryFrom,
17+
fmt,
1718
net::{IpAddr, SocketAddr},
1819
sync::Arc,
1920
time::Duration,
@@ -54,6 +55,18 @@ pub struct Discovery {
5455
pub listen_socket: SocketAddr,
5556
}
5657

58+
impl fmt::Debug for Discovery {
59+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
60+
write!(
61+
f,
62+
"Discovery: ( enr: {}, started: {}, listen_socket: {} )",
63+
self.discv5.local_enr(),
64+
self.started,
65+
self.listen_socket
66+
)
67+
}
68+
}
69+
5770
impl Discovery {
5871
pub fn new(portal_config: PortalnetConfig) -> Result<Self, String> {
5972
let listen_all_ips = SocketAddr::new("0.0.0.0".parse().unwrap(), portal_config.listen_port);

trin-core/src/portalnet/types/messages.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ pub enum ProtocolIdError {
143143
}
144144

145145
/// Protocol identifiers
146-
#[derive(Debug, Clone)]
146+
#[derive(Debug, Clone, PartialEq)]
147147
pub enum ProtocolId {
148148
State,
149149
History,

trin-core/src/utp/stream.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ struct DelayDifferenceSample {
105105

106106
/// Represent overlay to uTP listener request. It is used as a way to communicate between the overlay protocol
107107
/// and uTP listener
108+
#[derive(Debug)]
108109
pub enum UtpListenerRequest {
109110
/// Request to create and connect to a uTP stream initiated by a remote node
110111
Connect(
@@ -119,6 +120,7 @@ pub enum UtpListenerRequest {
119120
}
120121

121122
/// Emit global event to overlay handler
123+
#[derive(Debug, PartialEq)]
122124
pub enum UtpListenerEvent {
123125
/// uTP stream is closed
124126
ClosedStream(UtpPayload, ProtocolId, UtpStreamId),
@@ -360,8 +362,8 @@ impl UtpListener {
360362
let conn_key = ConnectionKey::new(connected_to.node_id(), conn_id_recv);
361363
self.utp_connections.insert(conn_key, conn);
362364
}
363-
UtpListenerRequest::Connect(conn_id, node_id, protocol_id, stream_id, tx) => {
364-
let conn = self.connect(conn_id, node_id, protocol_id, stream_id).await;
365+
UtpListenerRequest::Connect(conn_id, enr, protocol_id, stream_id, tx) => {
366+
let conn = self.connect(conn_id, enr, protocol_id, stream_id).await;
365367
if tx.send(conn).is_err() {
366368
error!("Unable to send the uTP stream to requester")
367369
};
@@ -430,7 +432,7 @@ impl UtpListener {
430432

431433
// Used to be MicroTransportProtocol impl but it is basically just called UtpStream compared to the
432434
// Rust Tcp Lib so I changed it
433-
#[derive(Clone)]
435+
#[derive(Debug, Clone)]
434436
pub struct UtpStream {
435437
/// The wrapped discv5 protocol
436438
socket: Arc<Discovery>,
@@ -439,7 +441,7 @@ pub struct UtpStream {
439441
pub state: StreamState,
440442

441443
/// ENR of the connected remote peer
442-
connected_to: Enr,
444+
pub connected_to: Enr,
443445

444446
/// Overlay protocol identifier
445447
protocol_id: ProtocolId,

trin-core/tests/utp_listener.rs

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
use discv5::Discv5Event;
2+
use ntest::timeout;
3+
use ssz::Encode;
4+
use std::{
5+
net::{IpAddr, Ipv4Addr, SocketAddr},
6+
str::FromStr,
7+
sync::Arc,
8+
};
9+
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
10+
use trin_core::{
11+
portalnet::{
12+
discovery::Discovery,
13+
types::messages::{PortalnetConfig, ProtocolId},
14+
Enr,
15+
},
16+
utp::{
17+
stream::{UtpListener, UtpListenerEvent, UtpListenerRequest, UtpStream, BUF_SIZE},
18+
trin_helpers::{
19+
UtpAccept, UtpMessage,
20+
UtpStreamId::{AcceptStream, OfferStream},
21+
},
22+
},
23+
};
24+
25+
fn next_test_port() -> u16 {
26+
use std::sync::atomic::{AtomicUsize, Ordering};
27+
static NEXT_OFFSET: AtomicUsize = AtomicUsize::new(0);
28+
const BASE_PORT: u16 = 11600;
29+
BASE_PORT + NEXT_OFFSET.fetch_add(1, Ordering::Relaxed) as u16
30+
}
31+
32+
/// Spawn uTP listener instance and start discv5 event handler
33+
async fn spawn_utp_listener() -> (
34+
Enr,
35+
UnboundedSender<UtpListenerRequest>,
36+
UnboundedReceiver<UtpListenerEvent>,
37+
) {
38+
let ip_addr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
39+
let port = next_test_port();
40+
let config = PortalnetConfig {
41+
listen_port: port,
42+
external_addr: Some(SocketAddr::new(ip_addr, port)),
43+
..Default::default()
44+
};
45+
let mut discv5 = Discovery::new(config).unwrap();
46+
let enr = discv5.discv5.local_enr();
47+
discv5.start().await.unwrap();
48+
49+
let discv5 = Arc::new(discv5);
50+
51+
let (utp_event_tx, utp_listener_tx, utp_listener_rx, mut utp_listener) =
52+
UtpListener::new(Arc::clone(&discv5));
53+
54+
tokio::spawn(async move {
55+
let mut receiver = discv5.discv5.event_stream().await.unwrap();
56+
while let Some(event) = receiver.recv().await {
57+
match event {
58+
Discv5Event::TalkRequest(request) => {
59+
let protocol_id =
60+
ProtocolId::from_str(&hex::encode_upper(request.protocol())).unwrap();
61+
62+
match protocol_id {
63+
ProtocolId::Utp => utp_event_tx.send(request).unwrap(),
64+
_ => continue,
65+
}
66+
}
67+
_ => continue,
68+
}
69+
}
70+
});
71+
tokio::spawn(async move { utp_listener.start().await });
72+
73+
(enr, utp_listener_tx, utp_listener_rx)
74+
}
75+
76+
#[tokio::test]
77+
#[timeout(100)]
78+
/// Simulate simple OFFER -> ACCEPT uTP payload transfer
79+
async fn utp_listener_events() {
80+
let protocol_id = ProtocolId::History;
81+
82+
// Initialize offer uTP listener
83+
let (enr_offer, listener_tx_offer, mut listener_rx_offer) = spawn_utp_listener().await;
84+
// Initialize acceptor uTP listener
85+
let (enr_accept, listener_tx_accept, mut listener_rx_accept) = spawn_utp_listener().await;
86+
87+
// Prepare to receive uTP stream from the offer node
88+
let (requested_content_key, requested_content_value) = (vec![1], vec![1, 1, 1, 1]);
89+
let stream_id = AcceptStream(vec![requested_content_key.clone()]);
90+
let conn_id = 1234;
91+
let request = UtpListenerRequest::InitiateConnection(
92+
enr_offer.clone(),
93+
protocol_id.clone(),
94+
stream_id,
95+
conn_id,
96+
);
97+
listener_tx_accept.send(request).unwrap();
98+
99+
// Initialise an OFFER stream and send handshake uTP packet to the acceptor node
100+
let stream_id = OfferStream;
101+
let (tx, rx) = tokio::sync::oneshot::channel::<UtpStream>();
102+
let offer_request = UtpListenerRequest::Connect(
103+
conn_id,
104+
enr_accept.clone(),
105+
protocol_id.clone(),
106+
stream_id,
107+
tx,
108+
);
109+
listener_tx_offer.send(offer_request).unwrap();
110+
111+
// Handle STATE packet for SYN handshake in the offer node
112+
let mut conn = rx.await.unwrap();
113+
assert_eq!(conn.connected_to, enr_accept);
114+
115+
let mut buf = [0; BUF_SIZE];
116+
conn.recv(&mut buf).await.unwrap();
117+
118+
// Send content key with content value to the acceptor node
119+
let content_items = vec![(
120+
requested_content_key.clone(),
121+
requested_content_value.clone(),
122+
)];
123+
124+
let content_message = UtpAccept {
125+
message: content_items,
126+
};
127+
128+
let utp_payload = UtpMessage::new(content_message.as_ssz_bytes()).encode();
129+
let expected_utp_payload = utp_payload.clone();
130+
131+
tokio::spawn(async move {
132+
// Send the content to the acceptor over a uTP stream
133+
conn.send_to(&utp_payload).await.unwrap();
134+
// Close uTP connection
135+
conn.close().await.unwrap();
136+
});
137+
138+
// Check if the expected uTP listener events match the events in offer and accept nodes
139+
let offer_event = listener_rx_offer.recv().await.unwrap();
140+
let expected_offer_event =
141+
UtpListenerEvent::ClosedStream(vec![], protocol_id.clone(), OfferStream);
142+
assert_eq!(offer_event, expected_offer_event);
143+
144+
let accept_event = listener_rx_accept.recv().await.unwrap();
145+
let expected_accept_event = UtpListenerEvent::ClosedStream(
146+
expected_utp_payload,
147+
protocol_id.clone(),
148+
AcceptStream(vec![requested_content_key]),
149+
);
150+
assert_eq!(accept_event, expected_accept_event);
151+
}

0 commit comments

Comments
 (0)