Parallel block decoding
billettc committed Sep 14, 2023
1 parent ccdbe78 commit 2a1dd8f
Showing 3 changed files with 44 additions and 22 deletions.
53 changes: 36 additions & 17 deletions accountresolver/processor.go
@@ -9,6 +9,8 @@ import (
 	"strings"
 	"time"
 
+	firecore "github.com/streamingfast/firehose-core"
+
 	"github.com/mr-tron/base58"
 	"github.com/streamingfast/bstream"
 	"github.com/streamingfast/dstore"
@@ -92,15 +94,15 @@ func NewProcessor(readerName string, cursor *Cursor, accountsResolver AccountsRe
 	}
 }
 
-func (p *Processor) ProcessMergeBlocks(ctx context.Context, sourceStore dstore.Store, destinationStore dstore.Store) error {
+func (p *Processor) ProcessMergeBlocks(ctx context.Context, sourceStore dstore.Store, destinationStore dstore.Store, encoder firecore.BlockEncoder) error {
 	startBlockNum := p.cursor.slotNum - p.cursor.slotNum%100
 	paddedBlockNum := fmt.Sprintf("%010d", startBlockNum)
 
 	p.logger.Info("Processing merge blocks", zap.Uint64("cursor_block_num", p.cursor.slotNum), zap.String("first_merge_filename", paddedBlockNum))
 
 	err := sourceStore.WalkFrom(ctx, "", paddedBlockNum, func(filename string) error {
 		p.logger.Debug("processing merge block file", zap.String("filename", filename))
-		return p.processMergeBlocksFile(ctx, filename, sourceStore, destinationStore)
+		return p.processMergeBlocksFile(ctx, filename, sourceStore, destinationStore, encoder)
 	})
 
 	if err != nil {
@@ -112,7 +114,7 @@ func (p *Processor) ProcessMergeBlocks(ctx context.Context, sourceStore dstore.S
 	return nil
 }
 
-func (p *Processor) processMergeBlocksFile(ctx context.Context, filename string, sourceStore dstore.Store, destinationStore dstore.Store) error {
+func (p *Processor) processMergeBlocksFile(ctx context.Context, filename string, sourceStore dstore.Store, destinationStore dstore.Store, encoder firecore.BlockEncoder) error {
 	p.logger.Info("Processing merge block file", zap.String("filename", filename))
 	p.stats = &stats{
 		startProcessing: time.Now(),
@@ -135,6 +137,7 @@ func (p *Processor) processMergeBlocksFile(ctx context.Context, filename string,
 	}
 
 	bundleReader := NewBundleReader(ctx, p.logger)
+	blockChan := make(chan *pbsol.Block, 10)
 
 	go func() {
 		for {
@@ -154,22 +157,38 @@ func (p *Processor) processMergeBlocksFile(ctx context.Context, filename string,
 				continue
 			}
 
-			start := time.Now()
-			err = p.ProcessBlock(context.Background(), blk)
-			if err != nil {
-				bundleReader.PushError(fmt.Errorf("processing block: %w", err))
-				return
-			}
-			p.stats.totalBlockProcessingDuration += time.Since(start)
-			pushStart := time.Now()
-			err = bundleReader.PushBlock(block)
-			if err != nil {
-				bundleReader.PushError(fmt.Errorf("pushing block to bundle reader: %w", err))
-				return
-			}
-			p.stats.totalBlockStorageDuration += time.Since(pushStart)
-			p.stats.totalBlockCount += 1
-			p.stats.totalBlockHandlingDuration += time.Since(start)
+			blockChan <- blk
 		}
 	}()
 
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case blk := <-blockChan:
+				start := time.Now()
+				err := p.ProcessBlock(context.Background(), blk)
+				if err != nil {
+					bundleReader.PushError(fmt.Errorf("processing block: %w", err))
+					return
+				}
+				p.stats.totalBlockProcessingDuration += time.Since(start)
+				pushStart := time.Now()
+				b, err := encoder.Encode(blk)
+				if err != nil {
+					bundleReader.PushError(fmt.Errorf("encoding block: %w", err))
+					return
+				}
+				err = bundleReader.PushBlock(b)
+				if err != nil {
+					bundleReader.PushError(fmt.Errorf("pushing block to bundle reader: %w", err))
+					return
+				}
+				p.stats.totalBlockStorageDuration += time.Since(pushStart)
+				p.stats.totalBlockCount += 1
+				p.stats.totalBlockHandlingDuration += time.Since(start)
+			}
+		}
+	}()
+
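The processor.go change above splits block handling into a producer/consumer pipeline: the reading goroutine now only pushes decoded blocks into a buffered channel, and a new goroutine drains the channel to run ProcessBlock, encode the block, and push it to the bundle reader. A minimal standalone sketch of that pattern follows; block, processBlock, and encodeBlock are hypothetical stand-ins for the firehose-solana types, not the actual API:

```go
package main

import (
	"context"
	"fmt"
)

// block stands in for *pbsol.Block; processBlock and encodeBlock stand in
// for the processor's ProcessBlock and encoder.Encode calls.
type block struct{ slot uint64 }

func processBlock(b *block) error { return nil }

func encodeBlock(b *block) ([]byte, error) {
	return []byte(fmt.Sprintf("block %d", b.slot)), nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Buffered channel decouples reading from processing, as in the commit.
	blockChan := make(chan *block, 10)

	// Producer: reads blocks and hands them off; it no longer blocks on
	// processing or encoding work.
	go func() {
		defer close(blockChan)
		for slot := uint64(0); slot < 5; slot++ {
			select {
			case <-ctx.Done():
				return
			case blockChan <- &block{slot: slot}:
			}
		}
	}()

	// Consumer: drains the channel, processing then encoding each block.
	for {
		select {
		case <-ctx.Done():
			return
		case blk, ok := <-blockChan:
			if !ok {
				return // producer finished
			}
			if err := processBlock(blk); err != nil {
				fmt.Println("processing block:", err)
				return
			}
			payload, err := encodeBlock(blk)
			if err != nil {
				fmt.Println("encoding block:", err)
				return
			}
			fmt.Printf("stored %d bytes for slot %d\n", len(payload), blk.slot)
		}
	}
}
```

Unlike this sketch, the committed code reports failures through bundleReader.PushError and leaves the channel open for the life of the file; the close here is only so the example terminates.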
2 changes: 1 addition & 1 deletion cmd/firesol/main.go
@@ -45,7 +45,7 @@ func main() {
 			toolsCmd.AddCommand(newToolsBigtableCmd(zlog, tracer))
 			toolsCmd.AddCommand(newToolsBatchFileCmd(zlog))
 			toolsCmd.AddCommand(newPrintTransactionCmd())
-			toolsCmd.AddCommand(newProcessAddressLookupCmd(zlog, tracer))
+			toolsCmd.AddCommand(newProcessAddressLookupCmd(zlog, tracer, chain))
 			return nil
 		},
 	},
11 changes: 7 additions & 4 deletions cmd/firesol/tablelookup.go
@@ -3,6 +3,9 @@ package main
 import (
 	"fmt"
 
+	firecore "github.com/streamingfast/firehose-core"
+	pbsolv1 "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1"
+
 	"github.com/spf13/cobra"
 	"github.com/spf13/pflag"
 	"github.com/streamingfast/dstore"
@@ -14,16 +17,16 @@ import (
 	"go.uber.org/zap"
 )
 
-func newProcessAddressLookupCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
+func newProcessAddressLookupCmd(logger *zap.Logger, tracer logging.Tracer, chain *firecore.Chain[*pbsolv1.Block]) *cobra.Command {
 	return &cobra.Command{
 		Use:   "process-address-lookup {store} {destination-store} {badger-db}",
 		Short: "scan the blocks and process and extract the address lookup data",
-		RunE:  processAddressLookupE(logger, tracer),
+		RunE:  processAddressLookupE(chain, logger, tracer),
 		Args:  cobra.ExactArgs(3),
 	}
 }
 
-func processAddressLookupE(logger *zap.Logger, tracer logging.Tracer) func(cmd *cobra.Command, args []string) error {
+func processAddressLookupE(chain *firecore.Chain[*pbsolv1.Block], logger *zap.Logger, tracer logging.Tracer) func(cmd *cobra.Command, args []string) error {
 	return func(cmd *cobra.Command, args []string) error {
 		ctx := cmd.Context()
 
@@ -60,7 +63,7 @@ func processAddressLookupE(logger *zap.Logger, tracer logging.Tracer) func(cmd *
 		fmt.Println("Cursor", cursor)
 		processor := accountsresolver.NewProcessor("reproc", cursor, resolver, logger)
 
-		err = processor.ProcessMergeBlocks(ctx, sourceStore, destinationStore)
+		err = processor.ProcessMergeBlocks(ctx, sourceStore, destinationStore, chain.BlockEncoder)
 		if err != nil {
 			return fmt.Errorf("unable to process merge blocks: %w", err)
 		}
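The two command-layer changes above thread the chain's BlockEncoder from firesol's CLI wiring down to ProcessMergeBlocks. All the processor needs is a single Encode method, so the injection shape can be sketched with hypothetical stand-in types (solBlock, encodedBlock, and hexEncoder are illustrative only, not firehose-core's actual definitions):

```go
package main

import "fmt"

// Hypothetical stand-ins for the chain-specific block and its encoded form;
// firehose-core's real BlockEncoder works with bstream block types.
type solBlock struct{ slot uint64 }
type encodedBlock struct{ payload []byte }

// blockEncoder mirrors the one capability the processor relies on.
type blockEncoder interface {
	Encode(blk *solBlock) (*encodedBlock, error)
}

type hexEncoder struct{}

func (hexEncoder) Encode(blk *solBlock) (*encodedBlock, error) {
	return &encodedBlock{payload: []byte(fmt.Sprintf("%x", blk.slot))}, nil
}

// processMergeBlocks receives the encoder from the command layer, the same
// injection the commit performs via chain.BlockEncoder.
func processMergeBlocks(encoder blockEncoder, blocks []*solBlock) error {
	for _, blk := range blocks {
		b, err := encoder.Encode(blk)
		if err != nil {
			return fmt.Errorf("encoding block: %w", err)
		}
		fmt.Printf("encoded slot %d into %d bytes\n", blk.slot, len(b.payload))
	}
	return nil
}

func main() {
	if err := processMergeBlocks(hexEncoder{}, []*solBlock{{slot: 1}, {slot: 2}}); err != nil {
		panic(err)
	}
}
```

Taking the encoder as a parameter keeps the address-lookup processor agnostic of the block format while letting the firesol command supply the Solana-specific encoding.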
