feat(out_waterdrop): add basic waterdrop output plugin #508

Open: wants to merge 1 commit into master
3 changes: 3 additions & 0 deletions Rakefile
@@ -5,7 +5,10 @@ require 'rake/testtask'

Rake::TestTask.new(:test) do |test|
  test.libs << 'lib' << 'test'

  # TODO: include the waterdrop tests after fixing up CI
  test.test_files = FileList['test/**/test_*.rb']
                      .exclude('test/**/test_out_waterdrop.rb')
  test.verbose = true
end

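Note: the waterdrop tests stay excluded from the default rake test run until CI can provide a Kafka broker. For local runs against the docker-compose broker added below, a separate task along these lines would work (a sketch; the test_waterdrop task name is hypothetical and not part of this diff):

# Hypothetical companion task: run only the waterdrop integration tests
# against a locally running Kafka broker (see docker-compose.yml below).
Rake::TestTask.new(:test_waterdrop) do |test|
  test.libs << 'lib' << 'test'
  test.test_files = FileList['test/**/test_out_waterdrop.rb']
  test.verbose = true
end
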
8 changes: 8 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,8 @@
version: "3"
services:
  kafka:
    image: confluentinc/confluent-local:7.4.3
    ports:
      - "9092:9092"
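
The compose file gives the tests a single local Kafka broker on localhost:9092, which matches the plugin's default bootstrap_servers. Before running the integration tests it can help to confirm the broker accepts connections; a minimal check with rdkafka (already required by the test file) could look like this (the smoke-test topic name is arbitrary):

require 'rdkafka'

# Sketch: verify the docker-compose broker is reachable before running
# test/plugin/test_out_waterdrop.rb; not part of this PR.
producer = Rdkafka::Config.new("bootstrap.servers" => "localhost:9092").producer
# wait raises if the message cannot be delivered within the timeout.
producer.produce(topic: "smoke-test", payload: "ping").wait(max_wait_timeout: 30)
producer.close
puts "broker is reachable"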


1 change: 1 addition & 0 deletions fluent-plugin-kafka.gemspec
@@ -19,6 +19,7 @@ Gem::Specification.new do |gem|
  gem.add_dependency "fluentd", [">= 0.10.58", "< 2"]
  gem.add_dependency 'ltsv'
  gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2'
  gem.add_dependency 'waterdrop', '~> 2.6'
  gem.add_development_dependency "rake", ">= 0.9.2"
  gem.add_development_dependency "test-unit", ">= 3.0.8"
  gem.add_development_dependency "test-unit-rr", "~> 1.0"
91 changes: 91 additions & 0 deletions lib/fluent/plugin/out_waterdrop.rb
@@ -0,0 +1,91 @@
require 'thread'
require 'logger'
require 'fluent/plugin/output'
require 'fluent/plugin/kafka_plugin_util'
require 'waterdrop'

module Fluent::Plugin
  class Fluent::WaterdropOutput < Output
    Fluent::Plugin.register_output('waterdrop', self)
    helpers :inject, :formatter, :record_accessor

    config_param :bootstrap_servers, :string, default: 'localhost:9092',
                 desc: <<-DESC
Set bootstrap servers directly:
<broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
DESC

    config_param :default_topic, :string, default: nil, desc: <<-DESC
Default output topic when record doesn't have topic field
DESC

    config_param :topic_key, :string, default: 'topic', desc: "Field for kafka topic"

    config_section :buffer do
      config_set_default :chunk_keys, ["topic"]
    end

    config_section :format do
      config_set_default :@type, 'json'
      config_set_default :add_newline, false
    end
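
    # Putting the parameters above together: a minimal configuration only needs a
    # <format> section, since bootstrap_servers, default_topic and topic_key all
    # have defaults. A fuller (hypothetical) example, in the %[] notation used by
    # the test file:
    #
    #   config = %[
    #     @type waterdrop
    #     bootstrap_servers broker1:9092,broker2:9092
    #     default_topic fluentd-events
    #     <format>
    #       @type json
    #     </format>
    #   ]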

    def initialize
      super

      config = {
        'bootstrap.servers': @bootstrap_servers
      }

      @producer = WaterDrop::Producer.new do |conf|
        conf.deliver = true
        conf.kafka = config
      end

      @formatter_proc = nil
      @topic_key_sym = @topic_key.to_sym
    end

    def configure(conf)
      super

      formatter_conf = conf.elements('format').first
      unless formatter_conf
        raise Fluent::ConfigError, "<format> section is required."
      end
      unless formatter_conf["@type"]
        raise Fluent::ConfigError, "format/@type is required."
      end

      @formatter_proc = setup_formatter(formatter_conf)
    end

    def setup_formatter(conf)
      @formatter = formatter_create(usage: 'waterdrop-plugin', conf: conf)
      @formatter.method(:format)
    end

    def write(chunk)
      tag = chunk.metadata.tag
      topic = if @topic
                extract_placeholders(@topic, chunk)
              else
                (chunk.metadata.variables && chunk.metadata.variables[@topic_key_sym]) || @default_topic || tag
              end
      begin
        chunk.msgpack_each do |time, record|
          record_buf = @formatter_proc.call(tag, time, record)
          @producer.buffer(topic: topic, payload: record_buf)
        end

        @producer.flush_sync
      end
    end

    def shutdown
      super

      @producer.close
    end
  end
end
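
For reviewers new to the waterdrop gem: the write path above is a thin wrapper around WaterDrop::Producer. Each record in the chunk is formatted (JSON by default, per the <format> defaults) and buffered in the producer, and flush_sync then delivers the whole batch and waits for the delivery reports, so a delivery failure raises inside write and reaches Fluentd's usual retry handling. Stripped of the plugin plumbing, the equivalent standalone calls look roughly like this (a sketch against the waterdrop 2.x API; the topic and records are placeholders):

require 'json'
require 'waterdrop'

# Sketch of the producer calls out_waterdrop makes per buffer chunk; not part of this PR.
producer = WaterDrop::Producer.new do |config|
  config.deliver = true
  config.kafka = { 'bootstrap.servers': 'localhost:9092' }
end

records = [{ 'body' => '123' }, { 'body' => '456' }] # placeholder records
records.each do |record|
  producer.buffer(topic: 'fluentd-events', payload: record.to_json)
end

producer.flush_sync # deliver everything buffered above and wait for acks
producer.close      # mirrors the plugin's #shutdown
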
110 changes: 110 additions & 0 deletions test/plugin/test_out_waterdrop.rb
@@ -0,0 +1,110 @@
require 'fluent/test'
require 'fluent/test/helpers'
require 'fluent/test/driver/output'
require 'fluent/plugin/out_waterdrop'
require 'rdkafka'
require 'json'

# 1. run docker-compose to spin up the Kafka broker
# 2. Run these tests
class WaterdropOutputTest < Test::Unit::TestCase
  include Fluent::Test::Helpers

  def setup
    Fluent::Test.setup
  end

  def create_driver(conf, tag = 'test')
    Fluent::Test::Driver::Output.new(Fluent::WaterdropOutput).configure(conf)
  end

  sub_test_case 'configure' do
    test 'basic configuration' do
      assert_nothing_raised(Fluent::ConfigError) do
        config = %[
          @type waterdrop
          <format>
            @type json
          </format>
        ]
        driver = create_driver(config)

        assert_equal 'localhost:9092', driver.instance.bootstrap_servers
      end
    end

    test 'missing format section' do
      assert_raise(Fluent::ConfigError) do
        config = %[
          @type waterdrop
        ]
        create_driver(config)
      end
    end

    test 'formatter section missing @type' do
      assert_raise(Fluent::ConfigError) do
        config = %[
          @type waterdrop
          <format>
            literally 'anything else'
          </format>
        ]
        create_driver(config)
      end
    end
  end

  sub_test_case 'produce' do
    GLOBAL_CONFIG = {
      "bootstrap.servers" => "localhost:9092",
      "topic.metadata.propagation.max.ms" => 11 * 1_000,
      "topic.metadata.refresh.interval.ms" => 10 * 1_000,
    }
    TOPIC = 'produce.basic-produce'

    def setup
      @kafka_admin = Rdkafka::Config.new(GLOBAL_CONFIG).admin
      @kafka_consumer = Rdkafka::Config.new(
        GLOBAL_CONFIG.merge(
          {
            "group.id" => "waterdrop",
            "auto.offset.reset" => "earliest",
          }
        )
      ).consumer

      @kafka_admin.delete_topic(TOPIC)
      @kafka_admin.create_topic(TOPIC, 1, 1)
                  .wait(max_wait_timeout: 30)
      @kafka_consumer.subscribe(TOPIC)
    end

    def teardown
      @kafka_consumer.close
      @kafka_admin.delete_topic(TOPIC)
      @kafka_admin.close
    end

    test 'basic produce' do
      config = %[
        @type waterdrop
        default_topic #{TOPIC}
        <format>
          @type json
        </format>
      ]
      d = create_driver(config)
      d.run(default_tag: TOPIC, flush: true) do
        d.feed(Fluent::EventTime.now, { topic: TOPIC, body: '123' })
      end

      sleep(12)

      raw_message = @kafka_consumer.poll(5_000)

      message = JSON.parse!(raw_message.payload)
      assert_equal '123', message["body"]
    end
  end
end
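
A note on the produce test above: GLOBAL_CONFIG lowers topic.metadata.propagation.max.ms to 11 seconds and the metadata refresh interval to 10 seconds, and the fixed sleep(12) appears to wait those windows out so the freshly created topic's metadata has settled before the single poll. If the fixed sleep proves flaky, one alternative is to poll in a bounded loop until a message arrives (a sketch only, not part of this PR):

# Sketch: a bounded poll loop as an alternative to the fixed sleep(12).
def poll_until_message(consumer, deadline_seconds: 30)
  deadline = Time.now + deadline_seconds
  while Time.now < deadline
    message = consumer.poll(1_000) # returns nil while no message is available yet
    return message if message
  end
  nil
end

# Usage inside the test:
#   raw_message = poll_until_message(@kafka_consumer)
#   flunk('no message received within 30s') if raw_message.nil?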