From 961cde0170d7193f47be6e5dff794146bb336da9 Mon Sep 17 00:00:00 2001 From: Adrian Smith Date: Mon, 10 Jul 2023 23:28:57 -0700 Subject: [PATCH] Update pipeline to gauge, add labels (#106) --- Makefile | 6 +++++ docs/telemetry.md | 16 +++++------ internal/api/service/invariant.go | 2 +- internal/core/id.go | 4 +++ internal/engine/manager.go | 4 +-- internal/etl/pipeline/manager.go | 9 +++---- internal/metrics/factory.go | 4 +-- internal/metrics/metrics.go | 44 +++++++++++++------------------ internal/subsystem/manager.go | 9 ++++--- 9 files changed, 51 insertions(+), 47 deletions(-) diff --git a/Makefile b/Makefile index 38698185..e08d2c9a 100644 --- a/Makefile +++ b/Makefile @@ -59,3 +59,9 @@ docker-build: docker-run: @echo "$(GREEN) Running docker image...$(COLOR_END)" @docker run -p 8080:8080 -p 7300:7300 -e config.env $(APP_NAME) + +.PHONY: metrics-docs +metrics-docs: build-app + @echo "$(GREEN) Generating metric documentation...$(COLOR_END)" + @./bin/pessimism doc metrics + diff --git a/docs/telemetry.md b/docs/telemetry.md index b41a2be0..13909b80 100644 --- a/docs/telemetry.md +++ b/docs/telemetry.md @@ -13,11 +13,11 @@ To generate documentation for metrics, run `make docs` from the root of the repo which can be pasted directly below to keep current system metric documentation up to date. ## Current Metrics -| METRIC | DESCRIPTION | LABELS | TYPE | -|-------------------------------------------|--------------------------------------------------------|-----------|---------| -| pessimism_up | 1 if the service is up | | gauge | -| pessimism_invariants_active_invariants | Number of active invariants | | gauge | -| pessimism_etl_active_pipelines | Number of active pipelines | | gauge | -| pessimism_invariants_invariant_runs_total | Number of times a specific invariant has been run | invariant | counter | -| pessimism_alarms_generated_total | Number of total alarms generated for a given invariant | invariant | counter | -| pessimism_node_errors_total | Number of node errors caught | node | counter | \ No newline at end of file +| METRIC | DESCRIPTION | LABELS | TYPE | +|-------------------------------------------|--------------------------------------------------------|----------------------------------------|---------| +| pessimism_up | 1 if the service is up | | gauge | +| pessimism_invariants_active_invariants | Number of active invariants | network,invariant,pipeline | gauge | +| pessimism_etl_active_pipelines | Number of active pipelines | pipeline,network | gauge | +| pessimism_invariants_invariant_runs_total | Number of times a specific invariant has been run | network,invariant | counter | +| pessimism_alerts_generated_total | Number of total alerts generated for a given invariant | network,invariant,pipeline,destination | counter | +| pessimism_node_errors_total | Number of node errors caught | node | counter | diff --git a/internal/api/service/invariant.go b/internal/api/service/invariant.go index b524d247..802ba799 100644 --- a/internal/api/service/invariant.go +++ b/internal/api/service/invariant.go @@ -15,7 +15,7 @@ func (svc *PessimismService) ProcessInvariantRequest(ir *models.InvRequestBody) return core.NilSUUID(), nil } -// runInvariantSession ... Runs an invariant session provided +// RunInvariantSession ... Runs an invariant session provided func (svc *PessimismService) RunInvariantSession(params *models.InvRequestParams) (core.SUUID, error) { pConfig, err := svc.m.BuildPipelineCfg(params) if err != nil { diff --git a/internal/core/id.go b/internal/core/id.go index 880e2edc..2239dc9c 100644 --- a/internal/core/id.go +++ b/internal/core/id.go @@ -67,6 +67,10 @@ func (uuid PUUID) PipelineType() PipelineType { return PipelineType(uuid.PID[0]) } +func (uuid PUUID) NetworkType() Network { + return Network(uuid.PID[1]) +} + // InvSessionPID ... Invariant session Primary ID type InvSessionPID [3]byte diff --git a/internal/engine/manager.go b/internal/engine/manager.go index 37cfc53a..b00f3610 100644 --- a/internal/engine/manager.go +++ b/internal/engine/manager.go @@ -75,7 +75,6 @@ func (em *engineManager) Transit() chan core.InvariantInput { // DeleteInvariantSession ... Deletes an invariant session func (em *engineManager) DeleteInvariantSession(_ core.SUUID) (core.SUUID, error) { - em.metrics.DecActiveInvariants() return core.NilSUUID(), nil } @@ -167,7 +166,8 @@ func (em *engineManager) DeployInvariantSession(cfg *invariant.DeployConfig) (co } } - em.metrics.IncActiveInvariants() + em.metrics.IncActiveInvariants(cfg.InvType.String(), cfg.Network.String(), cfg.PUUID.PipelineType().String()) + return sUUID, nil } diff --git a/internal/etl/pipeline/manager.go b/internal/etl/pipeline/manager.go index f92c43d0..21d51398 100644 --- a/internal/etl/pipeline/manager.go +++ b/internal/etl/pipeline/manager.go @@ -116,11 +116,8 @@ func (em *etlManager) CreateDataPipeline(cfg *core.PipelineConfig) (core.PUUID, em.store.AddPipeline(pUUID, pipeline) - if len(components) == 1 { - return pUUID, false, nil - } - - em.metrics.IncActivePipelines() + // Pipeline successfully created, increment for type and network + em.metrics.IncActivePipelines(pUUID.PipelineType().String(), pUUID.NetworkType().String()) return pUUID, false, nil } @@ -163,7 +160,7 @@ func (em *etlManager) Shutdown() error { zap.String(core.PUUIDKey, pl.UUID().String())) return err } - em.metrics.DecActivePipelines() + em.metrics.DecActivePipelines(pl.UUID().PipelineType().String(), pl.UUID().NetworkType().String()) } logger.Debug("Waiting for all component routines to end") em.wg.Wait() diff --git a/internal/metrics/factory.go b/internal/metrics/factory.go index 1dfa94ad..869c7c56 100644 --- a/internal/metrics/factory.go +++ b/internal/metrics/factory.go @@ -125,7 +125,7 @@ func (d *documentor) Document() []DocumentedMetric { func fullName(ns, subsystem, name string) string { out := ns if subsystem != "" { - out += "." + subsystem + out += "_" + subsystem } - return out + "." + name + return out + "_" + name } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 0faa2427..03bc01fc 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -30,10 +30,9 @@ type Config struct { } type Metricer interface { - IncActiveInvariants() - DecActiveInvariants() - IncActivePipelines() - DecActivePipelines() + IncActiveInvariants(invType, network, pipelineType string) + IncActivePipelines(pipelineType, network string) + DecActivePipelines(pipelineType, network string) RecordInvariantRun(invariant invariant.Invariant) RecordAlertGenerated(alert core.Alert) RecordNodeError(node string) @@ -44,9 +43,9 @@ type Metricer interface { } type Metrics struct { - ActiveInvariants prometheus.Gauge - ActivePipelines prometheus.Gauge Up prometheus.Gauge + ActivePipelines *prometheus.GaugeVec + ActiveInvariants *prometheus.GaugeVec InvariantRuns *prometheus.CounterVec AlertsGenerated *prometheus.CounterVec NodeErrors *prometheus.CounterVec @@ -85,19 +84,19 @@ func New(ctx context.Context, cfg *Config) (Metricer, func(), error) { Name: "up", Help: "1 if the service is up", }), - ActiveInvariants: factory.NewGauge(prometheus.GaugeOpts{ + ActiveInvariants: factory.NewGaugeVec(prometheus.GaugeOpts{ Name: "active_invariants", Help: "Number of active invariants", Namespace: metricsNamespace, Subsystem: SubsystemInvariants, - }), + }, []string{"invariant", "network", "pipeline"}), - ActivePipelines: factory.NewGauge(prometheus.GaugeOpts{ + ActivePipelines: factory.NewGaugeVec(prometheus.GaugeOpts{ Name: "active_pipelines", Help: "Number of active pipelines", Namespace: metricsNamespace, Subsystem: SubsystemEtl, - }), + }, []string{"pipeline", "network"}), InvariantRuns: factory.NewCounterVec(prometheus.CounterOpts{ Name: "invariant_runs_total", @@ -143,23 +142,18 @@ func (m *Metrics) RecordUp() { } // IncActiveInvariants ... Increments the number of active invariants -func (m *Metrics) IncActiveInvariants() { - m.ActiveInvariants.Inc() -} - -// DecActiveInvariants ... Decrements the number of active invariants -func (m *Metrics) DecActiveInvariants() { - m.ActiveInvariants.Dec() +func (m *Metrics) IncActiveInvariants(invType, network, pipelineType string) { + m.ActiveInvariants.WithLabelValues(invType, network, pipelineType).Inc() } // IncActivePipelines ... Increments the number of active pipelines -func (m *Metrics) IncActivePipelines() { - m.ActivePipelines.Inc() +func (m *Metrics) IncActivePipelines(pipelineType, network string) { + m.ActivePipelines.WithLabelValues(pipelineType, network).Inc() } // DecActivePipelines ... Decrements the number of active pipelines -func (m *Metrics) DecActivePipelines() { - m.ActivePipelines.Dec() +func (m *Metrics) DecActivePipelines(pipelineType, network string) { + m.ActivePipelines.WithLabelValues(pipelineType, network).Dec() } // RecordInvariantRun ... Records that a given invariant has been run @@ -197,10 +191,10 @@ type noopMetricer struct{} var NoopMetrics Metricer = new(noopMetricer) -func (n *noopMetricer) IncActiveInvariants() {} -func (n *noopMetricer) DecActiveInvariants() {} -func (n *noopMetricer) IncActivePipelines() {} -func (n *noopMetricer) DecActivePipelines() {} +func (n *noopMetricer) IncActiveInvariants(_, _, _ string) {} +func (n *noopMetricer) DecActiveInvariants(_, _, _ string) {} +func (n *noopMetricer) IncActivePipelines(_, _ string) {} +func (n *noopMetricer) DecActivePipelines(_, _ string) {} func (n *noopMetricer) RecordInvariantRun(_ invariant.Invariant) {} func (n *noopMetricer) RecordAlertGenerated(_ core.Alert) {} func (n *noopMetricer) RecordNodeError(_ string) {} diff --git a/internal/subsystem/manager.go b/internal/subsystem/manager.go index ea9f6978..b47a39f8 100644 --- a/internal/subsystem/manager.go +++ b/internal/subsystem/manager.go @@ -15,6 +15,7 @@ import ( "github.com/base-org/pessimism/internal/engine/invariant" "github.com/base-org/pessimism/internal/etl/pipeline" "github.com/base-org/pessimism/internal/logging" + "github.com/base-org/pessimism/internal/metrics" "go.uber.org/zap" ) @@ -53,9 +54,10 @@ type manager struct { cfg *Config ctx context.Context - etl pipeline.Manager - eng engine.Manager - alrt alert.Manager + etl pipeline.Manager + eng engine.Manager + alrt alert.Manager + stats metrics.Metricer *sync.WaitGroup } @@ -70,6 +72,7 @@ func NewManager(ctx context.Context, cfg *Config, etl pipeline.Manager, eng engi etl: etl, eng: eng, alrt: alrt, + stats: metrics.WithContext(ctx), WaitGroup: &sync.WaitGroup{}, } }