Skip to content

Lower bandwidth used for topology refresh #5618

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: develop
Choose a base branch
from
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions common/client-core/src/client/base_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::client::replies::reply_controller::{ReplyControllerReceiver, ReplyCon
use crate::client::replies::reply_storage::{
CombinedReplyStorage, PersistentReplyStorage, ReplyStorageBackend, SentReplyKeys,
};
use crate::client::topology_control::nym_api_provider::NymApiTopologyProvider;
use crate::client::topology_control::smart_api_provider::NymApiTopologyProvider;
use crate::client::topology_control::{
TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
};
Expand Down Expand Up @@ -54,8 +54,7 @@ use nym_statistics_common::clients::ClientStatsSender;
use nym_statistics_common::generate_client_stats_id;
use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender, LaneQueueLengths};
use nym_task::{TaskClient, TaskHandle};
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::HardcodedTopologyProvider;
use nym_topology::providers::{HardcodedTopologyProvider, TopologyProvider};
use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, UserAgent};
use rand::rngs::OsRng;
use std::fmt::Debug;
Expand Down Expand Up @@ -557,6 +556,7 @@ where
config_topology,
nym_api_urls,
user_agent,
None,
)),
config::TopologyStructure::GeoAware(group_by) => {
warn!("using deprecated 'GeoAware' topology provider - this option will be removed very soon");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use log::{debug, error};
use nym_explorer_client::{ExplorerClient, PrettyDetailedMixNodeBond};
use nym_network_defaults::var_names::EXPLORER_API;
use nym_topology::{
provider_trait::{async_trait, TopologyProvider},
providers::{async_trait, TopologyProvider},
NymTopology,
};
use nym_validator_client::client::NodeId;
Expand Down
5 changes: 3 additions & 2 deletions common/client-core/src/client/topology_control/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ use wasmtimer::tokio::sleep;
mod accessor;
pub mod geo_aware_provider;
pub mod nym_api_provider;
pub mod smart_api_provider;

#[allow(deprecated)]
pub use geo_aware_provider::GeoAwareTopologyProvider;
pub use nym_api_provider::{Config as NymApiTopologyProviderConfig, NymApiTopologyProvider};
pub use nym_topology::provider_trait::TopologyProvider;
pub use nym_topology::providers::TopologyProvider;
pub use smart_api_provider::{Config as NymApiTopologyProviderConfig, NymApiTopologyProvider};

// TODO: move it to config later
const MAX_FAILURE_COUNT: usize = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

use async_trait::async_trait;
use log::{debug, error, warn};
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::NymTopology;
use nym_topology::{NymTopology, TopologyProvider};
use nym_validator_client::UserAgent;
use rand::prelude::SliceRandom;
use rand::thread_rng;
Expand Down
230 changes: 230 additions & 0 deletions common/client-core/src/client/topology_control/smart_api_provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
// Copyright 2024 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: GPL-3.0-only

//! Caching, piecewise API Topology Provider
//!

#![warn(missing_docs)]

use async_trait::async_trait;
use log::{debug, error, warn};
pub use nym_topology::providers::piecewise::Config;
use nym_topology::{
providers::piecewise::{NymTopologyProvider, PiecewiseTopologyProvider},
EpochRewardedSet, NymTopology, RoutingNode, TopologyProvider,
};
use nym_validator_client::UserAgent;
use rand::{prelude::SliceRandom, thread_rng};
use url::Url;

/// Topology Provider build around a cached piecewise provider that uses the Nym API to
/// fetch changes and node details.
#[derive(Clone)]
pub struct NymApiTopologyProvider {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about using something like NymApiSmartTopologyProvider? Having two types with the same name was a bit confusing initially when reading the the code

inner: NymTopologyProvider<NymApiPiecewiseProvider>,
}

impl NymApiTopologyProvider {
/// Construct a new thread safe Cached topology provider using the Nym API
pub fn new(
config: impl Into<Config>,
nym_api_urls: Vec<Url>,
user_agent: Option<UserAgent>,
initial_topology: Option<NymTopology>,
) -> Self {
let manager = NymApiPiecewiseProvider::new(nym_api_urls, user_agent);
let inner = NymTopologyProvider::new(manager, config.into(), initial_topology);

Self { inner }
}
}

impl AsRef<NymTopologyProvider<NymApiPiecewiseProvider>> for NymApiTopologyProvider {
fn as_ref(&self) -> &NymTopologyProvider<NymApiPiecewiseProvider> {
&self.inner
}
}

impl AsMut<NymTopologyProvider<NymApiPiecewiseProvider>> for NymApiTopologyProvider {
fn as_mut(&mut self) -> &mut NymTopologyProvider<NymApiPiecewiseProvider> {
&mut self.inner
}
}

#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
impl TopologyProvider for NymApiTopologyProvider {
async fn get_new_topology(&mut self) -> Option<NymTopology> {
self.as_mut().get_new_topology().await
}
}

#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
impl TopologyProvider for NymApiTopologyProvider {
async fn get_new_topology(&mut self) -> Option<NymTopology> {
self.as_mut().get_new_topology().await
}
}

#[derive(Clone)]
struct NymApiPiecewiseProvider {
validator_client: nym_validator_client::client::NymApiClient,
nym_api_urls: Vec<Url>,
currently_used_api: usize,
}

impl NymApiPiecewiseProvider {
fn new(mut nym_api_urls: Vec<Url>, user_agent: Option<UserAgent>) -> Self {
nym_api_urls.shuffle(&mut thread_rng());

let validator_client = if let Some(user_agent) = user_agent {
nym_validator_client::client::NymApiClient::new_with_user_agent(
nym_api_urls[0].clone(),
user_agent,
)
} else {
nym_validator_client::client::NymApiClient::new(nym_api_urls[0].clone())
};

Self {
validator_client,
nym_api_urls,
currently_used_api: 0,
}
}

fn use_next_nym_api(&mut self) {
if self.nym_api_urls.len() == 1 {
warn!("There's only a single nym API available - it won't be possible to use a different one");
return;
}

self.currently_used_api = (self.currently_used_api + 1) % self.nym_api_urls.len();
self.validator_client
.change_nym_api(self.nym_api_urls[self.currently_used_api].clone())
}

async fn get_full_topology_inner(&mut self) -> Option<NymTopology> {
let layer_assignments = self.get_layer_assignments().await?;

let mut topology = NymTopology::new_empty(layer_assignments);

let all_nodes = self
.validator_client
.get_all_basic_nodes()
.await
.inspect_err(|err| {
self.use_next_nym_api();
error!("failed to get network nodes: {err}");
})
.ok()?;

debug!("there are {} nodes on the network", all_nodes.len());
topology.add_additional_nodes(all_nodes.iter());

if !topology.is_minimally_routable() {
error!("the current filtered active topology can't be used to construct any packets");
return None;
}

Some(topology)
}

async fn get_descriptor_batch_inner(&mut self, ids: &[u32]) -> Option<Vec<RoutingNode>> {
// Does this need to return a hashmap of RoutingNodes? that is moderately inconvenient
// especially when the nodes themselves contain their node_id unless we expect to directly
// use the result of this fn for lookups where we would otherwise for example, have to
// iterate over a whole vec to find a specific node_id.
let descriptor_vec = self
.validator_client
.retrieve_basic_nodes_batch(ids)
.await
.inspect_err(|err| {
self.use_next_nym_api();
error!("failed to get current rewarded set: {err}");
})
.ok()?;

let mut out = Vec::new();
for node in descriptor_vec {
if let Ok(routing_node) = RoutingNode::try_from(&node) {
out.push(routing_node);
}
}
Some(out)
}

async fn get_layer_assignments_inner(&mut self) -> Option<EpochRewardedSet> {
self.validator_client
.get_current_rewarded_set()
.await
.inspect_err(|err| {
self.use_next_nym_api();
error!("failed to get current rewarded set: {err}");
})
.ok()
}
}

#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
impl PiecewiseTopologyProvider for NymApiPiecewiseProvider {
async fn get_full_topology(&mut self) -> Option<NymTopology> {
self.get_full_topology_inner().await
}

async fn get_descriptor_batch(&mut self, ids: &[u32]) -> Option<Vec<RoutingNode>> {
self.get_descriptor_batch_inner(ids).await
}

async fn get_layer_assignments(&mut self) -> Option<EpochRewardedSet> {
self.get_layer_assignments_inner().await
}
}

#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
impl PiecewiseTopologyProvider for NymApiPiecewiseProvider {
async fn get_full_topology(&mut self) -> Option<NymTopology> {
self.get_full_topology_inner().await
}

async fn get_descriptor_batch(&mut self, ids: &[u32]) -> Option<Vec<RoutingNode>> {
self.get_descriptor_batch_inner(ids).await
}

async fn get_layer_assignments(&mut self) -> Option<EpochRewardedSet> {
self.get_layer_assignments_inner().await
}
}

// // Test requires running a local instance of the nym-api binary, for example using:
// // `RUST_LOG="info" ./target/debug/nym-api run --nyxd-validator "https://rpc.nymtech.net"`

// #[cfg(test)]
// mod test {
// use std::time::Duration;

// use super::*;
// use nym_bin_common::logging::setup_tracing_logger;

// #[tokio::test]
// async fn local_api_provider_test() {
// setup_tracing_logger();
// let mut provider = NymApiTopologyProvider::new(
// Config::default(),
// vec!["http://localhost:8000"
// .parse()
// .expect("failed to parse api url")],
// None,
// None,
// );

// for _ in 0..180 {
// let topo = provider.get_new_topology().await;
// assert!(topo.is_some());
// tokio::time::sleep(Duration::from_secs(30)).await;
// }
// }
// }
3 changes: 2 additions & 1 deletion common/client-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ pub mod error;
pub mod init;

pub use nym_topology::{
HardcodedTopologyProvider, NymRouteProvider, NymTopology, NymTopologyError, TopologyProvider,
providers::HardcodedTopologyProvider, NymRouteProvider, NymTopology, NymTopologyError,
TopologyProvider,
};

#[cfg(target_arch = "wasm32")]
Expand Down
15 changes: 15 additions & 0 deletions common/client-libs/validator-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,21 @@ impl NymApiClient {
Ok(nodes)
}

/// Batch request for node descriptors in the current topology.
///
/// Given the set of node IDs included in the request body, provide the descriptor for each
/// associated node in if it is available in the current topology.
pub async fn retrieve_basic_nodes_batch(
&self,
node_ids: &[u32],
) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
Ok(self
.nym_api
.retrieve_basic_nodes_batch(node_ids)
.await?
.nodes)
}

pub async fn health(&self) -> Result<ApiHealthResponse, ValidatorClientError> {
Ok(self.nym_api.health().await?)
}
Expand Down
Loading
Loading