diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 1c154b2..030b621 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -13,18 +13,16 @@ jobs: x86_64-linux-musl, aarch64-macos, x86_64-macos, - wasm32-wasi + wasm32-wasi, + x86_64-windows-gnu # Broken but not in any obvious way: # x86-linux-gnu, # x86-linux-musl, - - # Not yet supported: - # i386-windows, - # x86_64-windows-gnu, + # x86-windows, ] runs-on: ${{ matrix.os }} - needs: test + needs: [test-x86_64-linux, test-x86_64-windows] steps: - name: Checkout code uses: actions/checkout@v3 @@ -45,7 +43,7 @@ jobs: - name: test run: nix develop -c zig build --summary all -Dtarget=${{ matrix.target }} - test: + test-x86_64-linux: strategy: matrix: os: [ubuntu-latest] @@ -77,3 +75,26 @@ jobs: # Run a full build to ensure that works - run: nix build + + test-x86_64-windows: + strategy: + matrix: + os: [windows-latest] + runs-on: ${{ matrix.os }} + steps: + - name: Checkout code + uses: actions/checkout@v3 + with: + submodules: recursive + fetch-depth: 0 + + - name: Install zig + uses: goto-bus-stop/setup-zig@v2 + with: + version: 0.12.0-dev.256+8b74eae9c + + - name: test + run: zig build test --summary all + + - name: build all benchmarks and examples + run: zig build -Dexample -Dbench --summary all diff --git a/build.zig b/build.zig index 5631565..cc2d276 100644 --- a/build.zig +++ b/build.zig @@ -80,8 +80,16 @@ pub fn build(b: *std.Build) !void { .target = target, .optimize = optimize, }); - b.installArtifact(static_lib); + static_lib.linkLibC(); + + // Link required libraries if targeting Windows + if (target.getOsTag() == .windows) { + static_lib.linkSystemLibrary("ws2_32"); + static_lib.linkSystemLibrary("mswsock"); + } + + b.installArtifact(static_lib); b.default_step.dependOn(&static_lib.step); const static_binding_test = b.addExecutable(.{ @@ -107,10 +115,7 @@ pub fn build(b: *std.Build) !void { // Dynamic C lib. 
We only build this if this is the native target so we // can link to libxml2 on our native system. if (target.isNative()) { - const dynamic_lib_name = if (target.isWindows()) - "xev.dll" - else - "xev"; + const dynamic_lib_name = "xev"; const dynamic_lib = b.addSharedLibrary(.{ .name = dynamic_lib_name, @@ -169,6 +174,7 @@ pub fn build(b: *std.Build) !void { b.installFile(file, "share/pkgconfig/libxev.pc"); } + // Benchmarks _ = try benchTargets(b, target, optimize, bench_install, bench_name); diff --git a/examples/million-timers.c b/examples/million-timers.c index f27ce57..8927bc6 100644 --- a/examples/million-timers.c +++ b/examples/million-timers.c @@ -14,11 +14,36 @@ xev_cb_action timer_cb(xev_loop* loop, xev_completion* c, int result, void *user return XEV_DISARM; } +#ifdef _WIN32 +#include +uint64_t hrtime(void) { + static int initialized = 0; + static LARGE_INTEGER start_timestamp; + static uint64_t qpc_tick_duration; + + if (!initialized) { + initialized = 1; + + LARGE_INTEGER qpc_freq; + QueryPerformanceFrequency(&qpc_freq); + qpc_tick_duration = 1e9 / qpc_freq.QuadPart; + + QueryPerformanceCounter(&start_timestamp); + } + + LARGE_INTEGER t; + QueryPerformanceCounter(&t); + t.QuadPart -= start_timestamp.QuadPart; + + return (uint64_t)t.QuadPart * qpc_tick_duration; +} +#else uint64_t hrtime(void) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); return ts.tv_nsec + (ts.tv_sec * 1e9); } +#endif int main(void) { xev_watcher* timers; diff --git a/src/backend/iocp.zig b/src/backend/iocp.zig new file mode 100644 index 0000000..b064876 --- /dev/null +++ b/src/backend/iocp.zig @@ -0,0 +1,2267 @@ +//! Backend to use win32 IOCP. 
+const std = @import("std"); +const assert = std.debug.assert; +const windows = @import("../windows.zig"); +const queue = @import("../queue.zig"); +const heap = @import("../heap.zig"); +const xev = @import("../main.zig").IOCP; + +const log = std.log.scoped(.libxev_iocp); + +pub const Loop = struct { + const TimerHeap = heap.Intrusive(Timer, void, Timer.less); + + /// The handle to the IO completion port. + iocp_handle: windows.HANDLE = windows.INVALID_HANDLE_VALUE, + + /// The number of active completions. This DOES NOT include completions that are queued in the + /// submissions queue. + active: usize = 0, + + /// Our queue of submissions that we want to enqueue on the next tick. + /// These are NOT started. + submissions: queue.Intrusive(Completion) = .{}, + + /// The queue of cancellation requests. These will point to the completion that we need to + /// cancel. We don't enqueue the exact completion to cancel because it may be in another queue. + cancellations: queue.Intrusive(Completion) = .{}, + + /// Our queue of completed completions where the callback hasn't been called yet, but the + /// "result" field should be set on every completion. This is used to delay completion callbacks + /// until the next tick. + completions: queue.Intrusive(Completion) = .{}, + + /// Our queue of waiting completions + asyncs: queue.Intrusive(Completion) = .{}, + + /// Heap of timers. + timers: TimerHeap = .{ .context = {} }, + + /// Cached time + cached_now: u64, + + /// Duration of a tick of Windows QueryPerformanceCounter. + qpc_duration: u64, + + /// Some internal fields we can pack for better space. + flags: packed struct { + /// Whether we're in a run of not (to prevent nested runs). + in_run: bool = false, + + /// Whether our loop is in a stopped state or not. + stopped: bool = false, + } = .{}, + + /// Initialize a new IOCP-backed event loop. See the Options docs + /// for what options matter for IOCP. 
+ pub fn init(options: xev.Options) !Loop { + _ = options; + + // Get the duration of the QueryPerformanceCounter. + // We should check if the division is lossless, but it returns 10_000_000 on my machine so + // we'll handle that later. + var qpc_duration = 1_000_000_000 / windows.QueryPerformanceFrequency(); + + // This creates a new Completion Port + const handle = try windows.CreateIoCompletionPort(windows.INVALID_HANDLE_VALUE, null, 0, 1); + var res: Loop = .{ + .iocp_handle = handle, + .qpc_duration = qpc_duration, + .cached_now = undefined, + }; + + res.update_now(); + return res; + } + + /// Deinitialize the loop, this closes the handle to the Completion Port. Any events that were + /// unprocessed are lost -- their callbacks will never be called. + pub fn deinit(self: *Loop) void { + windows.CloseHandle(self.iocp_handle); + } + + /// Stop the loop. This can only be called from the main thread. + /// This will stop the loop forever. Future ticks will do nothing. + /// + /// This does NOT stop any completions associated to operations that are in-flight. + pub fn stop(self: *Loop) void { + self.flags.stopped = true; + } + + /// Add a completion to the loop. The completion is not started until the loop is run (`run`) or + /// an explicit submission request is made (`submit`). + pub fn add(self: *Loop, completion: *Completion) void { + // If the completion is a cancel operation, we start it immediately as it will be put in the + // cancellations queue. + if (completion.op == .cancel) { + self.start_completion(completion); + return; + } + + switch (completion.flags.state) { + // The completion is in an adding state already, nothing needs to be done. + .adding => return, + + // The completion is dead, probably because it was canceled. + .dead => {}, + + // If we reach this point, we have a problem... + .active => unreachable, + } + + // We just add the completion to the queue. Failures can happen + // at submission or tick time. 
+ completion.flags.state = .adding; + self.submissions.push(completion); + } + + /// Submit any enqueued completions. This does not fire any callbacks for completed events + /// (success or error). Callbacks are only fired on the next tick. + pub fn submit(self: *Loop) !void { + // Submit all the submissions. We copy the submission queue so that any resubmits don't + // cause an infinite loop. + var queued = self.submissions; + self.submissions = .{}; + + // On error, we have to restore the queue because we may be batching. + errdefer self.submissions = queued; + + while (queued.pop()) |c| { + switch (c.flags.state) { + .adding => self.start_completion(c), + .dead => self.stop_completion(c, null), + .active => std.log.err( + "invalid state in submission queue state={}", + .{c.flags.state}, + ), + } + } + } + + /// Process the cancellations queue. This doesn't call any callbacks but can potentially make + /// system call to cancel active IO. + fn process_cancellations(self: *Loop) void { + while (self.cancellations.pop()) |c| { + const target = c.op.cancel.c; + var cancel_result: CancelError!void = {}; + switch (target.flags.state) { + // If the target is dead already we do nothing. + .dead => {}, + + // If it is in the submission queue, mark them as dead so they will never be + // submitted. + .adding => target.flags.state = .dead, + + // If it is active we need to schedule the deletion. + .active => self.stop_completion(target, &cancel_result), + } + + // We completed the cancellation. + c.result = .{ .cancel = cancel_result }; + self.completions.push(c); + } + } + + /// Run the event loop. See RunMode documentation for details on modes. + /// Once the loop is run, the pointer MUST remain stable. 
+ pub fn run(self: *Loop, mode: xev.RunMode) !void { + switch (mode) { + .no_wait => try self.tick(0), + .once => try self.tick(1), + .until_done => while (!self.done()) try self.tick(1), + } + } + + /// Tick through the event loop once, waiting for at least "wait" completions to be processed by + /// the loop itself. + pub fn tick(self: *Loop, wait: u32) !void { + // If we're stopped then the loop is fully over. + if (self.flags.stopped) return; + + // We can't nest runs. + if (self.flags.in_run) return error.NestedRunsNotAllowed; + self.flags.in_run = true; + defer self.flags.in_run = false; + + // The list of entry that will be filled with a call to GetQueuedCompletionStatusEx. + var entries: [128]windows.OVERLAPPED_ENTRY = undefined; + + var wait_rem = @as(usize, @intCast(wait)); + + // Handle all of our cancellations first because we may be able to stop submissions from + // even happening if its still queued. Plus, cancellations sometimes add more to the + // submission queue. + self.process_cancellations(); + + // Submit pending completions. + try self.submit(); + + // Loop condition is inspired from the kqueue backend. See its documentation for details. + while (true) { + // If we're stopped then the loop is fully over. + if (self.flags.stopped) return; + + // We must update our time no matter what. + self.update_now(); + + const should_continue = (self.active > 0 and (wait == 0 or wait_rem > 0)) or !self.completions.empty(); + if (!should_continue) break; + + // Run our expired timers. + const now_timer: Timer = .{ .next = self.cached_now }; + while (self.timers.peek()) |t| { + if (!Timer.less({}, t, &now_timer)) break; + + // Remove the timer + assert(self.timers.deleteMin().? == t); + + // Mark completion as done + const c = t.c; + c.flags.state = .dead; + + // We mark it as inactive here because if we rearm below the start() function will + // reincrement this. + self.active -= 1; + + // Lower our remaining count since we have processed something. 
+ wait_rem -|= 1; + + // Invoke + const action = c.callback(c.userdata, self, c, .{ .timer = .expiration }); + switch (action) { + .disarm => {}, + .rearm => self.start_completion(c), + } + } + + // Process the completions we already have completed. + while (self.completions.pop()) |c| { + // We store whether this completion was active so we can decrement the active count + // later. + const c_active = c.flags.state == .active; + c.flags.state = .dead; + + // Decrease our waiters because we are definitely processing one. + wait_rem -|= 1; + + // Completion queue items MUST have a result set. + const action = c.callback(c.userdata, self, c, c.result.?); + switch (action) { + .disarm => { + // If we were active, decrement the number of active completions. + if (c_active) self.active -= 1; + }, + + // Only resubmit if we aren't already active + .rearm => if (!c_active) self.submissions.push(c), + } + } + + // Process asyncs + if (!self.asyncs.empty()) { + var asyncs = self.asyncs; + self.asyncs = .{}; + + while (asyncs.pop()) |c| { + const c_wakeup = c.op.async_wait.wakeup.swap(false, .SeqCst); + + // If we aren't waking this one up, requeue + if (!c_wakeup) { + self.asyncs.push(c); + continue; + } + + // We are waking up, mark this as dead and call it. + c.flags.state = .dead; + self.active -= 1; + + // Lower our waiters + wait_rem -|= 1; + + const action = c.callback(c.userdata, self, c, .{ .async_wait = {} }); + switch (action) { + .disarm => {}, + .rearm => self.start_completion(c), + } + } + } + + // If we have processed enough event, we break out of the loop. + if (wait_rem == 0) break; + + // Determine our next timeout based on the timers. + const timeout: ?windows.DWORD = timeout: { + // If we have a timer, we want to set the timeout to our next timer value. If we + // have no timer, we wait forever. + const t = self.timers.peek() orelse break :timeout null; + + // Determin the time in milliseconds. 
If the cast fails, we fallback to the maximum + // acceptable value. + const ms_now = self.cached_now / std.time.ns_per_ms; + const ms_next = t.next / std.time.ns_per_ms; + const ms = ms_next -| ms_now; + break :timeout std.math.cast(windows.DWORD, ms) orelse windows.INFINITE - 1; + }; + + // Wait for changes IO completions. + const count: u32 = windows.GetQueuedCompletionStatusEx(self.iocp_handle, &entries, timeout, false) catch |err| switch (err) { + // A timeout means that nothing was completed. + error.Timeout => 0, + + else => return err, + }; + + // Go through the entries and perform completions callbacks. + for (entries[0..count]) |entry| { + // We retrieve the Completion from the OVERLAPPED pointer as we know it's a part of + // the Completion struct. + const overlapped_ptr: ?*windows.OVERLAPPED = @as(?*windows.OVERLAPPED, @ptrCast(entry.lpOverlapped)); + if (overlapped_ptr == null) { + // Probably an async wakeup + continue; + } + var completion = @fieldParentPtr(Completion, "overlapped", overlapped_ptr.?); + + wait_rem -|= 1; + + self.active -= 1; + completion.flags.state = .dead; + + const result = completion.perform(); + const action = completion.callback(completion.userdata, self, completion, result); + switch (action) { + .disarm => {}, + .rearm => { + completion.reset(); + self.start_completion(completion); + }, + } + } + + // If we ran through the loop once we break if we don't care. + if (wait == 0) break; + } + } + + /// Returns the "loop" time in milliseconds. The loop time is updated once per loop tick, before + /// IO polling occurs. It remains constant throughout callback execution. + /// + /// You can force an update of the "now" value by calling update_now() at any time from the main + /// thread. + /// + /// QueryPerformanceCounter is used to get the current timestamp. + pub fn now(self: *Loop) i64 { + return @as(i64, @intCast(self.cached_now)); + } + + /// Update the cached time. 
+ pub fn update_now(self: *Loop) void { + // Compute the current timestamp in ms by multiplying the QueryPerfomanceCounter value in + // ticks by the duration of a tick. + self.cached_now = windows.QueryPerformanceCounter() * self.qpc_duration; + } + + /// Add a timer to the loop. The timer will execute in "next_ms". This is oneshot: the timer + /// will not repeat. To repeat a timer, either schedule another in your callback or return rearm + /// from the callback. + pub fn timer( + self: *Loop, + c: *Completion, + next_ms: u64, + userdata: ?*anyopaque, + comptime cb: xev.Callback, + ) void { + c.* = .{ + .op = .{ + .timer = .{ + .next = self.timer_next(next_ms), + }, + }, + .userdata = userdata, + .callback = cb, + }; + + self.add(c); + } + + /// see io_uring.timer_reset for docs. + pub fn timer_reset( + self: *Loop, + c: *Completion, + c_cancel: *Completion, + next_ms: u64, + userdata: ?*anyopaque, + comptime cb: xev.Callback, + ) void { + switch (c.flags.state) { + .dead => { + self.timer(c, next_ms, userdata, cb); + return; + }, + + // Adding state we can just modify the metadata and return since the timer isn't in the + // heap yet. + .adding => { + c.op.timer.next = self.timer_next(next_ms); + c.userdata = userdata; + c.callback = cb; + }, + + .active => { + // Update the reset time for the timer to the desired time along with all the + // callbacks. + c.op.timer.reset = self.timer_next(next_ms); + c.userdata = userdata; + c.callback = cb; + + // If the cancellation is active, we assume its for this timer. + if (c_cancel.state() == .active) return; + assert(c_cancel.state() == .dead and c.state() == .active); + c_cancel.* = .{ .op = .{ .cancel = .{ .c = c } } }; + self.add(c_cancel); + }, + } + } + + // Get the absolute timestamp corresponding to the given "next_ms". 
+ pub fn timer_next(self: *Loop, next_ms: u64) u64 { + return self.cached_now + next_ms * std.time.ns_per_ms; + } + + pub fn done(self: *Loop) bool { + return self.flags.stopped or (self.active == 0 and + self.submissions.empty() and + self.completions.empty()); + } + + // Start the completion. + fn start_completion(self: *Loop, completion: *Completion) void { + const StartAction = union(enum) { + // We successfully submitted the operation. + submitted: void, + + // We are a timer. + timer: void, + + // We are a cancellation. + cancel: void, + + // We are an async wait + async_wait: void, + + // We have a result code from making a system call now. + result: Result, + }; + + const action: StartAction = switch (completion.op) { + .noop => { + completion.flags.state = .dead; + return; + }, + + .accept => |*v| action: { + if (v.internal_accept_socket == null) { + var addr: std.os.sockaddr.storage = undefined; + var addr_len: i32 = @sizeOf(std.os.sockaddr.storage); + + std.debug.assert(windows.ws2_32.getsockname(asSocket(v.socket), @as(*std.os.sockaddr, @ptrCast(&addr)), &addr_len) == 0); + + var socket_type: i32 = 0; + var socket_type_bytes = std.mem.asBytes(&socket_type); + var opt_len: i32 = @as(i32, @intCast(socket_type_bytes.len)); + std.debug.assert(windows.ws2_32.getsockopt(asSocket(v.socket), std.os.SOL.SOCKET, std.os.SO.TYPE, socket_type_bytes, &opt_len) == 0); + + v.internal_accept_socket = windows.WSASocketW(addr.family, socket_type, 0, null, 0, windows.ws2_32.WSA_FLAG_OVERLAPPED) catch |err| { + break :action .{ .result = .{ .accept = err } }; + }; + } + + self.associate_fd(completion.handle().?) 
catch unreachable; + + var discard: u32 = undefined; + const result = windows.ws2_32.AcceptEx( + asSocket(v.socket), + asSocket(v.internal_accept_socket.?), + &v.storage, + 0, + 0, + @as(u32, @intCast(@sizeOf(std.os.sockaddr.storage))), + &discard, + &completion.overlapped, + ); + if (result != windows.TRUE) { + const err = windows.ws2_32.WSAGetLastError(); + switch (err) { + windows.ws2_32.WinsockError.WSA_IO_PENDING => break :action .{ .submitted = {} }, + else => { + windows.CloseHandle(v.internal_accept_socket.?); + break :action .{ .result = .{ .accept = windows.unexpectedWSAError(err) } }; + }, + } + } + + break :action .{ .submitted = {} }; + }, + + .close => |v| .{ .result = .{ .close = windows.CloseHandle(v.fd) } }, + + .connect => |*v| action: { + const result = windows.ws2_32.connect(asSocket(v.socket), &v.addr.any, @as(i32, @intCast(v.addr.getOsSockLen()))); + if (result != 0) { + const err = windows.ws2_32.WSAGetLastError(); + break :action switch (err) { + else => .{ .result = .{ .connect = windows.unexpectedWSAError(err) } }, + }; + } + break :action .{ .result = .{ .connect = {} } }; + }, + + .read => |*v| action: { + self.associate_fd(completion.handle().?) catch unreachable; + var buffer: []u8 = if (v.buffer == .slice) v.buffer.slice else &v.buffer.array; + break :action if (windows.exp.ReadFile(v.fd, buffer, &completion.overlapped)) |_| + .{ + .submitted = {}, + } + else |err| + .{ + .result = .{ .read = err }, + }; + }, + + .pread => |*v| action: { + self.associate_fd(completion.handle().?) 
catch unreachable; + var buffer: []u8 = if (v.buffer == .slice) v.buffer.slice else &v.buffer.array; + completion.overlapped.DUMMYUNIONNAME.DUMMYSTRUCTNAME.Offset = @intCast(v.offset & 0xFFFF_FFFF_FFFF_FFFF); + completion.overlapped.DUMMYUNIONNAME.DUMMYSTRUCTNAME.OffsetHigh = @intCast(v.offset >> 32); + break :action if (windows.exp.ReadFile(v.fd, buffer, &completion.overlapped)) |_| + .{ + .submitted = {}, + } + else |err| + .{ + .result = .{ .pread = err }, + }; + }, + + .shutdown => |*v| .{ .result = .{ .shutdown = std.os.shutdown(asSocket(v.socket), v.how) } }, + + .write => |*v| action: { + self.associate_fd(completion.handle().?) catch unreachable; + var buffer: []const u8 = if (v.buffer == .slice) v.buffer.slice else v.buffer.array.array[0..v.buffer.array.len]; + break :action if (windows.exp.WriteFile(v.fd, buffer, &completion.overlapped)) |_| + .{ + .submitted = {}, + } + else |err| + .{ + .result = .{ .write = err }, + }; + }, + + .pwrite => |*v| action: { + self.associate_fd(completion.handle().?) catch unreachable; + var buffer: []const u8 = if (v.buffer == .slice) v.buffer.slice else v.buffer.array.array[0..v.buffer.array.len]; + completion.overlapped.DUMMYUNIONNAME.DUMMYSTRUCTNAME.Offset = @intCast(v.offset & 0xFFFF_FFFF_FFFF_FFFF); + completion.overlapped.DUMMYUNIONNAME.DUMMYSTRUCTNAME.OffsetHigh = @intCast(v.offset >> 32); + break :action if (windows.exp.WriteFile(v.fd, buffer, &completion.overlapped)) |_| + .{ + .submitted = {}, + } + else |err| + .{ + .result = .{ .pwrite = err }, + }; + }, + + .send => |*v| action: { + self.associate_fd(completion.handle().?) 
catch unreachable; + var buffer: []const u8 = if (v.buffer == .slice) v.buffer.slice else v.buffer.array.array[0..v.buffer.array.len]; + v.wsa_buffer = .{ .buf = @constCast(buffer.ptr), .len = @as(u32, @intCast(buffer.len)) }; + const result = windows.ws2_32.WSASend( + asSocket(v.fd), + @as([*]windows.ws2_32.WSABUF, @ptrCast(&v.wsa_buffer)), + 1, + null, + 0, + &completion.overlapped, + null, + ); + if (result != 0) { + const err = windows.ws2_32.WSAGetLastError(); + break :action switch (err) { + windows.ws2_32.WinsockError.WSA_IO_PENDING => .{ .submitted = {} }, + else => .{ .result = .{ .send = windows.unexpectedWSAError(err) } }, + }; + } + break :action .{ .submitted = {} }; + }, + + .recv => |*v| action: { + self.associate_fd(completion.handle().?) catch unreachable; + var buffer: []u8 = if (v.buffer == .slice) v.buffer.slice else &v.buffer.array; + v.wsa_buffer = .{ .buf = buffer.ptr, .len = @as(u32, @intCast(buffer.len)) }; + + var flags: u32 = 0; + + const result = windows.ws2_32.WSARecv( + asSocket(v.fd), + @as([*]windows.ws2_32.WSABUF, @ptrCast(&v.wsa_buffer)), + 1, + null, + &flags, + &completion.overlapped, + null, + ); + if (result != 0) { + const err = windows.ws2_32.WSAGetLastError(); + break :action switch (err) { + windows.ws2_32.WinsockError.WSA_IO_PENDING => .{ .submitted = {} }, + else => .{ .result = .{ .recv = windows.unexpectedWSAError(err) } }, + }; + } + break :action .{ .submitted = {} }; + }, + + .sendto => |*v| action: { + self.associate_fd(completion.handle().?) 
catch unreachable; + var buffer: []const u8 = if (v.buffer == .slice) v.buffer.slice else v.buffer.array.array[0..v.buffer.array.len]; + v.wsa_buffer = .{ .buf = @constCast(buffer.ptr), .len = @as(u32, @intCast(buffer.len)) }; + const result = windows.ws2_32.WSASendTo( + asSocket(v.fd), + @as([*]windows.ws2_32.WSABUF, @ptrCast(&v.wsa_buffer)), + 1, + null, + 0, + &v.addr.any, + @as(i32, @intCast(v.addr.getOsSockLen())), + &completion.overlapped, + null, + ); + if (result != 0) { + const err = windows.ws2_32.WSAGetLastError(); + break :action switch (err) { + windows.ws2_32.WinsockError.WSA_IO_PENDING => .{ .submitted = {} }, + else => .{ .result = .{ .sendto = windows.unexpectedWSAError(err) } }, + }; + } + break :action .{ .submitted = {} }; + }, + + .recvfrom => |*v| action: { + self.associate_fd(completion.handle().?) catch unreachable; + var buffer: []u8 = if (v.buffer == .slice) v.buffer.slice else &v.buffer.array; + v.wsa_buffer = .{ .buf = buffer.ptr, .len = @as(u32, @intCast(buffer.len)) }; + + var flags: u32 = 0; + + const result = windows.ws2_32.WSARecvFrom( + asSocket(v.fd), + @as([*]windows.ws2_32.WSABUF, @ptrCast(&v.wsa_buffer)), + 1, + null, + &flags, + &v.addr, + @as(*i32, @ptrCast(&v.addr_size)), + &completion.overlapped, + null, + ); + if (result != 0) { + const err = windows.ws2_32.WSAGetLastError(); + break :action switch (err) { + windows.ws2_32.WinsockError.WSA_IO_PENDING => .{ .submitted = {} }, + else => .{ .result = .{ .recvfrom = windows.unexpectedWSAError(err) } }, + }; + } + break :action .{ .submitted = {} }; + }, + + .timer => |*v| action: { + v.c = completion; + self.timers.insert(v); + break :action .{ .timer = {} }; + }, + + .cancel => action: { + self.cancellations.push(completion); + break :action .{ .cancel = {} }; + }, + + .async_wait => action: { + self.asyncs.push(completion); + break :action .{ .async_wait = {} }; + }, + }; + + switch (action) { + .timer, .submitted, .cancel => { + // Increase our active count so we now wait 
for this. We assume it'll successfully + // queue. If it doesn't we handle that later (see submit). + self.active += 1; + completion.flags.state = .active; + }, + + .async_wait => { + // We are considered an active completion. + self.active += 1; + completion.flags.state = .active; + }, + + // A result is immediately available. Queue the completion to be invoked. + .result => |r| { + completion.result = r; + self.completions.push(completion); + }, + } + } + + /// Stop the completion. Fill `cancel_result` if it is non-null. + fn stop_completion(self: *Loop, completion: *Completion, cancel_result: ?*CancelError!void) void { + if (completion.flags.state == .active and completion.result != null) return; + + // Inspect other operations. WARNING: the state can be anything here so per op be sure to + // check the state flag. + switch (completion.op) { + .timer => |*v| { + if (completion.flags.state == .active) { + // Remove from the heap so it never fires... + self.timers.remove(v); + + // If we have reset AND we got cancellation result, that means that we were + // canceled so that we can update our expiration time. + if (v.reset) |r| { + v.next = r; + v.reset = null; + completion.flags.state = .dead; + self.active -= 1; + self.add(completion); + return; + } + } + + // Add to our completion so we trigger the callback. + completion.result = .{ .timer = .cancel }; + self.completions.push(completion); + + // Note the timers state purposely remains ACTIVE so that + // when we process the completion we decrement the + // active count. 
+ }, + + .accept => |*v| { + if (completion.flags.state == .active) { + const result = windows.kernel32.CancelIoEx(asSocket(v.socket), &completion.overlapped); + cancel_result.?.* = if (result == windows.FALSE) + windows.unexpectedError(windows.kernel32.GetLastError()) + else {}; + } + }, + + inline .read, .pread, .write, .pwrite, .recv, .send, .sendto, .recvfrom => |*v| { + if (completion.flags.state == .active) { + const result = windows.kernel32.CancelIoEx(asSocket(v.fd), &completion.overlapped); + cancel_result.?.* = if (result == windows.FALSE) + windows.unexpectedError(windows.kernel32.GetLastError()) + else {}; + } + }, + + else => @panic("Not implemented"), + } + } + + // Sens an empty Completion token so that the loop wakes up if it is waiting for a completion + // event. + pub fn async_notify(self: *Loop, completion: *Completion) void { + // The completion must be in a waiting state. + assert(completion.op == .async_wait); + + // The completion has been wakeup, this is used to see which completion in the async queue + // needs to be removed. + completion.op.async_wait.wakeup.store(true, .SeqCst); + + const result = windows.kernel32.PostQueuedCompletionStatus( + self.iocp_handle, + 0, + 0, + null, + ); + + // NOTE(Corendos): if something goes wrong, ignore it for the moment. + if (result == windows.FALSE) { + const err = windows.kernel32.GetLastError(); + windows.unexpectedError(err) catch {}; + } + } + + /// Associate a handler to the internal completion port. + /// This has to be done only once per handle so we delegate the responsibility to the caller. + pub fn associate_fd(self: Loop, fd: windows.HANDLE) !void { + if (fd == windows.INVALID_HANDLE_VALUE or self.iocp_handle == windows.INVALID_HANDLE_VALUE) return error.InvalidParameter; + // We ignore the error here because multiple call to CreateIoCompletionPort with a HANDLE + // already registered triggers a INVALID_PARAMETER error and we have no way to see the cause + // of it. 
+ _ = windows.kernel32.CreateIoCompletionPort(fd, self.iocp_handle, 0, 0); + } +}; + +/// Convenience to convert from windows.HANDLE to windows.ws2_32.SOCKET (which are the same thing). +inline fn asSocket(h: windows.HANDLE) windows.ws2_32.SOCKET { + return @as(windows.ws2_32.SOCKET, @ptrCast(h)); +} + +/// A completion is a request to perform some work with the loop. +pub const Completion = struct { + /// Operation to execute. + op: Operation = .{ .noop = {} }, + + /// Userdata and callback for when the completion is finished. + userdata: ?*anyopaque = null, + callback: xev.Callback = xev.noopCallback, + + //--------------------------------------------------------------- + // Internal fields + + /// Intrusive queue field. + next: ?*Completion = null, + + /// Result code of the syscall. Only used internally in certain scenarios, should not be relied + /// upon by program authors. + result: ?Result = null, + + flags: packed struct { + /// Watch state of this completion. We use this to determine whether we're active, adding or + /// dead. This lets us add and abd delete multiple times before a loop tick and handle the + /// state properly. + state: State = .dead, + } = .{}, + + /// Win32 OVERLAPPED struct used for asynchronous IO. Only used internally in certain scenarios. + /// It needs to be there as we rely on @fieldParentPtr to get the completion using a pointer to + /// that field. + overlapped: windows.OVERLAPPED = .{ + .Internal = 0, + .InternalHigh = 0, + .DUMMYUNIONNAME = .{ .Pointer = null }, + .hEvent = null, + }, + + /// Loop associated with this completion. HANDLE are required to be associated with an I/O + /// Completion Port to work properly. + loop: ?*const xev.Loop = null, + + const State = enum(u2) { + /// completion is not part of any loop + dead = 0, + + /// completion is in the submission queue + adding = 1, + + /// completion is submitted successfully + active = 2, + }; + + /// Returns the state of this completion. 
There are some things to be cautious about when + /// calling this function. + /// + /// First, this is only safe to call from the main thread. This cannot be called from any other + /// thread. + /// + /// Second, if you are using default "undefined" completions, this will NOT return a valid value + /// if you access it. You must zero your completion using ".{}". You only need to zero the + /// completion once. Once the completion is in use, it will always be valid. + /// + /// Third, if you stop the loop (loop.stop()), the completions registered with the loop will NOT + /// be reset to a dead state. + pub fn state(self: Completion) xev.CompletionState { + return switch (self.flags.state) { + .dead => .dead, + .adding, .active => .active, + }; + } + + /// Returns a handle for the current operation if it makes sense. + fn handle(self: Completion) ?windows.HANDLE { + return switch (self.op) { + inline .accept => |*v| v.socket, + inline .read, .pread, .write, .pwrite, .recv, .send, .recvfrom, .sendto => |*v| v.fd, + else => null, + }; + } + + /// Perform the operation associated with this completion. This will perform the full blocking + /// operation for the completion. + pub fn perform(self: *Completion) Result { + return switch (self.op) { + .noop, .close, .connect, .shutdown, .timer, .cancel => { + std.log.warn("perform op={s}", .{@tagName(self.op)}); + unreachable; + }, + + .accept => |*v| r: { + var bytes_transferred: u32 = 0; + var flags: u32 = 0; + const result = windows.ws2_32.WSAGetOverlappedResult(asSocket(v.socket), &self.overlapped, &bytes_transferred, windows.FALSE, &flags); + + if (result != windows.TRUE) { + const err = windows.ws2_32.WSAGetLastError(); + const r = .{ + .accept = switch (err) { + windows.ws2_32.WinsockError.WSA_OPERATION_ABORTED => error.Canceled, + else => windows.unexpectedWSAError(err), + }, + }; + windows.CloseHandle(v.internal_accept_socket.?); + break :r r; + } + + break :r .{ .accept = self.op.accept.internal_accept_socket.? 
}; + }, + + .read => |*v| r: { + var bytes_transferred: windows.DWORD = 0; + const result = windows.kernel32.GetOverlappedResult(v.fd, &self.overlapped, &bytes_transferred, windows.FALSE); + if (result == windows.FALSE) { + const err = windows.kernel32.GetLastError(); + break :r .{ .read = switch (err) { + windows.Win32Error.OPERATION_ABORTED => error.Canceled, + else => error.Unexpected, + } }; + } + break :r .{ .read = @as(usize, @intCast(bytes_transferred)) }; + }, + + .pread => |*v| r: { + var bytes_transferred: windows.DWORD = 0; + const result = windows.kernel32.GetOverlappedResult(v.fd, &self.overlapped, &bytes_transferred, windows.FALSE); + if (result == windows.FALSE) { + const err = windows.kernel32.GetLastError(); + break :r .{ .read = switch (err) { + windows.Win32Error.OPERATION_ABORTED => error.Canceled, + else => error.Unexpected, + } }; + } + break :r .{ .pread = @as(usize, @intCast(bytes_transferred)) }; + }, + + .write => |*v| r: { + var bytes_transferred: windows.DWORD = 0; + const result = windows.kernel32.GetOverlappedResult(v.fd, &self.overlapped, &bytes_transferred, windows.FALSE); + if (result == windows.FALSE) { + const err = windows.kernel32.GetLastError(); + break :r .{ .write = switch (err) { + windows.Win32Error.OPERATION_ABORTED => error.Canceled, + else => error.Unexpected, + } }; + } + break :r .{ .write = @as(usize, @intCast(bytes_transferred)) }; + }, + + .pwrite => |*v| r: { + var bytes_transferred: windows.DWORD = 0; + const result = windows.kernel32.GetOverlappedResult(v.fd, &self.overlapped, &bytes_transferred, windows.FALSE); + if (result == windows.FALSE) { + const err = windows.kernel32.GetLastError(); + break :r .{ .write = switch (err) { + windows.Win32Error.OPERATION_ABORTED => error.Canceled, + else => error.Unexpected, + } }; + } + break :r .{ .pwrite = @as(usize, @intCast(bytes_transferred)) }; + }, + + .send => |*v| r: { + var bytes_transferred: u32 = 0; + var flags: u32 = 0; + + const result = 
windows.ws2_32.WSAGetOverlappedResult(asSocket(v.fd), &self.overlapped, &bytes_transferred, windows.FALSE, &flags); + + if (result != windows.TRUE) { + const err = windows.ws2_32.WSAGetLastError(); + break :r .{ + .send = switch (err) { + windows.ws2_32.WinsockError.WSA_OPERATION_ABORTED => error.Canceled, + else => windows.unexpectedWSAError(err), + }, + }; + } + break :r .{ .send = @as(usize, @intCast(bytes_transferred)) }; + }, + + .recv => |*v| r: { + var bytes_transferred: u32 = 0; + var flags: u32 = 0; + + const result = windows.ws2_32.WSAGetOverlappedResult(asSocket(v.fd), &self.overlapped, &bytes_transferred, windows.FALSE, &flags); + + if (result != windows.TRUE) { + const err = windows.ws2_32.WSAGetLastError(); + break :r .{ + .recv = switch (err) { + windows.ws2_32.WinsockError.WSA_OPERATION_ABORTED => error.Canceled, + else => windows.unexpectedWSAError(err), + }, + }; + } + + // NOTE(Corendos): according to Win32 documentation, EOF has to be detected using the socket type. + const socket_type = t: { + var socket_type: windows.DWORD = 0; + var socket_type_bytes = std.mem.asBytes(&socket_type); + var opt_len: i32 = @as(i32, @intCast(socket_type_bytes.len)); + + // Here we assume the call will succeed because the socket should be valid. 
+ std.debug.assert(windows.ws2_32.getsockopt(asSocket(v.fd), std.os.SOL.SOCKET, std.os.SO.TYPE, socket_type_bytes, &opt_len) == 0); + break :t socket_type; + }; + + if (socket_type == std.os.SOCK.STREAM and bytes_transferred == 0) { + break :r .{ .recv = error.EOF }; + } + + break :r .{ .recv = @as(usize, @intCast(bytes_transferred)) }; + }, + + .sendto => |*v| r: { + var bytes_transferred: u32 = 0; + var flags: u32 = 0; + + const result = windows.ws2_32.WSAGetOverlappedResult(asSocket(v.fd), &self.overlapped, &bytes_transferred, windows.FALSE, &flags); + + if (result != windows.TRUE) { + const err = windows.ws2_32.WSAGetLastError(); + break :r .{ + .sendto = switch (err) { + windows.ws2_32.WinsockError.WSA_OPERATION_ABORTED => error.Canceled, + else => windows.unexpectedWSAError(err), + }, + }; + } + break :r .{ .sendto = @as(usize, @intCast(bytes_transferred)) }; + }, + + .recvfrom => |*v| r: { + var bytes_transferred: u32 = 0; + var flags: u32 = 0; + + const result = windows.ws2_32.WSAGetOverlappedResult(asSocket(v.fd), &self.overlapped, &bytes_transferred, windows.FALSE, &flags); + + if (result != windows.TRUE) { + const err = windows.ws2_32.WSAGetLastError(); + break :r .{ + .recvfrom = switch (err) { + windows.ws2_32.WinsockError.WSA_OPERATION_ABORTED => error.Canceled, + else => windows.unexpectedWSAError(err), + }, + }; + } + break :r .{ .recvfrom = @as(usize, @intCast(bytes_transferred)) }; + }, + + .async_wait => .{ .async_wait = {} }, + }; + } + + /// Reset the completion so it can be reused (in case of rearming for example). + pub fn reset(self: *Completion) void { + self.overlapped = .{ + .Internal = 0, + .InternalHigh = 0, + .DUMMYUNIONNAME = .{ .Pointer = null }, + .hEvent = null, + }; + self.result = null; + } +}; + +pub const OperationType = enum { + /// Do nothing. This operation will not be queued and will never + /// have its callback fired. This is NOT equivalent to the io_uring + /// "nop" operation. 
+ noop, + + /// Accept a connection on a socket. + accept, + + /// Close a file descriptor. + close, + + /// Initiate a connection on a socket. + connect, + + /// Read + read, + + /// Pread + pread, + + /// Shutdown all or part of a full-duplex connection. + shutdown, + + /// Write + write, + + /// Pwrite + pwrite, + + /// Send + send, + + /// Recv + recv, + + /// Sendto + sendto, + + /// Recvfrom + recvfrom, + + /// A oneshot or repeating timer. For io_uring, this is implemented + /// using the timeout mechanism. + timer, + + /// Cancel an existing operation. + cancel, + + /// Wait for an async event to be posted. + async_wait, +}; + +/// All the supported operations of this event loop. These are always +/// backend-specific and therefore the structure and types change depending +/// on the underlying system in use. The high level operations are +/// done by initializing the request handles. +pub const Operation = union(OperationType) { + noop: void, + + accept: struct { + socket: windows.HANDLE, + storage: [@sizeOf(std.os.sockaddr.storage)]u8 = undefined, + + internal_accept_socket: ?windows.HANDLE = null, + }, + + close: struct { + fd: windows.HANDLE, + }, + + connect: struct { + socket: windows.HANDLE, + addr: std.net.Address, + }, + + read: struct { + fd: windows.HANDLE, + buffer: ReadBuffer, + }, + + pread: struct { + fd: windows.HANDLE, + buffer: ReadBuffer, + offset: u64, + }, + + shutdown: struct { + socket: windows.HANDLE, + how: std.os.ShutdownHow = .both, + }, + + write: struct { + fd: windows.HANDLE, + buffer: WriteBuffer, + }, + + pwrite: struct { + fd: windows.HANDLE, + buffer: WriteBuffer, + offset: u64, + }, + + send: struct { + fd: windows.HANDLE, + buffer: WriteBuffer, + wsa_buffer: windows.ws2_32.WSABUF = undefined, + }, + + recv: struct { + fd: windows.HANDLE, + buffer: ReadBuffer, + wsa_buffer: windows.ws2_32.WSABUF = undefined, + }, + + sendto: struct { + fd: windows.HANDLE, + buffer: WriteBuffer, + addr: std.net.Address, + wsa_buffer: 
windows.ws2_32.WSABUF = undefined, + }, + + recvfrom: struct { + fd: windows.HANDLE, + buffer: ReadBuffer, + addr: std.os.sockaddr = undefined, + addr_size: std.os.socklen_t = @sizeOf(std.os.sockaddr), + wsa_buffer: windows.ws2_32.WSABUF = undefined, + }, + + timer: Timer, + + cancel: struct { + c: *Completion, + }, + + async_wait: struct { + wakeup: std.atomic.Atomic(bool) = .{ .value = false }, + }, +}; + +/// The result type based on the operation type. For a callback, the +/// result tag will ALWAYS match the operation tag. +pub const Result = union(OperationType) { + noop: void, + accept: AcceptError!windows.HANDLE, + close: CloseError!void, + connect: ConnectError!void, + read: ReadError!usize, + pread: ReadError!usize, + shutdown: ShutdownError!void, + write: WriteError!usize, + pwrite: WriteError!usize, + send: WriteError!usize, + recv: ReadError!usize, + sendto: WriteError!usize, + recvfrom: ReadError!usize, + timer: TimerError!TimerTrigger, + cancel: CancelError!void, + async_wait: AsyncError!void, +}; + +pub const CancelError = error{ + Unexpected, +}; + +pub const AcceptError = error{ + AddressFamilyNotSupported, + ProcessFdQuotaExceeded, + SystemResources, + ProtocolNotSupported, + Canceled, + Unexpected, +}; + +pub const CloseError = error{ + Unexpected, +}; + +pub const ConnectError = error{ + Canceled, + Unexpected, +}; + +pub const ShutdownError = std.os.ShutdownError || error{ + Unexpected, +}; + +pub const WriteError = windows.WriteFileError || error{ + Canceled, + Unexpected, +}; + +pub const ReadError = windows.ReadFileError || error{ + EOF, + Canceled, + Unexpected, +}; + +pub const AsyncError = error{ + Unexpected, +}; + +pub const TimerError = error{ + Unexpected, +}; + +pub const TimerTrigger = enum { + /// Unused with IOCP + request, + + /// Timer expired. + expiration, + + /// Timer was canceled. + cancel, +}; + +/// ReadBuffer are the various options for reading. +pub const ReadBuffer = union(enum) { + /// Read into this slice. 
+    slice: []u8,
+
+    /// Read into this array, just set this to undefined and it will
+    /// be populated up to the size of the array. This is an option because
+    /// the other union members force a specific size anyways so this lets us
+    /// use the other size in the union to support small reads without worrying
+    /// about buffer allocation.
+    ///
+    /// To know the size read you have to use the return value of the
+    /// read operations (i.e. recv).
+    ///
+    /// Note that the union at the time of this writing could accommodate a
+    /// much larger fixed size array here but we want to retain flexibility
+    /// for future fields.
+    array: [32]u8,
+
+    // TODO: future will have vectors
+};
+
+/// WriteBuffer are the various options for writing.
+pub const WriteBuffer = union(enum) {
+    /// Write from this buffer.
+    slice: []const u8,
+
+    /// Write from this array. See ReadBuffer.array for why we support this.
+    array: struct {
+        array: [32]u8,
+        len: usize,
+    },
+
+    // TODO: future will have vectors
+};
+
+/// Timer that is inserted into the heap.
+pub const Timer = struct {
+    /// The absolute time to fire this timer next.
+    next: u64,
+
+    /// Only used internally. If this is non-null and timer is
+    /// CANCELLED, then the timer is rearmed automatically with this
+    /// as the next time. The callback will not be called on the
+    /// cancellation.
+    reset: ?u64 = null,
+
+    /// Internal heap field.
+    heap: heap.IntrusiveField(Timer) = .{},
+
+    /// We point back to completion for now. When issue[1] is fixed,
+    /// we can just use that from our heap fields.
+ /// [1]: https://github.com/ziglang/zig/issues/6611 + c: *Completion = undefined, + + fn less(_: void, a: *const Timer, b: *const Timer) bool { + return a.next < b.next; + } +}; + +test "iocp: loop time" { + const testing = std.testing; + + var loop = try Loop.init(.{}); + defer loop.deinit(); + + // should never init zero + var now = loop.now(); + try testing.expect(now > 0); + + while (now == loop.now()) try loop.run(.no_wait); +} +test "iocp: stop" { + const testing = std.testing; + + var loop = try Loop.init(.{}); + defer loop.deinit(); + + // Add the timer + var called = false; + var c1: Completion = undefined; + loop.timer(&c1, 1_000_000, &called, (struct { + fn callback(ud: ?*anyopaque, l: *xev.Loop, _: *xev.Completion, r: xev.Result) xev.CallbackAction { + _ = l; + _ = r; + const b = @as(*bool, @ptrCast(ud.?)); + b.* = true; + return .disarm; + } + }).callback); + + // Tick + try loop.run(.no_wait); + try testing.expect(!called); + + // Stop + loop.stop(); + try loop.run(.until_done); + try testing.expect(!called); +} + +test "iocp: timer" { + const testing = std.testing; + + var loop = try Loop.init(.{}); + defer loop.deinit(); + + // Add the timer + var called = false; + var c1: xev.Completion = undefined; + loop.timer(&c1, 1, &called, (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + _: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + _ = r; + const b = @as(*bool, @ptrCast(ud.?)); + b.* = true; + return .disarm; + } + }).callback); + + // Add another timer + var called2 = false; + var c2: xev.Completion = undefined; + loop.timer(&c2, 100_000, &called2, (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + _: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + _ = r; + const b = @as(*bool, @ptrCast(ud.?)); + b.* = true; + return .disarm; + } + }).callback); + + // State checking + try testing.expect(c1.state() == .active); + try testing.expect(c2.state() == .active); + + // Tick + while 
(!called) try loop.run(.no_wait); + try testing.expect(called); + try testing.expect(!called2); + + // State checking + try testing.expect(c1.state() == .dead); + try testing.expect(c2.state() == .active); +} + +test "iocp: timer reset" { + const testing = std.testing; + + var loop = try Loop.init(.{}); + defer loop.deinit(); + + const cb: xev.Callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + _: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + const v = @as(*?TimerTrigger, @ptrCast(ud.?)); + v.* = r.timer catch unreachable; + return .disarm; + } + }).callback; + + // Add the timer + var trigger: ?TimerTrigger = null; + var c1: Completion = undefined; + loop.timer(&c1, 100_000, &trigger, cb); + + // We know timer won't be called from the timer test previously. + try loop.run(.no_wait); + try testing.expect(trigger == null); + + // Reset the timer + var c_cancel: Completion = .{}; + loop.timer_reset(&c1, &c_cancel, 1, &trigger, cb); + try testing.expect(c1.state() == .active); + try testing.expect(c_cancel.state() == .active); + + // Run + try loop.run(.until_done); + try testing.expect(trigger.? 
== .expiration); + try testing.expect(c1.state() == .dead); + try testing.expect(c_cancel.state() == .dead); +} + +test "iocp: timer reset before tick" { + const testing = std.testing; + + var loop = try Loop.init(.{}); + defer loop.deinit(); + + const cb: xev.Callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + _: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + const v = @as(*?TimerTrigger, @ptrCast(ud.?)); + v.* = r.timer catch unreachable; + return .disarm; + } + }).callback; + + // Add the timer + var trigger: ?TimerTrigger = null; + var c1: Completion = undefined; + loop.timer(&c1, 100_000, &trigger, cb); + + // Reset the timer + var c_cancel: Completion = .{}; + loop.timer_reset(&c1, &c_cancel, 1, &trigger, cb); + try testing.expect(c1.state() == .active); + try testing.expect(c_cancel.state() == .dead); + + // Run + try loop.run(.until_done); + try testing.expect(trigger.? == .expiration); + try testing.expect(c1.state() == .dead); + try testing.expect(c_cancel.state() == .dead); +} + +test "iocp: timer reset after trigger" { + const testing = std.testing; + + var loop = try Loop.init(.{}); + defer loop.deinit(); + + const cb: xev.Callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + _: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + const v = @as(*?TimerTrigger, @ptrCast(ud.?)); + v.* = r.timer catch unreachable; + return .disarm; + } + }).callback; + + // Add the timer + var trigger: ?TimerTrigger = null; + var c1: Completion = undefined; + loop.timer(&c1, 1, &trigger, cb); + + // Run the timer + try loop.run(.until_done); + try testing.expect(trigger.? 
== .expiration); + try testing.expect(c1.state() == .dead); + trigger = null; + + // Reset the timer + var c_cancel: Completion = .{}; + loop.timer_reset(&c1, &c_cancel, 1, &trigger, cb); + try testing.expect(c1.state() == .active); + try testing.expect(c_cancel.state() == .dead); + + // Run + try loop.run(.until_done); + try testing.expect(trigger.? == .expiration); + try testing.expect(c1.state() == .dead); + try testing.expect(c_cancel.state() == .dead); +} + +test "iocp: timer cancellation" { + const testing = std.testing; + + var loop = try Loop.init(.{}); + defer loop.deinit(); + + // Add the timer + var trigger: ?TimerTrigger = null; + var c1: xev.Completion = undefined; + loop.timer(&c1, 100_000, &trigger, (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + _: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + const ptr: *?TimerTrigger = @ptrCast(@alignCast(ud.?)); + ptr.* = r.timer catch unreachable; + return .disarm; + } + }).callback); + + // Tick and verify we're not called. + try loop.run(.no_wait); + try testing.expect(trigger == null); + + // Cancel the timer + var called = false; + var c_cancel: xev.Completion = .{ + .op = .{ + .cancel = .{ + .c = &c1, + }, + }, + + .userdata = &called, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + _ = c; + _ = r.cancel catch unreachable; + const ptr: *bool = @ptrCast(@alignCast(ud.?)); + ptr.* = true; + return .disarm; + } + }).callback, + }; + loop.add(&c_cancel); + + // Tick + try loop.run(.until_done); + try testing.expect(called); + try testing.expect(trigger.? 
== .cancel); +} + +test "iocp: canceling a completed operation" { + const testing = std.testing; + + var loop = try Loop.init(.{}); + defer loop.deinit(); + + // Add the timer + var trigger: ?TimerTrigger = null; + var c1: xev.Completion = undefined; + loop.timer(&c1, 1, &trigger, (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + _: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + const ptr: *?TimerTrigger = @ptrCast(@alignCast(ud.?)); + ptr.* = r.timer catch unreachable; + return .disarm; + } + }).callback); + + // Tick and verify we're not called. + try loop.run(.until_done); + try testing.expect(trigger.? == .expiration); + + // Cancel the timer + var called = false; + var c_cancel: xev.Completion = .{ + .op = .{ + .cancel = .{ + .c = &c1, + }, + }, + + .userdata = &called, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + _ = c; + _ = r.cancel catch unreachable; + const ptr: *bool = @ptrCast(@alignCast(ud.?)); + ptr.* = true; + return .disarm; + } + }).callback, + }; + loop.add(&c_cancel); + + // Tick + try loop.run(.until_done); + try testing.expect(called); + try testing.expect(trigger.? 
== .expiration); +} + +test "iocp: noop" { + var loop = try Loop.init(.{}); + defer loop.deinit(); + + var c: Completion = .{}; + loop.add(&c); + + try loop.run(.once); +} + +test "iocp: file IO" { + const testing = std.testing; + + var loop = try Loop.init(.{}); + defer loop.deinit(); + + const utf16_file_name = (try windows.sliceToPrefixedFileW(null, "test_watcher_file")).span(); + + const f_handle = try windows.exp.CreateFile(utf16_file_name, windows.GENERIC_READ | windows.GENERIC_WRITE, 0, null, windows.OPEN_ALWAYS, windows.FILE_FLAG_OVERLAPPED, null); + defer windows.exp.DeleteFile(utf16_file_name) catch {}; + defer windows.CloseHandle(f_handle); + + // Perform a write and then a read + var write_buf = [_]u8{ 1, 1, 2, 3, 5, 8, 13 }; + var c_write: xev.Completion = .{ + .op = .{ + .write = .{ + .fd = f_handle, + .buffer = .{ .slice = &write_buf }, + }, + }, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = ud; + _ = l; + _ = c; + _ = r.write catch unreachable; + return .disarm; + } + }).callback, + }; + loop.add(&c_write); + + // Wait for the write + try loop.run(.until_done); + + // Read + var read_buf: [128]u8 = undefined; + var read_len: usize = 0; + var c_read: xev.Completion = .{ + .op = .{ + .read = .{ + .fd = f_handle, + .buffer = .{ .slice = &read_buf }, + }, + }, + .userdata = &read_len, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + _ = c; + const ptr: *usize = @ptrCast(@alignCast(ud.?)); + ptr.* = r.read catch unreachable; + return .disarm; + } + }).callback, + }; + loop.add(&c_read); + + // Wait for the read + try loop.run(.until_done); + try testing.expectEqualSlices(u8, &write_buf, read_buf[0..read_len]); +} + +test "iocp: file IO with offset" { + const testing = std.testing; + + var loop = try Loop.init(.{}); + defer loop.deinit(); + + const 
utf16_file_name = (try windows.sliceToPrefixedFileW(null, "test_watcher_file")).span(); + + const f_handle = try windows.exp.CreateFile(utf16_file_name, windows.GENERIC_READ | windows.GENERIC_WRITE, 0, null, windows.OPEN_ALWAYS, windows.FILE_FLAG_OVERLAPPED, null); + defer windows.exp.DeleteFile(utf16_file_name) catch {}; + defer windows.CloseHandle(f_handle); + + // Perform a write and then a read + var write_buf = [_]u8{ 1, 1, 2, 3, 5, 8, 13 }; + var c_write: xev.Completion = .{ + .op = .{ + .pwrite = .{ + .fd = f_handle, + .buffer = .{ .slice = &write_buf }, + .offset = 1, + }, + }, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = ud; + _ = l; + _ = c; + _ = r.pwrite catch unreachable; + return .disarm; + } + }).callback, + }; + loop.add(&c_write); + + // Wait for the write + try loop.run(.until_done); + + // Read + var read_buf: [128]u8 = undefined; + var read_len: usize = 0; + var c_read: xev.Completion = .{ + .op = .{ + .pread = .{ + .fd = f_handle, + .buffer = .{ .slice = &read_buf }, + .offset = 1, + }, + }, + .userdata = &read_len, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + _ = c; + const ptr: *usize = @ptrCast(@alignCast(ud.?)); + ptr.* = r.pread catch unreachable; + return .disarm; + } + }).callback, + }; + loop.add(&c_read); + + // Wait for the read + try loop.run(.until_done); + try testing.expectEqualSlices(u8, &write_buf, read_buf[0..read_len]); +} + +test "iocp: socket accept/connect/send/recv/close" { + const mem = std.mem; + const net = std.net; + const testing = std.testing; + + var loop = try Loop.init(.{}); + defer loop.deinit(); + + // Create a TCP server socket + const address = try net.Address.parseIp4("127.0.0.1", 3131); + const kernel_backlog = 1; + var ln = try windows.WSASocketW(std.os.AF.INET, std.os.SOCK.STREAM, std.os.IPPROTO.TCP, null, 
0, windows.ws2_32.WSA_FLAG_OVERLAPPED); + errdefer std.os.closeSocket(ln); + + try std.os.setsockopt(ln, std.os.SOL.SOCKET, std.os.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1))); + try std.os.bind(ln, &address.any, address.getOsSockLen()); + try std.os.listen(ln, kernel_backlog); + + // Create a TCP client socket + var client_conn = try windows.WSASocketW(std.os.AF.INET, std.os.SOCK.STREAM, std.os.IPPROTO.TCP, null, 0, windows.ws2_32.WSA_FLAG_OVERLAPPED); + errdefer std.os.closeSocket(client_conn); + + var server_conn_result: Result = undefined; + var c_accept: Completion = .{ + .op = .{ + .accept = .{ + .socket = ln, + }, + }, + + .userdata = &server_conn_result, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + _ = c; + const conn: *Result = @ptrCast(@alignCast(ud.?)); + conn.* = r; + return .disarm; + } + }).callback, + }; + loop.add(&c_accept); + + // Connect + var connected = false; + var c_connect: xev.Completion = .{ + .op = .{ + .connect = .{ + .socket = client_conn, + .addr = address, + }, + }, + + .userdata = &connected, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + _ = c; + _ = r.connect catch unreachable; + const b = @as(*bool, @ptrCast(ud.?)); + b.* = true; + return .disarm; + } + }).callback, + }; + loop.add(&c_connect); + + // Wait for the connection to be established + try loop.run(.until_done); + //try testing.expect(server_conn > 0); + try testing.expect(connected); + var server_conn = try server_conn_result.accept; + + // Send + var c_send: xev.Completion = .{ + .op = .{ + .send = .{ + .fd = client_conn, + .buffer = .{ .slice = &[_]u8{ 1, 1, 2, 3, 5, 8, 13 } }, + }, + }, + + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + _ = c; + _ = r.send catch 
unreachable; + _ = ud; + return .disarm; + } + }).callback, + }; + loop.add(&c_send); + + // Receive + var recv_buf: [128]u8 = undefined; + var recv_len: usize = 0; + var c_recv: xev.Completion = .{ + .op = .{ + .recv = .{ + .fd = server_conn, + .buffer = .{ .slice = &recv_buf }, + }, + }, + + .userdata = &recv_len, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + _ = c; + const ptr: *usize = @ptrCast(@alignCast(ud.?)); + ptr.* = r.recv catch unreachable; + return .disarm; + } + }).callback, + }; + loop.add(&c_recv); + + // Wait for the send/receive + try loop.run(.until_done); + try testing.expectEqualSlices(u8, c_send.op.send.buffer.slice, recv_buf[0..recv_len]); + + // Shutdown + var shutdown = false; + var c_client_shutdown: xev.Completion = .{ + .op = .{ + .shutdown = .{ + .socket = client_conn, + }, + }, + + .userdata = &shutdown, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + _ = c; + _ = r.shutdown catch unreachable; + const ptr: *bool = @ptrCast(@alignCast(ud.?)); + ptr.* = true; + return .disarm; + } + }).callback, + }; + loop.add(&c_client_shutdown); + try loop.run(.until_done); + try testing.expect(shutdown); + + // Read should be EOF + var eof: ?bool = null; + c_recv = .{ + .op = .{ + .recv = .{ + .fd = server_conn, + .buffer = .{ .slice = &recv_buf }, + }, + }, + + .userdata = &eof, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + _ = c; + const ptr: *?bool = @ptrCast(@alignCast(ud.?)); + ptr.* = if (r.recv) |_| false else |err| switch (err) { + error.EOF => true, + else => false, + }; + return .disarm; + } + }).callback, + }; + loop.add(&c_recv); + + try loop.run(.until_done); + try testing.expect(eof.? 
== true); + + // Close + var client_conn_closed: bool = false; + var c_client_close: xev.Completion = .{ + .op = .{ + .close = .{ + .fd = client_conn, + }, + }, + + .userdata = &client_conn_closed, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + _ = c; + _ = r.close catch unreachable; + const ptr: *bool = @ptrCast(@alignCast(ud.?)); + ptr.* = true; + return .disarm; + } + }).callback, + }; + loop.add(&c_client_close); + + var ln_closed: bool = false; + var c_server_close: xev.Completion = .{ + .op = .{ + .close = .{ + .fd = ln, + }, + }, + + .userdata = &ln_closed, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + _ = c; + _ = r.close catch unreachable; + const ptr: *bool = @ptrCast(@alignCast(ud.?)); + ptr.* = true; + return .disarm; + } + }).callback, + }; + loop.add(&c_server_close); + + // Wait for the sockets to close + try loop.run(.until_done); + try testing.expect(ln_closed); + try testing.expect(client_conn_closed); +} + +test "iocp: recv cancellation" { + const mem = std.mem; + const net = std.net; + const testing = std.testing; + + var loop = try Loop.init(.{}); + defer loop.deinit(); + + // Create a TCP server socket + const address = try net.Address.parseIp4("127.0.0.1", 3131); + var socket = try windows.WSASocketW(std.os.AF.INET, std.os.SOCK.DGRAM, std.os.IPPROTO.UDP, null, 0, windows.ws2_32.WSA_FLAG_OVERLAPPED); + errdefer std.os.closeSocket(socket); + + try std.os.setsockopt(socket, std.os.SOL.SOCKET, std.os.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1))); + try std.os.bind(socket, &address.any, address.getOsSockLen()); + + var recv_buf: [128]u8 = undefined; + var recv_result: Result = undefined; + var c_recv: xev.Completion = .{ + .op = .{ + .recv = .{ + .fd = socket, + .buffer = .{ .slice = &recv_buf }, + }, + }, + + .userdata = &recv_result, + .callback = 
(struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + _ = c; + const ptr: *Result = @ptrCast(@alignCast(ud.?)); + ptr.* = r; + return .disarm; + } + }).callback, + }; + loop.add(&c_recv); + + try loop.submit(); + + var c_cancel_recv: xev.Completion = .{ + .op = .{ .cancel = .{ .c = &c_recv } }, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = r.cancel catch unreachable; + _ = ud; + _ = l; + _ = c; + return .disarm; + } + }).callback, + }; + loop.add(&c_cancel_recv); + + // Wait for the send/receive + try loop.run(.until_done); + + try testing.expect(recv_result == .recv); + try testing.expectError(error.Canceled, recv_result.recv); +} + +test "iocp: accept cancellation" { + const mem = std.mem; + const net = std.net; + const testing = std.testing; + + var loop = try Loop.init(.{}); + defer loop.deinit(); + + // Create a TCP server socket + const address = try net.Address.parseIp4("127.0.0.1", 3131); + const kernel_backlog = 1; + var ln = try windows.WSASocketW(std.os.AF.INET, std.os.SOCK.STREAM, std.os.IPPROTO.TCP, null, 0, windows.ws2_32.WSA_FLAG_OVERLAPPED); + errdefer std.os.closeSocket(ln); + + try std.os.setsockopt(ln, std.os.SOL.SOCKET, std.os.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1))); + try std.os.bind(ln, &address.any, address.getOsSockLen()); + try std.os.listen(ln, kernel_backlog); + + var server_conn_result: Result = undefined; + var c_accept: Completion = .{ + .op = .{ + .accept = .{ + .socket = ln, + }, + }, + + .userdata = &server_conn_result, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + _ = c; + const conn: *Result = @ptrCast(@alignCast(ud.?)); + conn.* = r; + return .disarm; + } + }).callback, + }; + loop.add(&c_accept); + + try loop.submit(); + + var c_cancel_accept: 
xev.Completion = .{ + .op = .{ .cancel = .{ .c = &c_accept } }, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = r.cancel catch unreachable; + _ = ud; + _ = l; + _ = c; + return .disarm; + } + }).callback, + }; + loop.add(&c_cancel_accept); + + // Wait for the send/receive + try loop.run(.until_done); + + try testing.expect(server_conn_result == .accept); + try testing.expectError(error.Canceled, server_conn_result.accept); +} diff --git a/src/bench/ping-pongs.zig b/src/bench/ping-pongs.zig index d1d4572..f72be56 100644 --- a/src/bench/ping-pongs.zig +++ b/src/bench/ping-pongs.zig @@ -179,7 +179,7 @@ const Client = struct { socket: xev.TCP, r: xev.TCP.ShutdownError!void, ) xev.CallbackAction { - _ = r catch unreachable; + _ = r catch {}; const self = self_.?; socket.close(l, c, Client, self, closeCallback); @@ -231,7 +231,7 @@ const Server = struct { /// Must be called with stable self pointer. 
pub fn start(self: *Server) !void { const addr = try std.net.Address.parseIp4("127.0.0.1", 3131); - const socket = try xev.TCP.init(addr); + var socket = try xev.TCP.init(addr); const c = try self.completion_pool.create(); try socket.bind(addr); @@ -333,7 +333,7 @@ const Server = struct { s: xev.TCP, r: xev.TCP.ShutdownError!void, ) xev.CallbackAction { - _ = r catch unreachable; + _ = r catch {}; const self = self_.?; s.close(l, c, Server, self, closeCallback); diff --git a/src/bench/ping-udp1.zig b/src/bench/ping-udp1.zig index bb9b643..5c2a7cd 100644 --- a/src/bench/ping-udp1.zig +++ b/src/bench/ping-udp1.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const builtin = @import("builtin"); const assert = std.debug.assert; const Allocator = std.mem.Allocator; const Instant = std.time.Instant; @@ -58,6 +59,7 @@ const Pinger = struct { c_write: xev.Completion = undefined, state_read: xev.UDP.State = undefined, state_write: xev.UDP.State = undefined, + op_count: u8 = 0, pub const PING = "PING\n"; @@ -109,6 +111,8 @@ const Pinger = struct { buf: xev.ReadBuffer, r: xev.UDP.ReadError!usize, ) xev.CallbackAction { + _ = c; + _ = socket; const self = self_.?; const n = r catch unreachable; const data = buf.slice[0..n]; @@ -122,12 +126,16 @@ const Pinger = struct { // If we're done then exit if (self.pongs > 500_000) { - socket.close(loop, c, Pinger, self, closeCallback); + self.udp.close(loop, &self.c_read, Pinger, self, closeCallback); return .disarm; } - // Send another ping - self.write(loop); + self.op_count += 1; + if (self.op_count == 2) { + self.op_count = 0; + // Send another ping + self.write(loop); + } } } @@ -135,14 +143,23 @@ const Pinger = struct { } pub fn writeCallback( - _: ?*Pinger, - _: *xev.Loop, + self_: ?*Pinger, + loop: *xev.Loop, _: *xev.Completion, _: *xev.UDP.State, _: xev.UDP, _: xev.WriteBuffer, r: xev.UDP.WriteError!usize, ) xev.CallbackAction { + const self = self_.?; + + self.op_count += 1; + if (self.op_count == 2) { + self.op_count = 0; + 
// Send another ping + self.write(loop); + } + _ = r catch unreachable; return .disarm; } diff --git a/src/bench/udp_pummel_1v1.zig b/src/bench/udp_pummel_1v1.zig index f55ee63..25c8814 100644 --- a/src/bench/udp_pummel_1v1.zig +++ b/src/bench/udp_pummel_1v1.zig @@ -35,7 +35,7 @@ pub fn run(comptime n_senders: comptime_int, comptime n_receivers: comptime_int) var receivers: [n_receivers]Receiver = undefined; for (&receivers, 0..) |*r, i| { - const addr = try std.net.Address.parseIp4("0.0.0.0", @as(u16, @intCast(base_port + i))); + const addr = try std.net.Address.parseIp4("127.0.0.1", @as(u16, @intCast(base_port + i))); r.* = .{ .udp = try xev.UDP.init(addr) }; try r.udp.bind(addr); r.udp.read( @@ -52,7 +52,7 @@ pub fn run(comptime n_senders: comptime_int, comptime n_receivers: comptime_int) var senders: [n_senders]Sender = undefined; for (&senders, 0..) |*s, i| { const addr = try std.net.Address.parseIp4( - "0.0.0.0", + "127.0.0.1", @as(u16, @intCast(base_port + (i % n_receivers))), ); s.* = .{ .udp = try xev.UDP.init(addr) }; diff --git a/src/debug.zig b/src/debug.zig new file mode 100644 index 0000000..734e4ab --- /dev/null +++ b/src/debug.zig @@ -0,0 +1,58 @@ +const std = @import("std"); + +inline fn indent(depth: usize, writer: anytype) !void { + for (0..depth) |_| try writer.writeByte(' '); +} + +pub fn describe(comptime T: type, writer: anytype, depth: usize) !void { + const type_info = @typeInfo(T); + switch (type_info) { + .Type, + .Void, + .Bool, + .NoReturn, + .Int, + .Float, + .Pointer, + .Array, + .ComptimeFloat, + .ComptimeInt, + .Undefined, + .Null, + .Optional, + .ErrorUnion, + .ErrorSet, + .Enum, + .Fn, + .Opaque, + .Frame, + .AnyFrame, + .Vector, + .EnumLiteral, + => { + try writer.print("{s} ({d} bytes)", .{ @typeName(T), @sizeOf(T) }); + }, + .Union => |s| { + try writer.print("{s} ({d} bytes) {{\n", .{ @typeName(T), @sizeOf(T) }); + inline for (s.fields) |f| { + try indent(depth + 4, writer); + try writer.print("{s}: ", .{f.name}); + try 
describe(f.type, writer, depth + 4); + try writer.writeByte('\n'); + } + try indent(depth, writer); + try writer.writeByte('}'); + }, + .Struct => |s| { + try writer.print("{s} ({d} bytes) {{\n", .{ @typeName(T), @sizeOf(T) }); + inline for (s.fields) |f| { + try indent(depth + 4, writer); + try writer.print("{s}: ", .{f.name}); + try describe(f.type, writer, depth + 4); + try writer.writeByte('\n'); + } + try indent(depth, writer); + try writer.writeByte('}'); + }, + } +} diff --git a/src/main.zig b/src/main.zig index 7fb168b..5ec41b9 100644 --- a/src/main.zig +++ b/src/main.zig @@ -15,6 +15,7 @@ pub const IO_Uring = Xev(.io_uring, @import("backend/io_uring.zig")); pub const Epoll = Xev(.epoll, @import("backend/epoll.zig")); pub const Kqueue = Xev(.kqueue, @import("backend/kqueue.zig")); pub const WasiPoll = Xev(.wasi_poll, @import("backend/wasi_poll.zig")); +pub const IOCP = Xev(.iocp, @import("backend/iocp.zig")); /// Generic thread pool implementation. pub const ThreadPool = @import("ThreadPool.zig"); @@ -30,6 +31,7 @@ pub const Backend = enum { epoll, kqueue, wasi_poll, + iocp, /// Returns a recommend default backend from inspecting the system. pub fn default() Backend { @@ -37,6 +39,7 @@ pub const Backend = enum { .linux => .io_uring, .macos => .kqueue, .wasi => .wasi_poll, + .windows => .iocp, else => null, }) orelse { @compileLog(builtin.os); @@ -51,6 +54,7 @@ pub const Backend = enum { .epoll => Epoll, .kqueue => Kqueue, .wasi_poll => WasiPoll, + .iocp => IOCP, }; } }; @@ -84,8 +88,8 @@ pub fn Xev(comptime be: Backend, comptime T: type) type { pub const RunMode = loop.RunMode; pub const CallbackAction = loop.CallbackAction; pub const CompletionState = loop.CompletionState; - // - // // Error types + + /// Error types pub const AcceptError = T.AcceptError; pub const CancelError = T.CancelError; pub const CloseError = T.CloseError; @@ -98,7 +102,7 @@ pub fn Xev(comptime be: Backend, comptime T: type) type { /// common tasks. 
These may not work with all possible Loop implementations. pub const Async = @import("watcher/async.zig").Async(Self); pub const File = @import("watcher/file.zig").File(Self); - pub const Process = @import("watcher/process.zig").Process(Self); + //pub const Process = @import("watcher/process.zig").Process(Self); pub const Stream = stream.GenericStream(Self); pub const Timer = @import("watcher/timer.zig").Timer(Self); pub const TCP = @import("watcher/tcp.zig").TCP(Self); @@ -161,6 +165,10 @@ test { _ = @import("backend/wasi_poll.zig"); }, + .windows => { + _ = @import("backend/iocp.zig"); + }, + else => {}, } } diff --git a/src/watcher/async.zig b/src/watcher/async.zig index 5f8b4c7..e2dc579 100644 --- a/src/watcher/async.zig +++ b/src/watcher/async.zig @@ -17,6 +17,7 @@ pub fn Async(comptime xev: type) type { // Supported, uses mach ports .kqueue => AsyncMachPort(xev), + .iocp => AsyncIOCP(xev), }; } @@ -386,6 +387,90 @@ fn AsyncLoopState(comptime xev: type, comptime threaded: bool) type { }; } +/// Async implementation for IOCP. 
+fn AsyncIOCP(comptime xev: type) type { + return struct { + const Self = @This(); + const windows = std.os.windows; + + pub const WaitError = xev.Sys.AsyncError; + + guard: std.Thread.Mutex = .{}, + wakeup: bool = false, + waiter: ?struct { + loop: *xev.Loop, + c: *xev.Completion, + } = null, + + pub fn init() !Self { + return Self{}; + } + + pub fn deinit(self: *Self) void { + _ = self; + } + + pub fn wait( + self: *Self, + loop: *xev.Loop, + c: *xev.Completion, + comptime Userdata: type, + userdata: ?*Userdata, + comptime cb: *const fn ( + ud: ?*Userdata, + l: *xev.Loop, + c: *xev.Completion, + r: WaitError!void, + ) xev.CallbackAction, + ) void { + c.* = xev.Completion{ + .op = .{ .async_wait = .{} }, + .userdata = userdata, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l_inner: *xev.Loop, + c_inner: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + return @call(.always_inline, cb, .{ + common.userdataValue(Userdata, ud), + l_inner, + c_inner, + if (r.async_wait) |_| {} else |err| err, + }); + } + }).callback, + }; + loop.add(c); + + self.guard.lock(); + defer self.guard.unlock(); + + self.waiter = .{ + .loop = loop, + .c = c, + }; + + if (self.wakeup) loop.async_notify(c); + } + + pub fn notify(self: *Self) !void { + self.guard.lock(); + defer self.guard.unlock(); + + if (self.waiter) |w| { + w.loop.async_notify(w.c); + } else { + self.wakeup = true; + } + } + + /// Common tests + pub usingnamespace AsyncTests(xev, Self); + }; +} + fn AsyncTests(comptime xev: type, comptime Impl: type) type { return struct { test "async" { diff --git a/src/watcher/file.zig b/src/watcher/file.zig index faf3596..04a5be2 100644 --- a/src/watcher/file.zig +++ b/src/watcher/file.zig @@ -27,9 +27,10 @@ const stream = @import("stream.zig"); pub fn File(comptime xev: type) type { return struct { const Self = @This(); + const FdType = if (xev.backend == .iocp) os.windows.HANDLE else os.socket_t; /// The underlying file - fd: std.os.fd_t, + fd: FdType, pub 
usingnamespace stream.Stream(xev, Self, .{ .close = true, @@ -110,6 +111,7 @@ pub fn File(comptime xev: type) type { switch (xev.backend) { .io_uring, .wasi_poll, + .iocp, => {}, .epoll => { @@ -273,6 +275,7 @@ pub fn File(comptime xev: type) type { switch (xev.backend) { .io_uring, .wasi_poll, + .iocp, => {}, .epoll => { @@ -290,6 +293,8 @@ pub fn File(comptime xev: type) type { test "read/write" { // wasi: local files don't work with poll (always ready) if (builtin.os.tag == .wasi) return error.SkipZigTest; + // windows: std.fs.File is not opened with OVERLAPPED flag. + if (builtin.os.tag == .windows) return error.SkipZigTest; const testing = std.testing; @@ -362,6 +367,8 @@ pub fn File(comptime xev: type) type { test "pread/pwrite" { // wasi: local files don't work with poll (always ready) if (builtin.os.tag == .wasi) return error.SkipZigTest; + // windows: std.fs.File is not opened with OVERLAPPED flag. + if (builtin.os.tag == .windows) return error.SkipZigTest; const testing = std.testing; @@ -432,6 +439,8 @@ pub fn File(comptime xev: type) type { test "queued writes" { // wasi: local files don't work with poll (always ready) if (builtin.os.tag == .wasi) return error.SkipZigTest; + // windows: std.fs.File is not opened with OVERLAPPED flag. 
+ if (builtin.os.tag == .windows) return error.SkipZigTest; const testing = std.testing; diff --git a/src/watcher/process.zig b/src/watcher/process.zig index b61f436..c6053f5 100644 --- a/src/watcher/process.zig +++ b/src/watcher/process.zig @@ -16,6 +16,7 @@ pub fn Process(comptime xev: type) type { // Unsupported .wasi_poll => struct {}, + .iocp => struct {}, }; } diff --git a/src/watcher/stream.zig b/src/watcher/stream.zig index 1cb4475..38f4218 100644 --- a/src/watcher/stream.zig +++ b/src/watcher/stream.zig @@ -59,12 +59,7 @@ pub fn Closeable(comptime xev: type, comptime T: type, comptime options: Options ) xev.CallbackAction, ) void { c.* = .{ - .op = .{ - .close = .{ - .fd = self.fd, - }, - }, - + .op = .{ .close = .{ .fd = self.fd } }, .userdata = userdata, .callback = (struct { fn callback( @@ -73,11 +68,12 @@ pub fn Closeable(comptime xev: type, comptime T: type, comptime options: Options c_inner: *xev.Completion, r: xev.Result, ) xev.CallbackAction { + var fd = T.initFd(c_inner.op.close.fd); return @call(.always_inline, cb, .{ common.userdataValue(Userdata, ud), l_inner, c_inner, - T.initFd(c_inner.op.close.fd), + fd, if (r.close) |_| {} else |err| err, }); } @@ -172,6 +168,7 @@ pub fn Readable(comptime xev: type, comptime T: type, comptime options: Options) switch (xev.backend) { .io_uring, .wasi_poll, + .iocp, => {}, .epoll => { @@ -447,6 +444,7 @@ pub fn Writeable(comptime xev: type, comptime T: type, comptime options: Options switch (xev.backend) { .io_uring, .wasi_poll, + .iocp, => {}, .epoll => { diff --git a/src/watcher/tcp.zig b/src/watcher/tcp.zig index ac64987..5969817 100644 --- a/src/watcher/tcp.zig +++ b/src/watcher/tcp.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const builtin = @import("builtin"); const assert = std.debug.assert; const os = std.os; const stream = @import("stream.zig"); @@ -15,8 +16,9 @@ const common = @import("common.zig"); pub fn TCP(comptime xev: type) type { return struct { const Self = @This(); + const FdType = if 
(xev.backend == .iocp) os.windows.HANDLE else os.socket_t; - fd: os.socket_t, + fd: FdType, pub usingnamespace stream.Stream(xev, Self, .{ .close = true, @@ -30,21 +32,26 @@ pub fn TCP(comptime xev: type) type { pub fn init(addr: std.net.Address) !Self { if (xev.backend == .wasi_poll) @compileError("unsupported in WASI"); - // On io_uring we don't use non-blocking sockets because we may - // just get EAGAIN over and over from completions. - const flags = flags: { - var flags: u32 = os.SOCK.STREAM | os.SOCK.CLOEXEC; - if (xev.backend != .io_uring) flags |= os.SOCK.NONBLOCK; - break :flags flags; + const fd = if (xev.backend == .iocp) + try os.windows.WSASocketW(addr.any.family, os.SOCK.STREAM, 0, null, 0, os.windows.ws2_32.WSA_FLAG_OVERLAPPED) + else fd: { + // On io_uring we don't use non-blocking sockets because we may + // just get EAGAIN over and over from completions. + const flags = flags: { + var flags: u32 = os.SOCK.STREAM | os.SOCK.CLOEXEC; + if (xev.backend != .io_uring) flags |= os.SOCK.NONBLOCK; + break :flags flags; + }; + break :fd try os.socket(addr.any.family, flags, 0); }; return .{ - .fd = try os.socket(addr.any.family, flags, 0), + .fd = fd, }; } /// Initialize a TCP socket from a file descriptor. - pub fn initFd(fd: os.socket_t) Self { + pub fn initFd(fd: FdType) Self { return .{ .fd = fd, }; @@ -54,8 +61,10 @@ pub fn TCP(comptime xev: type) type { pub fn bind(self: Self, addr: std.net.Address) !void { if (xev.backend == .wasi_poll) @compileError("unsupported in WASI"); - try os.setsockopt(self.fd, os.SOL.SOCKET, os.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1))); - try os.bind(self.fd, &addr.any, addr.getOsSockLen()); + const fd = if (xev.backend == .iocp) @as(os.windows.ws2_32.SOCKET, @ptrCast(self.fd)) else self.fd; + + try os.setsockopt(fd, os.SOL.SOCKET, os.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1))); + try os.bind(fd, &addr.any, addr.getOsSockLen()); } /// Listen for connections on the socket. 
This puts the socket into passive @@ -63,7 +72,9 @@ pub fn TCP(comptime xev: type) type { pub fn listen(self: Self, backlog: u31) !void { if (xev.backend == .wasi_poll) @compileError("unsupported in WASI"); - try os.listen(self.fd, backlog); + const fd = if (xev.backend == .iocp) @as(os.windows.ws2_32.SOCKET, @ptrCast(self.fd)) else self.fd; + + try os.listen(fd, backlog); } /// Accept a single connection. @@ -110,6 +121,7 @@ pub fn TCP(comptime xev: type) type { .io_uring, .kqueue, .wasi_poll, + .iocp, => {}, .epoll => c.flags.dup = true, @@ -235,9 +247,14 @@ pub fn TCP(comptime xev: type) type { // Retrieve bound port and initialize client var sock_len = address.getOsSockLen(); - try os.getsockname(server.fd, &address.any, &sock_len); + const fd = if (xev.backend == .iocp) @as(os.windows.ws2_32.SOCKET, @ptrCast(server.fd)) else server.fd; + try os.getsockname(fd, &address.any, &sock_len); const client = try Self.init(address); + //const address = try std.net.Address.parseIp4("127.0.0.1", 3132); + //var server = try Self.init(address); + //var client = try Self.init(address); + // Completions we need var c_accept: xev.Completion = undefined; var c_connect: xev.Completion = undefined; @@ -384,6 +401,8 @@ pub fn TCP(comptime xev: type) type { test "TCP: Queued writes" { // We have no way to get a socket in WASI from a WASI context. if (xev.backend == .wasi_poll) return error.SkipZigTest; + // Windows doesn't seem to respect the SNDBUF socket option. 
+ if (builtin.os.tag == .windows) return error.SkipZigTest; const testing = std.testing; diff --git a/src/watcher/timer.zig b/src/watcher/timer.zig index 395d6d9..447f69f 100644 --- a/src/watcher/timer.zig +++ b/src/watcher/timer.zig @@ -173,6 +173,7 @@ pub fn Timer(comptime xev: type) type { .epoll, .kqueue, .wasi_poll, + .iocp, => .{ .op = .{ .cancel = .{ diff --git a/src/watcher/udp.zig b/src/watcher/udp.zig index e221b66..162a435 100644 --- a/src/watcher/udp.zig +++ b/src/watcher/udp.zig @@ -22,6 +22,9 @@ pub fn UDP(comptime xev: type) type { // Supported, uses sendto/recvfrom .kqueue => UDPSendto(xev), + // Supported with tweaks + .iocp => UDPSendtoIOCP(xev), + // Noop .wasi_poll => struct {}, }; @@ -207,6 +210,187 @@ fn UDPSendto(comptime xev: type) type { }; } +/// UDP implementation that uses sendto/recvfrom. +fn UDPSendtoIOCP(comptime xev: type) type { + return struct { + const Self = @This(); + const windows = std.os.windows; + + fd: windows.HANDLE, + + /// See UDPSendMsg.State + pub const State = struct { + userdata: ?*anyopaque, + }; + + pub usingnamespace stream.Stream(xev, Self, .{ + .close = true, + .read = .none, + .write = .none, + }); + + /// Initialize a new UDP with the family from the given address. Only + /// the family is used, the actual address has no impact on the created + /// resource. + pub fn init(addr: std.net.Address) !Self { + const socket = try windows.WSASocketW(addr.any.family, os.SOCK.DGRAM, 0, null, 0, windows.ws2_32.WSA_FLAG_OVERLAPPED); + + return .{ + .fd = socket, + }; + } + + /// Initialize a UDP socket from a file descriptor. + pub fn initFd(fd: windows.HANDLE) Self { + return .{ + .fd = fd, + }; + } + + /// Bind the address to the socket. 
+ pub fn bind(self: Self, addr: std.net.Address) !void { + var socket = @as(windows.ws2_32.SOCKET, @ptrCast(self.fd)); + try os.setsockopt(socket, os.SOL.SOCKET, os.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1))); + try os.bind(socket, &addr.any, addr.getOsSockLen()); + } + + /// Read from the socket. This performs a single read. The callback must + /// requeue the read if additional reads want to be performed. Additional + /// reads simultaneously can be queued by calling this multiple times. Note + /// that depending on the backend, the reads can happen out of order. + /// + /// TODO(mitchellh): a way to receive the remote addr + pub fn read( + self: Self, + loop: *xev.Loop, + c: *xev.Completion, + s: *State, + buf: xev.ReadBuffer, + comptime Userdata: type, + userdata: ?*Userdata, + comptime cb: *const fn ( + ud: ?*Userdata, + l: *xev.Loop, + c: *xev.Completion, + s: *State, + addr: std.net.Address, + s: Self, + b: xev.ReadBuffer, + r: ReadError!usize, + ) xev.CallbackAction, + ) void { + s.* = .{ + .userdata = userdata, + }; + + switch (buf) { + inline .slice, .array => { + c.* = .{ + .op = .{ + .recvfrom = .{ + .fd = self.fd, + .buffer = buf, + }, + }, + .userdata = s, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l_inner: *xev.Loop, + c_inner: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + const s_inner: *State = @ptrCast(@alignCast(ud.?)); + return @call(.always_inline, cb, .{ + common.userdataValue(Userdata, s_inner.userdata), + l_inner, + c_inner, + s_inner, + std.net.Address.initPosix(@alignCast(&c_inner.op.recvfrom.addr)), + initFd(c_inner.op.recvfrom.fd), + c_inner.op.recvfrom.buffer, + r.recvfrom, + }); + } + }).callback, + }; + + loop.add(c); + }, + } + } + + /// Write to the socket. This performs a single write. Additional writes + /// can be queued by calling this multiple times. Note that depending on the + /// backend, writes can happen out of order. 
+ pub fn write( + self: Self, + loop: *xev.Loop, + c: *xev.Completion, + s: *State, + addr: std.net.Address, + buf: xev.WriteBuffer, + comptime Userdata: type, + userdata: ?*Userdata, + comptime cb: *const fn ( + ud: ?*Userdata, + l: *xev.Loop, + c: *xev.Completion, + s: *State, + s: Self, + b: xev.WriteBuffer, + r: WriteError!usize, + ) xev.CallbackAction, + ) void { + s.* = .{ + .userdata = userdata, + }; + + switch (buf) { + inline .slice, .array => { + c.* = .{ + .op = .{ + .sendto = .{ + .fd = self.fd, + .buffer = buf, + .addr = addr, + }, + }, + .userdata = s, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l_inner: *xev.Loop, + c_inner: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + const s_inner: *State = @ptrCast(@alignCast(ud.?)); + return @call(.always_inline, cb, .{ + common.userdataValue(Userdata, s_inner.userdata), + l_inner, + c_inner, + s_inner, + initFd(c_inner.op.sendto.fd), + c_inner.op.sendto.buffer, + r.sendto, + }); + } + }).callback, + }; + + loop.add(c); + }, + } + } + + pub const ReadError = xev.ReadError; + pub const WriteError = xev.WriteError; + + /// Common tests + pub usingnamespace UDPTests(xev, Self); + }; +} + /// UDP implementation that uses sendmsg/recvmsg fn UDPSendMsg(comptime xev: type) type { return struct { @@ -369,6 +553,7 @@ fn UDPSendMsg(comptime xev: type) type { .io_uring, .kqueue, .wasi_poll, + .iocp, => {}, .epoll => c.flags.dup = true, @@ -478,6 +663,7 @@ fn UDPSendMsg(comptime xev: type) type { .io_uring, .kqueue, .wasi_poll, + .iocp, => {}, .epoll => c.flags.dup = true, @@ -502,11 +688,11 @@ fn UDPTests(comptime xev: type, comptime Impl: type) type { var loop = try xev.Loop.init(.{}); defer loop.deinit(); - const address = try std.net.Address.parseIp4("127.0.0.1", 3131); + const address = try std.net.Address.parseIp4("127.0.0.1", 3132); const server = try Impl.init(address); const client = try Impl.init(address); - // Bind and recv + // Bind / Recv try server.bind(address); var c_read: 
xev.Completion = undefined; var s_read: Impl.State = undefined; diff --git a/src/windows.zig b/src/windows.zig new file mode 100644 index 0000000..fdfd678 --- /dev/null +++ b/src/windows.zig @@ -0,0 +1,77 @@ +const std = @import("std"); +const windows = std.os.windows; + +pub usingnamespace std.os.windows; + +/// Namespace containing missing utils from std +pub const exp = struct { + pub const CreateFileError = error{} || std.os.UnexpectedError; + + pub fn CreateFile( + lpFileName: [*:0]const u16, + dwDesiredAccess: windows.DWORD, + dwShareMode: windows.DWORD, + lpSecurityAttributes: ?*windows.SECURITY_ATTRIBUTES, + dwCreationDisposition: windows.DWORD, + dwFlagsAndAttributes: windows.DWORD, + hTemplateFile: ?windows.HANDLE, + ) CreateFileError!windows.HANDLE { + const handle = windows.kernel32.CreateFileW(lpFileName, dwDesiredAccess, dwShareMode, lpSecurityAttributes, dwCreationDisposition, dwFlagsAndAttributes, hTemplateFile); + if (handle == windows.INVALID_HANDLE_VALUE) { + const err = windows.kernel32.GetLastError(); + return switch (err) { + else => windows.unexpectedError(err), + }; + } + + return handle; + } + + pub fn ReadFile( + handle: windows.HANDLE, + buffer: []u8, + overlapped: ?*windows.OVERLAPPED, + ) windows.ReadFileError!?usize { + var read: windows.DWORD = 0; + const result: windows.BOOL = windows.kernel32.ReadFile(handle, buffer.ptr, @as(windows.DWORD, @intCast(buffer.len)), &read, overlapped); + if (result == windows.FALSE) { + const err = windows.kernel32.GetLastError(); + return switch (err) { + windows.Win32Error.IO_PENDING => null, + else => windows.unexpectedError(err), + }; + } + + return @as(usize, @intCast(read)); + } + + pub fn WriteFile( + handle: windows.HANDLE, + buffer: []const u8, + overlapped: ?*windows.OVERLAPPED, + ) windows.WriteFileError!?usize { + var written: windows.DWORD = 0; + const result: windows.BOOL = windows.kernel32.WriteFile(handle, buffer.ptr, @as(windows.DWORD, @intCast(buffer.len)), &written, overlapped); + if 
(result == windows.FALSE) { + const err = windows.kernel32.GetLastError(); + return switch (err) { + windows.Win32Error.IO_PENDING => null, + else => windows.unexpectedError(err), + }; + } + + return @as(usize, @intCast(written)); + } + + pub const DeleteFileError = error{} || std.os.UnexpectedError; + + pub fn DeleteFile(name: [*:0]const u16) DeleteFileError!void { + const result: windows.BOOL = windows.kernel32.DeleteFileW(name); + if (result == windows.FALSE) { + const err = windows.kernel32.GetLastError(); + return switch (err) { + else => windows.unexpectedError(err), + }; + } + } +};