Skip to content

Commit 3e05ce4

Browse files
committed
std: implement the once_wait feature
1 parent 6be96e3 commit 3e05ce4

File tree

5 files changed

+249
-94
lines changed

5 files changed

+249
-94
lines changed

library/std/src/sync/once.rs

+41
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,47 @@ impl Once {
264264
self.inner.is_completed()
265265
}
266266

267+
/// Blocks the current thread until initialization has completed.
268+
///
269+
/// # Example
270+
///
271+
/// ```rust
272+
/// #![feature(once_wait)]
273+
///
274+
/// use std::sync::Once;
275+
/// use std::thread;
276+
///
277+
/// static READY: Once = Once::new();
278+
///
279+
/// let thread = thread::spawn(|| {
280+
/// READY.wait();
281+
/// println!("everything is ready");
282+
/// });
283+
///
284+
/// READY.call_once(|| println!("performing setup"));
285+
/// ```
286+
///
287+
/// # Panics
288+
///
289+
/// If this [`Once`] has been poisoned because an initialization closure has
290+
/// panicked, this method will also panic. Use [`wait_force`](Self::wait_force)
291+
/// if this behaviour is not desired.
292+
#[unstable(feature = "once_wait", issue = "127527")]
293+
pub fn wait(&self) {
294+
if !self.inner.is_completed() {
295+
self.inner.wait(false);
296+
}
297+
}
298+
299+
/// Blocks the current thread until initialization has completed, ignoring
300+
/// poisoning.
301+
#[unstable(feature = "once_wait", issue = "127527")]
302+
pub fn wait_force(&self) {
303+
if !self.inner.is_completed() {
304+
self.inner.wait(true);
305+
}
306+
}
307+
267308
/// Returns the current state of the `Once` instance.
268309
///
269310
/// Since this takes a mutable reference, no initialization can currently

library/std/src/sync/once_lock.rs

+28
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,34 @@ impl<T> OnceLock<T> {
167167
}
168168
}
169169

170+
/// Blocks the current thread until the cell is initialized.
171+
///
172+
/// # Example
173+
///
174+
/// Waiting for a computation on another thread to finish:
175+
/// ```rust
176+
/// #![feature(once_wait)]
177+
///
178+
/// use std::thread;
179+
/// use std::sync::OnceLock;
180+
///
181+
/// let value = OnceLock::new();
182+
///
183+
/// thread::scope(|s| {
184+
/// s.spawn(|| value.set(1 + 1));
185+
///
186+
/// let result = value.wait();
187+
/// assert_eq!(result, &2);
188+
/// })
189+
/// ```
190+
#[inline]
191+
#[unstable(feature = "once_wait", issue = "127527")]
192+
pub fn wait(&self) -> &T {
193+
self.once.wait_force();
194+
195+
unsafe { self.get_unchecked() }
196+
}
197+
170198
/// Sets the contents of this cell to `value`.
171199
///
172200
/// May block if another thread is currently attempting to initialize the cell. The cell is

library/std/src/sys/sync/once/futex.rs

+88-36
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::sync::once::ExclusiveState;
88
use crate::sys::futex::{futex_wait, futex_wake_all};
99

1010
// On some platforms, the OS is very nice and handles the waiter queue for us.
11-
// This means we only need one atomic value with 5 states:
11+
// This means we only need one atomic value with 4 states:
1212

1313
/// No initialization has run yet, and no thread is currently using the Once.
1414
const INCOMPLETE: u32 = 0;
@@ -19,16 +19,20 @@ const POISONED: u32 = 1;
1919
/// Some thread is currently attempting to run initialization. It may succeed,
2020
/// so all future threads need to wait for it to finish.
2121
const RUNNING: u32 = 2;
22-
/// Some thread is currently attempting to run initialization and there are threads
23-
/// waiting for it to finish.
24-
const QUEUED: u32 = 3;
2522
/// Initialization has completed and all future calls should finish immediately.
26-
const COMPLETE: u32 = 4;
23+
const COMPLETE: u32 = 3;
2724

28-
// Threads wait by setting the state to QUEUED and calling `futex_wait` on the state
25+
// An additional bit indicates whether there are waiting threads:
26+
27+
/// May only be set if the state is not COMPLETE.
28+
const QUEUED: u32 = 4;
29+
30+
// Threads wait by setting the QUEUED bit and calling `futex_wait` on the state
2931
// variable. When the running thread finishes, it will wake all waiting threads using
3032
// `futex_wake_all`.
3133

34+
const STATE_MASK: u32 = 0b11;
35+
3236
pub struct OnceState {
3337
poisoned: bool,
3438
set_state_to: Cell<u32>,
@@ -47,7 +51,7 @@ impl OnceState {
4751
}
4852

4953
struct CompletionGuard<'a> {
50-
state: &'a AtomicU32,
54+
state_and_queued: &'a AtomicU32,
5155
set_state_on_drop_to: u32,
5256
}
5357

@@ -56,64 +60,106 @@ impl<'a> Drop for CompletionGuard<'a> {
5660
// Use release ordering to propagate changes to all threads checking
5761
// up on the Once. `futex_wake_all` does its own synchronization, hence
5862
// we do not need `AcqRel`.
59-
if self.state.swap(self.set_state_on_drop_to, Release) == QUEUED {
60-
futex_wake_all(&self.state);
63+
if self.state_and_queued.swap(self.set_state_on_drop_to, Release) & QUEUED != 0 {
64+
futex_wake_all(&self.state_and_queued);
6165
}
6266
}
6367
}
6468

6569
pub struct Once {
66-
state: AtomicU32,
70+
state_and_queued: AtomicU32,
6771
}
6872

6973
impl Once {
7074
#[inline]
7175
pub const fn new() -> Once {
72-
Once { state: AtomicU32::new(INCOMPLETE) }
76+
Once { state_and_queued: AtomicU32::new(INCOMPLETE) }
7377
}
7478

7579
#[inline]
7680
pub fn is_completed(&self) -> bool {
7781
// Use acquire ordering to make all initialization changes visible to the
7882
// current thread.
79-
self.state.load(Acquire) == COMPLETE
83+
self.state_and_queued.load(Acquire) == COMPLETE
8084
}
8185

8286
#[inline]
8387
pub(crate) fn state(&mut self) -> ExclusiveState {
84-
match *self.state.get_mut() {
88+
match *self.state_and_queued.get_mut() {
8589
INCOMPLETE => ExclusiveState::Incomplete,
8690
POISONED => ExclusiveState::Poisoned,
8791
COMPLETE => ExclusiveState::Complete,
8892
_ => unreachable!("invalid Once state"),
8993
}
9094
}
9195

92-
// This uses FnMut to match the API of the generic implementation. As this
93-
// implementation is quite light-weight, it is generic over the closure and
94-
// so avoids the cost of dynamic dispatch.
9596
#[cold]
9697
#[track_caller]
97-
pub fn call(&self, ignore_poisoning: bool, f: &mut impl FnMut(&public::OnceState)) {
98-
let mut state = self.state.load(Acquire);
98+
pub fn wait(&self, ignore_poisoning: bool) {
99+
let mut state_and_queued = self.state_and_queued.load(Acquire);
99100
loop {
101+
let state = state_and_queued & STATE_MASK;
102+
let queued = state_and_queued & QUEUED != 0;
100103
match state {
104+
COMPLETE => return,
105+
POISONED if !ignore_poisoning => {
106+
// Panic to propagate the poison.
107+
panic!("Once instance has previously been poisoned");
108+
}
109+
_ => {
110+
// Set the QUEUED bit if it has not already been set.
111+
if !queued {
112+
state_and_queued += QUEUED;
113+
if let Err(new) = self.state_and_queued.compare_exchange_weak(
114+
state,
115+
state_and_queued,
116+
Relaxed,
117+
Acquire,
118+
) {
119+
state_and_queued = new;
120+
continue;
121+
}
122+
}
123+
124+
futex_wait(&self.state_and_queued, state_and_queued, None);
125+
state_and_queued = self.state_and_queued.load(Acquire);
126+
}
127+
}
128+
}
129+
}
130+
131+
#[cold]
132+
#[track_caller]
133+
pub fn call(&self, ignore_poisoning: bool, f: &mut dyn FnMut(&public::OnceState)) {
134+
let mut state_and_queued = self.state_and_queued.load(Acquire);
135+
loop {
136+
let state = state_and_queued & STATE_MASK;
137+
let queued = state_and_queued & QUEUED != 0;
138+
match state {
139+
COMPLETE => return,
101140
POISONED if !ignore_poisoning => {
102141
// Panic to propagate the poison.
103142
panic!("Once instance has previously been poisoned");
104143
}
105144
INCOMPLETE | POISONED => {
106145
// Try to register the current thread as the one running.
107-
if let Err(new) =
108-
self.state.compare_exchange_weak(state, RUNNING, Acquire, Acquire)
109-
{
110-
state = new;
146+
let next = RUNNING + if queued { QUEUED } else { 0 };
147+
if let Err(new) = self.state_and_queued.compare_exchange_weak(
148+
state_and_queued,
149+
next,
150+
Acquire,
151+
Acquire,
152+
) {
153+
state_and_queued = new;
111154
continue;
112155
}
156+
113157
// `waiter_queue` will manage other waiting threads, and
114158
// wake them up on drop.
115-
let mut waiter_queue =
116-
CompletionGuard { state: &self.state, set_state_on_drop_to: POISONED };
159+
let mut waiter_queue = CompletionGuard {
160+
state_and_queued: &self.state_and_queued,
161+
set_state_on_drop_to: POISONED,
162+
};
117163
// Run the function, letting it know if we're poisoned or not.
118164
let f_state = public::OnceState {
119165
inner: OnceState {
@@ -125,21 +171,27 @@ impl Once {
125171
waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get();
126172
return;
127173
}
128-
RUNNING | QUEUED => {
129-
// Set the state to QUEUED if it is not already.
130-
if state == RUNNING
131-
&& let Err(new) =
132-
self.state.compare_exchange_weak(RUNNING, QUEUED, Relaxed, Acquire)
133-
{
134-
state = new;
135-
continue;
174+
_ => {
175+
// All other values must be RUNNING.
176+
assert!(state == RUNNING);
177+
178+
// Set the QUEUED bit if it is not already set.
179+
if !queued {
180+
state_and_queued += QUEUED;
181+
if let Err(new) = self.state_and_queued.compare_exchange_weak(
182+
state,
183+
state_and_queued,
184+
Relaxed,
185+
Acquire,
186+
) {
187+
state_and_queued = new;
188+
continue;
189+
}
136190
}
137191

138-
futex_wait(&self.state, QUEUED, None);
139-
state = self.state.load(Acquire);
192+
futex_wait(&self.state_and_queued, state_and_queued, None);
193+
state_and_queued = self.state_and_queued.load(Acquire);
140194
}
141-
COMPLETE => return,
142-
_ => unreachable!("state is never set to invalid values"),
143195
}
144196
}
145197
}

library/std/src/sys/sync/once/no_threads.rs

+6
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ impl Once {
5555
}
5656
}
5757

58+
#[cold]
59+
#[track_caller]
60+
pub fn wait(&self) {
61+
panic!("not implementable on this target");
62+
}
63+
5864
#[cold]
5965
#[track_caller]
6066
pub fn call(&self, ignore_poisoning: bool, f: &mut impl FnMut(&public::OnceState)) {

0 commit comments

Comments
 (0)