Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SSV connections metric implementation #6

Merged
merged 3 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions configs/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ type ExecutionMetrics struct {
Peers Metric `mapstructure:"peers"`
}

type SsvMetrics struct {
Peers Metric `mapstructure:"peers"`
type SSVMetrics struct {
Peers Metric `mapstructure:"peers"`
Connections Metric `mapstructure:"connections"`
}

type InfrastructureMetrics struct {
Expand All @@ -43,7 +44,7 @@ type Execution struct {

type SSV struct {
Address string `mapstructure:"address"`
Metrics SsvMetrics `mapstructure:"metrics"`
Metrics SSVMetrics `mapstructure:"metrics"`
}

type Infrastructure struct {
Expand Down Expand Up @@ -75,7 +76,7 @@ func (b Benchmark) Validate() (bool, error) {
}
}

if b.SSV.Metrics.Peers.Enabled {
if b.SSV.Metrics.Peers.Enabled || b.SSV.Metrics.Connections.Enabled {
if err := validateURL(b.SSV.Address); err != nil {
return false, errors.Join(err, errors.New("SSV client address was not a valid URL"))
}
Expand Down
12 changes: 7 additions & 5 deletions configs/config.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
benchmark:
duration: 15m
network: "mainnet"
network: mainnet
consensus:
address: ""
address:
metrics:
client:
enabled: true
Expand All @@ -14,16 +14,18 @@ benchmark:
enabled: true

execution:
address: ""
address:
metrics:
peers:
enabled: true

ssv:
address: ""
address:
metrics:
peers:
enabled: true
connections:
enabled: true

infrastructure:
metrics:
Expand All @@ -33,4 +35,4 @@ benchmark:
enabled: true

analyzer:
log-file-path: ""
log-file-path:
11 changes: 8 additions & 3 deletions internal/benchmark/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ const (
executionAddrFlag = "execution-addr"
executionMetricPeersFlag = "execution-metric-peers-enabled"

ssvAddrFlag = "ssv-addr"
ssvMetricPeersFlag = "ssv-metric-peers-enabled"
ssvAddrFlag = "ssv-addr"
ssvMetricPeersFlag = "ssv-metric-peers-enabled"
ssvMetricConnectionsFlag = "ssv-metric-connections-enabled"

infraMetricCPUFlag = "infra-metric-cpu-enabled"
infraMetricMemoryFlag = "infra-metric-memory-enabled"

networkFlag = "network"
defaultExecutionDuration = time.Second * 60 * 5
defaultExecutionDuration = time.Minute * 15
)

func init() {
Expand Down Expand Up @@ -78,6 +79,7 @@ func addFlags(cobraCMD *cobra.Command) {

cobraCMD.Flags().String(ssvAddrFlag, "", "SSV API address with scheme (HTTP/HTTPS) and port, e.g. http://ssv-node:16000")
cobraCMD.Flags().Bool(ssvMetricPeersFlag, true, "Enable SSV peers metric")
cobraCMD.Flags().Bool(ssvMetricConnectionsFlag, true, "Enable SSV connections metric")

cobraCMD.Flags().Bool(infraMetricCPUFlag, true, "Enable infrastructure CPU metric")
cobraCMD.Flags().Bool(infraMetricMemoryFlag, true, "Enable infrastructure memory metric")
Expand Down Expand Up @@ -119,6 +121,9 @@ func bindFlags(cmd *cobra.Command) error {
if err := viper.BindPFlag("benchmark.ssv.metrics.peers.enabled", cmd.Flags().Lookup(ssvMetricPeersFlag)); err != nil {
return err
}
if err := viper.BindPFlag("benchmark.ssv.metrics.connections.enabled", cmd.Flags().Lookup(ssvMetricConnectionsFlag)); err != nil {
return err
}
if err := viper.BindPFlag("benchmark.infrastructure.metrics.cpu.enabled", cmd.Flags().Lookup(infraMetricCPUFlag)); err != nil {
return err
}
Expand Down
11 changes: 11 additions & 0 deletions internal/benchmark/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ func LoadEnabledMetrics(config configs.Config) map[metric.Group][]metricService
}))
}

if config.Benchmark.SSV.Metrics.Connections.Enabled {
enabledMetrics[metric.SSVGroup] = append(enabledMetrics[metric.SSVGroup], ssv.NewConnectionsMetric(
configs.Values.Benchmark.SSV.Address,
"Connections",
time.Second*10,
[]metric.HealthCondition[uint32]{
{Name: ssv.InboundConnectionsMeasurement, Threshold: 0, Operator: metric.OperatorEqual, Severity: metric.SeverityHigh},
{Name: ssv.OutboundConnectionsMeasurement, Threshold: 0, Operator: metric.OperatorEqual, Severity: metric.SeverityHigh},
}))
}

if config.Benchmark.Infrastructure.Metrics.CPU.Enabled {
enabledMetrics[metric.InfrastructureGroup] = append(enabledMetrics[metric.InfrastructureGroup],
infrastructure.NewCPUMetric("CPU", time.Second*5, []metric.HealthCondition[float64]{}),
Expand Down
127 changes: 127 additions & 0 deletions internal/benchmark/metrics/ssv/connections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package ssv

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"time"

"github.com/ssvlabsinfra/ssv-pulse/internal/platform/logger"
"github.com/ssvlabsinfra/ssv-pulse/internal/platform/metric"
)

const (
InboundConnectionsMeasurement = "InboundConnections"
OutboundConnectionsMeasurement = "OutboundConnections"
)

type ConnectionsMetric struct {
metric.Base[uint32]
url string
interval time.Duration
}

func NewConnectionsMetric(url, name string, interval time.Duration, healthCondition []metric.HealthCondition[uint32]) *ConnectionsMetric {
return &ConnectionsMetric{
url: url,
Base: metric.Base[uint32]{
HealthConditions: healthCondition,
Name: name,
},
interval: interval,
}
}

func (p *ConnectionsMetric) Measure(ctx context.Context) {
ticker := time.NewTicker(p.interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
slog.With("metric_name", p.Name).Debug("metric was stopped")
return
case <-ticker.C:
p.measure(ctx)
}
}
}

func (p *ConnectionsMetric) measure(ctx context.Context) {
var (
resp struct {
Advanced struct {
Inbound uint32 `json:"inbound_conns"`
Outbound uint32 `json:"outbound_conns"`
} `json:"advanced"`
}
)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/v1/node/health", p.url), nil)
if err != nil {
logger.WriteError(metric.SSVGroup, p.Name, err)
return
}
res, err := http.DefaultClient.Do(req)
if err != nil {
p.AddDataPoint(map[string]uint32{
InboundConnectionsMeasurement: 0,
OutboundConnectionsMeasurement: 0,
})
logger.WriteError(metric.SSVGroup, p.Name, err)
return
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
p.AddDataPoint(map[string]uint32{
InboundConnectionsMeasurement: 0,
OutboundConnectionsMeasurement: 0,
})

var errorResponse any
_ = json.NewDecoder(res.Body).Decode(&errorResponse)
jsonErrResponse, _ := json.Marshal(errorResponse)
logger.WriteError(
metric.SSVGroup,
p.Name,
fmt.Errorf("received unsuccessful status code. Code: '%s'. Response: '%s'", res.Status, jsonErrResponse))
return
}

if err = json.NewDecoder(res.Body).Decode(&resp); err != nil {
p.AddDataPoint(map[string]uint32{
InboundConnectionsMeasurement: 0,
OutboundConnectionsMeasurement: 0,
})
logger.WriteError(metric.SSVGroup, p.Name, err)
return
}

p.AddDataPoint(map[string]uint32{
InboundConnectionsMeasurement: resp.Advanced.Inbound,
OutboundConnectionsMeasurement: resp.Advanced.Outbound,
})

logger.WriteMetric(metric.SSVGroup, p.Name, map[string]any{
InboundConnectionsMeasurement: resp.Advanced.Inbound,
OutboundConnectionsMeasurement: resp.Advanced.Outbound})
}

func (p *ConnectionsMetric) AggregateResults() string {
var measurements map[string][]uint32 = make(map[string][]uint32)

for _, point := range p.DataPoints {
measurements[InboundConnectionsMeasurement] = append(measurements[InboundConnectionsMeasurement], point.Values[InboundConnectionsMeasurement])
measurements[OutboundConnectionsMeasurement] = append(measurements[OutboundConnectionsMeasurement], point.Values[OutboundConnectionsMeasurement])
}

return fmt.Sprintf("inbound_min=%d, inbound_P50=%d, outbound_min=%d, outbound_P50=%d",
metric.CalculatePercentile(measurements[InboundConnectionsMeasurement], 0),
metric.CalculatePercentile(measurements[InboundConnectionsMeasurement], 50),
metric.CalculatePercentile(measurements[OutboundConnectionsMeasurement], 0),
metric.CalculatePercentile(measurements[OutboundConnectionsMeasurement], 50))
}