Skip to content

Commit 1f8cac2

Browse files
yo?
1 parent bafa242 commit 1f8cac2

10 files changed

+180
-164
lines changed

src/dht_service.rs

Lines changed: 39 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,7 @@ pub mod message_broker;
44
pub mod peer_guide;
55
mod transaction_id_pool;
66

7-
use crate::{
8-
dht_service::dht_server::DhtServer,
9-
domain_knowledge::{NodeId, TransactionId},
10-
message::Krpc,
11-
our_error::OurError,
12-
};
7+
use crate::{dht_service::dht_server::DhtServer, domain_knowledge::NodeId, our_error::OurError};
138

149
use message_broker::MessageBroker;
1510
use peer_guide::PeerGuide;
@@ -111,28 +106,6 @@ impl DhtV4 {
111106
let client = DhtHandle::new(message_broker.clone(), peer_guide.clone(), our_id);
112107
let client = Arc::new(client);
113108

114-
// ask all the known nodes for ourselves
115-
let mut bootstrap_join_set = JoinSet::new();
116-
117-
for contact in known_nodes {
118-
bootstrap_join_set
119-
.build_task()
120-
.name(&*format!("bootstrap with {contact}"))
121-
.spawn(Self::bootstrap_from(client.clone(), contact))
122-
.unwrap();
123-
}
124-
125-
while let Some(_) = bootstrap_join_set.join_next().await {}
126-
127-
// info!(
128-
// "DHT bootstrapped, routing table has {} nodes",
129-
// peer_guide.read().await.node_count()
130-
// );
131-
drop(bootstrap_join_set);
132-
133-
// TODO: huh, since when this this make sense?
134-
//
135-
// only spawn the server after the bootstrap has completed
136109
let server = DhtServer::new(our_id.clone(), peer_guide.clone(), message_broker.clone());
137110
let server = Arc::new(server);
138111

@@ -150,6 +123,23 @@ impl DhtV4 {
150123
helper_tasks: join_set,
151124
};
152125

126+
// ask all the known nodes for ourselves
127+
let mut bootstrap_join_set = JoinSet::new();
128+
129+
for contact in known_nodes {
130+
bootstrap_join_set
131+
.build_task()
132+
.name(&*format!("bootstrap with {contact}"))
133+
.spawn(Self::bootstrap_from(client.clone(), contact))
134+
.unwrap();
135+
}
136+
137+
while let Some(_) = bootstrap_join_set.join_next().await {}
138+
drop(bootstrap_join_set);
139+
140+
println!("I should be done");
141+
info!("DHT bootstrapped, routing table has {} nodes", peer_guide.node_count());
142+
153143
Ok(dht)
154144
}
155145

@@ -167,43 +157,20 @@ impl DhtV4 {
167157
#[instrument(skip_all)]
168158
async fn bootstrap_from(dht: Arc<DhtHandle>, peer: SocketAddrV4) -> Result<(), OurError> {
169159
let our_id = dht.our_id.clone();
170-
let txn_id = dht.transaction_id_pool.next();
171-
let query = Krpc::new_find_node_query(TransactionId::from(txn_id), our_id, our_id);
172160

173161
info!("bootstrapping with {peer}");
174162
let response = timeout(Duration::from_secs(5), async {
175-
dht.send_and_wait(query, peer).await.expect("failure to send message")
163+
let node_id = dht.ping(peer).await?;
164+
dht.routing_table.add(node_id, peer);
165+
166+
dht.find_node(our_id).await;
167+
println!("done finding node");
168+
169+
Ok::<(), eyre::Report>(())
176170
})
177171
.await?;
178172

179-
// TODO: use ensure!
180-
if let Krpc::FindNodeGetPeersResponse(response) = response {
181-
let mut nodes = response.nodes().clone();
182-
nodes.sort_unstable_by_key(|node| node.contact().0);
183-
nodes.dedup();
184-
185-
for node in nodes {
186-
let dht = dht.clone();
187-
let contact = node.contact().0;
188-
let our_id = dht.our_id;
189-
let txn_id = dht.transaction_id_pool.next();
190-
let find_ourself = Krpc::new_find_node_query(TransactionId::from(txn_id), our_id, node.id());
191-
192-
TskBuilder::new()
193-
.name(&*format!("leave level bootstrap to {}", contact))
194-
.spawn(async move {
195-
let _response = timeout(Duration::from_secs(5), async {
196-
dht.send_and_wait(find_ourself, contact)
197-
.await
198-
.expect("failure to send message")
199-
})
200-
.await?;
201-
Ok::<_, OurError>(())
202-
})
203-
.unwrap();
204-
}
205-
info!("bootstrapping with {peer} succeeded");
206-
}
173+
info!("{peer} bootstrap success");
207174
Ok(())
208175
}
209176

@@ -237,21 +204,21 @@ mod tests {
237204

238205
fn set_up_tracing() {
239206
let _ = color_eyre::install();
240-
// let fmt_layer = fmt::layer()
241-
// .compact()
242-
// .with_line_number(true)
243-
// .with_filter(LevelFilter::DEBUG);
244-
//
207+
let fmt_layer = fmt::layer()
208+
.compact()
209+
.with_line_number(true)
210+
.with_filter(LevelFilter::DEBUG);
211+
245212
// global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
246213
// let tracer = opentelemetry_jaeger::new_pipeline().install_simple().unwrap();
247-
//
214+
248215
// let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
249-
//
250-
// tracing_subscriber::registry()
251-
// .with(console_subscriber::spawn())
252-
// .with(telemetry)
253-
// .with(fmt_layer)
254-
// .init();
216+
217+
tracing_subscriber::registry()
218+
.with(console_subscriber::spawn())
219+
// .with(telemetry)
220+
.with(fmt_layer)
221+
.init();
255222
}
256223

257224
#[tokio::test(flavor = "multi_thread")]
@@ -261,7 +228,7 @@ mod tests {
261228
let external_ip = public_ip::addr_v4().await.unwrap();
262229

263230
let dht = DhtV4::bootstrap_with_random_id(
264-
SocketAddrV4::from_str("0.0.0.0:51413").unwrap(),
231+
SocketAddrV4::from_str("0.0.0.0:44444").unwrap(),
265232
external_ip,
266233
vec![
267234
// dht.tansmissionbt.com
@@ -292,7 +259,6 @@ mod tests {
292259
rng.fill_bytes(&mut bytes);
293260

294261
let node = client.find_node(NodeId(bytes)).await;
295-
println!("{node:?}");
296262
if let Ok(node) = node {
297263
println!("found node {:?}", node);
298264
} else {

src/dht_service/dht_client.rs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use crate::{
88
use num::BigUint;
99
use std::{collections::HashSet, net::SocketAddrV4, ops::BitXor, sync::Arc, time::Duration};
1010
use tokio::time::timeout;
11-
use tracing::instrument;
1211
use tracing::warn;
12+
use tracing::{instrument, trace};
1313

1414
use super::peer_guide::PeerGuide;
1515

@@ -56,15 +56,14 @@ impl DhtHandle {
5656
Ok(response)
5757
}
5858

59-
pub async fn ping(self: Arc<Self>, peer: SocketAddrV4) -> Result<(), OurError> {
60-
let this = &self;
59+
pub async fn ping(&self, peer: SocketAddrV4) -> Result<NodeId, OurError> {
6160
let txn_id = self.transaction_id_pool.next();
62-
let ping_msg = Krpc::new_ping_query(TransactionId::from(txn_id), this.our_id);
61+
let ping_msg = Krpc::new_ping_query(TransactionId::from(txn_id), self.our_id);
6362

6463
let response = self.send_and_wait(ping_msg, peer).await?;
6564

66-
return if let Krpc::PingAnnouncePeerResponse(_response) = response {
67-
Ok(())
65+
return if let Krpc::PingAnnouncePeerResponse(response) = response {
66+
Ok(*response.target_id())
6867
} else {
6968
warn!("Unexpected response to ping: {:?}", response);
7069
Err(naur!("Unexpected response to ping"))
@@ -140,7 +139,7 @@ impl DhtHandle {
140139
// attempt to find the target node via a peer on this address
141140
async fn send_find_nodes_rpc(
142141
self: Arc<Self>,
143-
peer_addr: SocketAddrV4,
142+
dest: SocketAddrV4,
144143
target: NodeId,
145144
) -> Result<Vec<NodeInfo>, OurError> {
146145
// construct the message to query our friends
@@ -151,7 +150,11 @@ impl DhtHandle {
151150
let time_out = Duration::from_secs(REQ_TIMEOUT);
152151
// TODO: make this configurable or let parent handle timeout, wooo maybe we can configure this
153152
// based on ip geo location distance
154-
let response = timeout(time_out, self.send_and_wait(query, peer_addr)).await??;
153+
let response = timeout(time_out, self.send_and_wait(query, dest))
154+
.await
155+
.inspect_err(|_e| trace!("find_nodes for {:?} timed out", dest))
156+
? // timeout error
157+
?; // send_and_wait error
155158

156159
if let Krpc::FindNodeGetPeersResponse(find_node_response) = response {
157160
// TODO:: does the following actually handle the giant bitstring thing correctly?
@@ -170,7 +173,7 @@ impl DhtHandle {
170173
}
171174
}
172175

173-
// TODO: API is broken, since we can't gurantee that the peer will exist or we can find them,
176+
// TODO: API is broken, since we can't guarantee that the peer will exist or we can find them,
174177
// we should return a list of K closest nodes or the target itself if can be found
175178
pub async fn get_peers(self: Arc<Self>, info_hash: InfoHash) -> Result<(Token, Vec<PeerContact>), OurError> {
176179
let resonsible = NodeId(info_hash.0);
@@ -289,18 +292,24 @@ impl DhtHandle {
289292
#[instrument(skip(self))]
290293
async fn send_get_peers_rpc(
291294
self: Arc<Self>,
292-
interlocutor: SocketAddrV4,
295+
dest: SocketAddrV4,
293296
info_hash: InfoHash,
294297
) -> Result<(Option<Token>, Vec<NodeInfo>, Vec<PeerContact>), OurError> {
295-
// trace!("Asking {:?} for peers", interlocutor);
298+
trace!("Asking {:?} for peers", dest);
296299
// construct the message to query our friends
297300
let transaction_id = self.transaction_id_pool.next();
298301

299302
let query = Krpc::new_get_peers_query(TransactionId::from(transaction_id), self.our_id.clone(), info_hash);
300303

301304
// send the message and await for a response
302305
let time_out = Duration::from_secs(REQ_TIMEOUT);
303-
let response = timeout(time_out, self.send_and_wait(query, interlocutor)).await??;
306+
let response = timeout(time_out, self.send_and_wait(query, dest))
307+
.await
308+
.inspect_err(|_e| {
309+
trace!("get_peers for {:?} timed out", dest);
310+
})
311+
? // timeout error
312+
?; // send_and_wait related error
304313

305314
return match response {
306315
Krpc::ErrorResponse(response) => {

0 commit comments

Comments
 (0)