From 103291d7e5c763c0fb647d3631eac174b75532aa Mon Sep 17 00:00:00 2001 From: Eduard Voiculescu Date: Wed, 25 Oct 2023 15:47:11 -0400 Subject: [PATCH] adding resolve accounts block command --- accountresolver/processor.go | 80 ++++--- cmd/firesol/find_invalid_block.go | 21 +- cmd/firesol/main.go | 3 + cmd/firesol/resolve_accounts_block.go | 319 ++++++++++++++++++++++++++ 4 files changed, 374 insertions(+), 49 deletions(-) create mode 100644 cmd/firesol/resolve_accounts_block.go diff --git a/accountresolver/processor.go b/accountresolver/processor.go index 69511f4c..efa35643 100644 --- a/accountresolver/processor.go +++ b/accountresolver/processor.go @@ -36,6 +36,10 @@ type Stats struct { totalAccountsResolvedByCache int } +func NewStats() *Stats { + return &Stats{} +} + func (s *Stats) Log(logger *zap.Logger) { lookupAvg := time.Duration(0) if s.lookupCount > 0 { @@ -240,7 +244,7 @@ func (p *Processor) ProcessBlock(ctx context.Context, stats *Stats, block *pbsol continue } //p.logger.Debug("processing transaction", zap.Uint64("block_num", block.Slot), zap.String("trx_id", base58.Encode(trx.Transaction.Signatures[0]))) - err := p.applyTableLookup(ctx, stats, block.Slot, trx) + err := ApplyTableLookup(ctx, stats, block.Slot, trx, p.accountsResolver, p.logger) if err != nil { return fmt.Errorf("applying table lookup at block %d: %w", block.Slot, err) } @@ -263,16 +267,49 @@ func (p *Processor) manageAddressLookup(ctx context.Context, stats *Stats, block return nil } -func (p *Processor) applyTableLookup(ctx context.Context, stats *Stats, blockNum uint64, trx *pbsol.ConfirmedTransaction) error { +func (p *Processor) ProcessTransaction(ctx context.Context, stats *Stats, blockNum uint64, confirmedTransaction *pbsol.ConfirmedTransaction) error { + start := time.Now() + if confirmedTransaction.Meta.Err != nil { + p.logger.Info("skipping transaction with error", zap.Uint64("block_num", blockNum), zap.String("trx_id", base58.Encode(confirmedTransaction.Transaction.Signatures[0]))) + return nil + } + accountKeys := confirmedTransaction.Transaction.Message.AccountKeys + for instructionIndex, compiledInstruction := range confirmedTransaction.Transaction.Message.Instructions { + idx := compiledInstruction.ProgramIdIndex + err := p.ProcessInstruction(ctx, stats, blockNum, confirmedTransaction.Transaction.Signatures[0], fmt.Sprintf("%d", instructionIndex), confirmedTransaction.Transaction.Message.AccountKeys[idx], accountKeys, compiledInstruction) + if err != nil { + return fmt.Errorf("confirmedTransaction %s processing compiled instruction: %w", getTransactionHash(confirmedTransaction.Transaction.Signatures), err) + } + inner := GetInnerInstructions(instructionIndex, confirmedTransaction.Meta.InnerInstructions) + if inner == nil { + continue // there are no inner instructions for the CompiledInstruction + } + for i, instruction := range inner.Instructions { + index := fmt.Sprintf("%d.%d", instructionIndex, i) + if len(accountKeys) < int(instruction.ProgramIdIndex) { + return fmt.Errorf("missing account key at instructionIndex %d for transaction %s with account keys count of %d", instruction.ProgramIdIndex, getTransactionHash(confirmedTransaction.Transaction.Signatures), len(accountKeys)) + } + + err := p.ProcessInstruction(ctx, stats, blockNum, confirmedTransaction.Transaction.Signatures[0], index, accountKeys[instruction.ProgramIdIndex], accountKeys, instruction) + if err != nil { + return fmt.Errorf("confirmedTransaction %s processing instruxction: %w", getTransactionHash(confirmedTransaction.Transaction.Signatures), err) + } + } + } + stats.totalTransactionProcessingDuration += time.Since(start) + return nil +} + +func ApplyTableLookup(ctx context.Context, stats *Stats, blockNum uint64, trx *pbsol.ConfirmedTransaction, accountsResolver AccountsResolver, logger *zap.Logger) error { start := time.Now() for _, addressTableLookup := range trx.Transaction.Message.AddressTableLookups { - resolvedAccounts, cached, err := p.accountsResolver.Resolve(ctx, blockNum, addressTableLookup.AccountKey) + resolvedAccounts, cached, err := accountsResolver.Resolve(ctx, blockNum, addressTableLookup.AccountKey) if err != nil { return fmt.Errorf("resolving address table %s at block %d: %w", base58.Encode(addressTableLookup.AccountKey), blockNum, err) } if len(resolvedAccounts) == 0 { - p.logger.Warn("Resolved accounts is empty", zap.Uint64("block", blockNum), zap.String("table account", base58.Encode(addressTableLookup.AccountKey)), zap.Bool("cached", cached), zap.Int("account_count", len(resolvedAccounts))) + logger.Warn("Resolved accounts is empty", zap.Uint64("block", blockNum), zap.String("table account", base58.Encode(addressTableLookup.AccountKey)), zap.Bool("cached", cached), zap.Int("account_count", len(resolvedAccounts))) } if cached { @@ -303,7 +340,7 @@ func (p *Processor) applyTableLookup(ctx context.Context, stats *Stats, blockNum if lookupCount > 0 { stats.lookupCount += lookupCount stats.totalLookupDuration += totalDuration - p.logger.Debug( + logger.Debug( "applyTableLookup", zap.Duration("duration", totalDuration), zap.Int("lookup_count", lookupCount), @@ -314,39 +351,6 @@ func (p *Processor) applyTableLookup(ctx context.Context, stats *Stats, blockNum return nil } -func (p *Processor) ProcessTransaction(ctx context.Context, stats *Stats, blockNum uint64, confirmedTransaction *pbsol.ConfirmedTransaction) error { - start := time.Now() - if confirmedTransaction.Meta.Err != nil { - p.logger.Info("skipping transaction with error", zap.Uint64("block_num", blockNum), zap.String("trx_id", base58.Encode(confirmedTransaction.Transaction.Signatures[0]))) - return nil - } - accountKeys := confirmedTransaction.Transaction.Message.AccountKeys - for instructionIndex, compiledInstruction := range confirmedTransaction.Transaction.Message.Instructions { - idx := compiledInstruction.ProgramIdIndex - err := p.ProcessInstruction(ctx, stats, blockNum, confirmedTransaction.Transaction.Signatures[0], fmt.Sprintf("%d", instructionIndex), confirmedTransaction.Transaction.Message.AccountKeys[idx], accountKeys, compiledInstruction) - if err != nil { - return fmt.Errorf("confirmedTransaction %s processing compiled instruction: %w", getTransactionHash(confirmedTransaction.Transaction.Signatures), err) - } - inner := GetInnerInstructions(instructionIndex, confirmedTransaction.Meta.InnerInstructions) - if inner == nil { - continue // there are no inner instructions for the CompiledInstruction - } - for i, instruction := range inner.Instructions { - index := fmt.Sprintf("%d.%d", instructionIndex, i) - if len(accountKeys) < int(instruction.ProgramIdIndex) { - return fmt.Errorf("missing account key at instructionIndex %d for transaction %s with account keys count of %d", instruction.ProgramIdIndex, getTransactionHash(confirmedTransaction.Transaction.Signatures), len(accountKeys)) - } - - err := p.ProcessInstruction(ctx, stats, blockNum, confirmedTransaction.Transaction.Signatures[0], index, accountKeys[instruction.ProgramIdIndex], accountKeys, instruction) - if err != nil { - return fmt.Errorf("confirmedTransaction %s processing instruxction: %w", getTransactionHash(confirmedTransaction.Transaction.Signatures), err) - } - } - } - stats.totalTransactionProcessingDuration += time.Since(start) - return nil -} - func GetInnerInstructions(index int, trxMetaInnerInstructions []*pbsol.InnerInstructions) *pbsol.InnerInstructions { for _, innerInstructions := range trxMetaInnerInstructions { if int(innerInstructions.Index) == index { diff --git a/cmd/firesol/find_invalid_block.go b/cmd/firesol/find_invalid_block.go index e1ee7fc6..334be9ea 100644 --- a/cmd/firesol/find_invalid_block.go +++ b/cmd/firesol/find_invalid_block.go @@ -3,7 +3,7 @@ package main import ( "cloud.google.com/go/bigtable" "fmt" - "github.com/gagliardetto/solana-go/rpc" + "github.com/mr-tron/base58" "github.com/spf13/cobra" "github.com/streamingfast/cli/sflags" firecore "github.com/streamingfast/firehose-core" @@ -35,8 +35,7 @@ func processFindInvalidBlockE(chain *firecore.Chain[*pbsolv1.Block], logger *zap return func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() - rpcClient := rpc.New(sflags.MustGetString(cmd, "rpc-endpoint")) - _ = rpcClient + //rpcClient := rpc.New(sflags.MustGetString(cmd, "rpc-endpoint")) startBlockNum, err := strconv.ParseUint(args[0], 10, 64) if err != nil { @@ -65,20 +64,20 @@ func processFindInvalidBlockE(chain *firecore.Chain[*pbsolv1.Block], logger *zap btClient := bt.New(client, 10, logger, tracer) return btClient.ReadBlocks(ctx, startBlockNum, endBlockNum, linkable, func(block *pbsolv1.Block) error { - missingLogMessagesAndInnerInstructions := 0 + trxMissingLogMessagesAndInnerInstructions := 0 + numberOfTransactions := len(block.Transactions) + var transactionNotMissing []string for _, trx := range block.Transactions { - if trx.Meta.Err != nil { - continue - } - if trx.Meta.LogMessagesNone && trx.Meta.InnerInstructionsNone { - missingLogMessagesAndInnerInstructions++ + trxMissingLogMessagesAndInnerInstructions++ + continue } + transactionNotMissing = append(transactionNotMissing, base58.Encode(trx.Transaction.Signatures[0])) } - if missingLogMessagesAndInnerInstructions > 0 { + if trxMissingLogMessagesAndInnerInstructions == numberOfTransactions { fmt.Printf("Block: %d number of transactions: %d\n", block.Slot, len(block.Transactions)) - fmt.Printf("\tNumber transactions with missing log messags and inner instructions: %d\n", missingLogMessagesAndInnerInstructions) + fmt.Printf("\tTransactions containing logs: %s\n", transactionNotMissing) } return nil }) diff --git a/cmd/firesol/main.go b/cmd/firesol/main.go index 46e3693d..34588cde 100644 --- a/cmd/firesol/main.go +++ b/cmd/firesol/main.go @@ -44,6 +44,7 @@ func main() { RegisterExtraCmd: func(chain *firecore.Chain[*pbsol.Block], toolsCmd *cobra.Command, zlog *zap.Logger, tracer logging.Tracer) error { toolsCmd.AddCommand(newToolsBigtableCmd(zlog, tracer)) toolsCmd.AddCommand(newToolsBatchFileCmd(zlog)) + toolsCmd.AddCommand(newProcessAddressLookupCmd(zlog, tracer, chain)) toolsCmd.AddCommand(newTrxAddressesLookupCmd(zlog, tracer, chain)) toolsCmd.AddCommand(newAddressesLookupCmd(zlog, tracer, chain)) @@ -51,6 +52,8 @@ func main() { toolsCmd.AddCommand(newValidateResolvedAddresses(zlog, tracer, chain)) toolsCmd.AddCommand(newValidateAllResolvedAddresses(zlog, tracer, chain)) + toolsCmd.AddCommand(newResolveAccountsBlockCmd(zlog, tracer, chain)) + return nil }, }, diff --git a/cmd/firesol/resolve_accounts_block.go b/cmd/firesol/resolve_accounts_block.go new file mode 100644 index 00000000..5d89cca3 --- /dev/null +++ b/cmd/firesol/resolve_accounts_block.go @@ -0,0 +1,319 @@ +package main + +import ( + "context" + "fmt" + "github.com/hako/durafmt" + "github.com/spf13/cobra" + "github.com/streamingfast/bstream" + "github.com/streamingfast/dhammer" + "github.com/streamingfast/dstore" + firecore "github.com/streamingfast/firehose-core" + accountsresolver "github.com/streamingfast/firehose-solana/accountresolver" + pbsol "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1" + kvstore "github.com/streamingfast/kvdb/store" + "github.com/streamingfast/logging" + "go.uber.org/zap" + "io" + "strconv" + "strings" + "time" +) + +func newResolveAccountsBlockCmd(logger *zap.Logger, tracer logging.Tracer, chain *firecore.Chain[*pbsol.Block]) *cobra.Command { + cmd := &cobra.Command{ + Use: "resolve-accounts-block {store} {destination-store} {kv-dsn} {startBlock} {endBlock}", + Short: "Apply table lookup accounts to merge blocks.", + RunE: processResolveAccountsBlockE(chain, logger, tracer), + Args: cobra.ExactArgs(5), + } + + return cmd +} + +func processResolveAccountsBlockE(chain *firecore.Chain[*pbsol.Block], logger *zap.Logger, tracer logging.Tracer) func(cmd *cobra.Command, args []string) error { + return func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + srcStore, err := dstore.NewDBinStore(args[0]) + if err != nil { + return fmt.Errorf("unable to create source store: %w", err) + } + + destStore, err := dstore.NewDBinStore(args[1]) + if err != nil { + return fmt.Errorf("unable to create destination store: %w", err) + } + + db, err := kvstore.New(args[2]) + if err != nil { + return fmt.Errorf("unable to create sourceStore: %w", err) + } + + resolver := accountsresolver.NewKVDBAccountsResolver(db, logger) + if err != nil { + return fmt.Errorf("unable to get cursor: %w", err) + } + + startBlock, err := strconv.ParseUint(args[3], 10, 64) + if err != nil { + return fmt.Errorf("parsing start block: %w", err) + } + + stopBlock, err := strconv.ParseUint(args[4], 10, 64) + if err != nil { + return fmt.Errorf("parsing stop block: %w", err) + } + + err = processMergeBlocks(ctx, startBlock, stopBlock, srcStore, destStore, resolver, chain.BlockEncoder, logger) + if err != nil { + return fmt.Errorf("processing merge blocks for range %d - %d: %w", startBlock, stopBlock, err) + } + + logger.Info("All done. Goodbye!") + return nil + } +} + +func processMergeBlocks( + ctx context.Context, + startBlock uint64, + stopBlock uint64, + sourceStore dstore.Store, + destinationStore dstore.Store, + resolver *accountsresolver.KVDBAccountsResolver, + encoder firecore.BlockEncoder, + logger *zap.Logger, +) error { + + paddedBlockNum := fmt.Sprintf("%010d", startBlock) + logger.Info("Processing merge blocks", + zap.Uint64("start_block_num", startBlock), + zap.Uint64("stop_block_num", stopBlock), + zap.String("first_merge_filename", paddedBlockNum), + ) + + mergeBlocksFileChan := make(chan *mergeBlocksFile, 20) + done := make(chan interface{}) + + go func() { + err := processMergeBlocksFiles(ctx, mergeBlocksFileChan, destinationStore, resolver, encoder, logger) + if err != nil { + panic(fmt.Errorf("processing merge blocks files: %w", err)) + } + close(done) + }() + + err := sourceStore.WalkFrom(ctx, "", paddedBlockNum, func(filename string) error { + mbf := newMergeBlocksFile(filename, logger) + + blkNumber, err := mbf.BlockNumber() + if err != nil { + return fmt.Errorf("converting block number of merged block file: %w", err) + } + if blkNumber >= stopBlock { + logger.Info("Reached stop block", zap.Uint64("stop_block", stopBlock)) + close(mergeBlocksFileChan) + return io.EOF + } + + go func() { + err := mbf.process(ctx, sourceStore) + if err != nil { + panic(fmt.Errorf("processing merge block file %s: %w", mbf.filename, err)) + } + }() + mergeBlocksFileChan <- mbf + return nil + }) + + if err != nil && err != io.EOF { + return fmt.Errorf("walking merge block sourceStore: %w", err) + } + + logger.Info("Waiting for completion") + <-done + + logger.Info("Done processing merge blocks") + + return nil +} + +type bundleJob struct { + filename string + bundleReader *accountsresolver.BundleReader +} + +func processMergeBlocksFiles( + ctx context.Context, + mergeBlocksFileChan chan *mergeBlocksFile, + destinationStore dstore.Store, + resolver *accountsresolver.KVDBAccountsResolver, + encoder firecore.BlockEncoder, + logger *zap.Logger, +) error { + + writerNailer := dhammer.NewNailer(100, func(ctx context.Context, br *bundleJob) (*bundleJob, error) { + logger.Info("nailing writing bundle file", zap.String("filename", br.filename)) + err := destinationStore.WriteObject(ctx, br.filename, br.bundleReader) + if err != nil { + return br, fmt.Errorf("writing bundle file: %w", err) + } + + logger.Info("nailed writing bundle file", zap.String("filename", br.filename)) + return br, nil + }) + writerNailer.OnTerminating(func(err error) { + if err != nil { + panic(fmt.Errorf("writing bundle file: %w", err)) + } + }) + writerNailer.Start(ctx) + done := make(chan interface{}) + + go func() { + for out := range writerNailer.Out { + logger.Info("new merge blocks file written:", zap.String("filename", out.filename)) + } + close(done) + }() + + timeOfLastPush := time.Now() + for mbf := range mergeBlocksFileChan { + logger.Info("Receive merge block file", zap.String("filename", mbf.filename), zap.String("time_since_last push", durafmt.Parse(time.Since(timeOfLastPush)).String())) + bundleReader := accountsresolver.NewBundleReader(ctx, logger) + + decoderNailer := dhammer.NewNailer(100, func(ctx context.Context, blk *pbsol.Block) (*bstream.Block, error) { + b, err := encoder.Encode(blk) + if err != nil { + return nil, fmt.Errorf("encoding block: %w", err) + } + + return b, nil + }) + decoderNailer.OnTerminating(func(err error) { + if err != nil { + panic(fmt.Errorf("encoding block: %w", err)) + } + }) + decoderNailer.Start(ctx) + + job := &bundleJob{ + mbf.filename, + bundleReader, + } + writerNailer.Push(ctx, job) + + mbf := mbf + go func() { + for { + select { + case <-ctx.Done(): + return + case blk, ok := <-mbf.blockChan: + if !ok { + decoderNailer.Close() + return + } + logger.Debug("handling block", zap.Uint64("slot", blk.Slot), zap.Uint64("parent_slot", blk.ParentSlot)) + + err := processBlock(context.Background(), blk, resolver, logger) + if err != nil { + bundleReader.PushError(fmt.Errorf("processing block: %w", err)) + return + } + + decoderNailer.Push(ctx, blk) + } + } + }() + for bb := range decoderNailer.Out { + logger.Debug("pushing block", zap.Uint64("slot", bb.Num())) + err := bundleReader.PushBlock(bb) + if err != nil { + bundleReader.PushError(fmt.Errorf("pushing block to bundle reader: %w", err)) + return fmt.Errorf("pushing block to bundle reader: %w", err) + } + } + bundleReader.Close() + timeOfLastPush = time.Now() + } + + writerNailer.Close() + logger.Info("Waiting for writer to complete") + <-done + logger.Info("Writer completed") + + return nil +} + +func processBlock(ctx context.Context, block *pbsol.Block, resolver *accountsresolver.KVDBAccountsResolver, logger *zap.Logger) error { + for _, trx := range block.Transactions { + if trx.Meta.Err != nil { + continue + } + //p.logger.Debug("processing transaction", zap.Uint64("block_num", block.Slot), zap.String("trx_id", base58.Encode(trx.Transaction.Signatures[0]))) + err := accountsresolver.ApplyTableLookup(ctx, accountsresolver.NewStats(), block.Slot, trx, resolver, logger) + if err != nil { + return fmt.Errorf("applying table lookup at block %d: %w", block.Slot, err) + } + } + + return nil +} + +type mergeBlocksFile struct { + filename string + blockChan chan *pbsol.Block + logger *zap.Logger +} + +func newMergeBlocksFile(fileName string, logger *zap.Logger) *mergeBlocksFile { + return &mergeBlocksFile{ + filename: fileName, + blockChan: make(chan *pbsol.Block, 100), + logger: logger, + } +} + +func (f *mergeBlocksFile) BlockNumber() (uint64, error) { + return strconv.ParseUint(strings.TrimLeft(f.filename, "0"), 10, 64) +} + +func (f *mergeBlocksFile) process(ctx context.Context, sourceStore dstore.Store) error { + f.logger.Info("Processing merge block file", zap.String("filename", f.filename)) + firstBlockOfFile, err := strconv.Atoi(strings.TrimLeft(f.filename, "0")) + if err != nil { + return fmt.Errorf("converting filename to block number: %w", err) + } + + reader, err := sourceStore.OpenObject(ctx, f.filename) + if err != nil { + return fmt.Errorf("opening merge block file %s: %w", f.filename, err) + } + defer reader.Close() + + blockReader, err := bstream.GetBlockReaderFactory.New(reader) + if err != nil { + return fmt.Errorf("creating block reader for file %s: %w", f.filename, err) + } + + for { + block, err := blockReader.Read() + if err != nil { + if err == io.EOF { + close(f.blockChan) + return nil + } + return fmt.Errorf("reading block: %w", err) + } + + blk := block.ToProtocol().(*pbsol.Block) + if blk.Slot < uint64(firstBlockOfFile) { + f.logger.Info("skip block process in previous file", zap.Uint64("slot", blk.Slot)) + continue + } + + f.blockChan <- blk + } +}