Skip to content

Commit

Permalink
fix WriteQueue handling: retry partially completed writes
Browse files Browse the repository at this point in the history
  • Loading branch information
recursiveGecko committed Aug 9, 2023
1 parent 7421cc9 commit 32b8549
Showing 1 changed file with 69 additions and 5 deletions.
74 changes: 69 additions & 5 deletions src/watcher/stream.zig
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ pub fn Writeable(comptime xev: type, comptime T: type, comptime options: Options
pub const WriteRequest = struct {
completion: xev.Completion = .{},
userdata: ?*anyopaque = null,
initial_write_buffer: xev.WriteBuffer,
next: ?*@This() = null,

/// This can be used to convert a completion pointer back to
Expand Down Expand Up @@ -250,7 +251,8 @@ pub fn Writeable(comptime xev: type, comptime T: type, comptime options: Options
) xev.CallbackAction,
) void {
// Initialize our completion
req.* = .{};
req.* = .{ .initial_write_buffer = buf };
// Must be kept in sync with partial write logic inside the callback
self.write_init(&req.completion, buf);
req.completion.userdata = q;
req.completion.callback = (struct {
Expand All @@ -265,16 +267,45 @@ pub fn Writeable(comptime xev: type, comptime T: type, comptime options: Options
// The queue MUST have a request because a completion
// can only be added if the queue is not empty, and
// nothing else should be popping!.
const req_inner = q_inner.pop().?;
const req_inner: *WriteRequest = q_inner.head.?;

const cb_res = write_result(c_inner, r);
var writer: Self = cb_res.writer;
var result: WriteError!usize = cb_res.result;

// Checks whether the entire buffer was written, this is
// necessary to guarantee correct ordering of writes.
// If the write was partial, it re-submits the remainder of
// the buffer.
const queued_len = writeBufferLength(cb_res.buf);
if (cb_res.result) |written_len| {
if (written_len < queued_len) {
const rem_buf = writeBufferRemainder(cb_res.buf, written_len);

// Write remainder of the buffer, reusing the same completion
writer.write_init(&req_inner.completion, rem_buf);
req_inner.completion.userdata = q_inner;
req_inner.completion.callback = callback;
l_inner.add(&req_inner.completion);

return .disarm;
}

// We wrote the entire buffer, modify the result to indicate
// to the caller that all bytes have been written.
result = writeBufferLength(req_inner.initial_write_buffer);
} else |_| {}

// We can pop previously peeked request.
_ = q_inner.pop().?;

const action = @call(.always_inline, cb, .{
common.userdataValue(Userdata, req_inner.userdata),
l_inner,
c_inner,
cb_res.writer,
cb_res.buf,
cb_res.result,
writer,
req_inner.initial_write_buffer,
result,
});

// Rearm requeues this request, it doesn't return rearm
Expand Down Expand Up @@ -425,6 +456,39 @@ pub fn Writeable(comptime xev: type, comptime T: type, comptime options: Options
},
}
}

/// Returns the length of the write buffer
fn writeBufferLength(buf: xev.WriteBuffer) usize {
return switch (buf) {
.slice => |slice| slice.len,
.array => |array| array.len,
};
}

/// Given a `WriteBuffer` and number of bytes written during the previous
/// write operation, returns a new `WriteBuffer` with remaining data.
fn writeBufferRemainder(buf: xev.WriteBuffer, offset: usize) xev.WriteBuffer {
var wb: xev.WriteBuffer = undefined;

switch (buf) {
.slice => |slice| {
wb = .{ .slice = slice[offset..] };
},
.array => |array| {
const rem_len = array.len - offset;
wb = .{ .array = .{
.array = undefined,
.len = rem_len,
} };
@memcpy(
wb.array.array[0..rem_len],
array.array[offset..][0..rem_len],
);
},
}

return wb;
}
};
}

Expand Down

0 comments on commit 32b8549

Please sign in to comment.