Skip to content

Commit

Permalink
stagedsync: add periodic progress logs for long pruning during initi…
Browse files Browse the repository at this point in the history
…al sync (#12717)

Adding periodic logging for some pruning operations because, after recent
changes, we may spend a lot of time in pruning. This happened to me
while doing dev work: for quite a long time it looked like my node was
stuck, because there was no info in the logs about what it was doing.
  • Loading branch information
taratorio authored Nov 13, 2024
1 parent 747c254 commit c7f4ff3
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 13 deletions.
2 changes: 1 addition & 1 deletion cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,7 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error {
if err != nil {
return err
}
err = stagedsync.PruneExecutionStage(p, tx, cfg, ctx)
err = stagedsync.PruneExecutionStage(p, tx, cfg, ctx, logger)
if err != nil {
return err
}
Expand Down
12 changes: 11 additions & 1 deletion core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1204,14 +1204,17 @@ func IsPosBlock(db kv.Getter, blockHash common.Hash) (trans bool, err error) {
}

// PruneTable has `limit` parameter to avoid too large data deletes per one sync cycle - better delete by small portions to reduce db.FreeList size
func PruneTable(tx kv.RwTx, table string, pruneTo uint64, ctx context.Context, limit int, timeout time.Duration) error {
func PruneTable(tx kv.RwTx, table string, pruneTo uint64, ctx context.Context, limit int, timeout time.Duration, logger log.Logger, logPrefix string) error {
t := time.Now()
c, err := tx.RwCursor(table)
if err != nil {
return fmt.Errorf("failed to create cursor for pruning %w", err)
}
defer c.Close()

logEvery := time.NewTimer(30 * time.Second)
defer logEvery.Stop()

i := 0
for k, _, err := c.First(); k != nil; k, _, err = c.Next() {
if err != nil {
Expand All @@ -1226,6 +1229,13 @@ func PruneTable(tx kv.RwTx, table string, pruneTo uint64, ctx context.Context, l
if blockNum >= pruneTo {
break
}

select {
case <-logEvery.C:
logger.Info(fmt.Sprintf("[%s] pruning table periodic progress", logPrefix), table, "blockNum", blockNum)
default:
}

if err = c.DeleteCurrent(); err != nil {
return fmt.Errorf("failed to remove for block %d: %w", blockNum, err)
}
Expand Down
8 changes: 4 additions & 4 deletions eth/stagedsync/default_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func DefaultStages(ctx context.Context,
return UnwindExecutionStage(u, s, txc, ctx, exec, logger)
},
Prune: func(p *PruneState, tx kv.RwTx, logger log.Logger) error {
return PruneExecutionStage(p, tx, exec, ctx)
return PruneExecutionStage(p, tx, exec, ctx, logger)
},
},
//{
Expand Down Expand Up @@ -241,7 +241,7 @@ func PipelineStages(ctx context.Context, snapshots SnapshotsCfg, blockHashCfg Bl
return UnwindExecutionStage(u, s, txc, ctx, exec, logger)
},
Prune: func(p *PruneState, tx kv.RwTx, logger log.Logger) error {
return PruneExecutionStage(p, tx, exec, ctx)
return PruneExecutionStage(p, tx, exec, ctx, logger)
},
},

Expand Down Expand Up @@ -358,7 +358,7 @@ func UploaderPipelineStages(ctx context.Context, snapshots SnapshotsCfg, headers
return UnwindExecutionStage(u, s, txc, ctx, exec, logger)
},
Prune: func(p *PruneState, tx kv.RwTx, logger log.Logger) error {
return PruneExecutionStage(p, tx, exec, ctx)
return PruneExecutionStage(p, tx, exec, ctx, logger)
},
},
{
Expand Down Expand Up @@ -509,7 +509,7 @@ func PolygonSyncStages(
return UnwindExecutionStage(u, s, txc, ctx, exec, logger)
},
Prune: func(p *PruneState, tx kv.RwTx, logger log.Logger) error {
return PruneExecutionStage(p, tx, exec, ctx)
return PruneExecutionStage(p, tx, exec, ctx, logger)
},
},
{
Expand Down
16 changes: 11 additions & 5 deletions eth/stagedsync/stage_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func unwindExecutionStage(u *UnwindState, s *StageState, txc wrap.TxContainer, c
return unwindExec3(u, s, txc, ctx, cfg.blockReader, accumulator, logger)
}

func PruneExecutionStage(s *PruneState, tx kv.RwTx, cfg ExecuteBlockCfg, ctx context.Context) (err error) {
func PruneExecutionStage(s *PruneState, tx kv.RwTx, cfg ExecuteBlockCfg, ctx context.Context, logger log.Logger) (err error) {
useExternalTx := tx != nil
if !useExternalTx {
tx, err = cfg.db.BeginRw(ctx)
Expand All @@ -407,16 +407,22 @@ func PruneExecutionStage(s *PruneState, tx kv.RwTx, cfg ExecuteBlockCfg, ctx con
pruneDiffsLimitOnChainTip = math.MaxInt
pruneTimeout = time.Hour
}
if err := rawdb.PruneTable(tx, kv.ChangeSets3, s.ForwardProgress-config3.MaxReorgDepthV3, ctx, pruneDiffsLimitOnChainTip, pruneTimeout); err != nil {
if err := rawdb.PruneTable(
tx,
kv.ChangeSets3,
s.ForwardProgress-config3.MaxReorgDepthV3,
ctx,
pruneDiffsLimitOnChainTip,
pruneTimeout,
logger,
s.LogPrefix(),
); err != nil {
return err
}
}

mxExecStepsInDB.Set(rawdbhelpers.IdxStepsCountV3(tx) * 100)

logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()

// on chain-tip:
// - can prune only between blocks (without blocking blocks processing)
// - need also leave some time to prune blocks
Expand Down
9 changes: 9 additions & 0 deletions eth/stagedsync/stage_txlookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,18 @@ func PruneTxLookup(s *PruneState, tx kv.RwTx, cfg TxLookupCfg, ctx context.Conte
}

if blockFrom < blockTo {
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()

t := time.Now()
var pruneBlockNum = blockFrom
for ; pruneBlockNum < blockTo; pruneBlockNum++ {
select {
case <-logEvery.C:
logger.Info(fmt.Sprintf("[%s] pruning tx lookup periodic progress", logPrefix), "blockNum", pruneBlockNum)
default:
}

err = deleteTxLookupRange(tx, logPrefix, pruneBlockNum, pruneBlockNum+1, ctx, cfg, logger)
if err != nil {
return fmt.Errorf("prune TxLookUp: %w", err)
Expand Down
6 changes: 4 additions & 2 deletions turbo/jsonrpc/eth_call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/erigontech/erigon-lib/kv/kvcache"
"github.com/erigontech/erigon-lib/kv/rawdbv3"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/turbo/testlog"

"github.com/erigontech/erigon/cmd/rpcdaemon/rpcdaemontest"
"github.com/erigontech/erigon/core"
Expand Down Expand Up @@ -565,6 +566,7 @@ func chainWithDeployedContract(t *testing.T) (*mock.MockSentry, libcommon.Addres

func doPrune(t *testing.T, db kv.RwDB, pruneTo uint64) {
ctx := context.Background()
logger := testlog.Logger(t, log.LvlCrit)
tx, err := db.BeginRw(ctx)
assert.NoError(t, err)

Expand All @@ -576,10 +578,10 @@ func doPrune(t *testing.T, db kv.RwDB, pruneTo uint64) {
err = rawdb.PruneTableDupSort(tx, kv.StorageChangeSet, "", pruneTo, logEvery, ctx)
assert.NoError(t, err)

err = rawdb.PruneTable(tx, kv.Receipts, pruneTo, ctx, math.MaxInt32, time.Hour)
err = rawdb.PruneTable(tx, kv.Receipts, pruneTo, ctx, math.MaxInt32, time.Hour, logger, "")
assert.NoError(t, err)

err = rawdb.PruneTable(tx, kv.Log, pruneTo, ctx, math.MaxInt32, time.Hour)
err = rawdb.PruneTable(tx, kv.Log, pruneTo, ctx, math.MaxInt32, time.Hour, logger, "")
assert.NoError(t, err)

err = rawdb.PruneTableDupSort(tx, kv.CallTraceSet, "", pruneTo, logEvery, ctx)
Expand Down

0 comments on commit c7f4ff3

Please sign in to comment.