Skip to content

Commit 5062b7e

Browse files
authored
feat: integrate multiplexing (paradigmxyz#5559)
1 parent 701e378 commit 5062b7e

File tree

16 files changed

+1176
-134
lines changed

16 files changed

+1176
-134
lines changed

Cargo.lock

+23
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/net/eth-wire/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recov
5353
arbitrary = { workspace = true, features = ["derive"] }
5454
proptest.workspace = true
5555
proptest-derive.workspace = true
56+
async-stream = "0.3"
5657

5758
[features]
5859
default = ["serde"]

crates/net/eth-wire/src/capability.rs

+22-2
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,14 @@ impl SharedCapability {
317317
}
318318
}
319319

320+
/// Returns the eth version if it's the `eth` capability.
321+
pub fn eth_version(&self) -> Option<EthVersion> {
322+
match self {
323+
SharedCapability::Eth { version, .. } => Some(*version),
324+
_ => None,
325+
}
326+
}
327+
320328
/// Returns the message ID offset of the current capability.
321329
///
322330
/// This represents the message ID offset for the first message of the eth capability in the
@@ -375,8 +383,8 @@ impl SharedCapabilities {
375383

376384
/// Returns the negotiated eth version if it is shared.
377385
#[inline]
378-
pub fn eth_version(&self) -> Result<u8, P2PStreamError> {
379-
self.eth().map(|cap| cap.version())
386+
pub fn eth_version(&self) -> Result<EthVersion, P2PStreamError> {
387+
self.eth().map(|cap| cap.eth_version().expect("is eth; qed"))
380388
}
381389

382390
/// Returns true if the shared capabilities contain the given capability.
@@ -438,6 +446,18 @@ impl SharedCapabilities {
438446
) -> Result<&SharedCapability, UnsupportedCapabilityError> {
439447
self.find(cap).ok_or_else(|| UnsupportedCapabilityError { capability: cap.clone() })
440448
}
449+
450+
/// Returns the number of shared capabilities.
451+
#[inline]
452+
pub fn len(&self) -> usize {
453+
self.0.len()
454+
}
455+
456+
/// Returns true if there are no shared capabilities.
457+
#[inline]
458+
pub fn is_empty(&self) -> bool {
459+
self.0.is_empty()
460+
}
441461
}
442462

443463
/// Determines the offsets for each shared capability between the input list of peer

crates/net/eth-wire/src/ethstream.rs

+6
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ where
166166
#[pin_project]
167167
#[derive(Debug)]
168168
pub struct EthStream<S> {
169+
/// Negotiated eth version.
169170
version: EthVersion,
170171
#[pin]
171172
inner: S,
@@ -174,26 +175,31 @@ pub struct EthStream<S> {
174175
impl<S> EthStream<S> {
175176
/// Creates a new unauthed [`EthStream`] from a provided stream. You will need
176177
/// to manually handshake a peer.
178+
#[inline]
177179
pub fn new(version: EthVersion, inner: S) -> Self {
178180
Self { version, inner }
179181
}
180182

181183
/// Returns the eth version.
184+
#[inline]
182185
pub fn version(&self) -> EthVersion {
183186
self.version
184187
}
185188

186189
/// Returns the underlying stream.
190+
#[inline]
187191
pub fn inner(&self) -> &S {
188192
&self.inner
189193
}
190194

191195
/// Returns mutable access to the underlying stream.
196+
#[inline]
192197
pub fn inner_mut(&mut self) -> &mut S {
193198
&mut self.inner
194199
}
195200

196201
/// Consumes this type and returns the wrapped stream.
202+
#[inline]
197203
pub fn into_inner(self) -> S {
198204
self.inner
199205
}

crates/net/eth-wire/src/hello.rs

+20
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ impl HelloMessageWithProtocols {
4949
}
5050

5151
/// Returns the raw [HelloMessage] without the additional protocol information.
52+
#[inline]
5253
pub fn message(&self) -> HelloMessage {
5354
HelloMessage {
5455
protocol_version: self.protocol_version,
@@ -69,6 +70,25 @@ impl HelloMessageWithProtocols {
6970
id: self.id,
7071
}
7172
}
73+
74+
/// Returns true if the set of protocols contains the given protocol.
75+
#[inline]
76+
pub fn contains_protocol(&self, protocol: &Protocol) -> bool {
77+
self.protocols.iter().any(|p| p.cap == protocol.cap)
78+
}
79+
80+
/// Adds a new protocol to the set.
81+
///
82+
/// Returns an error if the protocol already exists.
83+
#[inline]
84+
pub fn try_add_protocol(&mut self, protocol: Protocol) -> Result<(), Protocol> {
85+
if self.contains_protocol(&protocol) {
86+
Err(protocol)
87+
} else {
88+
self.protocols.push(protocol);
89+
Ok(())
90+
}
91+
}
7292
}
7393

7494
// TODO: determine if we should allow for the extra fields at the end like EIP-706 suggests

0 commit comments

Comments
 (0)