Skip to content

Commit

Permalink
Fix cursor stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Sep 14, 2023
1 parent 2a1dd8f commit 8a54b41
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 20 deletions.
41 changes: 21 additions & 20 deletions accountresolver/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"strings"
"time"

firecore "github.com/streamingfast/firehose-core"

"github.com/hako/durafmt"
"github.com/mr-tron/base58"
"github.com/streamingfast/bstream"
"github.com/streamingfast/dstore"
firecore "github.com/streamingfast/firehose-core"
pbsol "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1"
"github.com/streamingfast/solana-go/programs/addresstablelookup"
"go.uber.org/zap"
Expand Down Expand Up @@ -47,19 +47,19 @@ func (s *stats) log(logger *zap.Logger) {
zap.Int("transaction_count", s.transactionCount),
zap.Int("lookup_count", s.lookupCount),
zap.Int("extend_count", s.extendCount),
zap.Duration("total_block_handling_duration", s.totalBlockHandlingDuration),
zap.Duration("total_block_processing_duration", s.totalBlockProcessingDuration),
zap.Duration("total_block_storage_duration", s.totalBlockStorageDuration),
zap.Duration("total_transaction_processing_duration", s.totalTransactionProcessingDuration),
zap.Duration("total_lookup_duration", s.totalLookupDuration),
zap.Duration("total_extend_duration", s.totalExtendDuration),
zap.Duration("total_duration", time.Since(s.startProcessing)),
zap.Duration("average_block_handling_duration", s.totalBlockHandlingDuration/time.Duration(s.totalBlockCount)),
zap.Duration("average_block_processing_duration", s.totalBlockProcessingDuration/time.Duration(s.totalBlockCount)),
zap.Duration("average_block_storage_duration", s.totalBlockStorageDuration/time.Duration(s.totalBlockCount)),
zap.Duration("average_transaction_processing_duration", s.totalTransactionProcessingDuration/time.Duration(s.transactionCount)),
zap.Duration("average_lookup_duration", lookupAvg),
zap.Duration("average_extend_duration", extendAvg),
zap.String("total_block_handling_duration", durafmt.Parse(s.totalBlockHandlingDuration).String()),
zap.String("total_block_processing_duration", durafmt.Parse(s.totalBlockProcessingDuration).String()),
zap.String("total_block_storage_duration", durafmt.Parse(s.totalBlockStorageDuration).String()),
zap.String("total_transaction_processing_duration", durafmt.Parse(s.totalTransactionProcessingDuration).String()),
zap.String("total_lookup_duration", durafmt.Parse(s.totalLookupDuration).String()),
zap.String("total_extend_duration", durafmt.Parse(s.totalExtendDuration).String()),
zap.String("total_duration", durafmt.Parse(time.Since(s.startProcessing)).String()),
zap.String("average_block_handling_duration", durafmt.Parse(s.totalBlockHandlingDuration/time.Duration(s.totalBlockCount)).String()),
zap.String("average_block_processing_duration", durafmt.Parse(s.totalBlockProcessingDuration/time.Duration(s.totalBlockCount)).String()),
zap.String("average_block_storage_duration", durafmt.Parse(s.totalBlockStorageDuration/time.Duration(s.totalBlockCount)).String()),
zap.String("average_transaction_processing_duration", durafmt.Parse(s.totalTransactionProcessingDuration/time.Duration(s.transactionCount)).String()),
zap.String("average_lookup_duration", durafmt.Parse(lookupAvg).String()),
zap.String("average_extend_duration", durafmt.Parse(extendAvg).String()),
)
}

Expand Down Expand Up @@ -197,6 +197,11 @@ func (p *Processor) processMergeBlocksFile(ctx context.Context, filename string,
return fmt.Errorf("writing bundle file: %w", err)
}
//p.logger.Info("new merge blocks file written:", zap.String("filename", filename), zap.Duration("duration", time.Since(start)))
err = p.accountsResolver.StoreCursor(ctx, p.readerName, p.cursor)
if err != nil {
return fmt.Errorf("storing cursor at block %d: %w", p.cursor.slotNum, err)
}

p.stats.log(p.logger)
return nil
}
Expand Down Expand Up @@ -308,11 +313,7 @@ func (p *Processor) ProcessInstruction(ctx context.Context, blockNum uint64, trx
if err != nil {
return fmt.Errorf("extending address table %s at block %d: %w", tableLookupAccount, blockNum, err)
}
p.cursor = NewCursor(blockNum)
err = p.accountsResolver.StoreCursor(ctx, p.readerName, p.cursor)
if err != nil {
return fmt.Errorf("storing cursor at block %d: %w", blockNum, err)
}

p.stats.totalExtendDuration += time.Since(start)
p.stats.extendCount += 1
}
Expand Down
4 changes: 4 additions & 0 deletions accountresolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (r *KVDBAccountsResolver) Extend(ctx context.Context, blockNum uint64, trxH
if err != nil {
return fmt.Errorf("writing known transaction %x: %w", trxHash, err)
}
err = r.store.FlushPuts(ctx) //todo: move that up in call stack
if err != nil {
return fmt.Errorf("flushing extended accounts for key %q: %w", key, err)
}

return nil
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/hako/durafmt v0.0.0-20210608085754-5c1018a4e16b // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/ipfs/boxo v0.8.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFb
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 h1:lLT7ZLSzGLI08vc9cpd+tYmNWjdKDqyr/2L+f6U12Fk=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU0GSB0f8Nhgmxx0VIRUvaC0w=
github.com/hako/durafmt v0.0.0-20210608085754-5c1018a4e16b h1:wDUNC2eKiL35DbLvsDhiblTUXHxcOPwQSCzi7xpQUN4=
github.com/hako/durafmt v0.0.0-20210608085754-5c1018a4e16b/go.mod h1:VzxiSdG6j1pi7rwGm/xYI5RbtpBgM8sARDXlvEvxlu0=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
Expand Down

0 comments on commit 8a54b41

Please sign in to comment.