Skip to content

Commit

Permalink
IoUring: add IoUringServerSocket (envoyproxy#30773)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: He Jie Xu <[email protected]>
  • Loading branch information
soulxu authored Nov 28, 2023
1 parent 10d57a4 commit ddcaf1c
Show file tree
Hide file tree
Showing 10 changed files with 2,016 additions and 33 deletions.
162 changes: 162 additions & 0 deletions envoy/common/io/io_uring.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,20 @@ class IoUring {
*/
virtual IoUringResult prepareClose(os_fd_t fd, Request* user_data) PURE;

/**
* Prepares a cancellation and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareCancel(Request* cancelling_user_data, Request* user_data) PURE;

/**
* Prepares a shutdown operation and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareShutdown(os_fd_t fd, int how, Request* user_data) PURE;

/**
* Submits the entries in the submission queue to the kernel using the
* `io_uring_enter()` system call.
Expand Down Expand Up @@ -166,6 +180,37 @@ class IoUring {
using IoUringPtr = std::unique_ptr<IoUring>;
class IoUringWorker;

/**
* The Status of IoUringSocket.
*/
enum IoUringSocketStatus {
Initialized,
ReadEnabled,
ReadDisabled,
RemoteClosed,
Closed,
};

/**
* A callback will be invoked when a close requested done on the socket.
*/
using IoUringSocketOnClosedCb = std::function<void(Buffer::Instance& read_buffer)>;

/**
* The data returned from the read request.
*/
struct ReadParam {
Buffer::Instance& buf_;
int32_t result_;
};

/**
* The data returned from the write request.
*/
struct WriteParam {
int32_t result_;
};

/**
* Abstract for each socket.
*/
Expand All @@ -183,6 +228,55 @@ class IoUringSocket {
*/
virtual os_fd_t fd() const PURE;

/**
* Close the socket.
* @param keep_fd_open indicates the file descriptor of the socket will be closed or not in the
* end. The value of `true` is used for destroy the IoUringSocket but keep the file descriptor
* open. This is used for migrating the IoUringSocket between worker threads.
* @param cb will be invoked when the close request is done. This is also used for migrating the
* IoUringSocket between worker threads.
*/
virtual void close(bool keep_fd_open, IoUringSocketOnClosedCb cb = nullptr) PURE;

/**
* Enable the read on the socket. The socket will be begin to submit the read request and deliver
* read event when the request is done. This is used when the socket is listening on the file read
* event.
*/
virtual void enableRead() PURE;

/**
* Disable the read on the socket. The socket stops to submit the read request, although the
* existing read request won't be canceled and no read event will be delivered. This is used when
* the socket isn't listening on the file read event.
*/
virtual void disableRead() PURE;

/**
* Enable close event. This is used for the case the socket is listening on the file close event.
* Then a remote close is found by a read request will delievered as file close event.
*/
virtual void enableCloseEvent(bool enable) PURE;

/**
* Write data to the socket.
* @param data is going to write.
*/
virtual void write(Buffer::Instance& data) PURE;

/**
* Write data to the socket.
* @param slices includes the data to write.
* @param num_slice the number of slices.
*/
virtual uint64_t write(const Buffer::RawSlice* slices, uint64_t num_slice) PURE;

/**
* Shutdown the socket.
* @param how is SHUT_RD, SHUT_WR and SHUT_RDWR.
*/
virtual void shutdown(int how) PURE;

/**
* On accept request completed.
* TODO (soulxu): wrap the raw result into a type. It can be `IoCallUint64Result`.
Expand Down Expand Up @@ -251,6 +345,31 @@ class IoUringSocket {
* @param type the request type of injected completion.
*/
virtual void injectCompletion(Request::RequestType type) PURE;

/**
* Return the current status of IoUringSocket.
* @return the status.
*/
virtual IoUringSocketStatus getStatus() const PURE;

/**
* Return the data get from the read request.
* @return Only return valid ReadParam when the callback is invoked with
* `Event::FileReadyType::Read`, otherwise `absl::nullopt` returned.
*/
virtual const OptRef<ReadParam>& getReadParam() const PURE;
/**
* Return the data get from the write request.
* @return Only return valid WriteParam when the callback is invoked with
* `Event::FileReadyType::Write`, otherwise `absl::nullopt` returned.
*/
virtual const OptRef<WriteParam>& getWriteParam() const PURE;

/**
* Set the callback when file ready event triggered.
* @param cb the callback function.
*/
virtual void setFileReadyCb(Event::FileReadyCb cb) PURE;
};

using IoUringSocketPtr = std::unique_ptr<IoUringSocket>;
Expand All @@ -262,10 +381,53 @@ class IoUringWorker : public ThreadLocal::ThreadLocalObject {
public:
~IoUringWorker() override = default;

/**
* Add an server socket socket to the worker.
*/
virtual IoUringSocket& addServerSocket(os_fd_t fd, Event::FileReadyCb cb,
bool enable_close_event) PURE;

/**
* Add an server socket from an existing socket from another thread.
*/
virtual IoUringSocket& addServerSocket(os_fd_t fd, Buffer::Instance& read_buf,
Event::FileReadyCb cb, bool enable_close_event) PURE;

/**
* Return the current thread's dispatcher.
*/
virtual Event::Dispatcher& dispatcher() PURE;

/**
* Submit a read request for a socket.
*/
virtual Request* submitReadRequest(IoUringSocket& socket) PURE;

/**
* Submit a write request for a socket.
*/
virtual Request* submitWriteRequest(IoUringSocket& socket,
const Buffer::RawSliceVector& slices) PURE;

/**
* Submit a close request for a socket.
*/
virtual Request* submitCloseRequest(IoUringSocket& socket) PURE;

/**
* Submit a cancel request for a socket.
*/
virtual Request* submitCancelRequest(IoUringSocket& socket, Request* request_to_cancel) PURE;

/**
* Submit a shutdown request for a socket.
*/
virtual Request* submitShutdownRequest(IoUringSocket& socket, int how) PURE;

/**
* Return the number of sockets in the worker.
*/
virtual uint32_t getNumOfSockets() const PURE;
};

/**
Expand Down
2 changes: 2 additions & 0 deletions source/common/io/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ envoy_cc_library(
deps = [
":io_uring_impl_lib",
"//envoy/common/io:io_uring_interface",
"//envoy/event:file_event_interface",
"//source/common/buffer:buffer_lib",
"//source/common/common:linked_object",
],
)
30 changes: 30 additions & 0 deletions source/common/io/io_uring_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,36 @@ IoUringResult IoUringImpl::prepareClose(os_fd_t fd, Request* user_data) {
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareCancel(Request* cancelling_user_data, Request* user_data) {
ENVOY_LOG(trace, "prepare cancels for user data = {}", fmt::ptr(cancelling_user_data));
// TODO (soulxu): Handling the case of CQ ring is overflow.
ASSERT(!(*(ring_.sq.kflags) & IORING_SQ_CQ_OVERFLOW));
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
ENVOY_LOG(trace, "failed to prepare cancel for user data = {}", fmt::ptr(cancelling_user_data));
return IoUringResult::Failed;
}

io_uring_prep_cancel(sqe, cancelling_user_data, 0);
io_uring_sqe_set_data(sqe, user_data);
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareShutdown(os_fd_t fd, int how, Request* user_data) {
ENVOY_LOG(trace, "prepare shutdown for fd = {}, how = {}", fd, how);
// TODO (soulxu): Handling the case of CQ ring is overflow.
ASSERT(!(*(ring_.sq.kflags) & IORING_SQ_CQ_OVERFLOW));
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
ENVOY_LOG(trace, "failed to prepare shutdown for fd = {}", fd);
return IoUringResult::Failed;
}

io_uring_prep_shutdown(sqe, fd, how);
io_uring_sqe_set_data(sqe, user_data);
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::submit() {
int res = io_uring_submit(&ring_);
RELEASE_ASSERT(res >= 0 || res == -EBUSY, "unable to submit io_uring queue entries");
Expand Down
2 changes: 2 additions & 0 deletions source/common/io/io_uring_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class IoUringImpl : public IoUring,
IoUringResult prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
off_t offset, Request* user_data) override;
IoUringResult prepareClose(os_fd_t fd, Request* user_data) override;
IoUringResult prepareCancel(Request* cancelling_user_data, Request* user_data) override;
IoUringResult prepareShutdown(os_fd_t fd, int how, Request* user_data) override;
IoUringResult submit() override;
void injectCompletion(os_fd_t fd, Request* user_data, int32_t result) override;
void removeInjectedCompletion(os_fd_t fd) override;
Expand Down
Loading

0 comments on commit ddcaf1c

Please sign in to comment.