From 8d9ab6010596e0ba3678364ba18c5ac5783af0ad Mon Sep 17 00:00:00 2001 From: Can Cecen Date: Tue, 25 Apr 2023 14:05:52 -0700 Subject: [PATCH] Add the ability to buffer metrics before sending them to unix domain socket (#98) * Make publisher buffer metrics before sending them to spectatord with each call. * Make max_buffer_size configurable. * Make sure max_buffer_size is passed to the publisher. * Fix build. * Only add newline if buffering. * Clear the buffer after an exception too. * Clear the buffer after all retries. * Fix logging so that we log the actual buffer that was sent. * Add logging around how many bytes sent for a given send(...) call * Add more logging when sending zero byte buffer. * Fix tests and add a new test for buffering publisher. * Rename config variables, append a character instead. * PR comments: 1. Add information about buffering functionality to the README. 2. Add .bazeliskrc file. --- .bazeliskrc | 1 + README.md | 4 ++++ spectator/config.h | 1 + spectator/publisher.cc | 32 +++++++++++++++++++++----------- spectator/publisher.h | 3 +++ spectator/publisher_test.cc | 35 ++++++++++++++++++++++++++++++----- spectator/registry.h | 2 +- 7 files changed, 61 insertions(+), 17 deletions(-) create mode 100644 .bazeliskrc diff --git a/.bazeliskrc b/.bazeliskrc new file mode 100644 index 0000000..95447bd --- /dev/null +++ b/.bazeliskrc @@ -0,0 +1 @@ +USE_BAZEL_VERSION=5.4.0 \ No newline at end of file diff --git a/README.md b/README.md index d4763d6..6d5616f 100644 --- a/README.md +++ b/README.md @@ -80,3 +80,7 @@ int main() { } } ``` +## High-Volume Publishing + +By default, the library sends every meter change to the spectatord sidecar immediately. This involves a blocking `send` call and underlying system calls, and may not be the most efficient way to publish metrics in high-volume use cases. 
+For this purpose a simple buffering functionality in `Publisher` is implemented, and it can be turned on by passing a buffer size to the `spectator::Config` constructor. It is important to note that, until this buffer fills up, the `Publisher` will not send any meters to the sidecar. Therefore, if your application doesn't emit meters at a high rate, you should either keep the buffer very small or not configure a buffer size at all, which will fall back to the "publish immediately" mode of operation. diff --git a/spectator/config.h b/spectator/config.h index a9717c6..f959dbb 100644 --- a/spectator/config.h +++ b/spectator/config.h @@ -8,6 +8,7 @@ namespace spectator { struct Config { std::string endpoint; std::unordered_map common_tags; + uint32_t bytes_to_buffer; }; } // namespace spectator diff --git a/spectator/publisher.cc b/spectator/publisher.cc index 13c8879..ba25bcb 100644 --- a/spectator/publisher.cc +++ b/spectator/publisher.cc @@ -4,11 +4,15 @@ namespace spectator { +static const char NEW_LINE = '\n'; + SpectatordPublisher::SpectatordPublisher(absl::string_view endpoint, + uint32_t bytes_to_buffer, std::shared_ptr logger) : logger_(std::move(logger)), udp_socket_(io_context_), - local_socket_(io_context_) { + local_socket_(io_context_), bytes_to_buffer_(bytes_to_buffer) { + buffer_.reserve(bytes_to_buffer_ + 1024); if (absl::StartsWith(endpoint, "unix:")) { setup_unix_domain(endpoint.substr(5)); } else if (absl::StartsWith(endpoint, "udp:")) { @@ -50,17 +54,23 @@ void SpectatordPublisher::setup_unix_domain(absl::string_view path) { // get a copy of the file path std::string local_path{path}; sender_ = [local_path, this](std::string_view msg) { - for (auto i = 0; i < 3; ++i) { - try { - local_socket_.send(asio::buffer(msg)); - logger_->trace("Sent (local): {}", msg); - break; - } catch (std::exception& e) { - local_reconnect(local_path); - logger_->warn("Unable to send {} - attempt {}/3 ({})", msg, i, - e.what()); + buffer_.append(msg); + if 
(buffer_.length() >= bytes_to_buffer_) { + for (auto i = 0; i < 3; ++i) { + try { + auto sent_bytes = local_socket_.send(asio::buffer(buffer_)); + logger_->trace("Sent (local): {} bytes, in total had {}", sent_bytes, buffer_.length()); + break; + } catch (std::exception& e) { + local_reconnect(local_path); + logger_->warn("Unable to send {} - attempt {}/3 ({})", buffer_, i, + e.what()); + } } - } + buffer_.clear(); + } else { + buffer_.push_back(NEW_LINE); + } }; } diff --git a/spectator/publisher.h b/spectator/publisher.h index dc325e3..0548137 100644 --- a/spectator/publisher.h +++ b/spectator/publisher.h @@ -11,6 +11,7 @@ class SpectatordPublisher { public: explicit SpectatordPublisher( absl::string_view endpoint, + uint32_t bytes_to_buffer = 0, std::shared_ptr logger = DefaultLogger()); SpectatordPublisher(const SpectatordPublisher&) = delete; @@ -31,6 +32,8 @@ class SpectatordPublisher { asio::io_context io_context_; asio::ip::udp::socket udp_socket_; asio::local::datagram_protocol::socket local_socket_; + std::string buffer_; + uint32_t bytes_to_buffer_; }; } // namespace spectator diff --git a/spectator/publisher_test.cc b/spectator/publisher_test.cc index 1b8f777..84f817b 100644 --- a/spectator/publisher_test.cc +++ b/spectator/publisher_test.cc @@ -22,7 +22,7 @@ TEST(Publisher, Udp) { logger->info("Udp Server started on port {}", server.GetPort()); SpectatordPublisher publisher{ - fmt::format("udp:localhost:{}", server.GetPort())}; + fmt::format("udp:localhost:{}", server.GetPort()), 0}; Counter c{std::make_shared("counter", Tags{}), &publisher}; c.Increment(); c.Add(2); @@ -39,14 +39,14 @@ const char* first_not_null(char* a, const char* b) { return b; } -TEST(Publisher, Unix) { +TEST(Publisher, UnixNoBuffer) { auto logger = spectator::DefaultLogger(); const auto* dir = first_not_null(std::getenv("TMPDIR"), "/tmp"); auto path = fmt::format("{}/testserver.{}", dir, getpid()); TestUnixServer server{path}; server.Start(); logger->info("Unix Server started on 
path {}", path); - SpectatordPublisher publisher{fmt::format("unix:{}", path)}; + SpectatordPublisher publisher{fmt::format("unix:{}", path), 0}; Counter c{std::make_shared("counter", Tags{}), &publisher}; c.Increment(); c.Add(2); @@ -55,11 +55,36 @@ TEST(Publisher, Unix) { server.Stop(); unlink(path.c_str()); std::vector expected{"c:counter:1", "c:counter:2"}; - EXPECT_EQ(server.GetMessages(), expected); + EXPECT_EQ(msgs, expected); +} + +TEST(Publisher, UnixBuffer) { + auto logger = spectator::DefaultLogger(); + const auto* dir = first_not_null(std::getenv("TMPDIR"), "/tmp"); + auto path = fmt::format("{}/testserver.{}", dir, getpid()); + TestUnixServer server{path}; + server.Start(); + logger->info("Unix Server started on path {}", path); + // Do not send until we buffer 32 bytes of data. + SpectatordPublisher publisher{fmt::format("unix:{}", path), 32}; + Counter c{std::make_shared("counter", Tags{}), &publisher}; + c.Increment(); + c.Increment(); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + auto msgs = server.GetMessages(); + std::vector emptyVector {}; + EXPECT_EQ(msgs, emptyVector); + c.Increment(); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + msgs = server.GetMessages(); + std::vector expected{"c:counter:1\nc:counter:1\nc:counter:1"}; + EXPECT_EQ(msgs, expected); + server.Stop(); + unlink(path.c_str()); } TEST(Publisher, Nop) { - SpectatordPublisher publisher{""}; + SpectatordPublisher publisher{"", 0}; Counter c{std::make_shared("counter", Tags{}), &publisher}; c.Increment(); c.Add(2); diff --git a/spectator/registry.h b/spectator/registry.h index 1b22982..0154bd3 100644 --- a/spectator/registry.h +++ b/spectator/registry.h @@ -322,7 +322,7 @@ class SpectatordRegistry std::move(logger)) { extra_tags_ = Tags::from(config.common_tags); state_.publisher = - std::make_unique(config.endpoint, logger_); + std::make_unique(config.endpoint, config.bytes_to_buffer, logger_); } };