Skip to content

Commit

Permalink
Merge branch 'master' into cgroupsv2-resourcemanager
Browse files Browse the repository at this point in the history
  • Loading branch information
Tristan-Wilson authored Jul 17, 2023
2 parents 232ded2 + 676bef3 commit 42a4693
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 75 deletions.
48 changes: 30 additions & 18 deletions arbnode/message_pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (

type MessagePruner struct {
stopwaiter.StopWaiter
transactionStreamer *TransactionStreamer
inboxTracker *InboxTracker
config MessagePrunerConfigFetcher
pruningLock sync.Mutex
lastPruneDone time.Time
transactionStreamer *TransactionStreamer
inboxTracker *InboxTracker
config MessagePrunerConfigFetcher
pruningLock sync.Mutex
lastPruneDone time.Time
cachedPrunedMessages uint64
cachedPrunedDelayedMessages uint64
}

type MessagePrunerConfig struct {
Expand Down Expand Up @@ -62,7 +64,7 @@ func (m *MessagePruner) Start(ctxIn context.Context) {
m.StopWaiter.Start(ctxIn, m)
}

func (m *MessagePruner) UpdateLatestStaked(count arbutil.MessageIndex, globalState validator.GoGlobalState) {
func (m *MessagePruner) UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState) {
locked := m.pruningLock.TryLock()
if !locked {
return
Expand Down Expand Up @@ -108,19 +110,19 @@ func (m *MessagePruner) prune(ctx context.Context, count arbutil.MessageIndex, g
msgCount := endBatchMetadata.MessageCount
delayedCount := endBatchMetadata.DelayedMessageCount

return deleteOldMessageFromDB(ctx, msgCount, delayedCount, m.inboxTracker.db, m.transactionStreamer.db)
return m.deleteOldMessagesFromDB(ctx, msgCount, delayedCount)
}

func deleteOldMessageFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64, inboxTrackerDb ethdb.Database, transactionStreamerDb ethdb.Database) error {
prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, transactionStreamerDb, messagePrefix, uint64(messageCount))
func (m *MessagePruner) deleteOldMessagesFromDB(ctx context.Context, messageCount arbutil.MessageIndex, delayedMessageCount uint64) error {
prunedKeysRange, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, messagePrefix, &m.cachedPrunedMessages, uint64(messageCount))
if err != nil {
return fmt.Errorf("error deleting last batch messages: %w", err)
}
if len(prunedKeysRange) > 0 {
log.Info("Pruned last batch messages:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1])
}

prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, inboxTrackerDb, rlpDelayedMessagePrefix, delayedMessageCount)
prunedKeysRange, err = deleteFromLastPrunedUptoEndKey(ctx, m.inboxTracker.db, rlpDelayedMessagePrefix, &m.cachedPrunedDelayedMessages, delayedMessageCount)
if err != nil {
return fmt.Errorf("error deleting last batch delayed messages: %w", err)
}
Expand All @@ -130,15 +132,25 @@ func deleteOldMessageFromDB(ctx context.Context, messageCount arbutil.MessageInd
return nil
}

func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, prefix []byte, endMinKey uint64) ([]uint64, error) {
startIter := db.NewIterator(prefix, uint64ToKey(1))
if !startIter.Next() {
// deleteFromLastPrunedUptoEndKey is similar to deleteFromRange but automatically populates the start key
// cachedStartMinKey must not be nil. It's set to the new start key at the end of this function if successful.
func deleteFromLastPrunedUptoEndKey(ctx context.Context, db ethdb.Database, prefix []byte, cachedStartMinKey *uint64, endMinKey uint64) ([]uint64, error) {
startMinKey := *cachedStartMinKey
if startMinKey == 0 {
startIter := db.NewIterator(prefix, uint64ToKey(1))
if !startIter.Next() {
return nil, nil
}
startMinKey = binary.BigEndian.Uint64(bytes.TrimPrefix(startIter.Key(), prefix))
startIter.Release()
}
if endMinKey <= startMinKey {
*cachedStartMinKey = startMinKey
return nil, nil
}
startMinKey := binary.BigEndian.Uint64(bytes.TrimPrefix(startIter.Key(), prefix))
startIter.Release()
if endMinKey > startMinKey {
return deleteFromRange(ctx, db, prefix, startMinKey, endMinKey-1)
keys, err := deleteFromRange(ctx, db, prefix, startMinKey, endMinKey-1)
if err == nil {
*cachedStartMinKey = endMinKey - 1
}
return nil, nil
return keys, err
}
38 changes: 20 additions & 18 deletions arbnode/message_pruner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,30 @@ func TestMessagePrunerWithPruningEligibleMessagePresent(t *testing.T) {
defer cancel()

messagesCount := uint64(2 * 100 * 1024)
inboxTrackerDb, transactionStreamerDb := setupDatabase(t, 2*100*1024, 2*100*1024)
err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, 2*100*1024, 2*100*1024)
err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount)
Require(t, err)

checkDbKeys(t, messagesCount, transactionStreamerDb, messagePrefix)
checkDbKeys(t, messagesCount, inboxTrackerDb, rlpDelayedMessagePrefix)

}

func TestMessagePrunerTraverseEachMessageOnlyOnce(t *testing.T) {
func TestMessagePrunerTwoHalves(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

messagesCount := uint64(10)
inboxTrackerDb, transactionStreamerDb := setupDatabase(t, messagesCount, messagesCount)
// In first iteration message till messagesCount are tried to be deleted.
err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
_, transactionStreamerDb, pruner := setupDatabase(t, messagesCount, messagesCount)
// In first iteration message till messagesCount/2 are tried to be deleted.
err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount/2), messagesCount/2)
Require(t, err)
// After first iteration messagesCount/2 is reinserted in inbox db
err = inboxTrackerDb.Put(dbKey(messagePrefix, messagesCount/2), []byte{})
// In first iteration all the message till messagesCount/2 are deleted.
checkDbKeys(t, messagesCount/2, transactionStreamerDb, messagePrefix)
// In second iteration message till messagesCount are tried to be deleted.
err = pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount)
Require(t, err)
// In second iteration message till messagesCount are again tried to be deleted.
err = deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
Require(t, err)
// In second iteration all the message till messagesCount are deleted again.
// In second iteration all the message till messagesCount are deleted.
checkDbKeys(t, messagesCount, transactionStreamerDb, messagePrefix)
}

Expand All @@ -50,10 +49,10 @@ func TestMessagePrunerPruneTillLessThenEqualTo(t *testing.T) {
defer cancel()

messagesCount := uint64(10)
inboxTrackerDb, transactionStreamerDb := setupDatabase(t, 2*messagesCount, 20)
inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, 2*messagesCount, 20)
err := inboxTrackerDb.Delete(dbKey(messagePrefix, 9))
Require(t, err)
err = deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
err = pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount)
Require(t, err)
hasKey, err := transactionStreamerDb.Has(dbKey(messagePrefix, messagesCount))
Require(t, err)
Expand All @@ -67,16 +66,16 @@ func TestMessagePrunerWithNoPruningEligibleMessagePresent(t *testing.T) {
defer cancel()

messagesCount := uint64(10)
inboxTrackerDb, transactionStreamerDb := setupDatabase(t, messagesCount, messagesCount)
err := deleteOldMessageFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount, inboxTrackerDb, transactionStreamerDb)
inboxTrackerDb, transactionStreamerDb, pruner := setupDatabase(t, messagesCount, messagesCount)
err := pruner.deleteOldMessagesFromDB(ctx, arbutil.MessageIndex(messagesCount), messagesCount)
Require(t, err)

checkDbKeys(t, uint64(messagesCount), transactionStreamerDb, messagePrefix)
checkDbKeys(t, messagesCount, inboxTrackerDb, rlpDelayedMessagePrefix)

}

func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethdb.Database, ethdb.Database) {
func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethdb.Database, ethdb.Database, *MessagePruner) {

transactionStreamerDb := rawdb.NewMemoryDatabase()
for i := uint64(0); i < uint64(messageCount); i++ {
Expand All @@ -90,7 +89,10 @@ func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethd
Require(t, err)
}

return inboxTrackerDb, transactionStreamerDb
return inboxTrackerDb, transactionStreamerDb, &MessagePruner{
transactionStreamer: &TransactionStreamer{db: transactionStreamerDb},
inboxTracker: &InboxTracker{db: inboxTrackerDb},
}
}

func checkDbKeys(t *testing.T, endCount uint64, db ethdb.Database, prefix []byte) {
Expand Down
6 changes: 3 additions & 3 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,13 +825,13 @@ func createNodeImpl(
}
}

notifiers := make([]staker.LatestStakedNotifier, 0)
var confirmedNotifiers []staker.LatestConfirmedNotifier
if config.MessagePruner.Enable && !config.Caching.Archive {
messagePruner = NewMessagePruner(txStreamer, inboxTracker, func() *MessagePrunerConfig { return &configFetcher.Get().MessagePruner })
notifiers = append(notifiers, messagePruner)
confirmedNotifiers = append(confirmedNotifiers, messagePruner)
}

stakerObj, err = staker.NewStaker(l1Reader, wallet, bind.CallOpts{}, config.Staker, blockValidator, statelessBlockValidator, notifiers, deployInfo.ValidatorUtils, fatalErrChan)
stakerObj, err = staker.NewStaker(l1Reader, wallet, bind.CallOpts{}, config.Staker, blockValidator, statelessBlockValidator, nil, confirmedNotifiers, deployInfo.ValidatorUtils, fatalErrChan)
if err != nil {
return nil, err
}
Expand Down
81 changes: 56 additions & 25 deletions staker/staker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rpc"
flag "github.com/spf13/pflag"

"github.com/offchainlabs/nitro/arbutil"
Expand All @@ -30,6 +31,7 @@ var (
stakerBalanceGauge = metrics.NewRegisteredGaugeFloat64("arb/staker/balance", nil)
stakerAmountStakedGauge = metrics.NewRegisteredGauge("arb/staker/amount_staked", nil)
stakerLatestStakedNodeGauge = metrics.NewRegisteredGauge("arb/staker/staked_node", nil)
stakerLatestConfirmedNodeGauge = metrics.NewRegisteredGauge("arb/staker/confirmed_node", nil)
stakerLastSuccessfulActionGauge = metrics.NewRegisteredGauge("arb/staker/action/last_success", nil)
stakerActionSuccessCounter = metrics.NewRegisteredCounter("arb/staker/action/success", nil)
stakerActionFailureCounter = metrics.NewRegisteredCounter("arb/staker/action/failure", nil)
Expand Down Expand Up @@ -195,11 +197,16 @@ type LatestStakedNotifier interface {
UpdateLatestStaked(count arbutil.MessageIndex, globalState validator.GoGlobalState)
}

type LatestConfirmedNotifier interface {
UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState)
}

type Staker struct {
*L1Validator
stopwaiter.StopWaiter
l1Reader L1ReaderInterface
notifiers []LatestStakedNotifier
stakedNotifiers []LatestStakedNotifier
confirmedNotifiers []LatestConfirmedNotifier
activeChallenge *ChallengeManager
baseCallOpts bind.CallOpts
config L1ValidatorConfig
Expand All @@ -219,7 +226,8 @@ func NewStaker(
config L1ValidatorConfig,
blockValidator *BlockValidator,
statelessBlockValidator *StatelessBlockValidator,
notifiers []LatestStakedNotifier,
stakedNotifiers []LatestStakedNotifier,
confirmedNotifiers []LatestConfirmedNotifier,
validatorUtilsAddress common.Address,
fatalErr chan<- error,
) (*Staker, error) {
Expand All @@ -234,13 +242,14 @@ func NewStaker(
return nil, err
}
stakerLastSuccessfulActionGauge.Update(time.Now().Unix())
if config.StartFromStaked {
notifiers = append(notifiers, blockValidator)
if config.StartFromStaked && blockValidator != nil {
stakedNotifiers = append(stakedNotifiers, blockValidator)
}
return &Staker{
L1Validator: val,
l1Reader: l1Reader,
notifiers: notifiers,
stakedNotifiers: stakedNotifiers,
confirmedNotifiers: confirmedNotifiers,
baseCallOpts: callOpts,
config: config,
highGasBlocksBuffer: big.NewInt(config.L1PostingStrategy.HighGasDelayBlocks),
Expand Down Expand Up @@ -280,50 +289,50 @@ func (s *Staker) Initialize(ctx context.Context) error {
return nil
}

func (s *Staker) checkLatestStaked(ctx context.Context) error {
latestStaked, _, err := s.validatorUtils.LatestStaked(&s.baseCallOpts, s.rollupAddress, s.wallet.AddressOrZero())
func (s *Staker) getLatestStakedState(ctx context.Context, staker common.Address) (uint64, arbutil.MessageIndex, *validator.GoGlobalState, error) {
callOpts := s.getCallOpts(ctx)
if s.l1Reader.UseFinalityData() {
callOpts.BlockNumber = big.NewInt(int64(rpc.FinalizedBlockNumber))
}
latestStaked, _, err := s.validatorUtils.LatestStaked(s.getCallOpts(ctx), s.rollupAddress, staker)
if err != nil {
return fmt.Errorf("couldn't get LatestStaked: %w", err)
return 0, 0, nil, fmt.Errorf("couldn't get LatestStaked(%v): %w", staker, err)
}
stakerLatestStakedNodeGauge.Update(int64(latestStaked))
if latestStaked == 0 {
return nil
return latestStaked, 0, nil, nil
}

stakedInfo, err := s.rollup.LookupNode(ctx, latestStaked)
if err != nil {
return fmt.Errorf("couldn't look up latest node: %w", err)
return 0, 0, nil, fmt.Errorf("couldn't look up latest assertion of %v (%v): %w", staker, latestStaked, err)
}

stakedGlobalState := stakedInfo.AfterState().GlobalState
caughtUp, count, err := GlobalStateToMsgCount(s.inboxTracker, s.txStreamer, stakedGlobalState)
globalState := stakedInfo.AfterState().GlobalState
caughtUp, count, err := GlobalStateToMsgCount(s.inboxTracker, s.txStreamer, globalState)
if err != nil {
if errors.Is(err, ErrGlobalStateNotInChain) && s.fatalErr != nil {
fatal := fmt.Errorf("latest staked not in chain: %w", err)
fatal := fmt.Errorf("latest assertion of %v (%v) not in chain: %w", staker, latestStaked, err)
s.fatalErr <- fatal
}
return fmt.Errorf("staker: latest staked %w", err)
return 0, 0, nil, fmt.Errorf("latest assertion of %v (%v): %w", staker, latestStaked, err)
}

if !caughtUp {
log.Info("latest valid not yet in our node", "staked", stakedGlobalState)
return nil
log.Info("latest assertion not yet in our node", "staker", staker, "assertion", latestStaked, "state", globalState)
return latestStaked, 0, nil, nil
}

processedCount, err := s.txStreamer.GetProcessedMessageCount()
if err != nil {
return err
return 0, 0, nil, err
}

if processedCount < count {
log.Info("execution catching up to last validated", "validatedCount", count, "processedCount", processedCount)
return nil
log.Info("execution catching up to rollup", "staker", staker, "rollupCount", count, "processedCount", processedCount)
return latestStaked, 0, nil, nil
}

for _, notifier := range s.notifiers {
notifier.UpdateLatestStaked(count, stakedGlobalState)
}
return nil
return latestStaked, count, &globalState, nil
}

func (s *Staker) Start(ctxIn context.Context) {
Expand Down Expand Up @@ -381,10 +390,32 @@ func (s *Staker) Start(ctxIn context.Context) {
return backoff
})
s.CallIteratively(func(ctx context.Context) time.Duration {
err := s.checkLatestStaked(ctx)
wallet := s.wallet.AddressOrZero()
staked, stakedMsgCount, stakedGlobalState, err := s.getLatestStakedState(ctx, wallet)
if err != nil && ctx.Err() == nil {
log.Error("staker: error checking latest staked", "err", err)
}
stakerLatestStakedNodeGauge.Update(int64(staked))
if stakedGlobalState != nil {
for _, notifier := range s.stakedNotifiers {
notifier.UpdateLatestStaked(stakedMsgCount, *stakedGlobalState)
}
}
confirmed := staked
confirmedMsgCount := stakedMsgCount
confirmedGlobalState := stakedGlobalState
if wallet != (common.Address{}) {
confirmed, confirmedMsgCount, confirmedGlobalState, err = s.getLatestStakedState(ctx, common.Address{})
if err != nil && ctx.Err() == nil {
log.Error("staker: error checking latest confirmed", "err", err)
}
}
stakerLatestConfirmedNodeGauge.Update(int64(confirmed))
if confirmedGlobalState != nil {
for _, notifier := range s.confirmedNotifiers {
notifier.UpdateLatestConfirmed(confirmedMsgCount, *confirmedGlobalState)
}
}
return s.config.StakerInterval
})
}
Expand Down
1 change: 1 addition & 0 deletions staker/stateless_block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type L1ReaderInterface interface {
Client() arbutil.L1Interface
Subscribe(bool) (<-chan *types.Header, func())
WaitForTxApproval(ctx context.Context, tx *types.Transaction) (*types.Receipt, error)
UseFinalityData() bool
}

type GlobalStatePosition struct {
Expand Down
Loading

0 comments on commit 42a4693

Please sign in to comment.