diff --git a/README.md b/README.md
index e4ad29d..469a2d6 100644
--- a/README.md
+++ b/README.md
@@ -72,6 +72,8 @@ Consume events by single consumer.
       topics
       format :default => json
       message_key
+      add_headers
+      headers_key
       add_prefix
       add_suffix
@@ -122,6 +124,7 @@ Consume events by kafka consumer group features..
       message_key
       kafka_message_key
       add_headers
+      headers_key
       add_prefix
       add_suffix
       retry_emit_limit
@@ -159,6 +162,7 @@ With the introduction of the rdkafka-ruby based input plugin we hope to support
       message_key
       kafka_message_key
       add_headers
+      headers_key
       add_prefix
       add_suffix
       retry_emit_limit
diff --git a/lib/fluent/plugin/in_kafka.rb b/lib/fluent/plugin/in_kafka.rb
index 4cb1c6a..746a4bb 100644
--- a/lib/fluent/plugin/in_kafka.rb
+++ b/lib/fluent/plugin/in_kafka.rb
@@ -51,6 +51,11 @@ class Fluent::KafkaInput < Fluent::Input
   config_param :kafka_message_key, :string, :default => nil,
                :desc => "Set kafka's message key to this field"
+  config_param :add_headers, :bool, :default => false,
+               :desc => "Add kafka's message headers to event record"
+  config_param :headers_key, :string, :default => nil,
+               :desc => "Record key to store kafka's message headers"
+
 
   # Kafka#fetch_messages options
   config_param :max_bytes, :integer, :default => nil,
                :desc => "Maximum number of bytes to fetch."
@@ -235,6 +240,8 @@ def start
         @record_time_key,
         @tag_source,
         @record_tag_key,
+        @add_headers,
+        @headers_key,
         opt)
     }
     @topic_watchers.each {|tw|
@@ -259,7 +266,7 @@ def run
   end
 
   class TopicWatcher < Coolio::TimerWatcher
-    def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, kafka_message_key, time_source, record_time_key, tag_source, record_tag_key, options={})
+    def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, kafka_message_key, time_source, record_time_key, tag_source, record_tag_key, add_headers, headers_key, options={})
       @topic_entry = topic_entry
       @kafka = kafka
       @callback = method(:consume)
@@ -274,6 +281,8 @@ def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, off
       @record_time_key = record_time_key
       @tag_source = tag_source
       @record_tag_key = record_tag_key
+      @add_headers = add_headers
+      @headers_key = headers_key
 
       @next_offset = @topic_entry.offset
       if @topic_entry.offset == -1 && offset_manager
@@ -332,6 +341,16 @@ def consume
           if @kafka_message_key
             record[@kafka_message_key] = msg.key
           end
+          if @add_headers
+            if @headers_key
+              headers_record = record[@headers_key] = {}
+            else
+              headers_record = record
+            end
+            msg.headers.each_pair { |k, v|
+              headers_record[k] = v
+            }
+          end
           es.add(record_time, record)
         rescue => e
           $log.warn "parser error in #{@topic_entry.topic}/#{@topic_entry.partition}", :error => e.to_s, :value => msg.value, :offset => msg.offset
diff --git a/lib/fluent/plugin/in_kafka_group.rb b/lib/fluent/plugin/in_kafka_group.rb
index 70dbdbe..c952046 100644
--- a/lib/fluent/plugin/in_kafka_group.rb
+++ b/lib/fluent/plugin/in_kafka_group.rb
@@ -20,6 +20,8 @@ class Fluent::KafkaGroupInput < Fluent::Input
                :desc => "For 'text' format only."
   config_param :add_headers, :bool, :default => false,
                :desc => "Add kafka's message headers to event record"
+  config_param :headers_key, :string, :default => nil,
+               :desc => "Record key to store kafka's message headers"
   config_param :add_prefix, :string, :default => nil,
                :desc => "Tag prefix (Optional)"
   config_param :add_suffix, :string, :default => nil,
@@ -259,7 +261,7 @@ def reconnect_consumer
   end
 
   def process_batch_with_record_tag(batch)
-    es = {} 
+    es = {}
     batch.messages.each { |msg|
       begin
         record = @parser_proc.call(msg)
@@ -285,8 +287,13 @@ def process_batch_with_record_tag(batch)
           record[@kafka_message_key] = msg.key
         end
         if @add_headers
+          if @headers_key
+            headers_record = record[@headers_key] = {}
+          else
+            headers_record = record
+          end
           msg.headers.each_pair { |k, v|
-            record[k] = v
+            headers_record[k] = v
           }
         end
         es[tag].add(record_time, record)
@@ -332,8 +339,13 @@ def process_batch(batch)
          record[@kafka_message_key] = msg.key
        end
        if @add_headers
+         if @headers_key
+           headers_record = record[@headers_key] = {}
+         else
+           headers_record = record
+         end
          msg.headers.each_pair { |k, v|
-           record[k] = v
+           headers_record[k] = v
          }
        end
        es.add(record_time, record)
@@ -355,7 +367,7 @@ def run
           if @tag_source == :record
             process_batch_with_record_tag(batch)
           else
-            process_batch(batch) 
+            process_batch(batch)
           end
         }
       rescue ForShutdown
diff --git a/lib/fluent/plugin/in_rdkafka_group.rb b/lib/fluent/plugin/in_rdkafka_group.rb
index 5141a91..5503fe8 100644
--- a/lib/fluent/plugin/in_rdkafka_group.rb
+++ b/lib/fluent/plugin/in_rdkafka_group.rb
@@ -18,6 +18,8 @@ class Fluent::Plugin::RdKafkaGroupInput < Fluent::Plugin::Input
                :desc => "For 'text' format only."
   config_param :add_headers, :bool, :default => false,
                :desc => "Add kafka's message headers to event record"
+  config_param :headers_key, :string, :default => nil,
+               :desc => "Record key to store kafka's message headers"
   config_param :add_prefix, :string, :default => nil,
                :desc => "Tag prefix (Optional)"
   config_param :add_suffix, :string, :default => nil,
@@ -254,8 +256,13 @@ def run
           record[@kafka_message_key] = msg.key
         end
         if @add_headers
+          if @headers_key
+            headers_record = record[@headers_key] = {}
+          else
+            headers_record = record
+          end
           msg.headers.each_pair { |k, v|
-            record[k] = v
+            headers_record[k] = v
           }
         end
         es.add(record_time, record)
diff --git a/test/plugin/test_in_kafka.rb b/test/plugin/test_in_kafka.rb
index cd970dd..2c037cf 100644
--- a/test/plugin/test_in_kafka.rb
+++ b/test/plugin/test_in_kafka.rb
@@ -21,7 +21,6 @@ def create_driver(conf = CONFIG)
     Fluent::Test::Driver::Input.new(Fluent::KafkaInput).configure(conf)
   end
 
-
   def test_configure
     d = create_driver
     assert_equal TOPIC_NAME, d.instance.topics
@@ -63,4 +62,63 @@ def test_consume
       assert_equal expected, d.events[0][2]
     end
   end
+
+  class ConsumeWithHeadersTest < self
+    CONFIG_TEMPLATE = %[
+      @type kafka
+      brokers localhost:9092
+      format text
+      @label @kafka
+      topics %<topic>s
+      %<conf_adds>s
+    ].freeze
+
+    def topic_random
+      "kafka-input-#{SecureRandom.uuid}"
+    end
+
+    def kafka_test_context(conf_adds: '', topic: topic_random, conf_template: CONFIG_TEMPLATE)
+      kafka = Kafka.new(['localhost:9092'], client_id: 'kafka')
+      producer = kafka.producer(required_acks: 1)
+
+      config = format(conf_template, topic: topic, conf_adds: conf_adds)
+      driver = create_driver(config)
+
+      yield topic, producer, driver
+    ensure
+      kafka.delete_topic(topic)
+      kafka.close
+    end
+
+    def test_with_headers_content_merged_into_record
+      conf_adds = 'add_headers true'
+      kafka_test_context(conf_adds: conf_adds) do |topic, producer, driver|
+        driver.run(expect_records: 1, timeout: 5) do
+          producer.produce('Hello, fluent-plugin-kafka!', topic: topic, headers: { header1: 'content1' })
+          producer.deliver_messages
+        end
+
+        expected = { 'message' => 'Hello, fluent-plugin-kafka!',
+                     'header1' => 'content1' }
+        assert_equal expected, driver.events[0][2]
+      end
+    end
+
+    def test_with_headers_content_merged_under_dedicated_key
+      conf_adds = %(
+        add_headers true
+        headers_key kafka_headers
+      )
+      kafka_test_context(conf_adds: conf_adds) do |topic, producer, driver|
+        driver.run(expect_records: 1, timeout: 5) do
+          producer.produce('Hello, fluent-plugin-kafka!', topic: topic, headers: { header1: 'content1' })
+          producer.deliver_messages
+        end
+
+        expected = { 'message' => 'Hello, fluent-plugin-kafka!',
+                     'kafka_headers' => { 'header1' => 'content1' } }
+        assert_equal expected, driver.events[0][2]
+      end
+    end
+  end
 end
diff --git a/test/plugin/test_in_kafka_group.rb b/test/plugin/test_in_kafka_group.rb
index 0637068..d5e47fd 100644
--- a/test/plugin/test_in_kafka_group.rb
+++ b/test/plugin/test_in_kafka_group.rb
@@ -23,7 +23,6 @@ def create_driver(conf = CONFIG)
     Fluent::Test::Driver::Input.new(Fluent::KafkaGroupInput).configure(conf)
   end
 
-
   def test_configure
     d = create_driver
     assert_equal [TOPIC_NAME], d.instance.topics
@@ -48,14 +47,6 @@ def teardown
   end
 
   def test_consume
-    conf = %[
-      @type kafka
-      brokers localhost:9092
-      format text
-      @label @kafka
-      refresh_topic_interval 0
-      topics #{TOPIC_NAME}
-    ]
     d = create_driver
 
     d.run(expect_records: 1, timeout: 10) do
@@ -66,4 +57,65 @@ def test_consume
       assert_equal expected, d.events[0][2]
     end
   end
+
+  class ConsumeWithHeadersTest < self
+    CONFIG_TEMPLATE = %(
+      @type kafka
+      brokers localhost:9092
+      consumer_group fluentd
+      format text
+      refresh_topic_interval 0
+      @label @kafka
+      topics %<topic>s
+      %<conf_adds>s
+    ).freeze
+
+    def topic_random
+      "kafka-input-#{SecureRandom.uuid}"
+    end
+
+    def kafka_test_context(conf_adds: '', topic: topic_random, conf_template: CONFIG_TEMPLATE)
+      kafka = Kafka.new(['localhost:9092'], client_id: 'kafka')
+      producer = kafka.producer(required_acks: 1)
+
+      config = format(conf_template, topic: topic, conf_adds: conf_adds)
+      driver = create_driver(config)
+
+      yield topic, producer, driver
+    ensure
+      kafka.delete_topic(topic)
+      kafka.close
+    end
+
+    def test_with_headers_content_merged_into_record
+      conf_adds = 'add_headers true'
+      kafka_test_context(conf_adds: conf_adds) do |topic, producer, driver|
+        driver.run(expect_records: 1, timeout: 5) do
+          producer.produce('Hello, fluent-plugin-kafka!', topic: topic, headers: { header1: 'content1' })
+          producer.deliver_messages
+        end
+
+        expected = { 'message' => 'Hello, fluent-plugin-kafka!',
+                     'header1' => 'content1' }
+        assert_equal expected, driver.events[0][2]
+      end
+    end
+
+    def test_with_headers_content_merged_under_dedicated_key
+      conf_adds = %(
+        add_headers true
+        headers_key kafka_headers
+      )
+      kafka_test_context(conf_adds: conf_adds) do |topic, producer, driver|
+        driver.run(expect_records: 1, timeout: 5) do
+          producer.produce('Hello, fluent-plugin-kafka!', topic: topic, headers: { header1: 'content1' })
+          producer.deliver_messages
+        end
+
+        expected = { 'message' => 'Hello, fluent-plugin-kafka!',
+                     'kafka_headers' => { 'header1' => 'content1' } }
+        assert_equal expected, driver.events[0][2]
+      end
+    end
+  end
 end
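
For reference, a minimal consumer-group configuration exercising the new option might look like the following sketch; the broker address, consumer group name, and topic are illustrative and not part of this change:

    <source>
      @type kafka_group
      brokers localhost:9092          # illustrative broker address
      consumer_group fluentd          # illustrative group name
      topics app-events               # illustrative topic
      format json
      add_headers true
      headers_key kafka_headers       # nest headers under this record key
    </source>

With headers_key set, a message carrying a header trace_id=abc is emitted with its headers isolated under the dedicated key, e.g. {"kafka_headers" => {"trace_id" => "abc"}, ...}; with add_headers true alone (the previous behavior, kept as the default), trace_id is merged into the top level of the record, where it can overwrite a payload field of the same name.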