Skip to content

Commit

Permalink
feat(out_waterdrop): add basic waterdrop output plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
raytung committed Apr 1, 2024
1 parent ea0f10a commit bb8052b
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 0 deletions.
3 changes: 3 additions & 0 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ require 'rake/testtask'

Rake::TestTask.new(:test) do |test|
test.libs << 'lib' << 'test'

# TODO: include the waterdrop tests after fixing up CI
test.test_files = FileList['test/**/test_*.rb']
.exclude('test/**/test_out_waterdrop.rb')
test.verbose = true
end

Expand Down
8 changes: 8 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: "3"
services:
kafka:
image: confluentinc/confluent-local:7.4.3
ports:
- "9092:9092"


1 change: 1 addition & 0 deletions fluent-plugin-kafka.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Gem::Specification.new do |gem|
gem.add_dependency "fluentd", [">= 0.10.58", "< 2"]
gem.add_dependency 'ltsv'
gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2'
gem.add_dependency 'waterdrop', '~> 2.6'
gem.add_development_dependency "rake", ">= 0.9.2"
gem.add_development_dependency "test-unit", ">= 3.0.8"
gem.add_development_dependency "test-unit-rr", "~> 1.0"
Expand Down
91 changes: 91 additions & 0 deletions lib/fluent/plugin/out_waterdrop.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
require 'thread'
require 'logger'
require 'fluent/plugin/output'
require 'fluent/plugin/kafka_plugin_util'
require 'waterdrop'

module Fluent::Plugin
class Fluent::WaterdropOutput < Output
Fluent::Plugin.register_output('waterdrop', self)
helpers :inject, :formatter, :record_accessor

config_param :bootstrap_servers, :string, default: 'localhost:9092',
desc: <<-DESC
Set bootstrap servers directly:
<broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
DESC

config_param :default_topic, :string, default: nil, desc: <<-DESC
Default output topic when record doesn't have topic field
DESC

config_param :topic_key, :string, :default => 'topic', :desc => "Field for kafka topic"

config_section :buffer do
config_set_default :chunk_keys, ["topic"]
end

config_section :format do
config_set_default :@type, 'json'
config_set_default :add_newline, false
end

def initialize
super

config = {
'bootstrap.servers': @bootstrap_servers
}

@producer = WaterDrop::Producer.new do |conf|
conf.deliver = true
conf.kafka = config
end

@formatter_proc = nil
@topic_key_sym = @topic_key.to_sym
end

def configure(conf)
super

formatter_conf = conf.elements('format').first
unless formatter_conf
raise Fluent::ConfigError, "<format> section is required."
end
unless formatter_conf["@type"]
raise Fluent::ConfigError, "format/@type is required."
end

@formatter_proc = setup_formatter(formatter_conf)
end

def setup_formatter(conf)
@formatter = formatter_create(usage: 'waterdrop-plugin', conf: conf)
@formatter.method(:format)
end

def write(chunk)
tag = chunk.metadata.tag
topic = if @topic
extract_placeholders(@topic, chunk)
else
(chunk.metadata.variables && chunk.metadata.variables[@topic_key_sym]) || @default_topic || tag
end
begin
chunk.msgpack_each do |time, record|
record_buf = @formatter_proc.call(tag, time, record)
@producer.buffer(topic: topic, payload: record_buf)
end

@producer.flush_sync
end
end

def shutdown
super

@producer.close
end
end
end
110 changes: 110 additions & 0 deletions test/plugin/test_out_waterdrop.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
require 'fluent/test'
require 'fluent/test/helpers'
require 'fluent/test/driver/output'
require 'fluent/plugin/out_waterdrop'
require 'rdkafka'
require 'json'

# 1. run docker-compose to spin up the Kafka broker
# 2. Run these tests
class WaterdropOutputTest < Test::Unit::TestCase
include Fluent::Test::Helpers

def setup
Fluent::Test.setup
end

def create_driver(conf, tag = 'test')
Fluent::Test::Driver::Output.new(Fluent::WaterdropOutput).configure(conf)
end

sub_test_case 'configure' do
test 'basic configuration' do
assert_nothing_raised(Fluent::ConfigError) do
config = %[
@type waterdrop
<format>
@type json
</format>
]
driver = create_driver(config)

assert_equal 'localhost:9092', driver.instance.bootstrap_servers
end
end

test 'missing format section' do
assert_raise(Fluent::ConfigError) do
config = %[
@type waterdrop
]
create_driver(config)
end
end

test 'formatter section missing @type' do
assert_raise(Fluent::ConfigError) do
config = %[
@type waterdrop
<format>
literally 'anything else'
</format>
]
create_driver(config)
end
end
end

sub_test_case 'produce' do
GLOBAL_CONFIG = {
"bootstrap.servers" => "localhost:9092",
"topic.metadata.propagation.max.ms" => 11 * 1_000,
"topic.metadata.refresh.interval.ms" => 10 * 1_000,
}
TOPIC = 'produce.basic-produce'

def setup
@kafka_admin = Rdkafka::Config.new(GLOBAL_CONFIG).admin
@kafka_consumer = Rdkafka::Config.new(
GLOBAL_CONFIG.merge(
{
"group.id" => "waterdrop",
"auto.offset.reset" => "earliest",
}
)
).consumer

@kafka_admin.delete_topic(TOPIC)
@kafka_admin.create_topic(TOPIC, 1, 1)
.wait(max_wait_timeout: 30)
@kafka_consumer.subscribe(TOPIC)
end

def teardown
@kafka_consumer.close
@kafka_admin.delete_topic(TOPIC)
@kafka_admin.close
end

test 'basic produce' do
config = %[
@type waterdrop
default_topic #{TOPIC}
<format>
@type json
</format>
]
d = create_driver(config)
d.run(default_tag: TOPIC, flush: true) do
d.feed(Fluent::EventTime.now, { topic: TOPIC, body: '123' })
end

sleep(12)

raw_message = @kafka_consumer.poll(5_000)

message = JSON.parse!(raw_message.payload)
assert_equal '123', message["body"]
end
end
end

0 comments on commit bb8052b

Please sign in to comment.