Skip to content

Commit

Permalink
delay_serialization: implement feature
Browse files Browse the repository at this point in the history
Add the `delay_serialization` option, allowing users to delay expensive
serialization until a more convenient time, such as after an HTTP
request has completed. In multi-threaded mode, it causes serialization
to happen inside the sender thread.

Also, support the `sender_queue_size` in `single_thread` mode, so that
it can benefit from the new `delay_serialization` option. Messages are
now queued (possibly unserialized) until `sender_queue_size` is reached
or `#flush` is called. It may be set to `Float::INFINITY`, so that
messages are indefinitely queued until an explicit `#flush`.

Fix #271

Co-Authored-By: Blake Williams <[email protected]>
  • Loading branch information
pudiva and BlakeWilliams committed Jun 7, 2023
1 parent 9b3ae2d commit 378d519
Show file tree
Hide file tree
Showing 12 changed files with 170 additions and 17 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,21 @@

[//]: # (comment: Don't forget to update lib/datadog/statsd/version.rb:DogStatsd::Statsd::VERSION when releasing a new version)

## 5.6.0 / 2023.06.07

* [FEATURE] Add the `delay_serialization` option, allowing users to delay
expensive serialization until a more convenient time, such as after an HTTP
request has completed. In multi-threaded mode, it causes serialization to
happen inside the sender thread. [#271][] by [@pudiva][] and
[@BlakeWilliams][]

* [FEATURE] Also, support the `sender_queue_size` in `single_thread` mode, so
that it can benefit from the new `delay_serialization` option. Messages are
now queued (possibly unserialized) until `sender_queue_size` is reached or
`#flush` is called. It may be set to `Float::INFINITY`, so that messages
are indefinitely queued until an explicit `#flush`. [#271][] by [@pudiva][]
and [@BlakeWilliams][]

## 5.5.0 / 2022.06.01

* [FEATURE] Add `distribution_time` method to facilitate measuring timing of a yielded block. [#248][] by [@jordan-brough][]
Expand Down Expand Up @@ -431,6 +446,7 @@ Future versions are likely to introduce backward incompatibilities with < Ruby 1
[#257]: https://github.com/DataDog/dogstatsd-ruby/issues/257
[#258]: https://github.com/DataDog/dogstatsd-ruby/issues/258
[#260]: https://github.com/DataDog/dogstatsd-ruby/issues/260
[#271]: https://github.com/DataDog/dogstatsd-ruby/issues/271
[@AMekss]: https://github.com/AMekss
[@abicky]: https://github.com/abicky
[@adimitrov]: https://github.com/adimitrov
Expand Down Expand Up @@ -469,3 +485,5 @@ Future versions are likely to introduce backward incompatibilities with < Ruby 1
[@delner]: https://github.com/delner
[@tenderlove]: https://github.com/tenderlove
[@zachmccormick]: https://github.com/zachmccormick
[@pudiva]: https://github.com/pudiva
[@BlakeWilliams]: https://github.com/BlakeWilliams
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ There is also an implicit message which closes the queue which will cause the se
statsd = Datadog::Statsd.new('localhost', 8125)
```

The message queue's maximum size (in messages) is given by the `sender_queue_size` argument, and has appropriate defaults for UDP (2048) and UDS (512).
The message queue's maximum size (in messages) is given by the `sender_queue_size` argument, and has appropriate defaults for UDP (2048), UDS (512) and `single_thread: true` (1).

The `buffer_flush_interval`, if enabled, is implemented with an additional thread which manages the timing of those flushes. This additional thread is used even if `single_thread: true`.

Expand All @@ -209,6 +209,16 @@ By default, instances of `Datadog::Statsd` are thread-safe and we recommend that

When using the `single_thread: true` mode, instances of `Datadog::Statsd` are still thread-safe, but you may run into contention on heavily-threaded applications, so we don’t recommend (for performance reasons) reusing these instances.

### Delaying serialization

By default, message serialization happens synchronously whenever stat methods such as `#increment` gets called, blocking the caller. If the blocking is impacting your program's performance, you may want to consider the `delay_serialization: true` mode.

The `delay_serialization: true` mode delays the serialization of metrics to avoid the wait when submitting metrics. Serialization will still have to happen at some point, but it might be postponed until at a more convenient time, such as after an HTTP request has completed.

In `single_thread: true` mode, you'll probably want to set `sender_queue_size:` from it's default of `1` to some greater value, so that it can benefit from `delay_serialization: true`. Messages will then be queued unserialized in the sender queue and processed normally whenever `sender_queue_size` is reached or `#flush` is called. You might set `sender_queue_size: Float::INFINITY` to allow for an unbounded queue that will only be processed on explicit `#flush`.

In `single_thread: false` mode, `delay_serialization: true`, will cause serialization to happen inside the sender thread.

## Versioning

This Ruby gem is using [Semantic Versioning](https://guides.rubygems.org/patterns/#semantic-versioning) but please note that supported Ruby versions can change in a minor release of this library.
Expand Down
13 changes: 11 additions & 2 deletions lib/datadog/statsd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,12 @@ def tags
# @option [Logger] logger for debugging
# @option [Integer] buffer_max_payload_size max bytes to buffer
# @option [Integer] buffer_max_pool_size max messages to buffer
# @option [Integer] sender_queue_size size of the sender queue in number of buffers (multi-thread only)
# @option [Integer] sender_queue_size size of the sender queue in number of buffers
# @option [Numeric] buffer_flush_interval interval in second to flush buffer
# @option [String] socket_path unix socket path
# @option [Float] default sample rate if not overridden
# @option [Boolean] single_thread flushes the metrics on the main thread instead of in a companion thread
# @option [Boolean] delay_serialization delays stat serialization
def initialize(
host = nil,
port = nil,
Expand All @@ -100,6 +101,7 @@ def initialize(
logger: nil,

single_thread: false,
delay_serialization: false,

telemetry_enable: true,
telemetry_flush_interval: DEFAULT_TELEMETRY_FLUSH_INTERVAL
Expand All @@ -112,6 +114,7 @@ def initialize(
@prefix = @namespace ? "#{@namespace}.".freeze : nil
@serializer = Serialization::Serializer.new(prefix: @prefix, global_tags: tags)
@sample_rate = sample_rate
@delay_serialization = delay_serialization

@forwarder = Forwarder.new(
connection_cfg: ConnectionCfg.new(
Expand All @@ -133,6 +136,7 @@ def initialize(
sender_queue_size: sender_queue_size,

telemetry_flush_interval: telemetry_enable ? telemetry_flush_interval : nil,
serializer: serializer
)
end

Expand Down Expand Up @@ -425,7 +429,12 @@ def send_stats(stat, delta, type, opts = EMPTY_OPTIONS)
sample_rate = opts[:sample_rate] || @sample_rate || 1

if sample_rate == 1 || opts[:pre_sampled] || rand <= sample_rate
full_stat = serializer.to_stat(stat, delta, type, tags: opts[:tags], sample_rate: sample_rate)
full_stat =
if @delay_serialization
[[stat, delta, type], {tags: opts[:tags], sample_rate: sample_rate}]
else
serializer.to_stat(stat, delta, type, tags: opts[:tags], sample_rate: sample_rate)
end

forwarder.send_message(full_stat)
end
Expand Down
9 changes: 7 additions & 2 deletions lib/datadog/statsd/forwarder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ def initialize(

single_thread: false,

logger: nil
logger: nil,

serializer:
)
@transport_type = connection_cfg.transport_type

Expand Down Expand Up @@ -52,16 +54,19 @@ def initialize(
max_payload_size: buffer_max_payload_size,
max_pool_size: buffer_max_pool_size || DEFAULT_BUFFER_POOL_SIZE,
overflowing_stategy: buffer_overflowing_stategy,
serializer: serializer
)

sender_queue_size ||= 1 if single_thread
sender_queue_size ||= (@transport_type == :udp ?
UDP_DEFAULT_SENDER_QUEUE_SIZE : UDS_DEFAULT_SENDER_QUEUE_SIZE)

@sender = single_thread ?
SingleThreadSender.new(
buffer,
logger: logger,
flush_interval: buffer_flush_interval) :
flush_interval: buffer_flush_interval,
queue_size: sender_queue_size) :
Sender.new(
buffer,
logger: logger,
Expand Down
10 changes: 9 additions & 1 deletion lib/datadog/statsd/message_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ class MessageBuffer
def initialize(connection,
max_payload_size: nil,
max_pool_size: DEFAULT_BUFFER_POOL_SIZE,
overflowing_stategy: :drop
overflowing_stategy: :drop,
serializer:
)
raise ArgumentError, 'max_payload_size keyword argument must be provided' unless max_payload_size
raise ArgumentError, 'max_pool_size keyword argument must be provided' unless max_pool_size
Expand All @@ -17,12 +18,19 @@ def initialize(connection,
@max_payload_size = max_payload_size
@max_pool_size = max_pool_size
@overflowing_stategy = overflowing_stategy
@serializer = serializer

@buffer = String.new
clear_buffer
end

def add(message)
# Serializes the message if it hasn't been already. Part of the
# delay_serialization feature.
if message.is_a?(Array)
message = @serializer.to_stat(*message[0], **message[1])
end

message_size = message.bytesize

return nil unless message_size > 0 # to avoid adding empty messages to the buffer
Expand Down
5 changes: 4 additions & 1 deletion lib/datadog/statsd/sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ def add(message)
if message_queue.length <= @queue_size
message_queue << message
else
@telemetry.dropped_queue(packets: 1, bytes: message.bytesize) if @telemetry
if @telemetry
bytesize = message.respond_to?(:bytesize) ? message.bytesize : 0
@telemetry.dropped_queue(packets: 1, bytes: bytesize)
end
end
end

Expand Down
18 changes: 16 additions & 2 deletions lib/datadog/statsd/single_thread_sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ class Statsd
# It is using current Process.PID to check it is the result of a recent fork
# and it is reseting the MessageBuffer if that's the case.
class SingleThreadSender
def initialize(message_buffer, logger: nil, flush_interval: nil)
def initialize(message_buffer, logger: nil, flush_interval: nil, queue_size: 1)
@message_buffer = message_buffer
@logger = logger
@mx = Mutex.new
@queue_size = queue_size
@queue = []
@flush_timer = if flush_interval
Datadog::Statsd::Timer.new(flush_interval) { flush }
else
Expand All @@ -26,15 +28,21 @@ def add(message)
# not send, they belong to the parent process, let's clear the buffer.
if forked?
@message_buffer.reset
@queue.clear
@flush_timer.start if @flush_timer && @flush_timer.stop?
update_fork_pid
end
@message_buffer.add(message)

@queue << message
if @queue.size >= @queue_size
drain_queue
end
}
end

def flush(*)
@mx.synchronize {
drain_queue
@message_buffer.flush()
}
end
Expand All @@ -53,6 +61,12 @@ def rendez_vous()

private

def drain_queue
while msg = @queue.shift
@message_buffer.add(msg)
end
end

# below are "fork management" methods to be able to clean the MessageBuffer
# if it detects that it is running in a unknown PID.

Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/statsd/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

module Datadog
class Statsd
VERSION = '5.5.0'
VERSION = '5.6.0'
end
end
38 changes: 38 additions & 0 deletions spec/integrations/delay_serialization_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
require "spec_helper"

describe "Delayed serialization mode" do
it "defers serialization to message buffer" do
buffer = double(Datadog::Statsd::MessageBuffer)
# expects an Array is passed and not a String
expect(buffer)
.to receive(:add)
.with([["boo", 1, "c"], {tags: nil, sample_rate: 1}])
# and then expect no more adds!
expect(buffer).to receive(:add).exactly(0).times
expect(buffer)
.to receive(:flush)

allow(Datadog::Statsd::MessageBuffer).to receive(:new).and_return(buffer)
dogstats = Datadog::Statsd.new("localhost", 1234, delay_serialization: true)

dogstats.increment("boo")
dogstats.flush(sync: true)
end

it "serializes messages normally" do
socket = FakeUDPSocket.new(copy_message: true)
allow(UDPSocket).to receive(:new).and_return(socket)
dogstats = Datadog::Statsd.new("localhost", 1234, delay_serialization: true)

dogstats.increment("boo")
dogstats.increment("oob", tags: {tag1: "val1"})
dogstats.increment("pow", tags: {tag1: "val1"}, sample_rate: 2)
dogstats.flush(sync: true)

expect(socket.recv[0]).to eq([
"boo:1|c",
"oob:1|c|#tag1:val1",
"pow:1|c|@2|#tag1:val1"
].join("\n"))
end
end
10 changes: 10 additions & 0 deletions spec/statsd/forwarder_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
instance_double(Logger)
end

let(:serializer) do
Datadog::Statsd::Serialization::Serializer.new
end

before do
allow(Datadog::Statsd::MessageBuffer)
.to receive(:new)
Expand Down Expand Up @@ -94,6 +98,8 @@

logger: logger,
global_tags: global_tags,

serializer: serializer,
}
end

Expand Down Expand Up @@ -277,6 +283,8 @@

logger: logger,
global_tags: global_tags,

serializer: serializer,
}
end

Expand Down Expand Up @@ -464,6 +472,8 @@

logger: logger,
global_tags: global_tags,

serializer: serializer,
}
end

Expand Down
5 changes: 5 additions & 0 deletions spec/statsd/message_buffer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
max_payload_size: max_payload_size,
max_pool_size: max_pool_size,
overflowing_stategy: overflowing_stategy,
serializer: serializer,
)
end

Expand All @@ -25,6 +26,10 @@
:drop
end

let(:serializer) do
Datadog::Statsd::Serialization::Serializer.new
end

describe '#add' do
context 'when the message is empty' do
it 'returns nil' do
Expand Down
Loading

0 comments on commit 378d519

Please sign in to comment.