Skip to content

Commit

Permalink
dded a service availability check function for high availability.
Browse files Browse the repository at this point in the history
I created a service availability check function for high availability. New function get_reachable_hosts in in_opensearch.rb
I used one library opensearch to solve this problem.

Signed-off-by: OlehPalanskyi <[email protected]>
  • Loading branch information
OlehPalanskyi committed Apr 15, 2024
1 parent f5e9415 commit eb1f916
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 76 deletions.
163 changes: 87 additions & 76 deletions lib/fluent/plugin/in_opensearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def get_connection_options(con_host=nil)
end

{
hosts: hosts
hosts: get_reachable_hosts(hosts)
}
end

Expand Down Expand Up @@ -240,56 +240,78 @@ def parse_time(value, event_time, tag)
return Time.at(event_time).to_time
end

def client(host = nil)
retry_count = 0
max_retry = @infinite_check_connection ? Float::INFINITY : 3 # Adjust the maximum number of retries as needed

begin
# check here to see if we already have a client connection for the given host
connection_options = get_connection_options(host)

@_os = nil unless is_existing_connection(connection_options[:hosts])

@_os ||= begin
@current_config = connection_options[:hosts].clone
adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
local_reload_connections = @reload_connections
if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER
local_reload_connections = @reload_after
def get_reachable_hosts(hosts=nil)
reachable_hosts = []
attempt = 0
loop do
hosts.each do |host|
begin
if @infinite_check_connection == true
check_host = OpenSearch::Client.new(
host: ["#{host[:scheme]}://#{host[:host]}:#{host[:port]}"],
user: host[:user],
password: host[:password],
reload_connections: true,
resurrect_after: @resurrect_after,
reload_on_failure: @reload_on_failure,
transport_options: { ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } }
)
response = check_host.ping #https://github.com/opensearch-project/opensearch-ruby/blob/136e1c975fc91b8cb80d7d1134e32c6dbefdb3eb/lib/opensearch/api/actions/ping.rb#L33
if response == true
reachable_hosts << host
else
log.warn "Connection to #{host[:scheme]}://#{host[:host]}:#{host[:port]} failed with status code #{response.status}"
end
else
reachable_hosts << host
end
rescue => e
log.warn "Failed to connect to #{host[:scheme]}://#{host[:host]}:#{host[:port]}"
end
end
break unless reachable_hosts.empty?
log.info "Attempt ##{attempt += 1} to get reachable hosts"
log.info "No reachable hosts found. Retrying in #{@request_timeout} seconds..."
sleep(@request_timeout)
end
reachable_hosts
end

def client(host = nil)
# check here to see if we already have a client connection for the given host
connection_options = get_connection_options(host)

headers = { 'Content-Type' => "application/json" }.merge(@custom_headers)
@_os = nil unless is_existing_connection(connection_options[:hosts])

transport = OpenSearch::Transport::Transport::HTTP::Faraday.new(
connection_options.merge(
options: {
reload_connections: local_reload_connections,
reload_on_failure: @reload_on_failure,
resurrect_after: @resurrect_after,
logger: @transport_logger,
transport_options: {
headers: headers,
request: { timeout: @request_timeout },
ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
},
http: {
user: @user,
password: @password
},
sniffer_class: @sniffer_class,
}), &adapter_conf)
OpenSearch::Client.new transport: transport
end
rescue Faraday::ConnectionFailed => e
# Retry logic for connection failures during client creation
if retry_count < max_retry
log.warn "Connection to OpenSearch failed during client creation: #{e.message}. Retrying (Attempt #{retry_count + 1})..."
retry_count += 1
sleep(@request_timeout)
retry
else
raise UnrecoverableRequestFailure, "Maximum retry attempts reached. Failed to execute OpenSearch search operation."
@_os ||= begin
@current_config = connection_options[:hosts].clone
adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
local_reload_connections = @reload_connections
if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER
local_reload_connections = @reload_after
end

headers = { 'Content-Type' => "application/json" }.merge(@custom_headers)

transport = OpenSearch::Transport::Transport::HTTP::Faraday.new(
connection_options.merge(
options: {
reload_connections: local_reload_connections,
reload_on_failure: @reload_on_failure,
resurrect_after: @resurrect_after,
logger: @transport_logger,
transport_options: {
headers: headers,
request: { timeout: @request_timeout },
ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
},
http: {
user: @user,
password: @password
},
sniffer_class: @sniffer_class,
}), &adapter_conf)
OpenSearch::Client.new transport: transport
end
end

Expand Down Expand Up @@ -318,40 +340,29 @@ def run
run_slice(slice_id)
end
end
rescue Faraday::ConnectionFailed => e
log.warn "Connection to OpenSearch failed during search in the 'run' method: #{e.message}. Retrying..."
retry
end

def run_slice(slice_id=nil)
retry_count = 0
max_retry = @infinite_check_connection ? Float::INFINITY : 3 # Adjust the maximum number of retries as needed
begin
slice_query = @base_query
slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices}) unless slice_id.nil?
result = client.search(@options.merge(:body => Yajl.dump(slice_query) ))
es = Fluent::MultiEventStream.new

result["hits"]["hits"].each {|hit| process_events(hit, es)}
has_hits = result['hits']['hits'].any?
slice_query = @base_query
slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices}) unless slice_id.nil?
result = client.search(@options.merge(:body => Yajl.dump(slice_query) ))
es = Fluent::MultiEventStream.new

result["hits"]["hits"].each {|hit| process_events(hit, es)}
has_hits = result['hits']['hits'].any?
scroll_id = result['_scroll_id']

while has_hits && scroll_id
result = process_next_scroll_request(es, scroll_id)
has_hits = result['has_hits']
scroll_id = result['_scroll_id']

while has_hits && scroll_id
result = process_next_scroll_request(es, scroll_id)
has_hits = result['has_hits']
scroll_id = result['_scroll_id']
end

router.emit_stream(@tag, es)
clear_scroll(scroll_id)
rescue Faraday::ConnectionFailed => e
# Retry logic for connection failures during search
if retry_count < max_retry
log.warn "Connection to OpenSearch failed during search: #{e.message}. Retrying (Attempt #{retry_count + 1})..."
retry_count += 1
sleep(@request_timeout)
retry
else
raise UnrecoverableRequestFailure, "Maximum retry attempts reached. Failed to execute OpenSearch search operation."
end
end

router.emit_stream(@tag, es)
clear_scroll(scroll_id)
end

def clear_scroll(scroll_id)
Expand Down
7 changes: 7 additions & 0 deletions test/plugin/test_in_opensearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class OpenSearchInputTest < Test::Unit::TestCase
CONFIG = %[
tag raw.opensearch
interval 2
infinite_check_connection false
]

def setup
Expand Down Expand Up @@ -190,6 +191,7 @@ def test_configure
user john
password doe
tag raw.opensearch
infinite_check_connection false
}
instance = driver(config).instance

Expand Down Expand Up @@ -228,6 +230,7 @@ def test_single_host_params_and_defaults
user john
password doe
tag raw.opensearch
infinite_check_connection false
}
instance = driver(config).instance

Expand All @@ -249,6 +252,7 @@ def test_single_host_params_and_defaults_with_escape_placeholders
user %{j+hn}
password %{d@e}
tag raw.opensearch
infinite_check_connection false
}
instance = driver(config).instance

Expand All @@ -271,6 +275,7 @@ def test_legacy_hosts_list
path /es/
port 123
tag raw.opensearch
infinite_check_connection false
}
instance = driver(config).instance

Expand All @@ -295,6 +300,7 @@ def test_hosts_list
user default_user
password default_password
tag raw.opensearch
infinite_check_connection false
}
instance = driver(config).instance

Expand Down Expand Up @@ -323,6 +329,7 @@ def test_hosts_list_with_escape_placeholders
user default_user
password default_password
tag raw.opensearch
infinite_check_connection false
}
instance = driver(config).instance

Expand Down

0 comments on commit eb1f916

Please sign in to comment.