Skip to content

Commit

Permalink
Add logic to write method of out_opensearch_data_stream.
Browse files Browse the repository at this point in the history
Add tag_key to record and remove keys if config options are set.
Fix issue #83.
Fix typo/language in README related to contributions.
  • Loading branch information
renegaderyu committed Jul 11, 2023
1 parent e351ee8 commit a8d3eb4
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1606,7 +1606,7 @@ There are usually a few feature requests, tagged [Easy](https://github.com/fluen

Pull Requests are welcomed.

Becore send a pull request or report an issue, please read [the contribution guideline](CONTRIBUTING.md).
Before sending a pull request or reporting an issue, please read [the contribution guideline](CONTRIBUTING.md).

## Running tests

Expand Down
6 changes: 6 additions & 0 deletions lib/fluent/plugin/out_opensearch_data_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ def write(chunk)
dt = Time.at(time).to_datetime
end
record.merge!({"@timestamp" => dt.iso8601(@time_precision)})
if @include_tag_key
record[@tag_key] = tag
end
if @remove_keys
@remove_keys.each { |key| record.delete(key) }
end
bulk_message = append_record_to_messages(CREATE_OP, {}, headers, record, bulk_message)
rescue => e
emit_error_label_event do
Expand Down
43 changes: 43 additions & 0 deletions test/plugin/test_out_opensearch_data_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -678,4 +678,47 @@ def test_record_no_timestamp
assert(index_cmds[1].has_key? '@timestamp')
end

def test_record_with_include_tag_key
stub_default
stub_bulk_feed
stub_default
stub_bulk_feed
conf = config_element(
'ROOT', '', {
'@type' => OPENSEARCH_DATA_STREAM_TYPE,
'data_stream_name' => 'foo',
'data_stream_template_name' => 'foo_tpl',
'include_tag_key' => true,
'tag_key' => 'test_tag'
})
record = {
'message' => 'Sample Record'
}
driver(conf).run(default_tag: 'test') do
driver.feed(record)
end
assert(index_cmds[1].has_key? 'test_tag')
end

def test_record_without_include_tag_key
stub_default
stub_bulk_feed
stub_default
stub_bulk_feed
conf = config_element(
'ROOT', '', {
'@type' => OPENSEARCH_DATA_STREAM_TYPE,
'data_stream_name' => 'foo',
'data_stream_template_name' => 'foo_tpl',
'include_tag_key' => false
})
record = {
'message' => 'Sample Record'
}
driver(conf).run(default_tag: 'test') do
driver.feed(record)
end
assert_not(index_cmds[1].has_key? 'test')
end

end

0 comments on commit a8d3eb4

Please sign in to comment.