From bf83f099f89ffaf6ed16684494d29dee641ce9b7 Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Mon, 24 Jun 2024 19:21:40 -0700 Subject: [PATCH] Copy a42b74ae8139738a14148f94543c659ec2d5b92b from libxev (#12128) --- .../bun-usockets/src/eventing/epoll_kqueue.c | 68 ++++++++++++--- src/io/io_darwin.cpp | 84 ++++++++++++++----- 2 files changed, 118 insertions(+), 34 deletions(-) diff --git a/packages/bun-usockets/src/eventing/epoll_kqueue.c b/packages/bun-usockets/src/eventing/epoll_kqueue.c index 155e329c7ef07c..192d0972fa712e 100644 --- a/packages/bun-usockets/src/eventing/epoll_kqueue.c +++ b/packages/bun-usockets/src/eventing/epoll_kqueue.c @@ -549,12 +549,28 @@ struct us_internal_async *us_internal_create_async(struct us_loop_t *loop, int f } cb->machport_buf = us_malloc(MACHPORT_BUF_LEN); - kern_return_t kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &cb->port); + mach_port_t self = mach_task_self(); + kern_return_t kr = mach_port_allocate(self, MACH_PORT_RIGHT_RECEIVE, &cb->port); if (UNLIKELY(kr != KERN_SUCCESS)) { return NULL; } + // Insert a send right into the port since we also use this to send + kr = mach_port_insert_right(self, cb->port, cb->port, MACH_MSG_TYPE_MAKE_SEND); + if (UNLIKELY(kr != KERN_SUCCESS)) { + return NULL; + } + + // Modify the port queue size to be 1 because we are only + // using it for notifications and not for any other purpose. + mach_port_limits_t limits = { .mpl_qlimit = 1 }; + kr = mach_port_set_attributes(self, cb->port, MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits, MACH_PORT_LIMITS_INFO_COUNT); + + if (UNLIKELY(kr != KERN_SUCCESS)) { + return NULL; + } + return (struct us_internal_async *) cb; } @@ -602,18 +618,44 @@ void us_internal_async_set(struct us_internal_async *a, void (*cb)(struct us_int void us_internal_async_wakeup(struct us_internal_async *a) { struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) a; - mach_msg_empty_send_t message; - memset(&message, 0, sizeof(message)); - message.header.msgh_size = sizeof(message); - message.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE); - message.header.msgh_remote_port = internal_cb->port; - kern_return_t kr = mach_msg_send(&message.header); - if (kr != KERN_SUCCESS) { - // If us_internal_async_wakeup is being called by other threads faster - // than the pump can dispatch work, the kernel message queue for the wakeup - // port can fill The kernel does return a SEND_ONCE right in the case of - // failure, which must be destroyed to avoid leaking. - mach_msg_destroy(&message.header); + mach_msg_header_t msg = { + .msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 0), + .msgh_size = sizeof(mach_msg_header_t), + .msgh_remote_port = internal_cb->port, + .msgh_local_port = MACH_PORT_NULL, + .msgh_voucher_port = 0, + .msgh_id = 0, + }; + + mach_msg_return_t kr = mach_msg( + &msg, + MACH_SEND_MSG | MACH_SEND_TIMEOUT, + msg.msgh_size, + 0, + MACH_PORT_NULL, + 0, // Fail instantly if the port is full + MACH_PORT_NULL + ); + + switch (kr) { + case KERN_SUCCESS: { + break; + } + + // This means that the send would've blocked because the + // queue is full. We assume success because the port is full. + case MACH_SEND_TIMED_OUT: { + break; + } + + // No space means it will wake up. + case MACH_SEND_NO_BUFFER: { + break; + } + + default: { + break; + } } } #endif diff --git a/src/io/io_darwin.cpp b/src/io/io_darwin.cpp index 61e3f16243e1c7..05f7a0d94359df 100644 --- a/src/io/io_darwin.cpp +++ b/src/io/io_darwin.cpp @@ -7,18 +7,36 @@ // errno #include +#include "wtf/Assertions.h" + extern "C" mach_port_t io_darwin_create_machport(uint64_t wakeup, int32_t fd, void *wakeup_buffer_, size_t nbytes) { mach_port_t port; - // Create a Mach port that will be used to wake up the pump - kern_return_t kr = - mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &port); - if (kr != KERN_SUCCESS) { - return 0; + mach_port_t self = mach_task_self(); + kern_return_t kr = mach_port_allocate(self, MACH_PORT_RIGHT_RECEIVE, &port); + + if (UNLIKELY(kr != KERN_SUCCESS)) { + return 0; + } + + // Insert a send right into the port since we also use this to send + kr = mach_port_insert_right(self, port, port, MACH_MSG_TYPE_MAKE_SEND); + if (UNLIKELY(kr != KERN_SUCCESS)) { + return 0; } + // Modify the port queue size to be 1 because we are only + // using it for notifications and not for any other purpose. + mach_port_limits_t limits = { .mpl_qlimit = 1 }; + kr = mach_port_set_attributes(self, port, MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits, MACH_PORT_LIMITS_INFO_COUNT); + + if (UNLIKELY(kr != KERN_SUCCESS)) { + return 0; + } + + // Configure the event to directly receive the Mach message as part of the // kevent64() call. kevent64_s event{}; @@ -58,22 +76,46 @@ extern "C" bool getaddrinfo_send_reply(mach_port_t port, } extern "C" bool io_darwin_schedule_wakeup(mach_port_t waker) { - mach_msg_empty_send_t message{}; - message.header.msgh_size = sizeof(message); - message.header.msgh_bits = - MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_MAKE_SEND_ONCE); - message.header.msgh_remote_port = waker; - kern_return_t kr = mach_msg_send(&message.header); - if (kr != KERN_SUCCESS) { - // If io_darwin_schedule_wakeup() is being called by other threads faster - // than the pump can dispatch work, the kernel message queue for the wakeup - // port can fill The kernel does return a SEND_ONCE right in the case of - // failure, which must be destroyed to avoid leaking. - mach_msg_destroy(&message.header); - return false; - } - - return true; + mach_msg_header_t msg = { + .msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 0), + .msgh_size = sizeof(mach_msg_header_t), + .msgh_remote_port = waker, + .msgh_local_port = MACH_PORT_NULL, + .msgh_voucher_port = 0, + .msgh_id = 0, + }; + + mach_msg_return_t kr = mach_msg( + &msg, + MACH_SEND_MSG | MACH_SEND_TIMEOUT, + msg.msgh_size, + 0, + MACH_PORT_NULL, + 0, // Fail instantly if the port is full + MACH_PORT_NULL + ); + + switch (kr) { + case KERN_SUCCESS: { + return true; + } + + // This means that the send would've blocked because the + // queue is full. We assume success because the port is full. + case MACH_SEND_TIMED_OUT: { + return true; + } + + // No space means it will wake up. + case MACH_SEND_NO_BUFFER: { + return true; + } + + default: { + ASSERT_NOT_REACHED_WITH_MESSAGE("mach_msg failed with %d", kr); + return false; + } + } } #else