diff --git a/core/rawdb/accessors_chain_zkevm.go b/core/rawdb/accessors_chain_zkevm.go index 9d3be52e1b8..8742b917b48 100644 --- a/core/rawdb/accessors_chain_zkevm.go +++ b/core/rawdb/accessors_chain_zkevm.go @@ -11,6 +11,15 @@ import ( "github.com/ledgerwatch/erigon/rlp" ) +func DeleteCumulativeGasUsed(tx kv.RwTx, blockFrom uint64) error { + if err := tx.ForEach(kv.CumulativeGasIndex, hexutility.EncodeTs(blockFrom), func(k, v []byte) error { + return tx.Delete(kv.CumulativeGasIndex, k) + }); err != nil { + return fmt.Errorf("TruncateCanonicalHash: %w", err) + } + return nil +} + func DeleteTransactions(db kv.RwTx, txsCount, baseTxId uint64, blockHash *libcommon.Hash) error { for id := baseTxId; id < baseTxId+txsCount; id++ { txIdKey := make([]byte, 8) diff --git a/eth/stagedsync/stage_cumulative_index.go b/eth/stagedsync/stage_cumulative_index.go index 231f6bac6c5..d310d9d409e 100644 --- a/eth/stagedsync/stage_cumulative_index.go +++ b/eth/stagedsync/stage_cumulative_index.go @@ -48,12 +48,13 @@ func SpawnStageCumulativeIndex(cfg CumulativeIndexCfg, s *StageState, tx kv.RwTx logEvery := time.NewTicker(logInterval) defer logEvery.Stop() - headNumber, err := stages.GetStageProgress(tx, stages.Headers) + // we can do this till the last block executed, since we get gas used from there + executeProgress, err := stages.GetStageProgress(tx, stages.Execution) if err != nil { - return fmt.Errorf("getting bodies progress: %w", err) + return fmt.Errorf("getting progress: %w", err) } // If we are done already, we can exit the stage - if s.BlockNumber == headNumber { + if s.BlockNumber == executeProgress { log.Info(fmt.Sprintf("[%s] Nothing new to process", logPrefix)) return nil } @@ -69,8 +70,8 @@ func SpawnStageCumulativeIndex(cfg CumulativeIndexCfg, s *StageState, tx kv.RwTx return err } - currentBlockNumber := s.BlockNumber + 1 - for k, v, err := headerC.Seek(hexutility.EncodeTs(s.BlockNumber)); k != nil; k, v, err = headerC.Next() { + currentBlockNumber := s.BlockNumber + for k, v, err := headerC.Seek(hexutility.EncodeTs(s.BlockNumber + 1)); k != nil; k, v, err = headerC.Next() { if err != nil { return err } @@ -79,6 +80,7 @@ func SpawnStageCumulativeIndex(cfg CumulativeIndexCfg, s *StageState, tx kv.RwTx continue } + blockHash := libcommon.BytesToHash(k[8:]) blockNumber, err := dbutils.DecodeBlockNumber(k[:8]) if err != nil { return err @@ -89,7 +91,7 @@ func SpawnStageCumulativeIndex(cfg CumulativeIndexCfg, s *StageState, tx kv.RwTx return err } - if canonicalHash != libcommon.BytesToHash(k[8:]) { + if canonicalHash != blockHash { continue } @@ -99,7 +101,9 @@ func SpawnStageCumulativeIndex(cfg CumulativeIndexCfg, s *StageState, tx kv.RwTx if err := rlp.Decode(bytes.NewReader(v), &header); err != nil { return err } - cumulativeGasUsed.Add(cumulativeGasUsed, big.NewInt(int64(header.GasUsed))) + + blockGasUsedBig := new(big.Int).SetUint64(header.GasUsed) + cumulativeGasUsed.Add(cumulativeGasUsed, blockGasUsedBig) if err := rawdb.WriteCumulativeGasUsed(tx, currentBlockNumber, cumulativeGasUsed); err != nil { return err @@ -137,6 +141,10 @@ func UnwindCumulativeIndexStage(u *UnwindState, cfg CumulativeIndexCfg, tx kv.Rw defer tx.Rollback() } + if err := rawdb.DeleteCumulativeGasUsed(tx, u.UnwindPoint); err != nil { + return fmt.Errorf("failed to delete cumulative gas used: %w", err) + } + if err = u.Done(tx); err != nil { return fmt.Errorf(" reset: %w", err) } @@ -153,6 +161,14 @@ func UnwindCumulativeIndexStage(u *UnwindState, cfg CumulativeIndexCfg, tx kv.Rw func PruneCumulativeIndexStage(p *PruneState, tx kv.RwTx, ctx context.Context) (err error) { useExternalTx := tx != nil + if err := rawdb.DeleteCumulativeGasUsed(tx, 0); err != nil { + return fmt.Errorf("failed to delete cumulative gas used: %w", err) + } + + if err := p.Done(tx); err != nil { + return fmt.Errorf(" reset: %w", err) + } + if !useExternalTx { if err = tx.Commit(); err != nil { return err diff --git a/zk/stages/stages.go b/zk/stages/stages.go index e1648ed2e24..6168c9e7825 100644 --- a/zk/stages/stages.go +++ b/zk/stages/stages.go @@ -288,24 +288,6 @@ func DefaultZkStages( return PruneBatchesStage(p, tx, batchesCfg, ctx) }, }, - { - /* - * FIXME: this stage doesn't work since the "headers" we have in the datastream don't have any gasUsed, it's always 0. - * - * to solve this we probably should move it after execution (execution doesn't depend on it) and update the unwinds. - **/ - ID: stages2.CumulativeIndex, - Description: "Write Cumulative Index", - Forward: func(firstCycle bool, badBlockUnwind bool, s *stages.StageState, u stages.Unwinder, tx kv.RwTx, quiet bool) error { - return stagedsync.SpawnStageCumulativeIndex(cumulativeIndex, s, tx, ctx) - }, - Unwind: func(firstCycle bool, u *stages.UnwindState, s *stages.StageState, tx kv.RwTx) error { - return stagedsync.UnwindCumulativeIndexStage(u, cumulativeIndex, tx, ctx) - }, - Prune: func(firstCycle bool, p *stages.PruneState, tx kv.RwTx) error { - return stagedsync.PruneCumulativeIndexStage(p, tx, ctx) - }, - }, { ID: stages2.BlockHashes, Description: "Write block hashes", @@ -344,6 +326,18 @@ func DefaultZkStages( Prune: func(firstCycle bool, p *stages.PruneState, tx kv.RwTx) error { return stagedsync.PruneExecutionStageZk(p, tx, exec, ctx, firstCycle) }, + }, { + ID: stages2.CumulativeIndex, + Description: "Write Cumulative Index", + Forward: func(firstCycle bool, badBlockUnwind bool, s *stages.StageState, u stages.Unwinder, tx kv.RwTx, quiet bool) error { + return stagedsync.SpawnStageCumulativeIndex(cumulativeIndex, s, tx, ctx) + }, + Unwind: func(firstCycle bool, u *stages.UnwindState, s *stages.StageState, tx kv.RwTx) error { + return stagedsync.UnwindCumulativeIndexStage(u, cumulativeIndex, tx, ctx) + }, + Prune: func(firstCycle bool, p *stages.PruneState, tx kv.RwTx) error { + return stagedsync.PruneCumulativeIndexStage(p, tx, ctx) + }, }, { ID: stages2.HashState,