From c649f23be99c80f00240d583df6e5b2813b92cfb Mon Sep 17 00:00:00 2001 From: fvallenilla Date: Thu, 22 Aug 2024 11:24:59 -0600 Subject: [PATCH 1/2] Add condition to flush based on time Currently the Atlas high-performance mode is enabled for all apps. This mode enables batching of metrics to reduce the number of events sent to spectatord. This high-performance mode has a negative impact on very low-RPS apps, because they can end up emitting IPC metrics every 2-4 minutes rather than every minute. This commit updates the buffering logic so that, when a metric is sent, the buffer is also flushed if a configurable amount of time (flush_interval) has passed since the last successful flush. --- spectator/config.h | 1 + spectator/publisher.cc | 13 +++++++++++-- spectator/publisher.h | 3 +++ spectator/publisher_test.cc | 31 +++++++++++++++++++++++++++++++ spectator/registry.h | 2 +- 5 files changed, 47 insertions(+), 3 deletions(-) diff --git a/spectator/config.h b/spectator/config.h index f959dbb..5f3498d 100644 --- a/spectator/config.h +++ b/spectator/config.h @@ -9,6 +9,7 @@ struct Config { std::string endpoint; std::unordered_map common_tags; uint32_t bytes_to_buffer; + std::chrono::seconds flush_interval; }; } // namespace spectator diff --git a/spectator/publisher.cc b/spectator/publisher.cc index ba25bcb..05384e3 100644 --- a/spectator/publisher.cc +++ b/spectator/publisher.cc @@ -8,10 +8,14 @@ static const char NEW_LINE = '\n'; SpectatordPublisher::SpectatordPublisher(absl::string_view endpoint, uint32_t bytes_to_buffer, + std::chrono::seconds flush_interval, std::shared_ptr logger) : logger_(std::move(logger)), udp_socket_(io_context_), - local_socket_(io_context_), bytes_to_buffer_(bytes_to_buffer) { + local_socket_(io_context_), + bytes_to_buffer_(bytes_to_buffer), + last_flush_time_(std::chrono::steady_clock::now()), + flush_interval_(flush_interval) { buffer_.reserve(bytes_to_buffer_ + 1024); if (absl::StartsWith(endpoint, "unix:")) { setup_unix_domain(endpoint.substr(5)); @@ -55,11 +59,16 @@ void 
SpectatordPublisher::setup_unix_domain(absl::string_view path) { std::string local_path{path}; sender_ = [local_path, this](std::string_view msg) { buffer_.append(msg); - if (buffer_.length() >= bytes_to_buffer_) { + auto now = std::chrono::steady_clock::now(); + bool should_flush = buffer_.length() >= bytes_to_buffer_ || + std::chrono::duration_cast(now - last_flush_time_) >= flush_interval_; + + if (should_flush) { for (auto i = 0; i < 3; ++i) { try { auto sent_bytes = local_socket_.send(asio::buffer(buffer_)); logger_->trace("Sent (local): {} bytes, in total had {}", sent_bytes, buffer_.length()); + last_flush_time_ = now; break; } catch (std::exception& e) { local_reconnect(local_path); diff --git a/spectator/publisher.h b/spectator/publisher.h index 0548137..a931f8a 100644 --- a/spectator/publisher.h +++ b/spectator/publisher.h @@ -12,6 +12,7 @@ class SpectatordPublisher { explicit SpectatordPublisher( absl::string_view endpoint, uint32_t bytes_to_buffer = 0, + std::chrono::seconds flush_interval = std::chrono::seconds(60), std::shared_ptr logger = DefaultLogger()); SpectatordPublisher(const SpectatordPublisher&) = delete; @@ -34,6 +35,8 @@ class SpectatordPublisher { asio::local::datagram_protocol::socket local_socket_; std::string buffer_; uint32_t bytes_to_buffer_; + std::chrono::steady_clock::time_point last_flush_time_; + std::chrono::seconds flush_interval_; }; } // namespace spectator diff --git a/spectator/publisher_test.cc b/spectator/publisher_test.cc index 84f817b..29f4e47 100644 --- a/spectator/publisher_test.cc +++ b/spectator/publisher_test.cc @@ -83,6 +83,37 @@ TEST(Publisher, UnixBuffer) { unlink(path.c_str()); } +TEST(Publisher, UnixBufferTimeFlush) { + auto logger = spectator::DefaultLogger(); + const auto* dir = first_not_null(std::getenv("TMPDIR"), "/tmp"); + auto path = fmt::format("{}/testserver.{}", dir, getpid()); + TestUnixServer server{path}; + server.Start(); + logger->info("Unix Server started on path {}", path); + + // Set buffer 
size to a large value so that flushing is based on time + SpectatordPublisher publisher{fmt::format("unix:{}", path), 10000, std::chrono::seconds(5)}; + Counter c{std::make_shared("counter", Tags{}), &publisher}; + + // Wait for 3 seconds, increment, and the counter should not be flushed (3s is less than the 5s flush interval) + std::this_thread::sleep_for(std::chrono::seconds(3)); + c.Increment(); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + auto msgs = server.GetMessages(); + EXPECT_TRUE(msgs.empty()); + + // Wait for another 3 seconds, increment, and the counter should be flushed (6s is greater than 5s flush interval) + std::this_thread::sleep_for(std::chrono::seconds(3)); + c.Increment(); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + msgs = server.GetMessages(); + std::vector first_flush{"c:counter:1\nc:counter:1"}; + EXPECT_EQ(msgs, first_flush); + + server.Stop(); + unlink(path.c_str()); +} + TEST(Publisher, Nop) { SpectatordPublisher publisher{"", 0}; Counter c{std::make_shared("counter", Tags{}), &publisher}; diff --git a/spectator/registry.h b/spectator/registry.h index a173910..4f0d102 100644 --- a/spectator/registry.h +++ b/spectator/registry.h @@ -324,7 +324,7 @@ class SpectatordRegistry std::move(logger)) { extra_tags_ = Tags::from(config.common_tags); state_.publisher = - std::make_unique(config.endpoint, config.bytes_to_buffer, logger_); + std::make_unique(config.endpoint, config.bytes_to_buffer, config.flush_interval, logger_); } }; From 7b6255bc4acf3b1656d7cfa01b5c60dd8250ee82 Mon Sep 17 00:00:00 2001 From: fvallenilla Date: Tue, 27 Aug 2024 17:42:19 -0600 Subject: [PATCH 2/2] PR feedback --- spectator/config.h | 2 +- spectator/publisher.cc | 8 ++++---- spectator/publisher.h | 4 ++-- spectator/publisher_test.cc | 10 +++++----- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/spectator/config.h b/spectator/config.h index 5f3498d..744e6ae 100644 --- a/spectator/config.h +++ b/spectator/config.h @@ 
-9,7 +9,7 @@ struct Config { std::string endpoint; std::unordered_map common_tags; uint32_t bytes_to_buffer; - std::chrono::seconds flush_interval; + const std::chrono::milliseconds flush_interval; }; } // namespace spectator diff --git a/spectator/publisher.cc b/spectator/publisher.cc index 05384e3..bd553d8 100644 --- a/spectator/publisher.cc +++ b/spectator/publisher.cc @@ -8,7 +8,7 @@ static const char NEW_LINE = '\n'; SpectatordPublisher::SpectatordPublisher(absl::string_view endpoint, uint32_t bytes_to_buffer, - std::chrono::seconds flush_interval, + std::chrono::milliseconds flush_interval, std::shared_ptr logger) : logger_(std::move(logger)), udp_socket_(io_context_), @@ -59,9 +59,9 @@ void SpectatordPublisher::setup_unix_domain(absl::string_view path) { std::string local_path{path}; sender_ = [local_path, this](std::string_view msg) { buffer_.append(msg); - auto now = std::chrono::steady_clock::now(); - bool should_flush = buffer_.length() >= bytes_to_buffer_ || - std::chrono::duration_cast(now - last_flush_time_) >= flush_interval_; + const auto now = std::chrono::steady_clock::now(); + const bool should_flush = buffer_.length() >= bytes_to_buffer_ || + now - last_flush_time_ >= flush_interval_; if (should_flush) { for (auto i = 0; i < 3; ++i) { diff --git a/spectator/publisher.h b/spectator/publisher.h index a931f8a..a27bd54 100644 --- a/spectator/publisher.h +++ b/spectator/publisher.h @@ -12,7 +12,7 @@ class SpectatordPublisher { explicit SpectatordPublisher( absl::string_view endpoint, uint32_t bytes_to_buffer = 0, - std::chrono::seconds flush_interval = std::chrono::seconds(60), + std::chrono::milliseconds flush_interval = std::chrono::milliseconds(60000), std::shared_ptr logger = DefaultLogger()); SpectatordPublisher(const SpectatordPublisher&) = delete; @@ -36,7 +36,7 @@ class SpectatordPublisher { std::string buffer_; uint32_t bytes_to_buffer_; std::chrono::steady_clock::time_point last_flush_time_; - std::chrono::seconds flush_interval_; + const 
std::chrono::milliseconds flush_interval_; }; } // namespace spectator diff --git a/spectator/publisher_test.cc b/spectator/publisher_test.cc index 29f4e47..3fbfdcf 100644 --- a/spectator/publisher_test.cc +++ b/spectator/publisher_test.cc @@ -92,18 +92,18 @@ TEST(Publisher, UnixBufferTimeFlush) { logger->info("Unix Server started on path {}", path); // Set buffer size to a large value so that flushing is based on time - SpectatordPublisher publisher{fmt::format("unix:{}", path), 10000, std::chrono::seconds(5)}; + SpectatordPublisher publisher{fmt::format("unix:{}", path), 10000, std::chrono::milliseconds(500)}; Counter c{std::make_shared("counter", Tags{}), &publisher}; - // Wait for 3 seconds, increment, and the counter should not be flushed (3s is less than the 5s flush interval) - std::this_thread::sleep_for(std::chrono::seconds(3)); + // Wait for 300ms, increment, and the counter should not be flushed (300ms is less than the 500ms flush interval) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); c.Increment(); std::this_thread::sleep_for(std::chrono::milliseconds(50)); auto msgs = server.GetMessages(); EXPECT_TRUE(msgs.empty()); - // Wait for another 3 seconds, increment, and the counter should be flushed (6s is greater than 5s flush interval) - std::this_thread::sleep_for(std::chrono::seconds(3)); + // Wait for another 300ms, increment, and the counter should be flushed (600ms is greater than 500ms flush interval) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); c.Increment(); std::this_thread::sleep_for(std::chrono::milliseconds(50)); msgs = server.GetMessages();