diff --git a/lib/fluent/plugin/in_opensearch.rb b/lib/fluent/plugin/in_opensearch.rb
index 7e33872..3a4cd83 100644
--- a/lib/fluent/plugin/in_opensearch.rb
+++ b/lib/fluent/plugin/in_opensearch.rb
@@ -179,7 +179,7 @@ def get_connection_options(con_host=nil)
       end
 
       {
-        hosts: hosts
+        hosts: get_reachable_hosts(hosts)
       }
     end
 
@@ -240,56 +240,86 @@ def parse_time(value, event_time, tag)
       return Time.at(event_time).to_time
     end
 
-    def client(host = nil)
-      retry_count = 0
-      max_retry = @infinite_check_connection ? Float::INFINITY : 3 # Adjust the maximum number of retries as needed
-
-      begin
-        # check here to see if we already have a client connection for the given host
-        connection_options = get_connection_options(host)
-
-        @_os = nil unless is_existing_connection(connection_options[:hosts])
-
-        @_os ||= begin
-          @current_config = connection_options[:hosts].clone
-          adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
-          local_reload_connections = @reload_connections
-          if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER
-            local_reload_connections = @reload_after
+    # Returns the hosts from +hosts+ that should be used for the connection.
+    # When @infinite_check_connection is enabled every host is pinged and only
+    # responsive ones are kept; the method then blocks, sleeping @request_timeout
+    # seconds between rounds, until at least one host answers. Otherwise all
+    # hosts are returned without probing.
+    def get_reachable_hosts(hosts=nil)
+      reachable_hosts = []
+      # Nothing to probe: with no hosts the loop below could never terminate.
+      return reachable_hosts if hosts.nil? || hosts.empty?
+      attempt = 0
+      loop do
+        hosts.each do |host|
+          begin
+            if @infinite_check_connection
+              check_host = OpenSearch::Client.new(
+                host: ["#{host[:scheme]}://#{host[:host]}:#{host[:port]}"],
+                user: host[:user],
+                password: host[:password],
+                reload_connections: true,
+                resurrect_after: @resurrect_after,
+                reload_on_failure: @reload_on_failure,
+                transport_options: { ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } }
+              )
+              # ping returns a plain boolean, so there is no status code to report.
+              # https://github.com/opensearch-project/opensearch-ruby/blob/136e1c975fc91b8cb80d7d1134e32c6dbefdb3eb/lib/opensearch/api/actions/ping.rb#L33
+              if check_host.ping
+                reachable_hosts << host
+              else
+                log.warn "Connection to #{host[:scheme]}://#{host[:host]}:#{host[:port]} failed: ping returned false"
+              end
+            else
+              reachable_hosts << host
+            end
+          rescue => e
+            log.warn "Failed to connect to #{host[:scheme]}://#{host[:host]}:#{host[:port]}: #{e.class}: #{e.message}"
           end
+        end
+        break unless reachable_hosts.empty?
+        log.info "Attempt ##{attempt += 1} to get reachable hosts"
+        log.info "No reachable hosts found. Retrying in #{@request_timeout} seconds..."
+        sleep(@request_timeout)
+      end
+      reachable_hosts
+    end
+
+    def client(host = nil)
+      # check here to see if we already have a client connection for the given host
+      connection_options = get_connection_options(host)
 
-          headers = { 'Content-Type' => "application/json" }.merge(@custom_headers)
+      @_os = nil unless is_existing_connection(connection_options[:hosts])
 
-          transport = OpenSearch::Transport::Transport::HTTP::Faraday.new(
-            connection_options.merge(
-              options: {
-                reload_connections: local_reload_connections,
-                reload_on_failure: @reload_on_failure,
-                resurrect_after: @resurrect_after,
-                logger: @transport_logger,
-                transport_options: {
-                  headers: headers,
-                  request: { timeout: @request_timeout },
-                  ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
-                },
-                http: {
-                  user: @user,
-                  password: @password
-                },
-                sniffer_class: @sniffer_class,
-              }), &adapter_conf)
-          OpenSearch::Client.new transport: transport
-        end
-      rescue Faraday::ConnectionFailed => e
-        # Retry logic for connection failures during client creation
-        if retry_count < max_retry
-          log.warn "Connection to OpenSearch failed during client creation: #{e.message}. Retrying (Attempt #{retry_count + 1})..."
-          retry_count += 1
-          sleep(@request_timeout)
-          retry
-        else
-          raise UnrecoverableRequestFailure, "Maximum retry attempts reached. Failed to execute OpenSearch search operation."
+      @_os ||= begin
+        @current_config = connection_options[:hosts].clone
+        adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
+        local_reload_connections = @reload_connections
+        if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER
+          local_reload_connections = @reload_after
         end
+
+        headers = { 'Content-Type' => "application/json" }.merge(@custom_headers)
+
+        transport = OpenSearch::Transport::Transport::HTTP::Faraday.new(
+          connection_options.merge(
+            options: {
+              reload_connections: local_reload_connections,
+              reload_on_failure: @reload_on_failure,
+              resurrect_after: @resurrect_after,
+              logger: @transport_logger,
+              transport_options: {
+                headers: headers,
+                request: { timeout: @request_timeout },
+                ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
+              },
+              http: {
+                user: @user,
+                password: @password
+              },
+              sniffer_class: @sniffer_class,
+            }), &adapter_conf)
+        OpenSearch::Client.new transport: transport
       end
     end
 
@@ -318,40 +348,31 @@ def run
           run_slice(slice_id)
         end
       end
+    rescue Faraday::ConnectionFailed => e
+      log.warn "Connection to OpenSearch failed during search in the 'run' method: #{e.message}. Retrying..."
+      # Back off before retrying so an unreachable cluster does not cause a busy loop.
+      sleep(@request_timeout)
+      retry
     end
 
     def run_slice(slice_id=nil)
-      retry_count = 0
-      max_retry = @infinite_check_connection ? Float::INFINITY : 3 # Adjust the maximum number of retries as needed
-      begin
-        slice_query = @base_query
-        slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices}) unless slice_id.nil?
-        result = client.search(@options.merge(:body => Yajl.dump(slice_query) ))
-        es = Fluent::MultiEventStream.new
-
-        result["hits"]["hits"].each {|hit| process_events(hit, es)}
-        has_hits = result['hits']['hits'].any?
+      slice_query = @base_query
+      slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices}) unless slice_id.nil?
+      result = client.search(@options.merge(:body => Yajl.dump(slice_query) ))
+      es = Fluent::MultiEventStream.new
+
+      result["hits"]["hits"].each {|hit| process_events(hit, es)}
+      has_hits = result['hits']['hits'].any?
+      scroll_id = result['_scroll_id']
+
+      while has_hits && scroll_id
+        result = process_next_scroll_request(es, scroll_id)
+        has_hits = result['has_hits']
         scroll_id = result['_scroll_id']
-
-        while has_hits && scroll_id
-          result = process_next_scroll_request(es, scroll_id)
-          has_hits = result['has_hits']
-          scroll_id = result['_scroll_id']
-        end
-
-        router.emit_stream(@tag, es)
-        clear_scroll(scroll_id)
-      rescue Faraday::ConnectionFailed => e
-        # Retry logic for connection failures during search
-        if retry_count < max_retry
-          log.warn "Connection to OpenSearch failed during search: #{e.message}. Retrying (Attempt #{retry_count + 1})..."
-          retry_count += 1
-          sleep(@request_timeout)
-          retry
-        else
-          raise UnrecoverableRequestFailure, "Maximum retry attempts reached. Failed to execute OpenSearch search operation."
-        end
       end
+
+      router.emit_stream(@tag, es)
+      clear_scroll(scroll_id)
     end
 
     def clear_scroll(scroll_id)
diff --git a/test/plugin/test_in_opensearch.rb b/test/plugin/test_in_opensearch.rb
index 43be253..ce81351 100644
--- a/test/plugin/test_in_opensearch.rb
+++ b/test/plugin/test_in_opensearch.rb
@@ -39,6 +39,7 @@ class OpenSearchInputTest < Test::Unit::TestCase
   CONFIG = %[
     tag raw.opensearch
     interval 2
+    infinite_check_connection false
   ]
 
   def setup
@@ -190,6 +191,7 @@ def test_configure
       user john
       password doe
       tag raw.opensearch
+      infinite_check_connection false
     }
     instance = driver(config).instance
 
@@ -228,6 +230,7 @@ def test_single_host_params_and_defaults
       user john
       password doe
       tag raw.opensearch
+      infinite_check_connection false
     }
     instance = driver(config).instance
 
@@ -249,6 +252,7 @@ def test_single_host_params_and_defaults_with_escape_placeholders
       user %{j+hn}
       password %{d@e}
       tag raw.opensearch
+      infinite_check_connection false
     }
     instance = driver(config).instance
 
@@ -271,6 +275,7 @@ def test_legacy_hosts_list
       path /es/
       port 123
       tag raw.opensearch
+      infinite_check_connection false
     }
     instance = driver(config).instance
 
@@ -295,6 +300,7 @@ def test_hosts_list
       user default_user
       password default_password
       tag raw.opensearch
+      infinite_check_connection false
     }
     instance = driver(config).instance
 
@@ -323,6 +329,7 @@ def test_hosts_list_with_escape_placeholders
       user default_user
       password default_password
       tag raw.opensearch
+      infinite_check_connection false
     }
     instance = driver(config).instance
 