Skip to content

Commit 06eaeb5

Browse files
Nemo157cramertj
authored andcommitted
Track FuturesUnordered termination via a sentinel length
1 parent dd35b1b commit 06eaeb5

File tree

2 files changed

+29
-13
lines changed

2 files changed

+29
-13
lines changed

futures-util/src/stream/futures_unordered/mod.rs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,15 @@ use self::task::Task;
2727
mod ready_to_run_queue;
2828
use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue};
2929

30+
/// Constant used for a `FuturesUnordered` to indicate we are empty and have
31+
/// yielded a `None` element so can return `true` from
32+
/// `FusedStream::is_terminated`
33+
///
34+
/// It is safe to not check for this when incrementing as even a ZST future will
35+
/// have a `Task` allocated for it, so we cannot ever reach usize::max_value()
36+
/// without running out of ram.
37+
const TERMINATED_SENTINEL_LENGTH: usize = usize::max_value();
38+
3039
/// A set of futures which may complete in any order.
3140
///
3241
/// This structure is optimized to manage a large number of futures.
@@ -49,9 +58,6 @@ pub struct FuturesUnordered<Fut> {
4958
ready_to_run_queue: Arc<ReadyToRunQueue<Fut>>,
5059
len: usize,
5160
head_all: *const Task<Fut>,
52-
/// Track whether we have yielded `None` and can consider ourselves
53-
/// terminated
54-
is_terminated: bool,
5561
}
5662

5763
unsafe impl<Fut: Send> Send for FuturesUnordered<Fut> {}
@@ -126,7 +132,6 @@ impl<Fut: Future> FuturesUnordered<Fut> {
126132
len: 0,
127133
head_all: ptr::null_mut(),
128134
ready_to_run_queue,
129-
is_terminated: false,
130135
}
131136
}
132137
}
@@ -142,12 +147,12 @@ impl<Fut> FuturesUnordered<Fut> {
142147
///
143148
/// This represents the total number of in-flight futures.
144149
pub fn len(&self) -> usize {
145-
self.len
150+
if self.len == TERMINATED_SENTINEL_LENGTH { 0 } else { self.len }
146151
}
147152

148153
/// Returns `true` if the set contains no futures.
149154
pub fn is_empty(&self) -> bool {
150-
self.len == 0
155+
self.len == 0 || self.len == TERMINATED_SENTINEL_LENGTH
151156
}
152157

153158
/// Push a future into the set.
@@ -166,6 +171,12 @@ impl<Fut> FuturesUnordered<Fut> {
166171
ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue),
167172
});
168173

174+
// If we've previously marked ourselves as terminated we need to reset
175+
// len to 0 to track it correctly
176+
if self.len == TERMINATED_SENTINEL_LENGTH {
177+
self.len = 0;
178+
}
179+
169180
// Right now our task has a strong reference count of 1. We transfer
170181
// ownership of this reference count to our internal linked list
171182
// and we'll reclaim ownership through the `unlink` method below.
@@ -176,10 +187,6 @@ impl<Fut> FuturesUnordered<Fut> {
176187
// futures are ready. To do that we unconditionally enqueue it for
177188
// polling here.
178189
self.ready_to_run_queue.enqueue(ptr);
179-
180-
// If we've previously marked ourselves as terminated we need to clear
181-
// that now that we will yield a new item.
182-
self.is_terminated = false;
183190
}
184191

185192
/// Returns an iterator that allows modifying each future in the set.
@@ -192,7 +199,7 @@ impl<Fut> FuturesUnordered<Fut> {
192199
pub fn iter_pin_mut<'a>(self: Pin<&'a mut Self>) -> IterPinMut<'a, Fut> {
193200
IterPinMut {
194201
task: self.head_all,
195-
len: self.len,
202+
len: self.len(),
196203
_marker: PhantomData
197204
}
198205
}
@@ -294,7 +301,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
294301
if self.is_empty() {
295302
// We can only consider ourselves terminated once we
296303
// have yielded a `None`
297-
self.is_terminated = true;
304+
self.len = TERMINATED_SENTINEL_LENGTH;
298305
return Poll::Ready(None);
299306
} else {
300307
return Poll::Pending;
@@ -483,6 +490,6 @@ where
483490

484491
impl<Fut: Future> FusedStream for FuturesUnordered<Fut> {
485492
fn is_terminated(&self) -> bool {
486-
self.is_terminated
493+
self.len == TERMINATED_SENTINEL_LENGTH
487494
}
488495
}

futures-util/tests/futures_unordered.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,17 @@ fn is_terminated() {
1414
assert_eq!(tasks.poll_next_unpin(lw), Poll::Ready(None));
1515
assert_eq!(tasks.is_terminated(), true);
1616

17+
// Test that the sentinel value doesn't leak
18+
assert_eq!(tasks.is_empty(), true);
19+
assert_eq!(tasks.len(), 0);
20+
assert_eq!(tasks.iter_mut().len(), 0);
21+
1722
tasks.push(future::ready(1));
1823

24+
assert_eq!(tasks.is_empty(), false);
25+
assert_eq!(tasks.len(), 1);
26+
assert_eq!(tasks.iter_mut().len(), 1);
27+
1928
assert_eq!(tasks.is_terminated(), false);
2029
assert_eq!(tasks.poll_next_unpin(lw), Poll::Ready(Some(1)));
2130
assert_eq!(tasks.is_terminated(), false);

0 commit comments

Comments
 (0)