Skip to content

Commit

Permalink
Merge pull request #4 from fvallenilla/freddy/timed-flush
Browse files Browse the repository at this point in the history
Add condition to flush based on time
  • Loading branch information
cancecen authored Aug 30, 2024
2 parents 6890ee0 + 7b6255b commit 170401a
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 3 deletions.
1 change: 1 addition & 0 deletions spectator/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ struct Config {
std::string endpoint;
std::unordered_map<std::string, std::string> common_tags;
uint32_t bytes_to_buffer;
const std::chrono::milliseconds flush_interval;
};

} // namespace spectator
13 changes: 11 additions & 2 deletions spectator/publisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ static const char NEW_LINE = '\n';

SpectatordPublisher::SpectatordPublisher(absl::string_view endpoint,
uint32_t bytes_to_buffer,
std::chrono::milliseconds flush_interval,
std::shared_ptr<spdlog::logger> logger)
: logger_(std::move(logger)),
udp_socket_(io_context_),
local_socket_(io_context_), bytes_to_buffer_(bytes_to_buffer) {
local_socket_(io_context_),
bytes_to_buffer_(bytes_to_buffer),
last_flush_time_(std::chrono::steady_clock::now()),
flush_interval_(flush_interval) {
buffer_.reserve(bytes_to_buffer_ + 1024);
if (absl::StartsWith(endpoint, "unix:")) {
setup_unix_domain(endpoint.substr(5));
Expand Down Expand Up @@ -55,11 +59,16 @@ void SpectatordPublisher::setup_unix_domain(absl::string_view path) {
std::string local_path{path};
sender_ = [local_path, this](std::string_view msg) {
buffer_.append(msg);
if (buffer_.length() >= bytes_to_buffer_) {
const auto now = std::chrono::steady_clock::now();
const bool should_flush = buffer_.length() >= bytes_to_buffer_ ||
now - last_flush_time_ >= flush_interval_;

if (should_flush) {
for (auto i = 0; i < 3; ++i) {
try {
auto sent_bytes = local_socket_.send(asio::buffer(buffer_));
logger_->trace("Sent (local): {} bytes, in total had {}", sent_bytes, buffer_.length());
last_flush_time_ = now;
break;
} catch (std::exception& e) {
local_reconnect(local_path);
Expand Down
3 changes: 3 additions & 0 deletions spectator/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class SpectatordPublisher {
explicit SpectatordPublisher(
absl::string_view endpoint,
uint32_t bytes_to_buffer = 0,
std::chrono::milliseconds flush_interval = std::chrono::milliseconds(60000),
std::shared_ptr<spdlog::logger> logger = DefaultLogger());
SpectatordPublisher(const SpectatordPublisher&) = delete;

Expand All @@ -34,6 +35,8 @@ class SpectatordPublisher {
asio::local::datagram_protocol::socket local_socket_;
std::string buffer_;
uint32_t bytes_to_buffer_;
std::chrono::steady_clock::time_point last_flush_time_;
const std::chrono::milliseconds flush_interval_;
};

} // namespace spectator
31 changes: 31 additions & 0 deletions spectator/publisher_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,37 @@ TEST(Publisher, UnixBuffer) {
unlink(path.c_str());
}

TEST(Publisher, UnixBufferTimeFlush) {
auto logger = spectator::DefaultLogger();
const auto* dir = first_not_null(std::getenv("TMPDIR"), "/tmp");
auto path = fmt::format("{}/testserver.{}", dir, getpid());
TestUnixServer server{path};
server.Start();
logger->info("Unix Server started on path {}", path);

// Set buffer size to a large value so that flushing is based on time
SpectatordPublisher publisher{fmt::format("unix:{}", path), 10000, std::chrono::milliseconds(500)};
Counter c{std::make_shared<Id>("counter", Tags{}), &publisher};

// Wait for 300ms, increment, and the counter should not be flushed (300ms is less than the 500ms flush interval)
std::this_thread::sleep_for(std::chrono::milliseconds(300));
c.Increment();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
auto msgs = server.GetMessages();
EXPECT_TRUE(msgs.empty());

// Wait for another 300ms, increment, and the counter should be flushed (600ms is greater than 500ms flush interval)
std::this_thread::sleep_for(std::chrono::milliseconds(300));
c.Increment();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
msgs = server.GetMessages();
std::vector<std::string> first_flush{"c:counter:1\nc:counter:1"};
EXPECT_EQ(msgs, first_flush);

server.Stop();
unlink(path.c_str());
}

TEST(Publisher, Nop) {
SpectatordPublisher publisher{"", 0};
Counter c{std::make_shared<Id>("counter", Tags{}), &publisher};
Expand Down
2 changes: 1 addition & 1 deletion spectator/registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ class SpectatordRegistry
std::move(logger)) {
extra_tags_ = Tags::from(config.common_tags);
state_.publisher =
std::make_unique<SpectatordPublisher>(config.endpoint, config.bytes_to_buffer, logger_);
std::make_unique<SpectatordPublisher>(config.endpoint, config.bytes_to_buffer, config.flush_interval, logger_);
}
};

Expand Down

0 comments on commit 170401a

Please sign in to comment.