Cache the start key for message pruning and use latest confirmed #1757

Merged (5 commits) on Jul 17, 2023
48 changes: 30 additions & 18 deletions arbnode/message_pruner.go
@@ -23,11 +23,13 @@ import (

type MessagePruner struct {
stopwaiter.StopWaiter
-	transactionStreamer *TransactionStreamer
-	inboxTracker        *InboxTracker
-	config              MessagePrunerConfigFetcher
-	pruningLock         sync.Mutex
-	lastPruneDone       time.Time
+	transactionStreamer         *TransactionStreamer
+	inboxTracker                *InboxTracker
+	config                      MessagePrunerConfigFetcher
+	pruningLock                 sync.Mutex
+	lastPruneDone               time.Time
+	cachedPrunedMessages        uint64
+	cachedPrunedDelayedMessages uint64
}

type MessagePrunerConfig struct {
@@ -62,7 +64,7 @@ func (m *MessagePruner) Start(ctxIn context.Context) {
m.StopWaiter.Start(ctxIn, m)
}

-func (m *MessagePruner) UpdateLatestStaked(count arbutil.MessageIndex, globalState validator.GoGlobalState) {
+func (m *MessagePruner) UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState) {
locked := m.pruningLock.TryLock()
if !locked {
return
@@ -108,19 +110,19 @@ func (m *MessagePruner) prune(ctx context.Context, count arbutil.MessageIndex, g
msgCount := endBatchMetadata.MessageCount
delayedCount := endBatchMetadata.DelayedMessageCount

-	return deleteOldMessageFromDB(ctx, msgCount, delayedCount, m.inboxTracker.db, m.transactionStreamer.db)
+	return m.deleteOldMessagesFromDB(ctx, msgCount, delayedCount)
}

-func deleteOldMessageFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64, inboxTrackerDb ethdb.Database, transactionStreamerDb ethdb.Database) error {
-	prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, transactionStreamerDb, messagePrefix, uint64(messageCount))
+func (m *MessagePruner) deleteOldMessagesFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64) error {
+	prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messagePrefix, &m.cachedPrunedMessages, uint64(messageCount))
if err != nil {
return fmt.Errorf("error deleting last batch messages: %w", err)
}
if len(prunedKeysRange) > 0 {
log.Info("Pruned last batch messages:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1])
}

-	prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, inboxTrackerDb, rlpDelayedMessagePrefix, delayedMessageCount)
+	prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.inboxTracker.db, rlpDelayedMessagePrefix, &m.cachedPrunedDelayedMessages, delayedMessageCount)
if err != nil {
return fmt.Errorf("error deleting last batch delayed messages: %w", err)
}
@@ -130,15 +132,25 @@ func deleteOldMessageFromDB(ctx context.Context, messageCount arbutil.MessageInd
return nil
}

-func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, prefix []byte, endMinKey uint64) ([]uint64, error) {
-	startIter := db.NewIterator(prefix, uint64ToKey(1))
-	if !startIter.Next() {
+// deleteFromLastPrunedUptoEndKey is similar to deleteFromRange but automatically populates the start key
+// cachedStartMinKey must not be nil. It's set to the new start key at the end of this function if successful.
+func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, prefix []byte, cachedStartMinKey *uint64, endMinKey uint64) ([]uint64, error) {
+	startMinKey := *cachedStartMinKey
+	if startMinKey == 0 {
+		startIter := db.NewIterator(prefix, uint64ToKey(1))
+		if !startIter.Next() {
+			return nil, nil
+		}
+		startMinKey = binary.BigEndian.Uint64(bytes.TrimPrefix(startIter.Key(), prefix))
+		startIter.Release()
+	}
+	if endMinKey <= startMinKey {
+		*cachedStartMinKey = startMinKey
 		return nil, nil
 	}
-	startMinKey := binary.BigEndian.Uint64(bytes.TrimPrefix(startIter.Key(), prefix))
-	startIter.Release()
-	if endMinKey > startMinKey {
-		return deleteFromRange(ctx, db, prefix, startMinKey, endMinKey-1)
+	keys, err := deleteFromRange(ctx, db, prefix, startMinKey, endMinKey-1)
+	if err == nil {
+		*cachedStartMinKey = endMinKey - 1
 	}
-	return nil, nil
+	return keys, err
 }
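Note: for readers skimming the diff, the core idea of the new helper is that the first pruning pass seeds a cached start key from a database iterator, and every later pass resumes from that cache instead of re-scanning from key 1. Below is a minimal, self-contained sketch of that pattern; the helper names dbKey and pruneUpTo and the key layout are invented for this sketch, and only go-ethereum's rawdb/ethdb types are real.

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// dbKey mirrors the shape used in the tests below: prefix followed by a big-endian uint64.
func dbKey(prefix []byte, pos uint64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], pos)
	return append(append([]byte{}, prefix...), buf[:]...)
}

// pruneUpTo deletes keys in [cachedStart, end) under prefix and advances the cache,
// so the next call does not have to re-discover the first live key with an iterator.
func pruneUpTo(db ethdb.Database, prefix []byte, cachedStart *uint64, end uint64) error {
	start := *cachedStart
	if start == 0 { // cache not seeded yet: find the smallest existing key once
		it := db.NewIterator(prefix, dbKey(nil, 1))
		defer it.Release()
		if !it.Next() {
			return nil // nothing stored under this prefix
		}
		start = binary.BigEndian.Uint64(it.Key()[len(prefix):])
	}
	if end <= start {
		*cachedStart = start
		return nil // nothing old enough to prune
	}
	for k := start; k < end; k++ {
		if err := db.Delete(dbKey(prefix, k)); err != nil {
			return err
		}
	}
	*cachedStart = end // next call resumes here instead of iterating from key 1
	return nil
}

func main() {
	db := rawdb.NewMemoryDatabase()
	prefix := []byte("m")
	for i := uint64(1); i <= 10; i++ {
		_ = db.Put(dbKey(prefix, i), []byte{})
	}
	var cached uint64
	_ = pruneUpTo(db, prefix, &cached, 5)  // first pass: seeds the cache, deletes keys 1..4
	_ = pruneUpTo(db, prefix, &cached, 10) // second pass: no iterator, deletes keys 5..9
	has, _ := db.Has(dbKey(prefix, 10))
	fmt.Println("key 10 survives:", has)
}

Running it prunes keys 1-4 on the first call and 5-9 on the second without opening a second iterator, which is the work the new cachedPrunedMessages and cachedPrunedDelayedMessages fields save the real pruner.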
38 changes: 20 additions & 18 deletions arbnode/message_pruner_test.go
@@ -17,31 +17,30 @@ func TestMessagePrunerWithPruningEligibleMessagePresent(t *testing.T) {
defer cancel()

messagesCount := uint64(2 * 100 * 1024)
-	inboxTrackerDb, transactionStreamerDb := setupDatabase(t, 2*100*1024, 2*100*1024)
-	err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
+	inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, 2*100*1024, 2*100*1024)
+	err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount)
Require(t, err)

checkDbKeys(t, messagesCount, transactionStreamerDb, messagePrefix)
checkDbKeys(t, messagesCount, inboxTrackerDb, rlpDelayedMessagePrefix)

}

-func TestMessagePrunerTraverseEachMessageOnlyOnce(t *testing.T) {
+func TestMessagePrunerTwoHalves(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

messagesCount := uint64(10)
-	inboxTrackerDb, transactionStreamerDb := setupDatabase(t, messagesCount, messagesCount)
-	// In first iteration message till messagesCount are tried to be deleted.
-	err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
+	_, transactionStreamerDb, pruner := setupDatabase(t, messagesCount, messagesCount)
+	// The first pass tries to delete messages up to messagesCount/2.
+	err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount/2), messagesCount/2)
Require(t, err)
-	// After first iteration messagesCount/2 is reinserted in inbox db
-	err = inboxTrackerDb.Put(dbKey(messagePrefix, messagesCount/2), []byte{})
+	// The first pass should have deleted every message below messagesCount/2.
+	checkDbKeys(t, messagesCount/2, transactionStreamerDb, messagePrefix)
+	// The second pass tries to delete messages up to messagesCount.
+	err = pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount)
 	Require(t, err)
-	// In second iteration message till messagesCount are again tried to be deleted.
-	err = deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
-	Require(t, err)
-	// In second iteration all the message till messagesCount are deleted again.
+	// The second pass should have deleted every message below messagesCount.
checkDbKeys(t, messagesCount, transactionStreamerDb, messagePrefix)
}

@@ -50,10 +49,10 @@ func TestMessagePrunerPruneTillLessThenEqualTo(t *testing.T) {
defer cancel()

messagesCount := uint64(10)
-	inboxTrackerDb, transactionStreamerDb := setupDatabase(t, 2*messagesCount, 20)
+	inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, 2*messagesCount, 20)
err := inboxTrackerDb.Delete(dbKey(messagePrefix, 9))
Require(t, err)
-	err = deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
+	err = pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount)
Require(t, err)
hasKey, err := transactionStreamerDb.Has(dbKey(messagePrefix, messagesCount))
Require(t, err)
@@ -67,16 +66,16 @@ func TestMessagePrunerWithNoPruningEligibleMessagePresent(t *testing.T) {
defer cancel()

messagesCount := uint64(10)
-	inboxTrackerDb, transactionStreamerDb := setupDatabase(t, messagesCount, messagesCount)
-	err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
+	inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, messagesCount, messagesCount)
+	err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount)
Require(t, err)

checkDbKeys(t, uint64(messagesCount), transactionStreamerDb, messagePrefix)
checkDbKeys(t, messagesCount, inboxTrackerDb, rlpDelayedMessagePrefix)

}

-func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethdb.Database, ethdb.Database) {
+func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethdb.Database, ethdb.Database, *MessagePruner) {

transactionStreamerDb := rawdb.NewMemoryDatabase()
for i := uint64(0); i < uint64(messageCount); i++ {
@@ -90,7 +89,10 @@ func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethd
Require(t, err)
}

-	return inboxTrackerDb, transactionStreamerDb
+	return inboxTrackerDb, transactionStreamerDb, &MessagePruner{
+		transactionStreamer: &TransactionStreamer{db: transactionStreamerDb},
+		inboxTracker:        &InboxTracker{db: inboxTrackerDb},
+	}
}

func checkDbKeys(t *testing.T, endCount uint64, db ethdb.Database, prefix []byte) {
6 changes: 3 additions & 3 deletions arbnode/node.go
@@ -825,13 +825,13 @@ func createNodeImpl(
}
}

-	notifiers := make([]staker.LatestStakedNotifier, 0)
+	var confirmedNotifiers []staker.LatestConfirmedNotifier
if config.MessagePruner.Enable && !config.Caching.Archive {
messagePruner = NewMessagePruner(txStreamer, inboxTracker, func() *MessagePrunerConfig { return &configFetcher.Get().MessagePruner })
-		notifiers = append(notifiers, messagePruner)
+		confirmedNotifiers = append(confirmedNotifiers, messagePruner)
}

-	stakerObj, err = staker.NewStaker(l1Reader, wallet, bind.CallOpts{}, config.Staker, blockValidator, statelessBlockValidator, notifiers, deployInfo.ValidatorUtils, fatalErrChan)
+	stakerObj, err = staker.NewStaker(l1Reader, wallet, bind.CallOpts{}, config.Staker, blockValidator, statelessBlockValidator, nil, confirmedNotifiers, deployInfo.ValidatorUtils, fatalErrChan)
if err != nil {
return nil, err
}
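Note: the net effect in node.go is that the message pruner is now registered only for confirmed-state updates. A small runnable sketch of that interface split; the interface shapes mirror the staker.go diff further down, while the demoPruner type, the stand-in state types, and the wiring are illustrative, not the real createNodeImpl code.

package main

import "fmt"

// Simplified stand-ins for arbutil.MessageIndex and validator.GoGlobalState.
type MessageIndex uint64
type GoGlobalState struct{ Batch, PosInBatch uint64 }

// Interface shapes as introduced in this PR.
type LatestStakedNotifier interface {
	UpdateLatestStaked(count MessageIndex, globalState GoGlobalState)
}
type LatestConfirmedNotifier interface {
	UpdateLatestConfirmed(count MessageIndex, globalState GoGlobalState)
}

// A pruner-like consumer implements only the confirmed-side interface, so it can no
// longer be registered for (less safe) staked updates by accident.
type demoPruner struct{}

func (demoPruner) UpdateLatestConfirmed(count MessageIndex, gs GoGlobalState) {
	fmt.Println("prune messages up to", count, "at batch", gs.Batch)
}

func main() {
	var confirmedNotifiers []LatestConfirmedNotifier
	confirmedNotifiers = append(confirmedNotifiers, demoPruner{})
	for _, n := range confirmedNotifiers {
		n.UpdateLatestConfirmed(42, GoGlobalState{Batch: 7})
	}
}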
81 changes: 56 additions & 25 deletions staker/staker.go
@@ -17,6 +17,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rpc"
flag "github.com/spf13/pflag"

"github.com/offchainlabs/nitro/arbutil"
@@ -30,6 +31,7 @@ var (
stakerBalanceGauge = metrics.NewRegisteredGaugeFloat64("arb/staker/balance", nil)
stakerAmountStakedGauge = metrics.NewRegisteredGauge("arb/staker/amount_staked", nil)
stakerLatestStakedNodeGauge = metrics.NewRegisteredGauge("arb/staker/staked_node", nil)
+	stakerLatestConfirmedNodeGauge = metrics.NewRegisteredGauge("arb/staker/confirmed_node", nil)
stakerLastSuccessfulActionGauge = metrics.NewRegisteredGauge("arb/staker/action/last_success", nil)
stakerActionSuccessCounter = metrics.NewRegisteredCounter("arb/staker/action/success", nil)
stakerActionFailureCounter = metrics.NewRegisteredCounter("arb/staker/action/failure", nil)
@@ -195,11 +197,16 @@ type LatestStakedNotifier interface {
UpdateLatestStaked(count arbutil.MessageIndex, globalState validator.GoGlobalState)
}

+type LatestConfirmedNotifier interface {
+	UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState)
+}

type Staker struct {
*L1Validator
stopwaiter.StopWaiter
l1Reader L1ReaderInterface
-	notifiers          []LatestStakedNotifier
+	stakedNotifiers    []LatestStakedNotifier
+	confirmedNotifiers []LatestConfirmedNotifier
activeChallenge *ChallengeManager
baseCallOpts bind.CallOpts
config L1ValidatorConfig
@@ -219,7 +226,8 @@ func NewStaker(
config L1ValidatorConfig,
blockValidator *BlockValidator,
statelessBlockValidator *StatelessBlockValidator,
-	notifiers []LatestStakedNotifier,
+	stakedNotifiers []LatestStakedNotifier,
+	confirmedNotifiers []LatestConfirmedNotifier,
validatorUtilsAddress common.Address,
fatalErr chan<- error,
) (*Staker, error) {
@@ -234,13 +242,14 @@ func NewStaker(
return nil, err
}
stakerLastSuccessfulActionGauge.Update(time.Now().Unix())
-	if config.StartFromStaked {
-		notifiers = append(notifiers, blockValidator)
+	if config.StartFromStaked && blockValidator != nil {
+		stakedNotifiers = append(stakedNotifiers, blockValidator)
}
return &Staker{
L1Validator: val,
l1Reader: l1Reader,
-		notifiers:          notifiers,
+		stakedNotifiers:    stakedNotifiers,
+		confirmedNotifiers: confirmedNotifiers,
baseCallOpts: callOpts,
config: config,
highGasBlocksBuffer: big.NewInt(config.L1PostingStrategy.HighGasDelayBlocks),
@@ -280,50 +289,50 @@ func (s *Staker) Initialize(ctx context.Context) error {
return nil
}

-func (s *Staker) checkLatestStaked(ctx context.Context) error {
-	latestStaked, _, err := s.validatorUtils.LatestStaked(&s.baseCallOpts, s.rollupAddress, s.wallet.AddressOrZero())
+func (s *Staker) getLatestStakedState(ctx context.Context, staker common.Address) (uint64, arbutil.MessageIndex, *validator.GoGlobalState, error) {
+	callOpts := s.getCallOpts(ctx)
+	if s.l1Reader.UseFinalityData() {
+		callOpts.BlockNumber = big.NewInt(int64(rpc.FinalizedBlockNumber))
+	}
+	latestStaked, _, err := s.validatorUtils.LatestStaked(s.getCallOpts(ctx), s.rollupAddress, staker)
 	if err != nil {
-		return fmt.Errorf("couldn't get LatestStaked: %w", err)
+		return 0, 0, nil, fmt.Errorf("couldn't get LatestStaked(%v): %w", staker, err)
 	}
-	stakerLatestStakedNodeGauge.Update(int64(latestStaked))
 	if latestStaked == 0 {
-		return nil
+		return latestStaked, 0, nil, nil
 	}

 	stakedInfo, err := s.rollup.LookupNode(ctx, latestStaked)
 	if err != nil {
-		return fmt.Errorf("couldn't look up latest node: %w", err)
+		return 0, 0, nil, fmt.Errorf("couldn't look up latest assertion of %v (%v): %w", staker, latestStaked, err)
 	}

-	stakedGlobalState := stakedInfo.AfterState().GlobalState
-	caughtUp, count, err := GlobalStateToMsgCount(s.inboxTracker, s.txStreamer, stakedGlobalState)
+	globalState := stakedInfo.AfterState().GlobalState
+	caughtUp, count, err := GlobalStateToMsgCount(s.inboxTracker, s.txStreamer, globalState)
 	if err != nil {
 		if errors.Is(err, ErrGlobalStateNotInChain) && s.fatalErr != nil {
-			fatal := fmt.Errorf("latest staked not in chain: %w", err)
+			fatal := fmt.Errorf("latest assertion of %v (%v) not in chain: %w", staker, latestStaked, err)
 			s.fatalErr <- fatal
 		}
-		return fmt.Errorf("staker: latest staked %w", err)
+		return 0, 0, nil, fmt.Errorf("latest assertion of %v (%v): %w", staker, latestStaked, err)
 	}

 	if !caughtUp {
-		log.Info("latest valid not yet in our node", "staked", stakedGlobalState)
-		return nil
+		log.Info("latest assertion not yet in our node", "staker", staker, "assertion", latestStaked, "state", globalState)
+		return latestStaked, 0, nil, nil
 	}

 	processedCount, err := s.txStreamer.GetProcessedMessageCount()
 	if err != nil {
-		return err
+		return 0, 0, nil, err
 	}

 	if processedCount < count {
-		log.Info("execution catching up to last validated", "validatedCount", count, "processedCount", processedCount)
-		return nil
+		log.Info("execution catching up to rollup", "staker", staker, "rollupCount", count, "processedCount", processedCount)
+		return latestStaked, 0, nil, nil
 	}

-	for _, notifier := range s.notifiers {
-		notifier.UpdateLatestStaked(count, stakedGlobalState)
-	}
-	return nil
+	return latestStaked, count, &globalState, nil
 }

func (s *Staker) Start(ctxIn context.Context) {
@@ -381,10 +390,32 @@ func (s *Staker) Start(ctxIn context.Context) {
return backoff
})
s.CallIteratively(func(ctx context.Context) time.Duration {
-		err := s.checkLatestStaked(ctx)
+		wallet := s.wallet.AddressOrZero()
+		staked, stakedMsgCount, stakedGlobalState, err := s.getLatestStakedState(ctx, wallet)
 		if err != nil && ctx.Err() == nil {
 			log.Error("staker: error checking latest staked", "err", err)
 		}
+		stakerLatestStakedNodeGauge.Update(int64(staked))
+		if stakedGlobalState != nil {
+			for _, notifier := range s.stakedNotifiers {
+				notifier.UpdateLatestStaked(stakedMsgCount, *stakedGlobalState)
+			}
+		}
+		confirmed := staked
+		confirmedMsgCount := stakedMsgCount
+		confirmedGlobalState := stakedGlobalState
+		if wallet != (common.Address{}) {
+			confirmed, confirmedMsgCount, confirmedGlobalState, err = s.getLatestStakedState(ctx, common.Address{})
+			if err != nil && ctx.Err() == nil {
+				log.Error("staker: error checking latest confirmed", "err", err)
+			}
+		}
+		stakerLatestConfirmedNodeGauge.Update(int64(confirmed))
+		if confirmedGlobalState != nil {
+			for _, notifier := range s.confirmedNotifiers {
+				notifier.UpdateLatestConfirmed(confirmedMsgCount, *confirmedGlobalState)
+			}
+		}
return s.config.StakerInterval
})
}
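Note: the rewritten loop reuses one helper for both sides. getLatestStakedState(ctx, wallet) feeds the staked notifiers, and a second call with the zero address feeds the confirmed notifiers: the zero address holds no stake, so the ValidatorUtils lookup falls back to the latest confirmed node, and a watchtower running with a zero wallet address already gets the confirmed result from the first call and skips the second. A condensed, runnable sketch of that fan-out, with the on-chain lookup stubbed out (lookupLatestStaked and globalState are invented for this sketch):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

type globalState struct{ Batch, PosInBatch uint64 }

// lookupLatestStaked stands in for getLatestStakedState; querying the zero address is
// assumed to resolve to the latest confirmed node, per the ValidatorUtils fallback.
func lookupLatestStaked(addr common.Address) (node uint64, msgCount uint64, gs *globalState) {
	if addr == (common.Address{}) {
		return 90, 900, &globalState{Batch: 9} // pretend confirmed trails staked
	}
	return 100, 1000, &globalState{Batch: 10}
}

func main() {
	wallet := common.HexToAddress("0x1111111111111111111111111111111111111111")

	staked, stakedMsgCount, stakedGS := lookupLatestStaked(wallet)
	if stakedGS != nil {
		fmt.Println("staked notifiers: validate up to message", stakedMsgCount, "node", staked)
	}

	// Reuse the staked result when the wallet is the zero address (watchtower mode);
	// otherwise do a second lookup against the zero address to get the confirmed node.
	confirmed, confirmedMsgCount, confirmedGS := staked, stakedMsgCount, stakedGS
	if wallet != (common.Address{}) {
		confirmed, confirmedMsgCount, confirmedGS = lookupLatestStaked(common.Address{})
	}
	if confirmedGS != nil {
		fmt.Println("confirmed notifiers: prune up to message", confirmedMsgCount, "node", confirmed)
	}
}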
1 change: 1 addition & 0 deletions staker/stateless_block_validator.go
@@ -84,6 +84,7 @@ type L1ReaderInterface interface {
Client() arbutil.L1Interface
Subscribe(bool) (<-chan *types.Header, func())
WaitForTxApproval(ctx context.Context, tx *types.Transaction) (*types.Receipt, error)
+	UseFinalityData() bool
}

type GlobalStatePosition struct {
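Note: UseFinalityData is the hook that lets getLatestStakedState pin its contract read to the finalized L1 block. A minimal runnable sketch of that pinning, assuming the resulting opts would be handed to an abigen-style caller such as validatorUtils.LatestStaked; only bind.CallOpts and rpc.FinalizedBlockNumber are the real go-ethereum pieces.

package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/accounts/abi/bind"
	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	// Stand-in for l1Reader.UseFinalityData(); the real value comes from the L1 reader config.
	useFinalityData := true

	opts := &bind.CallOpts{}
	if useFinalityData {
		// rpc.FinalizedBlockNumber is a negative sentinel that recent go-ethereum clients
		// translate into the "finalized" block tag, so the read is not affected by a
		// shallow L1 reorg.
		opts.BlockNumber = big.NewInt(int64(rpc.FinalizedBlockNumber))
	}
	fmt.Println("pinned call block number:", opts.BlockNumber)
	// opts would then be passed to a generated binding, e.g. validatorUtils.LatestStaked(opts, ...).
}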