Skip to content

Commit

Permalink
Merge pull request #1474 from openmeterio/expose-connection-pool-stats
Browse files Browse the repository at this point in the history
feat: expose connection pool stats
  • Loading branch information
turip authored Sep 5, 2024
2 parents 0a6868a + e56a6c0 commit 5ba5524
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 3 deletions.
1 change: 1 addition & 0 deletions cmd/balance-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func main() {
conf.Postgres.URL,
pgdriver.WithTracerProvider(otelTracerProvider),
pgdriver.WithMeterProvider(otelMeterProvider),
pgdriver.WithMetricMeter(metricMeter),
)
if err != nil {
logger.Error("failed to initialize postgres driver", "error", err)
Expand Down
1 change: 1 addition & 0 deletions cmd/notification-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func main() {
conf.Postgres.URL,
pgdriver.WithTracerProvider(otelTracerProvider),
pgdriver.WithMeterProvider(otelMeterProvider),
pgdriver.WithMetricMeter(metricMeter),
)
if err != nil {
logger.Error("failed to initialize postgres driver", "error", err)
Expand Down
1 change: 1 addition & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ func main() {
conf.Postgres.URL,
pgdriver.WithTracerProvider(otelTracerProvider),
pgdriver.WithMeterProvider(otelMeterProvider),
pgdriver.WithMetricMeter(metricMeter),
)
if err != nil {
logger.Error("failed to initialize postgres driver", "error", err)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ require (
go.opentelemetry.io/otel v1.29.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0
go.opentelemetry.io/otel/exporters/prometheus v0.50.0
go.opentelemetry.io/otel/exporters/prometheus v0.51.0
go.opentelemetry.io/otel/metric v1.29.0
go.opentelemetry.io/otel/sdk v1.29.0
go.opentelemetry.io/otel/sdk/metric v1.29.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1372,8 +1372,8 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 h1:R3X6Z
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0/go.mod h1:QWFXnDavXWwMx2EEcZsf3yxgEKAqsxQ+Syjp+seyInw=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.23.1 h1:cfuy3bXmLJS7M1RZmAL6SuhGtKUp2KEsrm00OlAXkq4=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.23.1/go.mod h1:22jr92C6KwlwItJmQzfixzQM3oyyuYLCfHiMY+rpsPU=
go.opentelemetry.io/otel/exporters/prometheus v0.50.0 h1:2Ewsda6hejmbhGFyUvWZjUThC98Cf8Zy6g0zkIimOng=
go.opentelemetry.io/otel/exporters/prometheus v0.50.0/go.mod h1:pMm5PkUo5YwbLiuEf7t2xg4wbP0/eSJrMxIMxKosynY=
go.opentelemetry.io/otel/exporters/prometheus v0.51.0 h1:G7uexXb/K3T+T9fNLCCKncweEtNEBMTO+46hKX5EdKw=
go.opentelemetry.io/otel/exporters/prometheus v0.51.0/go.mod h1:v0mFe5Kk7woIh938mrZBJBmENYquyA0IICrlYm4Y0t4=
go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc=
go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8=
go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo=
Expand Down
15 changes: 15 additions & 0 deletions pkg/framework/pgdriver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
"go.opentelemetry.io/otel/trace"

"github.com/openmeterio/openmeter/pkg/pgxpoolobserver"
)

type Option interface {
Expand All @@ -35,9 +37,16 @@ func WithMeterProvider(p metric.MeterProvider) Option {
})
}

func WithMetricMeter(m metric.Meter) Option {
return optionFunc(func(o *options) {
o.metricMeter = m
})
}

type options struct {
connConfig *pgxpool.Config
otelOptions []otelsql.Option
metricMeter metric.Meter
}

type Driver struct {
Expand Down Expand Up @@ -79,6 +88,12 @@ func NewPostgresDriver(ctx context.Context, url string, opts ...Option) (*Driver
return nil, fmt.Errorf("failed to create postgres pool: %w", err)
}

if o.metricMeter != nil {
if err := pgxpoolobserver.ObservePoolMetrics(o.metricMeter, pool); err != nil {
return nil, err
}
}

db := otelsql.OpenDB(pgxstdlib.GetPoolConnector(pool), o.otelOptions...)

// Set maximum idle connections to 0 as connections are managed from pgx.Pool.
Expand Down
158 changes: 158 additions & 0 deletions pkg/pgxpoolobserver/observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package pgxpoolobserver

import (
"context"

"github.com/jackc/pgx/v5/pgxpool"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// ObservePoolMetrics registers a callback that observes the metrics of the provided pgxpool.Pool.
// the implementation is based on https://github.com/cmackenzie1/pgxpool-prometheus
func ObservePoolMetrics(meter metric.Meter, pool *pgxpool.Pool, additionalAttributes ...attribute.KeyValue) error {
allMetrics := []metric.Observable{}

acquireCountMetric, err := meter.Int64ObservableCounter(
"pgxpool.acquire_count",
metric.WithDescription("The cumulative count of successful acquires from the pool."),
)
if err != nil {
return err
}
allMetrics = append(allMetrics, acquireCountMetric)

acquiredDurationMetric, err := meter.Int64ObservableGauge(
"pgxpool.acquire_duration",
metric.WithDescription("The total duration of all successful acquires from the pool in ms."),
metric.WithUnit("ms"),
)
if err != nil {
return err
}
allMetrics = append(allMetrics, acquiredDurationMetric)

avgAcquiredDurationMetric, err := meter.Int64ObservableGauge(
"pgxpool.acquire_duration_avg",
metric.WithDescription("The average duration of all successful acquires from the pool in ms."),
metric.WithUnit("ms"),
)
if err != nil {
return err
}
allMetrics = append(allMetrics, avgAcquiredDurationMetric)

acquiredConnsMetric, err := meter.Int64ObservableGauge(
"pgxpool.acquired_conns",
metric.WithDescription("The number of currently acquired connections in the pool."),
)
if err != nil {
return err
}
allMetrics = append(allMetrics, acquiredConnsMetric)

canceledAcquireCountMetric, err := meter.Int64ObservableCounter(
"pgxpool.canceled_acquire_count",
metric.WithDescription("The cumulative count of acquires from the pool that were canceled by a context."),
)
if err != nil {
return err
}
allMetrics = append(allMetrics, canceledAcquireCountMetric)

constructingConnsMetric, err := meter.Int64ObservableGauge(
"pgxpool.constructing_conns",
metric.WithDescription("The number of conns with construction in progress in the pool."),
)
if err != nil {
return err
}
allMetrics = append(allMetrics, constructingConnsMetric)

emptyAcquireCountMetric, err := meter.Int64ObservableCounter(
"pgxpool.empty_acquire_count",
metric.WithDescription("The cumulative count of successful acquires from the pool that waited for a resource to be released or constructed because the pool was empty."),
)
if err != nil {
return err
}
allMetrics = append(allMetrics, emptyAcquireCountMetric)

idleConnsMetric, err := meter.Int64ObservableGauge(
"pgxpool.idle_conns",
metric.WithDescription("The number of currently idle conns in the pool."),
)
if err != nil {
return err
}
allMetrics = append(allMetrics, idleConnsMetric)

maxConns, err := meter.Int64ObservableGauge(
"pgxpool.max_conns",
metric.WithDescription("The maximum size of the pool."),
)
if err != nil {
return err
}
allMetrics = append(allMetrics, maxConns)

totalConns, err := meter.Int64ObservableGauge(
"pgxpool.total_conns",
metric.WithDescription("The total number of resources currently in the pool. The value is the sum of ConstructingConns, AcquiredConns, and IdleConns."),
)
if err != nil {
return err
}
allMetrics = append(allMetrics, totalConns)

newConnsCount, err := meter.Int64ObservableCounter(
"pgxpool.new_conns_count",
metric.WithDescription("The cumulative count of new connections opened."),
)
if err != nil {
return err
}
allMetrics = append(allMetrics, newConnsCount)

maxLifetimeDestroyCount, err := meter.Int64ObservableCounter(
"pgxpool.max_lifetime_destroy_count",
metric.WithDescription("The cumulative count of connections closed due to reaching their maximum lifetime (MaxConnLifetime)."),
)
if err != nil {
return err
}
allMetrics = append(allMetrics, maxLifetimeDestroyCount)

maxIdleDestroyCount, err := meter.Int64ObservableCounter(
"pgxpool.max_idle_destroy_count",
metric.WithDescription("The cumulative count of connections closed due to reaching their maximum idle time (MaxConnIdleTime)."),
)
if err != nil {
return err
}
allMetrics = append(allMetrics, maxIdleDestroyCount)

_, err = meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
stat := pool.Stat()
o.ObserveInt64(acquireCountMetric, stat.AcquireCount(), metric.WithAttributes(additionalAttributes...))
o.ObserveInt64(acquiredDurationMetric, stat.AcquireDuration().Milliseconds(), metric.WithAttributes(additionalAttributes...))
o.ObserveInt64(avgAcquiredDurationMetric, stat.AcquireDuration().Milliseconds()/stat.AcquireCount(), metric.WithAttributes(additionalAttributes...))
o.ObserveInt64(acquiredConnsMetric, int64(stat.AcquiredConns()), metric.WithAttributes(additionalAttributes...))
o.ObserveInt64(canceledAcquireCountMetric, stat.CanceledAcquireCount(), metric.WithAttributes(additionalAttributes...))
o.ObserveInt64(constructingConnsMetric, int64(stat.ConstructingConns()), metric.WithAttributes(additionalAttributes...))
o.ObserveInt64(emptyAcquireCountMetric, stat.EmptyAcquireCount(), metric.WithAttributes(additionalAttributes...))
o.ObserveInt64(idleConnsMetric, int64(stat.IdleConns()), metric.WithAttributes(additionalAttributes...))
o.ObserveInt64(maxConns, int64(stat.MaxConns()), metric.WithAttributes(additionalAttributes...))
o.ObserveInt64(totalConns, int64(stat.TotalConns()), metric.WithAttributes(additionalAttributes...))
o.ObserveInt64(newConnsCount, stat.NewConnsCount(), metric.WithAttributes(additionalAttributes...))
o.ObserveInt64(maxLifetimeDestroyCount, stat.MaxLifetimeDestroyCount(), metric.WithAttributes(additionalAttributes...))
o.ObserveInt64(maxIdleDestroyCount, stat.MaxIdleDestroyCount(), metric.WithAttributes(additionalAttributes...))

return nil
}, allMetrics...)
if err != nil {
return err
}

return nil
}

0 comments on commit 5ba5524

Please sign in to comment.