
Commit 4d54e9a

Merge pull request #3918 from kprotty/lock_fix
Synchronization primitive improvements
2 parents d8499f7 + c912296

3 files changed: +490 -410 lines

lib/std/mutex.zig

Lines changed: 179 additions & 85 deletions
@@ -1,12 +1,13 @@
 const std = @import("std.zig");
 const builtin = @import("builtin");
+const os = std.os;
 const testing = std.testing;
+const SpinLock = std.SpinLock;
 const ResetEvent = std.ResetEvent;
 
 /// Lock may be held only once. If the same thread
 /// tries to acquire the same mutex twice, it deadlocks.
-/// This type supports static initialization and is based off of Webkit's WTF Lock (via rust parking_lot)
-/// https://github.com/Amanieu/parking_lot/blob/master/core/src/word_lock.rs
+/// This type supports static initialization and is at most `@sizeOf(usize)` in size.
 /// When an application is built in single threaded release mode, all the functions are
 /// no-ops. In single threaded debug mode, there is deadlock detection.
 pub const Mutex = if (builtin.single_threaded)
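
Taken together with the tryAcquire that the next hunk introduces, the doc comment above amounts to the following usage pattern. A minimal sketch (not part of the commit; it assumes the std.Mutex API as it stands after this change):

const std = @import("std");

var counter: usize = 0;
var mutex = std.Mutex.init(); // statically initializable, at most @sizeOf(usize) wide

// Blocking path: acquire() returns a Held token that scopes the critical section.
fn increment() void {
    const held = mutex.acquire();
    defer held.release();
    counter += 1;
}

// Non-blocking path: tryAcquire() yields null instead of waiting.
fn tryIncrement() bool {
    const held = mutex.tryAcquire() orelse return false;
    defer held.release();
    counter += 1;
    return true;
}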
@@ -24,136 +25,229 @@ pub const Mutex = if (builtin.single_threaded)
                 }
             }
         };
+
         pub fn init() Mutex {
             return Mutex{ .lock = lock_init };
         }
-        pub fn deinit(self: *Mutex) void {}
 
-        pub fn acquire(self: *Mutex) Held {
-            if (std.debug.runtime_safety and self.lock) {
-                @panic("deadlock detected");
+        pub fn deinit(self: *Mutex) void {
+            self.* = undefined;
+        }
+
+        pub fn tryAcquire(self: *Mutex) ?Held {
+            if (std.debug.runtime_safety) {
+                if (self.lock) return null;
+                self.lock = true;
             }
             return Held{ .mutex = self };
         }
-    }
-else
-    struct {
-        state: usize,
 
-        const MUTEX_LOCK: usize = 1 << 0;
-        const QUEUE_LOCK: usize = 1 << 1;
-        const QUEUE_MASK: usize = ~(MUTEX_LOCK | QUEUE_LOCK);
-        const QueueNode = std.atomic.Stack(ResetEvent).Node;
-
-        /// number of iterations to spin yielding the cpu
-        const SPIN_CPU = 4;
-
-        /// number of iterations to spin in the cpu yield loop
-        const SPIN_CPU_COUNT = 30;
+        pub fn acquire(self: *Mutex) Held {
+            return self.tryAcquire() orelse @panic("deadlock detected");
+        }
+    }
+else if (builtin.os == .windows)
+    // https://locklessinc.com/articles/keyed_events/
+    extern union {
+        locked: u8,
+        waiters: u32,
 
-        /// number of iterations to spin yielding the thread
-        const SPIN_THREAD = 1;
+        const WAKE = 1 << 8;
+        const WAIT = 1 << 9;
 
         pub fn init() Mutex {
-            return Mutex{ .state = 0 };
+            return Mutex{ .waiters = 0 };
         }
 
         pub fn deinit(self: *Mutex) void {
             self.* = undefined;
         }
 
+        pub fn tryAcquire(self: *Mutex) ?Held {
+            if (@atomicRmw(u8, &self.locked, .Xchg, 1, .Acquire) != 0)
+                return null;
+            return Held{ .mutex = self };
+        }
+
+        pub fn acquire(self: *Mutex) Held {
+            return self.tryAcquire() orelse self.acquireSlow();
+        }
+
+        fn acquireSlow(self: *Mutex) Held {
+            @setCold(true);
+            while (true) : (SpinLock.loopHint(1)) {
+                const waiters = @atomicLoad(u32, &self.waiters, .Monotonic);
+
+                // try and take lock if unlocked
+                if ((waiters & 1) == 0) {
+                    if (@atomicRmw(u8, &self.locked, .Xchg, 1, .Acquire) == 0)
+                        return Held{ .mutex = self };
+
+                // otherwise, try and update the waiting count.
+                // then unset the WAKE bit so that another unlocker can wake up a thread.
+                } else if (@cmpxchgWeak(u32, &self.waiters, waiters, (waiters + WAIT) | 1, .Monotonic, .Monotonic) == null) {
+                    ResetEvent.OsEvent.Futex.wait(@ptrCast(*i32, &self.waiters), undefined, null) catch unreachable;
+                    _ = @atomicRmw(u32, &self.waiters, .Sub, WAKE, .Monotonic);
+                }
+            }
+        }
+
         pub const Held = struct {
             mutex: *Mutex,
 
             pub fn release(self: Held) void {
-                // since MUTEX_LOCK is the first bit, we can use (.Sub) instead of (.And, ~MUTEX_LOCK).
-                // this is because .Sub may be implemented more efficiently than the latter
-                // (e.g. `lock xadd` vs `cmpxchg` loop on x86)
-                const state = @atomicRmw(usize, &self.mutex.state, .Sub, MUTEX_LOCK, .Release);
-                if ((state & QUEUE_MASK) != 0 and (state & QUEUE_LOCK) == 0) {
-                    self.mutex.releaseSlow(state);
+                // unlock without a rmw/cmpxchg instruction
+                @atomicStore(u8, @ptrCast(*u8, &self.mutex.locked), 0, .Release);
+
+                while (true) : (SpinLock.loopHint(1)) {
+                    const waiters = @atomicLoad(u32, &self.mutex.waiters, .Monotonic);
+
+                    // no one is waiting
+                    if (waiters < WAIT) return;
+                    // someone grabbed the lock and will do the wake instead
+                    if (waiters & 1 != 0) return;
+                    // someone else is currently waking up
+                    if (waiters & WAKE != 0) return;
+
+                    // try to decrease the waiter count & set the WAKE bit meaning a thread is waking up
+                    if (@cmpxchgWeak(u32, &self.mutex.waiters, waiters, waiters - WAIT + WAKE, .Release, .Monotonic) == null)
+                        return ResetEvent.OsEvent.Futex.wake(@ptrCast(*i32, &self.mutex.waiters));
                 }
             }
         };
+    }
+else if (builtin.link_libc or builtin.os == .linux)
+    // stack-based version of https://github.com/Amanieu/parking_lot/blob/master/core/src/word_lock.rs
+    struct {
+        state: usize,
 
-        pub fn acquire(self: *Mutex) Held {
-            // fast path close to SpinLock fast path
-            if (@cmpxchgWeak(usize, &self.state, 0, MUTEX_LOCK, .Acquire, .Monotonic)) |current_state| {
-                self.acquireSlow(current_state);
-            }
+        /// number of times to spin trying to acquire the lock.
+        /// https://webkit.org/blog/6161/locking-in-webkit/
+        const SPIN_COUNT = 40;
+
+        const MUTEX_LOCK: usize = 1 << 0;
+        const QUEUE_LOCK: usize = 1 << 1;
+        const QUEUE_MASK: usize = ~(MUTEX_LOCK | QUEUE_LOCK);
+
+        const Node = struct {
+            next: ?*Node,
+            event: ResetEvent,
+        };
+
+        pub fn init() Mutex {
+            return Mutex{ .state = 0 };
+        }
+
+        pub fn deinit(self: *Mutex) void {
+            self.* = undefined;
+        }
+
+        pub fn tryAcquire(self: *Mutex) ?Held {
+            if (@cmpxchgWeak(usize, &self.state, 0, MUTEX_LOCK, .Acquire, .Monotonic) != null)
+                return null;
             return Held{ .mutex = self };
         }
 
-        fn acquireSlow(self: *Mutex, current_state: usize) void {
-            var spin: usize = 0;
-            var state = current_state;
+        pub fn acquire(self: *Mutex) Held {
+            return self.tryAcquire() orelse {
+                self.acquireSlow();
+                return Held{ .mutex = self };
+            };
+        }
+
+        fn acquireSlow(self: *Mutex) void {
+            // inlining the fast path and hiding *Slow()
+            // calls behind a @setCold(true) appears to
+            // improve performance in release builds.
+            @setCold(true);
             while (true) {
 
-                // try and acquire the lock if unlocked
-                if ((state & MUTEX_LOCK) == 0) {
-                    state = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
-                    continue;
+                // try and spin for a bit to acquire the mutex if there's currently no queue
+                var spin_count: u32 = SPIN_COUNT;
+                var state = @atomicLoad(usize, &self.state, .Monotonic);
+                while (spin_count != 0) : (spin_count -= 1) {
+                    if (state & MUTEX_LOCK == 0) {
+                        _ = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
+                    } else if (state & QUEUE_MASK == 0) {
+                        break;
+                    }
+                    SpinLock.yield();
+                    state = @atomicLoad(usize, &self.state, .Monotonic);
                 }
 
-                // spin only if the waiting queue isn't empty and when it hasn't spun too much already
-                if ((state & QUEUE_MASK) == 0 and spin < SPIN_CPU + SPIN_THREAD) {
-                    if (spin < SPIN_CPU) {
-                        std.SpinLock.yield(SPIN_CPU_COUNT);
+                // create the ResetEvent node on the stack
+                // (faster than threadlocal on platforms like OSX)
+                var node: Node = undefined;
+                node.event = ResetEvent.init();
+                defer node.event.deinit();
+
+                // we've spun too long, try and add our node to the LIFO queue.
+                // if the mutex becomes available in the process, try and grab it instead.
+                while (true) {
+                    if (state & MUTEX_LOCK == 0) {
+                        _ = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
                     } else {
-                        std.os.sched_yield() catch std.time.sleep(0);
+                        node.next = @intToPtr(?*Node, state & QUEUE_MASK);
+                        const new_state = @ptrToInt(&node) | (state & ~QUEUE_MASK);
+                        _ = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
+                            node.event.wait();
+                            break;
+                        };
                     }
+                    SpinLock.yield();
                     state = @atomicLoad(usize, &self.state, .Monotonic);
-                    continue;
                 }
-
-                // thread should block, try and add this event to the waiting queue
-                var node = QueueNode{
-                    .next = @intToPtr(?*QueueNode, state & QUEUE_MASK),
-                    .data = ResetEvent.init(),
-                };
-                defer node.data.deinit();
-                const new_state = @ptrToInt(&node) | (state & ~QUEUE_MASK);
-                state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
-                    // node is in the queue, wait until a `held.release()` wakes us up.
-                    _ = node.data.wait(null) catch unreachable;
-                    spin = 0;
-                    state = @atomicLoad(usize, &self.state, .Monotonic);
-                    continue;
-                };
             }
         }
 
-        fn releaseSlow(self: *Mutex, current_state: usize) void {
-            // grab the QUEUE_LOCK in order to signal a waiting queue node's event.
-            var state = current_state;
-            while (true) {
-                if ((state & QUEUE_LOCK) != 0 or (state & QUEUE_MASK) == 0)
+        pub const Held = struct {
+            mutex: *Mutex,
+
+            pub fn release(self: Held) void {
+                // first, remove the lock bit so another possibly parallel acquire() can succeed.
+                // use .Sub since it can usually be compiled down more efficiently
+                // (`lock sub` on x86) vs .And ~MUTEX_LOCK (`lock cmpxchg` loop on x86)
+                const state = @atomicRmw(usize, &self.mutex.state, .Sub, MUTEX_LOCK, .Release);
+
+                // if the LIFO queue isn't locked and it has a node, try and wake up the node.
+                if ((state & QUEUE_LOCK) == 0 and (state & QUEUE_MASK) != 0)
+                    self.mutex.releaseSlow();
+            }
+        };
+
+        fn releaseSlow(self: *Mutex) void {
+            @setCold(true);
+
+            // try and lock the LIFO queue to pop a node off,
+            // stopping altogether if it's already locked or the queue is empty
+            var state = @atomicLoad(usize, &self.state, .Monotonic);
+            while (true) : (SpinLock.loopHint(1)) {
+                if (state & QUEUE_LOCK != 0 or state & QUEUE_MASK == 0)
                     return;
                 state = @cmpxchgWeak(usize, &self.state, state, state | QUEUE_LOCK, .Acquire, .Monotonic) orelse break;
             }
 
-            while (true) {
-                // barrier needed to observe incoming state changes
-                defer @fence(.Acquire);
-
-                // the mutex is currently locked. try to unset the QUEUE_LOCK and let the locker wake up the next node.
-                // avoids waking up multiple sleeping threads which try to acquire the lock again which increases contention.
+            // acquired the QUEUE_LOCK, try and pop a node to wake it.
+            // if the mutex is locked, then unset QUEUE_LOCK and let
+            // the thread that holds the mutex do the wake-up on unlock()
+            while (true) : (SpinLock.loopHint(1)) {
                 if ((state & MUTEX_LOCK) != 0) {
-                    state = @cmpxchgWeak(usize, &self.state, state, state & ~QUEUE_LOCK, .Release, .Monotonic) orelse return;
-                    continue;
+                    state = @cmpxchgWeak(usize, &self.state, state, state & ~QUEUE_LOCK, .Release, .Acquire) orelse return;
+                } else {
+                    const node = @intToPtr(*Node, state & QUEUE_MASK);
+                    const new_state = @ptrToInt(node.next);
+                    state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Acquire) orelse {
+                        node.event.set();
+                        return;
+                    };
                 }
-
-                // try to pop the top node on the waiting queue stack to wake it up
-                // while at the same time unsetting the QUEUE_LOCK.
-                const node = @intToPtr(*QueueNode, state & QUEUE_MASK);
-                const new_state = @ptrToInt(node.next) | (state & MUTEX_LOCK);
-                state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
-                    _ = node.data.set(false);
-                    return;
-                };
             }
         }
-    };
+    }
+
+// for platforms without a known OS blocking
+// primitive, default to SpinLock for correctness
+else SpinLock;
 
 const TestContext = struct {
     mutex: *Mutex,
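
A note on the Windows variant above: because locked and waiters alias the same memory through the extern union, bit 0 of waiters is the lock byte's low bit, bit 8 (the WAKE constant) flags a wake-up already in flight, and bits 9 and up count sleeping threads in units of WAIT. The transitions can be traced by hand; the sketch below is illustrative only (the constants mirror the diff, but the test itself is not from the commit):

const std = @import("std");

const WAKE: u32 = 1 << 8;
const WAIT: u32 = 1 << 9;

test "waiters word transitions" {
    var w: u32 = 0; // unlocked, no waiters
    w = (w + WAIT) | 1; // acquireSlow: register one waiter while the lock stays held
    std.debug.assert(w >> 9 == 1); // the waiter count lives in bits 9..31

    w &= 0xFFFFFF00; // release: a plain store of 0 through the aliased `locked` byte
    w = w - WAIT + WAKE; // release: claim one waiter, mark a wake-up in flight
    std.debug.assert(w == WAKE); // the woken thread subtracts WAKE afterwards
}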

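The word-lock variant for Linux/libc targets leans on a different trick: the whole mutex is one usize whose low two bits are MUTEX_LOCK and QUEUE_LOCK, while the remaining bits hold the address of the most recently pushed stack-allocated Node. The packing is sound only because every Node is at least 4-byte aligned, so the low bits of its address are guaranteed zero. A small sketch of that invariant (again illustrative, not from the commit):

const std = @import("std");

const MUTEX_LOCK: usize = 1 << 0;
const QUEUE_LOCK: usize = 1 << 1;
const QUEUE_MASK: usize = ~(MUTEX_LOCK | QUEUE_LOCK);

test "a node pointer and both flag bits share one usize" {
    var dummy_node: usize align(4) = 0; // stands in for a stack-allocated Node
    const ptr = @ptrToInt(&dummy_node);
    std.debug.assert(ptr & ~QUEUE_MASK == 0); // alignment keeps the two low bits free

    const state = ptr | MUTEX_LOCK | QUEUE_LOCK; // locked, queue locked, one node queued
    std.debug.assert(state & QUEUE_MASK == ptr); // the head pointer is recovered intact
}

Keeping the queue nodes on each waiter's stack is what lets the mutex itself stay a single word: a node only needs to outlive its own frame until node.event.wait() returns, by which point it has already been unlinked.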