Skip to content

Commit

Permalink
add integration test for TCP + queueWrite()
Browse files Browse the repository at this point in the history
  • Loading branch information
recursiveGecko committed Aug 9, 2023
1 parent 32b8549 commit 42cda99
Show file tree
Hide file tree
Showing 2 changed files with 252 additions and 10 deletions.
12 changes: 6 additions & 6 deletions src/watcher/stream.zig
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const std = @import("std");
const assert = std.debug.assert;
const builtin = @import("builtin");
const common = @import("common.zig");
const queue = @import("../queue.zig");
Expand Down Expand Up @@ -468,26 +469,25 @@ pub fn Writeable(comptime xev: type, comptime T: type, comptime options: Options
/// Given a `WriteBuffer` and number of bytes written during the previous
/// write operation, returns a new `WriteBuffer` with remaining data.
fn writeBufferRemainder(buf: xev.WriteBuffer, offset: usize) xev.WriteBuffer {
var wb: xev.WriteBuffer = undefined;

switch (buf) {
.slice => |slice| {
wb = .{ .slice = slice[offset..] };
assert(offset <= slice.len);
return .{ .slice = slice[offset..] };
},
.array => |array| {
assert(offset <= array.len);
const rem_len = array.len - offset;
wb = .{ .array = .{
var wb = xev.WriteBuffer{ .array = .{
.array = undefined,
.len = rem_len,
} };
@memcpy(
wb.array.array[0..rem_len],
array.array[offset..][0..rem_len],
);
return wb;
},
}

return wb;
}
};
}
Expand Down
250 changes: 246 additions & 4 deletions src/watcher/tcp.zig
Original file line number Diff line number Diff line change
Expand Up @@ -225,17 +225,24 @@ pub fn TCP(comptime xev: type) type {
var loop = try xev.Loop.init(.{});
defer loop.deinit();

const address = try std.net.Address.parseIp4("127.0.0.1", 3131);
// Choose random available port (Zig #14907)
var address = try std.net.Address.parseIp4("127.0.0.1", 0);
const server = try Self.init(address);

// Bind and listen
try server.bind(address);
try server.listen(1);

// Retrieve bound port and initialize client
var sock_len = address.getOsSockLen();
try os.getsockname(server.fd, &address.any, &sock_len);
const client = try Self.init(address);

// Completions we need
var c_accept: xev.Completion = undefined;
var c_connect: xev.Completion = undefined;

// Bind and accept
try server.bind(address);
try server.listen(1);
// Accept
var server_conn: ?Self = null;
server.accept(&loop, &c_accept, ?Self, &server_conn, (struct {
fn callback(
Expand Down Expand Up @@ -359,5 +366,240 @@ pub fn TCP(comptime xev: type) type {
try testing.expect(!connected);
try testing.expect(server_closed);
}

// Potentially flaky - this test could hang if the sender is unable to
// write everything to the socket for whatever reason
// (e.g. incorrectly sized buffer on the receiver side), or if the
// receiver is trying to receive while sender has nothing left to send.
//
// Overview:
// 1. Set up server and client sockets
// 2. connect & accept, set SO_SNDBUF to 8kB on the client
// 3. Try to send 1MB buffer from client to server without queuing, this _should_ fail
// and theoretically send <= 8kB, but in practice, it seems to write ~32kB.
// Asserts that <= 100kB was written
// 4. Set up a queued write with the remaining buffer, shutdown() the socket afterwards
// 5. Set up a receiver that loops until it receives the entire buffer
// 6. Assert send_buf == recv_buf
test "TCP: Queued writes" {
// We have no way to get a socket in WASI from a WASI context.
if (xev.backend == .wasi_poll) return error.SkipZigTest;

const testing = std.testing;

var loop = try xev.Loop.init(.{});
defer loop.deinit();

// Choose random available port (Zig #14907)
var address = try std.net.Address.parseIp4("127.0.0.1", 0);
const server = try Self.init(address);

// Bind and listen
try server.bind(address);
try server.listen(1);

// Retrieve bound port and initialize client
var sock_len = address.getOsSockLen();
try os.getsockname(server.fd, &address.any, &sock_len);
const client = try Self.init(address);

// Completions we need
var c_accept: xev.Completion = undefined;
var c_connect: xev.Completion = undefined;

// Accept
var server_conn: ?Self = null;
server.accept(&loop, &c_accept, ?Self, &server_conn, (struct {
fn callback(
ud: ?*?Self,
_: *xev.Loop,
_: *xev.Completion,
r: AcceptError!Self,
) xev.CallbackAction {
ud.?.* = r catch unreachable;
return .disarm;
}
}).callback);

// Connect
var connected: bool = false;
client.connect(&loop, &c_connect, address, bool, &connected, (struct {
fn callback(
ud: ?*bool,
_: *xev.Loop,
_: *xev.Completion,
_: Self,
r: ConnectError!void,
) xev.CallbackAction {
_ = r catch unreachable;
ud.?.* = true;
return .disarm;
}
}).callback);

// Wait for the connection to be established
try loop.run(.until_done);
try testing.expect(server_conn != null);
try testing.expect(connected);

// Close the server
var server_closed = false;
server.close(&loop, &c_accept, bool, &server_closed, (struct {
fn callback(
ud: ?*bool,
_: *xev.Loop,
_: *xev.Completion,
_: Self,
r: Self.CloseError!void,
) xev.CallbackAction {
_ = r catch unreachable;
ud.?.* = true;
return .disarm;
}
}).callback);
try loop.run(.until_done);
try testing.expect(server_closed);

// Unqueued send - Limit send buffer to 8kB, this should force partial writes.
try os.setsockopt(client.fd, os.SOL.SOCKET, os.SO.SNDBUF, &std.mem.toBytes(@as(c_int, 8192)));

const send_buf = [_]u8{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 } ** 100_000;
var sent_unqueued: usize = 0;

// First we try to send the whole 1MB buffer in one write operation, this _should_ result
// in a partial write.
client.write(&loop, &c_connect, .{ .slice = &send_buf }, usize, &sent_unqueued, (struct {
fn callback(
sent_unqueued_inner: ?*usize,
_: *xev.Loop,
_: *xev.Completion,
_: Self,
_: xev.WriteBuffer,
r: Self.WriteError!usize,
) xev.CallbackAction {
sent_unqueued_inner.?.* = r catch unreachable;
return .disarm;
}
}).callback);

// Make sure that we sent a small fraction of the buffer
try loop.run(.until_done);
// SO_SNDBUF doesn't seem to be respected exactly, sent_unqueued will often be ~32kB
// even though SO_SNDBUF was set to 8kB
try testing.expect(sent_unqueued < (send_buf.len / 10));

// Set up queued write
var w_queue = Self.WriteQueue{};
var wr_send: xev.TCP.WriteRequest = undefined;
var sent_queued: usize = 0;
var queued_slice = send_buf[sent_unqueued..];
client.queueWrite(&loop, &w_queue, &wr_send, .{ .slice = queued_slice }, usize, &sent_queued, (struct {
fn callback(
sent_queued_inner: ?*usize,
l: *xev.Loop,
c: *xev.Completion,
tcp: Self,
_: xev.WriteBuffer,
r: Self.WriteError!usize,
) xev.CallbackAction {
sent_queued_inner.?.* = r catch unreachable;

tcp.shutdown(l, c, void, null, (struct {
fn callback(
_: ?*void,
_: *xev.Loop,
_: *xev.Completion,
_: Self,
_: Self.ShutdownError!void,
) xev.CallbackAction {
return .disarm;
}
}).callback);

return .disarm;
}
}).callback);

// Set up receiver which is going to keep reading until it reads the full
// send buffer
const Receiver = struct {
loop: *xev.Loop,
conn: Self,
completion: xev.Completion = .{},
buf: [send_buf.len]u8 = undefined,
bytes_read: usize = 0,

pub fn read(receiver: *@This()) void {
if (receiver.bytes_read == receiver.buf.len) return;

var read_buf = xev.ReadBuffer{
.slice = receiver.buf[receiver.bytes_read..],
};
receiver.conn.read(receiver.loop, &receiver.completion, read_buf, @This(), receiver, readCb);
}

pub fn readCb(
receiver_opt: ?*@This(),
_: *xev.Loop,
_: *xev.Completion,
_: Self,
_: xev.ReadBuffer,
r: Self.ReadError!usize,
) xev.CallbackAction {
var receiver = receiver_opt.?;
var n_bytes = r catch unreachable;

receiver.bytes_read += n_bytes;
if (receiver.bytes_read < send_buf.len) {
receiver.read();
}

return .disarm;
}
};
var receiver = Receiver{
.loop = &loop,
.conn = server_conn.?,
};
receiver.read();

// Wait for the send/receive
try loop.run(.until_done);
try testing.expectEqualSlices(u8, &send_buf, receiver.buf[0..receiver.bytes_read]);
try testing.expect(send_buf.len == sent_unqueued + sent_queued);

// Close
server_conn.?.close(&loop, &c_accept, ?Self, &server_conn, (struct {
fn callback(
ud: ?*?Self,
_: *xev.Loop,
_: *xev.Completion,
_: Self,
r: Self.CloseError!void,
) xev.CallbackAction {
_ = r catch unreachable;
ud.?.* = null;
return .disarm;
}
}).callback);
client.close(&loop, &c_connect, bool, &connected, (struct {
fn callback(
ud: ?*bool,
_: *xev.Loop,
_: *xev.Completion,
_: Self,
r: Self.CloseError!void,
) xev.CallbackAction {
_ = r catch unreachable;
ud.?.* = false;
return .disarm;
}
}).callback);

try loop.run(.until_done);
try testing.expect(server_conn == null);
try testing.expect(!connected);
try testing.expect(server_closed);
}
};
}

0 comments on commit 42cda99

Please sign in to comment.