diff --git a/lib/std/sync/futex.zig b/lib/std/sync/futex.zig index ae9482e7e301..299d4fcb3a13 100644 --- a/lib/std/sync/futex.zig +++ b/lib/std/sync/futex.zig @@ -24,6 +24,7 @@ test "futex" { const Self = @This(); const State = enum(u32) { unset, set }; + pub const Lock = @import("./primitives/Mutex.zig").Mutex(Futex); pub fn init(self: *Self) void { self.state = .unset; @@ -57,7 +58,7 @@ test "futex" { } atomic.store(&self.state, .set, .SeqCst); - Futex.notifyOne(@ptrCast(*const u32, &self.state)); + Futex.wake(@ptrCast(*const u32, &self.state), @as(u32, 1)); } pub fn reset(self: *Self) void { @@ -68,12 +69,101 @@ test "futex" { }; inline for (.{ - .{os}, - .{spin}, - .{event}, - .{generic.forFutex(os)}, - .{generic.forFutex(spin)}, + os, + spin, + event, + generic.forFutex(os), + generic.forFutex(spin), }) |futex| { - // @compileError("TODO: test wait/wake/nanotime"); + { + // Simple wait/wake + const Context = struct { + state: State = .unset, + const State = enum(u32) { unset, set }; + + fn waker(ctx: *@This()) !void { + std.time.sleep(std.time.ns_per_ms * 2); + atomic.store(&ctx.state, .set, .SeqCst); + Futex.wake(@ptrCast(*const u32, &ctx.state), @as(u32, 1)); + } + + fn waiter(ctx: *@This()) !void { + while (atomic.load(&ctx.state, .SeqCst) == .unset) { + futex.wait(@ptrCast(*const u32, &ctx.state), @enumToInt(State.unset), null) catch unreachable; + } + } + }; + + var ctx = Context{}; + var twait = try std.Thread.spawn(&ctx, Context.waiter); + var twake = try std.Thread.spawn(&ctx, Context.waker); + + twait.wait(); + twake.wait(); + } + + { + // Multiple wake + const waiters = 8; + + const Context = struct { + state: State = .unset, + const State = enum(u32) { unset, set }; + + fn waker(ctx: *@This()) !void { + std.time.sleep(std.time.ns_per_ms * 2); + atomic.store(&ctx.state, .set, .SeqCst); + Futex.wake(@ptrCast(*const u32, &ctx.state), @as(u32, waiters)); + } + + fn waiter(ctx: *@This()) !void { + while (atomic.load(&ctx.state, .SeqCst) == .unset) { + futex.wait(@ptrCast(*const u32, &ctx.state), @enumToInt(State.unset), null) catch unreachable; + } + } + }; + + var ctx = Context{}; + var twaiters: [waiters]*std.Thread = undefined; + for (twaiters) |*waiter| { + waiter.* = try std.Thread.spawn(&ctx, Context.waiter); + } + var twake = try std.Thread.spawn(&ctx, Context.waker); + + twake.wait(); + for (twaiters) |waiter| { + waiter.wait(); + } + } + + { + // Timeout + const Context = struct { + state: State = .unset, + const State = enum(u32) { unset, set }; + + const duration = std.time.ns_per_ms * 2; + + fn slowWaker(ctx: *@This()) !void { + std.time.sleep(duration + 1); + atomic.store(&ctx.state, .set, .SeqCst); + Futex.wake(@ptrCast(*const u32, &ctx.state), @as(u32, 1)); + } + + fn waiter(ctx: *@This()) !void { + while (atomic.load(&ctx.state, .SeqCst) == .unset) { + futex.wait(@ptrCast(*const u32, &ctx.state), @enumToInt(State.unset), duration) catch return; + } + unreachable; + } + }; + + var ctx = Context{}; + var twait = try std.Thread.spawn(&ctx, Context.waiter); + var twake = try std.Thread.spawn(&ctx, Context.slowWaker); + + twait.wait(); + twake.wait(); + } } } diff --git a/lib/std/sync/futex/generic.zig b/lib/std/sync/futex/generic.zig index 47d7cb3d1b5d..da246ba87880 100644 --- a/lib/std/sync/futex/generic.zig +++ b/lib/std/sync/futex/generic.zig @@ -14,7 +14,7 @@ const helgrind: ?type = if (builtin.valgrind_support) std.valgrind.helgrind else pub fn Futex(comptime Event: type) type { return struct { const WaitLock = Event.Lock; - const bucket_count = Event.bucket_count; + const bucket_count = if (@hasDecl(Event, "bucket_count")) Event.bucket_count else 1024; const WaitBucket = struct { lock: WaitLock = .{},