Skip to content

Commit

Permalink
Merge pull request #1054 from GeorgeTsagk/prom-loadtest-metrics
Browse files Browse the repository at this point in the history
Add more prometheus metrics
  • Loading branch information
Roasbeef committed Aug 29, 2024
2 parents 30e7166 + 5a0a75c commit 0551a3f
Show file tree
Hide file tree
Showing 13 changed files with 371 additions and 8 deletions.
2 changes: 1 addition & 1 deletion dev.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ COPY . /app

ENV CGO_ENABLED=0

RUN make install
RUN make release-install TAGS=monitoring

# FINAL IMAGE
FROM alpine as final
Expand Down
120 changes: 120 additions & 0 deletions monitoring/db_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package monitoring

import (
"context"
"errors"
"sync"

"github.com/prometheus/client_golang/prometheus"
)

const (
dbSizeMetric = "total_db_size"

assetProofSizesHistogram = "asset_proofs_sizes"
)

// dbCollector is a Prometheus collector that exports metrics related to the
// daemon's database.
type dbCollector struct {
collectMx sync.Mutex

cfg *PrometheusConfig
registry *prometheus.Registry

dbSize prometheus.Gauge
proofSizesHistogram prometheus.Histogram
}

func newDbCollector(cfg *PrometheusConfig,
registry *prometheus.Registry) (*dbCollector, error) {

if cfg == nil {
return nil, errors.New("db collector prometheus cfg is nil")
}

if cfg.AssetStore == nil {
return nil, errors.New("db collector asset store is nil")
}

return &dbCollector{
cfg: cfg,
registry: registry,
dbSize: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: dbSizeMetric,
Help: "Total size of db",
},
),
proofSizesHistogram: newProofSizesHistogram(),
}, nil
}

// newProofSizesHistogram generates a fresh instance of the proof sizes
// histogram.
func newProofSizesHistogram() (h prometheus.Histogram) {
return prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: assetProofSizesHistogram,
Help: "Histogram of asset proof sizes",
Buckets: prometheus.ExponentialBuckets(
2, 2, 32,
),
},
)
}

// Describe sends the super-set of all possible descriptors of metrics
// collected by this Collector to the provided channel and returns once the
// last descriptor has been sent.
//
// NOTE: Part of the prometheus.Collector interface.
func (a *dbCollector) Describe(ch chan<- *prometheus.Desc) {
a.collectMx.Lock()
defer a.collectMx.Unlock()

a.dbSize.Describe(ch)
a.proofSizesHistogram.Describe(ch)
}

// Collect is called by the Prometheus registry when collecting metrics.
//
// NOTE: Part of the prometheus.Collector interface.
func (a *dbCollector) Collect(ch chan<- prometheus.Metric) {
a.collectMx.Lock()
defer a.collectMx.Unlock()

ctxdb, cancel := context.WithTimeout(context.Background(), dbTimeout)
defer cancel()

// Fetch the db size.
dbSize, err := a.cfg.AssetStore.AssetsDBSize(ctxdb)
if err != nil {
log.Errorf("unable to fetch db size: %v", err)
return
}

a.dbSize.Set(float64(dbSize))

// Fetch all proof sizes.
proofSizes, err := a.cfg.AssetStore.FetchAssetProofsSizes(ctxdb)
if err != nil {
log.Errorf("unable to fetch asset proofs: %v", err)
return
}

// We use the histogram in a non-standard way. Everytime we collect data
// we ask the database to return all proof sizes and then we feed them
// to the histogram. That's why on every different pass we need to reset
// the histogram instance, in order to not duplicate data on every
// prometheus pass.
a.proofSizesHistogram = newProofSizesHistogram()

// We'll feed the proof sizes to the histogram.
for _, p := range proofSizes {
a.proofSizesHistogram.Observe(p.ProofFileLength)
}

a.proofSizesHistogram.Collect(ch)
a.dbSize.Collect(ch)
}
6 changes: 6 additions & 0 deletions monitoring/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ func (p *PrometheusExporter) Start() error {
}
p.registry.MustRegister(gardenCollector)

dbCollector, err := newDbCollector(p.config, p.registry)
if err != nil {
return err
}
p.registry.MustRegister(dbCollector)

// Make ensure that all metrics exist when collecting and querying.
serverMetrics.InitializeMetrics(p.config.RPCServer)

Expand Down
19 changes: 16 additions & 3 deletions tapcfg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,25 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
lndServices *lndclient.LndServices, enableChannelFeatures bool,
mainErrChan chan<- error) (*tap.Config, error) {

var err error
var (
err error
db databaseBackend
dbType sqlc.BackendType
)

// Now that we know where the database will live, we'll go ahead and
// open up the default implementation of it.
var db databaseBackend
switch cfg.DatabaseBackend {
case DatabaseBackendSqlite:
dbType = sqlc.BackendTypeSqlite

cfgLogger.Infof("Opening sqlite3 database at: %v",
cfg.Sqlite.DatabaseFileName)
db, err = tapdb.NewSqliteStore(cfg.Sqlite)

case DatabaseBackendPostgres:
dbType = sqlc.BackendTypePostgres

cfgLogger.Infof("Opening postgres database at: %v",
cfg.Postgres.DSN(true))
db, err = tapdb.NewPostgresStore(cfg.Postgres)
Expand Down Expand Up @@ -85,6 +92,12 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
},
)

metaDB := tapdb.NewTransactionExecutor(
db, func(tx *sql.Tx) tapdb.MetaStore {
return db.WithTx(tx)
},
)

addrBookDB := tapdb.NewTransactionExecutor(
db, func(tx *sql.Tx) tapdb.AddrBook {
return db.WithTx(tx)
Expand All @@ -94,7 +107,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
tapdbAddrBook := tapdb.NewTapAddressBook(
addrBookDB, &tapChainParams, defaultClock,
)
assetStore := tapdb.NewAssetStore(assetDB, defaultClock)
assetStore := tapdb.NewAssetStore(assetDB, metaDB, defaultClock, dbType)

keyRing := tap.NewLndRpcKeyRing(lndServices)
walletAnchor := tap.NewLndRpcWalletAnchor(lndServices)
Expand Down
8 changes: 7 additions & 1 deletion tapdb/asset_minting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,18 @@ func newAssetStoreFromDB(db *BaseDB) (*AssetMintingStore, *AssetStore) {
return db.WithTx(tx)
}

metaTxCreator := func(tx *sql.Tx) MetaStore {
return db.WithTx(tx)
}

assetMintingDB := NewTransactionExecutor(db, txCreator)
assetsDB := NewTransactionExecutor(db, activeTxCreator)
metaDB := NewTransactionExecutor(db, metaTxCreator)

testClock := clock.NewTestClock(time.Now())

return NewAssetMintingStore(assetMintingDB),
NewAssetStore(assetsDB, testClock)
NewAssetStore(assetsDB, metaDB, testClock, db.Backend())
}

func assertBatchState(t *testing.T, batch *tapgarden.MintingBatch,
Expand Down
110 changes: 109 additions & 1 deletion tapdb/assets_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ type (
// script key.
AssetProof = sqlc.FetchAssetProofsRow

// AssetProofSize is the asset proof size for a given asset, identified
// by its script key.
AssetProofSize = sqlc.FetchAssetProofsSizesRow

// AssetProofI is identical to AssetProof but is used for the case
// where the proofs for a specific asset are fetched.
AssetProofI = sqlc.FetchAssetProofRow
Expand Down Expand Up @@ -195,6 +199,10 @@ type ActiveAssetsStore interface {
// disk.
FetchAssetProofs(ctx context.Context) ([]AssetProof, error)

// FetchAssetsProofsSizes fetches all the asset proofs lengths that are
// stored on disk.
FetchAssetProofsSizes(ctx context.Context) ([]AssetProofSize, error)

// FetchAssetProof fetches the asset proof for a given asset identified
// by its script key.
FetchAssetProof(ctx context.Context,
Expand Down Expand Up @@ -339,6 +347,18 @@ type ActiveAssetsStore interface {
assetID []byte) (sqlc.FetchAssetMetaForAssetRow, error)
}

// MetaStore is a sub-set of the main sqlc.Querier interface that contains
// methods related to metadata of the daemon.
type MetaStore interface {
// AssetsDBSize returns the total size of the taproot assets sqlite
// database.
AssetsDBSizeSqlite(ctx context.Context) (int32, error)

// AssetsDBSize returns the total size of the taproot assets postgres
// database.
AssetsDBSizePostgres(ctx context.Context) (int64, error)
}

// AssetBalance holds a balance query result for a particular asset or all
// assets tracked by this daemon.
type AssetBalance struct {
Expand Down Expand Up @@ -378,29 +398,46 @@ type BatchedAssetStore interface {
BatchedTx[ActiveAssetsStore]
}

// BatchedMetaStore combines the MetaStore interface with the BatchedTx
// interface, allowing for multiple queries to be executed in a single SQL
// transaction.
type BatchedMetaStore interface {
MetaStore

BatchedTx[MetaStore]
}

// AssetStore is used to query for the set of pending and confirmed assets.
type AssetStore struct {
db BatchedAssetStore

metaDb BatchedMetaStore

// eventDistributor is an event distributor that will be used to notify
// subscribers about new proofs that are added to the archiver.
eventDistributor *fn.EventDistributor[proof.Blob]

clock clock.Clock

txHeights *lru.Cache[chainhash.Hash, cacheableBlockHeight]

dbType sqlc.BackendType
}

// NewAssetStore creates a new AssetStore from the specified BatchedAssetStore
// interface.
func NewAssetStore(db BatchedAssetStore, clock clock.Clock) *AssetStore {
func NewAssetStore(db BatchedAssetStore, metaDB BatchedMetaStore,
clock clock.Clock, dbType sqlc.BackendType) *AssetStore {

return &AssetStore{
db: db,
metaDb: metaDB,
eventDistributor: fn.NewEventDistributor[proof.Blob](),
clock: clock,
txHeights: lru.NewCache[chainhash.Hash, cacheableBlockHeight](
10_000,
),
dbType: dbType,
}
}

Expand Down Expand Up @@ -1171,6 +1208,38 @@ func (a *AssetStore) FetchManagedUTXOs(ctx context.Context) (
return managedUtxos, nil
}

// FetchAssetProofsSizes fetches the sizes of the proofs in the db.
func (a *AssetStore) FetchAssetProofsSizes(
ctx context.Context) ([]AssetProofSize, error) {

var pSizes []AssetProofSize

readOpts := NewAssetStoreReadTx()
dbErr := a.db.ExecTx(ctx, &readOpts, func(q ActiveAssetsStore) error {
proofSizes, err := q.FetchAssetProofsSizes(ctx)
if err != nil {
return err
}

for _, v := range proofSizes {
pSizes = append(
pSizes, AssetProofSize{
ScriptKey: v.ScriptKey,
ProofFileLength: v.ProofFileLength,
},
)
}

return nil
})

if dbErr != nil {
return nil, dbErr
}

return pSizes, nil
}

// FetchAssetProofs returns the latest proof file for either the set of target
// assets, or all assets if no script keys for an asset are passed in.
//
Expand Down Expand Up @@ -3280,6 +3349,45 @@ func (a *AssetStore) FetchAssetMetaForAsset(ctx context.Context,
return assetMeta, nil
}

// AssetsDBSize returns the total size of the taproot assets database.
func (a *AssetStore) AssetsDBSize(ctx context.Context) (int64, error) {
var totalSize int64

readOpts := NewAssetStoreReadTx()
dbErr := a.metaDb.ExecTx(ctx, &readOpts, func(q MetaStore) error {
var (
size int64
err error
)
switch a.dbType {
case sqlc.BackendTypePostgres:
size, err = q.AssetsDBSizePostgres(ctx)

case sqlc.BackendTypeSqlite:
var res int32
res, err = q.AssetsDBSizeSqlite(ctx)
size = int64(res)

default:
return fmt.Errorf("unsupported db backend type")
}

if err != nil {
return err
}

totalSize = size

return nil
})

if dbErr != nil {
return 0, dbErr
}

return totalSize, nil
}

// FetchAssetMetaByHash attempts to fetch an asset meta based on an asset hash.
func (a *AssetStore) FetchAssetMetaByHash(ctx context.Context,
metaHash [asset.MetaHashLen]byte) (*proof.MetaReveal, error) {
Expand Down
Loading

0 comments on commit 0551a3f

Please sign in to comment.