Skip to content

feat(bridge): census should use affected_by_radius #1841

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/portal-bridge/src/bridge/beacon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ impl BeaconBridge {
metrics: BridgeMetricsReporter,
census: Census,
) {
let Ok(peers) = census.select_peers(Subnetwork::Beacon, &content_key.content_id()) else {
let Ok(peers) = census.select_peers(Subnetwork::Beacon, &content_key) else {
error!("Failed to request enrs for content key, skipping offer: {content_key:?}");
return;
};
Expand Down
5 changes: 1 addition & 4 deletions bin/portal-bridge/src/bridge/e2hs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,7 @@ impl Gossiper {
content_key: HistoryContentKey,
content_value: HistoryContentValue,
) -> Vec<OfferTrace> {
let Ok(peers) = self
.census
.select_peers(Subnetwork::History, &content_key.content_id())
else {
let Ok(peers) = self.census.select_peers(Subnetwork::History, &content_key) else {
error!("Failed to request enrs for content key, skipping offer: {content_key:?}");
return vec![];
};
Expand Down
5 changes: 1 addition & 4 deletions bin/portal-bridge/src/bridge/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,10 +506,7 @@ impl StateBridge {
content_key: StateContentKey,
content_value: StateContentValue,
) {
let Ok(peers) = self
.census
.select_peers(Subnetwork::State, &content_key.content_id())
else {
let Ok(peers) = self.census.select_peers(Subnetwork::State, &content_key) else {
error!("Failed to request enrs for content key, skipping offer: {content_key:?}");
return;
};
Expand Down
13 changes: 8 additions & 5 deletions bin/portal-bridge/src/census/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::{collections::HashSet, time::Duration};

use discv5::enr::NodeId;
use ethportal_api::types::{network::Subnetwork, portal_wire::OfferTrace};
use ethportal_api::{
types::{network::Subnetwork, portal_wire::OfferTrace},
OverlayContentKey,
};
use network::{Network, NetworkAction, NetworkInitializationConfig, NetworkManager};
use peer::PeerInfo;
use thiserror::Error;
Expand Down Expand Up @@ -69,12 +72,12 @@ impl Census {
pub fn select_peers(
&self,
subnetwork: Subnetwork,
content_id: &[u8; 32],
content_key: &impl OverlayContentKey,
) -> Result<Vec<PeerInfo>, CensusError> {
match subnetwork {
Subnetwork::History => self.history.select_peers(content_id),
Subnetwork::State => self.state.select_peers(content_id),
Subnetwork::Beacon => self.beacon.select_peers(content_id),
Subnetwork::History => self.history.select_peers(content_key),
Subnetwork::State => self.state.select_peers(content_key),
Subnetwork::Beacon => self.beacon.select_peers(content_key),
_ => Err(CensusError::UnsupportedSubnetwork(subnetwork)),
}
}
Expand Down
9 changes: 6 additions & 3 deletions bin/portal-bridge/src/census/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ethportal_api::{
ping_extensions::decode::PingExtension,
portal_wire::{OfferTrace, Pong},
},
Enr,
Enr, OverlayContentKey,
};
use futures::{future::JoinAll, StreamExt};
use itertools::Itertools;
Expand Down Expand Up @@ -106,15 +106,18 @@ impl Network {
}

/// Selects peers to receive content.
pub fn select_peers(&self, content_id: &[u8; 32]) -> Result<Vec<PeerInfo>, CensusError> {
pub fn select_peers(
&self,
content_key: &impl OverlayContentKey,
) -> Result<Vec<PeerInfo>, CensusError> {
if self.peers.is_empty() {
error!(
subnetwork = %self.subnetwork,
"No known peers, unable to look up interested enrs",
);
return Err(CensusError::NoPeers);
}
Ok(self.peers.select_peers(content_id))
Ok(self.peers.select_peers(content_key))
}

/// Records the status of the most recent `Offer` request to one of the peers.
Expand Down
25 changes: 18 additions & 7 deletions bin/portal-bridge/src/census/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ use std::{
};

use discv5::Enr;
use ethportal_api::types::{
client_type::ClientType,
distance::{Distance, Metric, XorMetric},
portal_wire::OfferTrace,
use ethportal_api::{
types::{
client_type::ClientType,
distance::{Distance, Metric, XorMetric},
portal_wire::OfferTrace,
},
OverlayContentKey,
};
use tracing::error;

Expand Down Expand Up @@ -72,9 +75,17 @@ impl Peer {
}

/// Returns true if content is within radius.
pub fn is_interested_in_content(&self, content_id: &[u8; 32]) -> bool {
let distance = XorMetric::distance(&self.enr.node_id().raw(), content_id);
distance <= self.radius
pub fn is_interested_in_content(
&self,
content_key: &impl OverlayContentKey,
content_id: &[u8; 32],
) -> bool {
if content_key.affected_by_radius() {
let distance = XorMetric::distance(&self.enr.node_id().raw(), content_id);
distance <= self.radius
} else {
true
}
}

/// Returns true if all latest [Self::MAX_LIVENESS_CHECKS] liveness checks failed.
Expand Down
6 changes: 3 additions & 3 deletions bin/portal-bridge/src/census/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use delay_map::HashSetDelay;
use discv5::enr::NodeId;
use ethportal_api::{
types::{client_type::ClientType, distance::Distance, portal_wire::OfferTrace},
Enr,
Enr, OverlayContentKey,
};
use futures::Stream;
use tokio::time::Instant;
Expand Down Expand Up @@ -123,9 +123,9 @@ impl<W: Weight> Peers<W> {
}

/// Selects peers to receive content.
pub fn select_peers(&self, content_id: &[u8; 32]) -> Vec<PeerInfo> {
pub fn select_peers(&self, content_key: &impl OverlayContentKey) -> Vec<PeerInfo> {
self.selector
.select_peers(content_id, self.read().peers.values())
.select_peers(content_key, self.read().peers.values())
}

fn read(&self) -> RwLockReadGuard<'_, PeersWithLivenessChecks> {
Expand Down
30 changes: 22 additions & 8 deletions bin/portal-bridge/src/census/scoring.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
use std::{collections::HashMap, time::Duration};

use ethportal_api::types::{accept_code::AcceptCode, portal_wire::OfferTrace};
use ethportal_api::{
types::{accept_code::AcceptCode, portal_wire::OfferTrace},
OverlayContentKey,
};
use itertools::Itertools;
use rand::{rng, seq::IndexedRandom};

use super::peer::{Peer, PeerInfo};

/// A trait for calculating peer's weight.
pub trait Weight: Send + Sync {
fn weight(&self, content_id: &[u8; 32], peer: &Peer) -> u32;
fn weight(
&self,
content_key: &impl OverlayContentKey,
content_id: &[u8; 32],
peer: &Peer,
) -> u32;

fn weight_all<'a>(
&self,
content_id: &[u8; 32],
content_key: &impl OverlayContentKey,
peers: impl IntoIterator<Item = &'a Peer>,
) -> impl Iterator<Item = (&'a Peer, u32)> {
let content_id = content_key.content_id();
peers
.into_iter()
.map(|peer| (peer, self.weight(content_id, peer)))
.map(move |peer| (peer, self.weight(content_key, &content_id, peer)))
}
}

Expand Down Expand Up @@ -72,8 +81,13 @@ impl Default for AdditiveWeight {
}

impl Weight for AdditiveWeight {
fn weight(&self, content_id: &[u8; 32], peer: &Peer) -> u32 {
if !peer.is_interested_in_content(content_id) {
fn weight(
&self,
content_key: &impl OverlayContentKey,
content_id: &[u8; 32],
peer: &Peer,
) -> u32 {
if !peer.is_interested_in_content(content_key, content_id) {
return 0;
}

Expand Down Expand Up @@ -131,12 +145,12 @@ impl<W: Weight> PeerSelector<W> {
/// Selects up to `self.limit` peers based on their weights.
pub fn select_peers<'a>(
&self,
content_id: &[u8; 32],
content_key: &impl OverlayContentKey,
peers: impl IntoIterator<Item = &'a Peer>,
) -> Vec<PeerInfo> {
let weighted_peers = self
.weight
.weight_all(content_id, peers)
.weight_all(content_key, peers)
.filter(|(_peer, weight)| *weight > 0)
.collect_vec();

Expand Down