
Commit

add RecordDurationFetchPrevValueFetch stat
billettc committed Apr 4, 2024
1 parent fd1af77 commit 9cbe79e
Showing 4 changed files with 20 additions and 6 deletions.
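
In short, the commit threads the sinker's *Stats collector through OperationDB.HandleOperations and GenerateUndoOperations, times each previous-value lookup made against the KV store while building undo operations, and reports the result as a new averaged-duration stat (logged as fetch_prev_values_duration).
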
13 changes: 10 additions & 3 deletions db/db.go
@@ -6,6 +6,9 @@ import (
     "errors"
     "fmt"
     "math"
+    "time"
+
+    "github.com/streamingfast/substreams-sink-kv/sinker"
 
     "github.com/streamingfast/bstream"
     "github.com/streamingfast/kvdb/store"
@@ -76,14 +79,15 @@ func (db *OperationDB) AddOperation(op *pbkv.KVOperation) {
     //this will only keep the last operation for a given key
     db.pendingOperations[op.Key] = op
 }
-func (db *OperationDB) HandleOperations(ctx context.Context, blockNumber uint64, finalBlockHeight uint64, step bstream.StepType, kvOps *pbkv.KVOperations) error {
+
+func (db *OperationDB) HandleOperations(ctx context.Context, blockNumber uint64, finalBlockHeight uint64, step bstream.StepType, kvOps *pbkv.KVOperations, stats *sinker.Stats) error {
     if step == bstream.StepNew {
         err := db.PurgeUndoOperations(ctx, finalBlockHeight)
         if err != nil {
             return fmt.Errorf("deleting LIB undo operations: %w", err)
         }
 
-        undoOperations, err := db.GenerateUndoOperations(ctx, kvOps.Operations)
+        undoOperations, err := db.GenerateUndoOperations(ctx, kvOps.Operations, stats)
         if err != nil {
             return fmt.Errorf("generating reverse operations: %w", err)
         }
@@ -157,10 +161,13 @@ func (db *OperationDB) AddUndosOperations(ctx context.Context, blockNumber uint6
     return nil
 }
 
-func (db *OperationDB) GenerateUndoOperations(ctx context.Context, ops []*pbkv.KVOperation) (*pbkv.KVOperations, error) {
+func (db *OperationDB) GenerateUndoOperations(ctx context.Context, ops []*pbkv.KVOperation, stats *sinker.Stats) (*pbkv.KVOperations, error) {
     var undoOperations []*pbkv.KVOperation
     for _, op := range ops {
+        start := time.Now()
         previousValue, err := db.store.Get(ctx, userKey(op.Key))
+        stats.RecordDurationFetchPrevValueFetch(time.Since(start))
+
         previousKeyExists := true
         if err != nil {
             if !errors.Is(err, store.ErrNotFound) {
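
For readers outside the codebase, a minimal, self-contained sketch of the timing pattern the db.go hunks introduce: wrap the previous-value lookup with time.Now()/time.Since() and hand the elapsed duration to a stats recorder. The in-memory map and the recorder type below are illustrative stand-ins, not the repository's store or sinker.Stats.

    package main

    import (
        "fmt"
        "time"
    )

    // recorder stands in for *sinker.Stats; only the newly added method matters here.
    type recorder struct{ total time.Duration }

    func (r *recorder) RecordDurationFetchPrevValueFetch(d time.Duration) { r.total += d }

    func main() {
        store := map[string][]byte{"k1": []byte("old-value")} // stand-in for db.store
        stats := &recorder{}

        for _, key := range []string{"k1", "missing"} {
            // Same shape as GenerateUndoOperations above: time the previous-value
            // fetch and record the elapsed duration, whether or not the key exists.
            start := time.Now()
            prev, found := store[key] // stand-in for db.store.Get(ctx, userKey(op.Key))
            stats.RecordDurationFetchPrevValueFetch(time.Since(start))

            fmt.Printf("key=%s found=%v prev=%q\n", key, found, prev)
        }
        fmt.Println("total previous-value fetch time:", stats.total)
    }
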
4 changes: 2 additions & 2 deletions db/db_test.go
@@ -107,7 +107,7 @@ func TestDB_HandleOperations(t *testing.T) {
     require.NoError(t, err)
 
     for _, block := range c.blocks {
-        err = db.HandleOperations(ctx, block.blockNumber, block.finalBlockHeight, bstream.StepNew, block.operations)
+        err = db.HandleOperations(ctx, block.blockNumber, block.finalBlockHeight, bstream.StepNew, block.operations, nil)
         require.NoError(t, err)
         _, err = db.Flush(ctx, nil)
         require.NoError(t, err)
@@ -247,7 +247,7 @@ func TestDB_HandleUndo(t *testing.T) {
     require.NoError(t, err)
 
     for _, block := range c.blocks {
-        err = db.HandleOperations(ctx, block.blockNumber, block.finalBlockHeight, bstream.StepNew, block.operations)
+        err = db.HandleOperations(ctx, block.blockNumber, block.finalBlockHeight, bstream.StepNew, block.operations, nil)
         require.NoError(t, err)
         _, err = db.Flush(ctx, nil)
         require.NoError(t, err)
2 changes: 1 addition & 1 deletion sinker/sinker.go
@@ -102,7 +102,7 @@ func (s *KVSinker) handleBlockScopedData(ctx context.Context, data *pbsubstreams
         return fmt.Errorf("unmarshal database changes: %w", err)
     }
 
-    err = s.operationDB.HandleOperations(ctx, data.Clock.Number, data.FinalBlockHeight, cursor.Step, kvOps)
+    err = s.operationDB.HandleOperations(ctx, data.Clock.Number, data.FinalBlockHeight, cursor.Step, kvOps, s.stats)
     if err != nil {
         return fmt.Errorf("handling operation: %w", err)
     }
7 changes: 7 additions & 0 deletions sinker/stats.go
@@ -21,6 +21,7 @@ type Stats struct {
     flushDuration                  *dmetrics.AvgDurationCounter
     blockScopedDataProcessDuration *dmetrics.AvgDurationCounter
     durationBetweenBlock           *dmetrics.AvgDurationCounter
+    fetchPrevValuesDuration        *dmetrics.AvgDurationCounter
     finalBlockHeight               uint64
 }

@@ -32,6 +33,7 @@ func NewStats(logger *zap.Logger) *Stats {
         blockRate:                      dmetrics.MustNewAvgRateFromPromCounter(BlockCount, 1*time.Second, 30*time.Second, "block"),
         flushedEntries:                 dmetrics.NewValueFromMetric(FlushedEntriesCount, "entries"),
         flushDuration:                  dmetrics.NewAvgDurationCounter(30*time.Second, time.Millisecond, "flush duration"),
+        fetchPrevValuesDuration:        dmetrics.NewAvgDurationCounter(30*time.Second, time.Millisecond, "fetch prev values duration"),
         blockScopedDataProcessDuration: dmetrics.NewAvgDurationCounter(30*time.Second, time.Millisecond, "process duration"),
         durationBetweenBlock:           dmetrics.NewAvgDurationCounter(30*time.Second, time.Millisecond, "duration between block"),
         lastBlock:                      unsetBlockRef{},
@@ -56,6 +58,10 @@ func (s *Stats) RecordDuractionBetweenBlock(duration time.Duration) {
     s.durationBetweenBlock.AddDuration(duration)
 }
 
+func (s *Stats) RecordDurationFetchPrevValueFetch(duration time.Duration) {
+    s.fetchPrevValuesDuration.AddDuration(duration)
+}
+
 func (s *Stats) Start(each time.Duration, cursor *sink.Cursor) {
     if !cursor.IsBlank() {
         s.lastBlock = cursor.Block()
@@ -88,6 +94,7 @@ func (s *Stats) LogNow() {
         zap.String("flush_duration", s.flushDuration.String()),
         zap.String("process_duration", s.blockScopedDataProcessDuration.String()),
         zap.String("duration_between_block", s.durationBetweenBlock.String()),
+        zap.String("fetch_prev_values_duration", s.fetchPrevValuesDuration.String()),
         zap.Stringer("block_rate", s.blockRate),
         zap.Uint64("flushed_entries", s.flushedEntries.ValueUint()),
         zap.Stringer("last_block", s.lastBlock),
