Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Commit

Permalink
Update pipeline to gauge, add labels (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
adrain-cb authored Jul 11, 2023
1 parent abe0895 commit 961cde0
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 47 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,9 @@ docker-build:
docker-run:
@echo "$(GREEN) Running docker image...$(COLOR_END)"
@docker run -p 8080:8080 -p 7300:7300 -e config.env $(APP_NAME)

.PHONY: metrics-docs
metrics-docs: build-app
@echo "$(GREEN) Generating metric documentation...$(COLOR_END)"
@./bin/pessimism doc metrics

16 changes: 8 additions & 8 deletions docs/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ To generate documentation for metrics, run `make docs` from the root of the repo
which can be pasted directly below to keep current system metric documentation up to date.

## Current Metrics
| METRIC | DESCRIPTION | LABELS | TYPE |
|-------------------------------------------|--------------------------------------------------------|-----------|---------|
| pessimism_up | 1 if the service is up | | gauge |
| pessimism_invariants_active_invariants | Number of active invariants | | gauge |
| pessimism_etl_active_pipelines | Number of active pipelines | | gauge |
| pessimism_invariants_invariant_runs_total | Number of times a specific invariant has been run | invariant | counter |
| pessimism_alarms_generated_total | Number of total alarms generated for a given invariant | invariant | counter |
| pessimism_node_errors_total | Number of node errors caught | node | counter |
| METRIC | DESCRIPTION | LABELS | TYPE |
|-------------------------------------------|--------------------------------------------------------|----------------------------------------|---------|
| pessimism_up | 1 if the service is up | | gauge |
| pessimism_invariants_active_invariants | Number of active invariants | network,invariant,pipeline | gauge |
| pessimism_etl_active_pipelines | Number of active pipelines | pipeline,network | gauge |
| pessimism_invariants_invariant_runs_total | Number of times a specific invariant has been run | network,invariant | counter |
| pessimism_alerts_generated_total | Number of total alerts generated for a given invariant | network,invariant,pipeline,destination | counter |
| pessimism_node_errors_total | Number of node errors caught | node | counter |
2 changes: 1 addition & 1 deletion internal/api/service/invariant.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func (svc *PessimismService) ProcessInvariantRequest(ir *models.InvRequestBody)
return core.NilSUUID(), nil
}

// runInvariantSession ... Runs an invariant session provided
// RunInvariantSession ... Runs an invariant session provided
func (svc *PessimismService) RunInvariantSession(params *models.InvRequestParams) (core.SUUID, error) {
pConfig, err := svc.m.BuildPipelineCfg(params)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions internal/core/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (uuid PUUID) PipelineType() PipelineType {
return PipelineType(uuid.PID[0])
}

func (uuid PUUID) NetworkType() Network {
return Network(uuid.PID[1])
}

// InvSessionPID ... Invariant session Primary ID
type InvSessionPID [3]byte

Expand Down
4 changes: 2 additions & 2 deletions internal/engine/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func (em *engineManager) Transit() chan core.InvariantInput {

// DeleteInvariantSession ... Deletes an invariant session
func (em *engineManager) DeleteInvariantSession(_ core.SUUID) (core.SUUID, error) {
em.metrics.DecActiveInvariants()
return core.NilSUUID(), nil
}

Expand Down Expand Up @@ -167,7 +166,8 @@ func (em *engineManager) DeployInvariantSession(cfg *invariant.DeployConfig) (co
}
}

em.metrics.IncActiveInvariants()
em.metrics.IncActiveInvariants(cfg.InvType.String(), cfg.Network.String(), cfg.PUUID.PipelineType().String())

return sUUID, nil
}

Expand Down
9 changes: 3 additions & 6 deletions internal/etl/pipeline/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,8 @@ func (em *etlManager) CreateDataPipeline(cfg *core.PipelineConfig) (core.PUUID,

em.store.AddPipeline(pUUID, pipeline)

if len(components) == 1 {
return pUUID, false, nil
}

em.metrics.IncActivePipelines()
// Pipeline successfully created, increment for type and network
em.metrics.IncActivePipelines(pUUID.PipelineType().String(), pUUID.NetworkType().String())

return pUUID, false, nil
}
Expand Down Expand Up @@ -163,7 +160,7 @@ func (em *etlManager) Shutdown() error {
zap.String(core.PUUIDKey, pl.UUID().String()))
return err
}
em.metrics.DecActivePipelines()
em.metrics.DecActivePipelines(pl.UUID().PipelineType().String(), pl.UUID().NetworkType().String())
}
logger.Debug("Waiting for all component routines to end")
em.wg.Wait()
Expand Down
4 changes: 2 additions & 2 deletions internal/metrics/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (d *documentor) Document() []DocumentedMetric {
func fullName(ns, subsystem, name string) string {
out := ns
if subsystem != "" {
out += "." + subsystem
out += "_" + subsystem
}
return out + "." + name
return out + "_" + name
}
44 changes: 19 additions & 25 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ type Config struct {
}

type Metricer interface {
IncActiveInvariants()
DecActiveInvariants()
IncActivePipelines()
DecActivePipelines()
IncActiveInvariants(invType, network, pipelineType string)
IncActivePipelines(pipelineType, network string)
DecActivePipelines(pipelineType, network string)
RecordInvariantRun(invariant invariant.Invariant)
RecordAlertGenerated(alert core.Alert)
RecordNodeError(node string)
Expand All @@ -44,9 +43,9 @@ type Metricer interface {
}

type Metrics struct {
ActiveInvariants prometheus.Gauge
ActivePipelines prometheus.Gauge
Up prometheus.Gauge
ActivePipelines *prometheus.GaugeVec
ActiveInvariants *prometheus.GaugeVec
InvariantRuns *prometheus.CounterVec
AlertsGenerated *prometheus.CounterVec
NodeErrors *prometheus.CounterVec
Expand Down Expand Up @@ -85,19 +84,19 @@ func New(ctx context.Context, cfg *Config) (Metricer, func(), error) {
Name: "up",
Help: "1 if the service is up",
}),
ActiveInvariants: factory.NewGauge(prometheus.GaugeOpts{
ActiveInvariants: factory.NewGaugeVec(prometheus.GaugeOpts{
Name: "active_invariants",
Help: "Number of active invariants",
Namespace: metricsNamespace,
Subsystem: SubsystemInvariants,
}),
}, []string{"invariant", "network", "pipeline"}),

ActivePipelines: factory.NewGauge(prometheus.GaugeOpts{
ActivePipelines: factory.NewGaugeVec(prometheus.GaugeOpts{
Name: "active_pipelines",
Help: "Number of active pipelines",
Namespace: metricsNamespace,
Subsystem: SubsystemEtl,
}),
}, []string{"pipeline", "network"}),

InvariantRuns: factory.NewCounterVec(prometheus.CounterOpts{
Name: "invariant_runs_total",
Expand Down Expand Up @@ -143,23 +142,18 @@ func (m *Metrics) RecordUp() {
}

// IncActiveInvariants ... Increments the number of active invariants
func (m *Metrics) IncActiveInvariants() {
m.ActiveInvariants.Inc()
}

// DecActiveInvariants ... Decrements the number of active invariants
func (m *Metrics) DecActiveInvariants() {
m.ActiveInvariants.Dec()
func (m *Metrics) IncActiveInvariants(invType, network, pipelineType string) {
m.ActiveInvariants.WithLabelValues(invType, network, pipelineType).Inc()
}

// IncActivePipelines ... Increments the number of active pipelines
func (m *Metrics) IncActivePipelines() {
m.ActivePipelines.Inc()
func (m *Metrics) IncActivePipelines(pipelineType, network string) {
m.ActivePipelines.WithLabelValues(pipelineType, network).Inc()
}

// DecActivePipelines ... Decrements the number of active pipelines
func (m *Metrics) DecActivePipelines() {
m.ActivePipelines.Dec()
func (m *Metrics) DecActivePipelines(pipelineType, network string) {
m.ActivePipelines.WithLabelValues(pipelineType, network).Dec()
}

// RecordInvariantRun ... Records that a given invariant has been run
Expand Down Expand Up @@ -197,10 +191,10 @@ type noopMetricer struct{}

var NoopMetrics Metricer = new(noopMetricer)

func (n *noopMetricer) IncActiveInvariants() {}
func (n *noopMetricer) DecActiveInvariants() {}
func (n *noopMetricer) IncActivePipelines() {}
func (n *noopMetricer) DecActivePipelines() {}
func (n *noopMetricer) IncActiveInvariants(_, _, _ string) {}
func (n *noopMetricer) DecActiveInvariants(_, _, _ string) {}
func (n *noopMetricer) IncActivePipelines(_, _ string) {}
func (n *noopMetricer) DecActivePipelines(_, _ string) {}
func (n *noopMetricer) RecordInvariantRun(_ invariant.Invariant) {}
func (n *noopMetricer) RecordAlertGenerated(_ core.Alert) {}
func (n *noopMetricer) RecordNodeError(_ string) {}
Expand Down
9 changes: 6 additions & 3 deletions internal/subsystem/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/base-org/pessimism/internal/engine/invariant"
"github.com/base-org/pessimism/internal/etl/pipeline"
"github.com/base-org/pessimism/internal/logging"
"github.com/base-org/pessimism/internal/metrics"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -53,9 +54,10 @@ type manager struct {
cfg *Config
ctx context.Context

etl pipeline.Manager
eng engine.Manager
alrt alert.Manager
etl pipeline.Manager
eng engine.Manager
alrt alert.Manager
stats metrics.Metricer

*sync.WaitGroup
}
Expand All @@ -70,6 +72,7 @@ func NewManager(ctx context.Context, cfg *Config, etl pipeline.Manager, eng engi
etl: etl,
eng: eng,
alrt: alrt,
stats: metrics.WithContext(ctx),
WaitGroup: &sync.WaitGroup{},
}
}
Expand Down

0 comments on commit 961cde0

Please sign in to comment.