From 8e6ade9ffe95881256e8218bc84f76e5f708d6e8 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 28 Feb 2024 17:59:00 -0500 Subject: [PATCH] Change CRDB driver to use new method for getting transaction timestamp The existing call disables most optimizations on write transactions, but was necessary for legacy reasons. Following this change, any CRDB version 23 (or later) will use the newer, better call Also changes the stats system to use the CRDB stats instead of a custom table --- internal/datastore/crdb/crdb.go | 50 ++++--- internal/datastore/crdb/crdb_test.go | 8 +- .../zz_migration.0005_remove_stats_table.go | 25 ++++ internal/datastore/crdb/options.go | 17 ++- internal/datastore/crdb/stats.go | 133 +++++++++++------- internal/testserver/datastore/crdb.go | 2 +- pkg/cmd/datastore/datastore.go | 1 - pkg/datastore/test/datastore.go | 1 + pkg/datastore/test/stats.go | 2 +- pkg/datastore/test/tuples.go | 55 ++++++++ 10 files changed, 212 insertions(+), 82 deletions(-) create mode 100644 internal/datastore/crdb/migrations/zz_migration.0005_remove_stats_table.go diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index 069f21f625..5f0a3b9e37 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -68,8 +68,10 @@ const ( errUnableToInstantiate = "unable to instantiate datastore" errRevision = "unable to find revision: %w" - querySelectNow = "SELECT cluster_logical_timestamp()" - queryShowZoneConfig = "SHOW ZONE CONFIGURATION FOR RANGE default;" + querySelectNow = "SELECT cluster_logical_timestamp()" + queryTransactionNowPreV23 = querySelectNow + queryTransactionNow = "SHOW COMMIT TIMESTAMP" + queryShowZoneConfig = "SHOW ZONE CONFIGURATION FOR RANGE default;" livingTupleConstraint = "pk_relation_tuple" ) @@ -122,6 +124,12 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas changefeedQuery = queryChangefeedPreV22 } + transactionNowQuery := queryTransactionNow + if version.Major < 23 { + log.Info().Object("version", version).Msg("using transaction now query for CRDB version < 23") + transactionNowQuery = queryTransactionNowPreV23 + } + clusterTTLNanos, err := readClusterTTLNanos(initCtx, initPool) if err != nil { return nil, fmt.Errorf("unable to read cluster gc window: %w", err) @@ -172,8 +180,9 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas watchBufferWriteTimeout: config.watchBufferWriteTimeout, writeOverlapKeyer: keyer, overlapKeyInit: keySetInit, - disableStats: config.disableStats, beginChangefeedQuery: changefeedQuery, + transactionNowQuery: transactionNowQuery, + analyzeBeforeStatistics: config.analyzeBeforeStatistics, } ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal) @@ -254,9 +263,10 @@ type crdbDatastore struct { watchBufferWriteTimeout time.Duration writeOverlapKeyer overlapKeyer overlapKeyInit func(ctx context.Context) keySet - disableStats bool + analyzeBeforeStatistics bool beginChangefeedQuery string + transactionNowQuery string featureGroup singleflight.Group[string, *datastore.Features] @@ -321,21 +331,11 @@ func (cds *crdbDatastore) ReadWriteTx( } } - if cds.disableStats { - var err error - commitTimestamp, err = readCRDBNow(ctx, querier) - if err != nil { - return fmt.Errorf("error getting commit timestamp: %w", err) - } - return nil - } - var err error - commitTimestamp, err = updateCounter(ctx, tx, rwt.relCountChange) + commitTimestamp, err = cds.readTransactionCommitRev(ctx, querier) if err != nil { - return fmt.Errorf("error updating relationship counter: %w", err) + return fmt.Errorf("error getting commit timestamp: %w", err) } - return nil }) if err != nil { @@ -371,7 +371,9 @@ func (cds *crdbDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, return datastore.ReadyState{}, err } - if version != headMigration { + // TODO(jschorr): Remove the check for the older migration once we are confident + // that all users have migrated past it. + if version != headMigration && version != "add-caveats" { return datastore.ReadyState{ Message: fmt.Sprintf( "datastore is not migrated: currently at revision `%s`, but requires `%s`. Please run `spicedb migrate`.", @@ -467,6 +469,20 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er return &features, nil } +func (cds *crdbDatastore) readTransactionCommitRev(ctx context.Context, reader pgxcommon.DBFuncQuerier) (datastore.Revision, error) { + ctx, span := tracer.Start(ctx, "readTransactionCommitRev") + defer span.End() + + var hlcNow decimal.Decimal + if err := reader.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error { + return row.Scan(&hlcNow) + }, cds.transactionNowQuery); err != nil { + return datastore.NoRevision, fmt.Errorf("unable to read timestamp: %w", err) + } + + return revisions.NewForHLC(hlcNow) +} + func readCRDBNow(ctx context.Context, reader pgxcommon.DBFuncQuerier) (datastore.Revision, error) { ctx, span := tracer.Start(ctx, "readCRDBNow") defer span.End() diff --git a/internal/datastore/crdb/crdb_test.go b/internal/datastore/crdb/crdb_test.go index b5deccb3b0..6463f82001 100644 --- a/internal/datastore/crdb/crdb_test.go +++ b/internal/datastore/crdb/crdb_test.go @@ -53,6 +53,7 @@ func TestCRDBDatastore(t *testing.T) { RevisionQuantization(revisionQuantization), WatchBufferLength(watchBufferLength), OverlapStrategy(overlapStrategyPrefix), + DebugAnalyzeBeforeStatistics(), ) require.NoError(t, err) return ds @@ -84,6 +85,7 @@ func TestCRDBDatastoreWithFollowerReads(t *testing.T) { GCWindow(gcWindow), RevisionQuantization(quantization), FollowerReadDelay(followerReadDelay), + DebugAnalyzeBeforeStatistics(), ) require.NoError(err) return ds @@ -134,15 +136,19 @@ func TestWatchFeatureDetection(t *testing.T) { require.NoError(t, err) }, expectEnabled: false, - expectMessage: "Range feeds must be enabled in CockroachDB and the user must have permission to create them in order to enable the Watch API: ERROR: user unprivileged does not have CHANGEFEED privilege on relation relation_tuple (SQLSTATE 42501)", + expectMessage: "(SQLSTATE 42501)", }, { name: "rangefeeds enabled, user has permission", postInit: func(ctx context.Context, adminConn *pgx.Conn) { _, err = adminConn.Exec(ctx, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`) require.NoError(t, err) + _, err = adminConn.Exec(ctx, fmt.Sprintf(`GRANT CHANGEFEED ON TABLE testspicedb.%s TO unprivileged;`, tableTuple)) require.NoError(t, err) + + _, err = adminConn.Exec(ctx, fmt.Sprintf(`GRANT SELECT ON TABLE testspicedb.%s TO unprivileged;`, tableTuple)) + require.NoError(t, err) }, expectEnabled: true, }, diff --git a/internal/datastore/crdb/migrations/zz_migration.0005_remove_stats_table.go b/internal/datastore/crdb/migrations/zz_migration.0005_remove_stats_table.go new file mode 100644 index 0000000000..e1cd73ed8a --- /dev/null +++ b/internal/datastore/crdb/migrations/zz_migration.0005_remove_stats_table.go @@ -0,0 +1,25 @@ +package migrations + +import ( + "context" + + "github.com/jackc/pgx/v5" +) + +const ( + dropStatsTable = `DROP TABLE relationship_estimate_counters;` +) + +func init() { + err := CRDBMigrations.Register("remove-stats-table", "add-caveats", removeStatsTable, noAtomicMigration) + if err != nil { + panic("failed to register migration: " + err.Error()) + } +} + +func removeStatsTable(ctx context.Context, conn *pgx.Conn) error { + if _, err := conn.Exec(ctx, dropStatsTable); err != nil { + return err + } + return nil +} diff --git a/internal/datastore/crdb/options.go b/internal/datastore/crdb/options.go index 905861b27f..366b040733 100644 --- a/internal/datastore/crdb/options.go +++ b/internal/datastore/crdb/options.go @@ -20,8 +20,8 @@ type crdbOptions struct { maxRetries uint8 overlapStrategy string overlapKey string - disableStats bool enableConnectionBalancing bool + analyzeBeforeStatistics bool enablePrometheusStats bool } @@ -65,7 +65,6 @@ func generateConfig(options []Option) (crdbOptions, error) { maxRetries: defaultMaxRetries, overlapKey: defaultOverlapKey, overlapStrategy: defaultOverlapStrategy, - disableStats: false, enablePrometheusStats: defaultEnablePrometheusStats, enableConnectionBalancing: defaultEnableConnectionBalancing, connectRate: defaultConnectRate, @@ -283,11 +282,6 @@ func OverlapKey(key string) Option { return func(po *crdbOptions) { po.overlapKey = key } } -// DisableStats disables recording counts to the stats table -func DisableStats(disable bool) Option { - return func(po *crdbOptions) { po.disableStats = disable } -} - // WithEnablePrometheusStats marks whether Prometheus metrics provided by the Postgres // clients being used by the datastore are enabled. // @@ -303,3 +297,12 @@ func WithEnablePrometheusStats(enablePrometheusStats bool) Option { func WithEnableConnectionBalancing(connectionBalancing bool) Option { return func(po *crdbOptions) { po.enableConnectionBalancing = connectionBalancing } } + +// DebugAnalyzeBeforeStatistics signals to the Statistics method that it should +// run Analyze on the database before returning statistics. This should only be +// used for debug and testing. +// +// Disabled by default. +func DebugAnalyzeBeforeStatistics() Option { + return func(po *crdbOptions) { po.analyzeBeforeStatistics = true } +} diff --git a/internal/datastore/crdb/stats.go b/internal/datastore/crdb/stats.go index 954a64050b..5da5ed355e 100644 --- a/internal/datastore/crdb/stats.go +++ b/internal/datastore/crdb/stats.go @@ -3,39 +3,24 @@ package crdb import ( "context" "fmt" - "math/rand" - "time" + "slices" "github.com/Masterminds/squirrel" "github.com/jackc/pgx/v5" - "github.com/shopspring/decimal" + "github.com/rs/zerolog/log" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" - "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" ) const ( tableMetadata = "metadata" colUniqueID = "unique_id" - - tableCounters = "relationship_estimate_counters" - colID = "id" - colCount = "count" ) var ( - queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata) - queryRelationshipEstimate = fmt.Sprintf("SELECT COALESCE(SUM(%s), 0) FROM %s AS OF SYSTEM TIME follower_read_timestamp()", colCount, tableCounters) - - upsertCounterQuery = psql.Insert(tableCounters).Columns( - colID, - colCount, - ).Suffix(fmt.Sprintf("ON CONFLICT (%[1]s) DO UPDATE SET %[2]s = %[3]s.%[2]s + EXCLUDED.%[2]s RETURNING cluster_logical_timestamp()", colID, colCount, tableCounters)) - - rng = rand.NewSource(time.Now().UnixNano()) - - uniqueID string + queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata) + uniqueID string ) func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) { @@ -52,14 +37,6 @@ func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, erro } var nsDefs []datastore.RevisionedNamespace - var relCount int64 - - if err := cds.readPool.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error { - return row.Scan(&relCount) - }, queryRelationshipEstimate); err != nil { - return datastore.Stats{}, fmt.Errorf("unable to read relationship count: %w", err) - } - if err := cds.readPool.BeginTxFunc(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error { _, err := tx.Exec(ctx, "SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()") if err != nil { @@ -76,37 +53,85 @@ func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, erro return datastore.Stats{}, err } - // NOTE: this is a stop-gap solution to prevent panics in telemetry collection - if relCount < 0 { - relCount = 0 - } - - return datastore.Stats{ - UniqueID: uniqueID, - EstimatedRelationshipCount: uint64(relCount), - ObjectTypeStatistics: datastore.ComputeObjectTypeStats(nsDefs), - }, nil -} + if cds.analyzeBeforeStatistics { + if err := cds.readPool.BeginTxFunc(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error { + if _, err := tx.Exec(ctx, "ANALYZE "+tableTuple); err != nil { + return fmt.Errorf("unable to analyze tuple table: %w", err) + } -func updateCounter(ctx context.Context, tx pgx.Tx, change int64) (datastore.Revision, error) { - counterID := make([]byte, 2) - // nolint:gosec - // G404 use of non cryptographically secure random number generator is not concern here, - // as this is only used to randomly distributed the counters across multiple rows and reduce write row contention - _, err := rand.New(rng).Read(counterID) - if err != nil { - return datastore.NoRevision, fmt.Errorf("unable to select random counter: %w", err) + return nil + }); err != nil { + return datastore.Stats{}, err + } } - sql, args, err := upsertCounterQuery.Values(counterID, change).ToSql() - if err != nil { - return datastore.NoRevision, fmt.Errorf("unable to prepare upsert counter sql: %w", err) - } + var estimatedRelCount uint64 + if err := cds.readPool.QueryFunc(ctx, func(ctx context.Context, rows pgx.Rows) error { + hasRows := false + + for rows.Next() { + hasRows = true + values, err := rows.Values() + if err != nil { + log.Warn().Err(err).Msg("unable to read statistics") + return nil + } + + // Find the row whose column_names contains the expected columns for the + // full relationship. + isFullRelationshipRow := false + for index, fd := range rows.FieldDescriptions() { + if fd.Name != "column_names" { + continue + } + + columnNames, ok := values[index].([]any) + if !ok { + log.Warn().Msg("unable to read column names") + return nil + } + + if slices.Contains(columnNames, "namespace") && + slices.Contains(columnNames, "object_id") && + slices.Contains(columnNames, "relation") && + slices.Contains(columnNames, "userset_namespace") && + slices.Contains(columnNames, "userset_object_id") && + slices.Contains(columnNames, "userset_relation") { + isFullRelationshipRow = true + break + } + } + + if !isFullRelationshipRow { + continue + } + + // Read the estimated relationship count. + for index, fd := range rows.FieldDescriptions() { + if fd.Name != "row_count" { + continue + } + + rowCount, ok := values[index].(int64) + if !ok { + log.Warn().Msg("unable to read row count") + return nil + } + + estimatedRelCount = uint64(rowCount) + return nil + } + } - var timestamp decimal.Decimal - if err := tx.QueryRow(ctx, sql, args...).Scan(×tamp); err != nil { - return datastore.NoRevision, fmt.Errorf("unable to executed upsert counter query: %w", err) + log.Warn().Bool("has-rows", hasRows).Msg("unable to find row count in statistics query result") + return nil + }, "SHOW STATISTICS FOR TABLE relation_tuple;"); err != nil { + return datastore.Stats{}, fmt.Errorf("unable to query unique estimated row count: %w", err) } - return revisions.NewForHLC(timestamp) + return datastore.Stats{ + UniqueID: uniqueID, + EstimatedRelationshipCount: estimatedRelCount, + ObjectTypeStatistics: datastore.ComputeObjectTypeStats(nsDefs), + }, nil } diff --git a/internal/testserver/datastore/crdb.go b/internal/testserver/datastore/crdb.go index 4d7d3acdd6..be61279a18 100644 --- a/internal/testserver/datastore/crdb.go +++ b/internal/testserver/datastore/crdb.go @@ -20,7 +20,7 @@ import ( ) const ( - CRDBTestVersionTag = "v22.2.0" + CRDBTestVersionTag = "v23.1.16" enableRangefeeds = `SET CLUSTER SETTING kv.rangefeed.enabled = true;` ) diff --git a/pkg/cmd/datastore/datastore.go b/pkg/cmd/datastore/datastore.go index 31b6a55db8..2fd3eaa7c7 100644 --- a/pkg/cmd/datastore/datastore.go +++ b/pkg/cmd/datastore/datastore.go @@ -382,7 +382,6 @@ func newCRDBDatastore(ctx context.Context, opts Config) (datastore.Datastore, er crdb.OverlapStrategy(opts.OverlapStrategy), crdb.WatchBufferLength(opts.WatchBufferLength), crdb.WatchBufferWriteTimeout(opts.WatchBufferWriteTimeout), - crdb.DisableStats(opts.DisableStats), crdb.WithEnablePrometheusStats(opts.EnableDatastoreMetrics), crdb.WithEnableConnectionBalancing(opts.EnableConnectionBalancing), crdb.ConnectRate(opts.ConnectRate), diff --git a/pkg/datastore/test/datastore.go b/pkg/datastore/test/datastore.go index 48ef11e6a4..56701cd81f 100644 --- a/pkg/datastore/test/datastore.go +++ b/pkg/datastore/test/datastore.go @@ -101,6 +101,7 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories) t.Run("TestCreateDeleteTouchTest", func(t *testing.T) { CreateDeleteTouchTest(t, tester) }) t.Run("TestCreateTouchDeleteTouchTest", func(t *testing.T) { CreateTouchDeleteTouchTest(t, tester) }) t.Run("TestTouchAlreadyExistingCaveated", func(t *testing.T) { TouchAlreadyExistingCaveatedTest(t, tester) }) + t.Run("TestBulkDeleteRelationships", func(t *testing.T) { BulkDeleteRelationshipsTest(t, tester) }) t.Run("TestMultipleReadsInRWT", func(t *testing.T) { MultipleReadsInRWTTest(t, tester) }) t.Run("TestConcurrentWriteSerialization", func(t *testing.T) { ConcurrentWriteSerializationTest(t, tester) }) diff --git a/pkg/datastore/test/stats.go b/pkg/datastore/test/stats.go index 3b0ab371b9..aa0cb785cb 100644 --- a/pkg/datastore/test/stats.go +++ b/pkg/datastore/test/stats.go @@ -34,7 +34,7 @@ func StatsTest(t *testing.T, tester DatastoreTester) { if stats.EstimatedRelationshipCount == uint64(0) && retryCount > 0 { // Sleep for a bit to get the stats table to update. - time.Sleep(50 * time.Millisecond) + time.Sleep(500 * time.Millisecond) continue } diff --git a/pkg/datastore/test/tuples.go b/pkg/datastore/test/tuples.go index 9787f09e94..a43994036e 100644 --- a/pkg/datastore/test/tuples.go +++ b/pkg/datastore/test/tuples.go @@ -29,6 +29,7 @@ const ( testResourceNamespace = "test/resource" testGroupNamespace = "test/group" testReaderRelation = "reader" + testEditorRelation = "editor" testMemberRelation = "member" ellipsis = "..." ) @@ -781,6 +782,60 @@ func ConcurrentWriteSerializationTest(t *testing.T, tester DatastoreTester) { require.Less(time.Since(startTime), 10*time.Second) } +func BulkDeleteRelationshipsTest(t *testing.T, tester DatastoreTester) { + require := require.New(t) + + rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) + require.NoError(err) + + ds, _ := testfixtures.StandardDatastoreWithSchema(rawDS, require) + ctx := context.Background() + + // Write a bunch of relationships. + t.Log(time.Now(), "starting write") + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + _, err := rwt.BulkLoad(ctx, testfixtures.NewBulkTupleGenerator(testResourceNamespace, testReaderRelation, testUserNamespace, 1000, t)) + if err != nil { + return err + } + + _, err = rwt.BulkLoad(ctx, testfixtures.NewBulkTupleGenerator(testResourceNamespace, testEditorRelation, testUserNamespace, 1000, t)) + if err != nil { + return err + } + + return nil + }) + require.NoError(err) + + // Issue a deletion for the first set of relationships. + t.Log(time.Now(), "starting delete") + deleteCount := 0 + deletedRev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + t.Log(time.Now(), "deleting") + deleteCount++ + return rwt.DeleteRelationships(ctx, &v1.RelationshipFilter{ + ResourceType: testResourceNamespace, + OptionalRelation: testReaderRelation, + }) + }) + require.NoError(err) + require.Equal(1, deleteCount) + t.Log(time.Now(), "finished delete") + + // Ensure the relationships were removed. + t.Log(time.Now(), "starting check") + reader := ds.SnapshotReader(deletedRev) + iter, err := reader.QueryRelationships(ctx, datastore.RelationshipsFilter{ + ResourceType: testResourceNamespace, + OptionalResourceRelation: testReaderRelation, + }) + require.NoError(err) + defer iter.Close() + + require.Nil(iter.Next(), "expected no results") +} + func onrToSubjectsFilter(onr *core.ObjectAndRelation) datastore.SubjectsFilter { return datastore.SubjectsFilter{ SubjectType: onr.Namespace,