Skip to content

Commit

Permalink
Merge pull request #147 from ganmacs/use-proc-to-prepare-for-using-ht…
Browse files Browse the repository at this point in the history
…tp-helper

Use mount_proc instead of mount
  • Loading branch information
ganmacs authored Apr 16, 2020
2 parents 31ca606 + ed3d277 commit b1421b5
Showing 1 changed file with 44 additions and 45 deletions.
89 changes: 44 additions & 45 deletions lib/fluent/plugin/in_prometheus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ class PrometheusInput < Fluent::Plugin::Input
config_param :extra_conf, :hash, default: {:SSLCertName => [['CN','nobody'],['DC','example']]}, symbolize_keys: true
end

attr_reader :registry

attr_reader :num_workers
attr_reader :base_port
attr_reader :metrics_path

def initialize
super
@registry = ::Prometheus::Client.registry
Expand Down Expand Up @@ -100,8 +94,22 @@ def start
end

@server = WEBrick::HTTPServer.new(config)
@server.mount(@metrics_path, MonitorServlet, self)
@server.mount(@aggregated_metrics_path, MonitorServletAll, self)
@server.mount_proc(@metrics_path) do |_req, res|
status, header, body = all_metrics
res.status = status
res['Content-Type'] = header['Content-Type']
res.body = body
res
end

@server.mount_proc(@aggregated_metrics_path) do |_req, res|
status, header, body = all_workers_metrics
res.status = status
res['Content-Type'] = header['Content-Type']
res.body = body
res
end

thread_create(:in_prometheus) do
@server.start
end
Expand All @@ -115,49 +123,40 @@ def shutdown
super
end

class MonitorServlet < WEBrick::HTTPServlet::AbstractServlet
def initialize(server, prometheus)
@prometheus = prometheus
end
def all_metrics
[200, { 'Content-Type' => ::Prometheus::Client::Formats::Text::CONTENT_TYPE }, ::Prometheus::Client::Formats::Text.marshal(@registry)]
rescue => e
[500, { 'Content-Type' => 'text/plain' }, e.to_s]
end

def all_workers_metrics
full_result = PromMetricsAggregator.new

def do_GET(req, res)
res.status = 200
res['Content-Type'] = ::Prometheus::Client::Formats::Text::CONTENT_TYPE
res.body = ::Prometheus::Client::Formats::Text.marshal(@prometheus.registry)
rescue
res.status = 500
res['Content-Type'] = 'text/plain'
res.body = $!.to_s
send_request_to_each_worker do |resp|
if resp.is_a?(Net::HTTPSuccess)
full_result.add_metrics(resp.body)
end
end

[200, { 'Content-Type' => ::Prometheus::Client::Formats::Text::CONTENT_TYPE }, full_result.get_metrics]
rescue => e
[500, { 'Content-Type' => 'text/plain' }, e.to_s]
end

class MonitorServletAll < WEBrick::HTTPServlet::AbstractServlet
def initialize(server, prometheus)
@prometheus = prometheus
def send_request_to_each_worker
bind = (@bind == '0.0.0.0') ? '127.0.0.1' : @bind
req = Net::HTTP::Get.new(@metrics_path)
[*(@base_port...(@base_port + @num_workers))].each do |worker_port|
do_request(host: bind, port: worker_port) do |http|
yield(http.request(req))
end
end
end

def do_GET(req, res)
res.status = 200
res['Content-Type'] = ::Prometheus::Client::Formats::Text::CONTENT_TYPE

full_result = PromMetricsAggregator.new
fluent_server_ip = @prometheus.bind == '0.0.0.0' ? '127.0.0.1' : @prometheus.bind
current_worker = 0
while current_worker < @prometheus.num_workers
Net::HTTP.start(fluent_server_ip, @prometheus.base_port + current_worker) do |http|
req = Net::HTTP::Get.new(@prometheus.metrics_path)
result = http.request(req)
if result.is_a?(Net::HTTPSuccess)
full_result.add_metrics(result.body)
end
end
current_worker += 1
end
res.body = full_result.get_metrics
rescue
res.status = 500
res['Content-Type'] = 'text/plain'
res.body = $!.to_s
def do_request(host:, port:)
http = Net::HTTP.new(host, port)
http.start do
yield(http)
end
end
end
Expand Down

0 comments on commit b1421b5

Please sign in to comment.