diff --git a/lib/fluent/plugin/in_prometheus.rb b/lib/fluent/plugin/in_prometheus.rb index f2d64c4..806c3a9 100644 --- a/lib/fluent/plugin/in_prometheus.rb +++ b/lib/fluent/plugin/in_prometheus.rb @@ -32,12 +32,6 @@ class PrometheusInput < Fluent::Plugin::Input config_param :extra_conf, :hash, default: {:SSLCertName => [['CN','nobody'],['DC','example']]}, symbolize_keys: true end - attr_reader :registry - - attr_reader :num_workers - attr_reader :base_port - attr_reader :metrics_path - def initialize super @registry = ::Prometheus::Client.registry @@ -100,8 +94,22 @@ def start end @server = WEBrick::HTTPServer.new(config) - @server.mount(@metrics_path, MonitorServlet, self) - @server.mount(@aggregated_metrics_path, MonitorServletAll, self) + @server.mount_proc(@metrics_path) do |_req, res| + status, header, body = all_metrics + res.status = status + res['Content-Type'] = header['Content-Type'] + res.body = body + res + end + + @server.mount_proc(@aggregated_metrics_path) do |_req, res| + status, header, body = all_workers_metrics + res.status = status + res['Content-Type'] = header['Content-Type'] + res.body = body + res + end + thread_create(:in_prometheus) do @server.start end @@ -115,49 +123,40 @@ def shutdown super end - class MonitorServlet < WEBrick::HTTPServlet::AbstractServlet - def initialize(server, prometheus) - @prometheus = prometheus - end + def all_metrics + [200, { 'Content-Type' => ::Prometheus::Client::Formats::Text::CONTENT_TYPE }, ::Prometheus::Client::Formats::Text.marshal(@registry)] + rescue => e + [500, { 'Content-Type' => 'text/plain' }, e.to_s] + end + + def all_workers_metrics + full_result = PromMetricsAggregator.new - def do_GET(req, res) - res.status = 200 - res['Content-Type'] = ::Prometheus::Client::Formats::Text::CONTENT_TYPE - res.body = ::Prometheus::Client::Formats::Text.marshal(@prometheus.registry) - rescue - res.status = 500 - res['Content-Type'] = 'text/plain' - res.body = $!.to_s + send_request_to_each_worker do |resp| + if resp.is_a?(Net::HTTPSuccess) + full_result.add_metrics(resp.body) + end end + + [200, { 'Content-Type' => ::Prometheus::Client::Formats::Text::CONTENT_TYPE }, full_result.get_metrics] + rescue => e + [500, { 'Content-Type' => 'text/plain' }, e.to_s] end - class MonitorServletAll < WEBrick::HTTPServlet::AbstractServlet - def initialize(server, prometheus) - @prometheus = prometheus + def send_request_to_each_worker + bind = (@bind == '0.0.0.0') ? '127.0.0.1' : @bind + req = Net::HTTP::Get.new(@metrics_path) + [*(@base_port...(@base_port + @num_workers))].each do |worker_port| + do_request(host: bind, port: worker_port) do |http| + yield(http.request(req)) + end end + end - def do_GET(req, res) - res.status = 200 - res['Content-Type'] = ::Prometheus::Client::Formats::Text::CONTENT_TYPE - - full_result = PromMetricsAggregator.new - fluent_server_ip = @prometheus.bind == '0.0.0.0' ? '127.0.0.1' : @prometheus.bind - current_worker = 0 - while current_worker < @prometheus.num_workers - Net::HTTP.start(fluent_server_ip, @prometheus.base_port + current_worker) do |http| - req = Net::HTTP::Get.new(@prometheus.metrics_path) - result = http.request(req) - if result.is_a?(Net::HTTPSuccess) - full_result.add_metrics(result.body) - end - end - current_worker += 1 - end - res.body = full_result.get_metrics - rescue - res.status = 500 - res['Content-Type'] = 'text/plain' - res.body = $!.to_s + def do_request(host:, port:) + http = Net::HTTP.new(host, port) + http.start do + yield(http) end end end