Add elasticsearch output metrics (BonnierNews#4)
* [ADD] documents and bulk_requests metrics

* [CHG] transferred to SeQura and fixed linter problems
mpucholblasco authored Jan 29, 2019
1 parent 34cdbaa commit 4b9c79d
Showing 9 changed files with 330 additions and 40 deletions.
3 changes: 1 addition & 2 deletions .promu.yml
@@ -1,7 +1,7 @@
go:
cgo: false
repository:
path: github.com/BonnierNews/logstash_exporter
path: github.com/sequra/logstash_exporter
build:
binaries:
- name: logstash_exporter
@@ -16,4 +16,3 @@ build:
tarball:
files:
- LICENSE
- NOTICE
8 changes: 4 additions & 4 deletions Dockerfile
@@ -3,13 +3,13 @@ ARG GODEP_VERSION=v0.5.0

RUN curl -fsSL -o /usr/local/bin/dep https://github.com/golang/dep/releases/download/${GODEP_VERSION}/dep-linux-amd64 && \
chmod +x /usr/local/bin/dep && \
go get -u github.com/mpucholblasco/logstash_exporter && \
cd $GOPATH/src/github.com/mpucholblasco/logstash_exporter && \
go get -u github.com/sequra/logstash_exporter && \
cd $GOPATH/src/github.com/sequra/logstash_exporter && \
dep ensure && \
make

FROM busybox:1.30.0-glibc
COPY --from=golang /go/src/github.com/mpucholblasco/logstash_exporter/logstash_exporter /
LABEL maintainer [email protected]
COPY --from=golang /go/src/github.com/sequra/logstash_exporter/logstash_exporter /
LABEL maintainer [email protected]
EXPOSE 9198
ENTRYPOINT ["/logstash_exporter"]
25 changes: 25 additions & 0 deletions Gopkg.lock

(Generated file; diff not rendered.)

11 changes: 9 additions & 2 deletions README.md
@@ -4,8 +4,8 @@ Prometheus exporter for the metrics available in Logstash since version 5.0.
## Usage

```bash
go get -u github.com/mpucholblasco/logstash_exporter
cd $GOPATH/src/github.com/mpucholblasco/logstash_exporter
go get -u github.com/sequra/logstash_exporter
cd $GOPATH/src/github.com/sequra/logstash_exporter
make
./logstash_exporter --web.listen-address=:1234 --logstash.endpoint="http://localhost:1235"
```
@@ -59,8 +59,15 @@ Flags:
* `logstash_node_pipeline_events_in_total` (counter)
* `logstash_node_pipeline_events_out_total` (counter)
* `logstash_node_pipeline_queue_push_duration_seconds_total` (counter)
* `logstash_node_plugin_bulk_requests_failures_total` (counter)
* `logstash_node_plugin_bulk_requests_successes_total` (counter)
* `logstash_node_plugin_bulk_requests_with_errors_total` (counter)
* `logstash_node_plugin_documents_failures_total` (counter)
* `logstash_node_plugin_documents_successes_total` (counter)
* `logstash_node_plugin_duration_seconds_total` (counter)
* `logstash_node_plugin_queue_push_duration_seconds_total` (counter)
* `logstash_node_plugin_events_in_total` (counter)
* `logstash_node_plugin_events_out_total` (counter)
* `logstash_node_process_cpu_total_seconds_total` (counter)
* `logstash_node_process_max_filedescriptors` (gauge)
* `logstash_node_process_mem_total_virtual_bytes` (gauge)
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.1.3
0.1.4
11 changes: 10 additions & 1 deletion collector/nodestats_api.go
@@ -35,7 +35,16 @@ type Pipeline struct {
Formats int `json:"formats,omitempty"`
} `json:"filters"`
Outputs []struct {
ID string `json:"id"`
ID string `json:"id"`
BulkRequests *struct {
Successes int `json:"successes"`
WithErrors int `json:"with_errors"`
Failures int `json:"failures"`
} `json:"bulk_requests,omitempty"`
Documents *struct {
Successes int `json:"successes"`
NonRetryableFailures int `json:"non_retryable_failures"`
} `json:"documents,omitempty"`
Events struct {
DurationInMillis int `json:"duration_in_millis"`
In int `json:"in"`
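The two new blocks above are declared as pointers so that outputs which do not report them (anything other than the elasticsearch output) simply leave the fields nil. Below is a minimal, self-contained sketch of that behaviour; it uses a trimmed stand-in struct and a made-up stats fragment rather than the repository's full `Pipeline` type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed stand-in for the output-plugin entry in collector/nodestats_api.go;
// only the fields relevant to the new metrics are kept.
type outputPlugin struct {
	ID           string `json:"id"`
	BulkRequests *struct {
		Successes  int `json:"successes"`
		WithErrors int `json:"with_errors"`
		Failures   int `json:"failures"`
	} `json:"bulk_requests,omitempty"`
	Documents *struct {
		Successes            int `json:"successes"`
		NonRetryableFailures int `json:"non_retryable_failures"`
	} `json:"documents,omitempty"`
}

func main() {
	// Hypothetical fragment of a _node/stats response for an elasticsearch
	// output; other output types omit bulk_requests and documents entirely.
	withStats := []byte(`{"id":"es-out",
		"bulk_requests":{"successes":10,"with_errors":1,"failures":0},
		"documents":{"successes":1000,"non_retryable_failures":2}}`)
	withoutStats := []byte(`{"id":"stdout-out"}`)

	var a, b outputPlugin
	if err := json.Unmarshal(withStats, &a); err != nil {
		panic(err)
	}
	if err := json.Unmarshal(withoutStats, &b); err != nil {
		panic(err)
	}

	// a has both blocks populated; b leaves the pointers nil, which is the
	// condition the collector checks before emitting the new counters.
	fmt.Println(a.BulkRequests.Successes, a.Documents.Successes,
		b.BulkRequests == nil, b.Documents == nil)
}
```

Because non-elasticsearch outputs leave the pointers nil, the collector diff below can guard metric emission with plain nil checks instead of inspecting the plugin type.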
132 changes: 116 additions & 16 deletions collector/nodestats_collector.go
@@ -42,6 +42,11 @@ type NodeStatsCollector struct {
PipelinePluginEventsQueuePushDuration *prometheus.Desc
PipelinePluginEventsIn *prometheus.Desc
PipelinePluginEventsOut *prometheus.Desc
PipelinePluginDocumentsSuccesses *prometheus.Desc
PipelinePluginDocumentsFailures *prometheus.Desc
PipelinePluginBulkRequestsSuccesses *prometheus.Desc
PipelinePluginBulkRequestsWithErrors *prometheus.Desc
PipelinePluginBulkRequestsFailures *prometheus.Desc
PipelinePluginMatches *prometheus.Desc
PipelinePluginFailures *prometheus.Desc

@@ -250,6 +255,41 @@ func NewNodeStatsCollector(logstashEndpoint string) (Collector, error) {
nil,
),

PipelinePluginDocumentsSuccesses: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, subsystem, "plugin_documents_successes_total"),
"plugin_documents_successes",
[]string{"pipeline", "plugin", "plugin_id", "plugin_type"},
nil,
),

PipelinePluginDocumentsFailures: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, subsystem, "plugin_documents_failures_total"),
"plugin_documents_failures",
[]string{"pipeline", "plugin", "plugin_id", "plugin_type"},
nil,
),

PipelinePluginBulkRequestsSuccesses: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, subsystem, "plugin_bulk_requests_successes_total"),
"plugin_bulk_requests_successes",
[]string{"pipeline", "plugin", "plugin_id", "plugin_type"},
nil,
),

PipelinePluginBulkRequestsWithErrors: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, subsystem, "plugin_bulk_requests_with_errors_total"),
"plugin_bulk_requests_with_errors",
[]string{"pipeline", "plugin", "plugin_id", "plugin_type"},
nil,
),

PipelinePluginBulkRequestsFailures: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, subsystem, "plugin_bulk_requests_failures_total"),
"plugin_bulk_requests_failures",
[]string{"pipeline", "plugin", "plugin_id", "plugin_type"},
nil,
),

PipelinePluginMatches: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, subsystem, "plugin_matches_total"),
"plugin_matches",
@@ -310,19 +350,14 @@ func NewNodeStatsCollector(logstashEndpoint string) (Collector, error) {

// Collect function implements nodestats_collector collector
func (c *NodeStatsCollector) Collect(ch chan<- prometheus.Metric) error {
if desc, err := c.collect(ch); err != nil {
log.Error("Failed collecting node metrics", desc, err)
if err := c.collect(ch); err != nil {
log.Errorf("Failed collecting node metrics: %v", err)
return err
}
return nil
}

func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) (*prometheus.Desc, error) {
stats, err := NodeStats(c.endpoint)
if err != nil {
return nil, err
}

func (c *NodeStatsCollector) collectJVM(stats NodeStatsResponse, ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
c.JvmThreadsCount,
prometheus.GaugeValue,
@@ -497,7 +532,9 @@ func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) (*prometheus.Desc, error) {
float64(stats.Jvm.Gc.Collectors.Young.CollectionCount),
"young",
)
}

func (c *NodeStatsCollector) collectProcess(stats NodeStatsResponse, ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
c.ProcessOpenFileDescriptors,
prometheus.GaugeValue,
@@ -521,14 +558,9 @@ func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) (*prometheus.Desc, error) {
prometheus.CounterValue,
float64(stats.Process.CPU.TotalInMillis/1000),
)
}

// For backwards compatibility with Logstash 5
pipelines := make(map[string]Pipeline)
if len(stats.Pipelines) == 0 {
pipelines["main"] = stats.Pipeline
} else {
pipelines = stats.Pipelines
}
func (c *NodeStatsCollector) collectPipelines(pipelines map[string]Pipeline, ch chan<- prometheus.Metric) {

for pipelineID, pipeline := range pipelines {
ch <- prometheus.MustNewConstMetric(
@@ -672,6 +704,55 @@ func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) (*prometheus.Desc, error) {
plugin.ID,
"output",
)
if plugin.Documents != nil {
ch <- prometheus.MustNewConstMetric(
c.PipelinePluginDocumentsSuccesses,
prometheus.CounterValue,
float64(plugin.Documents.Successes),
pipelineID,
plugin.Name,
plugin.ID,
"output",
)
ch <- prometheus.MustNewConstMetric(
c.PipelinePluginDocumentsFailures,
prometheus.CounterValue,
float64(plugin.Documents.NonRetryableFailures),
pipelineID,
plugin.Name,
plugin.ID,
"output",
)
}
if plugin.BulkRequests != nil {
ch <- prometheus.MustNewConstMetric(
c.PipelinePluginBulkRequestsSuccesses,
prometheus.CounterValue,
float64(plugin.BulkRequests.Successes),
pipelineID,
plugin.Name,
plugin.ID,
"output",
)
ch <- prometheus.MustNewConstMetric(
c.PipelinePluginBulkRequestsFailures,
prometheus.CounterValue,
float64(plugin.BulkRequests.Failures),
pipelineID,
plugin.Name,
plugin.ID,
"output",
)
ch <- prometheus.MustNewConstMetric(
c.PipelinePluginBulkRequestsWithErrors,
prometheus.CounterValue,
float64(plugin.BulkRequests.WithErrors),
pipelineID,
plugin.Name,
plugin.ID,
"output",
)
}
}

if pipeline.Queue.Type != "memory" {
@@ -720,6 +801,25 @@ func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) (*prometheus.Desc, error) {
)
}
}
}

func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) error {
stats, err := NodeStats(c.endpoint)
if err != nil {
return err
}

c.collectJVM(stats, ch)
c.collectProcess(stats, ch)

// For backwards compatibility with Logstash 5
pipelines := make(map[string]Pipeline)
if len(stats.Pipelines) == 0 {
pipelines["main"] = stats.Pipeline
} else {
pipelines = stats.Pipelines
}
c.collectPipelines(pipelines, ch)

return nil, nil
return nil
}
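For readers unfamiliar with the Desc/const-metric pattern the new counters follow, here is a standalone sketch (not the repository's code) that registers a one-metric collector using the same label set and fully qualified name as `plugin_bulk_requests_successes_total`. The value 42 and the label values are invented for illustration.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// sketchCollector emits one const metric per scrape, mirroring how the
// exporter builds a Desc up front and materialises values at collect time.
type sketchCollector struct {
	bulkRequestsSuccesses *prometheus.Desc
}

func (c *sketchCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- c.bulkRequestsSuccesses
}

func (c *sketchCollector) Collect(ch chan<- prometheus.Metric) {
	// Values and labels here are made up; the real collector reads them
	// from the Logstash _node/stats response.
	ch <- prometheus.MustNewConstMetric(
		c.bulkRequestsSuccesses,
		prometheus.CounterValue,
		42,
		"main", "elasticsearch", "es-out", "output",
	)
}

func main() {
	c := &sketchCollector{
		bulkRequestsSuccesses: prometheus.NewDesc(
			"logstash_node_plugin_bulk_requests_successes_total",
			"plugin_bulk_requests_successes",
			[]string{"pipeline", "plugin", "plugin_id", "plugin_type"},
			nil,
		),
	}

	reg := prometheus.NewRegistry()
	reg.MustRegister(c)

	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		fmt.Println(mf.GetName(), len(mf.GetMetric()))
	}
}
```

Gathering from a fresh registry yields one metric family carrying the four labels, which is the shape the exporter exposes per output plugin.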