Add post-receive trigger for better packet handling
This adds an optional packet post-receive callback to Endpoint that
allows Endpoint to signal the application when it is done processing a
set of incoming packets -- either because it has processed them all, or
because it hit the max-recv-per-loop limit (64).

This is designed to let an application deal with packets, especially
datagrams, more efficiently in batches: the application can use the
datagram handler to collect incoming datagrams, then use the
post-receive callback to process the accumulated datagrams in one go.
Because the post-receive callback always fires immediately *before*
libquic goes back to potentially blocking while waiting to read new
packets, the application can do this batch processing without
resorting to timers or polling.

This is particularly important for Lokinet, where we take the packet
in the callback and transfer it to a job on the Lokinet main loop
thread to process it there: doing this one packet at a time is
unlikely to scale well because of the sheer number of jobs that would
have to be put on the logic thread's event queue. By batching them
into groups of up to 64 packets at a time we ought to do considerably
better, and by triggering the processing from the post-receive
callback we ensure we don't introduce unnecessary delays in when
packets get processed.
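
For example, an application could collect datagrams in the datagram
callback and hand them off in one go from the post-receive callback.
A minimal sketch (not part of this commit; `process_batch` is a
hypothetical application-side function, and the endpoint setup mirrors
the new test case below):

    // Assumes the usual oxen::quic headers and namespace.
    Network net{};
    opt::local_addr local{};

    std::vector<bstring> batch;

    // Called once per incoming datagram; just accumulate it.
    dgram_data_callback collect = [&](dgram_interface&, bstring data) {
        batch.push_back(std::move(data));
    };

    // Called once libquic has drained the currently-available packets
    // (or hit the 64-packet per-loop limit), i.e. just before it could
    // block again waiting for more.
    post_receive_callback flush = [&] {
        if (!batch.empty())
        {
            process_batch(std::move(batch));  // hypothetical batch handler
            batch.clear();
        }
    };

    auto ep = net.endpoint(local, collect, flush, opt::enable_datagrams{});
    // ... then ep->listen(...) or ep->connect(...) as in the test below.

Both callbacks fire from the same event-loop read handler (see
src/udp.cpp below), so no extra locking around `batch` is needed;
`process_batch` would typically hand the whole batch off to another
thread, as Lokinet does with its logic thread.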
jagerman committed Nov 3, 2023
1 parent 5eb3bac commit 8318b81
Showing 5 changed files with 136 additions and 8 deletions.
10 changes: 10 additions & 0 deletions include/quic/endpoint.hpp
@@ -42,6 +42,13 @@ namespace oxen::quic
// called when a connection closes or times out before the handshake completes
using connection_closed_callback = std::function<void(connection_interface& conn, uint64_t ec)>;

// Called after we are done reading currently-available UDP packets to allow batch processing of
// incoming data (or any other handling needed just before potentially blocking). First we fire
// off any available callbacks triggered for incoming packets, then just before we go back to
// potentially block waiting for more packets, we fire this to let the application know that
// there might not be more callbacks immediately arriving and so it should process what it has.
using post_receive_callback = std::function<void()>;

class Endpoint : std::enable_shared_from_this<Endpoint>
{
private:
@@ -50,6 +57,7 @@ namespace oxen::quic
void handle_ep_opt(opt::inbound_alpns alpns);
void handle_ep_opt(opt::handshake_timeout timeout);
void handle_ep_opt(dgram_data_callback dgram_cb);
void handle_ep_opt(post_receive_callback post_recv_cb);
void handle_ep_opt(connection_established_callback conn_established_cb);
void handle_ep_opt(connection_closed_callback conn_closed_cb);

@@ -238,6 +246,8 @@ namespace oxen::quic
bool _packet_splitting{false};
Splitting _policy{Splitting::NONE};

post_receive_callback _post_receive{nullptr};

std::shared_ptr<IOContext> outbound_ctx;
std::shared_ptr<IOContext> inbound_ctx;

20 changes: 18 additions & 2 deletions include/quic/udp.hpp
@@ -38,17 +38,31 @@ namespace oxen::quic
;

using receive_callback_t = std::function<void(const Packet& pkt)>;
using post_receive_callback_t = std::function<void()>;

UDPSocket() = delete;

/// Constructs a UDP socket bound to the given address. Throws if binding fails. If
/// binding to an any address (or any port) you can retrieve the realized address via
/// address() after construction.
///
/// When packets are received they will be fed into the given callback.
/// When packets are received they will be fed into the given `on_receive` callback.
///
/// The optional `post_receive` callback will be invoked after processing available incoming
/// packets but before returning to polling the socket for additional incoming packets.
/// This is meant to allow the caller to bundle incoming packets into batches without
/// introducing delays: each time one or more packets are read from the socket there will be
/// a sequence of `on_receive(...)` calls for each packet, followed by a `post_receive()`
/// call immediately before the socket returns to waiting for additional packets. Thus a
/// caller can use the `on_receive` callback to collect packets and the `post_receive`
/// callback to process the collected packets all at once.
///
/// ev_loop must outlive this object.
UDPSocket(event_base* ev_loop, const Address& addr, receive_callback_t cb);
UDPSocket(
event_base* ev_loop,
const Address& addr,
receive_callback_t on_receive,
post_receive_callback_t post_receive = nullptr);

/// Non-copyable and non-moveable
UDPSocket(const UDPSocket& s) = delete;
@@ -103,6 +117,8 @@

event_ptr rev_ = nullptr;
receive_callback_t receive_callback_;
post_receive_callback_t post_receive_;
bool have_received_ = false;
event_ptr wev_ = nullptr;
std::vector<std::function<void()>> writeable_callbacks_;
};
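For illustration, a caller using UDPSocket directly could wire the two
callbacks up as below (a hedged sketch, not part of this commit: the
Address host/port constructor and the `process` helper are assumptions;
event_base_new/event_base_dispatch are the standard libevent calls):

    // Assumes the usual oxen::quic and libevent headers.
    event_base* loop = event_base_new();  // must outlive the socket
    int packets_this_wakeup = 0;

    UDPSocket sock{
            loop,
            Address{"127.0.0.1", 0},  // assumed host/port constructor
            [&](const Packet&) { ++packets_this_wakeup; },  // on_receive: once per packet
            [&] {
                // post_receive: once per burst, just before waiting again
                process(packets_this_wakeup);  // hypothetical batch handler
                packets_this_wakeup = 0;
            }};

    event_base_dispatch(loop);  // run the loop; incoming packets drive the callbacks above
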
16 changes: 14 additions & 2 deletions src/endpoint.cpp
@@ -56,6 +56,12 @@ namespace oxen::quic
dgram_recv_cb = std::move(func);
}

void Endpoint::handle_ep_opt(post_receive_callback func)
{
log::trace(log_cat, "Endpoint given post-receive callback");
_post_receive = std::move(func);
}

void Endpoint::handle_ep_opt(connection_established_callback conn_established_cb)
{
log::trace(log_cat, "Endpoint given connection established callback");
@@ -71,8 +77,14 @@
void Endpoint::_init_internals()
{
log::debug(log_cat, "Starting new UDP socket on {}", _local);
socket =
std::make_unique<UDPSocket>(get_loop().get(), _local, [this](const auto& packet) { handle_packet(packet); });
socket = std::make_unique<UDPSocket>(
get_loop().get(),
_local,
[this](const auto& packet) { handle_packet(packet); },
[this] {
if (_post_receive)
_post_receive();
});

_local = socket->address();

17 changes: 14 additions & 3 deletions src/udp.cpp
@@ -85,8 +85,9 @@ namespace oxen::quic
}
#endif

UDPSocket::UDPSocket(event_base* ev_loop, const Address& addr, receive_callback_t on_receive) :
ev_{ev_loop}, receive_callback_{std::move(on_receive)}
UDPSocket::UDPSocket(
event_base* ev_loop, const Address& addr, receive_callback_t on_receive, post_receive_callback_t post_receive) :
ev_{ev_loop}, receive_callback_{std::move(on_receive)}, post_receive_{std::move(post_receive)}
{
assert(ev_);

@@ -125,7 +126,16 @@
ev_,
sock_,
EV_READ | EV_PERSIST,
[](evutil_socket_t, short, void* self) { static_cast<UDPSocket*>(self)->receive(); },
[](evutil_socket_t, short, void* self_) {
auto& self = *static_cast<UDPSocket*>(self_);
self.receive();
if (self.have_received_)
{
self.have_received_ = false;
if (self.post_receive_)
self.post_receive_();
}
},
this));
event_add(rev_.get(), nullptr);

@@ -190,6 +200,7 @@
return;
}

have_received_ = true;
receive_callback_(Packet{bound_, payload, hdr});
}

81 changes: 80 additions & 1 deletion tests/007-datagrams.cpp
@@ -333,7 +333,7 @@ namespace oxen::quic::test
good_msg += v++;

for (int i = 0; i < n; ++i)
conn_interface->send_datagram(std::basic_string_view<uint8_t>{good_msg});
conn_interface->send_datagram(good_msg);

for (auto& f : data_futures)
REQUIRE(f.get());
@@ -638,4 +638,83 @@ namespace oxen::quic::test
};
#endif
};

TEST_CASE("007 - Datagram support: packet post-receive triggers", "[007][datagrams][packet-post-receive]")
{
auto client_established = callback_waiter{[](connection_interface&) {}};

Network test_net{};

std::basic_string<std::byte> big_msg{};

for (int v = 0; big_msg.size() < 1000; v++)
big_msg += static_cast<std::byte>(v % 256);

std::atomic<int> recv_counter{0};
std::atomic<int> data_counter{0};

std::promise<void> got_first, acked_first;
std::promise<int> got_all_n_recvs;

dgram_data_callback recv_dgram_cb = [&](dgram_interface&, bstring value) {
auto count = ++data_counter;
CHECK(value == big_msg);
if (count == 1)
{
// We get one datagram, then stall the quic thread so that the test can fire
// multiple packets that we should then receive in one go.
got_first.set_value();
REQUIRE(acked_first.get_future().wait_for(1s) == std::future_status::ready);
}
else if (count == 31)
{
got_all_n_recvs.set_value(recv_counter);
}
};

auto recv_notifier = [&] { recv_counter++; };

opt::local_addr server_local{};
opt::local_addr client_local{};

auto server_tls = GNUTLSCreds::make("./serverkey.pem"s, "./servercert.pem"s, "./clientcert.pem"s);
auto client_tls = GNUTLSCreds::make("./clientkey.pem"s, "./clientcert.pem"s, "./servercert.pem"s);

auto server_endpoint = test_net.endpoint(server_local, recv_dgram_cb, recv_notifier, opt::enable_datagrams{});
REQUIRE_NOTHROW(server_endpoint->listen(server_tls));

opt::remote_addr client_remote{"127.0.0.1"s, server_endpoint->local().port()};

auto client = test_net.endpoint(client_local, client_established, opt::enable_datagrams{});
auto conn = client->connect(client_remote, client_tls);

REQUIRE(client_established.wait());

// Start off with *one* datagram; the first one the server receives will stall the
// server until we signal it via the acked_first promise, during which we'll send a
// bunch more that ought to be processed in a single batch.
conn->send_datagram(big_msg);

REQUIRE(got_first.get_future().wait_for(1s) == std::future_status::ready);

int batches_before_flood = recv_counter;

for (int i = 0; i < 30; i++)
conn->send_datagram(big_msg);

acked_first.set_value();

auto f = got_all_n_recvs.get_future();
REQUIRE(f.wait_for(1s) == std::future_status::ready);
auto recv_counter_before_final = f.get();
REQUIRE(data_counter == 31);
REQUIRE(recv_counter_before_final > batches_before_flood);
// There should be a recv callback fired *immediately* after the data callback that
// fulfilled the above promise, so a minuscule wait here should guarantee that it has been
// set.
std::this_thread::sleep_for(1ms);
auto final_recv_counter = recv_counter.load();
REQUIRE(final_recv_counter > recv_counter_before_final);
};

} // namespace oxen::quic::test
