Skip to content

Commit

Permalink
Copy a42b74ae8139738a14148f94543c659ec2d5b92b from libxev (#12128)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarred-Sumner authored Jun 25, 2024
1 parent d191ec5 commit ccd92a9
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 34 deletions.
68 changes: 55 additions & 13 deletions packages/bun-usockets/src/eventing/epoll_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -549,12 +549,28 @@ struct us_internal_async *us_internal_create_async(struct us_loop_t *loop, int f
}

cb->machport_buf = us_malloc(MACHPORT_BUF_LEN);
kern_return_t kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &cb->port);
mach_port_t self = mach_task_self();
kern_return_t kr = mach_port_allocate(self, MACH_PORT_RIGHT_RECEIVE, &cb->port);

if (UNLIKELY(kr != KERN_SUCCESS)) {
return NULL;
}

// Insert a send right into the port since we also use this to send
kr = mach_port_insert_right(self, cb->port, cb->port, MACH_MSG_TYPE_MAKE_SEND);
if (UNLIKELY(kr != KERN_SUCCESS)) {
return NULL;
}

// Modify the port queue size to be 1 because we are only
// using it for notifications and not for any other purpose.
mach_port_limits_t limits = { .mpl_qlimit = 1 };
kr = mach_port_set_attributes(self, cb->port, MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits, MACH_PORT_LIMITS_INFO_COUNT);

if (UNLIKELY(kr != KERN_SUCCESS)) {
return NULL;
}

return (struct us_internal_async *) cb;
}

Expand Down Expand Up @@ -602,18 +618,44 @@ void us_internal_async_set(struct us_internal_async *a, void (*cb)(struct us_int

void us_internal_async_wakeup(struct us_internal_async *a) {
struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a;
mach_msg_empty_send_t message;
memset(&message, 0, sizeof(message));
message.header.msgh_size = sizeof(message);
message.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE);
message.header.msgh_remote_port = internal_cb->port;
kern_return_t kr = mach_msg_send(&message.header);
if (kr != KERN_SUCCESS) {
// If us_internal_async_wakeup is being called by other threads faster
// than the pump can dispatch work, the kernel message queue for the wakeup
// port can fill The kernel does return a SEND_ONCE right in the case of
// failure, which must be destroyed to avoid leaking.
mach_msg_destroy(&message.header);
mach_msg_header_t msg = {
.msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 0),
.msgh_size = sizeof(mach_msg_header_t),
.msgh_remote_port = internal_cb->port,
.msgh_local_port = MACH_PORT_NULL,
.msgh_voucher_port = 0,
.msgh_id = 0,
};

mach_msg_return_t kr = mach_msg(
&msg,
MACH_SEND_MSG | MACH_SEND_TIMEOUT,
msg.msgh_size,
0,
MACH_PORT_NULL,
0, // Fail instantly if the port is full
MACH_PORT_NULL
);

switch (kr) {
case KERN_SUCCESS: {
break;
}

// This means that the send would've blocked because the
// queue is full. We assume success because the port is full.
case MACH_SEND_TIMED_OUT: {
break;
}

// No space means it will wake up.
case MACH_SEND_NO_BUFFER: {
break;
}

default: {
break;
}
}
}
#endif
Expand Down
84 changes: 63 additions & 21 deletions src/io/io_darwin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,36 @@
// errno
#include <errno.h>

#include "wtf/Assertions.h"

extern "C" mach_port_t io_darwin_create_machport(uint64_t wakeup, int32_t fd,
void *wakeup_buffer_,
size_t nbytes) {

mach_port_t port;
// Create a Mach port that will be used to wake up the pump
kern_return_t kr =
mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &port);
if (kr != KERN_SUCCESS) {
return 0;
mach_port_t self = mach_task_self();
kern_return_t kr = mach_port_allocate(self, MACH_PORT_RIGHT_RECEIVE, &port);

if (UNLIKELY(kr != KERN_SUCCESS)) {
return 0;
}

// Insert a send right into the port since we also use this to send
kr = mach_port_insert_right(self, port, port, MACH_MSG_TYPE_MAKE_SEND);
if (UNLIKELY(kr != KERN_SUCCESS)) {
return 0;
}

// Modify the port queue size to be 1 because we are only
// using it for notifications and not for any other purpose.
mach_port_limits_t limits = { .mpl_qlimit = 1 };
kr = mach_port_set_attributes(self, port, MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits, MACH_PORT_LIMITS_INFO_COUNT);

if (UNLIKELY(kr != KERN_SUCCESS)) {
return 0;
}


// Configure the event to directly receive the Mach message as part of the
// kevent64() call.
kevent64_s event{};
Expand Down Expand Up @@ -58,22 +76,46 @@ extern "C" bool getaddrinfo_send_reply(mach_port_t port,
}

extern "C" bool io_darwin_schedule_wakeup(mach_port_t waker) {
mach_msg_empty_send_t message{};
message.header.msgh_size = sizeof(message);
message.header.msgh_bits =
MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE);
message.header.msgh_remote_port = waker;
kern_return_t kr = mach_msg_send(&message.header);
if (kr != KERN_SUCCESS) {
// If io_darwin_schedule_wakeup() is being called by other threads faster
// than the pump can dispatch work, the kernel message queue for the wakeup
// port can fill The kernel does return a SEND_ONCE right in the case of
// failure, which must be destroyed to avoid leaking.
mach_msg_destroy(&message.header);
return false;
}

return true;
mach_msg_header_t msg = {
.msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 0),
.msgh_size = sizeof(mach_msg_header_t),
.msgh_remote_port = waker,
.msgh_local_port = MACH_PORT_NULL,
.msgh_voucher_port = 0,
.msgh_id = 0,
};

mach_msg_return_t kr = mach_msg(
&msg,
MACH_SEND_MSG | MACH_SEND_TIMEOUT,
msg.msgh_size,
0,
MACH_PORT_NULL,
0, // Fail instantly if the port is full
MACH_PORT_NULL
);

switch (kr) {
case KERN_SUCCESS: {
return true;
}

// This means that the send would've blocked because the
// queue is full. We assume success because the port is full.
case MACH_SEND_TIMED_OUT: {
return true;
}

// No space means it will wake up.
case MACH_SEND_NO_BUFFER: {
return true;
}

default: {
ASSERT_NOT_REACHED_WITH_MESSAGE("mach_msg failed with %d", kr);
return false;
}
}
}

#else
Expand Down

0 comments on commit ccd92a9

Please sign in to comment.