Skip to content

Commit 85da253

Browse files
add mapped_futures
1 parent 5ac72c7 commit 85da253

File tree

7 files changed

+1386
-0
lines changed

7 files changed

+1386
-0
lines changed

futures-util/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ pin-project-lite = "0.2.6"
4848
futures = { path = "../futures", features = ["async-await", "thread-pool"] }
4949
futures-test = { path = "../futures-test" }
5050
tokio = "0.1.11"
51+
futures-timer = "3.0.2"
5152

5253
[package.metadata.docs.rs]
5354
all-features = true
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
pub(super) fn abort(s: &str) -> ! {
2+
struct DoublePanic;
3+
4+
impl Drop for DoublePanic {
5+
fn drop(&mut self) {
6+
panic!("panicking twice to abort the program");
7+
}
8+
}
9+
10+
let _bomb = DoublePanic;
11+
panic!("{}", s);
12+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
use super::task::{HashTask, Task};
2+
use super::MappedFutures;
3+
use core::hash::Hash;
4+
use core::marker::PhantomData;
5+
use core::pin::Pin;
6+
use core::ptr;
7+
use core::sync::atomic::Ordering::Relaxed;
8+
9+
/// Mutable iterator over all futures in the unordered set.
10+
#[derive(Debug)]
11+
pub struct IterPinMut<'a, K: Hash + Eq, Fut> {
12+
pub(super) task: *const Task<K, Fut>,
13+
pub(super) len: usize,
14+
pub(super) _marker: PhantomData<&'a mut MappedFutures<K, Fut>>,
15+
}
16+
17+
/// Mutable iterator over all futures in the unordered set.
18+
#[derive(Debug)]
19+
pub struct IterMut<'a, K: Hash + Eq, Fut: Unpin>(pub(super) IterPinMut<'a, K, Fut>);
20+
21+
/// Immutable iterator over all futures in the unordered set.
22+
#[derive(Debug)]
23+
pub struct IterPinRef<'a, K: Hash + Eq, Fut> {
24+
pub(super) task: *const Task<K, Fut>,
25+
pub(super) len: usize,
26+
pub(super) pending_next_all: *mut Task<K, Fut>,
27+
pub(super) _marker: PhantomData<&'a MappedFutures<K, Fut>>,
28+
}
29+
30+
/// Immutable iterator over all the futures in the unordered set.
31+
#[derive(Debug)]
32+
pub struct Iter<'a, K: Hash + Eq, Fut: Unpin>(pub(super) IterPinRef<'a, K, Fut>);
33+
34+
/// Owned iterator over all futures in the unordered set.
35+
#[derive(Debug)]
36+
pub struct IntoIter<K: Hash + Eq, Fut: Unpin> {
37+
pub(super) len: usize,
38+
pub(super) inner: MappedFutures<K, Fut>,
39+
}
40+
41+
impl<K: Hash + Eq, Fut: Unpin> Iterator for IntoIter<K, Fut> {
42+
type Item = Fut;
43+
44+
fn next(&mut self) -> Option<Self::Item> {
45+
// `head_all` can be accessed directly and we don't need to spin on
46+
// `Task::next_all` since we have exclusive access to the set.
47+
let task = self.inner.head_all.get_mut();
48+
49+
if (*task).is_null() {
50+
return None;
51+
}
52+
53+
unsafe {
54+
// Moving out of the future is safe because it is `Unpin`
55+
let future = (*(**task).future.get()).take().unwrap();
56+
57+
// Mutable access to a previously shared `MappedFutures` implies
58+
// that the other threads already released the object before the
59+
// current thread acquired it, so relaxed ordering can be used and
60+
// valid `next_all` checks can be skipped.
61+
let next = (**task).next_all.load(Relaxed);
62+
*task = next;
63+
if !task.is_null() {
64+
*(**task).prev_all.get() = ptr::null_mut();
65+
}
66+
self.len -= 1;
67+
Some(future)
68+
}
69+
}
70+
71+
fn size_hint(&self) -> (usize, Option<usize>) {
72+
(self.len, Some(self.len))
73+
}
74+
}
75+
76+
impl<K: Hash + Eq, Fut: Unpin> ExactSizeIterator for IntoIter<K, Fut> {}
77+
78+
impl<'a, K: Hash + Eq, Fut> Iterator for IterPinMut<'a, K, Fut> {
79+
type Item = Pin<&'a mut Fut>;
80+
81+
fn next(&mut self) -> Option<Self::Item> {
82+
if self.task.is_null() {
83+
return None;
84+
}
85+
86+
unsafe {
87+
let future = (*(*self.task).future.get()).as_mut().unwrap();
88+
89+
// Mutable access to a previously shared `MappedFutures` implies
90+
// that the other threads already released the object before the
91+
// current thread acquired it, so relaxed ordering can be used and
92+
// valid `next_all` checks can be skipped.
93+
let next = (*self.task).next_all.load(Relaxed);
94+
self.task = next;
95+
self.len -= 1;
96+
Some(Pin::new_unchecked(future))
97+
}
98+
}
99+
100+
fn size_hint(&self) -> (usize, Option<usize>) {
101+
(self.len, Some(self.len))
102+
}
103+
}
104+
105+
impl<K: Hash + Eq, Fut> ExactSizeIterator for IterPinMut<'_, K, Fut> {}
106+
107+
impl<'a, K: Hash + Eq, Fut: Unpin> Iterator for IterMut<'a, K, Fut> {
108+
type Item = &'a mut Fut;
109+
110+
fn next(&mut self) -> Option<Self::Item> {
111+
self.0.next().map(Pin::get_mut)
112+
}
113+
114+
fn size_hint(&self) -> (usize, Option<usize>) {
115+
self.0.size_hint()
116+
}
117+
}
118+
119+
impl<K: Hash + Eq, Fut: Unpin> ExactSizeIterator for IterMut<'_, K, Fut> {}
120+
121+
impl<'a, K: Hash + Eq, Fut> Iterator for IterPinRef<'a, K, Fut> {
122+
type Item = Pin<&'a Fut>;
123+
124+
fn next(&mut self) -> Option<Self::Item> {
125+
if self.task.is_null() {
126+
return None;
127+
}
128+
129+
unsafe {
130+
let future = (*(*self.task).future.get()).as_ref().unwrap();
131+
132+
// Relaxed ordering can be used since acquire ordering when
133+
// `head_all` was initially read for this iterator implies acquire
134+
// ordering for all previously inserted nodes (and we don't need to
135+
// read `len_all` again for any other nodes).
136+
let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed);
137+
self.task = next;
138+
self.len -= 1;
139+
Some(Pin::new_unchecked(future))
140+
}
141+
}
142+
143+
fn size_hint(&self) -> (usize, Option<usize>) {
144+
(self.len, Some(self.len))
145+
}
146+
}
147+
148+
impl<K: Hash + Eq, Fut> ExactSizeIterator for IterPinRef<'_, K, Fut> {}
149+
150+
impl<'a, K: Hash + Eq, Fut: Unpin> Iterator for Iter<'a, K, Fut> {
151+
type Item = &'a Fut;
152+
153+
fn next(&mut self) -> Option<Self::Item> {
154+
self.0.next().map(Pin::get_ref)
155+
}
156+
157+
fn size_hint(&self) -> (usize, Option<usize>) {
158+
self.0.size_hint()
159+
}
160+
}
161+
162+
impl<K: Hash + Eq, Fut: Unpin> ExactSizeIterator for Iter<'_, K, Fut> {}
163+
164+
pub struct Keys<'a, K: Hash + Eq, Fut> {
165+
pub(super) inner: std::iter::Map<
166+
std::collections::hash_set::Iter<'a, HashTask<K, Fut>>,
167+
Box<dyn FnMut(&'a HashTask<K, Fut>) -> &'a K>,
168+
>,
169+
}
170+
impl<K: Hash + Eq, Fut: Unpin> ExactSizeIterator for Keys<'_, K, Fut> {}
171+
172+
impl<'a, K: Hash + Eq, Fut> Iterator for Keys<'a, K, Fut> {
173+
type Item = &'a K;
174+
175+
fn next(&mut self) -> Option<Self::Item> {
176+
self.inner.next()
177+
}
178+
}
179+
180+
// SAFETY: we do nothing thread-local and there is no interior mutability,
181+
// so the usual structural `Send`/`Sync` apply.
182+
unsafe impl<K: Hash + Eq, Fut: Send> Send for IterPinRef<'_, K, Fut> {}
183+
unsafe impl<K: Hash + Eq, Fut: Sync> Sync for IterPinRef<'_, K, Fut> {}
184+
185+
unsafe impl<K: Hash + Eq, Fut: Send> Send for IterPinMut<'_, K, Fut> {}
186+
unsafe impl<K: Hash + Eq, Fut: Sync> Sync for IterPinMut<'_, K, Fut> {}
187+
188+
unsafe impl<K: Hash + Eq, Fut: Send + Unpin> Send for IntoIter<K, Fut> {}
189+
unsafe impl<K: Hash + Eq, Fut: Sync + Unpin> Sync for IntoIter<K, Fut> {}
190+
191+
unsafe impl<K: Hash + Eq, Fut: Send + Unpin> Send for Keys<'_, K, Fut> {}
192+
unsafe impl<K: Hash + Eq, Fut: Sync + Unpin> Sync for Keys<'_, K, Fut> {}

0 commit comments

Comments
 (0)