Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy a42b74ae8139738a14148f94543c659ec2d5b92b from libxev #12128

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 55 additions & 13 deletions packages/bun-usockets/src/eventing/epoll_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -549,12 +549,28 @@ struct us_internal_async *us_internal_create_async(struct us_loop_t *loop, int f
}

cb->machport_buf = us_malloc(MACHPORT_BUF_LEN);
kern_return_t kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &cb->port);
mach_port_t self = mach_task_self();
kern_return_t kr = mach_port_allocate(self, MACH_PORT_RIGHT_RECEIVE, &cb->port);

if (UNLIKELY(kr != KERN_SUCCESS)) {
return NULL;
}

// Insert a send right into the port since we also use this to send
kr = mach_port_insert_right(self, cb->port, cb->port, MACH_MSG_TYPE_MAKE_SEND);
if (UNLIKELY(kr != KERN_SUCCESS)) {
return NULL;
}

// Modify the port queue size to be 1 because we are only
// using it for notifications and not for any other purpose.
mach_port_limits_t limits = { .mpl_qlimit = 1 };
kr = mach_port_set_attributes(self, cb->port, MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits, MACH_PORT_LIMITS_INFO_COUNT);

if (UNLIKELY(kr != KERN_SUCCESS)) {
return NULL;
}

return (struct us_internal_async *) cb;
}

Expand Down Expand Up @@ -602,18 +618,44 @@ void us_internal_async_set(struct us_internal_async *a, void (*cb)(struct us_int

void us_internal_async_wakeup(struct us_internal_async *a) {
struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a;
mach_msg_empty_send_t message;
memset(&message, 0, sizeof(message));
message.header.msgh_size = sizeof(message);
message.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE);
message.header.msgh_remote_port = internal_cb->port;
kern_return_t kr = mach_msg_send(&message.header);
if (kr != KERN_SUCCESS) {
// If us_internal_async_wakeup is being called by other threads faster
// than the pump can dispatch work, the kernel message queue for the wakeup
// port can fill The kernel does return a SEND_ONCE right in the case of
// failure, which must be destroyed to avoid leaking.
mach_msg_destroy(&message.header);
mach_msg_header_t msg = {
.msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 0),
.msgh_size = sizeof(mach_msg_header_t),
.msgh_remote_port = internal_cb->port,
.msgh_local_port = MACH_PORT_NULL,
.msgh_voucher_port = 0,
.msgh_id = 0,
};

mach_msg_return_t kr = mach_msg(
&msg,
MACH_SEND_MSG | MACH_SEND_TIMEOUT,
msg.msgh_size,
0,
MACH_PORT_NULL,
0, // Fail instantly if the port is full
MACH_PORT_NULL
);

switch (kr) {
case KERN_SUCCESS: {
break;
}

// This means that the send would've blocked because the
// queue is full. We assume success because the port is full.
case MACH_SEND_TIMED_OUT: {
break;
}

// No space means it will wake up.
case MACH_SEND_NO_BUFFER: {
break;
}

default: {
break;
}
}
}
#endif
Expand Down
84 changes: 63 additions & 21 deletions src/io/io_darwin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,36 @@
// errno
#include <errno.h>

#include "wtf/Assertions.h"

extern "C" mach_port_t io_darwin_create_machport(uint64_t wakeup, int32_t fd,
void *wakeup_buffer_,
size_t nbytes) {

mach_port_t port;
// Create a Mach port that will be used to wake up the pump
kern_return_t kr =
mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &port);
if (kr != KERN_SUCCESS) {
return 0;
mach_port_t self = mach_task_self();
kern_return_t kr = mach_port_allocate(self, MACH_PORT_RIGHT_RECEIVE, &port);

if (UNLIKELY(kr != KERN_SUCCESS)) {
return 0;
}

// Insert a send right into the port since we also use this to send
kr = mach_port_insert_right(self, port, port, MACH_MSG_TYPE_MAKE_SEND);
if (UNLIKELY(kr != KERN_SUCCESS)) {
return 0;
}

// Modify the port queue size to be 1 because we are only
// using it for notifications and not for any other purpose.
mach_port_limits_t limits = { .mpl_qlimit = 1 };
kr = mach_port_set_attributes(self, port, MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits, MACH_PORT_LIMITS_INFO_COUNT);

if (UNLIKELY(kr != KERN_SUCCESS)) {
return 0;
}


// Configure the event to directly receive the Mach message as part of the
// kevent64() call.
kevent64_s event{};
Expand Down Expand Up @@ -58,22 +76,46 @@ extern "C" bool getaddrinfo_send_reply(mach_port_t port,
}

extern "C" bool io_darwin_schedule_wakeup(mach_port_t waker) {
mach_msg_empty_send_t message{};
message.header.msgh_size = sizeof(message);
message.header.msgh_bits =
MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE);
message.header.msgh_remote_port = waker;
kern_return_t kr = mach_msg_send(&message.header);
if (kr != KERN_SUCCESS) {
// If io_darwin_schedule_wakeup() is being called by other threads faster
// than the pump can dispatch work, the kernel message queue for the wakeup
// port can fill The kernel does return a SEND_ONCE right in the case of
// failure, which must be destroyed to avoid leaking.
mach_msg_destroy(&message.header);
return false;
}

return true;
mach_msg_header_t msg = {
.msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 0),
.msgh_size = sizeof(mach_msg_header_t),
.msgh_remote_port = waker,
.msgh_local_port = MACH_PORT_NULL,
.msgh_voucher_port = 0,
.msgh_id = 0,
};

mach_msg_return_t kr = mach_msg(
&msg,
MACH_SEND_MSG | MACH_SEND_TIMEOUT,
msg.msgh_size,
0,
MACH_PORT_NULL,
0, // Fail instantly if the port is full
MACH_PORT_NULL
);

switch (kr) {
case KERN_SUCCESS: {
return true;
}

// This means that the send would've blocked because the
// queue is full. We assume success because the port is full.
case MACH_SEND_TIMED_OUT: {
return true;
}

// No space means it will wake up.
case MACH_SEND_NO_BUFFER: {
return true;
}

default: {
ASSERT_NOT_REACHED_WITH_MESSAGE("mach_msg failed with %d", kr);
return false;
}
}
}

#else
Expand Down
Loading