From 050dceb7ad21eac93ace253f294bc43fcdbda466 Mon Sep 17 00:00:00 2001 From: George Tsagkarelis Date: Thu, 1 Aug 2024 15:40:18 +0200 Subject: [PATCH 1/3] tapdb: add meta store and metrics queries --- tapcfg/server.go | 19 +++++- tapdb/asset_minting_test.go | 8 ++- tapdb/assets_store.go | 110 +++++++++++++++++++++++++++++++- tapdb/sqlc/assets.sql.go | 38 +++++++++++ tapdb/sqlc/metadata.sql.go | 33 ++++++++++ tapdb/sqlc/querier.go | 3 + tapdb/sqlc/queries/assets.sql | 9 +++ tapdb/sqlc/queries/metadata.sql | 6 ++ tapdb/sqlutils_test.go | 12 +++- tapgarden/custodian_test.go | 13 +++- 10 files changed, 244 insertions(+), 7 deletions(-) create mode 100644 tapdb/sqlc/metadata.sql.go create mode 100644 tapdb/sqlc/queries/metadata.sql diff --git a/tapcfg/server.go b/tapcfg/server.go index 302a59825..7a957cbf2 100644 --- a/tapcfg/server.go +++ b/tapcfg/server.go @@ -42,18 +42,25 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, lndServices *lndclient.LndServices, enableChannelFeatures bool, mainErrChan chan<- error) (*tap.Config, error) { - var err error + var ( + err error + db databaseBackend + dbType sqlc.BackendType + ) // Now that we know where the database will live, we'll go ahead and // open up the default implementation of it. 
- var db databaseBackend switch cfg.DatabaseBackend { case DatabaseBackendSqlite: + dbType = sqlc.BackendTypeSqlite + cfgLogger.Infof("Opening sqlite3 database at: %v", cfg.Sqlite.DatabaseFileName) db, err = tapdb.NewSqliteStore(cfg.Sqlite) case DatabaseBackendPostgres: + dbType = sqlc.BackendTypePostgres + cfgLogger.Infof("Opening postgres database at: %v", cfg.Postgres.DSN(true)) db, err = tapdb.NewPostgresStore(cfg.Postgres) @@ -85,6 +92,12 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, }, ) + metaDB := tapdb.NewTransactionExecutor( + db, func(tx *sql.Tx) tapdb.MetaStore { + return db.WithTx(tx) + }, + ) + addrBookDB := tapdb.NewTransactionExecutor( db, func(tx *sql.Tx) tapdb.AddrBook { return db.WithTx(tx) @@ -94,7 +107,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, tapdbAddrBook := tapdb.NewTapAddressBook( addrBookDB, &tapChainParams, defaultClock, ) - assetStore := tapdb.NewAssetStore(assetDB, defaultClock) + assetStore := tapdb.NewAssetStore(assetDB, metaDB, defaultClock, dbType) keyRing := tap.NewLndRpcKeyRing(lndServices) walletAnchor := tap.NewLndRpcWalletAnchor(lndServices) diff --git a/tapdb/asset_minting_test.go b/tapdb/asset_minting_test.go index dd2781bb8..2cb73abad 100644 --- a/tapdb/asset_minting_test.go +++ b/tapdb/asset_minting_test.go @@ -54,12 +54,18 @@ func newAssetStoreFromDB(db *BaseDB) (*AssetMintingStore, *AssetStore) { return db.WithTx(tx) } + metaTxCreator := func(tx *sql.Tx) MetaStore { + return db.WithTx(tx) + } + assetMintingDB := NewTransactionExecutor(db, txCreator) assetsDB := NewTransactionExecutor(db, activeTxCreator) + metaDB := NewTransactionExecutor(db, metaTxCreator) + testClock := clock.NewTestClock(time.Now()) return NewAssetMintingStore(assetMintingDB), - NewAssetStore(assetsDB, testClock) + NewAssetStore(assetsDB, metaDB, testClock, db.Backend()) } func assertBatchState(t *testing.T, batch *tapgarden.MintingBatch, diff --git a/tapdb/assets_store.go b/tapdb/assets_store.go index 
8f962aeec..c5161c26e 100644 --- a/tapdb/assets_store.go +++ b/tapdb/assets_store.go @@ -43,6 +43,10 @@ type ( // script key. AssetProof = sqlc.FetchAssetProofsRow + // AssetProofSize is the asset proof size for a given asset, identified + // by its script key. + AssetProofSize = sqlc.FetchAssetProofsSizesRow + // AssetProofI is identical to AssetProof but is used for the case // where the proofs for a specific asset are fetched. AssetProofI = sqlc.FetchAssetProofRow @@ -195,6 +199,10 @@ type ActiveAssetsStore interface { // disk. FetchAssetProofs(ctx context.Context) ([]AssetProof, error) + // FetchAssetProofsSizes fetches all the asset proof lengths that are + // stored on disk. + FetchAssetProofsSizes(ctx context.Context) ([]AssetProofSize, error) + // FetchAssetProof fetches the asset proof for a given asset identified // by its script key. FetchAssetProof(ctx context.Context, @@ -339,6 +347,18 @@ type ActiveAssetsStore interface { assetID []byte) (sqlc.FetchAssetMetaForAssetRow, error) } +// MetaStore is a sub-set of the main sqlc.Querier interface that contains +// methods related to metadata of the daemon. +type MetaStore interface { + // AssetsDBSizeSqlite returns the total size of the taproot assets + // sqlite database. + AssetsDBSizeSqlite(ctx context.Context) (int32, error) + + // AssetsDBSizePostgres returns the total size of the taproot assets + // postgres database. + AssetsDBSizePostgres(ctx context.Context) (int64, error) +} + // AssetBalance holds a balance query result for a particular asset or all // assets tracked by this daemon. type AssetBalance struct { @@ -378,10 +398,21 @@ type BatchedAssetStore interface { BatchedTx[ActiveAssetsStore] } +// BatchedMetaStore combines the MetaStore interface with the BatchedTx +// interface, allowing for multiple queries to be executed in a single SQL +// transaction. +type BatchedMetaStore interface { + MetaStore + + BatchedTx[MetaStore] +} + // AssetStore is used to query for the set of pending and confirmed assets. 
type AssetStore struct { db BatchedAssetStore + metaDb BatchedMetaStore + // eventDistributor is an event distributor that will be used to notify // subscribers about new proofs that are added to the archiver. eventDistributor *fn.EventDistributor[proof.Blob] @@ -389,18 +420,24 @@ type AssetStore struct { clock clock.Clock txHeights *lru.Cache[chainhash.Hash, cacheableBlockHeight] + + dbType sqlc.BackendType } // NewAssetStore creates a new AssetStore from the specified BatchedAssetStore // interface. -func NewAssetStore(db BatchedAssetStore, clock clock.Clock) *AssetStore { +func NewAssetStore(db BatchedAssetStore, metaDB BatchedMetaStore, + clock clock.Clock, dbType sqlc.BackendType) *AssetStore { + return &AssetStore{ db: db, + metaDb: metaDB, eventDistributor: fn.NewEventDistributor[proof.Blob](), clock: clock, txHeights: lru.NewCache[chainhash.Hash, cacheableBlockHeight]( 10_000, ), + dbType: dbType, } } @@ -1171,6 +1208,38 @@ func (a *AssetStore) FetchManagedUTXOs(ctx context.Context) ( return managedUtxos, nil } +// FetchAssetProofsSizes fetches the sizes of the proofs in the db. +func (a *AssetStore) FetchAssetProofsSizes( + ctx context.Context) ([]AssetProofSize, error) { + + var pSizes []AssetProofSize + + readOpts := NewAssetStoreReadTx() + dbErr := a.db.ExecTx(ctx, &readOpts, func(q ActiveAssetsStore) error { + proofSizes, err := q.FetchAssetProofsSizes(ctx) + if err != nil { + return err + } + + for _, v := range proofSizes { + pSizes = append( + pSizes, AssetProofSize{ + ScriptKey: v.ScriptKey, + ProofFileLength: v.ProofFileLength, + }, + ) + } + + return nil + }) + + if dbErr != nil { + return nil, dbErr + } + + return pSizes, nil +} + // FetchAssetProofs returns the latest proof file for either the set of target // assets, or all assets if no script keys for an asset are passed in. 
// @@ -3267,6 +3336,45 @@ func (a *AssetStore) FetchAssetMetaForAsset(ctx context.Context, return assetMeta, nil } +// AssetsDBSize returns the total size of the taproot assets database. +func (a *AssetStore) AssetsDBSize(ctx context.Context) (int64, error) { + var totalSize int64 + + readOpts := NewAssetStoreReadTx() + dbErr := a.metaDb.ExecTx(ctx, &readOpts, func(q MetaStore) error { + var ( + size int64 + err error + ) + switch a.dbType { + case sqlc.BackendTypePostgres: + size, err = q.AssetsDBSizePostgres(ctx) + + case sqlc.BackendTypeSqlite: + var res int32 + res, err = q.AssetsDBSizeSqlite(ctx) + size = int64(res) + + default: + return fmt.Errorf("unsupported db backend type") + } + + if err != nil { + return err + } + + totalSize = size + + return nil + }) + + if dbErr != nil { + return 0, dbErr + } + + return totalSize, nil +} + // FetchAssetMetaByHash attempts to fetch an asset meta based on an asset hash. func (a *AssetStore) FetchAssetMetaByHash(ctx context.Context, metaHash [asset.MetaHashLen]byte) (*proof.MetaReveal, error) { diff --git a/tapdb/sqlc/assets.sql.go b/tapdb/sqlc/assets.sql.go index e6967e482..4f57f0067 100644 --- a/tapdb/sqlc/assets.sql.go +++ b/tapdb/sqlc/assets.sql.go @@ -763,6 +763,44 @@ func (q *Queries) FetchAssetProofsByAssetID(ctx context.Context, assetID []byte) return items, nil } +const fetchAssetProofsSizes = `-- name: FetchAssetProofsSizes :many +SELECT script_keys.tweaked_script_key AS script_key, + LENGTH(asset_proofs.proof_file) AS proof_file_length +FROM asset_proofs +JOIN assets + ON asset_proofs.asset_id = assets.asset_id +JOIN script_keys + ON assets.script_key_id = script_keys.script_key_id +` + +type FetchAssetProofsSizesRow struct { + ScriptKey []byte + ProofFileLength float64 +} + +func (q *Queries) FetchAssetProofsSizes(ctx context.Context) ([]FetchAssetProofsSizesRow, error) { + rows, err := q.db.QueryContext(ctx, fetchAssetProofsSizes) + if err != nil { + return nil, err + } + defer rows.Close() + var items 
[]FetchAssetProofsSizesRow + for rows.Next() { + var i FetchAssetProofsSizesRow + if err := rows.Scan(&i.ScriptKey, &i.ProofFileLength); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const fetchAssetWitnesses = `-- name: FetchAssetWitnesses :many SELECT assets.asset_id, prev_out_point, prev_asset_id, prev_script_key, diff --git a/tapdb/sqlc/metadata.sql.go b/tapdb/sqlc/metadata.sql.go new file mode 100644 index 000000000..3ba5abb27 --- /dev/null +++ b/tapdb/sqlc/metadata.sql.go @@ -0,0 +1,33 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.25.0 +// source: metadata.sql + +package sqlc + +import ( + "context" +) + +const assetsDBSizePostgres = `-- name: AssetsDBSizePostgres :one +SELECT pg_catalog.pg_database_size(current_database()) AS size +` + +func (q *Queries) AssetsDBSizePostgres(ctx context.Context) (int64, error) { + row := q.db.QueryRowContext(ctx, assetsDBSizePostgres) + var size int64 + err := row.Scan(&size) + return size, err +} + +const assetsDBSizeSqlite = `-- name: AssetsDBSizeSqlite :one +SELECT page_count * page_size AS size_in_bytes +FROM pragma_page_count(), pragma_page_size() +` + +func (q *Queries) AssetsDBSizeSqlite(ctx context.Context) (int32, error) { + row := q.db.QueryRowContext(ctx, assetsDBSizeSqlite) + var size_in_bytes int32 + err := row.Scan(&size_in_bytes) + return size_in_bytes, err +} diff --git a/tapdb/sqlc/querier.go b/tapdb/sqlc/querier.go index 2cb90e03f..93d8d4506 100644 --- a/tapdb/sqlc/querier.go +++ b/tapdb/sqlc/querier.go @@ -18,6 +18,8 @@ type Querier interface { AnchorPendingAssets(ctx context.Context, arg AnchorPendingAssetsParams) error ApplyPendingOutput(ctx context.Context, arg ApplyPendingOutputParams) (int64, error) AssetsByGenesisPoint(ctx context.Context, prevOut []byte) ([]AssetsByGenesisPointRow, error) + 
AssetsDBSizePostgres(ctx context.Context) (int64, error) + AssetsDBSizeSqlite(ctx context.Context) (int32, error) AssetsInBatch(ctx context.Context, rawKey []byte) ([]AssetsInBatchRow, error) BindMintingBatchWithTapSibling(ctx context.Context, arg BindMintingBatchWithTapSiblingParams) error BindMintingBatchWithTx(ctx context.Context, arg BindMintingBatchWithTxParams) error @@ -51,6 +53,7 @@ type Querier interface { FetchAssetProof(ctx context.Context, arg FetchAssetProofParams) ([]FetchAssetProofRow, error) FetchAssetProofs(ctx context.Context) ([]FetchAssetProofsRow, error) FetchAssetProofsByAssetID(ctx context.Context, assetID []byte) ([]FetchAssetProofsByAssetIDRow, error) + FetchAssetProofsSizes(ctx context.Context) ([]FetchAssetProofsSizesRow, error) FetchAssetWitnesses(ctx context.Context, assetID sql.NullInt64) ([]FetchAssetWitnessesRow, error) FetchAssetsByAnchorTx(ctx context.Context, anchorUtxoID sql.NullInt64) ([]Asset, error) // We use a LEFT JOIN here as not every asset has a group key, so this'll diff --git a/tapdb/sqlc/queries/assets.sql b/tapdb/sqlc/queries/assets.sql index d3625d8bd..e68634339 100644 --- a/tapdb/sqlc/queries/assets.sql +++ b/tapdb/sqlc/queries/assets.sql @@ -709,6 +709,15 @@ FROM asset_proofs JOIN asset_info ON asset_info.asset_id = asset_proofs.asset_id; +-- name: FetchAssetProofsSizes :many +SELECT script_keys.tweaked_script_key AS script_key, + LENGTH(asset_proofs.proof_file) AS proof_file_length +FROM asset_proofs +JOIN assets + ON asset_proofs.asset_id = assets.asset_id +JOIN script_keys + ON assets.script_key_id = script_keys.script_key_id; + -- name: FetchAssetProofsByAssetID :many WITH asset_info AS ( SELECT assets.asset_id, script_keys.tweaked_script_key diff --git a/tapdb/sqlc/queries/metadata.sql b/tapdb/sqlc/queries/metadata.sql new file mode 100644 index 000000000..ce383a79a --- /dev/null +++ b/tapdb/sqlc/queries/metadata.sql @@ -0,0 +1,6 @@ +-- name: AssetsDBSizePostgres :one +SELECT 
pg_catalog.pg_database_size(current_database()) AS size; + +-- name: AssetsDBSizeSqlite :one +SELECT page_count * page_size AS size_in_bytes +FROM pragma_page_count(), pragma_page_size(); \ No newline at end of file diff --git a/tapdb/sqlutils_test.go b/tapdb/sqlutils_test.go index 05b224c20..8a86b552d 100644 --- a/tapdb/sqlutils_test.go +++ b/tapdb/sqlutils_test.go @@ -274,7 +274,17 @@ func newDbHandleFromDb(db *BaseDB) *DbHandler { return db.WithTx(tx) }, ) - activeAssetsStore := NewAssetStore(assetsDB, testClock) + + // Gain a handle to the meta store. + metaDB := NewTransactionExecutor( + db, func(tx *sql.Tx) MetaStore { + return db.WithTx(tx) + }, + ) + + activeAssetsStore := NewAssetStore( + assetsDB, metaDB, testClock, db.Backend(), + ) return &DbHandler{ UniverseFederationStore: fedStore, diff --git a/tapgarden/custodian_test.go b/tapgarden/custodian_test.go index 6fa738937..3a44843ae 100644 --- a/tapgarden/custodian_test.go +++ b/tapgarden/custodian_test.go @@ -107,11 +107,22 @@ func newProofArchiveForDB(t *testing.T, db *tapdb.BaseDB) (*proof.MultiArchiver, return db.WithTx(tx) } + metaTxCreator := func(tx *sql.Tx) tapdb.MetaStore { + return db.WithTx(tx) + } + assetDB := tapdb.NewTransactionExecutor( db, txCreator, ) + + metaDB := tapdb.NewTransactionExecutor( + db, metaTxCreator, + ) + testClock := clock.NewTestClock(time.Now()) - assetStore := tapdb.NewAssetStore(assetDB, testClock) + assetStore := tapdb.NewAssetStore( + assetDB, metaDB, testClock, db.Backend(), + ) proofArchive := proof.NewMultiArchiver( proof.NewMockVerifier(t), tapdb.DefaultStoreTimeout, From dd9823ee364e132d8c4a9920ab4f3eb4f5f1c79c Mon Sep 17 00:00:00 2001 From: George Tsagkarelis Date: Thu, 1 Aug 2024 15:42:34 +0200 Subject: [PATCH 2/3] monitoring: add database collector --- monitoring/db_collector.go | 120 +++++++++++++++++++++++++++++++++++++ monitoring/prometheus.go | 6 ++ 2 files changed, 126 insertions(+) create mode 100644 monitoring/db_collector.go diff --git 
a/monitoring/db_collector.go b/monitoring/db_collector.go new file mode 100644 index 000000000..cb3826199 --- /dev/null +++ b/monitoring/db_collector.go @@ -0,0 +1,120 @@ +package monitoring + +import ( + "context" + "errors" + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + dbSizeMetric = "total_db_size" + + assetProofSizesHistogram = "asset_proofs_sizes" +) + +// dbCollector is a Prometheus collector that exports metrics related to the +// daemon's database. +type dbCollector struct { + collectMx sync.Mutex + + cfg *PrometheusConfig + registry *prometheus.Registry + + dbSize prometheus.Gauge + proofSizesHistogram prometheus.Histogram +} + +func newDbCollector(cfg *PrometheusConfig, + registry *prometheus.Registry) (*dbCollector, error) { + + if cfg == nil { + return nil, errors.New("db collector prometheus cfg is nil") + } + + if cfg.AssetStore == nil { + return nil, errors.New("db collector asset store is nil") + } + + return &dbCollector{ + cfg: cfg, + registry: registry, + dbSize: prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: dbSizeMetric, + Help: "Total size of db", + }, + ), + proofSizesHistogram: newProofSizesHistogram(), + }, nil +} + +// newProofSizesHistogram generates a fresh instance of the proof sizes +// histogram. +func newProofSizesHistogram() (h prometheus.Histogram) { + return prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: assetProofSizesHistogram, + Help: "Histogram of asset proof sizes", + Buckets: prometheus.ExponentialBuckets( + 2, 2, 32, + ), + }, + ) +} + +// Describe sends the super-set of all possible descriptors of metrics +// collected by this Collector to the provided channel and returns once the +// last descriptor has been sent. +// +// NOTE: Part of the prometheus.Collector interface. 
+func (a *dbCollector) Describe(ch chan<- *prometheus.Desc) { + a.collectMx.Lock() + defer a.collectMx.Unlock() + + a.dbSize.Describe(ch) + a.proofSizesHistogram.Describe(ch) +} + +// Collect is called by the Prometheus registry when collecting metrics. +// +// NOTE: Part of the prometheus.Collector interface. +func (a *dbCollector) Collect(ch chan<- prometheus.Metric) { + a.collectMx.Lock() + defer a.collectMx.Unlock() + + ctxdb, cancel := context.WithTimeout(context.Background(), dbTimeout) + defer cancel() + + // Fetch the db size. + dbSize, err := a.cfg.AssetStore.AssetsDBSize(ctxdb) + if err != nil { + log.Errorf("unable to fetch db size: %v", err) + return + } + + a.dbSize.Set(float64(dbSize)) + + // Fetch all proof sizes. + proofSizes, err := a.cfg.AssetStore.FetchAssetProofsSizes(ctxdb) + if err != nil { + log.Errorf("unable to fetch asset proofs: %v", err) + return + } + + // We use the histogram in a non-standard way. Every time we collect + // data we ask the database to return all proof sizes and then we feed + // them to the histogram. That's why on every different pass we need to + // reset the histogram instance, in order to not duplicate data on every + // prometheus pass. + a.proofSizesHistogram = newProofSizesHistogram() + + // We'll feed the proof sizes to the histogram. + for _, p := range proofSizes { + a.proofSizesHistogram.Observe(p.ProofFileLength) + } + + a.proofSizesHistogram.Collect(ch) + a.dbSize.Collect(ch) +} diff --git a/monitoring/prometheus.go b/monitoring/prometheus.go index b352e65f7..afeb7c89b 100644 --- a/monitoring/prometheus.go +++ b/monitoring/prometheus.go @@ -74,6 +74,12 @@ func (p *PrometheusExporter) Start() error { } p.registry.MustRegister(gardenCollector) + dbCollector, err := newDbCollector(p.config, p.registry) + if err != nil { + return err + } + p.registry.MustRegister(dbCollector) + // Make ensure that all metrics exist when collecting and querying. 
serverMetrics.InitializeMetrics(p.config.RPCServer) From 5a0a75c1962824a697a207c8a2911e98add23c5d Mon Sep 17 00:00:00 2001 From: George Tsagkarelis Date: Tue, 6 Aug 2024 13:44:15 +0200 Subject: [PATCH 3/3] build: add monitoring tag to dev dockerfile --- dev.Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev.Dockerfile b/dev.Dockerfile index d5ea0f2af..452458b94 100644 --- a/dev.Dockerfile +++ b/dev.Dockerfile @@ -6,7 +6,7 @@ COPY . /app ENV CGO_ENABLED=0 -RUN make install +RUN make release-install TAGS=monitoring # FINAL IMAGE FROM alpine as final