diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig index ab20cc6..6c5bdc5 100644 --- a/src/backend/epoll.zig +++ b/src/backend/epoll.zig @@ -526,6 +526,28 @@ pub const Loop = struct { )) null else |err| .{ .read = err }; }, + .pread => res: { + if (completion.flags.threadpool) { + if (self.thread_schedule(completion)) |_| + return + else |err| + break :res .{ .pread = err }; + } + + var ev: linux.epoll_event = .{ + .events = linux.EPOLL.IN | linux.EPOLL.RDHUP, + .data = .{ .ptr = @intFromPtr(completion) }, + }; + + const fd = completion.fd_maybe_dup() catch |err| break :res .{ .pread = err }; + break :res if (std.os.epoll_ctl( + self.fd, + linux.EPOLL.CTL_ADD, + fd, + &ev, + )) null else |err| .{ .pread = err }; + }, + .write => res: { if (completion.flags.threadpool) { if (self.thread_schedule(completion)) |_| return else |err| break :res .{ .write = err }; @@ -548,6 +570,28 @@ pub const Loop = struct { )) null else |err| .{ .write = err }; }, + .pwrite => res: { + if (completion.flags.threadpool) { + if (self.thread_schedule(completion)) |_| + return + else |err| + break :res .{ .pwrite = err }; + } + + var ev: linux.epoll_event = .{ + .events = linux.EPOLL.OUT, + .data = .{ .ptr = @intFromPtr(completion) }, + }; + + const fd = completion.fd_maybe_dup() catch |err| break :res .{ .pwrite = err }; + break :res if (std.os.epoll_ctl( + self.fd, + linux.EPOLL.CTL_ADD, + fd, + &ev, + )) null else |err| .{ .pwrite = err }; + }, + .send => res: { var ev: linux.epoll_event = .{ .events = linux.EPOLL.OUT, .data = .{ .ptr = @intFromPtr(completion) }, }; @@ -858,6 +902,20 @@ pub const Completion = struct { }; }, + .pread => |*op| res: { + const n_ = switch (op.buffer) { + .slice => |v| std.os.pread(op.fd, v, op.offset), + .array => |*v| std.os.pread(op.fd, v, op.offset), + }; + + break :res .{ + .pread = if (n_) |n| + if (n == 0) error.EOF else n + else |err| + err, + }; + }, + .write => |*op| .{ .write = switch (op.buffer) { .slice => |v| std.os.write(op.fd, v), @@ -865,6 +923,13 @@ pub const Completion = struct { }, }, + .pwrite => |*op| .{ + .pwrite = switch 
(op.buffer) { + .slice => |v| std.os.pwrite(op.fd, v, op.offset), + .array => |*v| std.os.pwrite(op.fd, v.array[0..v.len], op.offset), + }, + }, + .send => |*op| .{ .send = switch (op.buffer) { .slice => |v| std.os.send(op.fd, v, 0), @@ -926,8 +991,10 @@ pub const Completion = struct { .connect => |v| v.socket, .poll => |v| v.fd, .read => |v| v.fd, + .pread => |v| v.fd, .recv => |v| v.fd, .write => |v| v.fd, + .pwrite => |v| v.fd, .send => |v| v.fd, .sendmsg => |v| v.fd, .recvmsg => |v| v.fd, @@ -949,7 +1016,9 @@ pub const OperationType = enum { connect, poll, read, + pread, write, + pwrite, send, recv, sendmsg, @@ -968,7 +1037,9 @@ pub const Result = union(OperationType) { connect: ConnectError!void, poll: PollError!void, read: ReadError!usize, + pread: ReadError!usize, write: WriteError!usize, + pwrite: WriteError!usize, send: WriteError!usize, recv: ReadError!usize, sendmsg: WriteError!usize, @@ -1014,11 +1085,23 @@ pub const Operation = union(OperationType) { buffer: ReadBuffer, }, + pread: struct { + fd: std.os.fd_t, + buffer: ReadBuffer, + offset: u64, + }, + write: struct { fd: std.os.fd_t, buffer: WriteBuffer, }, + pwrite: struct { + fd: std.os.fd_t, + buffer: WriteBuffer, + offset: u64, + }, + send: struct { fd: std.os.fd_t, buffer: WriteBuffer, @@ -1169,6 +1252,7 @@ pub const ConnectError = std.os.EpollCtlError || std.os.ConnectError || error{ pub const ReadError = ThreadPoolError || std.os.EpollCtlError || std.os.ReadError || + std.os.PReadError || std.os.RecvFromError || error{ DupFailed, @@ -1178,6 +1262,7 @@ pub const ReadError = ThreadPoolError || std.os.EpollCtlError || pub const WriteError = ThreadPoolError || std.os.EpollCtlError || std.os.WriteError || + std.os.PWriteError || std.os.SendError || std.os.SendMsgError || error{ diff --git a/src/backend/io_uring.zig b/src/backend/io_uring.zig index b3985b5..6f9efd7 100644 --- a/src/backend/io_uring.zig +++ b/src/backend/io_uring.zig @@ -399,6 +399,22 @@ pub const Loop = struct { ), }, + .pread => 
|*v| switch (v.buffer) { + .array => |*buf| linux.io_uring_prep_read( + sqe, + v.fd, + buf, + v.offset, + ), + + .slice => |buf| linux.io_uring_prep_read( + sqe, + v.fd, + buf, + v.offset, + ), + }, + .recv => |*v| switch (v.buffer) { .array => |*buf| linux.io_uring_prep_recv( sqe, @@ -498,6 +514,22 @@ pub const Loop = struct { ), }, + .pwrite => |*v| switch (v.buffer) { + .array => |*buf| linux.io_uring_prep_write( + sqe, + v.fd, + buf.array[0..buf.len], + v.offset, + ), + + .slice => |buf| linux.io_uring_prep_write( + sqe, + v.fd, + buf, + v.offset, + ), + }, + .cancel => |v| linux.io_uring_prep_cancel(sqe, @intCast(@intFromPtr(v.c)), 0), } @@ -598,6 +630,10 @@ pub const Completion = struct { .read = self.readResult(.read, res), }, + .pread => .{ + .pread = self.readResult(.pread, res), + }, + .recv => .{ .recv = self.readResult(.recv, res), }, @@ -676,6 +712,15 @@ pub const Completion = struct { }, }, + .pwrite => .{ + .pwrite = if (res >= 0) + @intCast(res) + else switch (@as(std.os.E, @enumFromInt(-res))) { + .CANCELED => error.Canceled, + else => |errno| std.os.unexpectedErrno(errno), + }, + }, + .cancel => .{ .cancel = if (res >= 0) {} else switch (@as(std.os.E, @enumFromInt(-res))) { .NOENT => error.NotFound, @@ -733,6 +778,9 @@ pub const OperationType = enum { /// Read read, + /// PRead + pread, + /// Receive a message from a socket. recv, @@ -748,6 +796,9 @@ pub const OperationType = enum { /// Shutdown all or part of a full-duplex connection. 
shutdown, + /// PWrite + pwrite, + /// Write write, @@ -771,6 +822,7 @@ pub const Result = union(OperationType) { close: CloseError!void, poll: PollError!void, read: ReadError!usize, + pread: ReadError!usize, recv: ReadError!usize, send: WriteError!usize, sendmsg: WriteError!usize, @@ -779,6 +831,7 @@ pub const Result = union(OperationType) { timer: TimerError!TimerTrigger, timer_remove: TimerRemoveError!void, write: WriteError!usize, + pwrite: WriteError!usize, cancel: CancelError!void, }; @@ -815,6 +868,12 @@ pub const Operation = union(OperationType) { buffer: ReadBuffer, }, + pread: struct { + fd: std.os.fd_t, + buffer: ReadBuffer, + offset: u64, + }, + recv: struct { fd: std.os.fd_t, buffer: ReadBuffer, @@ -866,6 +925,12 @@ pub const Operation = union(OperationType) { buffer: WriteBuffer, }, + pwrite: struct { + fd: std.os.fd_t, + buffer: WriteBuffer, + offset: u64, + }, + cancel: struct { c: *Completion, }, diff --git a/src/backend/kqueue.zig b/src/backend/kqueue.zig index ddb812d..cbb60a0 100644 --- a/src/backend/kqueue.zig +++ b/src/backend/kqueue.zig @@ -743,11 +743,21 @@ pub const Loop = struct { break :action .{ .kevent = {} }; }, + .pwrite => action: { + ev.* = c.kevent().?; + break :action .{ .kevent = {} }; + }, + .read => action: { ev.* = c.kevent().?; break :action .{ .kevent = {} }; }, + .pread => action: { + ev.* = c.kevent().?; + break :action .{ .kevent = {} }; + }, + .send => action: { ev.* = c.kevent().?; break :action .{ .kevent = {} }; @@ -1103,7 +1113,7 @@ pub const Completion = struct { .udata = @intFromPtr(self), }), - inline .write, .send, .sendto => |v| kevent_init(.{ + inline .write, .pwrite, .send, .sendto => |v| kevent_init(.{ .ident = @intCast(v.fd), .filter = os.system.EVFILT_WRITE, .flags = os.system.EV_ADD | os.system.EV_ENABLE, @@ -1112,7 +1122,7 @@ pub const Completion = struct { .udata = @intFromPtr(self), }), - inline .read, .recv, .recvfrom => |v| kevent_init(.{ + inline .read, .pread, .recv, .recvfrom => |v| kevent_init(.{ 
.ident = @intCast(v.fd), .filter = os.system.EVFILT_READ, .flags = os.system.EV_ADD | os.system.EV_ENABLE, @@ -1160,6 +1170,13 @@ pub const Completion = struct { }, }, + .pwrite => |*op| .{ + .pwrite = switch (op.buffer) { + .slice => |v| os.pwrite(op.fd, v, op.offset), + .array => |*v| os.pwrite(op.fd, v.array[0..v.len], op.offset), + }, + }, + .send => |*op| .{ .send = switch (op.buffer) { .slice => |v| os.send(op.fd, v, 0), @@ -1188,6 +1205,20 @@ pub const Completion = struct { }; }, + .pread => |*op| res: { + const n_ = switch (op.buffer) { + .slice => |v| os.pread(op.fd, v, op.offset), + .array => |*v| os.pread(op.fd, v, op.offset), + }; + + break :res .{ + .pread = if (n_) |n| + if (n == 0) error.EOF else n + else |err| + err, + }; + }, + .recv => |*op| res: { const n_ = switch (op.buffer) { .slice => |v| os.recv(op.fd, v, 0), @@ -1269,6 +1300,13 @@ pub const Completion = struct { }, }, + .pwrite => .{ + .pwrite = switch (errno) { + .SUCCESS => @intCast(r), + else => |err| os.unexpectedErrno(err), + }, + }, + .read => .{ .read = switch (errno) { .SUCCESS => if (r == 0) error.EOF else @intCast(r), @@ -1276,6 +1314,13 @@ pub const Completion = struct { }, }, + .pread => .{ + .pread = switch (errno) { + .SUCCESS => if (r == 0) error.EOF else @intCast(r), + else => |err| os.unexpectedErrno(err), + }, + }, + .send => .{ .send = switch (errno) { .SUCCESS => @intCast(r), @@ -1362,6 +1407,8 @@ pub const OperationType = enum { connect, read, write, + pread, + pwrite, send, recv, sendto, @@ -1424,11 +1471,23 @@ pub const Operation = union(OperationType) { buffer: WriteBuffer, }, + pwrite: struct { + fd: std.os.fd_t, + buffer: WriteBuffer, + offset: u64, + }, + read: struct { fd: std.os.fd_t, buffer: ReadBuffer, }, + pread: struct { + fd: std.os.fd_t, + buffer: ReadBuffer, + offset: u64, + }, + machport: struct { port: os.system.mach_port_name_t, buffer: ReadBuffer, @@ -1465,7 +1524,9 @@ pub const Result = union(OperationType) { sendto: WriteError!usize, recvfrom: 
ReadError!usize, write: WriteError!usize, + pwrite: WriteError!usize, read: ReadError!usize, + pread: ReadError!usize, machport: MachPortError!void, proc: ProcError!u32, shutdown: ShutdownError!void, @@ -1485,6 +1546,7 @@ pub const ConnectError = os.KEventError || os.ConnectError || error{ pub const ReadError = os.KEventError || os.ReadError || + os.PReadError || os.RecvFromError || error{ EOF, @@ -1493,6 +1555,7 @@ pub const ReadError = os.KEventError || pub const WriteError = os.KEventError || os.WriteError || + os.PWriteError || os.SendError || os.SendMsgError || os.SendToError || diff --git a/src/backend/wasi_poll.zig b/src/backend/wasi_poll.zig index 00dbc77..f84d1d4 100644 --- a/src/backend/wasi_poll.zig +++ b/src/backend/wasi_poll.zig @@ -324,12 +324,24 @@ pub const Loop = struct { break :res null; }, + .pread => res: { + const sub = self.batch.get(completion) catch |err| break :res .{ .pread = err }; + sub.* = completion.subscription(); + break :res null; + }, + .write => res: { const sub = self.batch.get(completion) catch |err| break :res .{ .write = err }; sub.* = completion.subscription(); break :res null; }, + .pwrite => res: { + const sub = self.batch.get(completion) catch |err| break :res .{ .pwrite = err }; + sub.* = completion.subscription(); + break :res null; + }, + .recv => res: { const sub = self.batch.get(completion) catch |err| break :res .{ .recv = err }; sub.* = completion.subscription(); @@ -650,6 +662,18 @@ pub const Completion = struct { }, }, + .pread => |v| .{ + .userdata = @intFromPtr(self), + .u = .{ + .tag = wasi.EVENTTYPE_FD_READ, + .u = .{ + .fd_read = .{ + .fd = v.fd, + }, + }, + }, + }, + .write => |v| .{ .userdata = @intFromPtr(self), .u = .{ @@ -662,6 +686,18 @@ pub const Completion = struct { }, }, + .pwrite => |v| .{ + .userdata = @intFromPtr(self), + .u = .{ + .tag = wasi.EVENTTYPE_FD_WRITE, + .u = .{ + .fd_write = .{ + .fd = v.fd, + }, + }, + }, + }, + .accept => |v| .{ .userdata = @intFromPtr(self), .u = .{ @@ -746,6 
+782,20 @@ pub const Completion = struct { }; }, + .pread => |*op| res: { + const n_ = switch (op.buffer) { + .slice => |v| std.os.pread(op.fd, v, op.offset), + .array => |*v| std.os.pread(op.fd, v, op.offset), + }; + + break :res .{ + .pread = if (n_) |n| + if (n == 0) error.EOF else n + else |err| + err, + }; + }, + .write => |*op| res: { const n_ = switch (op.buffer) { .slice => |v| std.os.write(op.fd, v), @@ -757,6 +807,17 @@ pub const Completion = struct { }; }, + .pwrite => |*op| res: { + const n_ = switch (op.buffer) { + .slice => |v| std.os.pwrite(op.fd, v, op.offset), + .array => |*v| std.os.pwrite(op.fd, v.array[0..v.len], op.offset), + }; + + break :res .{ + .pwrite = if (n_) |n| n else |err| err, + }; + }, + .recv => |*op| res: { var n: usize = undefined; var roflags: wasi.roflags_t = undefined; @@ -826,7 +887,9 @@ pub const OperationType = enum { cancel, accept, read, + pread, write, + pwrite, recv, send, shutdown, @@ -842,7 +905,9 @@ pub const Result = union(OperationType) { cancel: CancelError!void, accept: AcceptError!std.os.fd_t, read: ReadError!usize, + pread: ReadError!usize, write: WriteError!usize, + pwrite: WriteError!usize, recv: ReadError!usize, send: WriteError!usize, shutdown: ShutdownError!void, @@ -871,11 +936,23 @@ pub const Operation = union(OperationType) { buffer: ReadBuffer, }, + pread: struct { + fd: std.os.fd_t, + buffer: ReadBuffer, + offset: u64, + }, + write: struct { fd: std.os.fd_t, buffer: WriteBuffer, }, + pwrite: struct { + fd: std.os.fd_t, + buffer: WriteBuffer, + offset: u64, + }, + send: struct { fd: std.os.fd_t, buffer: WriteBuffer, @@ -944,13 +1021,13 @@ pub const ShutdownError = error{ Unexpected, }; -pub const ReadError = Batch.Error || std.os.ReadError || +pub const ReadError = Batch.Error || std.os.ReadError || std.os.PReadError || error{ EOF, Unknown, }; -pub const WriteError = Batch.Error || std.os.WriteError || +pub const WriteError = Batch.Error || std.os.WriteError || std.os.PWriteError || error{ Unknown, }; 
diff --git a/src/watcher/file.zig b/src/watcher/file.zig index eb5317b..f7c3dde 100644 --- a/src/watcher/file.zig +++ b/src/watcher/file.zig @@ -1,5 +1,6 @@ const std = @import("std"); const builtin = @import("builtin"); +const common = @import("common.zig"); const assert = std.debug.assert; const os = std.os; const main = @import("../main.zig"); @@ -58,7 +59,235 @@ pub fn File(comptime xev: type) type { _ = self; } - test { + pub fn pread( + self: Self, + loop: *xev.Loop, + c: *xev.Completion, + buf: xev.ReadBuffer, + offset: u64, + comptime Userdata: type, + userdata: ?*Userdata, + comptime cb: *const fn ( + ud: ?*Userdata, + l: *xev.Loop, + c: *xev.Completion, + s: Self, + b: xev.ReadBuffer, + r: Self.ReadError!usize, + ) xev.CallbackAction, + ) void { + switch (buf) { + inline .slice, .array => { + c.* = .{ + .op = .{ + .pread = .{ + .fd = self.fd, + .buffer = buf, + .offset = offset, + }, + }, + .userdata = userdata, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l_inner: *xev.Loop, + c_inner: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + return @call(.always_inline, cb, .{ + common.userdataValue(Userdata, ud), + l_inner, + c_inner, + Self.initFd(c_inner.op.pread.fd), + c_inner.op.pread.buffer, + if (r.pread) |v| v else |err| err, + }); + } + }).callback, + }; + + // If we're dup-ing, then we ask the backend to manage the fd. 
+ switch (xev.backend) { + .io_uring, + .wasi_poll, + => {}, + + .epoll => { + c.flags.threadpool = true; + }, + + .kqueue => { + c.flags.threadpool = true; + }, + } + + loop.add(c); + }, + } + } + + pub fn queuePWrite( + self: Self, + loop: *xev.Loop, + q: *Self.WriteQueue, + req: *Self.WriteRequest, + buf: xev.WriteBuffer, + offset: u64, + comptime Userdata: type, + userdata: ?*Userdata, + comptime cb: *const fn ( + ud: ?*Userdata, + l: *xev.Loop, + c: *xev.Completion, + s: Self, + b: xev.WriteBuffer, + r: Self.WriteError!usize, + ) xev.CallbackAction, + ) void { + // Initialize our completion + req.* = .{}; + self.pwrite_init(&req.completion, buf, offset); + req.completion.userdata = q; + req.completion.callback = (struct { + fn callback( + ud: ?*anyopaque, + l_inner: *xev.Loop, + c_inner: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + const q_inner = @as(?*Self.WriteQueue, @ptrCast(@alignCast(ud))).?; + + // The queue MUST have a request because a completion + // can only be added if the queue is not empty, and + // nothing else should be popping!. + const req_inner = q_inner.pop().?; + + const cb_res = pwrite_result(c_inner, r); + const action = @call(.always_inline, cb, .{ + common.userdataValue(Userdata, req_inner.userdata), + l_inner, + c_inner, + cb_res.writer, + cb_res.buf, + cb_res.result, + }); + + // Rearm requeues this request, it doesn't return rearm + // on the actual callback here... + if (action == .rearm) q_inner.push(req_inner); + + // If we have another request, add that completion next. + if (q_inner.head) |req_next| l_inner.add(&req_next.completion); + + // We always disarm because the completion in the next + // request will be used if there is more to queue. + return .disarm; + } + }).callback; + + // The userdata as to go on the WriteRequest because we need + // our actual completion userdata to be the WriteQueue so that + // we can process the queue. 
+ req.userdata = @as(?*anyopaque, @ptrCast(@alignCast(userdata))); + + // If the queue is empty, then we add our completion. Otherwise, + // the previously queued writes will trigger this one. + if (q.empty()) loop.add(&req.completion); + + // We always add this item to our queue no matter what + q.push(req); + } + + pub fn pwrite( + self: Self, + loop: *xev.Loop, + c: *xev.Completion, + buf: xev.WriteBuffer, + offset: u64, + comptime Userdata: type, + userdata: ?*Userdata, + comptime cb: *const fn ( + ud: ?*Userdata, + l: *xev.Loop, + c: *xev.Completion, + s: Self, + b: xev.WriteBuffer, + r: Self.WriteError!usize, + ) xev.CallbackAction, + ) void { + self.pwrite_init(c, buf, offset); + c.userdata = userdata; + c.callback = (struct { + fn callback( + ud: ?*anyopaque, + l_inner: *xev.Loop, + c_inner: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + const cb_res = pwrite_result(c_inner, r); + return @call(.always_inline, cb, .{ + common.userdataValue(Userdata, ud), + l_inner, + c_inner, + cb_res.writer, + cb_res.buf, + cb_res.result, + }); + } + }).callback; + + loop.add(c); + } + + inline fn pwrite_result(c: *xev.Completion, r: xev.Result) struct { + writer: Self, + buf: xev.WriteBuffer, + result: Self.WriteError!usize, + } { + return .{ + .writer = Self.initFd(c.op.pwrite.fd), + .buf = c.op.pwrite.buffer, + .result = if (r.pwrite) |v| v else |err| err, + }; + } + + fn pwrite_init( + self: Self, + c: *xev.Completion, + buf: xev.WriteBuffer, + offset: u64, + ) void { + switch (buf) { + inline .slice, .array => { + c.* = .{ + .op = .{ + .pwrite = .{ + .fd = self.fd, + .buffer = buf, + .offset = offset, + }, + }, + }; + + // If we're dup-ing, then we ask the backend to manage the fd. 
+ switch (xev.backend) { + .io_uring, + .wasi_poll, + => {}, + + .epoll => { + c.flags.threadpool = true; + }, + + .kqueue => { + c.flags.threadpool = true; + }, + } + }, + } + } + + test "read/write" { // wasi: local files don't work with poll (always ready) if (builtin.os.tag == .wasi) return error.SkipZigTest; @@ -129,6 +358,76 @@ pub fn File(comptime xev: type) type { try testing.expectEqualSlices(u8, &write_buf, read_buf[0..read_len]); } + test "pread/pwrite" { + // wasi: local files don't work with poll (always ready) + if (builtin.os.tag == .wasi) return error.SkipZigTest; + + const testing = std.testing; + + var tpool = main.ThreadPool.init(.{}); + defer tpool.deinit(); + defer tpool.shutdown(); + var loop = try xev.Loop.init(.{ .thread_pool = &tpool }); + defer loop.deinit(); + + // Create our file + const path = "test_watcher_file"; + const f = try std.fs.cwd().createFile(path, .{ + .read = true, + .truncate = true, + }); + defer f.close(); + defer std.fs.cwd().deleteFile(path) catch {}; + + const file = try init(f); + + // Perform a write and then a read + var write_buf = [_]u8{ 1, 1, 2, 3, 5, 8, 13 }; + var c_write: xev.Completion = undefined; + file.pwrite(&loop, &c_write, .{ .slice = &write_buf }, 0, void, null, (struct { + fn callback( + _: ?*void, + _: *xev.Loop, + _: *xev.Completion, + _: Self, + _: xev.WriteBuffer, + r: Self.WriteError!usize, + ) xev.CallbackAction { + _ = r catch unreachable; + return .disarm; + } + }).callback); + + // Wait for the write + try loop.run(.until_done); + + // Make sure the data is on disk + try f.sync(); + + const f2 = try std.fs.cwd().openFile(path, .{}); + defer f2.close(); + const file2 = try init(f2); + + var read_buf: [128]u8 = undefined; + var read_len: usize = 0; + file2.pread(&loop, &c_write, .{ .slice = &read_buf }, 0, usize, &read_len, (struct { + fn callback( + ud: ?*usize, + _: *xev.Loop, + _: *xev.Completion, + _: Self, + _: xev.ReadBuffer, + r: Self.ReadError!usize, + ) xev.CallbackAction { + ud.?.* = 
r catch unreachable; + return .disarm; + } + }).callback); + + try loop.run(.until_done); + try testing.expectEqualSlices(u8, &write_buf, read_buf[0..read_len]); + } + test "queued writes" { // wasi: local files don't work with poll (always ready) if (builtin.os.tag == .wasi) return error.SkipZigTest;