diff --git a/CMakeLists.txt b/CMakeLists.txt index d7487ce905a2..9d831e0a5602 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -466,6 +466,9 @@ set(ZIG_STD_FILES "event/lock.zig" "event/locked.zig" "event/loop.zig" + "event/loop/darwin.zig" + "event/loop/linux.zig" + "event/loop/windows.zig" "event/rwlock.zig" "event/rwlocked.zig" "event/tcp.zig" diff --git a/src/link.cpp b/src/link.cpp index f65c072bac4d..d75a9ac234d8 100644 --- a/src/link.cpp +++ b/src/link.cpp @@ -921,6 +921,12 @@ static void construct_linker_job_macho(LinkJob *lj) { zig_panic("TODO"); } + // Always include CoreFoundation + // Is There a Better Way? + // https://github.com/ziglang/zig/issues/1349 + lj->args.append("-framework"); + lj->args.append("CoreFoundation"); + for (size_t i = 0; i < g->darwin_frameworks.length; i += 1) { lj->args.append("-framework"); lj->args.append(buf_ptr(g->darwin_frameworks.at(i))); diff --git a/std/c/darwin.zig b/std/c/darwin.zig index 437b081cacbe..eca6a974b1a0 100644 --- a/std/c/darwin.zig +++ b/std/c/darwin.zig @@ -1,3 +1,7 @@ +pub const CFRunLoop = @cImport({ + @cInclude("CoreFoundation/CFRunLoop.h"); +}); + extern "c" fn __error() *c_int; pub extern "c" fn _NSGetExecutablePath(buf: [*]u8, bufsize: *u32) c_int; diff --git a/std/event.zig b/std/event.zig index bd3262a57560..ee6e62a3b313 100644 --- a/std/event.zig +++ b/std/event.zig @@ -1,3 +1,4 @@ +const builtin = @import("builtin"); pub const Channel = @import("event/channel.zig").Channel; pub const Future = @import("event/future.zig").Future; pub const Group = @import("event/group.zig").Group; @@ -5,8 +6,17 @@ pub const Lock = @import("event/lock.zig").Lock; pub const Locked = @import("event/locked.zig").Locked; pub const RwLock = @import("event/rwlock.zig").RwLock; pub const RwLocked = @import("event/rwlocked.zig").RwLocked; -pub const Loop = @import("event/loop.zig").Loop; -pub const fs = @import("event/fs.zig"); +pub const Loop = switch (builtin.os) { + builtin.Os.linux, + builtin.Os.macosx, + builtin.Os.windows => @import("event/loop.zig").Loop, + else => @import("empty.zig"), +}; +pub const fs = switch (builtin.os) { + builtin.Os.linux, + builtin.Os.macosx => @import("event/fs.zig"), + else => @import("empty.zig"), +}; pub const tcp = @import("event/tcp.zig"); test "import event tests" { diff --git a/std/event/fs.zig b/std/event/fs.zig index 1f810c484251..ed682b119cdd 100644 --- a/std/event/fs.zig +++ b/std/event/fs.zig @@ -531,9 +531,9 @@ pub fn Watch(comptime V: type) type { os.linux.EINVAL => unreachable, os.linux.EFAULT => unreachable, os.linux.EAGAIN => { - (await (async loop.linuxWaitFd( + (await (async loop.waitEvHandle( inotify_fd, - os.linux.EPOLLET | os.linux.EPOLLIN, + event.Loop.EventFlags.READ, ) catch unreachable)) catch |err| { const transformed_err = switch (err) { error.InvalidFileDescriptor => unreachable, diff --git a/std/event/loop.zig b/std/event/loop.zig index 78191e60d4f4..1b774f39c449 100644 --- a/std/event/loop.zig +++ b/std/event/loop.zig @@ -9,764 +9,69 @@ const os = std.os; const posix = os.posix; const windows = os.windows; -pub const Loop = struct { - allocator: *mem.Allocator, - next_tick_queue: std.atomic.Queue(promise), - os_data: OsData, - final_resume_node: ResumeNode, - pending_event_count: usize, - extra_threads: []*os.Thread, - - // pre-allocated eventfds. all permanently active. - // this is how we send promises to be resumed on other threads. 
- available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd), - eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node, - - pub const NextTickNode = std.atomic.Queue(promise).Node; - - pub const ResumeNode = struct { - id: Id, - handle: promise, - - pub const Id = enum { - Basic, - Stop, - EventFd, - }; - - pub const EventFd = switch (builtin.os) { - builtin.Os.macosx => MacOsEventFd, - builtin.Os.linux => struct { - base: ResumeNode, - epoll_op: u32, - eventfd: i32, - }, - builtin.Os.windows => struct { - base: ResumeNode, - completion_key: usize, - }, - else => @compileError("unsupported OS"), - }; - - const MacOsEventFd = struct { - base: ResumeNode, - kevent: posix.Kevent, - }; - }; - - /// After initialization, call run(). - /// TODO copy elision / named return values so that the threads referencing *Loop - /// have the correct pointer value. - pub fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void { - return self.initInternal(allocator, 1); - } - - /// The allocator must be thread-safe because we use it for multiplexing - /// coroutines onto kernel threads. - /// After initialization, call run(). - /// TODO copy elision / named return values so that the threads referencing *Loop - /// have the correct pointer value. - pub fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void { - const core_count = try os.cpuCount(allocator); - return self.initInternal(allocator, core_count); - } - - /// Thread count is the total thread count. The thread pool size will be - /// max(thread_count - 1, 0) - fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void { - self.* = Loop{ - .pending_event_count = 1, - .allocator = allocator, - .os_data = undefined, - .next_tick_queue = std.atomic.Queue(promise).init(), - .extra_threads = undefined, - .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(), - .eventfd_resume_nodes = undefined, - .final_resume_node = ResumeNode{ - .id = ResumeNode.Id.Stop, - .handle = undefined, - }, - }; - const extra_thread_count = thread_count - 1; - self.eventfd_resume_nodes = try self.allocator.alloc( - std.atomic.Stack(ResumeNode.EventFd).Node, - extra_thread_count, - ); - errdefer self.allocator.free(self.eventfd_resume_nodes); - - self.extra_threads = try self.allocator.alloc(*os.Thread, extra_thread_count); - errdefer self.allocator.free(self.extra_threads); - - try self.initOsData(extra_thread_count); - errdefer self.deinitOsData(); - } - - pub fn deinit(self: *Loop) void { - self.deinitOsData(); - self.allocator.free(self.extra_threads); - } - - const InitOsDataError = os.LinuxEpollCreateError || mem.Allocator.Error || os.LinuxEventFdError || - os.SpawnThreadError || os.LinuxEpollCtlError || os.BsdKEventError || - os.WindowsCreateIoCompletionPortError; - - const wakeup_bytes = []u8{0x1} ** 8; - - fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void { - switch (builtin.os) { - builtin.Os.linux => { - self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); - self.os_data.fs_queue_len = 0; - // we need another thread for the file system because Linux does not have an async - // file system I/O API. 
- self.os_data.fs_end_request = fs.RequestNode{ - .prev = undefined, - .next = undefined, - .data = fs.Request{ - .msg = fs.Request.Msg.End, - .finish = fs.Request.Finish.NoAction, - }, - }; - - errdefer { - while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); - } - for (self.eventfd_resume_nodes) |*eventfd_node| { - eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ - .data = ResumeNode.EventFd{ - .base = ResumeNode{ - .id = ResumeNode.Id.EventFd, - .handle = undefined, - }, - .eventfd = try os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK), - .epoll_op = posix.EPOLL_CTL_ADD, - }, - .next = undefined, - }; - self.available_eventfd_resume_nodes.push(eventfd_node); - } - - self.os_data.epollfd = try os.linuxEpollCreate(posix.EPOLL_CLOEXEC); - errdefer os.close(self.os_data.epollfd); - - self.os_data.final_eventfd = try os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK); - errdefer os.close(self.os_data.final_eventfd); - - self.os_data.final_eventfd_event = posix.epoll_event{ - .events = posix.EPOLLIN, - .data = posix.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) }, - }; - try os.linuxEpollCtl( - self.os_data.epollfd, - posix.EPOLL_CTL_ADD, - self.os_data.final_eventfd, - &self.os_data.final_eventfd_event, - ); - - self.os_data.fs_thread = try os.spawnThread(self, posixFsRun); - errdefer { - self.posixFsRequest(&self.os_data.fs_end_request); - self.os_data.fs_thread.wait(); - } - - var extra_thread_index: usize = 0; - errdefer { - // writing 8 bytes to an eventfd cannot fail - os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; - while (extra_thread_index != 0) { - extra_thread_index -= 1; - self.extra_threads[extra_thread_index].wait(); - } - } - while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { - self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun); - } - }, - builtin.Os.macosx => { - self.os_data.kqfd = try os.bsdKQueue(); - errdefer os.close(self.os_data.kqfd); - - self.os_data.fs_kqfd = try os.bsdKQueue(); - errdefer os.close(self.os_data.fs_kqfd); - - self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); - // we need another thread for the file system because Darwin does not have an async - // file system I/O API. 
- self.os_data.fs_end_request = fs.RequestNode{ - .prev = undefined, - .next = undefined, - .data = fs.Request{ - .msg = fs.Request.Msg.End, - .finish = fs.Request.Finish.NoAction, - }, - }; - - self.os_data.kevents = try self.allocator.alloc(posix.Kevent, extra_thread_count); - errdefer self.allocator.free(self.os_data.kevents); - - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; - - for (self.eventfd_resume_nodes) |*eventfd_node, i| { - eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ - .data = ResumeNode.EventFd{ - .base = ResumeNode{ - .id = ResumeNode.Id.EventFd, - .handle = undefined, - }, - // this one is for sending events - .kevent = posix.Kevent{ - .ident = i, - .filter = posix.EVFILT_USER, - .flags = posix.EV_CLEAR | posix.EV_ADD | posix.EV_DISABLE, - .fflags = 0, - .data = 0, - .udata = @ptrToInt(&eventfd_node.data.base), - }, - }, - .next = undefined, - }; - self.available_eventfd_resume_nodes.push(eventfd_node); - const kevent_array = (*[1]posix.Kevent)(&eventfd_node.data.kevent); - _ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null); - eventfd_node.data.kevent.flags = posix.EV_CLEAR | posix.EV_ENABLE; - eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER; - // this one is for waiting for events - self.os_data.kevents[i] = posix.Kevent{ - .ident = i, - .filter = posix.EVFILT_USER, - .flags = 0, - .fflags = 0, - .data = 0, - .udata = @ptrToInt(&eventfd_node.data.base), - }; - } - - // Pre-add so that we cannot get error.SystemResources - // later when we try to activate it. - self.os_data.final_kevent = posix.Kevent{ - .ident = extra_thread_count, - .filter = posix.EVFILT_USER, - .flags = posix.EV_ADD | posix.EV_DISABLE, - .fflags = 0, - .data = 0, - .udata = @ptrToInt(&self.final_resume_node), - }; - const final_kev_arr = (*[1]posix.Kevent)(&self.os_data.final_kevent); - _ = try os.bsdKEvent(self.os_data.kqfd, final_kev_arr, empty_kevs, null); - self.os_data.final_kevent.flags = posix.EV_ENABLE; - self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER; - - self.os_data.fs_kevent_wake = posix.Kevent{ - .ident = extra_thread_count + 1, - .filter = posix.EVFILT_USER, - .flags = posix.EV_ADD, - .fflags = posix.NOTE_TRIGGER, - .data = 0, - .udata = undefined, - }; - - self.os_data.fs_kevent_wait = posix.Kevent{ - .ident = extra_thread_count + 1, - .filter = posix.EVFILT_USER, - .flags = posix.EV_ADD|posix.EV_CLEAR, - .fflags = 0, - .data = 0, - .udata = undefined, - }; - - self.os_data.fs_thread = try os.spawnThread(self, posixFsRun); - errdefer { - self.posixFsRequest(&self.os_data.fs_end_request); - self.os_data.fs_thread.wait(); - } - - var extra_thread_index: usize = 0; - errdefer { - _ = os.bsdKEvent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable; - while (extra_thread_index != 0) { - extra_thread_index -= 1; - self.extra_threads[extra_thread_index].wait(); - } - } - while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { - self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun); - } - }, - builtin.Os.windows => { - self.os_data.io_port = try os.windowsCreateIoCompletionPort( - windows.INVALID_HANDLE_VALUE, - null, - undefined, - undefined, - ); - errdefer os.close(self.os_data.io_port); - - for (self.eventfd_resume_nodes) |*eventfd_node, i| { - eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ - .data = ResumeNode.EventFd{ - .base = ResumeNode{ - .id = ResumeNode.Id.EventFd, - .handle = undefined, - }, - // this one is for sending events - .completion_key = 
@ptrToInt(&eventfd_node.data.base), - }, - .prev = undefined, - .next = undefined, - }; - self.available_eventfd_resume_nodes.push(eventfd_node); - } - - var extra_thread_index: usize = 0; - errdefer { - var i: usize = 0; - while (i < extra_thread_index) : (i += 1) { - while (true) { - const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); - os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue; - break; - } - } - while (extra_thread_index != 0) { - extra_thread_index -= 1; - self.extra_threads[extra_thread_index].wait(); - } - } - while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { - self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun); - } - }, - else => {}, - } - } - - fn deinitOsData(self: *Loop) void { - switch (builtin.os) { - builtin.Os.linux => { - os.close(self.os_data.final_eventfd); - while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); - os.close(self.os_data.epollfd); - self.allocator.free(self.eventfd_resume_nodes); - }, - builtin.Os.macosx => { - self.allocator.free(self.os_data.kevents); - os.close(self.os_data.kqfd); - os.close(self.os_data.fs_kqfd); - }, - builtin.Os.windows => { - os.close(self.os_data.io_port); - }, - else => {}, - } - } - - /// resume_node must live longer than the promise that it holds a reference to. - /// flags must contain EPOLLET - pub fn linuxAddFd(self: *Loop, fd: i32, resume_node: *ResumeNode, flags: u32) !void { - assert(flags & posix.EPOLLET == posix.EPOLLET); - self.beginOneEvent(); - errdefer self.finishOneEvent(); - try self.linuxModFd( - fd, - posix.EPOLL_CTL_ADD, - flags, - resume_node, - ); - } - - pub fn linuxModFd(self: *Loop, fd: i32, op: u32, flags: u32, resume_node: *ResumeNode) !void { - assert(flags & posix.EPOLLET == posix.EPOLLET); - var ev = os.linux.epoll_event{ - .events = flags, - .data = os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) }, - }; - try os.linuxEpollCtl(self.os_data.epollfd, op, fd, &ev); - } - - pub fn linuxRemoveFd(self: *Loop, fd: i32) void { - self.linuxRemoveFdNoCounter(fd); - self.finishOneEvent(); - } - - fn linuxRemoveFdNoCounter(self: *Loop, fd: i32) void { - os.linuxEpollCtl(self.os_data.epollfd, os.linux.EPOLL_CTL_DEL, fd, undefined) catch {}; - } - - pub async fn linuxWaitFd(self: *Loop, fd: i32, flags: u32) !void { - defer self.linuxRemoveFd(fd); - suspend { - // TODO explicitly put this memory in the coroutine frame #1194 - var resume_node = ResumeNode{ - .id = ResumeNode.Id.Basic, - .handle = @handle(), - }; - try self.linuxAddFd(fd, &resume_node, flags); - } - } - - fn dispatch(self: *Loop) void { - while (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| { - const next_tick_node = self.next_tick_queue.get() orelse { - self.available_eventfd_resume_nodes.push(resume_stack_node); - return; - }; - const eventfd_node = &resume_stack_node.data; - eventfd_node.base.handle = next_tick_node.data; - switch (builtin.os) { - builtin.Os.macosx => { - const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent); - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; - _ = os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch { - self.next_tick_queue.unget(next_tick_node); - self.available_eventfd_resume_nodes.push(resume_stack_node); - return; - }; - }, - builtin.Os.linux => { - // the pending count is already accounted for - const epoll_events = posix.EPOLLONESHOT | os.linux.EPOLLIN | os.linux.EPOLLOUT | - 
os.linux.EPOLLET; - self.linuxModFd( - eventfd_node.eventfd, - eventfd_node.epoll_op, - epoll_events, - &eventfd_node.base, - ) catch { - self.next_tick_queue.unget(next_tick_node); - self.available_eventfd_resume_nodes.push(resume_stack_node); - return; - }; - }, - builtin.Os.windows => { - // this value is never dereferenced but we need it to be non-null so that - // the consumer code can decide whether to read the completion key. - // it has to do this for normal I/O, so we match that behavior here. - const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); - os.windowsPostQueuedCompletionStatus( - self.os_data.io_port, - undefined, - eventfd_node.completion_key, - overlapped, - ) catch { - self.next_tick_queue.unget(next_tick_node); - self.available_eventfd_resume_nodes.push(resume_stack_node); - return; - }; - }, - else => @compileError("unsupported OS"), - } - } - } - - /// Bring your own linked list node. This means it can't fail. - pub fn onNextTick(self: *Loop, node: *NextTickNode) void { - self.beginOneEvent(); // finished in dispatch() - self.next_tick_queue.put(node); - self.dispatch(); - } - - pub fn cancelOnNextTick(self: *Loop, node: *NextTickNode) void { - if (self.next_tick_queue.remove(node)) { - self.finishOneEvent(); - } - } - - pub fn run(self: *Loop) void { - self.finishOneEvent(); // the reference we start with - - self.workerRun(); - - self.os_data.fs_thread.wait(); - - for (self.extra_threads) |extra_thread| { - extra_thread.wait(); - } - } - - /// This is equivalent to an async call, except instead of beginning execution of the async function, - /// it immediately returns to the caller, and the async function is queued in the event loop. It still - /// returns a promise to be awaited. - pub fn call(self: *Loop, comptime func: var, args: ...) !(promise->@typeOf(func).ReturnType) { - const S = struct { - async fn asyncFunc(loop: *Loop, handle: *promise->@typeOf(func).ReturnType, args2: ...) @typeOf(func).ReturnType { - suspend { - handle.* = @handle(); - var my_tick_node = Loop.NextTickNode{ - .prev = undefined, - .next = undefined, - .data = @handle(), - }; - loop.onNextTick(&my_tick_node); - } - // TODO guaranteed allocation elision for await in same func as async - return await (async func(args2) catch unreachable); - } - }; - var handle: promise->@typeOf(func).ReturnType = undefined; - return async S.asyncFunc(self, &handle, args); - } - - /// Awaiting a yield lets the event loop run, starting any unstarted async operations. - /// Note that async operations automatically start when a function yields for any other reason, - /// for example, when async I/O is performed. This function is intended to be used only when - /// CPU bound tasks would be waiting in the event loop but never get started because no async I/O - /// is performed. 
- pub async fn yield(self: *Loop) void { - suspend { - var my_tick_node = Loop.NextTickNode{ - .prev = undefined, - .next = undefined, - .data = @handle(), - }; - self.onNextTick(&my_tick_node); - } - } - - /// call finishOneEvent when done - pub fn beginOneEvent(self: *Loop) void { - _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); - } - - pub fn finishOneEvent(self: *Loop) void { - const prev = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); - if (prev == 1) { - // cause all the threads to stop - switch (builtin.os) { - builtin.Os.linux => { - self.posixFsRequest(&self.os_data.fs_end_request); - // writing 8 bytes to an eventfd cannot fail - os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; - return; - }, - builtin.Os.macosx => { - self.posixFsRequest(&self.os_data.fs_end_request); - const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent); - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; - // cannot fail because we already added it and this just enables it - _ = os.bsdKEvent(self.os_data.kqfd, final_kevent, empty_kevs, null) catch unreachable; - return; - }, - builtin.Os.windows => { - var i: usize = 0; - while (i < self.extra_threads.len + 1) : (i += 1) { - while (true) { - const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); - os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue; - break; - } - } - return; - }, - else => @compileError("unsupported OS"), - } - } - } - - fn workerRun(self: *Loop) void { - while (true) { - while (true) { - const next_tick_node = self.next_tick_queue.get() orelse break; - self.dispatch(); - resume next_tick_node.data; - self.finishOneEvent(); - } - - switch (builtin.os) { - builtin.Os.linux => { - // only process 1 event so we don't steal from other threads - var events: [1]os.linux.epoll_event = undefined; - const count = os.linuxEpollWait(self.os_data.epollfd, events[0..], -1); - for (events[0..count]) |ev| { - const resume_node = @intToPtr(*ResumeNode, ev.data.ptr); - const handle = resume_node.handle; - const resume_node_id = resume_node.id; - switch (resume_node_id) { - ResumeNode.Id.Basic => {}, - ResumeNode.Id.Stop => return, - ResumeNode.Id.EventFd => { - const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); - event_fd_node.epoll_op = posix.EPOLL_CTL_MOD; - const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); - self.available_eventfd_resume_nodes.push(stack_node); - }, - } - resume handle; - if (resume_node_id == ResumeNode.Id.EventFd) { - self.finishOneEvent(); - } - } - }, - builtin.Os.macosx => { - var eventlist: [1]posix.Kevent = undefined; - const count = os.bsdKEvent(self.os_data.kqfd, self.os_data.kevents, eventlist[0..], null) catch unreachable; - for (eventlist[0..count]) |ev| { - const resume_node = @intToPtr(*ResumeNode, ev.udata); - const handle = resume_node.handle; - const resume_node_id = resume_node.id; - switch (resume_node_id) { - ResumeNode.Id.Basic => {}, - ResumeNode.Id.Stop => return, - ResumeNode.Id.EventFd => { - const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); - const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); - self.available_eventfd_resume_nodes.push(stack_node); - }, - } - resume handle; - if (resume_node_id == ResumeNode.Id.EventFd) { - self.finishOneEvent(); 
- } - } - break; - }, - builtin.Os.windows => { - var completion_key: usize = undefined; - while (true) { - var nbytes: windows.DWORD = undefined; - var overlapped: ?*windows.OVERLAPPED = undefined; - switch (os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) { - os.WindowsWaitResult.Aborted => return, - os.WindowsWaitResult.Normal => {}, - } - if (overlapped != null) break; - } - const resume_node = @intToPtr(*ResumeNode, completion_key); - const handle = resume_node.handle; - const resume_node_id = resume_node.id; - switch (resume_node_id) { - ResumeNode.Id.Basic => {}, - ResumeNode.Id.Stop => return, - ResumeNode.Id.EventFd => { - const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); - const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); - self.available_eventfd_resume_nodes.push(stack_node); - }, - } - resume handle; - if (resume_node_id == ResumeNode.Id.EventFd) { - self.finishOneEvent(); - } - }, - else => @compileError("unsupported OS"), - } - } - } - - fn posixFsRequest(self: *Loop, request_node: *fs.RequestNode) void { - self.beginOneEvent(); // finished in posixFsRun after processing the msg - self.os_data.fs_queue.put(request_node); - switch (builtin.os) { - builtin.Os.macosx => { - const fs_kevs = (*[1]posix.Kevent)(&self.os_data.fs_kevent_wake); - const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; - _ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, empty_kevs, null) catch unreachable; - }, - builtin.Os.linux => { - _ = @atomicRmw(i32, &self.os_data.fs_queue_len, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); // let this wrap - const rc = os.linux.futex_wake(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAKE, 1); - switch (os.linux.getErrno(rc)) { - 0 => {}, - posix.EINVAL => unreachable, - else => unreachable, - } - }, - else => @compileError("Unsupported OS"), - } - } - - fn posixFsRun(self: *Loop) void { - var processed_count: i32 = 0; // we let this wrap - while (true) { - while (self.os_data.fs_queue.get()) |node| { - processed_count +%= 1; - switch (node.data.msg) { - @TagType(fs.Request.Msg).End => return, - @TagType(fs.Request.Msg).PWriteV => |*msg| { - msg.result = os.posix_pwritev(msg.fd, msg.iov.ptr, msg.iov.len, msg.offset); - }, - @TagType(fs.Request.Msg).PReadV => |*msg| { - msg.result = os.posix_preadv(msg.fd, msg.iov.ptr, msg.iov.len, msg.offset); - }, - @TagType(fs.Request.Msg).OpenRead => |*msg| { - const flags = posix.O_LARGEFILE | posix.O_RDONLY | posix.O_CLOEXEC; - msg.result = os.posixOpenC(msg.path.ptr, flags, 0); - }, - @TagType(fs.Request.Msg).OpenRW => |*msg| { - const flags = posix.O_LARGEFILE | posix.O_RDWR | posix.O_CREAT | posix.O_CLOEXEC; - msg.result = os.posixOpenC(msg.path.ptr, flags, msg.mode); - }, - @TagType(fs.Request.Msg).Close => |*msg| os.close(msg.fd), - @TagType(fs.Request.Msg).WriteFile => |*msg| blk: { - const flags = posix.O_LARGEFILE | posix.O_WRONLY | posix.O_CREAT | - posix.O_CLOEXEC | posix.O_TRUNC; - const fd = os.posixOpenC(msg.path.ptr, flags, msg.mode) catch |err| { - msg.result = err; - break :blk; - }; - defer os.close(fd); - msg.result = os.posixWrite(fd, msg.contents); - }, - } - switch (node.data.finish) { - @TagType(fs.Request.Finish).TickNode => |*tick_node| self.onNextTick(tick_node), - @TagType(fs.Request.Finish).DeallocCloseOperation => |close_op| { - self.allocator.destroy(close_op); - }, - @TagType(fs.Request.Finish).NoAction => {}, - } - self.finishOneEvent(); - } - 
switch (builtin.os) { - builtin.Os.linux => { - const rc = os.linux.futex_wait(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAIT, processed_count, null); - switch (os.linux.getErrno(rc)) { - 0 => continue, - posix.EINTR => continue, - posix.EAGAIN => continue, - else => unreachable, - } - }, - builtin.Os.macosx => { - const fs_kevs = (*[1]posix.Kevent)(&self.os_data.fs_kevent_wait); - var out_kevs: [1]posix.Kevent = undefined; - _ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, out_kevs[0..], null) catch unreachable; - }, - else => @compileError("Unsupported OS"), - } - } - } - - const OsData = switch (builtin.os) { - builtin.Os.linux => struct { - epollfd: i32, - final_eventfd: i32, - final_eventfd_event: os.linux.epoll_event, - fs_thread: *os.Thread, - fs_queue_len: i32, // we let this wrap - fs_queue: std.atomic.Queue(fs.Request), - fs_end_request: fs.RequestNode, - }, - builtin.Os.macosx => MacOsData, - builtin.Os.windows => struct { - io_port: windows.HANDLE, - extra_thread_count: usize, - }, - else => struct {}, - }; - - const MacOsData = struct { - kqfd: i32, - final_kevent: posix.Kevent, - kevents: []posix.Kevent, - fs_kevent_wake: posix.Kevent, - fs_kevent_wait: posix.Kevent, - fs_thread: *os.Thread, - fs_kqfd: i32, - fs_queue: std.atomic.Queue(fs.Request), - fs_end_request: fs.RequestNode, - }; +// 06:52 andrewrk: thinking about breaking-up the loop into separate +// files for each platform. importing event.loop will import the correct +// implementation for each OS and we will test the unified API in several test +// blocks inside of event.loop -- there is just too much custom code between the +// platforms and with threads, I want to make sure we get this right. +// 06:53 andrewrk: it's not as bad as you might imagine. only downside +// is if we find a bug in one implementation, we will have to make sure to track +// it down for each other files -- BUT, if we add the appropriate test case for +// that bug, we should be able to run against all implementations and if the bug +// does not show, then so be it +// 06:54 andrewrk: I think that this is going to be important as we go +// forward -- defining the interface as tests in an index file, and then making +// sure the tests pass in each implementation +// 07:20 kristate, I implemented the darwin file system stuff (not the +// watching yet) in my async-fs branch +// 07:20 I went the other direction with it - not having different +// files +// 07:20 andrewrk: yes, I see that -- but when we add FS watching in, +// things will get messy with CFLoopRun +// 07:20 I think it's better to have the same function definitions and +// types in one file, and that can switch out and import os-specific files if +// necessary +// 07:23 kristate, I see, alright my mind is open to your way +// 07:23 sorry I gotta get some sleep, I'll be back in ~8 hours +// 07:24 okay, yeah -- please rest well. 
I will try to get something +// hacked out with FS watch + +pub use switch (builtin.os) { + builtin.Os.linux => @import("loop/linux.zig"), + builtin.Os.macosx => @import("loop/darwin.zig"), + builtin.Os.windows => @import("loop/windows.zig"), + else => @compileError("Unsupported OS"), }; +test "std.event.Loop - test API" { + _ = Loop.NextTickNode; + _ = Loop.ResumeNode; + _ = Loop.ResumeNode; + _ = Loop.OsEventHandle; + _ = Loop.EventFlagType; + _ = Loop.EventFlags.READ; + _ = Loop.EventFlags.WRITE; + _ = Loop.EventFlags.EXCEPT; + _ = Loop.initSingleThreaded; + _ = Loop.initMultiThreaded; + + _ = Loop.addEvHandle; + _ = Loop.removeEvHandle; + _ = Loop.waitEvHandle; + _ = Loop.onNextTick; + _ = Loop.cancelOnNextTick; + _ = Loop.run; + _ = Loop.call; + _ = Loop.yield; + _ = Loop.beginOneEvent; + _ = Loop.finishOneEvent; + + //Test Posix functions + if (comptime os.is_posix) { + _ = Loop.posixFsRequest; + } +} + test "std.event.Loop - basic" { var da = std.heap.DirectAllocator.init(); defer da.deinit(); diff --git a/std/event/loop/darwin.zig b/std/event/loop/darwin.zig new file mode 100644 index 000000000000..0070b2005644 --- /dev/null +++ b/std/event/loop/darwin.zig @@ -0,0 +1,508 @@ +const std = @import("../../index.zig"); +const builtin = @import("builtin"); +const assert = std.debug.assert; +const mem = std.mem; +const AtomicRmwOp = builtin.AtomicRmwOp; +const AtomicOrder = builtin.AtomicOrder; +const fs = std.event.fs; +const os = std.os; +const posix = os.posix; +const c = std.c; + +extern fn cf_callback(uservalue: ?*c_void) void { + //std.debug.warn("CALLED BACK\n"); + + var self: *Loop = @ptrCast(*Loop, @alignCast(8, uservalue)); + + while (self.os_data.fs_queue.get()) |node| { + switch (node.data.msg) { + @TagType(fs.Request.Msg).End => { + c.CFRunLoop.CFRunLoopStop( self.os_data.cf_loop ); + return; + }, + @TagType(fs.Request.Msg).PWriteV => |*msg| { + msg.result = os.posix_pwritev(msg.fd, msg.iov.ptr, msg.iov.len, msg.offset); + }, + @TagType(fs.Request.Msg).PReadV => |*msg| { + msg.result = os.posix_preadv(msg.fd, msg.iov.ptr, msg.iov.len, msg.offset); + }, + @TagType(fs.Request.Msg).OpenRead => |*msg| { + const flags = posix.O_LARGEFILE | posix.O_RDONLY | posix.O_CLOEXEC; + msg.result = os.posixOpenC(msg.path.ptr, flags, 0); + }, + @TagType(fs.Request.Msg).OpenRW => |*msg| { + const flags = posix.O_LARGEFILE | posix.O_RDWR | posix.O_CREAT | posix.O_CLOEXEC; + msg.result = os.posixOpenC(msg.path.ptr, flags, msg.mode); + }, + @TagType(fs.Request.Msg).Close => |*msg| os.close(msg.fd), + @TagType(fs.Request.Msg).WriteFile => |*msg| blk: { + const flags = posix.O_LARGEFILE | posix.O_WRONLY | posix.O_CREAT | + posix.O_CLOEXEC | posix.O_TRUNC; + const fd = os.posixOpenC(msg.path.ptr, flags, msg.mode) catch |err| { + msg.result = err; + break :blk; + }; + defer os.close(fd); + msg.result = os.posixWrite(fd, msg.contents); + }, + } + switch (node.data.finish) { + @TagType(fs.Request.Finish).TickNode => |*tick_node| self.onNextTick(tick_node), + @TagType(fs.Request.Finish).DeallocCloseOperation => |close_op| { + self.allocator.destroy(close_op); + }, + @TagType(fs.Request.Finish).NoAction => {}, + } + self.finishOneEvent(); + } + + +} + +pub fn ptr(p: var) t: { + const T = @typeOf(p); + const info = @typeInfo(@typeOf(p)).Pointer; + break :t if (info.is_const) ?[*]const info.child else ?[*]info.child; +} { + return @ptrCast(@typeInfo(@typeOf(this)).Fn.return_type.?, p); +} + + +pub const Loop = struct { + allocator: *mem.Allocator, + next_tick_queue: std.atomic.Queue(promise), + os_data: 
OsData, + final_resume_node: ResumeNode, + pending_event_count: usize, + extra_threads: []*os.Thread, + + // pre-allocated eventfds. all permanently active. + // this is how we send promises to be resumed on other threads. + available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd), + eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node, + + pub const OsEventHandle = i32; + // Type for EventFlags based off of u32 + // TODO create some sort of SET type?? + pub const EventFlagType = u32; + pub const EventFlags = struct { + pub const READ = EventFlagType(1<<0); + pub const WRITE = EventFlagType(1<<1); + pub const EXCEPT = EventFlagType(1<<2); + }; + + pub const NextTickNode = std.atomic.Queue(promise).Node; + + pub const ResumeNode = struct { + id: Id, + handle: promise, + + pub const Id = enum { + Basic, + Stop, + EventFd, + }; + + pub const EventFd = struct { + base: ResumeNode, + kevent: posix.Kevent, + }; + }; + + /// After initialization, call run(). + /// TODO copy elision / named return values so that the threads referencing *Loop + /// have the correct pointer value. + pub fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void { + return self.initInternal(allocator, 1); + } + + /// The allocator must be thread-safe because we use it for multiplexing + /// coroutines onto kernel threads. + /// After initialization, call run(). + /// TODO copy elision / named return values so that the threads referencing *Loop + /// have the correct pointer value. + pub fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void { + const core_count = try os.cpuCount(allocator); + return self.initInternal(allocator, core_count); + } + + /// Thread count is the total thread count. The thread pool size will be + /// max(thread_count - 1, 0) + fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void { + self.* = Loop{ + .pending_event_count = 1, + .allocator = allocator, + .os_data = undefined, + .next_tick_queue = std.atomic.Queue(promise).init(), + .extra_threads = undefined, + .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(), + .eventfd_resume_nodes = undefined, + .final_resume_node = ResumeNode{ + .id = ResumeNode.Id.Stop, + .handle = undefined, + }, + }; + const extra_thread_count = thread_count - 1; + self.eventfd_resume_nodes = try self.allocator.alloc( + std.atomic.Stack(ResumeNode.EventFd).Node, + extra_thread_count, + ); + errdefer self.allocator.free(self.eventfd_resume_nodes); + + self.extra_threads = try self.allocator.alloc(*os.Thread, extra_thread_count); + errdefer self.allocator.free(self.extra_threads); + + try self.initOsData(extra_thread_count); + errdefer self.deinitOsData(); + } + + pub fn deinit(self: *Loop) void { + self.deinitOsData(); + self.allocator.free(self.extra_threads); + } + + const InitOsDataError = os.LinuxEpollCreateError || mem.Allocator.Error || os.LinuxEventFdError || + os.SpawnThreadError || os.LinuxEpollCtlError || os.BsdKEventError || + os.WindowsCreateIoCompletionPortError; + + const wakeup_bytes = []u8{0x1} ** 8; + + fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void { + self.os_data.kqfd = try os.bsdKQueue(); + errdefer os.close(self.os_data.kqfd); + + //OLD self.os_data.fs_kqfd = try os.bsdKQueue(); + //OLD errdefer os.close(self.os_data.fs_kqfd); + + var ctx: c.CFRunLoop.CFRunLoopSourceContext = undefined; + + ctx.info = @ptrCast(?*c_void, self); + ctx.perform = cf_callback; + + self.os_data.cf_signal_source = 
c.CFRunLoop.CFRunLoopSourceCreate(null, 0, ptr(&ctx)); + + self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); + // we need another thread for the file system because Darwin does not have an async + // file system I/O API. + self.os_data.fs_end_request = fs.RequestNode{ + .prev = undefined, + .next = undefined, + .data = fs.Request{ + .msg = fs.Request.Msg.End, + .finish = fs.Request.Finish.NoAction, + }, + }; + + self.os_data.kevents = try self.allocator.alloc(posix.Kevent, extra_thread_count); + errdefer self.allocator.free(self.os_data.kevents); + + const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; + + for (self.eventfd_resume_nodes) |*eventfd_node, i| { + eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ + .data = ResumeNode.EventFd{ + .base = ResumeNode{ + .id = ResumeNode.Id.EventFd, + .handle = undefined, + }, + // this one is for sending events + .kevent = posix.Kevent{ + .ident = i, + .filter = posix.EVFILT_USER, + .flags = posix.EV_CLEAR | posix.EV_ADD | posix.EV_DISABLE, + .fflags = 0, + .data = 0, + .udata = @ptrToInt(&eventfd_node.data.base), + }, + }, + .next = undefined, + }; + self.available_eventfd_resume_nodes.push(eventfd_node); + const kevent_array = (*[1]posix.Kevent)(&eventfd_node.data.kevent); + _ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null); + eventfd_node.data.kevent.flags = posix.EV_CLEAR | posix.EV_ENABLE; + eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER; + // this one is for waiting for events + self.os_data.kevents[i] = posix.Kevent{ + .ident = i, + .filter = posix.EVFILT_USER, + .flags = 0, + .fflags = 0, + .data = 0, + .udata = @ptrToInt(&eventfd_node.data.base), + }; + } + + // Pre-add so that we cannot get error.SystemResources + // later when we try to activate it. 
+ self.os_data.final_kevent = posix.Kevent{ + .ident = extra_thread_count, + .filter = posix.EVFILT_USER, + .flags = posix.EV_ADD | posix.EV_DISABLE, + .fflags = 0, + .data = 0, + .udata = @ptrToInt(&self.final_resume_node), + }; + const final_kev_arr = (*[1]posix.Kevent)(&self.os_data.final_kevent); + _ = try os.bsdKEvent(self.os_data.kqfd, final_kev_arr, empty_kevs, null); + self.os_data.final_kevent.flags = posix.EV_ENABLE; + self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER; + + self.os_data.fs_kevent_wake = posix.Kevent{ + .ident = extra_thread_count + 1, + .filter = posix.EVFILT_USER, + .flags = posix.EV_ADD, + .fflags = posix.NOTE_TRIGGER, + .data = 0, + .udata = undefined, + }; + + self.os_data.fs_kevent_wait = posix.Kevent{ + .ident = extra_thread_count + 1, + .filter = posix.EVFILT_USER, + .flags = posix.EV_ADD|posix.EV_CLEAR, + .fflags = 0, + .data = 0, + .udata = undefined, + }; + + self.os_data.fs_thread = try os.spawnThread(self, posixFsRun); + errdefer { + self.posixFsRequest(&self.os_data.fs_end_request); + self.os_data.fs_thread.wait(); + } + + var extra_thread_index: usize = 0; + errdefer { + _ = os.bsdKEvent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable; + while (extra_thread_index != 0) { + extra_thread_index -= 1; + self.extra_threads[extra_thread_index].wait(); + } + } + while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { + self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun); + } + } + + fn deinitOsData(self: *Loop) void { + self.allocator.free(self.os_data.kevents); + os.close(self.os_data.kqfd); + if (self.os_data.cf_signal_source) |s| { + c.CFRunLoop.CFRelease(@ptrCast(?*c_void, s)); + } + //OLD os.close(self.os_data.fs_kqfd); + } + + /// resume_node must live longer than the promise that it holds a reference to. + /// flags must contain EPOLLET + pub fn addEvHandle(self: *Loop, handle: OsEventHandle, resume_node: *ResumeNode, flags: EventFlagType) !void { + self.beginOneEvent(); + errdefer self.finishOneEvent(); + //TODO + return os.LinuxEpollCtlError.Unexpected; + } + + pub fn removeEvHandle(self: *Loop, handle: OsEventHandle) void { + //TODO + self.finishOneEvent(); + @panic("removeEvHandle"); + } + + pub async fn waitEvHandle(self: *Loop, handle: OsEventHandle, flags: u32) !void { + defer self.removeEvHandle(handle); + suspend { + // TODO explicitly put this memory in the coroutine frame #1194 + var resume_node = ResumeNode{ + .id = ResumeNode.Id.Basic, + .handle = @handle(), + }; + try self.addEvHandle(handle, &resume_node, flags); + } + } + + fn dispatch(self: *Loop) void { + while (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| { + const next_tick_node = self.next_tick_queue.get() orelse { + self.available_eventfd_resume_nodes.push(resume_stack_node); + return; + }; + const eventfd_node = &resume_stack_node.data; + eventfd_node.base.handle = next_tick_node.data; + const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent); + const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; + _ = os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch { + self.next_tick_queue.unget(next_tick_node); + self.available_eventfd_resume_nodes.push(resume_stack_node); + return; + }; + } + } + + /// Bring your own linked list node. This means it can't fail. 
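+ /// The node's promise is queued on next_tick_queue and dispatch() hands it to an
+ /// idle worker thread by triggering one of the pre-registered EVFILT_USER kevents.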
+ pub fn onNextTick(self: *Loop, node: *NextTickNode) void { + self.beginOneEvent(); // finished in dispatch() + self.next_tick_queue.put(node); + self.dispatch(); + } + + pub fn cancelOnNextTick(self: *Loop, node: *NextTickNode) void { + if (self.next_tick_queue.remove(node)) { + self.finishOneEvent(); + } + } + + pub fn run(self: *Loop) void { + self.finishOneEvent(); // the reference we start with + + self.workerRun(); + + self.os_data.fs_thread.wait(); + + for (self.extra_threads) |extra_thread| { + extra_thread.wait(); + } + } + + /// This is equivalent to an async call, except instead of beginning execution of the async function, + /// it immediately returns to the caller, and the async function is queued in the event loop. It still + /// returns a promise to be awaited. + pub fn call(self: *Loop, comptime func: var, args: ...) !(promise->@typeOf(func).ReturnType) { + const S = struct { + async fn asyncFunc(loop: *Loop, handle: *promise->@typeOf(func).ReturnType, args2: ...) @typeOf(func).ReturnType { + suspend { + handle.* = @handle(); + var my_tick_node = Loop.NextTickNode{ + .prev = undefined, + .next = undefined, + .data = @handle(), + }; + loop.onNextTick(&my_tick_node); + } + // TODO guaranteed allocation elision for await in same func as async + return await (async func(args2) catch unreachable); + } + }; + var handle: promise->@typeOf(func).ReturnType = undefined; + return async S.asyncFunc(self, &handle, args); + } + + /// Awaiting a yield lets the event loop run, starting any unstarted async operations. + /// Note that async operations automatically start when a function yields for any other reason, + /// for example, when async I/O is performed. This function is intended to be used only when + /// CPU bound tasks would be waiting in the event loop but never get started because no async I/O + /// is performed. 
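+ /// A minimal usage sketch (assuming `loop` is this Loop and the caller is itself an
+ /// async function): await (async loop.yield() catch unreachable);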
+ pub async fn yield(self: *Loop) void { + suspend { + var my_tick_node = Loop.NextTickNode{ + .prev = undefined, + .next = undefined, + .data = @handle(), + }; + self.onNextTick(&my_tick_node); + } + } + + /// call finishOneEvent when done + pub fn beginOneEvent(self: *Loop) void { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + } + + pub fn finishOneEvent(self: *Loop) void { + const prev = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + if (prev == 1) { + // cause all the threads to stop + self.posixFsRequest(&self.os_data.fs_end_request); + const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent); + const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; + // cannot fail because we already added it and this just enables it + _ = os.bsdKEvent(self.os_data.kqfd, final_kevent, empty_kevs, null) catch unreachable; + return; + } + } + + fn workerRun(self: *Loop) void { + while (true) { + while (true) { + const next_tick_node = self.next_tick_queue.get() orelse break; + self.dispatch(); + resume next_tick_node.data; + self.finishOneEvent(); + } + var eventlist: [1]posix.Kevent = undefined; + const count = os.bsdKEvent(self.os_data.kqfd, self.os_data.kevents, eventlist[0..], null) catch unreachable; + for (eventlist[0..count]) |ev| { + const resume_node = @intToPtr(*ResumeNode, ev.udata); + const handle = resume_node.handle; + const resume_node_id = resume_node.id; + switch (resume_node_id) { + ResumeNode.Id.Basic => {}, + ResumeNode.Id.Stop => return, + ResumeNode.Id.EventFd => { + const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); + const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); + self.available_eventfd_resume_nodes.push(stack_node); + }, + } + resume handle; + if (resume_node_id == ResumeNode.Id.EventFd) { + self.finishOneEvent(); + } + } + } + } + + pub fn posixFsRequest(self: *Loop, request_node: *fs.RequestNode) void { + self.beginOneEvent(); // finished in posixFsRun after processing the msg + self.os_data.fs_queue.put(request_node); + // + //OLD const fs_kevs = (*[1]posix.Kevent)(&self.os_data.fs_kevent_wake); + //OLD const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; + //OLD _ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, empty_kevs, null) catch unreachable; + // + + //Notify Using CFRunLoop and friends + if (self.os_data.cf_loop) |cf_loop| { + c.CFRunLoop.CFRunLoopSourceSignal( self.os_data.cf_signal_source ); + c.CFRunLoop.CFRunLoopWakeUp( cf_loop ); + } + } + + fn posixFsRun(self: *Loop) void { + self.os_data.cf_loop = c.CFRunLoop.CFRunLoopGetCurrent(); + + c.CFRunLoop.CFRunLoopAddSource( self.os_data.cf_loop + , self.os_data.cf_signal_source + , c.CFRunLoop.kCFRunLoopDefaultMode ); + + //Do One + cf_callback( @ptrCast(?*c_void, self) ); + + c.CFRunLoop.CFRunLoopRun(); + + c.CFRunLoop.CFRunLoopRemoveSource(self.os_data.cf_loop + , self.os_data.cf_signal_source + , c.CFRunLoop.kCFRunLoopDefaultMode ); + + //OLD const fs_kevs = (*[1]posix.Kevent)(&self.os_data.fs_kevent_wait); + //OLD var out_kevs: [1]posix.Kevent = undefined; + //OLD _ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, out_kevs[0..], null) catch unreachable; + } + + const OsData = struct { + kqfd: i32, + final_kevent: posix.Kevent, + kevents: []posix.Kevent, + fs_kevent_wake: posix.Kevent, + fs_kevent_wait: posix.Kevent, + fs_thread: *os.Thread, + fs_kqfd: i32, + fs_queue: std.atomic.Queue(fs.Request), + fs_end_request: fs.RequestNode, + + 
/////// CFLoop + cf_loop: c.CFRunLoop.CFRunLoopRef, + cf_signal_source: c.CFRunLoop.CFRunLoopSourceRef, + + }; +}; diff --git a/std/event/loop/linux.zig b/std/event/loop/linux.zig new file mode 100644 index 000000000000..ec3834e7762f --- /dev/null +++ b/std/event/loop/linux.zig @@ -0,0 +1,450 @@ +const std = @import("../../index.zig"); +const builtin = @import("builtin"); +const assert = std.debug.assert; +const mem = std.mem; +const AtomicRmwOp = builtin.AtomicRmwOp; +const AtomicOrder = builtin.AtomicOrder; +const fs = std.event.fs; +const os = std.os; +const posix = os.posix; + +pub const Loop = struct { + allocator: *mem.Allocator, + next_tick_queue: std.atomic.Queue(promise), + os_data: OsData, + final_resume_node: ResumeNode, + pending_event_count: usize, + extra_threads: []*os.Thread, + + // pre-allocated eventfds. all permanently active. + // this is how we send promises to be resumed on other threads. + available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd), + eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node, + + pub const OsEventHandle = i32; + // Type for EventFlags based off of u32 + // TODO create some sort of SET type?? + pub const EventFlagType = u32; + pub const EventFlags = struct { + pub const READ = EventFlagType(1<<0); + pub const WRITE = EventFlagType(1<<1); + pub const EXCEPT = EventFlagType(1<<2); + }; + + pub const NextTickNode = std.atomic.Queue(promise).Node; + + pub const ResumeNode = struct { + id: Id, + handle: promise, + + pub const Id = enum { + Basic, + Stop, + EventFd, + }; + + pub const EventFd = struct { + base: ResumeNode, + epoll_op: u32, + eventfd: i32, + }; + }; + + /// After initialization, call run(). + /// TODO copy elision / named return values so that the threads referencing *Loop + /// have the correct pointer value. + pub fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void { + return self.initInternal(allocator, 1); + } + + /// The allocator must be thread-safe because we use it for multiplexing + /// coroutines onto kernel threads. + /// After initialization, call run(). + /// TODO copy elision / named return values so that the threads referencing *Loop + /// have the correct pointer value. + pub fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void { + const core_count = try os.cpuCount(allocator); + return self.initInternal(allocator, core_count); + } + + /// Thread count is the total thread count. 
The thread pool size will be + /// max(thread_count - 1, 0) + fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void { + self.* = Loop{ + .pending_event_count = 1, + .allocator = allocator, + .os_data = undefined, + .next_tick_queue = std.atomic.Queue(promise).init(), + .extra_threads = undefined, + .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(), + .eventfd_resume_nodes = undefined, + .final_resume_node = ResumeNode{ + .id = ResumeNode.Id.Stop, + .handle = undefined, + }, + }; + const extra_thread_count = thread_count - 1; + self.eventfd_resume_nodes = try self.allocator.alloc( + std.atomic.Stack(ResumeNode.EventFd).Node, + extra_thread_count, + ); + errdefer self.allocator.free(self.eventfd_resume_nodes); + + self.extra_threads = try self.allocator.alloc(*os.Thread, extra_thread_count); + errdefer self.allocator.free(self.extra_threads); + + try self.initOsData(extra_thread_count); + errdefer self.deinitOsData(); + } + + pub fn deinit(self: *Loop) void { + self.deinitOsData(); + self.allocator.free(self.extra_threads); + } + + const InitOsDataError = os.LinuxEpollCreateError || mem.Allocator.Error || os.LinuxEventFdError || + os.SpawnThreadError || os.LinuxEpollCtlError; + + const wakeup_bytes = []u8{0x1} ** 8; + + fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void { + self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); + self.os_data.fs_queue_len = 0; + // we need another thread for the file system because Linux does not have an async + // file system I/O API. + self.os_data.fs_end_request = fs.RequestNode{ + .prev = undefined, + .next = undefined, + .data = fs.Request{ + .msg = fs.Request.Msg.End, + .finish = fs.Request.Finish.NoAction, + }, + }; + + errdefer { + while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); + } + for (self.eventfd_resume_nodes) |*eventfd_node| { + eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ + .data = ResumeNode.EventFd{ + .base = ResumeNode{ + .id = ResumeNode.Id.EventFd, + .handle = undefined, + }, + .eventfd = try os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK), + .epoll_op = posix.EPOLL_CTL_ADD, + }, + .next = undefined, + }; + self.available_eventfd_resume_nodes.push(eventfd_node); + } + + self.os_data.epollfd = try os.linuxEpollCreate(posix.EPOLL_CLOEXEC); + errdefer os.close(self.os_data.epollfd); + + self.os_data.final_eventfd = try os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK); + errdefer os.close(self.os_data.final_eventfd); + + self.os_data.final_eventfd_event = posix.epoll_event{ + .events = posix.EPOLLIN, + .data = posix.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) }, + }; + try os.linuxEpollCtl( + self.os_data.epollfd, + posix.EPOLL_CTL_ADD, + self.os_data.final_eventfd, + &self.os_data.final_eventfd_event, + ); + + self.os_data.fs_thread = try os.spawnThread(self, posixFsRun); + errdefer { + self.posixFsRequest(&self.os_data.fs_end_request); + self.os_data.fs_thread.wait(); + } + + var extra_thread_index: usize = 0; + errdefer { + // writing 8 bytes to an eventfd cannot fail + os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + while (extra_thread_index != 0) { + extra_thread_index -= 1; + self.extra_threads[extra_thread_index].wait(); + } + } + while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { + self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun); + } + } + + fn deinitOsData(self: *Loop) void { + 
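+ // Close the final eventfd, every per-thread eventfd, and the epoll fd,
+ // then free the resume-node storage allocated in initInternal.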
os.close(self.os_data.final_eventfd); + while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); + os.close(self.os_data.epollfd); + self.allocator.free(self.eventfd_resume_nodes); + } + + /// resume_node must live longer than the promise that it holds a reference to. + /// flags must contain EPOLLET + pub fn addEvHandle(self: *Loop, handle: OsEventHandle, resume_node: *ResumeNode, flags: EventFlagType) !void { + self.beginOneEvent(); + errdefer self.finishOneEvent(); + // + // flags must contain EPOLLET + var ev_flags = posix.EPOLLET; + if (flags & EventFlags.READ != 0) + ev_flags |= posix.EPOLLIN; + if (flags & EventFlags.WRITE != 0) + ev_flags |= posix.EPOLLOUT; + if (flags & EventFlags.EXCEPT != 0) + ev_flags |= posix.EPOLLERR; + + try self.linuxModFd( + handle, + posix.EPOLL_CTL_ADD, + ev_flags, + resume_node, + ); + } + + fn linuxModFd(self: *Loop, handle: OsEventHandle, op: u32, flags: u32, resume_node: *ResumeNode) !void { + assert(flags & posix.EPOLLET == posix.EPOLLET); + var ev = os.linux.epoll_event{ + .events = flags, + .data = os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) }, + }; + try os.linuxEpollCtl(self.os_data.epollfd, op, handle, &ev); + } + + pub fn removeEvHandle(self: *Loop, handle: OsEventHandle) void { + self.linuxRemoveFdNoCounter(handle); + self.finishOneEvent(); + } + + fn linuxRemoveFdNoCounter(self: *Loop, handle: OsEventHandle) void { + os.linuxEpollCtl(self.os_data.epollfd, os.linux.EPOLL_CTL_DEL, handle, undefined) catch {}; + } + + pub async fn waitEvHandle(self: *Loop, handle: OsEventHandle, flags: u32) !void { + defer self.removeEvHandle(handle); + suspend { + // TODO explicitly put this memory in the coroutine frame #1194 + var resume_node = ResumeNode{ + .id = ResumeNode.Id.Basic, + .handle = @handle(), + }; + try self.addEvHandle(handle, &resume_node, flags); + } + } + + fn dispatch(self: *Loop) void { + while (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| { + const next_tick_node = self.next_tick_queue.get() orelse { + self.available_eventfd_resume_nodes.push(resume_stack_node); + return; + }; + const eventfd_node = &resume_stack_node.data; + eventfd_node.base.handle = next_tick_node.data; + // the pending count is already accounted for + const epoll_events = posix.EPOLLONESHOT | os.linux.EPOLLIN | os.linux.EPOLLOUT | + os.linux.EPOLLET; + self.linuxModFd( + eventfd_node.eventfd, + eventfd_node.epoll_op, + epoll_events, + &eventfd_node.base, + ) catch { + self.next_tick_queue.unget(next_tick_node); + self.available_eventfd_resume_nodes.push(resume_stack_node); + return; + }; + } + } + + /// Bring your own linked list node. This means it can't fail. + pub fn onNextTick(self: *Loop, node: *NextTickNode) void { + self.beginOneEvent(); // finished in dispatch() + self.next_tick_queue.put(node); + self.dispatch(); + } + + pub fn cancelOnNextTick(self: *Loop, node: *NextTickNode) void { + if (self.next_tick_queue.remove(node)) { + self.finishOneEvent(); + } + } + + pub fn run(self: *Loop) void { + self.finishOneEvent(); // the reference we start with + + self.workerRun(); + + self.os_data.fs_thread.wait(); + + for (self.extra_threads) |extra_thread| { + extra_thread.wait(); + } + } + + /// This is equivalent to an async call, except instead of beginning execution of the async function, + /// it immediately returns to the caller, and the async function is queued in the event loop. It still + /// returns a promise to be awaited. + pub fn call(self: *Loop, comptime func: var, args: ...) 
!(promise->@typeOf(func).ReturnType) { + const S = struct { + async fn asyncFunc(loop: *Loop, handle: *promise->@typeOf(func).ReturnType, args2: ...) @typeOf(func).ReturnType { + suspend { + handle.* = @handle(); + var my_tick_node = Loop.NextTickNode{ + .prev = undefined, + .next = undefined, + .data = @handle(), + }; + loop.onNextTick(&my_tick_node); + } + // TODO guaranteed allocation elision for await in same func as async + return await (async func(args2) catch unreachable); + } + }; + var handle: promise->@typeOf(func).ReturnType = undefined; + return async S.asyncFunc(self, &handle, args); + } + + /// Awaiting a yield lets the event loop run, starting any unstarted async operations. + /// Note that async operations automatically start when a function yields for any other reason, + /// for example, when async I/O is performed. This function is intended to be used only when + /// CPU bound tasks would be waiting in the event loop but never get started because no async I/O + /// is performed. + pub async fn yield(self: *Loop) void { + suspend { + var my_tick_node = Loop.NextTickNode{ + .prev = undefined, + .next = undefined, + .data = @handle(), + }; + self.onNextTick(&my_tick_node); + } + } + + /// call finishOneEvent when done + pub fn beginOneEvent(self: *Loop) void { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + } + + pub fn finishOneEvent(self: *Loop) void { + const prev = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + if (prev == 1) { + // cause all the threads to stop + self.posixFsRequest(&self.os_data.fs_end_request); + // writing 8 bytes to an eventfd cannot fail + os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + return; + } + } + + fn workerRun(self: *Loop) void { + while (true) { + while (true) { + const next_tick_node = self.next_tick_queue.get() orelse break; + self.dispatch(); + resume next_tick_node.data; + self.finishOneEvent(); + } + + // only process 1 event so we don't steal from other threads + var events: [1]os.linux.epoll_event = undefined; + const count = os.linuxEpollWait(self.os_data.epollfd, events[0..], -1); + for (events[0..count]) |ev| { + const resume_node = @intToPtr(*ResumeNode, ev.data.ptr); + const handle = resume_node.handle; + const resume_node_id = resume_node.id; + switch (resume_node_id) { + ResumeNode.Id.Basic => {}, + ResumeNode.Id.Stop => return, + ResumeNode.Id.EventFd => { + const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); + event_fd_node.epoll_op = posix.EPOLL_CTL_MOD; + const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); + self.available_eventfd_resume_nodes.push(stack_node); + }, + } + resume handle; + if (resume_node_id == ResumeNode.Id.EventFd) { + self.finishOneEvent(); + } + } + } + } + + pub fn posixFsRequest(self: *Loop, request_node: *fs.RequestNode) void { + self.beginOneEvent(); // finished in posixFsRun after processing the msg + self.os_data.fs_queue.put(request_node); + _ = @atomicRmw(i32, &self.os_data.fs_queue_len, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); // let this wrap + const rc = os.linux.futex_wake(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAKE, 1); + switch (os.linux.getErrno(rc)) { + 0 => {}, + posix.EINVAL => unreachable, + else => unreachable, + } + } + + fn posixFsRun(self: *Loop) void { + var processed_count: i32 = 0; // we let this wrap + while (true) { + while (self.os_data.fs_queue.get()) |node| { + 
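+ // Drain the queue: each request performs its blocking syscall on this dedicated
+ // fs thread; completion is reported through node.data.finish and finishOneEvent().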
processed_count +%= 1; + switch (node.data.msg) { + @TagType(fs.Request.Msg).End => return, + @TagType(fs.Request.Msg).PWriteV => |*msg| { + msg.result = os.posix_pwritev(msg.fd, msg.iov.ptr, msg.iov.len, msg.offset); + }, + @TagType(fs.Request.Msg).PReadV => |*msg| { + msg.result = os.posix_preadv(msg.fd, msg.iov.ptr, msg.iov.len, msg.offset); + }, + @TagType(fs.Request.Msg).OpenRead => |*msg| { + const flags = posix.O_LARGEFILE | posix.O_RDONLY | posix.O_CLOEXEC; + msg.result = os.posixOpenC(msg.path.ptr, flags, 0); + }, + @TagType(fs.Request.Msg).OpenRW => |*msg| { + const flags = posix.O_LARGEFILE | posix.O_RDWR | posix.O_CREAT | posix.O_CLOEXEC; + msg.result = os.posixOpenC(msg.path.ptr, flags, msg.mode); + }, + @TagType(fs.Request.Msg).Close => |*msg| os.close(msg.fd), + @TagType(fs.Request.Msg).WriteFile => |*msg| blk: { + const flags = posix.O_LARGEFILE | posix.O_WRONLY | posix.O_CREAT | + posix.O_CLOEXEC | posix.O_TRUNC; + const fd = os.posixOpenC(msg.path.ptr, flags, msg.mode) catch |err| { + msg.result = err; + break :blk; + }; + defer os.close(fd); + msg.result = os.posixWrite(fd, msg.contents); + }, + } + switch (node.data.finish) { + @TagType(fs.Request.Finish).TickNode => |*tick_node| self.onNextTick(tick_node), + @TagType(fs.Request.Finish).DeallocCloseOperation => |close_op| { + self.allocator.destroy(close_op); + }, + @TagType(fs.Request.Finish).NoAction => {}, + } + self.finishOneEvent(); + } + const rc = os.linux.futex_wait(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAIT, processed_count, null); + switch (os.linux.getErrno(rc)) { + 0 => continue, + posix.EINTR => continue, + posix.EAGAIN => continue, + else => unreachable, + } + } + } + + const OsData = struct { + epollfd: i32, + final_eventfd: i32, + final_eventfd_event: os.linux.epoll_event, + fs_thread: *os.Thread, + fs_queue_len: i32, // we let this wrap + fs_queue: std.atomic.Queue(fs.Request), + fs_end_request: fs.RequestNode, + }; +}; diff --git a/std/event/loop/windows.zig b/std/event/loop/windows.zig new file mode 100644 index 000000000000..62aa84247e9b --- /dev/null +++ b/std/event/loop/windows.zig @@ -0,0 +1,800 @@ +const std = @import("../../index.zig"); +const builtin = @import("builtin"); +const assert = std.debug.assert; +const mem = std.mem; +const AtomicRmwOp = builtin.AtomicRmwOp; +const AtomicOrder = builtin.AtomicOrder; +const fs = std.event.fs; +const os = std.os; +const posix = os.posix; +const windows = os.windows; + +pub const Loop = struct { + allocator: *mem.Allocator, + next_tick_queue: std.atomic.Queue(promise), + os_data: OsData, + final_resume_node: ResumeNode, + pending_event_count: usize, + extra_threads: []*os.Thread, + + // pre-allocated eventfds. all permanently active. + // this is how we send promises to be resumed on other threads. + available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd), + eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node, + + pub const OsEventHandle = switch (builtin.os) { + builtin.Os.windows => windows.HANDLE, + else => i32, //Assume posixesque + }; + // Type for EventFlags based off of u32 + // TODO create some sort of SET type?? 
+ pub const EventFlagType = u32; + pub const EventFlags = struct { + pub const READ = EventFlagType(1<<0); + pub const WRITE = EventFlagType(1<<1); + pub const EXCEPT = EventFlagType(1<<2); + }; + + pub const NextTickNode = std.atomic.Queue(promise).Node; + + pub const ResumeNode = struct { + id: Id, + handle: promise, + + pub const Id = enum { + Basic, + Stop, + EventFd, + }; + + pub const EventFd = switch (builtin.os) { + builtin.Os.macosx => MacOsEventFd, + builtin.Os.linux => struct { + base: ResumeNode, + epoll_op: u32, + eventfd: i32, + }, + builtin.Os.windows => struct { + base: ResumeNode, + completion_key: usize, + }, + else => @compileError("unsupported OS"), + }; + + const MacOsEventFd = struct { + base: ResumeNode, + kevent: posix.Kevent, + }; + }; + + /// After initialization, call run(). + /// TODO copy elision / named return values so that the threads referencing *Loop + /// have the correct pointer value. + pub fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void { + return self.initInternal(allocator, 1); + } + + /// The allocator must be thread-safe because we use it for multiplexing + /// coroutines onto kernel threads. + /// After initialization, call run(). + /// TODO copy elision / named return values so that the threads referencing *Loop + /// have the correct pointer value. + pub fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void { + const core_count = try os.cpuCount(allocator); + return self.initInternal(allocator, core_count); + } + + /// Thread count is the total thread count. The thread pool size will be + /// max(thread_count - 1, 0) + fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void { + self.* = Loop{ + .pending_event_count = 1, + .allocator = allocator, + .os_data = undefined, + .next_tick_queue = std.atomic.Queue(promise).init(), + .extra_threads = undefined, + .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(), + .eventfd_resume_nodes = undefined, + .final_resume_node = ResumeNode{ + .id = ResumeNode.Id.Stop, + .handle = undefined, + }, + }; + const extra_thread_count = thread_count - 1; + self.eventfd_resume_nodes = try self.allocator.alloc( + std.atomic.Stack(ResumeNode.EventFd).Node, + extra_thread_count, + ); + errdefer self.allocator.free(self.eventfd_resume_nodes); + + self.extra_threads = try self.allocator.alloc(*os.Thread, extra_thread_count); + errdefer self.allocator.free(self.extra_threads); + + try self.initOsData(extra_thread_count); + errdefer self.deinitOsData(); + } + + pub fn deinit(self: *Loop) void { + self.deinitOsData(); + self.allocator.free(self.extra_threads); + } + + const InitOsDataError = os.LinuxEpollCreateError || mem.Allocator.Error || os.LinuxEventFdError || + os.SpawnThreadError || os.LinuxEpollCtlError || os.BsdKEventError || + os.WindowsCreateIoCompletionPortError; + + const wakeup_bytes = []u8{0x1} ** 8; + + fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void { + switch (builtin.os) { + builtin.Os.linux => { + self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); + self.os_data.fs_queue_len = 0; + // we need another thread for the file system because Linux does not have an async + // file system I/O API. 
+ self.os_data.fs_end_request = fs.RequestNode{ + .prev = undefined, + .next = undefined, + .data = fs.Request{ + .msg = fs.Request.Msg.End, + .finish = fs.Request.Finish.NoAction, + }, + }; + + errdefer { + while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); + } + for (self.eventfd_resume_nodes) |*eventfd_node| { + eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ + .data = ResumeNode.EventFd{ + .base = ResumeNode{ + .id = ResumeNode.Id.EventFd, + .handle = undefined, + }, + .eventfd = try os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK), + .epoll_op = posix.EPOLL_CTL_ADD, + }, + .next = undefined, + }; + self.available_eventfd_resume_nodes.push(eventfd_node); + } + + self.os_data.epollfd = try os.linuxEpollCreate(posix.EPOLL_CLOEXEC); + errdefer os.close(self.os_data.epollfd); + + self.os_data.final_eventfd = try os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK); + errdefer os.close(self.os_data.final_eventfd); + + self.os_data.final_eventfd_event = posix.epoll_event{ + .events = posix.EPOLLIN, + .data = posix.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) }, + }; + try os.linuxEpollCtl( + self.os_data.epollfd, + posix.EPOLL_CTL_ADD, + self.os_data.final_eventfd, + &self.os_data.final_eventfd_event, + ); + + self.os_data.fs_thread = try os.spawnThread(self, posixFsRun); + errdefer { + self.posixFsRequest(&self.os_data.fs_end_request); + self.os_data.fs_thread.wait(); + } + + var extra_thread_index: usize = 0; + errdefer { + // writing 8 bytes to an eventfd cannot fail + os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + while (extra_thread_index != 0) { + extra_thread_index -= 1; + self.extra_threads[extra_thread_index].wait(); + } + } + while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { + self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun); + } + }, + builtin.Os.macosx => { + self.os_data.kqfd = try os.bsdKQueue(); + errdefer os.close(self.os_data.kqfd); + + self.os_data.fs_kqfd = try os.bsdKQueue(); + errdefer os.close(self.os_data.fs_kqfd); + + self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); + // we need another thread for the file system because Darwin does not have an async + // file system I/O API. 
+ self.os_data.fs_end_request = fs.RequestNode{ + .prev = undefined, + .next = undefined, + .data = fs.Request{ + .msg = fs.Request.Msg.End, + .finish = fs.Request.Finish.NoAction, + }, + }; + + self.os_data.kevents = try self.allocator.alloc(posix.Kevent, extra_thread_count); + errdefer self.allocator.free(self.os_data.kevents); + + const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; + + for (self.eventfd_resume_nodes) |*eventfd_node, i| { + eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ + .data = ResumeNode.EventFd{ + .base = ResumeNode{ + .id = ResumeNode.Id.EventFd, + .handle = undefined, + }, + // this one is for sending events + .kevent = posix.Kevent{ + .ident = i, + .filter = posix.EVFILT_USER, + .flags = posix.EV_CLEAR | posix.EV_ADD | posix.EV_DISABLE, + .fflags = 0, + .data = 0, + .udata = @ptrToInt(&eventfd_node.data.base), + }, + }, + .next = undefined, + }; + self.available_eventfd_resume_nodes.push(eventfd_node); + const kevent_array = (*[1]posix.Kevent)(&eventfd_node.data.kevent); + _ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null); + eventfd_node.data.kevent.flags = posix.EV_CLEAR | posix.EV_ENABLE; + eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER; + // this one is for waiting for events + self.os_data.kevents[i] = posix.Kevent{ + .ident = i, + .filter = posix.EVFILT_USER, + .flags = 0, + .fflags = 0, + .data = 0, + .udata = @ptrToInt(&eventfd_node.data.base), + }; + } + + // Pre-add so that we cannot get error.SystemResources + // later when we try to activate it. + self.os_data.final_kevent = posix.Kevent{ + .ident = extra_thread_count, + .filter = posix.EVFILT_USER, + .flags = posix.EV_ADD | posix.EV_DISABLE, + .fflags = 0, + .data = 0, + .udata = @ptrToInt(&self.final_resume_node), + }; + const final_kev_arr = (*[1]posix.Kevent)(&self.os_data.final_kevent); + _ = try os.bsdKEvent(self.os_data.kqfd, final_kev_arr, empty_kevs, null); + self.os_data.final_kevent.flags = posix.EV_ENABLE; + self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER; + + self.os_data.fs_kevent_wake = posix.Kevent{ + .ident = extra_thread_count + 1, + .filter = posix.EVFILT_USER, + .flags = posix.EV_ADD, + .fflags = posix.NOTE_TRIGGER, + .data = 0, + .udata = undefined, + }; + + self.os_data.fs_kevent_wait = posix.Kevent{ + .ident = extra_thread_count + 1, + .filter = posix.EVFILT_USER, + .flags = posix.EV_ADD|posix.EV_CLEAR, + .fflags = 0, + .data = 0, + .udata = undefined, + }; + + self.os_data.fs_thread = try os.spawnThread(self, posixFsRun); + errdefer { + self.posixFsRequest(&self.os_data.fs_end_request); + self.os_data.fs_thread.wait(); + } + + var extra_thread_index: usize = 0; + errdefer { + _ = os.bsdKEvent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable; + while (extra_thread_index != 0) { + extra_thread_index -= 1; + self.extra_threads[extra_thread_index].wait(); + } + } + while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) { + self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun); + } + }, + builtin.Os.windows => { + self.os_data.io_port = try os.windowsCreateIoCompletionPort( + windows.INVALID_HANDLE_VALUE, + null, + undefined, + undefined, + ); + errdefer os.close(self.os_data.io_port); + + for (self.eventfd_resume_nodes) |*eventfd_node, i| { + eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ + .data = ResumeNode.EventFd{ + .base = ResumeNode{ + .id = ResumeNode.Id.EventFd, + .handle = undefined, + }, + // this one is for sending events + .completion_key = 
@ptrToInt(&eventfd_node.data.base),
+                        },
+                        .prev = undefined,
+                        .next = undefined,
+                    };
+                    self.available_eventfd_resume_nodes.push(eventfd_node);
+                }
+
+                var extra_thread_index: usize = 0;
+                errdefer {
+                    var i: usize = 0;
+                    while (i < extra_thread_index) : (i += 1) {
+                        while (true) {
+                            const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
+                            os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
+                            break;
+                        }
+                    }
+                    while (extra_thread_index != 0) {
+                        extra_thread_index -= 1;
+                        self.extra_threads[extra_thread_index].wait();
+                    }
+                }
+                while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
+                    self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun);
+                }
+            },
+            else => {},
+        }
+    }
+
+    fn deinitOsData(self: *Loop) void {
+        switch (builtin.os) {
+            builtin.Os.linux => {
+                os.close(self.os_data.final_eventfd);
+                while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
+                os.close(self.os_data.epollfd);
+                self.allocator.free(self.eventfd_resume_nodes);
+            },
+            builtin.Os.macosx => {
+                self.allocator.free(self.os_data.kevents);
+                os.close(self.os_data.kqfd);
+                os.close(self.os_data.fs_kqfd);
+            },
+            builtin.Os.windows => {
+                os.close(self.os_data.io_port);
+            },
+            else => {},
+        }
+    }
+
+    /// resume_node must live longer than the promise that it holds a reference to.
+    /// flags is a bit set of EventFlags values describing which conditions to wait for.
+    pub fn addEvHandle(self: *Loop, handle: OsEventHandle, resume_node: *ResumeNode, flags: EventFlagType) !void {
+        self.beginOneEvent();
+        errdefer self.finishOneEvent();
+        switch (builtin.os) {
+            builtin.Os.linux => {
+                // EPOLLET (edge-triggered) is always set; the portable flags map onto the epoll bits.
+                var ev_flags = posix.EPOLLET;
+                if (flags & EventFlags.READ != 0)
+                    ev_flags |= posix.EPOLLIN;
+                if (flags & EventFlags.WRITE != 0)
+                    ev_flags |= posix.EPOLLOUT;
+                if (flags & EventFlags.EXCEPT != 0)
+                    ev_flags |= posix.EPOLLERR;
+
+                try self.linuxModFd(
+                    handle,
+                    posix.EPOLL_CTL_ADD,
+                    ev_flags,
+                    resume_node,
+                );
+            },
+            else => {
+                return os.LinuxEpollCtlError.Unexpected;
+            },
+        }
+    }
+
+    fn linuxModFd(self: *Loop, handle: OsEventHandle, op: u32, flags: u32, resume_node: *ResumeNode) !void {
+        assert(flags & posix.EPOLLET == posix.EPOLLET);
+        var ev = os.linux.epoll_event{
+            .events = flags,
+            .data = os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) },
+        };
+        try os.linuxEpollCtl(self.os_data.epollfd, op, handle, &ev);
+    }
+
+    pub fn removeEvHandle(self: *Loop, handle: OsEventHandle) void {
+        switch (builtin.os) {
+            builtin.Os.linux => {
+                self.linuxRemoveFdNoCounter(handle);
+            },
+            else => unreachable,
+        }
+        self.finishOneEvent();
+    }
+
+    fn linuxRemoveFdNoCounter(self: *Loop, handle: OsEventHandle) void {
+        os.linuxEpollCtl(self.os_data.epollfd, os.linux.EPOLL_CTL_DEL, handle, undefined) catch {};
+    }
+
+    pub async fn waitEvHandle(self: *Loop, handle: OsEventHandle, flags: EventFlagType) !void {
+        defer self.removeEvHandle(handle);
+        suspend {
+            // TODO explicitly put this memory in the coroutine frame #1194
+            var resume_node = ResumeNode{
+                .id = ResumeNode.Id.Basic,
+                .handle = @handle(),
+            };
+            try self.addEvHandle(handle, &resume_node, flags);
+        }
+    }
+
+    fn dispatch(self: *Loop) void {
+        while (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| {
+            const next_tick_node = self.next_tick_queue.get() orelse {
+                self.available_eventfd_resume_nodes.push(resume_stack_node);
+                return;
+            };
+            const eventfd_node = &resume_stack_node.data;
+            eventfd_node.base.handle = next_tick_node.data;
+            switch
(builtin.os) { + builtin.Os.macosx => { + const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent); + const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; + _ = os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch { + self.next_tick_queue.unget(next_tick_node); + self.available_eventfd_resume_nodes.push(resume_stack_node); + return; + }; + }, + builtin.Os.linux => { + // the pending count is already accounted for + const epoll_events = posix.EPOLLONESHOT | os.linux.EPOLLIN | os.linux.EPOLLOUT | + os.linux.EPOLLET; + self.linuxModFd( + eventfd_node.eventfd, + eventfd_node.epoll_op, + epoll_events, + &eventfd_node.base, + ) catch { + self.next_tick_queue.unget(next_tick_node); + self.available_eventfd_resume_nodes.push(resume_stack_node); + return; + }; + }, + builtin.Os.windows => { + // this value is never dereferenced but we need it to be non-null so that + // the consumer code can decide whether to read the completion key. + // it has to do this for normal I/O, so we match that behavior here. + const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); + os.windowsPostQueuedCompletionStatus( + self.os_data.io_port, + undefined, + eventfd_node.completion_key, + overlapped, + ) catch { + self.next_tick_queue.unget(next_tick_node); + self.available_eventfd_resume_nodes.push(resume_stack_node); + return; + }; + }, + else => @compileError("unsupported OS"), + } + } + } + + /// Bring your own linked list node. This means it can't fail. + pub fn onNextTick(self: *Loop, node: *NextTickNode) void { + self.beginOneEvent(); // finished in dispatch() + self.next_tick_queue.put(node); + self.dispatch(); + } + + pub fn cancelOnNextTick(self: *Loop, node: *NextTickNode) void { + if (self.next_tick_queue.remove(node)) { + self.finishOneEvent(); + } + } + + pub fn run(self: *Loop) void { + self.finishOneEvent(); // the reference we start with + + self.workerRun(); + + self.os_data.fs_thread.wait(); + + for (self.extra_threads) |extra_thread| { + extra_thread.wait(); + } + } + + /// This is equivalent to an async call, except instead of beginning execution of the async function, + /// it immediately returns to the caller, and the async function is queued in the event loop. It still + /// returns a promise to be awaited. + pub fn call(self: *Loop, comptime func: var, args: ...) !(promise->@typeOf(func).ReturnType) { + const S = struct { + async fn asyncFunc(loop: *Loop, handle: *promise->@typeOf(func).ReturnType, args2: ...) @typeOf(func).ReturnType { + suspend { + handle.* = @handle(); + var my_tick_node = Loop.NextTickNode{ + .prev = undefined, + .next = undefined, + .data = @handle(), + }; + loop.onNextTick(&my_tick_node); + } + // TODO guaranteed allocation elision for await in same func as async + return await (async func(args2) catch unreachable); + } + }; + var handle: promise->@typeOf(func).ReturnType = undefined; + return async S.asyncFunc(self, &handle, args); + } + + /// Awaiting a yield lets the event loop run, starting any unstarted async operations. + /// Note that async operations automatically start when a function yields for any other reason, + /// for example, when async I/O is performed. This function is intended to be used only when + /// CPU bound tasks would be waiting in the event loop but never get started because no async I/O + /// is performed. 
+ pub async fn yield(self: *Loop) void { + suspend { + var my_tick_node = Loop.NextTickNode{ + .prev = undefined, + .next = undefined, + .data = @handle(), + }; + self.onNextTick(&my_tick_node); + } + } + + /// call finishOneEvent when done + pub fn beginOneEvent(self: *Loop) void { + _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); + } + + pub fn finishOneEvent(self: *Loop) void { + const prev = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); + if (prev == 1) { + // cause all the threads to stop + switch (builtin.os) { + builtin.Os.linux => { + self.posixFsRequest(&self.os_data.fs_end_request); + // writing 8 bytes to an eventfd cannot fail + os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; + return; + }, + builtin.Os.macosx => { + self.posixFsRequest(&self.os_data.fs_end_request); + const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent); + const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; + // cannot fail because we already added it and this just enables it + _ = os.bsdKEvent(self.os_data.kqfd, final_kevent, empty_kevs, null) catch unreachable; + return; + }, + builtin.Os.windows => { + var i: usize = 0; + while (i < self.extra_threads.len + 1) : (i += 1) { + while (true) { + const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); + os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue; + break; + } + } + return; + }, + else => @compileError("unsupported OS"), + } + } + } + + fn workerRun(self: *Loop) void { + while (true) { + while (true) { + const next_tick_node = self.next_tick_queue.get() orelse break; + self.dispatch(); + resume next_tick_node.data; + self.finishOneEvent(); + } + + switch (builtin.os) { + builtin.Os.linux => { + // only process 1 event so we don't steal from other threads + var events: [1]os.linux.epoll_event = undefined; + const count = os.linuxEpollWait(self.os_data.epollfd, events[0..], -1); + for (events[0..count]) |ev| { + const resume_node = @intToPtr(*ResumeNode, ev.data.ptr); + const handle = resume_node.handle; + const resume_node_id = resume_node.id; + switch (resume_node_id) { + ResumeNode.Id.Basic => {}, + ResumeNode.Id.Stop => return, + ResumeNode.Id.EventFd => { + const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); + event_fd_node.epoll_op = posix.EPOLL_CTL_MOD; + const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); + self.available_eventfd_resume_nodes.push(stack_node); + }, + } + resume handle; + if (resume_node_id == ResumeNode.Id.EventFd) { + self.finishOneEvent(); + } + } + }, + builtin.Os.macosx => { + var eventlist: [1]posix.Kevent = undefined; + const count = os.bsdKEvent(self.os_data.kqfd, self.os_data.kevents, eventlist[0..], null) catch unreachable; + for (eventlist[0..count]) |ev| { + const resume_node = @intToPtr(*ResumeNode, ev.udata); + const handle = resume_node.handle; + const resume_node_id = resume_node.id; + switch (resume_node_id) { + ResumeNode.Id.Basic => {}, + ResumeNode.Id.Stop => return, + ResumeNode.Id.EventFd => { + const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); + const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); + self.available_eventfd_resume_nodes.push(stack_node); + }, + } + resume handle; + if (resume_node_id == ResumeNode.Id.EventFd) { + self.finishOneEvent(); 
+ } + } + }, + builtin.Os.windows => { + var completion_key: usize = undefined; + while (true) { + var nbytes: windows.DWORD = undefined; + var overlapped: ?*windows.OVERLAPPED = undefined; + switch (os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) { + os.WindowsWaitResult.Aborted => return, + os.WindowsWaitResult.Normal => {}, + } + if (overlapped != null) break; + } + const resume_node = @intToPtr(*ResumeNode, completion_key); + const handle = resume_node.handle; + const resume_node_id = resume_node.id; + switch (resume_node_id) { + ResumeNode.Id.Basic => {}, + ResumeNode.Id.Stop => return, + ResumeNode.Id.EventFd => { + const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node); + const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node); + self.available_eventfd_resume_nodes.push(stack_node); + }, + } + resume handle; + if (resume_node_id == ResumeNode.Id.EventFd) { + self.finishOneEvent(); + } + }, + else => @compileError("unsupported OS"), + } + } + } + + pub fn posixFsRequest(self: *Loop, request_node: *fs.RequestNode) void { + self.beginOneEvent(); // finished in posixFsRun after processing the msg + self.os_data.fs_queue.put(request_node); + switch (builtin.os) { + builtin.Os.macosx => { + const fs_kevs = (*[1]posix.Kevent)(&self.os_data.fs_kevent_wake); + const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; + _ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, empty_kevs, null) catch unreachable; + }, + builtin.Os.linux => { + _ = @atomicRmw(i32, &self.os_data.fs_queue_len, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); // let this wrap + const rc = os.linux.futex_wake(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAKE, 1); + switch (os.linux.getErrno(rc)) { + 0 => {}, + posix.EINVAL => unreachable, + else => unreachable, + } + }, + else => @compileError("Unsupported OS"), + } + } + + fn posixFsRun(self: *Loop) void { + var processed_count: i32 = 0; // we let this wrap + while (true) { + while (self.os_data.fs_queue.get()) |node| { + processed_count +%= 1; + switch (node.data.msg) { + @TagType(fs.Request.Msg).End => return, + @TagType(fs.Request.Msg).PWriteV => |*msg| { + msg.result = os.posix_pwritev(msg.fd, msg.iov.ptr, msg.iov.len, msg.offset); + }, + @TagType(fs.Request.Msg).PReadV => |*msg| { + msg.result = os.posix_preadv(msg.fd, msg.iov.ptr, msg.iov.len, msg.offset); + }, + @TagType(fs.Request.Msg).OpenRead => |*msg| { + const flags = posix.O_LARGEFILE | posix.O_RDONLY | posix.O_CLOEXEC; + msg.result = os.posixOpenC(msg.path.ptr, flags, 0); + }, + @TagType(fs.Request.Msg).OpenRW => |*msg| { + const flags = posix.O_LARGEFILE | posix.O_RDWR | posix.O_CREAT | posix.O_CLOEXEC; + msg.result = os.posixOpenC(msg.path.ptr, flags, msg.mode); + }, + @TagType(fs.Request.Msg).Close => |*msg| os.close(msg.fd), + @TagType(fs.Request.Msg).WriteFile => |*msg| blk: { + const flags = posix.O_LARGEFILE | posix.O_WRONLY | posix.O_CREAT | + posix.O_CLOEXEC | posix.O_TRUNC; + const fd = os.posixOpenC(msg.path.ptr, flags, msg.mode) catch |err| { + msg.result = err; + break :blk; + }; + defer os.close(fd); + msg.result = os.posixWrite(fd, msg.contents); + }, + } + switch (node.data.finish) { + @TagType(fs.Request.Finish).TickNode => |*tick_node| self.onNextTick(tick_node), + @TagType(fs.Request.Finish).DeallocCloseOperation => |close_op| { + self.allocator.destroy(close_op); + }, + @TagType(fs.Request.Finish).NoAction => {}, + } + self.finishOneEvent(); + } + switch 
(builtin.os) { + builtin.Os.linux => { + const rc = os.linux.futex_wait(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAIT, processed_count, null); + switch (os.linux.getErrno(rc)) { + 0 => continue, + posix.EINTR => continue, + posix.EAGAIN => continue, + else => unreachable, + } + }, + builtin.Os.macosx => { + const fs_kevs = (*[1]posix.Kevent)(&self.os_data.fs_kevent_wait); + var out_kevs: [1]posix.Kevent = undefined; + _ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, out_kevs[0..], null) catch unreachable; + }, + else => @compileError("Unsupported OS"), + } + } + } + + const OsData = switch (builtin.os) { + builtin.Os.linux => struct { + epollfd: i32, + final_eventfd: i32, + final_eventfd_event: os.linux.epoll_event, + fs_thread: *os.Thread, + fs_queue_len: i32, // we let this wrap + fs_queue: std.atomic.Queue(fs.Request), + fs_end_request: fs.RequestNode, + }, + builtin.Os.macosx => MacOsData, + builtin.Os.windows => struct { + io_port: windows.HANDLE, + extra_thread_count: usize, + }, + else => struct {}, + }; + + const MacOsData = struct { + kqfd: i32, + final_kevent: posix.Kevent, + kevents: []posix.Kevent, + fs_kevent_wake: posix.Kevent, + fs_kevent_wait: posix.Kevent, + fs_thread: *os.Thread, + fs_kqfd: i32, + fs_queue: std.atomic.Queue(fs.Request), + fs_end_request: fs.RequestNode, + }; +}; diff --git a/std/os/index.zig b/std/os/index.zig index cc3d060aed40..ff1d18ecd439 100644 --- a/std/os/index.zig +++ b/std/os/index.zig @@ -2,7 +2,7 @@ const std = @import("../index.zig"); const builtin = @import("builtin"); const Os = builtin.Os; const is_windows = builtin.os == Os.windows; -const is_posix = switch (builtin.os) { +pub const is_posix = switch (builtin.os) { builtin.Os.linux, builtin.Os.macosx => true, else => false, };
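
The std/event/fs.zig hunk above shows the intended call-site shape for the new API: callers express interest with portable Loop.EventFlags bits instead of raw EPOLL* constants, and waitEvHandle replaces the Linux-only linuxWaitFd. A minimal sketch of such a call site, mirroring the fs.zig pattern, with a hypothetical non-blocking descriptor `fd` and an already-initialized `loop` (2018-era coroutine syntax, as used throughout this patch):

const std = @import("std");

// Sketch only: `fd` is assumed to be a non-blocking descriptor registered nowhere else.
async fn waitReadable(loop: *std.event.Loop, fd: i32) !void {
    // READ maps to EPOLLIN on Linux and to the corresponding kevent filter on Darwin.
    return await (async loop.waitEvHandle(fd, std.event.Loop.EventFlags.READ) catch unreachable);
}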
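The call helper queues an async function on the loop's next-tick queue rather than starting it immediately, and run() then drives the queue until pending_event_count drops to zero. A rough end-to-end sketch, using hypothetical names and assuming a thread-safe allocator:

const std = @import("std");
const Loop = std.event.Loop;

async fn addOne(x: i32) i32 {
    return x + 1;
}

fn example(allocator: *std.mem.Allocator) !void {
    var loop: Loop = undefined;
    try loop.initMultiThreaded(allocator);
    defer loop.deinit();

    // Queued on the next-tick queue; execution begins once run() dispatches it.
    const handle = try loop.call(addOne, i32(41));
    defer cancel handle;

    loop.run();
}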
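yield() exists for CPU-bound coroutines that would otherwise occupy a worker thread without ever reaching an I/O suspension point. A sketch of a compute loop that periodically lets the event loop start other queued coroutines (hypothetical function, same syntax assumptions as above):

// Sketch: cooperative yielding from a CPU-bound coroutine.
async fn crunch(loop: *Loop, items: []u32) u64 {
    var sum: u64 = 0;
    for (items) |x, i| {
        sum += x;
        // Every so often, give the loop a chance to dispatch other queued work.
        if (i % 1024 == 0) await (async loop.yield() catch unreachable);
    }
    return sum;
}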