Updated in_opensearch.rb and added retry logic for both client creation and search operations in case of connection failures.

Signed-off-by: Oleh Palanskyi <[email protected]>

Signed-off-by: Oleh <[email protected]>
Signed-off-by: OlehPalanskyi <[email protected]>
OlehPalanskyi committed Apr 15, 2024
1 parent 0762b3f commit f5e9415
Showing 2 changed files with 91 additions and 44 deletions.
10 changes: 10 additions & 0 deletions README.OpenSearchInput.md
@@ -24,6 +24,7 @@
+ [docinfo_fields](#docinfo_fields)
+ [docinfo_target](#docinfo_target)
+ [docinfo](#docinfo)
+ [infinite_check_connection](#infinite_check_connection)
* [Advanced Usage](#advanced-usage)

## Usage
@@ -274,6 +275,15 @@

This parameter specifies whether docinfo information is included or not. The default value is `false`.

```
docinfo false
```

### infinite_check_connection

This parameter enables infinite checking of connection availability against the Elasticsearch or OpenSearch hosts, retried every `request_timeout` (default: 5) seconds. The default value is `true`. If set to `false`, the connection check is attempted only 3 times before failing.

```
infinite_check_connection true
```
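
For context, a complete source block using this parameter might look like the sketch below. The host, port, index name, and tag are placeholder assumptions, not values prescribed by this plugin.

```
<source>
  @type opensearch
  host localhost
  port 9200
  index_name fluentd
  tag source.opensearch
  request_timeout 5s
  infinite_check_connection true
</source>
```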


## Advanced Usage

The OpenSearch input plugin and the OpenSearch output plugin can be combined to transfer records into another cluster.
125 changes: 81 additions & 44 deletions lib/fluent/plugin/in_opensearch.rb
@@ -80,6 +80,7 @@ class UnrecoverableRequestFailure < Fluent::UnrecoverableError; end
config_param :docinfo_fields, :array, :default => ['_index', '_type', '_id']
config_param :docinfo_target, :string, :default => METADATA
config_param :docinfo, :bool, :default => false
config_param :infinite_check_connection, :bool, :default => true

include Fluent::Plugin::OpenSearchConstants

@@ -240,40 +241,55 @@ def parse_time(value, event_time, tag)
end

def client(host = nil)
  retry_count = 0
  max_retry = @infinite_check_connection ? Float::INFINITY : 3 # Adjust the maximum number of retries as needed

  begin
    # check here to see if we already have a client connection for the given host
    connection_options = get_connection_options(host)

    @_os = nil unless is_existing_connection(connection_options[:hosts])

    @_os ||= begin
      @current_config = connection_options[:hosts].clone
      adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
      local_reload_connections = @reload_connections
      if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER
        local_reload_connections = @reload_after
      end

      headers = { 'Content-Type' => "application/json" }.merge(@custom_headers)

      transport = OpenSearch::Transport::Transport::HTTP::Faraday.new(
        connection_options.merge(
          options: {
            reload_connections: local_reload_connections,
            reload_on_failure: @reload_on_failure,
            resurrect_after: @resurrect_after,
            logger: @transport_logger,
            transport_options: {
              headers: headers,
              request: { timeout: @request_timeout },
              ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
            },
            http: {
              user: @user,
              password: @password
            },
            sniffer_class: @sniffer_class,
          }), &adapter_conf)
      OpenSearch::Client.new transport: transport
    end
  rescue Faraday::ConnectionFailed => e
    # Retry logic for connection failures during client creation
    if retry_count < max_retry
      log.warn "Connection to OpenSearch failed during client creation: #{e.message}. Retrying (Attempt #{retry_count + 1})..."
      retry_count += 1
      sleep(@request_timeout)
      retry
    else
      raise UnrecoverableRequestFailure, "Maximum retry attempts reached. Failed to create the OpenSearch client."
    end
  end
end
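
The retry pattern added above is self-contained enough to illustrate on its own. Below is a minimal standalone sketch, assuming only the `faraday` gem; the helper name, wait time, and endpoint are illustrative and not part of the plugin:

```
require "faraday"

# Sketch of the commit's retry pattern: retry on Faraday::ConnectionFailed,
# either forever (Float::INFINITY) or a fixed number of times.
def with_connection_retry(max_retry: Float::INFINITY, wait: 5)
  retry_count = 0
  begin
    yield
  rescue Faraday::ConnectionFailed
    raise if retry_count >= max_retry # finite bound exhausted: surface the error
    retry_count += 1
    sleep(wait)
    retry # re-run the begin block
  end
end

# infinite_check_connection true  behaves like max_retry: Float::INFINITY;
# infinite_check_connection false behaves like max_retry: 3.
with_connection_retry(max_retry: 3, wait: 1) do
  Faraday.get("http://localhost:9200/_cluster/health")
end
```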

@@ -305,23 +321,44 @@ def run
end

def run_slice(slice_id=nil)
  retry_count = 0
  max_retry = @infinite_check_connection ? Float::INFINITY : 3 # Adjust the maximum number of retries as needed
  begin
    slice_query = @base_query
    slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices}) unless slice_id.nil?
    result = client.search(@options.merge(:body => Yajl.dump(slice_query) ))
    es = Fluent::MultiEventStream.new

    result["hits"]["hits"].each {|hit| process_events(hit, es)}
    has_hits = result['hits']['hits'].any?
    scroll_id = result['_scroll_id']

    while has_hits && scroll_id
      result = process_next_scroll_request(es, scroll_id)
      has_hits = result['has_hits']
      scroll_id = result['_scroll_id']
    end

    router.emit_stream(@tag, es)
    clear_scroll(scroll_id)
  rescue Faraday::ConnectionFailed => e
    # Retry logic for connection failures during search
    if retry_count < max_retry
      log.warn "Connection to OpenSearch failed during search: #{e.message}. Retrying (Attempt #{retry_count + 1})..."
      retry_count += 1
      sleep(@request_timeout)
      retry
    else
      raise UnrecoverableRequestFailure, "Maximum retry attempts reached. Failed to execute OpenSearch search operation."
    end
  end
end

def clear_scroll(scroll_id)
  client.clear_scroll(scroll_id: scroll_id) if scroll_id
rescue => e
  # ignore & log any clear_scroll errors
  log.warn("Ignoring clear_scroll exception", message: e.message, exception: e.class)
end

