diff --git a/src/watcher/stream.zig b/src/watcher/stream.zig index e0add1f..3861b75 100644 --- a/src/watcher/stream.zig +++ b/src/watcher/stream.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const assert = std.debug.assert; const builtin = @import("builtin"); const common = @import("common.zig"); const queue = @import("../queue.zig"); @@ -468,15 +469,15 @@ pub fn Writeable(comptime xev: type, comptime T: type, comptime options: Options /// Given a `WriteBuffer` and number of bytes written during the previous /// write operation, returns a new `WriteBuffer` with remaining data. fn writeBufferRemainder(buf: xev.WriteBuffer, offset: usize) xev.WriteBuffer { - var wb: xev.WriteBuffer = undefined; - switch (buf) { .slice => |slice| { - wb = .{ .slice = slice[offset..] }; + assert(offset <= slice.len); + return .{ .slice = slice[offset..] }; }, .array => |array| { + assert(offset <= array.len); const rem_len = array.len - offset; - wb = .{ .array = .{ + var wb = xev.WriteBuffer{ .array = .{ .array = undefined, .len = rem_len, } }; @@ -484,10 +485,9 @@ pub fn Writeable(comptime xev: type, comptime T: type, comptime options: Options wb.array.array[0..rem_len], array.array[offset..][0..rem_len], ); + return wb; }, } - - return wb; } }; } diff --git a/src/watcher/tcp.zig b/src/watcher/tcp.zig index 48c20de..ac64987 100644 --- a/src/watcher/tcp.zig +++ b/src/watcher/tcp.zig @@ -225,17 +225,24 @@ pub fn TCP(comptime xev: type) type { var loop = try xev.Loop.init(.{}); defer loop.deinit(); - const address = try std.net.Address.parseIp4("127.0.0.1", 3131); + // Choose random available port (Zig #14907) + var address = try std.net.Address.parseIp4("127.0.0.1", 0); const server = try Self.init(address); + + // Bind and listen + try server.bind(address); + try server.listen(1); + + // Retrieve bound port and initialize client + var sock_len = address.getOsSockLen(); + try os.getsockname(server.fd, &address.any, &sock_len); const client = try Self.init(address); // Completions we need var c_accept: xev.Completion = undefined; var c_connect: xev.Completion = undefined; - // Bind and accept - try server.bind(address); - try server.listen(1); + // Accept var server_conn: ?Self = null; server.accept(&loop, &c_accept, ?Self, &server_conn, (struct { fn callback( @@ -359,5 +366,240 @@ pub fn TCP(comptime xev: type) type { try testing.expect(!connected); try testing.expect(server_closed); } + + // Potentially flaky - this test could hang if the sender is unable to + // write everything to the socket for whatever reason + // (e.g. incorrectly sized buffer on the receiver side), or if the + // receiver is trying to receive while sender has nothing left to send. + // + // Overview: + // 1. Set up server and client sockets + // 2. connect & accept, set SO_SNDBUF to 8kB on the client + // 3. Try to send 1MB buffer from client to server without queuing, this _should_ fail + // and theoretically send <= 8kB, but in practice, it seems to write ~32kB. + // Asserts that <= 100kB was written + // 4. Set up a queued write with the remaining buffer, shutdown() the socket afterwards + // 5. Set up a receiver that loops until it receives the entire buffer + // 6. Assert send_buf == recv_buf + test "TCP: Queued writes" { + // We have no way to get a socket in WASI from a WASI context. + if (xev.backend == .wasi_poll) return error.SkipZigTest; + + const testing = std.testing; + + var loop = try xev.Loop.init(.{}); + defer loop.deinit(); + + // Choose random available port (Zig #14907) + var address = try std.net.Address.parseIp4("127.0.0.1", 0); + const server = try Self.init(address); + + // Bind and listen + try server.bind(address); + try server.listen(1); + + // Retrieve bound port and initialize client + var sock_len = address.getOsSockLen(); + try os.getsockname(server.fd, &address.any, &sock_len); + const client = try Self.init(address); + + // Completions we need + var c_accept: xev.Completion = undefined; + var c_connect: xev.Completion = undefined; + + // Accept + var server_conn: ?Self = null; + server.accept(&loop, &c_accept, ?Self, &server_conn, (struct { + fn callback( + ud: ?*?Self, + _: *xev.Loop, + _: *xev.Completion, + r: AcceptError!Self, + ) xev.CallbackAction { + ud.?.* = r catch unreachable; + return .disarm; + } + }).callback); + + // Connect + var connected: bool = false; + client.connect(&loop, &c_connect, address, bool, &connected, (struct { + fn callback( + ud: ?*bool, + _: *xev.Loop, + _: *xev.Completion, + _: Self, + r: ConnectError!void, + ) xev.CallbackAction { + _ = r catch unreachable; + ud.?.* = true; + return .disarm; + } + }).callback); + + // Wait for the connection to be established + try loop.run(.until_done); + try testing.expect(server_conn != null); + try testing.expect(connected); + + // Close the server + var server_closed = false; + server.close(&loop, &c_accept, bool, &server_closed, (struct { + fn callback( + ud: ?*bool, + _: *xev.Loop, + _: *xev.Completion, + _: Self, + r: Self.CloseError!void, + ) xev.CallbackAction { + _ = r catch unreachable; + ud.?.* = true; + return .disarm; + } + }).callback); + try loop.run(.until_done); + try testing.expect(server_closed); + + // Unqueued send - Limit send buffer to 8kB, this should force partial writes. + try os.setsockopt(client.fd, os.SOL.SOCKET, os.SO.SNDBUF, &std.mem.toBytes(@as(c_int, 8192))); + + const send_buf = [_]u8{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 } ** 100_000; + var sent_unqueued: usize = 0; + + // First we try to send the whole 1MB buffer in one write operation, this _should_ result + // in a partial write. + client.write(&loop, &c_connect, .{ .slice = &send_buf }, usize, &sent_unqueued, (struct { + fn callback( + sent_unqueued_inner: ?*usize, + _: *xev.Loop, + _: *xev.Completion, + _: Self, + _: xev.WriteBuffer, + r: Self.WriteError!usize, + ) xev.CallbackAction { + sent_unqueued_inner.?.* = r catch unreachable; + return .disarm; + } + }).callback); + + // Make sure that we sent a small fraction of the buffer + try loop.run(.until_done); + // SO_SNDBUF doesn't seem to be respected exactly, sent_unqueued will often be ~32kB + // even though SO_SNDBUF was set to 8kB + try testing.expect(sent_unqueued < (send_buf.len / 10)); + + // Set up queued write + var w_queue = Self.WriteQueue{}; + var wr_send: xev.TCP.WriteRequest = undefined; + var sent_queued: usize = 0; + var queued_slice = send_buf[sent_unqueued..]; + client.queueWrite(&loop, &w_queue, &wr_send, .{ .slice = queued_slice }, usize, &sent_queued, (struct { + fn callback( + sent_queued_inner: ?*usize, + l: *xev.Loop, + c: *xev.Completion, + tcp: Self, + _: xev.WriteBuffer, + r: Self.WriteError!usize, + ) xev.CallbackAction { + sent_queued_inner.?.* = r catch unreachable; + + tcp.shutdown(l, c, void, null, (struct { + fn callback( + _: ?*void, + _: *xev.Loop, + _: *xev.Completion, + _: Self, + _: Self.ShutdownError!void, + ) xev.CallbackAction { + return .disarm; + } + }).callback); + + return .disarm; + } + }).callback); + + // Set up receiver which is going to keep reading until it reads the full + // send buffer + const Receiver = struct { + loop: *xev.Loop, + conn: Self, + completion: xev.Completion = .{}, + buf: [send_buf.len]u8 = undefined, + bytes_read: usize = 0, + + pub fn read(receiver: *@This()) void { + if (receiver.bytes_read == receiver.buf.len) return; + + var read_buf = xev.ReadBuffer{ + .slice = receiver.buf[receiver.bytes_read..], + }; + receiver.conn.read(receiver.loop, &receiver.completion, read_buf, @This(), receiver, readCb); + } + + pub fn readCb( + receiver_opt: ?*@This(), + _: *xev.Loop, + _: *xev.Completion, + _: Self, + _: xev.ReadBuffer, + r: Self.ReadError!usize, + ) xev.CallbackAction { + var receiver = receiver_opt.?; + var n_bytes = r catch unreachable; + + receiver.bytes_read += n_bytes; + if (receiver.bytes_read < send_buf.len) { + receiver.read(); + } + + return .disarm; + } + }; + var receiver = Receiver{ + .loop = &loop, + .conn = server_conn.?, + }; + receiver.read(); + + // Wait for the send/receive + try loop.run(.until_done); + try testing.expectEqualSlices(u8, &send_buf, receiver.buf[0..receiver.bytes_read]); + try testing.expect(send_buf.len == sent_unqueued + sent_queued); + + // Close + server_conn.?.close(&loop, &c_accept, ?Self, &server_conn, (struct { + fn callback( + ud: ?*?Self, + _: *xev.Loop, + _: *xev.Completion, + _: Self, + r: Self.CloseError!void, + ) xev.CallbackAction { + _ = r catch unreachable; + ud.?.* = null; + return .disarm; + } + }).callback); + client.close(&loop, &c_connect, bool, &connected, (struct { + fn callback( + ud: ?*bool, + _: *xev.Loop, + _: *xev.Completion, + _: Self, + r: Self.CloseError!void, + ) xev.CallbackAction { + _ = r catch unreachable; + ud.?.* = false; + return .disarm; + } + }).callback); + + try loop.run(.until_done); + try testing.expect(server_conn == null); + try testing.expect(!connected); + try testing.expect(server_closed); + } }; }