Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Ruby 3.0 Fiber Scheduler #77

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions ext/hiredis_ext/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,63 @@
#include <errno.h>
#include "hiredis_ext.h"

#ifdef HAVE_RUBY_FIBER_SCHEDULER_H
#include "ruby/fiber/scheduler.h"
#include "ruby/io.h"

// TODO: I copied these from Ruby; are they supposed to be exposed as part of the C extension API?
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we get rid of this boilerplate? Seems like a lot of C extensions implementing support for Fiber Scheduler are going to want these utility fns.


#define FMODE_PREP (1<<16)

static int
io_check_tty(rb_io_t *fptr)
{
int t = isatty(fptr->fd);
if (t)
fptr->mode |= FMODE_TTY|FMODE_DUPLEX;
return t;
}

static VALUE
io_alloc(VALUE klass)
{
NEWOBJ_OF(io, struct RFile, klass, T_FILE);

io->fptr = 0;

return (VALUE)io;
}

static VALUE
prep_io(int fd, int fmode, VALUE klass, const char *path)
{
rb_io_t *fp;
VALUE io = io_alloc(klass);

MakeOpenFile(io, fp);
fp->self = io;
fp->fd = fd;
fp->mode = fmode;
if (!io_check_tty(fp)) {
#ifdef __CYGWIN__
fp->mode |= FMODE_BINMODE;
setmode(fd, O_BINARY);
#endif
}
if (path) fp->pathv = rb_obj_freeze(rb_str_new_cstr(path));
rb_update_max_fd(fd);

return io;
}

static VALUE
io_from_fd(int fd)
{
return prep_io(fd, FMODE_PREP, rb_cIO, NULL);
}

#endif

typedef struct redisParentContext {
redisContext *context;
struct timeval *timeout;
Expand Down Expand Up @@ -107,6 +164,26 @@ typedef fd_set _fdset_t;
#endif

static int __wait_readable(int fd, const struct timeval *timeout, int *isset) {
#ifdef HAVE_RUBY_FIBER_SCHEDULER_H
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_wait(scheduler,
io_from_fd(fd),
RB_UINT2NUM(RUBY_IO_READABLE),
rb_fiber_scheduler_make_timeout((struct timeval *) timeout));
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I be using io_from_fd here? Is it going to allocate a Ruby IO object every time it gets to this fn? What's the alternative?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The simplest most compatible option is to look up IO.for_fd method and call it dynamically. We can introduce a C interface for this, but it won't be supported on older rubies, so you'll need to have that as a fallback.


if (RTEST(result)) {
if (isset) {
*isset = 1;
}
return 0;
} else {
// timeout
return -1;
}
}
#endif

struct timeval to;
struct timeval *toptr = NULL;

Expand Down Expand Up @@ -137,6 +214,25 @@ static int __wait_readable(int fd, const struct timeval *timeout, int *isset) {
}

static int __wait_writable(int fd, const struct timeval *timeout, int *isset) {
#ifdef HAVE_RUBY_FIBER_SCHEDULER_H
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_wait(scheduler,
io_from_fd(fd),
RB_UINT2NUM(RUBY_IO_WRITABLE),
rb_fiber_scheduler_make_timeout((struct timeval *) timeout));
if (RTEST(result)) {
if (isset) {
*isset = 1;
}
return 0;
} else {
// timeout
return -1;
}
}
#endif

struct timeval to;
struct timeval *toptr = NULL;

Expand Down Expand Up @@ -236,6 +332,7 @@ static VALUE connection_generic_connect(VALUE self, redisContext *c, VALUE arg_t
rb_sys_fail(0);
}

// entrypoint for Driver#connect
static VALUE connection_connect(int argc, VALUE *argv, VALUE self) {
redisContext *c;
VALUE arg_host = Qnil;
Expand Down
28 changes: 23 additions & 5 deletions lib/hiredis/ruby/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
module Hiredis
module Ruby
class Connection
EMPTY_ARRAY = [].freeze

if defined?(RUBY_ENGINE) && RUBY_ENGINE == "rbx"

Expand Down Expand Up @@ -123,14 +124,14 @@ def _write(sock, data, timeout)
data.force_encoding("binary") if data.respond_to?(:force_encoding)

begin
nwritten = @sock.write_nonblock(data)
nwritten = sock.write_nonblock(data)

while nwritten < string_size(data)
data = data[nwritten..-1]
nwritten = @sock.write_nonblock(data)
nwritten = sock.write_nonblock(data)
end
rescue Errno::EAGAIN
if IO.select([], [@sock], [], timeout)
if _wait_writable(sock, timeout)
# Writable, try again
retry
else
Expand All @@ -140,13 +141,29 @@ def _write(sock, data, timeout)
end
end

def _wait_readable(io, timeout)
if @fiber_scheduler_supported && Fiber.scheduler
Fiber.scheduler.io_wait(io, IO::READABLE, timeout)
else
IO.select([io], EMPTY_ARRAY, EMPTY_ARRAY, timeout)
end
end

def _wait_writable(io, timeout)
if @fiber_scheduler_supported && Fiber.scheduler
Fiber.scheduler.io_wait(io, IO::WRITABLE, timeout)
else
IO.select(EMPTY_ARRAY, [io], EMPTY_ARRAY, timeout)
end
end

def _connect_sockaddr(af, sockaddr, timeout)
sock = Socket.new(af, Socket::SOCK_STREAM, 0)

begin
sock.connect_nonblock(sockaddr)
rescue Errno::EINPROGRESS
if IO.select(nil, [sock], nil, timeout)
if _wait_writable(sock, timeout)
# Writable, check for errors
optval = sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_ERROR)
errno = optval.unpack("i").first
Expand Down Expand Up @@ -176,6 +193,7 @@ def _connect_sockaddr(af, sockaddr, timeout)
def initialize
@sock = nil
@timeout = nil
@fiber_scheduler_supported = defined?(Fiber.scheduler)
end

def connected?
Expand Down Expand Up @@ -267,7 +285,7 @@ def read
begin
@reader.feed @sock.read_nonblock(1024)
rescue Errno::EAGAIN
if IO.select([@sock], [], [], @timeout)
if _wait_readable(@sock, @timeout)
# Readable, try again
retry
else
Expand Down