Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Update pipeline to gauge, add labels #106

Merged
merged 7 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,9 @@ docker-build:
docker-run:
@echo "$(GREEN) Running docker image...$(COLOR_END)"
@docker run -p 8080:8080 -p 7300:7300 -e config.env $(APP_NAME)

.PHONY: metrics-docs
metrics-docs: build-app
@echo "$(GREEN) Generating metric documentation...$(COLOR_END)"
@./bin/pessimism doc metrics

16 changes: 8 additions & 8 deletions docs/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ To generate documentation for metrics, run `make docs` from the root of the repo
which can be pasted directly below to keep current system metric documentation up to date.

## Current Metrics
| METRIC | DESCRIPTION | LABELS | TYPE |
|-------------------------------------------|--------------------------------------------------------|-----------|---------|
| pessimism_up | 1 if the service is up | | gauge |
| pessimism_invariants_active_invariants | Number of active invariants | | gauge |
| pessimism_etl_active_pipelines | Number of active pipelines | | gauge |
| pessimism_invariants_invariant_runs_total | Number of times a specific invariant has been run | invariant | counter |
| pessimism_alarms_generated_total | Number of total alarms generated for a given invariant | invariant | counter |
| pessimism_node_errors_total | Number of node errors caught | node | counter |
| METRIC | DESCRIPTION | LABELS | TYPE |
|-------------------------------------------|--------------------------------------------------------|----------------------------------------|---------|
| pessimism_up | 1 if the service is up | | gauge |
| pessimism_invariants_active_invariants | Number of active invariants | network,invariant,pipeline | gauge |
| pessimism_etl_active_pipelines | Number of active pipelines | pipeline,network | gauge |
| pessimism_invariants_invariant_runs_total | Number of times a specific invariant has been run | network,invariant | counter |
| pessimism_alerts_generated_total | Number of total alerts generated for a given invariant | network,invariant,pipeline,destination | counter |
| pessimism_node_errors_total | Number of node errors caught | node | counter |
2 changes: 1 addition & 1 deletion internal/api/service/invariant.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func (svc *PessimismService) ProcessInvariantRequest(ir *models.InvRequestBody)
return core.NilSUUID(), nil
}

// runInvariantSession ... Runs an invariant session provided
// RunInvariantSession ... Runs an invariant session provided
func (svc *PessimismService) RunInvariantSession(params *models.InvRequestParams) (core.SUUID, error) {
pConfig, err := svc.m.BuildPipelineCfg(params)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions internal/core/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (uuid PUUID) PipelineType() PipelineType {
return PipelineType(uuid.PID[0])
}

func (uuid PUUID) NetworkType() Network {
return Network(uuid.PID[1])
}

// InvSessionPID ... Invariant session Primary ID
type InvSessionPID [3]byte

Expand Down
4 changes: 2 additions & 2 deletions internal/engine/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func (em *engineManager) Transit() chan core.InvariantInput {

// DeleteInvariantSession ... Deletes an invariant session
func (em *engineManager) DeleteInvariantSession(_ core.SUUID) (core.SUUID, error) {
em.metrics.DecActiveInvariants()
return core.NilSUUID(), nil
}

Expand Down Expand Up @@ -167,7 +166,8 @@ func (em *engineManager) DeployInvariantSession(cfg *invariant.DeployConfig) (co
}
}

em.metrics.IncActiveInvariants()
em.metrics.IncActiveInvariants(cfg.InvType.String(), cfg.Network.String(), cfg.PUUID.PipelineType().String())

return sUUID, nil
}

Expand Down
9 changes: 3 additions & 6 deletions internal/etl/pipeline/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,8 @@ func (em *etlManager) CreateDataPipeline(cfg *core.PipelineConfig) (core.PUUID,

em.store.AddPipeline(pUUID, pipeline)

if len(components) == 1 {
return pUUID, false, nil
}

em.metrics.IncActivePipelines()
// Pipeline successfully created, increment for type and network
em.metrics.IncActivePipelines(pUUID.PipelineType().String(), pUUID.NetworkType().String())

return pUUID, false, nil
}
Expand Down Expand Up @@ -163,7 +160,7 @@ func (em *etlManager) Shutdown() error {
zap.String(core.PUUIDKey, pl.UUID().String()))
return err
}
em.metrics.DecActivePipelines()
em.metrics.DecActivePipelines(pl.UUID().PipelineType().String(), pl.UUID().NetworkType().String())
}
logger.Debug("Waiting for all component routines to end")
em.wg.Wait()
Expand Down
4 changes: 2 additions & 2 deletions internal/metrics/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (d *documentor) Document() []DocumentedMetric {
func fullName(ns, subsystem, name string) string {
out := ns
if subsystem != "" {
out += "." + subsystem
out += "_" + subsystem
}
return out + "." + name
return out + "_" + name
}
44 changes: 19 additions & 25 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ type Config struct {
}

type Metricer interface {
IncActiveInvariants()
DecActiveInvariants()
IncActivePipelines()
DecActivePipelines()
IncActiveInvariants(invType, network, pipelineType string)
IncActivePipelines(pipelineType, network string)
DecActivePipelines(pipelineType, network string)
RecordInvariantRun(invariant invariant.Invariant)
RecordAlertGenerated(alert core.Alert)
RecordNodeError(node string)
Expand All @@ -44,9 +43,9 @@ type Metricer interface {
}

type Metrics struct {
ActiveInvariants prometheus.Gauge
ActivePipelines prometheus.Gauge
Up prometheus.Gauge
ActivePipelines *prometheus.GaugeVec
ActiveInvariants *prometheus.GaugeVec
InvariantRuns *prometheus.CounterVec
AlertsGenerated *prometheus.CounterVec
NodeErrors *prometheus.CounterVec
Expand Down Expand Up @@ -85,19 +84,19 @@ func New(ctx context.Context, cfg *Config) (Metricer, func(), error) {
Name: "up",
Help: "1 if the service is up",
}),
ActiveInvariants: factory.NewGauge(prometheus.GaugeOpts{
ActiveInvariants: factory.NewGaugeVec(prometheus.GaugeOpts{
Name: "active_invariants",
Help: "Number of active invariants",
Namespace: metricsNamespace,
Subsystem: SubsystemInvariants,
}),
}, []string{"invariant", "network", "pipeline"}),

ActivePipelines: factory.NewGauge(prometheus.GaugeOpts{
ActivePipelines: factory.NewGaugeVec(prometheus.GaugeOpts{
Name: "active_pipelines",
Help: "Number of active pipelines",
Namespace: metricsNamespace,
Subsystem: SubsystemEtl,
}),
}, []string{"pipeline", "network"}),

InvariantRuns: factory.NewCounterVec(prometheus.CounterOpts{
Name: "invariant_runs_total",
Expand Down Expand Up @@ -143,23 +142,18 @@ func (m *Metrics) RecordUp() {
}

// IncActiveInvariants ... Increments the number of active invariants
func (m *Metrics) IncActiveInvariants() {
m.ActiveInvariants.Inc()
}

// DecActiveInvariants ... Decrements the number of active invariants
func (m *Metrics) DecActiveInvariants() {
m.ActiveInvariants.Dec()
func (m *Metrics) IncActiveInvariants(invType, network, pipelineType string) {
m.ActiveInvariants.WithLabelValues(invType, network, pipelineType).Inc()
}

// IncActivePipelines ... Increments the number of active pipelines
func (m *Metrics) IncActivePipelines() {
m.ActivePipelines.Inc()
func (m *Metrics) IncActivePipelines(pipelineType, network string) {
m.ActivePipelines.WithLabelValues(pipelineType, network).Inc()
}

// DecActivePipelines ... Decrements the number of active pipelines
func (m *Metrics) DecActivePipelines() {
m.ActivePipelines.Dec()
func (m *Metrics) DecActivePipelines(pipelineType, network string) {
m.ActivePipelines.WithLabelValues(pipelineType, network).Dec()
}

// RecordInvariantRun ... Records that a given invariant has been run
Expand Down Expand Up @@ -197,10 +191,10 @@ type noopMetricer struct{}

var NoopMetrics Metricer = new(noopMetricer)

func (n *noopMetricer) IncActiveInvariants() {}
func (n *noopMetricer) DecActiveInvariants() {}
func (n *noopMetricer) IncActivePipelines() {}
func (n *noopMetricer) DecActivePipelines() {}
func (n *noopMetricer) IncActiveInvariants(_, _, _ string) {}
func (n *noopMetricer) DecActiveInvariants(_, _, _ string) {}
func (n *noopMetricer) IncActivePipelines(_, _ string) {}
func (n *noopMetricer) DecActivePipelines(_, _ string) {}
func (n *noopMetricer) RecordInvariantRun(_ invariant.Invariant) {}
func (n *noopMetricer) RecordAlertGenerated(_ core.Alert) {}
func (n *noopMetricer) RecordNodeError(_ string) {}
Expand Down
9 changes: 6 additions & 3 deletions internal/subsystem/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/base-org/pessimism/internal/engine/invariant"
"github.com/base-org/pessimism/internal/etl/pipeline"
"github.com/base-org/pessimism/internal/logging"
"github.com/base-org/pessimism/internal/metrics"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -53,9 +54,10 @@ type manager struct {
cfg *Config
ctx context.Context

etl pipeline.Manager
eng engine.Manager
alrt alert.Manager
etl pipeline.Manager
eng engine.Manager
alrt alert.Manager
stats metrics.Metricer

*sync.WaitGroup
}
Expand All @@ -70,6 +72,7 @@ func NewManager(ctx context.Context, cfg *Config, etl pipeline.Manager, eng engi
etl: etl,
eng: eng,
alrt: alrt,
stats: metrics.WithContext(ctx),
WaitGroup: &sync.WaitGroup{},
}
}
Expand Down