diff --git a/ci/x86_64-windows-debug.ps1 b/ci/x86_64-windows-debug.ps1 index 8a5e0b42da8a..81633b18ad40 100644 --- a/ci/x86_64-windows-debug.ps1 +++ b/ci/x86_64-windows-debug.ps1 @@ -102,7 +102,7 @@ Enter-VsDevShell -VsInstallPath "C:\Program Files (x86)\Microsoft Visual Studio\ CheckLastExitCode Write-Output "Build and run behavior tests with msvc..." -& cl.exe -I..\lib test-x86_64-windows-msvc.c compiler_rt-x86_64-windows-msvc.c /W3 /Z7 -link -nologo -debug -subsystem:console kernel32.lib ntdll.lib libcmt.lib +& cl.exe -I..\lib test-x86_64-windows-msvc.c compiler_rt-x86_64-windows-msvc.c /W3 /Z7 -link -nologo -debug -subsystem:console kernel32.lib ntdll.lib libcmt.lib ws2_32.lib CheckLastExitCode & .\test-x86_64-windows-msvc.exe diff --git a/ci/x86_64-windows-release.ps1 b/ci/x86_64-windows-release.ps1 index 77ef64ccadf1..54929d5d6c7e 100644 --- a/ci/x86_64-windows-release.ps1 +++ b/ci/x86_64-windows-release.ps1 @@ -120,7 +120,7 @@ Enter-VsDevShell -VsInstallPath "C:\Program Files (x86)\Microsoft Visual Studio\ CheckLastExitCode Write-Output "Build and run behavior tests with msvc..." -& cl.exe -I..\lib test-x86_64-windows-msvc.c compiler_rt-x86_64-windows-msvc.c /W3 /Z7 -link -nologo -debug -subsystem:console kernel32.lib ntdll.lib libcmt.lib +& cl.exe -I..\lib test-x86_64-windows-msvc.c compiler_rt-x86_64-windows-msvc.c /W3 /Z7 -link -nologo -debug -subsystem:console kernel32.lib ntdll.lib libcmt.lib ws2_32.lib CheckLastExitCode & .\test-x86_64-windows-msvc.exe diff --git a/lib/std/net.zig b/lib/std/net.zig index 9d821c4399cc..5be4abf85374 100644 --- a/lib/std/net.zig +++ b/lib/std/net.zig @@ -1857,21 +1857,10 @@ pub const Stream = struct { } pub fn read(self: Stream, buffer: []u8) ReadError!usize { - if (native_os == .windows) { - return windows.ReadFile(self.handle, buffer, null); - } - return posix.read(self.handle, buffer); } pub fn readv(s: Stream, iovecs: []const posix.iovec) ReadError!usize { - if (native_os == .windows) { - // TODO improve this to use ReadFileScatter - if (iovecs.len == 0) return @as(usize, 0); - const first = iovecs[0]; - return windows.ReadFile(s.handle, first.base[0..first.len], null); - } - return posix.readv(s.handle, iovecs); } @@ -1902,10 +1891,6 @@ pub const Stream = struct { /// file system thread instead of non-blocking. It needs to be reworked to properly /// use non-blocking I/O. pub fn write(self: Stream, buffer: []const u8) WriteError!usize { - if (native_os == .windows) { - return windows.WriteFile(self.handle, buffer, null); - } - return posix.write(self.handle, buffer); } diff --git a/lib/std/net/test.zig b/lib/std/net/test.zig index 88a18094acdb..91f1b26b3e34 100644 --- a/lib/std/net/test.zig +++ b/lib/std/net/test.zig @@ -261,6 +261,63 @@ test "listen on a port, send bytes, receive bytes" { try testing.expectEqualSlices(u8, "Hello world!", buf[0..n]); } +// Test for windows issue where reading a writing from different threads would block one of the threads. +test "listen on a port, send bytes, receive bytes from different threads" { + if (builtin.single_threaded) return error.SkipZigTest; + if (builtin.os.tag == .wasi) return error.SkipZigTest; + + if (builtin.os.tag == .windows) { + _ = try std.os.windows.WSAStartup(2, 2); + } + defer { + if (builtin.os.tag == .windows) { + std.os.windows.WSACleanup() catch unreachable; + } + } + + // Try only the IPv4 variant as some CI builders have no IPv6 localhost + // configured. + const localhost = try net.Address.parseIp("127.0.0.1", 0); + + var server = try localhost.listen(.{}); + defer server.deinit(); + + const S = struct { + fn clientFn(server_address: net.Address) !void { + const socket = try net.tcpConnectToAddress(server_address); + defer socket.close(); + + const t1 = try std.Thread.spawn(.{}, read, .{socket}); + const t2 = try std.Thread.spawn(.{}, write, .{socket}); + t1.join(); + t2.join(); + } + fn read(conn: std.net.Stream) !void { + var buf: [16]u8 = undefined; + const n = try conn.reader().read(&buf); + + try testing.expectEqual(@as(usize, 12), n); + try testing.expectEqualSlices(u8, "Hello world!", buf[0..n]); + } + fn write(conn: std.net.Stream) !void { + const writer = conn.writer(); + try writer.writeAll("Hello world!"); + } + }; + + const t = try std.Thread.spawn(.{}, S.clientFn, .{server.listen_address}); + defer t.join(); + + var client = try server.accept(); + defer client.stream.close(); + var buf: [16]u8 = undefined; + const n = try client.stream.reader().read(&buf); + try client.stream.writer().writeAll(buf[0..n]); + + try testing.expectEqual(@as(usize, 12), n); + try testing.expectEqualSlices(u8, "Hello world!", buf[0..n]); +} + test "listen on an in use port" { if (builtin.os.tag != .linux and comptime !builtin.os.tag.isDarwin() and builtin.os.tag != .windows) { // TODO build abstractions for other operating systems diff --git a/lib/std/os/windows.zig b/lib/std/os/windows.zig index 12f647e2309b..97e7061e47fe 100644 --- a/lib/std/os/windows.zig +++ b/lib/std/os/windows.zig @@ -2894,6 +2894,7 @@ pub const CHAR = u8; pub const UCHAR = u8; pub const FLOAT = f32; pub const HANDLE = *anyopaque; +pub const OBJ_HANDLE_TAGBITS = 0x03; pub const HCRYPTPROV = ULONG_PTR; pub const ATOM = u16; pub const HBRUSH = *opaque {}; diff --git a/lib/std/os/windows/ws2_32.zig b/lib/std/os/windows/ws2_32.zig index e8375dc2c11f..afb077529e2e 100644 --- a/lib/std/os/windows/ws2_32.zig +++ b/lib/std/os/windows/ws2_32.zig @@ -19,7 +19,7 @@ const ULONG = windows.ULONG; const LPARAM = windows.LPARAM; const FARPROC = windows.FARPROC; -pub const SOCKET = *opaque {}; +pub const SOCKET = HANDLE; pub const INVALID_SOCKET = @as(SOCKET, @ptrFromInt(~@as(usize, 0))); pub const GROUP = u32; diff --git a/lib/std/posix.zig b/lib/std/posix.zig index ad65cf205ded..dfd6859e85a7 100644 --- a/lib/std/posix.zig +++ b/lib/std/posix.zig @@ -828,7 +828,29 @@ pub const ReadError = error{ pub fn read(fd: fd_t, buf: []u8) ReadError!usize { if (buf.len == 0) return 0; if (native_os == .windows) { - return windows.ReadFile(fd, buf, null); + if (windowsIsSocketHandle(fd)) { + var buffers = [_]windows.ws2_32.WSABUF{.{ .buf = buf.ptr, .len = @truncate(buf.len) }}; + var flags: u32 = 0; + var nbytes: u32 = 0; + if (windows.ws2_32.WSARecv(fd, &buffers, 1, &nbytes, &flags, null, null) != 0) { + switch (windows.ws2_32.WSAGetLastError()) { + .WSAECONNABORTED => return error.BrokenPipe, + .WSAECONNREFUSED => return error.ConnectionResetByPeer, + .WSAEFAULT => unreachable, // The lpBuffers parameter is not completely contained in a valid part of the user address space. + .WSAEINVAL => unreachable, // The socket has not been bound (for example, with bind). + .WSAENETRESET => return error.BrokenPipe, + .WSAENOTCONN => return error.BrokenPipe, + .WSAENOTSOCK => unreachable, // The descriptor is not a socket. + .WSAESHUTDOWN => return error.BrokenPipe, + .WSAETIMEDOUT => return error.BrokenPipe, + .WSAEWOULDBLOCK => return error.WouldBlock, + else => |err| return windows.unexpectedWSAError(err), + } + } + return nbytes; + } else { + return windows.ReadFile(fd, buf, null); + } } if (native_os == .wasi and !builtin.link_libc) { const iovs = [1]iovec{iovec{ @@ -1237,9 +1259,32 @@ pub const WriteError = error{ pub fn write(fd: fd_t, bytes: []const u8) WriteError!usize { if (bytes.len == 0) return 0; if (native_os == .windows) { - return windows.WriteFile(fd, bytes, null); + if (windowsIsSocketHandle(fd)) { + var buffers = [_]windows.ws2_32.WSABUF{.{ .buf = @constCast(bytes.ptr), .len = @truncate(bytes.len) }}; + const flags: u32 = 0; + var nbytes: u32 = 0; + if (windows.ws2_32.WSASend(fd, &buffers, 1, &nbytes, flags, null, null) != 0) { + switch (windows.ws2_32.WSAGetLastError()) { + .WSAECONNABORTED => return error.BrokenPipe, + .WSAECONNRESET => return error.ConnectionResetByPeer, + .WSAECONNREFUSED => return error.ConnectionResetByPeer, + .WSAEFAULT => unreachable, // The lpBuffers parameter is not completely contained in a valid part of the user address space. + .WSAEINVAL => unreachable, // The socket has not been bound (for example, with bind). + .WSAENETRESET => return error.BrokenPipe, + .WSAENOBUFS => return error.SystemResources, + .WSAENOTCONN => return error.BrokenPipe, + .WSAENOTSOCK => unreachable, // The descriptor is not a socket. + .WSAESHUTDOWN => return error.BrokenPipe, + .WSAETIMEDOUT => return error.BrokenPipe, + .WSAEWOULDBLOCK => return error.WouldBlock, + else => |err| return windows.unexpectedWSAError(err), + } + } + return nbytes; + } else { + return windows.WriteFile(fd, bytes, null); + } } - if (native_os == .wasi and !builtin.link_libc) { const ciovs = [_]iovec_const{iovec_const{ .base = bytes.ptr, @@ -3610,7 +3655,7 @@ pub fn socket(domain: u32, socket_type: u32, protocol: u32) SocketError!socket_t @bitCast(protocol), null, 0, - flags, + flags | windows.ws2_32.WSA_FLAG_OVERLAPPED, ); errdefer windows.closesocket(rc) catch unreachable; if ((socket_type & SOCK.NONBLOCK) != 0) { @@ -3622,7 +3667,7 @@ pub fn socket(domain: u32, socket_type: u32, protocol: u32) SocketError!socket_t } } } - return rc; + return windowsTagHandleAsSocket(rc); } const have_sock_flags = !builtin.target.os.tag.isDarwin() and native_os != .haiku; @@ -3653,6 +3698,19 @@ pub fn socket(domain: u32, socket_type: u32, protocol: u32) SocketError!socket_t } } +const windows_socket_handle_tag = 0x01; +// The standard posix namespace sets the lowest bit of the socket handle to +// distinguish them from other handles. The lowest 2 bits are unused by Windows. +fn windowsTagHandleAsSocket(s: windows.ws2_32.SOCKET) windows.ws2_32.SOCKET { + return @ptrFromInt(@intFromPtr(s) | windows_socket_handle_tag); +} + +/// Return true if the passed in handle is a socket. +/// Only valid for handles created with std.posix +fn windowsIsSocketHandle(h: windows.HANDLE) bool { + return (@intFromPtr(h) & windows.OBJ_HANDLE_TAGBITS) == windows_socket_handle_tag; +} + pub const ShutdownError = error{ ConnectionAborted, @@ -3947,7 +4005,7 @@ pub fn accept( else => |err| return windows.unexpectedWSAError(err), } } else { - break rc; + break windowsTagHandleAsSocket(rc); } } else { switch (errno(rc)) {