diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go
index 069f21f625..5f0a3b9e37 100644
--- a/internal/datastore/crdb/crdb.go
+++ b/internal/datastore/crdb/crdb.go
@@ -68,8 +68,10 @@ const (
 	errUnableToInstantiate = "unable to instantiate datastore"
 	errRevision            = "unable to find revision: %w"
 
-	querySelectNow      = "SELECT cluster_logical_timestamp()"
-	queryShowZoneConfig = "SHOW ZONE CONFIGURATION FOR RANGE default;"
+	querySelectNow            = "SELECT cluster_logical_timestamp()"
+	queryTransactionNowPreV23 = querySelectNow
+	queryTransactionNow       = "SHOW COMMIT TIMESTAMP"
+	queryShowZoneConfig       = "SHOW ZONE CONFIGURATION FOR RANGE default;"
 
 	livingTupleConstraint = "pk_relation_tuple"
 )
@@ -122,6 +124,12 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
 		changefeedQuery = queryChangefeedPreV22
 	}
+	transactionNowQuery := queryTransactionNow
+	if version.Major < 23 {
+		log.Info().Object("version", version).Msg("using transaction now query for CRDB version < 23")
+		transactionNowQuery = queryTransactionNowPreV23
+	}
+
 	clusterTTLNanos, err := readClusterTTLNanos(initCtx, initPool)
 	if err != nil {
 		return nil, fmt.Errorf("unable to read cluster gc window: %w", err)
 	}
@@ -172,8 +180,9 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
 		watchBufferWriteTimeout: config.watchBufferWriteTimeout,
 		writeOverlapKeyer:       keyer,
 		overlapKeyInit:          keySetInit,
-		disableStats:            config.disableStats,
 		beginChangefeedQuery:    changefeedQuery,
+		transactionNowQuery:     transactionNowQuery,
+		analyzeBeforeStatistics: config.analyzeBeforeStatistics,
 	}
 
 	ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal)
@@ -254,9 +263,10 @@ type crdbDatastore struct {
 	watchBufferWriteTimeout time.Duration
 	writeOverlapKeyer       overlapKeyer
 	overlapKeyInit          func(ctx context.Context) keySet
-	disableStats            bool
+	analyzeBeforeStatistics bool
 
 	beginChangefeedQuery string
+	transactionNowQuery  string
 
 	featureGroup singleflight.Group[string, *datastore.Features]
 
@@ -321,21 +331,11 @@ func (cds *crdbDatastore) ReadWriteTx(
 			}
 		}
 
-		if cds.disableStats {
-			var err error
-			commitTimestamp, err = readCRDBNow(ctx, querier)
-			if err != nil {
-				return fmt.Errorf("error getting commit timestamp: %w", err)
-			}
-			return nil
-		}
-
 		var err error
-		commitTimestamp, err = updateCounter(ctx, tx, rwt.relCountChange)
+		commitTimestamp, err = cds.readTransactionCommitRev(ctx, querier)
 		if err != nil {
-			return fmt.Errorf("error updating relationship counter: %w", err)
+			return fmt.Errorf("error getting commit timestamp: %w", err)
 		}
-
 		return nil
 	})
 	if err != nil {
@@ -371,7 +371,9 @@ func (cds *crdbDatastore) ReadyState(ctx context.Context) (datastore.ReadyState,
 		return datastore.ReadyState{}, err
 	}
 
-	if version != headMigration {
+	// TODO(jschorr): Remove the check for the older migration once we are confident
+	// that all users have migrated past it.
+	if version != headMigration && version != "add-caveats" {
 		return datastore.ReadyState{
 			Message: fmt.Sprintf(
 				"datastore is not migrated: currently at revision `%s`, but requires `%s`. Please run `spicedb migrate`.",
@@ -467,6 +469,20 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er
 	return &features, nil
 }
 
+func (cds *crdbDatastore) readTransactionCommitRev(ctx context.Context, reader pgxcommon.DBFuncQuerier) (datastore.Revision, error) {
+	ctx, span := tracer.Start(ctx, "readTransactionCommitRev")
+	defer span.End()
+
+	var hlcNow decimal.Decimal
+	if err := reader.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
+		return row.Scan(&hlcNow)
+	}, cds.transactionNowQuery); err != nil {
+		return datastore.NoRevision, fmt.Errorf("unable to read timestamp: %w", err)
+	}
+
+	return revisions.NewForHLC(hlcNow)
+}
+
 func readCRDBNow(ctx context.Context, reader pgxcommon.DBFuncQuerier) (datastore.Revision, error) {
 	ctx, span := tracer.Start(ctx, "readCRDBNow")
 	defer span.End()
diff --git a/internal/datastore/crdb/crdb_test.go b/internal/datastore/crdb/crdb_test.go
index b5deccb3b0..6463f82001 100644
--- a/internal/datastore/crdb/crdb_test.go
+++ b/internal/datastore/crdb/crdb_test.go
@@ -53,6 +53,7 @@ func TestCRDBDatastore(t *testing.T) {
 			RevisionQuantization(revisionQuantization),
 			WatchBufferLength(watchBufferLength),
 			OverlapStrategy(overlapStrategyPrefix),
+			DebugAnalyzeBeforeStatistics(),
 		)
 		require.NoError(t, err)
 		return ds
@@ -84,6 +85,7 @@ func TestCRDBDatastoreWithFollowerReads(t *testing.T) {
 			GCWindow(gcWindow),
 			RevisionQuantization(quantization),
 			FollowerReadDelay(followerReadDelay),
+			DebugAnalyzeBeforeStatistics(),
 		)
 		require.NoError(err)
 		return ds
@@ -134,15 +136,19 @@ func TestWatchFeatureDetection(t *testing.T) {
 				require.NoError(t, err)
 			},
 			expectEnabled: false,
-			expectMessage: "Range feeds must be enabled in CockroachDB and the user must have permission to create them in order to enable the Watch API: ERROR: user unprivileged does not have CHANGEFEED privilege on relation relation_tuple (SQLSTATE 42501)",
+			expectMessage: "(SQLSTATE 42501)",
 		},
 		{
 			name: "rangefeeds enabled, user has permission",
 			postInit: func(ctx context.Context, adminConn *pgx.Conn) {
 				_, err = adminConn.Exec(ctx, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`)
 				require.NoError(t, err)
+
 				_, err = adminConn.Exec(ctx, fmt.Sprintf(`GRANT CHANGEFEED ON TABLE testspicedb.%s TO unprivileged;`, tableTuple))
 				require.NoError(t, err)
+
+				_, err = adminConn.Exec(ctx, fmt.Sprintf(`GRANT SELECT ON TABLE testspicedb.%s TO unprivileged;`, tableTuple))
+				require.NoError(t, err)
 			},
 			expectEnabled: true,
 		},
diff --git a/internal/datastore/crdb/migrations/zz_migration.0005_remove_stats_table.go b/internal/datastore/crdb/migrations/zz_migration.0005_remove_stats_table.go
new file mode 100644
index 0000000000..e1cd73ed8a
--- /dev/null
+++ b/internal/datastore/crdb/migrations/zz_migration.0005_remove_stats_table.go
@@ -0,0 +1,25 @@
+package migrations
+
+import (
+	"context"
+
+	"github.com/jackc/pgx/v5"
+)
+
+const (
+	dropStatsTable = `DROP TABLE relationship_estimate_counters;`
+)
+
+func init() {
+	err := CRDBMigrations.Register("remove-stats-table", "add-caveats", removeStatsTable, noAtomicMigration)
+	if err != nil {
+		panic("failed to register migration: " + err.Error())
+	}
+}
+
+func removeStatsTable(ctx context.Context, conn *pgx.Conn) error {
+	if _, err := conn.Exec(ctx, dropStatsTable); err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/internal/datastore/crdb/options.go b/internal/datastore/crdb/options.go
index 905861b27f..366b040733 100644
--- a/internal/datastore/crdb/options.go
+++ b/internal/datastore/crdb/options.go
@@ -20,8 +20,8 @@ type crdbOptions struct {
 	maxRetries                uint8
 	overlapStrategy           string
 	overlapKey                string
-	disableStats              bool
 	enableConnectionBalancing bool
+	analyzeBeforeStatistics   bool
 
 	enablePrometheusStats bool
 }
@@ -65,7 +65,6 @@ func generateConfig(options []Option) (crdbOptions, error) {
 		maxRetries:                defaultMaxRetries,
 		overlapKey:                defaultOverlapKey,
 		overlapStrategy:           defaultOverlapStrategy,
-		disableStats:              false,
 		enablePrometheusStats:     defaultEnablePrometheusStats,
 		enableConnectionBalancing: defaultEnableConnectionBalancing,
 		connectRate:               defaultConnectRate,
@@ -283,11 +282,6 @@ func OverlapKey(key string) Option {
 	return func(po *crdbOptions) { po.overlapKey = key }
 }
 
-// DisableStats disables recording counts to the stats table
-func DisableStats(disable bool) Option {
-	return func(po *crdbOptions) { po.disableStats = disable }
-}
-
 // WithEnablePrometheusStats marks whether Prometheus metrics provided by the Postgres
 // clients being used by the datastore are enabled.
 //
@@ -303,3 +297,12 @@ func WithEnableConnectionBalancing(connectionBalancing bool) Option {
 	return func(po *crdbOptions) { po.enableConnectionBalancing = connectionBalancing }
 }
+
+// DebugAnalyzeBeforeStatistics signals to the Statistics method that it should
+// run Analyze on the database before returning statistics. This should only be
+// used for debug and testing.
+//
+// Disabled by default.
+func DebugAnalyzeBeforeStatistics() Option {
+	return func(po *crdbOptions) { po.analyzeBeforeStatistics = true }
+}
diff --git a/internal/datastore/crdb/stats.go b/internal/datastore/crdb/stats.go
index 954a64050b..5da5ed355e 100644
--- a/internal/datastore/crdb/stats.go
+++ b/internal/datastore/crdb/stats.go
@@ -3,39 +3,24 @@ package crdb
 import (
 	"context"
 	"fmt"
-	"math/rand"
-	"time"
+	"slices"
 
 	"github.com/Masterminds/squirrel"
 	"github.com/jackc/pgx/v5"
-	"github.com/shopspring/decimal"
+	"github.com/rs/zerolog/log"
 
 	pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
-	"github.com/authzed/spicedb/internal/datastore/revisions"
 	"github.com/authzed/spicedb/pkg/datastore"
 )
 
 const (
 	tableMetadata = "metadata"
 	colUniqueID   = "unique_id"
-
-	tableCounters = "relationship_estimate_counters"
-	colID         = "id"
-	colCount      = "count"
 )
 
 var (
-	queryReadUniqueID         = psql.Select(colUniqueID).From(tableMetadata)
-	queryRelationshipEstimate = fmt.Sprintf("SELECT COALESCE(SUM(%s), 0) FROM %s AS OF SYSTEM TIME follower_read_timestamp()", colCount, tableCounters)
-
-	upsertCounterQuery = psql.Insert(tableCounters).Columns(
-		colID,
-		colCount,
-	).Suffix(fmt.Sprintf("ON CONFLICT (%[1]s) DO UPDATE SET %[2]s = %[3]s.%[2]s + EXCLUDED.%[2]s RETURNING cluster_logical_timestamp()", colID, colCount, tableCounters))
-
-	rng = rand.NewSource(time.Now().UnixNano())
-
-	uniqueID string
+	queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)
+	uniqueID          string
 )
 
 func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
@@ -52,14 +37,6 @@ func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, erro
 	}
 
 	var nsDefs []datastore.RevisionedNamespace
-	var relCount int64
-
-	if err := cds.readPool.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
-		return row.Scan(&relCount)
-	}, queryRelationshipEstimate); err != nil {
-		return datastore.Stats{}, fmt.Errorf("unable to read relationship count: %w", err)
-	}
-
 	if err := cds.readPool.BeginTxFunc(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error {
 		_, err := tx.Exec(ctx, "SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()")
 		if err != nil {
@@ -76,37 +53,85 @@ func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, erro
 		return datastore.Stats{}, err
 	}
 
-	// NOTE: this is a stop-gap solution to prevent panics in telemetry collection
-	if relCount < 0 {
-		relCount = 0
-	}
-
-	return datastore.Stats{
-		UniqueID:                   uniqueID,
-		EstimatedRelationshipCount: uint64(relCount),
-		ObjectTypeStatistics:       datastore.ComputeObjectTypeStats(nsDefs),
-	}, nil
-}
+	if cds.analyzeBeforeStatistics {
+		if err := cds.readPool.BeginTxFunc(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error {
+			if _, err := tx.Exec(ctx, "ANALYZE "+tableTuple); err != nil {
+				return fmt.Errorf("unable to analyze tuple table: %w", err)
+			}
 
-func updateCounter(ctx context.Context, tx pgx.Tx, change int64) (datastore.Revision, error) {
-	counterID := make([]byte, 2)
-	// nolint:gosec
-	// G404 use of non cryptographically secure random number generator is not concern here,
-	// as this is only used to randomly distributed the counters across multiple rows and reduce write row contention
-	_, err := rand.New(rng).Read(counterID)
-	if err != nil {
-		return datastore.NoRevision, fmt.Errorf("unable to select random counter: %w", err)
+			return nil
+		}); err != nil {
+			return datastore.Stats{}, err
+		}
 	}
 
-	sql, args, err := upsertCounterQuery.Values(counterID, change).ToSql()
-	if err != nil {
-		return datastore.NoRevision, fmt.Errorf("unable to prepare upsert counter sql: %w", err)
-	}
+	var estimatedRelCount uint64
+	if err := cds.readPool.QueryFunc(ctx, func(ctx context.Context, rows pgx.Rows) error {
+		hasRows := false
+
+		for rows.Next() {
+			hasRows = true
+			values, err := rows.Values()
+			if err != nil {
+				log.Warn().Err(err).Msg("unable to read statistics")
+				return nil
+			}
+
+			// Find the row whose column_names contains the expected columns for the
+			// full relationship.
+			isFullRelationshipRow := false
+			for index, fd := range rows.FieldDescriptions() {
+				if fd.Name != "column_names" {
+					continue
+				}
+
+				columnNames, ok := values[index].([]any)
+				if !ok {
+					log.Warn().Msg("unable to read column names")
+					return nil
+				}
+
+				if slices.Contains(columnNames, "namespace") &&
+					slices.Contains(columnNames, "object_id") &&
+					slices.Contains(columnNames, "relation") &&
+					slices.Contains(columnNames, "userset_namespace") &&
+					slices.Contains(columnNames, "userset_object_id") &&
+					slices.Contains(columnNames, "userset_relation") {
+					isFullRelationshipRow = true
+					break
+				}
+			}
+
+			if !isFullRelationshipRow {
+				continue
+			}
+
+			// Read the estimated relationship count.
+			for index, fd := range rows.FieldDescriptions() {
+				if fd.Name != "row_count" {
+					continue
+				}
+
+				rowCount, ok := values[index].(int64)
+				if !ok {
+					log.Warn().Msg("unable to read row count")
+					return nil
+				}
+
+				estimatedRelCount = uint64(rowCount)
+				return nil
+			}
+		}
 
-	var timestamp decimal.Decimal
-	if err := tx.QueryRow(ctx, sql, args...).Scan(&timestamp); err != nil {
-		return datastore.NoRevision, fmt.Errorf("unable to executed upsert counter query: %w", err)
+		log.Warn().Bool("has-rows", hasRows).Msg("unable to find row count in statistics query result")
+		return nil
+	}, "SHOW STATISTICS FOR TABLE relation_tuple;"); err != nil {
+		return datastore.Stats{}, fmt.Errorf("unable to query unique estimated row count: %w", err)
 	}
 
-	return revisions.NewForHLC(timestamp)
+	return datastore.Stats{
+		UniqueID:                   uniqueID,
+		EstimatedRelationshipCount: estimatedRelCount,
+		ObjectTypeStatistics:       datastore.ComputeObjectTypeStats(nsDefs),
+	}, nil
 }
diff --git a/internal/testserver/datastore/crdb.go b/internal/testserver/datastore/crdb.go
index 4d7d3acdd6..be61279a18 100644
--- a/internal/testserver/datastore/crdb.go
+++ b/internal/testserver/datastore/crdb.go
@@ -20,7 +20,7 @@ import (
 )
 
 const (
-	CRDBTestVersionTag = "v22.2.0"
+	CRDBTestVersionTag = "v23.1.16"
 
 	enableRangefeeds = `SET CLUSTER SETTING kv.rangefeed.enabled = true;`
 )
diff --git a/pkg/cmd/datastore/datastore.go b/pkg/cmd/datastore/datastore.go
index 31b6a55db8..2fd3eaa7c7 100644
--- a/pkg/cmd/datastore/datastore.go
+++ b/pkg/cmd/datastore/datastore.go
@@ -382,7 +382,6 @@ func newCRDBDatastore(ctx context.Context, opts Config) (datastore.Datastore, er
 		crdb.OverlapStrategy(opts.OverlapStrategy),
 		crdb.WatchBufferLength(opts.WatchBufferLength),
 		crdb.WatchBufferWriteTimeout(opts.WatchBufferWriteTimeout),
-		crdb.DisableStats(opts.DisableStats),
 		crdb.WithEnablePrometheusStats(opts.EnableDatastoreMetrics),
 		crdb.WithEnableConnectionBalancing(opts.EnableConnectionBalancing),
 		crdb.ConnectRate(opts.ConnectRate),
diff --git a/pkg/datastore/test/datastore.go b/pkg/datastore/test/datastore.go
index 48ef11e6a4..56701cd81f 100644
--- a/pkg/datastore/test/datastore.go
+++ b/pkg/datastore/test/datastore.go
@@ -101,6 +101,7 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories)
 	t.Run("TestCreateDeleteTouchTest", func(t *testing.T) { CreateDeleteTouchTest(t, tester) })
 	t.Run("TestCreateTouchDeleteTouchTest", func(t *testing.T) { CreateTouchDeleteTouchTest(t, tester) })
 	t.Run("TestTouchAlreadyExistingCaveated", func(t *testing.T) { TouchAlreadyExistingCaveatedTest(t, tester) })
+	t.Run("TestBulkDeleteRelationships", func(t *testing.T) { BulkDeleteRelationshipsTest(t, tester) })
 	t.Run("TestMultipleReadsInRWT", func(t *testing.T) { MultipleReadsInRWTTest(t, tester) })
 	t.Run("TestConcurrentWriteSerialization", func(t *testing.T) { ConcurrentWriteSerializationTest(t, tester) })
 
diff --git a/pkg/datastore/test/stats.go b/pkg/datastore/test/stats.go
index 3b0ab371b9..aa0cb785cb 100644
--- a/pkg/datastore/test/stats.go
+++ b/pkg/datastore/test/stats.go
@@ -34,7 +34,7 @@ func StatsTest(t *testing.T, tester DatastoreTester) {
 		if stats.EstimatedRelationshipCount == uint64(0) && retryCount > 0 {
 			// Sleep for a bit to get the stats table to update.
-			time.Sleep(50 * time.Millisecond)
+			time.Sleep(500 * time.Millisecond)
 			continue
 		}
diff --git a/pkg/datastore/test/tuples.go b/pkg/datastore/test/tuples.go
index 9787f09e94..a43994036e 100644
--- a/pkg/datastore/test/tuples.go
+++ b/pkg/datastore/test/tuples.go
@@ -29,6 +29,7 @@ const (
 	testResourceNamespace = "test/resource"
 	testGroupNamespace    = "test/group"
 	testReaderRelation    = "reader"
+	testEditorRelation    = "editor"
 	testMemberRelation    = "member"
 	ellipsis              = "..."
 )
@@ -781,6 +782,60 @@ func ConcurrentWriteSerializationTest(t *testing.T, tester DatastoreTester) {
 	require.Less(time.Since(startTime), 10*time.Second)
 }
 
+func BulkDeleteRelationshipsTest(t *testing.T, tester DatastoreTester) {
+	require := require.New(t)
+
+	rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
+	require.NoError(err)
+
+	ds, _ := testfixtures.StandardDatastoreWithSchema(rawDS, require)
+	ctx := context.Background()
+
+	// Write a bunch of relationships.
+	t.Log(time.Now(), "starting write")
+	_, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error {
+		_, err := rwt.BulkLoad(ctx, testfixtures.NewBulkTupleGenerator(testResourceNamespace, testReaderRelation, testUserNamespace, 1000, t))
+		if err != nil {
+			return err
+		}
+
+		_, err = rwt.BulkLoad(ctx, testfixtures.NewBulkTupleGenerator(testResourceNamespace, testEditorRelation, testUserNamespace, 1000, t))
+		if err != nil {
+			return err
+		}
+
+		return nil
+	})
+	require.NoError(err)
+
+	// Issue a deletion for the first set of relationships.
+	t.Log(time.Now(), "starting delete")
+	deleteCount := 0
+	deletedRev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error {
+		t.Log(time.Now(), "deleting")
+		deleteCount++
+		return rwt.DeleteRelationships(ctx, &v1.RelationshipFilter{
+			ResourceType:     testResourceNamespace,
+			OptionalRelation: testReaderRelation,
+		})
+	})
+	require.NoError(err)
+	require.Equal(1, deleteCount)
+	t.Log(time.Now(), "finished delete")
+
+	// Ensure the relationships were removed.
+	t.Log(time.Now(), "starting check")
+	reader := ds.SnapshotReader(deletedRev)
+	iter, err := reader.QueryRelationships(ctx, datastore.RelationshipsFilter{
+		ResourceType:             testResourceNamespace,
+		OptionalResourceRelation: testReaderRelation,
+	})
+	require.NoError(err)
+	defer iter.Close()
+
+	require.Nil(iter.Next(), "expected no results")
+}
+
 func onrToSubjectsFilter(onr *core.ObjectAndRelation) datastore.SubjectsFilter {
 	return datastore.SubjectsFilter{
 		SubjectType: onr.Namespace,