Skip to content

Commit

Permalink
add stat for total block duration
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Feb 8, 2024
1 parent 636ceab commit eaa5b87
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 14 deletions.
2 changes: 1 addition & 1 deletion sinker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ var metrics = dmetrics.NewSet()
var FlushedEntriesCount = metrics.NewCounter("substreams_sink_kv_flushed_entries_count", "The number of flushed entries")
var FlushCount = metrics.NewCounter("substreams_sink_kv_store_flush_count", "The amount of flush that happened so far")
var BlockCount = metrics.NewCounter("substreams_sink_kv_store_block_count", "The block processed so far")
var FlushDuration = metrics.NewCounter("substreams_sink_kv_store_flush_duration", "The amount of time spent flushing cache to db")
var BlockScopedData = metrics.NewCounter("substreams_sink_kv_block_scope_data_process_duration", "The amount of time spent process block scoped data")
3 changes: 2 additions & 1 deletion sinker/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (s *KVSinker) onTerminating(ctx context.Context, err error) {
}

func (s *KVSinker) handleBlockScopedData(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *sink.Cursor) error {
start := time.Now()
kvOps := &pbkv.KVOperations{}
err := proto.Unmarshal(data.GetOutput().MapOutput.Value, kvOps)
if err != nil {
Expand All @@ -113,11 +114,11 @@ func (s *KVSinker) handleBlockScopedData(ctx context.Context, data *pbsubstreams

FlushCount.Inc()
FlushedEntriesCount.AddInt(count)
FlushDuration.AddInt64(time.Since(flushStart).Nanoseconds())
s.stats.RecordFlushDuration(time.Since(flushStart))
s.stats.RecordBlock(blockRef)
}

s.stats.RecordProcessDuration(time.Since(start))
s.lastCursor = cursor
return nil
}
Expand Down
30 changes: 18 additions & 12 deletions sinker/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,26 @@ import (
type Stats struct {
*shutter.Shutter

dbFlushRate *dmetrics.AvgRatePromCounter
flushedEntries *dmetrics.ValueFromMetric
lastBlock bstream.BlockRef
logger *zap.Logger
blockRate *dmetrics.AvgRatePromCounter
flushDuration *dmetrics.AvgDurationCounter
dbFlushRate *dmetrics.AvgRatePromCounter
flushedEntries *dmetrics.ValueFromMetric
lastBlock bstream.BlockRef
logger *zap.Logger
blockRate *dmetrics.AvgRatePromCounter
flushDuration *dmetrics.AvgDurationCounter
blockScopedDataProcessDuration *dmetrics.AvgDurationCounter
}

func NewStats(logger *zap.Logger) *Stats {
return &Stats{
Shutter: shutter.New(),

dbFlushRate: dmetrics.MustNewAvgRateFromPromCounter(FlushCount, 1*time.Second, 30*time.Second, "flush"),
blockRate: dmetrics.MustNewAvgRateFromPromCounter(BlockCount, 1*time.Second, 30*time.Second, "block"),
flushedEntries: dmetrics.NewValueFromMetric(FlushedEntriesCount, "entries"),
flushDuration: dmetrics.NewAvgDurationCounter(30*time.Second, time.Millisecond, "flush duration"),
lastBlock: unsetBlockRef{},
logger: logger,
dbFlushRate: dmetrics.MustNewAvgRateFromPromCounter(FlushCount, 1*time.Second, 30*time.Second, "flush"),
blockRate: dmetrics.MustNewAvgRateFromPromCounter(BlockCount, 1*time.Second, 30*time.Second, "block"),
flushedEntries: dmetrics.NewValueFromMetric(FlushedEntriesCount, "entries"),
flushDuration: dmetrics.NewAvgDurationCounter(30*time.Second, time.Millisecond, "flush duration"),
blockScopedDataProcessDuration: dmetrics.NewAvgDurationCounter(30*time.Second, time.Millisecond, "flush duration"),
lastBlock: unsetBlockRef{},
logger: logger,
}
}

Expand All @@ -41,6 +43,9 @@ func (s *Stats) RecordBlock(block bstream.BlockRef) {
func (s *Stats) RecordFlushDuration(duration time.Duration) {
s.flushDuration.AddDuration(duration)
}
func (s *Stats) RecordProcessDuration(duration time.Duration) {
s.blockScopedDataProcessDuration.AddDuration(duration)
}

func (s *Stats) Start(each time.Duration, cursor *sink.Cursor) {
if !cursor.IsBlank() {
Expand Down Expand Up @@ -72,6 +77,7 @@ func (s *Stats) LogNow() {
s.logger.Info("substreams kv stats",
zap.Stringer("db_flush_rate", s.dbFlushRate),
zap.String("flush_duration", s.flushDuration.String()),
zap.String("process_duration", s.flushDuration.String()),
zap.Stringer("block_rate", s.blockRate),
zap.Uint64("flushed_entries", s.flushedEntries.ValueUint()),
zap.Stringer("last_block", s.lastBlock),
Expand Down

0 comments on commit eaa5b87

Please sign in to comment.