rewritten with the above recommendations in mind
Signed-off-by: OlehPalanskyi <[email protected]>
OlehPalanskyi committed Apr 24, 2024
1 parent 414f075 commit ba3cbda
Showing 3 changed files with 151 additions and 54 deletions.
90 changes: 86 additions & 4 deletions README.OpenSearchInput.md
@@ -24,7 +24,16 @@
+ [docinfo_fields](#docinfo_fields)
+ [docinfo_target](#docinfo_target)
+ [docinfo](#docinfo)
+ [infinite_check_connection](#infinite_check_connection)
+ [check_connection](#check_connection)
+ [retry_forever](#retry_forever)
+ [retry_timeout](#retry_timeout)
+ [retry_max_times](#retry_max_times)
+ [retry_type](#retry_type)
+ [retry_wait](#retry_wait)
+ [retry_exponential_backoff_base](#retry_exponential_backoff_base)
+ [retry_max_interval](#retry_max_interval)
+ [retry_randomize](#retry_randomize)

* [Advanced Usage](#advanced-usage)

## Usage
@@ -275,14 +284,87 @@ This parameter specifies whether docinfo information including or not. The defau
docinfo false
```

### infinite_check_connection
### check_connection

This parameter specifies whether the plugin checks that the Elasticsearch or OpenSearch hosts are reachable. The default value is `true`.

```
check_connection true
```

### retry_forever

If true, the plugin ignores the `retry_timeout` and `retry_max_times` options and retries forever. The default value is `true`.

```
retry_forever true
```

### retry_timeout

The maximum time (in seconds) to keep retrying a failed attempt before the plugin discards the retry.
If the next retry would exceed this time limit, the last retry is made at exactly this time limit.
The default value is `72h`.
72 hours corresponds to 17 retries with exponential backoff (kept so as not to change the default behavior).

```
retry_timeout 72h
```
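How many retries fit inside a timeout depends on the wait and the backoff base. The following is a minimal sketch, assuming the wait before retry k is `wait * base**k` with no randomization and no `retry_max_interval` cap. `retries_within` is a hypothetical helper written for illustration; it is not part of the plugin or of Fluentd, whose own accounting may differ in detail.

```ruby
# Hypothetical helper: counts how many full backoff waits fit inside a timeout.
# Assumes the k-th wait is wait * base**k (k starts at 0), with no jitter or cap.
def retries_within(timeout, wait:, base:)
  elapsed = 0
  count = 0
  loop do
    interval = wait * base**count
    break if elapsed + interval > timeout
    elapsed += interval
    count += 1
  end
  count
end

seventy_two_hours = 72 * 60 * 60

puts retries_within(seventy_two_hours, wait: 1, base: 2) # 17
puts retries_within(seventy_two_hours, wait: 5, base: 2) # 15
```

Under these assumptions the "17 times" figure matches a 1-second base wait. With a base wait of 5 seconds, fifteen full waits fit before the 72-hour limit.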

### retry_max_times

The maximum number of times to retry a failed attempt. The default value is `5`.

```
retry_max_times 5
```

### retry_type

The parameter infinite checking on connection availability with Elasticsearch or opensearch hosts, every request_timeout (default 5) seconds. The default value is `true`.
This parameter selects how long the plugin waits (in seconds) before retrying:
`exponential_backoff`: the wait grows exponentially with each failure,
`periodic`: the plugin retries at fixed intervals (configured via `retry_wait`). The default value is `exponential_backoff`.
Periodic -> fixed `retry_wait`
Exponential backoff: k is the number of retries
c: constant factor, `retry_wait`
b: base factor, `retry_exponential_backoff_base`
k: times
total retry time: c + c * b^1 + (...) + c * b^k = c * (b^(k+1) - 1) / (b - 1)

```
infinite_check_connection true
retry_type exponential_backoff
```
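The exponential backoff waits form a geometric series, so the total can be checked term by term against the closed form c * (b^(k+1) - 1) / (b - 1). The sketch below assumes the documented defaults (`retry_wait` 5, `retry_exponential_backoff_base` 2). The method names are hypothetical and only illustrate the arithmetic; they are not the plugin's implementation.

```ruby
# Wait before the k-th retry (k starts at 0): c * b^k.
def backoff_interval(k, wait: 5, base: 2)
  wait * base**k
end

# Total wait across retries 0..k, summed term by term.
def total_backoff(k, wait: 5, base: 2)
  (0..k).sum { |i| backoff_interval(i, wait: wait, base: base) }
end

# Closed form of the same geometric series (valid for base != 1).
def total_backoff_closed(k, wait: 5, base: 2)
  wait * (base**(k + 1) - 1) / (base - 1)
end

puts (0..4).map { |k| backoff_interval(k) }.inspect # [5, 10, 20, 40, 80]
puts total_backoff(4)                               # 155
puts total_backoff_closed(4)                        # 155
```

With `periodic`, every wait would simply equal `retry_wait`, so the total after k + 1 retries is c * (k + 1).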

### retry_wait

The number of seconds to wait before the next retry, or the constant factor of the exponential backoff. The default value is `5`.

```
retry_wait 5
```

### retry_exponential_backoff_base

The base of the exponential backoff for retries. The default value is `2`.

```
retry_exponential_backoff_base 2
```

### retry_max_interval

The maximum interval (in seconds) between retries for exponential backoff while failing. The default value is `nil`.

```
retry_max_interval nil
```
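The effect of a maximum interval can be sketched as a cap on each exponential wait. This assumes the cap is applied per wait and that `nil` means no cap. `capped_interval` is a hypothetical name used for illustration only.

```ruby
# Hypothetical helper: exponential wait for retry k, capped at max_interval.
# A nil max_interval is treated as "no cap" (an assumption of this sketch).
def capped_interval(k, wait: 5, base: 2, max_interval: nil)
  interval = wait * base**k
  max_interval ? [interval, max_interval].min : interval
end

puts (0..5).map { |k| capped_interval(k, max_interval: 60) }.inspect # [5, 10, 20, 40, 60, 60]
puts (0..5).map { |k| capped_interval(k) }.inspect                   # [5, 10, 20, 40, 80, 160]
```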

### retry_randomize

If true, the plugin retries after a randomized interval to avoid bursts of simultaneous retries. The default value is `false`.

```
retry_randomize false
```
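Randomizing the wait spreads retries from many instances so they do not all hit the cluster at the same moment. The sketch below shows one common way to do this, adding a symmetric random offset to the computed wait. The `jitter` fraction of 0.125 and the name `randomized_interval` are assumptions made for illustration; the width the plugin actually uses is not stated in this document.

```ruby
# Hypothetical helper: spreads a wait by up to +/- jitter * interval.
# The jitter fraction is an assumed value, not taken from the plugin.
def randomized_interval(interval, jitter: 0.125, rng: Random.new)
  offset = (rng.rand * 2 - 1) * jitter * interval # uniform in [-jitter*interval, +jitter*interval)
  interval + offset
end

rng = Random.new(42) # seeded only to make the sketch repeatable
puts randomized_interval(40, rng: rng)
```

For a 40-second wait and a 0.125 fraction, every result falls between 35 and 45 seconds.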

## Advanced Usage

101 changes: 58 additions & 43 deletions lib/fluent/plugin/in_opensearch.rb
@@ -29,6 +29,7 @@
require 'faraday/excon'
require 'fluent/log-ext'
require 'fluent/plugin/input'
require 'fluent/plugin_helper'
require_relative 'opensearch_constants'

module Fluent::Plugin
@@ -39,7 +40,7 @@ class UnrecoverableRequestFailure < Fluent::UnrecoverableError; end
DEFAULT_STORAGE_TYPE = 'local'
METADATA = "@metadata".freeze

helpers :timer, :thread
helpers :timer, :thread, :retry_state

Fluent::Plugin.register_input('opensearch', self)

@@ -80,7 +81,23 @@ class UnrecoverableRequestFailure < Fluent::UnrecoverableError; end
config_param :docinfo_fields, :array, :default => ['_index', '_type', '_id']
config_param :docinfo_target, :string, :default => METADATA
config_param :docinfo, :bool, :default => false
config_param :infinite_check_connection, :bool, :default => true
config_param :check_connection, :bool, :default => true
config_param :retry_forever, :bool, default: true, desc: 'If true, plugin will ignore retry_timeout and retry_max_times options and retry forever.'
config_param :retry_timeout, :time, default: 72 * 60 * 60, desc: 'The maximum seconds to retry'
# 72hours == 17 times with exponential backoff (not to change default behavior)
config_param :retry_max_times, :integer, default: 5, desc: 'The maximum number of times to retry'
# exponential backoff sequence will be initialized at the time of this threshold
config_param :retry_type, :enum, list: [:exponential_backoff, :periodic], default: :exponential_backoff
### Periodic -> fixed :retry_wait
### Exponential backoff: k is number of retry times
# c: constant factor, @retry_wait
# b: base factor, @retry_exponential_backoff_base
# k: times
# total retry time: c + c * b^1 + (...) + c*b^k = c * (b^(k+1) - 1) / (b - 1)
config_param :retry_wait, :time, default: 5, desc: 'Seconds to wait before next retry, or constant factor of exponential backoff.'
config_param :retry_exponential_backoff_base, :float, default: 2, desc: 'The base number of exponential backoff for retries.'
config_param :retry_max_interval, :time, default: nil, desc: 'The maximum interval seconds for exponential backoff between retries while failing.'
config_param :retry_randomize, :bool, default: false, desc: 'If true, plugin will retry after randomized interval not to do burst retries.'

include Fluent::Plugin::OpenSearchConstants

@@ -93,6 +110,7 @@ def configure(conf)

@timestamp_parser = create_time_parser
@backend_options = backend_options
@retry = retry_state(@retry_randomize)

raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil?

@@ -139,6 +157,15 @@ def backend_options
raise Fluent::ConfigError, "You must install #{@http_backend} gem. Exception: #{ex}"
end

def retry_state(randomize)
retry_state_create(
:input_retries, @retry_type, @retry_wait, @retry_timeout,
forever: @retry_forever, max_steps: @retry_max_times,
max_interval: @retry_max_interval, backoff_base: @retry_exponential_backoff_base,
randomize: randomize
)
end

def get_escaped_userinfo(host_str)
if m = host_str.match(/(?<scheme>.*)%{(?<user>.*)}:%{(?<password>.*)}(?<path>@.*)/)
m["scheme"] +
@@ -177,12 +204,29 @@ def get_connection_options(con_host=nil)
host.merge!(user: @user, password: @password) if !host[:user] && @user
host.merge!(path: @path) if !host[:path] && @path
end

live_hosts = @check_connection ? hosts.select { |host| reachable_host?(host) } : hosts
{
hosts: get_reachable_hosts(hosts)
hosts: live_hosts
}
end

def reachable_host?(host)
client = OpenSearch::Client.new(
host: ["#{host[:scheme]}://#{host[:host]}:#{host[:port]}"],
user: host[:user],
password: host[:password],
reload_connections: @reload_connections,
request_timeout: @request_timeout,
resurrect_after: @resurrect_after,
reload_on_failure: @reload_on_failure,
transport_options: { ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } }
)
client.ping
rescue => e
log.warn "Failed to connect to #{host[:scheme]}://#{host[:host]}:#{host[:port]}: #{e.message}"
false
end

def emit_error_label_event(&block)
# If `emit_error_label_event` is set to false, error events are not emitted.
if emit_error_label_event
@@ -240,43 +284,6 @@ def parse_time(value, event_time, tag)
return Time.at(event_time).to_time
end

def get_reachable_hosts(hosts=nil)
reachable_hosts = []
attempt = 0
loop do
hosts.each do |host|
begin
if @infinite_check_connection == true
check_host = OpenSearch::Client.new(
host: ["#{host[:scheme]}://#{host[:host]}:#{host[:port]}"],
user: host[:user],
password: host[:password],
reload_connections: true,
resurrect_after: @resurrect_after,
reload_on_failure: @reload_on_failure,
transport_options: { ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } }
)
response = check_host.ping #https://github.com/opensearch-project/opensearch-ruby/blob/136e1c975fc91b8cb80d7d1134e32c6dbefdb3eb/lib/opensearch/api/actions/ping.rb#L33
if response == true
reachable_hosts << host
else
log.warn "Connection to #{host[:scheme]}://#{host[:host]}:#{host[:port]} failed with status code #{response.status}"
end
else
reachable_hosts << host
end
rescue => e
log.warn "Failed to connect to #{host[:scheme]}://#{host[:host]}:#{host[:port]}"
end
end
break unless reachable_hosts.empty?
log.info "Attempt ##{attempt += 1} to get reachable hosts"
log.info "No reachable hosts found. Retrying in #{@request_timeout} seconds..."
sleep(@request_timeout)
end
reachable_hosts
end

def client(host = nil)
# check here to see if we already have a client connection for the given host
connection_options = get_connection_options(host)
@@ -340,8 +347,13 @@ def run
run_slice(slice_id)
end
end
rescue Faraday::ConnectionFailed => e
log.warn "Connection to OpenSearch failed during search in the 'run' method: #{e.message}. Retrying..."
rescue Faraday::ConnectionFailed, OpenSearch::Transport::Transport::Error => e
@retry.step
# Raise an error if the retry limit has been reached
raise "Hit limit for retries. retry_times: #{@retry.steps}, error: #{e.message}" if @retry.limit?
# Retry if the retry limit hasn't been reached
log.warn("failed to connect or search.", retry_times: @retry.steps, next_retry_time: @retry.next_time.round, error: e.message)
# Clamp at zero: sleep raises ArgumentError on a negative interval
sleep([@retry.next_time - Time.now, 0].max)
retry
end

@@ -363,6 +375,9 @@ def run_slice(slice_id=nil)

router.emit_stream(@tag, es)
clear_scroll(scroll_id)
# Reset steps and next_time once the slice has completed successfully
@retry.instance_variable_set(:@steps, 0)
@retry.instance_variable_set(:@next_time, nil)
end

def clear_scroll(scroll_id)
14 changes: 7 additions & 7 deletions test/plugin/test_in_opensearch.rb
@@ -39,7 +39,7 @@ class OpenSearchInputTest < Test::Unit::TestCase
CONFIG = %[
tag raw.opensearch
interval 2
infinite_check_connection false
check_connection false
]

def setup
@@ -191,7 +191,7 @@ def test_configure
user john
password doe
tag raw.opensearch
infinite_check_connection false
check_connection false
}
instance = driver(config).instance

@@ -230,7 +230,7 @@ def test_single_host_params_and_defaults
user john
password doe
tag raw.opensearch
infinite_check_connection false
check_connection false
}
instance = driver(config).instance

@@ -252,7 +252,7 @@ def test_single_host_params_and_defaults_with_escape_placeholders
user %{j+hn}
password %{d@e}
tag raw.opensearch
infinite_check_connection false
check_connection false
}
instance = driver(config).instance

@@ -275,7 +275,7 @@ def test_legacy_hosts_list
path /es/
port 123
tag raw.opensearch
infinite_check_connection false
check_connection false
}
instance = driver(config).instance

@@ -300,7 +300,7 @@ def test_hosts_list
user default_user
password default_password
tag raw.opensearch
infinite_check_connection false
check_connection false
}
instance = driver(config).instance

@@ -329,7 +329,7 @@ def test_hosts_list_with_escape_placeholders
user default_user
password default_password
tag raw.opensearch
infinite_check_connection false
check_connection false
}
instance = driver(config).instance

