Skip to content

Commit

Permalink
Add support for pread() and pwrite() (#55)
Browse files Browse the repository at this point in the history
When working with files, you usually need to control the position where
you're reading from.
  • Loading branch information
mitchellh authored Aug 6, 2023
2 parents affec7b + f2abd63 commit 8017f2d
Show file tree
Hide file tree
Showing 5 changed files with 594 additions and 5 deletions.
85 changes: 85 additions & 0 deletions src/backend/epoll.zig
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,28 @@ pub const Loop = struct {
)) null else |err| .{ .read = err };
},

.pread => res: {
if (completion.flags.threadpool) {
if (self.thread_schedule(completion)) |_|
return
else |err|
break :res .{ .read = err };
}

var ev: linux.epoll_event = .{
.events = linux.EPOLL.IN | linux.EPOLL.RDHUP,
.data = .{ .ptr = @intFromPtr(completion) },
};

const fd = completion.fd_maybe_dup() catch |err| break :res .{ .read = err };
break :res if (std.os.epoll_ctl(
self.fd,
linux.EPOLL.CTL_ADD,
fd,
&ev,
)) null else |err| .{ .read = err };
},

.write => res: {
if (completion.flags.threadpool) {
if (self.thread_schedule(completion)) |_|
Expand All @@ -548,6 +570,28 @@ pub const Loop = struct {
)) null else |err| .{ .write = err };
},

.pwrite => res: {
if (completion.flags.threadpool) {
if (self.thread_schedule(completion)) |_|
return
else |err|
break :res .{ .write = err };
}

var ev: linux.epoll_event = .{
.events = linux.EPOLL.OUT,
.data = .{ .ptr = @intFromPtr(completion) },
};

const fd = completion.fd_maybe_dup() catch |err| break :res .{ .write = err };
break :res if (std.os.epoll_ctl(
self.fd,
linux.EPOLL.CTL_ADD,
fd,
&ev,
)) null else |err| .{ .write = err };
},

.send => res: {
var ev: linux.epoll_event = .{
.events = linux.EPOLL.OUT,
Expand Down Expand Up @@ -858,13 +902,34 @@ pub const Completion = struct {
};
},

.pread => |*op| res: {
const n_ = switch (op.buffer) {
.slice => |v| std.os.pread(op.fd, v, op.offset),
.array => |*v| std.os.pread(op.fd, v, op.offset),
};

break :res .{
.pread = if (n_) |n|
if (n == 0) error.EOF else n
else |err|
err,
};
},

.write => |*op| .{
.write = switch (op.buffer) {
.slice => |v| std.os.write(op.fd, v),
.array => |*v| std.os.write(op.fd, v.array[0..v.len]),
},
},

.pwrite => |*op| .{
.pwrite = switch (op.buffer) {
.slice => |v| std.os.pwrite(op.fd, v, op.offset),
.array => |*v| std.os.pwrite(op.fd, v.array[0..v.len], op.offset),
},
},

.send => |*op| .{
.send = switch (op.buffer) {
.slice => |v| std.os.send(op.fd, v, 0),
Expand Down Expand Up @@ -926,8 +991,10 @@ pub const Completion = struct {
.connect => |v| v.socket,
.poll => |v| v.fd,
.read => |v| v.fd,
.pread => |v| v.fd,
.recv => |v| v.fd,
.write => |v| v.fd,
.pwrite => |v| v.fd,
.send => |v| v.fd,
.sendmsg => |v| v.fd,
.recvmsg => |v| v.fd,
Expand All @@ -949,7 +1016,9 @@ pub const OperationType = enum {
connect,
poll,
read,
pread,
write,
pwrite,
send,
recv,
sendmsg,
Expand All @@ -968,7 +1037,9 @@ pub const Result = union(OperationType) {
connect: ConnectError!void,
poll: PollError!void,
read: ReadError!usize,
pread: ReadError!usize,
write: WriteError!usize,
pwrite: WriteError!usize,
send: WriteError!usize,
recv: ReadError!usize,
sendmsg: WriteError!usize,
Expand Down Expand Up @@ -1014,11 +1085,23 @@ pub const Operation = union(OperationType) {
buffer: ReadBuffer,
},

pread: struct {
fd: std.os.fd_t,
buffer: ReadBuffer,
offset: u64,
},

write: struct {
fd: std.os.fd_t,
buffer: WriteBuffer,
},

pwrite: struct {
fd: std.os.fd_t,
buffer: WriteBuffer,
offset: u64,
},

send: struct {
fd: std.os.fd_t,
buffer: WriteBuffer,
Expand Down Expand Up @@ -1169,6 +1252,7 @@ pub const ConnectError = std.os.EpollCtlError || std.os.ConnectError || error{

pub const ReadError = ThreadPoolError || std.os.EpollCtlError ||
std.os.ReadError ||
std.os.PReadError ||
std.os.RecvFromError ||
error{
DupFailed,
Expand All @@ -1178,6 +1262,7 @@ pub const ReadError = ThreadPoolError || std.os.EpollCtlError ||

pub const WriteError = ThreadPoolError || std.os.EpollCtlError ||
std.os.WriteError ||
std.os.PWriteError ||
std.os.SendError ||
std.os.SendMsgError ||
error{
Expand Down
65 changes: 65 additions & 0 deletions src/backend/io_uring.zig
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,22 @@ pub const Loop = struct {
),
},

.pread => |*v| switch (v.buffer) {
.array => |*buf| linux.io_uring_prep_read(
sqe,
v.fd,
buf,
v.offset,
),

.slice => |buf| linux.io_uring_prep_read(
sqe,
v.fd,
buf,
v.offset,
),
},

.recv => |*v| switch (v.buffer) {
.array => |*buf| linux.io_uring_prep_recv(
sqe,
Expand Down Expand Up @@ -498,6 +514,22 @@ pub const Loop = struct {
),
},

.pwrite => |*v| switch (v.buffer) {
.array => |*buf| linux.io_uring_prep_write(
sqe,
v.fd,
buf.array[0..buf.len],
v.offset,
),

.slice => |buf| linux.io_uring_prep_write(
sqe,
v.fd,
buf,
v.offset,
),
},

.cancel => |v| linux.io_uring_prep_cancel(sqe, @intCast(@intFromPtr(v.c)), 0),
}

Expand Down Expand Up @@ -598,6 +630,10 @@ pub const Completion = struct {
.read = self.readResult(.read, res),
},

.pread => .{
.pread = self.readResult(.pread, res),
},

.recv => .{
.recv = self.readResult(.recv, res),
},
Expand Down Expand Up @@ -676,6 +712,15 @@ pub const Completion = struct {
},
},

.pwrite => .{
.pwrite = if (res >= 0)
@intCast(res)
else switch (@as(std.os.E, @enumFromInt(-res))) {
.CANCELED => error.Canceled,
else => |errno| std.os.unexpectedErrno(errno),
},
},

.cancel => .{
.cancel = if (res >= 0) {} else switch (@as(std.os.E, @enumFromInt(-res))) {
.NOENT => error.NotFound,
Expand Down Expand Up @@ -733,6 +778,9 @@ pub const OperationType = enum {
/// Read
read,

/// PRead
pread,

/// Receive a message from a socket.
recv,

Expand All @@ -748,6 +796,9 @@ pub const OperationType = enum {
/// Shutdown all or part of a full-duplex connection.
shutdown,

/// PWrite
pwrite,

/// Write
write,

Expand All @@ -771,6 +822,7 @@ pub const Result = union(OperationType) {
close: CloseError!void,
poll: PollError!void,
read: ReadError!usize,
pread: ReadError!usize,
recv: ReadError!usize,
send: WriteError!usize,
sendmsg: WriteError!usize,
Expand All @@ -779,6 +831,7 @@ pub const Result = union(OperationType) {
timer: TimerError!TimerTrigger,
timer_remove: TimerRemoveError!void,
write: WriteError!usize,
pwrite: WriteError!usize,
cancel: CancelError!void,
};

Expand Down Expand Up @@ -815,6 +868,12 @@ pub const Operation = union(OperationType) {
buffer: ReadBuffer,
},

pread: struct {
fd: std.os.fd_t,
buffer: ReadBuffer,
offset: u64,
},

recv: struct {
fd: std.os.fd_t,
buffer: ReadBuffer,
Expand Down Expand Up @@ -866,6 +925,12 @@ pub const Operation = union(OperationType) {
buffer: WriteBuffer,
},

pwrite: struct {
fd: std.os.fd_t,
buffer: WriteBuffer,
offset: u64,
},

cancel: struct {
c: *Completion,
},
Expand Down
Loading

0 comments on commit 8017f2d

Please sign in to comment.