Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't use SEND_ONCE mach rights for async, use a queue limit of 1 #102

Merged
merged 1 commit into from
Jun 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 46 additions & 3 deletions src/watcher/async.zig
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ fn AsyncMachPort(comptime xev: type) type {
/// The error that can come in the wait callback.
pub const WaitError = xev.Sys.MachPortError;

/// Missing Mach APIs from Zig stdlib. Data from xnu: osfmk/mach/port.h
const mach_port_flavor_t = c_int;
const mach_port_limits = extern struct { mpl_qlimit: c_uint };
const MACH_PORT_LIMITS_INFO = 1;
extern "c" fn mach_port_set_attributes(
task: posix.system.ipc_space_t,
name: posix.system.mach_port_name_t,
flavor: mach_port_flavor_t,
info: *anyopaque,
count: posix.system.mach_msg_type_number_t,
) posix.system.kern_return_t;

/// The mach port
port: posix.system.mach_port_name_t,

Expand All @@ -155,6 +167,31 @@ fn AsyncMachPort(comptime xev: type) type {
}
errdefer _ = posix.system.mach_port_deallocate(mach_self, mach_port);

// Insert a send right into the port since we also use this to send
switch (posix.system.getKernError(posix.system.mach_port_insert_right(
mach_self,
mach_port,
mach_port,
@intFromEnum(posix.system.MACH_MSG_TYPE.MAKE_SEND),
))) {
.SUCCESS => {}, // Success
else => return error.MachPortAllocFailed,
}

// Modify the port queue size to be 1 because we are only
// using it for notifications and not for any other purpose.
var limits: mach_port_limits = .{ .mpl_qlimit = 1 };
switch (posix.system.getKernError(mach_port_set_attributes(
mach_self,
mach_port,
MACH_PORT_LIMITS_INFO,
&limits,
@sizeOf(@TypeOf(limits)),
))) {
.SUCCESS => {}, // Success
else => return error.MachPortAllocFailed,
}

return .{
.port = mach_port,
};
Expand Down Expand Up @@ -266,7 +303,9 @@ fn AsyncMachPort(comptime xev: type) type {
pub fn notify(self: Self) !void {
// This constructs an empty mach message. It has no data.
var msg: posix.system.mach_msg_header_t = .{
.msgh_bits = @intFromEnum(posix.system.MACH_MSG_TYPE.MAKE_SEND_ONCE),
// We use COPY_SEND which will not increment any send ref
// counts because it'll reuse the existing send right.
.msgh_bits = @intFromEnum(posix.system.MACH_MSG_TYPE.COPY_SEND),
.msgh_size = @sizeOf(posix.system.mach_msg_header_t),
.msgh_remote_port = self.port,
.msgh_local_port = posix.system.MACH_PORT_NULL,
Expand All @@ -277,11 +316,11 @@ fn AsyncMachPort(comptime xev: type) type {
return switch (posix.system.getMachMsgError(
posix.system.mach_msg(
&msg,
posix.system.MACH_SEND_MSG,
posix.system.MACH_SEND_MSG | posix.system.MACH_SEND_TIMEOUT,
msg.msgh_size,
0,
posix.system.MACH_PORT_NULL,
posix.system.MACH_MSG_TIMEOUT_NONE,
0, // Fail instantly if the port is full
posix.system.MACH_PORT_NULL,
),
)) {
Expand All @@ -294,6 +333,10 @@ fn AsyncMachPort(comptime xev: type) type {
// This is okay because it means that there was no more buffer
// space meaning that the port will wake up.
.SEND_NO_BUFFER => {},

// This means that the send would've blocked because the
// queue is full. We assume success because the port is full.
.SEND_TIMED_OUT => {},
};
}

Expand Down