Cache the start key for message pruning and use latest confirmed #1757

Merged
merged 5 commits on Jul 17, 2023
Changes from 2 commits
48 changes: 30 additions & 18 deletions arbnode/message_pruner.go
@@ -23,11 +23,13 @@ import (

type MessagePruner struct {
stopwaiter.StopWaiter
transactionStreamer *TransactionStreamer
inboxTracker *InboxTracker
config MessagePrunerConfigFetcher
pruningLock sync.Mutex
lastPruneDone time.Time
transactionStreamer *TransactionStreamer
inboxTracker *InboxTracker
config MessagePrunerConfigFetcher
pruningLock sync.Mutex
lastPruneDone time.Time
cachedPrunedMessages uint64
cachedPrunedDelayedMessages uint64
}

type MessagePrunerConfig struct {
@@ -62,7 +64,7 @@ func (m *MessagePruner) Start(ctxIn context.Context) {
m.StopWaiter.Start(ctxIn, m)
}

func (m *MessagePruner) UpdateLatestStaked(count arbutil.MessageIndex, globalState validator.GoGlobalState) {
func (m *MessagePruner) UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState) {
locked := m.pruningLock.TryLock()
if !locked {
return
@@ -108,19 +110,19 @@ func (m *MessagePruner) prune(ctx context.Context, count arbutil.MessageIndex, g
msgCount := endBatchMetadata.MessageCount
delayedCount := endBatchMetadata.DelayedMessageCount

return deleteOldMessageFromDB(ctx, msgCount, delayedCount, m.inboxTracker.db, m.transactionStreamer.db)
return m.deleteOldMessagesFromDB(ctx, msgCount, delayedCount)
}

func deleteOldMessageFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64, inboxTrackerDb ethdb.Database, transactionStreamerDb ethdb.Database) error {
prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, transactionStreamerDb, messagePrefix, uint64(messageCount))
func (m *MessagePruner) deleteOldMessagesFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64) error {
prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messagePrefix, &m.cachedPrunedMessages, uint64(messageCount))
if err != nil {
return fmt.Errorf("error deleting last batch messages: %w", err)
}
if len(prunedKeysRange) > 0 {
log.Info("Pruned last batch messages:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1])
}

prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, inboxTrackerDb, rlpDelayedMessagePrefix, delayedMessageCount)
prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.inboxTracker.db, rlpDelayedMessagePrefix, &m.cachedPrunedDelayedMessages, delayedMessageCount)
if err != nil {
return fmt.Errorf("error deleting last batch delayed messages: %w", err)
}
@@ -130,15 +132,25 @@ func deleteOldMessageFromDB(ctx context.Context, messageCount arbutil.MessageInd
return nil
}

func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, prefix []byte, endMinKey uint64) ([]uint64, error) {
startIter := db.NewIterator(prefix, uint64ToKey(1))
if !startIter.Next() {
// deleteFromLastPrunedUptoEndKey is similar to deleteFromRange but automatically populates the start key
// cachedStartMinKey must not be nil. It's set to the new start key at the end of this function if successful.
func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, prefix []byte, cachedStartMinKey *uint64, endMinKey uint64) ([]uint64, error) {
startMinKey := *cachedStartMinKey
if startMinKey == 0 {
startIter := db.NewIterator(prefix, uint64ToKey(1))
if !startIter.Next() {
return nil, nil
}
startMinKey = binary.BigEndian.Uint64(bytes.TrimPrefix(startIter.Key(), prefix))
startIter.Release()
}
if endMinKey <= startMinKey {
*cachedStartMinKey = startMinKey
return nil, nil
}
startMinKey := binary.BigEndian.Uint64(bytes.TrimPrefix(startIter.Key(), prefix))
startIter.Release()
if endMinKey > startMinKey {
return deleteFromRange(ctx, db, prefix, startMinKey, endMinKey-1)
keys, err := deleteFromRange(ctx, db, prefix, startMinKey, endMinKey-1)
if err == nil {
*cachedStartMinKey = endMinKey - 1
}
return nil, nil
return keys, err
}
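
A note on the change above: the new cachedPrunedMessages / cachedPrunedDelayedMessages fields let each prune resume from where the previous one stopped, instead of re-discovering the first live key with a database iterator every time. Below is a minimal, self-contained sketch of that idea only; the names pruner, pruneUpTo, and findFirstKey are illustrative and are not part of this PR.

package main

import "fmt"

// pruner mimics the caching added in this PR: cachedStart remembers where the
// previous prune stopped; zero means "not discovered yet".
type pruner struct {
	cachedStart uint64
}

// findFirstKey stands in for the iterator walk the real code performs when no
// cached start key is available.
func findFirstKey() uint64 { return 1 }

func (p *pruner) pruneUpTo(end uint64) {
	start := p.cachedStart
	if start == 0 {
		start = findFirstKey() // only paid on the first prune
	}
	if end <= start {
		p.cachedStart = start
		return
	}
	fmt.Printf("pruning keys %d through %d\n", start, end-1)
	p.cachedStart = end - 1 // the next prune resumes here, as in the diff
}

func main() {
	p := &pruner{}
	p.pruneUpTo(50)  // scans for the first key, then prunes 1..49
	p.pruneUpTo(100) // resumes from the cached key 49, prunes 49..99
}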
38 changes: 20 additions & 18 deletions arbnode/message_pruner_test.go
@@ -17,31 +17,30 @@ func TestMessagePrunerWithPruningEligibleMessagePresent(t *testing.T) {
defer cancel()

messagesCount := uint64(2 * 100 * 1024)
inboxTrackerDb, transactionStreamerDb := setupDatabase(t, 2*100*1024, 2*100*1024)
err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, 2*100*1024, 2*100*1024)
err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount)
Require(t, err)

checkDbKeys(t, messagesCount, transactionStreamerDb, messagePrefix)
checkDbKeys(t, messagesCount, inboxTrackerDb, rlpDelayedMessagePrefix)

}

func TestMessagePrunerTraverseEachMessageOnlyOnce(t *testing.T) {
func TestMessagePrunerTwoHalves(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

messagesCount := uint64(10)
inboxTrackerDb, transactionStreamerDb := setupDatabase(t, messagesCount, messagesCount)
// In first iteration message till messagesCount are tried to be deleted.
err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
_, transactionStreamerDb, pruner := setupDatabase(t, messagesCount, messagesCount)
// In first iteration message till messagesCount/2 are tried to be deleted.
err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount/2), messagesCount/2)
Require(t, err)
// After first iteration messagesCount/2 is reinserted in inbox db
err = inboxTrackerDb.Put(dbKey(messagePrefix, messagesCount/2), []byte{})
// In first iteration all the message till messagesCount/2 are deleted.
checkDbKeys(t, messagesCount/2, transactionStreamerDb, messagePrefix)
// In second iteration message till messagesCount are tried to be deleted.
err = pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount)
Require(t, err)
// In second iteration message till messagesCount are again tried to be deleted.
err = deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
Require(t, err)
// In second iteration all the message till messagesCount are deleted again.
// In second iteration all the message till messagesCount are deleted.
checkDbKeys(t, messagesCount, transactionStreamerDb, messagePrefix)
}

@@ -50,10 +49,10 @@ func TestMessagePrunerPruneTillLessThenEqualTo(t *testing.T) {
defer cancel()

messagesCount := uint64(10)
inboxTrackerDb, transactionStreamerDb := setupDatabase(t, 2*messagesCount, 20)
inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, 2*messagesCount, 20)
err := inboxTrackerDb.Delete(dbKey(messagePrefix, 9))
Require(t, err)
err = deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
err = pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount)
Require(t, err)
hasKey, err := transactionStreamerDb.Has(dbKey(messagePrefix, messagesCount))
Require(t, err)
@@ -67,16 +66,16 @@ func TestMessagePrunerWithNoPruningEligibleMessagePresent(t *testing.T) {
defer cancel()

messagesCount := uint64(10)
inboxTrackerDb, transactionStreamerDb := setupDatabase(t, messagesCount, messagesCount)
err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, messagesCount, messagesCount)
err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount)
Require(t, err)

checkDbKeys(t, uint64(messagesCount), transactionStreamerDb, messagePrefix)
checkDbKeys(t, messagesCount, inboxTrackerDb, rlpDelayedMessagePrefix)

}

func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethdb.Database, ethdb.Database) {
func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethdb.Database, ethdb.Database, *MessagePruner) {

transactionStreamerDb := rawdb.NewMemoryDatabase()
for i := uint64(0); i < uint64(messageCount); i++ {
@@ -90,7 +89,10 @@ func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethd
Require(t, err)
}

return inboxTrackerDb, transactionStreamerDb
return inboxTrackerDb, transactionStreamerDb, &MessagePruner{
transactionStreamer: &TransactionStreamer{db: transactionStreamerDb},
inboxTracker: &InboxTracker{db: inboxTrackerDb},
}
}

func checkDbKeys(t *testing.T, endCount uint64, db ethdb.Database, prefix []byte) {
6 changes: 3 additions & 3 deletions arbnode/node.go
@@ -825,13 +825,13 @@ func createNodeImpl(
}
}

notifiers := make([]staker.LatestStakedNotifier, 0)
var confirmedNotifiers []staker.LatestConfirmedNotifier
if config.MessagePruner.Enable && !config.Caching.Archive {
messagePruner = NewMessagePruner(txStreamer, inboxTracker, func() *MessagePrunerConfig { return &configFetcher.Get().MessagePruner })
notifiers = append(notifiers, messagePruner)
confirmedNotifiers = append(confirmedNotifiers, messagePruner)
}

stakerObj, err = staker.NewStaker(l1Reader, wallet, bind.CallOpts{}, config.Staker, blockValidator, statelessBlockValidator, notifiers, deployInfo.ValidatorUtils, fatalErrChan)
stakerObj, err = staker.NewStaker(l1Reader, wallet, bind.CallOpts{}, config.Staker, blockValidator, statelessBlockValidator, nil, confirmedNotifiers, deployInfo.ValidatorUtils, fatalErrChan)
if err != nil {
return nil, err
}
102 changes: 79 additions & 23 deletions staker/staker.go
@@ -17,6 +17,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rpc"
flag "github.com/spf13/pflag"

"github.com/offchainlabs/nitro/arbutil"
@@ -30,6 +31,7 @@ var (
stakerBalanceGauge = metrics.NewRegisteredGaugeFloat64("arb/staker/balance", nil)
stakerAmountStakedGauge = metrics.NewRegisteredGauge("arb/staker/amount_staked", nil)
stakerLatestStakedNodeGauge = metrics.NewRegisteredGauge("arb/staker/staked_node", nil)
stakerLatestConfirmedNodeGauge = metrics.NewRegisteredGauge("arb/staker/confirmed_node", nil)
stakerLastSuccessfulActionGauge = metrics.NewRegisteredGauge("arb/staker/action/last_success", nil)
stakerActionSuccessCounter = metrics.NewRegisteredCounter("arb/staker/action/success", nil)
stakerActionFailureCounter = metrics.NewRegisteredCounter("arb/staker/action/failure", nil)
@@ -195,11 +197,16 @@ type LatestStakedNotifier interface {
UpdateLatestStaked(count arbutil.MessageIndex, globalState validator.GoGlobalState)
}

type LatestConfirmedNotifier interface {
UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState)
}

type Staker struct {
*L1Validator
stopwaiter.StopWaiter
l1Reader L1ReaderInterface
notifiers []LatestStakedNotifier
stakedNotifiers []LatestStakedNotifier
confirmedNotifiers []LatestConfirmedNotifier
activeChallenge *ChallengeManager
baseCallOpts bind.CallOpts
config L1ValidatorConfig
@@ -219,7 +226,8 @@ func NewStaker(
config L1ValidatorConfig,
blockValidator *BlockValidator,
statelessBlockValidator *StatelessBlockValidator,
notifiers []LatestStakedNotifier,
stakedNotifiers []LatestStakedNotifier,
confirmedNotifiers []LatestConfirmedNotifier,
validatorUtilsAddress common.Address,
fatalErr chan<- error,
) (*Staker, error) {
@@ -235,12 +243,13 @@
}
stakerLastSuccessfulActionGauge.Update(time.Now().Unix())
if config.StartFromStaked {
notifiers = append(notifiers, blockValidator)
stakedNotifiers = append(stakedNotifiers, blockValidator)
}
return &Staker{
L1Validator: val,
l1Reader: l1Reader,
notifiers: notifiers,
stakedNotifiers: stakedNotifiers,
confirmedNotifiers: confirmedNotifiers,
baseCallOpts: callOpts,
config: config,
highGasBlocksBuffer: big.NewInt(config.L1PostingStrategy.HighGasDelayBlocks),
@@ -280,8 +289,42 @@ func (s *Staker) Initialize(ctx context.Context) error {
return nil
}

func (s *Staker) latestNodeDetailsForUpdate(ctx context.Context, description string, node uint64) (arbutil.MessageIndex, *validator.GoGlobalState, error) {
stakedInfo, err := s.rollup.LookupNode(ctx, node)
Contributor:
you could merge more code by using the fact that LatestConfirmed is identical to LatestStaked when getting zeros for address.
Could even test if walletOrZero is zeroes and in that case skip checking latestStaked (instead - use what you have for latestConfirmed)

Collaborator (Author):
I've unified the functions. Let me know what you think. I spent a while thinking about how to do this and my current approach is the best I came up with, but there might be a slightly cleaner way.

Collaborator (Author):
One thing I didn't want to do is have LatestStakedNotifier and LatestConfirmedNotifier be the same interface, because I could imagine some components wanting both, and I don't want them to be confusable (looking at a component it should be clear whether it gets the latest staked or confirmed info).
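
To illustrate the distinction described above, here is a minimal sketch (not part of the PR) of a hypothetical component that wants both signals: it implements both methods and gets registered in both notifier lists. The interface definitions come from the staker.go diff in this PR; the component name and the import paths are assumptions based on the repository layout.

package example

import (
	"github.com/offchainlabs/nitro/arbutil"
	"github.com/offchainlabs/nitro/staker"
	"github.com/offchainlabs/nitro/validator"
)

// bothNotifier is hypothetical: a component interested in both signals simply
// implements both methods, keeping it explicit which update it is reacting to.
type bothNotifier struct{}

func (n *bothNotifier) UpdateLatestStaked(count arbutil.MessageIndex, globalState validator.GoGlobalState) {
	// react to the latest staked assertion
}

func (n *bothNotifier) UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState) {
	// react to the latest confirmed assertion, e.g. prune data that can no longer roll back
}

// Compile-time checks that both distinct interfaces are satisfied.
var (
	_ staker.LatestStakedNotifier    = (*bothNotifier)(nil)
	_ staker.LatestConfirmedNotifier = (*bothNotifier)(nil)
)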

if err != nil {
return 0, nil, fmt.Errorf("couldn't look up latest %v assertion %v: %w", description, node, err)
}

globalState := stakedInfo.AfterState().GlobalState
caughtUp, count, err := GlobalStateToMsgCount(s.inboxTracker, s.txStreamer, globalState)
if err != nil {
if errors.Is(err, ErrGlobalStateNotInChain) && s.fatalErr != nil {
fatal := fmt.Errorf("latest %v assertion %v not in chain: %w", description, node, err)
s.fatalErr <- fatal
}
return 0, nil, fmt.Errorf("latest %v assertion %v: %w", description, node, err)
}

if !caughtUp {
log.Info(fmt.Sprintf("latest %v assertion not yet in our node", description), "assertion", node, "state", globalState)
return 0, nil, nil
}

processedCount, err := s.txStreamer.GetProcessedMessageCount()
if err != nil {
return 0, nil, err
}

if processedCount < count {
log.Info("execution catching up to rollup", "lookingFor", description, "rollupCount", count, "processedCount", processedCount)
return 0, nil, nil
}

return count, &globalState, nil
}

func (s *Staker) checkLatestStaked(ctx context.Context) error {
latestStaked, _, err := s.validatorUtils.LatestStaked(&s.baseCallOpts, s.rollupAddress, s.wallet.AddressOrZero())
latestStaked, _, err := s.validatorUtils.LatestStaked(s.getCallOpts(ctx), s.rollupAddress, s.wallet.AddressOrZero())
if err != nil {
return fmt.Errorf("couldn't get LatestStaked: %w", err)
}
Expand All @@ -290,38 +333,44 @@ func (s *Staker) checkLatestStaked(ctx context.Context) error {
return nil
}

stakedInfo, err := s.rollup.LookupNode(ctx, latestStaked)
count, globalState, err := s.latestNodeDetailsForUpdate(ctx, "staked", latestStaked)
if err != nil {
return fmt.Errorf("couldn't look up latest node: %w", err)
return err
}
if globalState == nil {
return nil
}

stakedGlobalState := stakedInfo.AfterState().GlobalState
caughtUp, count, err := GlobalStateToMsgCount(s.inboxTracker, s.txStreamer, stakedGlobalState)
if err != nil {
if errors.Is(err, ErrGlobalStateNotInChain) && s.fatalErr != nil {
fatal := fmt.Errorf("latest staked not in chain: %w", err)
s.fatalErr <- fatal
}
return fmt.Errorf("staker: latest staked %w", err)
for _, notifier := range s.stakedNotifiers {
notifier.UpdateLatestStaked(count, *globalState)
}
return nil
}

if !caughtUp {
log.Info("latest valid not yet in our node", "staked", stakedGlobalState)
func (s *Staker) checkLatestConfirmed(ctx context.Context) error {
callOpts := s.getCallOpts(ctx)
if s.l1Reader.UseFinalityData() {
callOpts.BlockNumber = big.NewInt(int64(rpc.FinalizedBlockNumber))
}
latestConfirmed, err := s.rollup.LatestConfirmed(callOpts)
if err != nil {
return fmt.Errorf("couldn't get LatestConfirmed: %w", err)
}
stakerLatestConfirmedNodeGauge.Update(int64(latestConfirmed))
if latestConfirmed == 0 {
return nil
}

processedCount, err := s.txStreamer.GetProcessedMessageCount()
count, globalState, err := s.latestNodeDetailsForUpdate(ctx, "confirmed", latestConfirmed)
if err != nil {
return err
}

if processedCount < count {
log.Info("execution catching up to last validated", "validatedCount", count, "processedCount", processedCount)
if globalState == nil {
return nil
}

for _, notifier := range s.notifiers {
notifier.UpdateLatestStaked(count, stakedGlobalState)
for _, notifier := range s.confirmedNotifiers {
notifier.UpdateLatestConfirmed(count, *globalState)
}
return nil
}
@@ -387,6 +436,13 @@ func (s *Staker) Start(ctxIn context.Context) {
}
return s.config.StakerInterval
})
s.CallIteratively(func(ctx context.Context) time.Duration {
err := s.checkLatestConfirmed(ctx)
if err != nil && ctx.Err() == nil {
log.Error("staker: error checking latest confirmed", "err", err)
}
return s.config.StakerInterval
})
}

func (s *Staker) IsWhitelisted(ctx context.Context) (bool, error) {
1 change: 1 addition & 0 deletions staker/stateless_block_validator.go
@@ -84,6 +84,7 @@ type L1ReaderInterface interface {
Client() arbutil.L1Interface
Subscribe(bool) (<-chan *types.Header, func())
WaitForTxApproval(ctx context.Context, tx *types.Transaction) (*types.Receipt, error)
UseFinalityData() bool
}

type GlobalStatePosition struct {