Skip to content

Commit 79bbf49

Browse files
committed
Randomize Stream::merge to improve the throughput. Implements #490.
1 parent 417b548 commit 79bbf49

File tree

1 file changed

+13
-5
lines changed

1 file changed

+13
-5
lines changed

src/stream/stream/merge.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ pin_project! {
2727

2828
impl<L: Stream, R: Stream> Merge<L, R> {
2929
pub(crate) fn new(left: L, right: R) -> Self {
30-
Self { left: left.fuse(), right: right.fuse() }
30+
Self {
31+
left: left.fuse(),
32+
right: right.fuse(),
33+
}
3134
}
3235
}
3336

@@ -40,14 +43,19 @@ where
4043

4144
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
4245
let this = self.project();
43-
match this.left.poll_next(cx) {
46+
let (first, second) = if (utils::random(1) == 1) {
47+
(this.left, this.right)
48+
} else {
49+
(this.right, this.left)
50+
};
51+
match first.poll_next(cx) {
4452
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
45-
Poll::Ready(None) => this.right.poll_next(cx),
46-
Poll::Pending => match this.right.poll_next(cx) {
53+
Poll::Ready(None) => second.poll_next(cx),
54+
Poll::Pending => match second.poll_next(cx) {
4755
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
4856
Poll::Ready(None) => Poll::Pending,
4957
Poll::Pending => Poll::Pending,
50-
}
58+
},
5159
}
5260
}
5361
}

0 commit comments

Comments
 (0)