Skip to content

Commit 66cb75d

Browse files
committed
std.Mutex: implement blocking mutexes on linux
closes #1463 Thanks to Shawn Landden for the original pull request. This commit is based on that code.
1 parent acefcdb commit 66cb75d

File tree

7 files changed

+106
-24
lines changed

7 files changed

+106
-24
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,7 @@ set(ZIG_STD_FILES
635635
"special/init-lib/src/main.zig"
636636
"special/panic.zig"
637637
"special/test_runner.zig"
638+
"spinlock.zig"
638639
"unicode.zig"
639640
"zig/ast.zig"
640641
"zig/index.zig"

std/event/loop.zig

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -736,8 +736,8 @@ pub const Loop = struct {
736736
_ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, empty_kevs, null) catch unreachable;
737737
},
738738
builtin.Os.linux => {
739-
_ = @atomicRmw(u8, &self.os_data.fs_queue_item, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
740-
const rc = os.linux.futex_wake(@ptrToInt(&self.os_data.fs_queue_item), os.linux.FUTEX_WAKE, 1);
739+
_ = @atomicRmw(i32, &self.os_data.fs_queue_item, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
740+
const rc = os.linux.futex_wake(&self.os_data.fs_queue_item, os.linux.FUTEX_WAKE, 1);
741741
switch (os.linux.getErrno(rc)) {
742742
0 => {},
743743
posix.EINVAL => unreachable,
@@ -757,7 +757,7 @@ pub const Loop = struct {
757757
fn posixFsRun(self: *Loop) void {
758758
while (true) {
759759
if (builtin.os == builtin.Os.linux) {
760-
_ = @atomicRmw(u8, &self.os_data.fs_queue_item, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
760+
_ = @atomicRmw(i32, &self.os_data.fs_queue_item, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
761761
}
762762
while (self.os_data.fs_queue.get()) |node| {
763763
switch (node.data.msg) {
@@ -794,11 +794,9 @@ pub const Loop = struct {
794794
}
795795
switch (builtin.os) {
796796
builtin.Os.linux => {
797-
const rc = os.linux.futex_wait(@ptrToInt(&self.os_data.fs_queue_item), os.linux.FUTEX_WAIT, 0, null);
797+
const rc = os.linux.futex_wait(&self.os_data.fs_queue_item, os.linux.FUTEX_WAIT, 0, null);
798798
switch (os.linux.getErrno(rc)) {
799-
0 => continue,
800-
posix.EINTR => continue,
801-
posix.EAGAIN => continue,
799+
0, posix.EINTR, posix.EAGAIN => continue,
802800
else => unreachable,
803801
}
804802
},
@@ -838,7 +836,7 @@ pub const Loop = struct {
838836
final_eventfd: i32,
839837
final_eventfd_event: os.linux.epoll_event,
840838
fs_thread: *os.Thread,
841-
fs_queue_item: u8,
839+
fs_queue_item: i32,
842840
fs_queue: std.atomic.Queue(fs.Request),
843841
fs_end_request: fs.RequestNode,
844842
};

std/index.zig

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
1-
pub const ArrayList = @import("array_list.zig").ArrayList;
21
pub const AlignedArrayList = @import("array_list.zig").AlignedArrayList;
2+
pub const ArrayList = @import("array_list.zig").ArrayList;
3+
pub const AutoHashMap = @import("hash_map.zig").AutoHashMap;
34
pub const BufMap = @import("buf_map.zig").BufMap;
45
pub const BufSet = @import("buf_set.zig").BufSet;
56
pub const Buffer = @import("buffer.zig").Buffer;
67
pub const BufferOutStream = @import("io.zig").BufferOutStream;
8+
pub const DynLib = @import("dynamic_library.zig").DynLib;
79
pub const HashMap = @import("hash_map.zig").HashMap;
8-
pub const AutoHashMap = @import("hash_map.zig").AutoHashMap;
910
pub const LinkedList = @import("linked_list.zig").LinkedList;
10-
pub const SegmentedList = @import("segmented_list.zig").SegmentedList;
11-
pub const DynLib = @import("dynamic_library.zig").DynLib;
1211
pub const Mutex = @import("mutex.zig").Mutex;
12+
pub const SegmentedList = @import("segmented_list.zig").SegmentedList;
13+
pub const SpinLock = @import("spinlock.zig").SpinLock;
1314

1415
pub const atomic = @import("atomic/index.zig");
1516
pub const base64 = @import("base64.zig");
@@ -45,15 +46,16 @@ pub const lazyInit = @import("lazy_init.zig").lazyInit;
4546

4647
test "std" {
4748
// run tests from these
48-
_ = @import("atomic/index.zig");
4949
_ = @import("array_list.zig");
50+
_ = @import("atomic/index.zig");
5051
_ = @import("buf_map.zig");
5152
_ = @import("buf_set.zig");
5253
_ = @import("buffer.zig");
5354
_ = @import("hash_map.zig");
5455
_ = @import("linked_list.zig");
55-
_ = @import("segmented_list.zig");
5656
_ = @import("mutex.zig");
57+
_ = @import("segmented_list.zig");
58+
_ = @import("spinlock.zig");
5759

5860
_ = @import("base64.zig");
5961
_ = @import("build.zig");

std/mutex.zig

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,74 @@ const builtin = @import("builtin");
33
const AtomicOrder = builtin.AtomicOrder;
44
const AtomicRmwOp = builtin.AtomicRmwOp;
55
const assert = std.debug.assert;
6+
const SpinLock = std.SpinLock;
7+
const linux = std.os.linux;
68

7-
/// TODO use syscalls instead of a spinlock
9+
/// Lock may be held only once. If the same thread
10+
/// tries to acquire the same mutex twice, it deadlocks.
811
pub const Mutex = struct {
9-
lock: u8, // TODO use a bool
12+
/// 0: unlocked
13+
/// 1: locked, no waiters
14+
/// 2: locked, one or more waiters
15+
linux_lock: @typeOf(linux_lock_init),
16+
17+
/// TODO better implementation than spin lock
18+
spin_lock: @typeOf(spin_lock_init),
19+
20+
const linux_lock_init = if (builtin.os == builtin.Os.linux) i32(0) else {};
21+
const spin_lock_init = if (builtin.os != builtin.Os.linux) SpinLock.init() else {};
1022

1123
pub const Held = struct {
1224
mutex: *Mutex,
1325

1426
pub fn release(self: Held) void {
15-
assert(@atomicRmw(u8, &self.mutex.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
27+
if (builtin.os == builtin.Os.linux) {
28+
// Always unlock. If the previous state was Locked-No-Waiters, then we're done.
29+
// Otherwise, wake a waiter up.
30+
const prev = @atomicRmw(i32, &self.mutex.linux_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.Release);
31+
if (prev != 1) {
32+
assert(prev == 2);
33+
const rc = linux.futex_wake(&self.mutex.linux_lock, linux.FUTEX_WAKE, 1);
34+
switch (linux.getErrno(rc)) {
35+
0 => {},
36+
linux.EINVAL => unreachable,
37+
else => unreachable,
38+
}
39+
}
40+
} else {
41+
SpinLock.Held.release(SpinLock.Held{ .spinlock = &self.mutex.spin_lock });
42+
}
1643
}
1744
};
1845

1946
pub fn init() Mutex {
20-
return Mutex{ .lock = 0 };
47+
return Mutex{
48+
.linux_lock = linux_lock_init,
49+
.spin_lock = spin_lock_init,
50+
};
2151
}
2252

2353
pub fn acquire(self: *Mutex) Held {
24-
while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
54+
if (builtin.os == builtin.Os.linux) {
55+
// First try to go from Unlocked to Locked-No-Waiters. If this succeeds, no syscalls are needed.
56+
// Otherwise, we need to be in the Locked-With-Waiters state. If we are already in that state,
57+
// proceed to futex_wait. Otherwise, try to go from Locked-No-Waiters to Locked-With-Waiters.
58+
// If that succeeds, proceed to futex_wait. Otherwise start the whole loop over again.
59+
while (@cmpxchgWeak(i32, &self.linux_lock, 0, 1, AtomicOrder.Acquire, AtomicOrder.Monotonic)) |l| {
60+
if (l == 2 or
61+
@cmpxchgWeak(i32, &self.linux_lock, 1, 2, AtomicOrder.Acquire, AtomicOrder.Monotonic) == null)
62+
{
63+
const rc = linux.futex_wait(&self.linux_lock, linux.FUTEX_WAIT, 2, null);
64+
switch (linux.getErrno(rc)) {
65+
0, linux.EINTR, linux.EAGAIN => continue,
66+
linux.EINVAL => unreachable,
67+
else => unreachable,
68+
}
69+
}
70+
}
71+
} else {
72+
_ = self.spin_lock.acquire();
73+
}
2574
return Held{ .mutex = self };
2675
}
2776
};

std/os/index.zig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2839,7 +2839,7 @@ pub const Thread = struct {
28392839
while (true) {
28402840
const pid_value = @atomicLoad(i32, &self.data.handle, builtin.AtomicOrder.SeqCst);
28412841
if (pid_value == 0) break;
2842-
const rc = linux.futex_wait(@ptrToInt(&self.data.handle), linux.FUTEX_WAIT, pid_value, null);
2842+
const rc = linux.futex_wait(&self.data.handle, linux.FUTEX_WAIT, pid_value, null);
28432843
switch (linux.getErrno(rc)) {
28442844
0 => continue,
28452845
posix.EINTR => continue,

std/os/linux/index.zig

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -728,12 +728,12 @@ pub inline fn vfork() usize {
728728
return @inlineCall(syscall0, SYS_vfork);
729729
}
730730

731-
pub fn futex_wait(uaddr: usize, futex_op: u32, val: i32, timeout: ?*timespec) usize {
732-
return syscall4(SYS_futex, uaddr, futex_op, @bitCast(u32, val), @ptrToInt(timeout));
731+
pub fn futex_wait(uaddr: *const i32, futex_op: u32, val: i32, timeout: ?*timespec) usize {
732+
return syscall4(SYS_futex, @ptrToInt(uaddr), futex_op, @bitCast(u32, val), @ptrToInt(timeout));
733733
}
734734

735-
pub fn futex_wake(uaddr: usize, futex_op: u32, val: i32) usize {
736-
return syscall3(SYS_futex, uaddr, futex_op, @bitCast(u32, val));
735+
pub fn futex_wake(uaddr: *const i32, futex_op: u32, val: i32) usize {
736+
return syscall3(SYS_futex, @ptrToInt(uaddr), futex_op, @bitCast(u32, val));
737737
}
738738

739739
pub fn getcwd(buf: [*]u8, size: usize) usize {

std/spinlock.zig

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
const std = @import("index.zig");
2+
const builtin = @import("builtin");
3+
const AtomicOrder = builtin.AtomicOrder;
4+
const AtomicRmwOp = builtin.AtomicRmwOp;
5+
const assert = std.debug.assert;
6+
7+
pub const SpinLock = struct {
8+
lock: u8, // TODO use a bool or enum
9+
10+
pub const Held = struct {
11+
spinlock: *SpinLock,
12+
13+
pub fn release(self: Held) void {
14+
assert(@atomicRmw(u8, &self.spinlock.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
15+
}
16+
};
17+
18+
pub fn init() SpinLock {
19+
return SpinLock{ .lock = 0 };
20+
}
21+
22+
pub fn acquire(self: *SpinLock) Held {
23+
while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
24+
return Held{ .spinlock = self };
25+
}
26+
};
27+
28+
test "spinlock" {
29+
var lock = SpinLock.init();
30+
const held = lock.acquire();
31+
defer held.release();
32+
}

0 commit comments

Comments
 (0)