Skip to content

Commit 505bf17

Browse files
Revert "update tokio-stream to 0.1.3 and use BroadcastStream (#2212)"
This reverts commit 62f3865.
1 parent 47ae543 commit 505bf17

File tree

5 files changed

+87
-20
lines changed

5 files changed

+87
-20
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

beacon_node/http_api/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ edition = "2018"
88
warp = { git = "https://github.com/paulhauner/warp ", branch = "cors-wildcard" }
99
serde = { version = "1.0.116", features = ["derive"] }
1010
tokio = { version = "1.1.0", features = ["macros","sync"] }
11-
tokio-stream = { version = "0.1.3", features = ["sync"] }
11+
tokio-stream = "0.1.2"
1212
tokio-util = "0.6.3"
1313
parking_lot = "0.11.0"
1414
types = { path = "../../consensus/types" }
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// TODO: this should be replaced with the tokio's `BroadcastStream` once it's added to
2+
// tokio-stream (https://github.com/tokio-rs/tokio/pull/3384)
3+
4+
use std::fmt;
5+
use std::pin::Pin;
6+
use std::task::{Context, Poll};
7+
use tokio::sync::broadcast::error::RecvError;
8+
use tokio::sync::broadcast::Receiver;
9+
use tokio_stream::Stream;
10+
use tokio_util::sync::ReusableBoxFuture;
11+
12+
/// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`].
13+
///
14+
/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver
15+
/// [`Stream`]: trait@crate::Stream
16+
pub struct BroadcastStream<T> {
17+
inner: ReusableBoxFuture<(Result<T, RecvError>, Receiver<T>)>,
18+
}
19+
20+
/// An error returned from the inner stream of a [`BroadcastStream`].
21+
#[derive(Debug, PartialEq)]
22+
pub enum BroadcastStreamRecvError {
23+
/// The receiver lagged too far behind. Attempting to receive again will
24+
/// return the oldest message still retained by the channel.
25+
///
26+
/// Includes the number of skipped messages.
27+
Lagged(u64),
28+
}
29+
30+
async fn make_future<T: Clone>(mut rx: Receiver<T>) -> (Result<T, RecvError>, Receiver<T>) {
31+
let result = rx.recv().await;
32+
(result, rx)
33+
}
34+
35+
impl<T: 'static + Clone + Send> BroadcastStream<T> {
36+
/// Create a new `BroadcastStream`.
37+
pub fn new(rx: Receiver<T>) -> Self {
38+
Self {
39+
inner: ReusableBoxFuture::new(make_future(rx)),
40+
}
41+
}
42+
}
43+
44+
impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
45+
type Item = Result<T, BroadcastStreamRecvError>;
46+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
47+
let (result, rx) = match self.inner.poll(cx) {
48+
std::task::Poll::Ready(t) => t,
49+
std::task::Poll::Pending => return std::task::Poll::Pending,
50+
};
51+
self.inner.set(make_future(rx));
52+
match result {
53+
Ok(item) => Poll::Ready(Some(Ok(item))),
54+
Err(RecvError::Closed) => Poll::Ready(None),
55+
Err(RecvError::Lagged(n)) => {
56+
Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n))))
57+
}
58+
}
59+
}
60+
}
61+
62+
impl<T> fmt::Debug for BroadcastStream<T> {
63+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64+
f.debug_struct("BroadcastStream").finish()
65+
}
66+
}

beacon_node/http_api/src/lib.rs

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
88
mod beacon_proposer_cache;
99
mod block_id;
10+
mod broadcast_stream;
1011
mod metrics;
1112
mod state_id;
1213
mod validator_inclusion;
@@ -35,7 +36,7 @@ use std::future::Future;
3536
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
3637
use std::sync::Arc;
3738
use tokio::sync::mpsc::UnboundedSender;
38-
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
39+
use tokio_stream::StreamExt;
3940
use types::{
4041
Attestation, AttestationDuty, AttesterSlashing, CloneConfig, CommitteeCache, Epoch, EthSpec,
4142
Hash256, ProposerSlashing, PublicKey, PublicKeyBytes, RelativeEpoch, SignedAggregateAndProof,
@@ -2404,22 +2405,23 @@ pub fn serve<T: BeaconChainTypes>(
24042405
}
24052406
};
24062407

2407-
receivers.push(BroadcastStream::new(receiver).map(|msg| {
2408-
match msg {
2409-
Ok(data) => Event::default()
2410-
.event(data.topic_name())
2411-
.json_data(data)
2412-
.map_err(|e| {
2413-
warp_utils::reject::server_sent_event_error(format!(
2414-
"{:?}",
2415-
e
2416-
))
2417-
}),
2418-
Err(e) => Err(warp_utils::reject::server_sent_event_error(
2419-
format!("{:?}", e),
2420-
)),
2421-
}
2422-
}));
2408+
receivers.push(broadcast_stream::BroadcastStream::new(receiver).map(
2409+
|msg| {
2410+
match msg {
2411+
Ok(data) => Event::default()
2412+
.event(data.topic_name())
2413+
.json_data(data)
2414+
.map_err(|e| {
2415+
warp_utils::reject::server_sent_event_error(
2416+
format!("{:?}", e),
2417+
)
2418+
}),
2419+
Err(e) => Err(warp_utils::reject::server_sent_event_error(
2420+
format!("{:?}", e),
2421+
)),
2422+
}
2423+
},
2424+
));
24232425
}
24242426
} else {
24252427
return Err(warp_utils::reject::custom_server_error(

slasher/service/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,5 @@ slot_clock = { path = "../../common/slot_clock" }
1515
state_processing = { path = "../../consensus/state_processing" }
1616
task_executor = { path = "../../common/task_executor" }
1717
tokio = { version = "1.1.0", features = ["full"] }
18-
tokio-stream = "0.1.3"
18+
tokio-stream = "0.1.2"
1919
types = { path = "../../consensus/types" }

0 commit comments

Comments
 (0)