Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

asio standalone support #206

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,15 @@ ifeq ($(WITH_LIBUV),1)
override LDFLAGS += -luv
endif

# WITH_ASIO builds with boot asio event-loop
# WITH_ASIO builds with asio/boost::asio event-loop
ifeq ($(WITH_ASIO),1)
override CFLAGS += -DLIBUS_USE_ASIO
ifeq ($(ASIO_STANDALONE),1)
override CXXFLAGS += -pthread -DLIBUS_USE_ASIO -DASIO_STANDALONE
else
override CXXFLAGS += -pthread -DLIBUS_USE_ASIO
endif
override LDFLAGS += -lstdc++ -lpthread
override CXXFLAGS += -pthread -DLIBUS_USE_ASIO
endif

# WITH_GCD=1 builds with libdispatch as event-loop
Expand Down
64 changes: 42 additions & 22 deletions src/eventing/asio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,36 @@ extern "C" {

#ifdef LIBUS_USE_ASIO

#ifdef ASIO_STANDALONE
#include <asio.hpp>
#include <asio/version.hpp>
#include <system_error>
#else
#include <boost/asio.hpp>
#include <boost/version.hpp>
#endif
#include <iostream>
#include <mutex>
#include <memory>
#include <boost/version.hpp>

// New interfaces require boost 1.66.0
#if BOOST_VERSION < 106600
#ifdef ASIO_STANDALONE
namespace net = asio;
using error_code = std::error_code;
namespace chronons = asio::chrono;
#else
namespace net = boost::asio;
using error_code = boost::system::error_code;
namespace chronons = boost::posix_time;
#endif

// New interfaces require boost 1.66.0 or asio standalone 1.10.0
#if BOOST_VERSION < 106600 || ASIO_VERSION < 101000
#define LIBUS_USE_OLD_ASIO
#define LIBUS_ASIO_DESCRIPTOR boost::asio::posix::stream_descriptor
#define LIBUS_ASIO_LOOP boost::asio::io_service
#define LIBUS_ASIO_DESCRIPTOR net::posix::stream_descriptor
#define LIBUS_ASIO_LOOP net::io_service
#else
#define LIBUS_ASIO_DESCRIPTOR boost::asio::posix::descriptor
#define LIBUS_ASIO_LOOP boost::asio::io_context
#define LIBUS_ASIO_DESCRIPTOR net::posix::descriptor
#define LIBUS_ASIO_LOOP net::io_context
#endif

// setting polls to 1 disables fallthrough
Expand All @@ -45,7 +61,11 @@ int polls = 0; // temporary solution keeping track of outstanding work
// define a timer internally as something that inherits from callback_t
// us_timer_t is convertible to this one
struct boost_timer : us_internal_callback_t {
boost::asio::deadline_timer timer;
#ifdef ASIO_STANDALONE
net::steady_timer timer;
#else
net::deadline_timer timer;
#endif
std::shared_ptr<boost_timer> isValid;

unsigned char nr = 0;
Expand Down Expand Up @@ -90,10 +110,10 @@ void poll_for_error(struct boost_block_poll_t *boost_block) {
/* There is no such thing as polling for error in old asio */
#ifndef LIBUS_USE_OLD_ASIO
polls++;
boost_block->async_wait(boost::asio::posix::descriptor::wait_type::wait_error, [nr = boost_block->nr, weakBoostBlock = std::weak_ptr<boost_block_poll_t>(boost_block->isValid)](boost::system::error_code ec) {
boost_block->async_wait(net::posix::descriptor::wait_type::wait_error, [nr = boost_block->nr, weakBoostBlock = std::weak_ptr<boost_block_poll_t>(boost_block->isValid)](error_code ec) {
polls--;

if (ec != boost::asio::error::operation_aborted) {
if (ec != net::error::operation_aborted) {

// post mortem check
struct boost_block_poll_t *boost_block;
Expand All @@ -117,8 +137,8 @@ void poll_for_error(struct boost_block_poll_t *boost_block) {

void poll_for_read(struct boost_block_poll_t *boost_block);

inline void handle_read(const std::weak_ptr<boost_block_poll_t> &weakBoostBlock, unsigned char nr, boost::system::error_code ec) {
if (ec != boost::asio::error::operation_aborted) {
inline void handle_read(const std::weak_ptr<boost_block_poll_t> &weakBoostBlock, unsigned char nr, error_code ec) {
if (ec != net::error::operation_aborted) {

// post mortem check
struct boost_block_poll_t *boost_block;
Expand All @@ -141,12 +161,12 @@ inline void handle_read(const std::weak_ptr<boost_block_poll_t> &weakBoostBlock,
void poll_for_read(struct boost_block_poll_t *boost_block) {
polls++;
#ifndef LIBUS_USE_OLD_ASIO
boost_block->async_wait(boost::asio::posix::descriptor::wait_type::wait_read, [nr = boost_block->nr, weakBoostBlock = std::weak_ptr<boost_block_poll_t>(boost_block->isValid)](boost::system::error_code ec) {
boost_block->async_wait(net::posix::descriptor::wait_type::wait_read, [nr = boost_block->nr, weakBoostBlock = std::weak_ptr<boost_block_poll_t>(boost_block->isValid)](error_code ec) {
polls--;
handle_read(weakBoostBlock, nr, ec);
});
#else
boost_block->async_read_some(boost::asio::null_buffers(), [nr = boost_block->nr, weakBoostBlock = std::weak_ptr<boost_block_poll_t>(boost_block->isValid)](boost::system::error_code ec, std::size_t) {
boost_block->async_read_some(net::null_buffers(), [nr = boost_block->nr, weakBoostBlock = std::weak_ptr<boost_block_poll_t>(boost_block->isValid)](error_code ec, std::size_t) {
polls--;
handle_read(weakBoostBlock, nr, ec);
});
Expand All @@ -155,8 +175,8 @@ void poll_for_read(struct boost_block_poll_t *boost_block) {

void poll_for_write(struct boost_block_poll_t *boost_block);

inline void handle_write(const std::weak_ptr<boost_block_poll_t> &weakBoostBlock, unsigned char nr, boost::system::error_code ec) {
if (ec != boost::asio::error::operation_aborted) {
inline void handle_write(const std::weak_ptr<boost_block_poll_t> &weakBoostBlock, unsigned char nr, error_code ec) {
if (ec != net::error::operation_aborted) {

// post mortem check
struct boost_block_poll_t *boost_block;
Expand All @@ -178,12 +198,12 @@ inline void handle_write(const std::weak_ptr<boost_block_poll_t> &weakBoostBlock
void poll_for_write(struct boost_block_poll_t *boost_block) {
polls++;
#ifndef LIBUS_USE_OLD_ASIO
boost_block->async_wait(boost::asio::posix::descriptor::wait_type::wait_write, [nr = boost_block->nr, weakBoostBlock = std::weak_ptr<boost_block_poll_t>(boost_block->isValid)](boost::system::error_code ec) {
boost_block->async_wait(net::posix::descriptor::wait_type::wait_write, [nr = boost_block->nr, weakBoostBlock = std::weak_ptr<boost_block_poll_t>(boost_block->isValid)](error_code ec) {
polls--;
handle_write(weakBoostBlock, nr, ec);
});
#else
boost_block->async_write_some(boost::asio::null_buffers(), [nr = boost_block->nr, weakBoostBlock = std::weak_ptr<boost_block_poll_t>(boost_block->isValid)](boost::system::error_code ec, std::size_t) {
boost_block->async_write_some(net::null_buffers(), [nr = boost_block->nr, weakBoostBlock = std::weak_ptr<boost_block_poll_t>(boost_block->isValid)](error_code ec, std::size_t) {
polls--;
handle_write(weakBoostBlock, nr, ec);
});
Expand Down Expand Up @@ -349,8 +369,8 @@ void us_timer_close(struct us_timer_t *t) {
}

void poll_for_timeout(struct boost_timer *b_timer, int repeat_ms) {
b_timer->timer.async_wait([nr = b_timer->nr, repeat_ms, weakBoostBlock = std::weak_ptr<boost_timer>(b_timer->isValid)](const boost::system::error_code &ec) {
if (ec != boost::asio::error::operation_aborted) {
b_timer->timer.async_wait([nr = b_timer->nr, repeat_ms, weakBoostBlock = std::weak_ptr<boost_timer>(b_timer->isValid)](const error_code &ec) {
if (ec != net::error::operation_aborted) {

struct boost_timer *b_timer;
if (auto observe = weakBoostBlock.lock()) {
Expand All @@ -375,7 +395,7 @@ void poll_for_timeout(struct boost_timer *b_timer, int repeat_ms) {
return;
}

b_timer->timer.expires_at(b_timer->timer.expires_at() + boost::posix_time::milliseconds(repeat_ms));
b_timer->timer.expires_at(b_timer->timer.expires_at() + chronons::milliseconds(repeat_ms));
poll_for_timeout(b_timer, repeat_ms);
}
us_internal_dispatch_ready_poll((struct us_poll_t *)b_timer, 0, LIBUS_SOCKET_READABLE);
Expand All @@ -392,7 +412,7 @@ void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms
} else {
b_timer->cb = (void(*)(struct us_internal_callback_t *)) cb;

b_timer->timer.expires_from_now(boost::posix_time::milliseconds(ms));
b_timer->timer.expires_from_now(chronons::milliseconds(ms));
poll_for_timeout(b_timer, repeat_ms);
}
}
Expand Down