diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 3e5e6a738f..5aa07f5157 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -10,21 +10,24 @@ import ( "errors" "fmt" "math/big" + "sync/atomic" "time" "github.com/andybalholm/brotli" - flag "github.com/spf13/pflag" + "github.com/spf13/pflag" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" - "github.com/offchainlabs/nitro/arbnode/dataposter" + "github.com/offchainlabs/nitro/arbnode/dataposter/storage" "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbstate" "github.com/offchainlabs/nitro/arbutil" @@ -66,6 +69,8 @@ type BatchPoster struct { redisLock *SimpleRedisLock firstAccErr time.Time // first time a continuous missing accumulator occurred backlog uint64 // An estimate of the number of unposted batches + + batchReverted atomic.Bool // indicates whether data poster batch was reverted } type BatchPosterConfig struct { @@ -101,7 +106,7 @@ func (c *BatchPosterConfig) Validate() error { type BatchPosterConfigFetcher func() *BatchPosterConfig -func BatchPosterConfigAddOptions(prefix string, f *flag.FlagSet) { +func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Bool(prefix+".enable", DefaultBatchPosterConfig.Enable, "enable posting batches to l1") f.Bool(prefix+".disable-das-fallback-store-data-on-chain", DefaultBatchPosterConfig.DisableDasFallbackStoreDataOnChain, "If unable to batch to DAS, disable fallback storing data on chain") f.Int(prefix+".max-size", DefaultBatchPosterConfig.MaxBatchSize, "maximum batch size") @@ -158,7 +163,7 @@ var TestBatchPosterConfig = BatchPosterConfig{ L1Wallet: DefaultBatchPosterL1WalletConfig, } -func NewBatchPoster(l1Reader *headerreader.HeaderReader, inbox *InboxTracker, streamer *TransactionStreamer, syncMonitor *SyncMonitor, config BatchPosterConfigFetcher, deployInfo *chaininfo.RollupAddresses, transactOpts *bind.TransactOpts, daWriter das.DataAvailabilityServiceWriter) (*BatchPoster, error) { +func NewBatchPoster(dataPosterDB ethdb.Database, l1Reader *headerreader.HeaderReader, inbox *InboxTracker, streamer *TransactionStreamer, syncMonitor *SyncMonitor, config BatchPosterConfigFetcher, deployInfo *chaininfo.RollupAddresses, transactOpts *bind.TransactOpts, daWriter das.DataAvailabilityServiceWriter) (*BatchPoster, error) { seqInbox, err := bridgegen.NewSequencerInbox(deployInfo.SequencerInbox, l1Reader.Client()) if err != nil { return nil, err @@ -201,13 +206,91 @@ func NewBatchPoster(l1Reader *headerreader.HeaderReader, inbox *InboxTracker, st dataPosterConfigFetcher := func() *dataposter.DataPosterConfig { return &config().DataPoster } - b.dataPoster, err = dataposter.NewDataPoster(l1Reader, transactOpts, redisClient, redisLock, dataPosterConfigFetcher, b.getBatchPosterPosition) + b.dataPoster, err = dataposter.NewDataPoster(dataPosterDB, l1Reader, transactOpts, redisClient, redisLock, dataPosterConfigFetcher, b.getBatchPosterPosition) if err != nil { return nil, err } return b, nil } +// checkRevert checks blocks with number in range [from, to] whether they +// contain reverted batch_poster transaction. 
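Reviewer note: `NewBatchPoster` now takes an `ethdb.Database` as its first argument so the data poster can persist its queue. A minimal wiring sketch, assuming hypothetical variable names and an illustrative table prefix (the real call site in `arbnode/node.go` appears later in this diff):

```go
// Sketch only: namespace a slice of the node database for the data poster.
// rawdb.NewTable prefixes every key, so the queue cannot collide with other
// tables stored in the same arbDb. The prefix choice here is illustrative.
dataPosterDB := rawdb.NewTable(arbDb, storage.DataPosterPrefix)
batchPoster, err := NewBatchPoster(
	dataPosterDB, l1Reader, inboxTracker, txStreamer, syncMonitor,
	func() *BatchPosterConfig { return &cfg.BatchPoster },
	deployInfo, txOpts, daWriter,
)
if err != nil {
	return nil, err
}
```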
+func (b *BatchPoster) checkReverts(ctx context.Context, from, to int64) (bool, error) { + if from > to { + return false, fmt.Errorf("wrong range, from: %d is more to: %d", from, to) + } + for idx := from; idx <= to; idx++ { + number := big.NewInt(idx) + block, err := b.l1Reader.Client().BlockByNumber(ctx, number) + if err != nil { + return false, fmt.Errorf("getting block: %v by number: %w", number, err) + } + for idx, tx := range block.Transactions() { + from, err := b.l1Reader.Client().TransactionSender(ctx, tx, block.Hash(), uint(idx)) + if err != nil { + return false, fmt.Errorf("getting sender of transaction tx: %v, %w", tx.Hash(), err) + } + if bytes.Equal(from.Bytes(), b.dataPoster.From().Bytes()) { + r, err := b.l1Reader.Client().TransactionReceipt(ctx, tx.Hash()) + if err != nil { + return false, fmt.Errorf("getting a receipt for transaction: %v, %w", tx.Hash(), err) + } + if r.Status == types.ReceiptStatusFailed { + log.Error("Transaction from batch poster reverted", "nonce", tx.Nonce(), "txHash", tx.Hash(), "blockNumber", r.BlockNumber, "blockHash", r.BlockHash) + return true, nil + } + } + } + } + return false, nil +} + +// pollForReverts runs a gouroutine that listens to l1 block headers, checks +// if any transaction made by batch poster was reverted. +func (b *BatchPoster) pollForReverts(ctx context.Context) { + headerCh, unsubscribe := b.l1Reader.Subscribe(false) + defer unsubscribe() + + last := int64(0) // number of last seen block + for { + // Poll until: + // - L1 headers reader channel is closed, or + // - polling is through context, or + // - we see a transaction in the block from dataposter that was reverted. + select { + case h, closed := <-headerCh: + if closed { + log.Info("L1 headers channel has been closed") + return + } + // If this is the first block header, set last seen as number-1. + // We may see same block number again if there is L1 reorg, in that + // case we check the block again. 
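The receipt check in `checkReverts` boils down to the standard status lookup offered by go-ethereum's client. A self-contained sketch of that check for a single transaction; the endpoint and hash are placeholders, not values from this diff:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
)

func main() {
	ctx := context.Background()
	client, err := ethclient.Dial("http://localhost:8545") // placeholder endpoint
	if err != nil {
		log.Fatal(err)
	}
	txHash := common.HexToHash("0x0") // placeholder hash
	receipt, err := client.TransactionReceipt(ctx, txHash)
	if err != nil {
		log.Fatal(err)
	}
	// A mined-but-failed transaction carries types.ReceiptStatusFailed; this is
	// the condition that flips the batch poster into its reverted state.
	if receipt.Status == types.ReceiptStatusFailed {
		fmt.Println("transaction reverted in block", receipt.BlockNumber)
	}
}
```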
+ if last == 0 || last == h.Number.Int64() { + last = h.Number.Int64() - 1 + } + if h.Number.Int64()-last > 100 { + log.Warn("Large gap between last seen and current block number, skipping check for reverts", "last", last, "current", h.Number) + last = h.Number.Int64() + continue + } + + reverted, err := b.checkReverts(ctx, last+1, h.Number.Int64()) + if err != nil { + log.Error("Checking batch reverts", "error", err) + continue + } + if reverted { + b.batchReverted.Store(true) + return + } + last = h.Number.Int64() + case <-ctx.Done(): + return + } + } +} + func (b *BatchPoster) getBatchPosterPosition(ctx context.Context, blockNum *big.Int) (batchPosterPosition, error) { bigInboxBatchCount, err := b.seqInbox.BatchCount(&bind.CallOpts{Context: ctx, BlockNumber: blockNum}) if err != nil { @@ -554,6 +637,9 @@ func (b *BatchPoster) estimateGas(ctx context.Context, sequencerMessage []byte, } func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) { + if b.batchReverted.Load() { + return false, fmt.Errorf("batch was reverted, not posting any more batches") + } nonce, batchPosition, err := b.dataPoster.GetNextNonceAndMeta(ctx) if err != nil { return false, err @@ -636,7 +722,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) cert, err := b.daWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) // b.daWriter will append signature if enabled if errors.Is(err, das.BatchToDasFailed) { if config.DisableDasFallbackStoreDataOnChain { - return false, errors.New("Unable to batch to DAS and fallback storing data on chain is disabled") + return false, errors.New("unable to batch to DAS and fallback storing data on chain is disabled") } log.Warn("Falling back to storing data on chain", "err", err) } else if err != nil { @@ -697,6 +783,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) { b.dataPoster.Start(ctxIn) b.redisLock.Start(ctxIn) b.StopWaiter.Start(ctxIn, b) + b.LaunchThread(b.pollForReverts) b.CallIteratively(func(ctx context.Context) time.Duration { var err error if common.HexToAddress(b.config().GasRefunderAddress) != (common.Address{}) { @@ -723,7 +810,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) { if err != nil { b.building = nil logLevel := log.Error - if errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, dataposter.ErrStorageRace) { + if errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, storage.ErrStorageRace) { // Likely the inbox tracker just isn't caught up. // Let's see if this error disappears naturally. 
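`pollForReverts` and `maybePostSequencerBatch` communicate only through the `batchReverted` flag set above. A stand-alone illustration of that pattern; names and timings are invented for the demo:

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var reverted atomic.Bool // plays the role of batchReverted

	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()

	// Watcher goroutine: flips the flag once it observes a problem.
	go func() {
		time.Sleep(100 * time.Millisecond)
		reverted.Store(true)
	}()

	// Worker loop: checks the flag before each unit of work and stops for good
	// once it is set, mirroring the guard at the top of maybePostSequencerBatch.
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(50 * time.Millisecond):
			if reverted.Load() {
				fmt.Println("batch was reverted, not posting any more batches")
				return
			}
			fmt.Println("posting next batch")
		}
	}
}
```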
if b.firstAccErr == (time.Time{}) { diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go index ff0dcfebcf..e6668c8b39 100644 --- a/arbnode/dataposter/data_poster.go +++ b/arbnode/dataposter/data_poster.go @@ -15,16 +15,22 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" "github.com/go-redis/redis/v8" + "github.com/offchainlabs/nitro/arbnode/dataposter/leveldb" + "github.com/offchainlabs/nitro/arbnode/dataposter/slice" + "github.com/offchainlabs/nitro/arbnode/dataposter/storage" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/util/arbmath" "github.com/offchainlabs/nitro/util/headerreader" "github.com/offchainlabs/nitro/util/signature" "github.com/offchainlabs/nitro/util/stopwaiter" - flag "github.com/spf13/pflag" + "github.com/spf13/pflag" + + redisstorage "github.com/offchainlabs/nitro/arbnode/dataposter/redis" ) type queuedTransaction[Meta any] struct { @@ -55,11 +61,12 @@ type DataPosterConfig struct { UrgencyGwei float64 `koanf:"urgency-gwei" reload:"hot"` MinFeeCapGwei float64 `koanf:"min-fee-cap-gwei" reload:"hot"` MinTipCapGwei float64 `koanf:"min-tip-cap-gwei" reload:"hot"` + EnableLevelDB bool `koanf:"enable-leveldb" reload:"hot"` } type DataPosterConfigFetcher func() *DataPosterConfig -func DataPosterConfigAddOptions(prefix string, f *flag.FlagSet) { +func DataPosterConfigAddOptions(prefix string, f *pflag.FlagSet) { f.String(prefix+".replacement-times", DefaultDataPosterConfig.ReplacementTimes, "comma-separated list of durations since first posting to attempt a replace-by-fee") f.Bool(prefix+".wait-for-l1-finality", DefaultDataPosterConfig.WaitForL1Finality, "only treat a transaction as confirmed after L1 finality has been achieved (recommended)") f.Uint64(prefix+".max-mempool-transactions", DefaultDataPosterConfig.MaxMempoolTransactions, "the maximum number of transactions to have queued in the mempool at once (0 = unlimited)") @@ -68,6 +75,7 @@ func DataPosterConfigAddOptions(prefix string, f *flag.FlagSet) { f.Float64(prefix+".urgency-gwei", DefaultDataPosterConfig.UrgencyGwei, "the urgency to use for maximum fee cap calculation") f.Float64(prefix+".min-fee-cap-gwei", DefaultDataPosterConfig.MinFeeCapGwei, "the minimum fee cap to post transactions at") f.Float64(prefix+".min-tip-cap-gwei", DefaultDataPosterConfig.MinTipCapGwei, "the minimum tip cap to post transactions at") + f.Bool(prefix+".enable-leveldb", DefaultDataPosterConfig.EnableLevelDB, "uses leveldb when enabled") signature.SimpleHmacConfigAddOptions(prefix+".redis-signer", f) } @@ -78,6 +86,7 @@ var DefaultDataPosterConfig = DataPosterConfig{ UrgencyGwei: 2., MaxMempoolTransactions: 64, MinTipCapGwei: 0.05, + EnableLevelDB: false, } var TestDataPosterConfig = DataPosterConfig{ @@ -88,6 +97,7 @@ var TestDataPosterConfig = DataPosterConfig{ UrgencyGwei: 2., MaxMempoolTransactions: 64, MinTipCapGwei: 0.05, + EnableLevelDB: false, } // DataPoster must be RLP serializable and deserializable @@ -114,7 +124,7 @@ type AttemptLocker interface { AttemptLock(context.Context) bool } -func NewDataPoster[Meta any](headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config DataPosterConfigFetcher, metadataRetriever func(ctx context.Context, blockNum 
*big.Int) (Meta, error)) (*DataPoster[Meta], error) { +func NewDataPoster[Meta any](db ethdb.Database, headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config DataPosterConfigFetcher, metadataRetriever func(ctx context.Context, blockNum *big.Int) (Meta, error)) (*DataPoster[Meta], error) { var replacementTimes []time.Duration var lastReplacementTime time.Duration for _, s := range strings.Split(config().ReplacementTimes, ",") { @@ -134,11 +144,14 @@ func NewDataPoster[Meta any](headerReader *headerreader.HeaderReader, auth *bind // To avoid special casing "don't replace again", replace in 10 years replacementTimes = append(replacementTimes, time.Hour*24*365*10) var queue QueueStorage[queuedTransaction[Meta]] - if redisClient == nil { - queue = NewSliceStorage[queuedTransaction[Meta]]() - } else { + switch { + case config().EnableLevelDB: + queue = leveldb.New[queuedTransaction[Meta]](db) + case redisClient == nil: + queue = slice.NewStorage[queuedTransaction[Meta]]() + default: var err error - queue, err = NewRedisStorage[queuedTransaction[Meta]](redisClient, "data-poster.queue", &config().RedisSigner) + queue, err = redisstorage.NewStorage[queuedTransaction[Meta]](redisClient, "data-poster.queue", &config().RedisSigner) if err != nil { return nil, err } @@ -460,7 +473,7 @@ func (p *DataPoster[Meta]) maybeLogError(err error, tx *queuedTransaction[Meta], return } logLevel := log.Error - if errors.Is(err, ErrStorageRace) { + if errors.Is(err, storage.ErrStorageRace) { p.errorCount[nonce]++ if p.errorCount[nonce] <= maxConsecutiveIntermittentErrors { logLevel = log.Debug diff --git a/arbnode/dataposter/leveldb/leveldb.go b/arbnode/dataposter/leveldb/leveldb.go new file mode 100644 index 0000000000..c25cba6601 --- /dev/null +++ b/arbnode/dataposter/leveldb/leveldb.go @@ -0,0 +1,179 @@ +// Copyright 2021-2023, Offchain Labs, Inc. +// For license information, see https://github.com/nitro/blob/master/LICENSE + +package leveldb + +import ( + "bytes" + "context" + "errors" + "fmt" + "strconv" + + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/rlp" + "github.com/syndtr/goleveldb/leveldb" +) + +// Storage implements leveldb based storage for batch poster. +type Storage[Item any] struct { + db ethdb.Database +} + +var ( + // Value at this index holds the *index* of last item. + // Keys that we never want to be accidentally deleted by "Prune()" should be + // lexicographically less than minimum index (that is "0"), hence the prefix + // ".". 
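The key-layout comment above is easy to verify: "." sorts before any zero-padded index, and the fixed-width `%020d` encoding used by `idxToKey` below keeps lexicographic order equal to numeric order. A stand-alone check, standard library only:

```go
package main

import (
	"bytes"
	"fmt"
)

func main() {
	// Metadata keys start with ".", which sorts before "0", so an iterator that
	// begins at the encoded index 0 never visits them and Prune cannot delete them.
	meta := []byte(".last_item_idx_key")
	first := []byte(fmt.Sprintf("%020d", 0))
	fmt.Println(bytes.Compare(meta, first) < 0) // true

	// Zero-padding keeps byte order identical to numeric order, which the
	// iterator-based GetContents and Prune rely on.
	k9 := []byte(fmt.Sprintf("%020d", 9))
	k10 := []byte(fmt.Sprintf("%020d", 10))
	fmt.Println(bytes.Compare(k9, k10) < 0)                   // true
	fmt.Println(bytes.Compare([]byte("9"), []byte("10")) < 0) // false without padding
}
```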
+ lastItemIdxKey = []byte(".last_item_idx_key") + countKey = []byte(".count_key") +) + +func New[Item any](db ethdb.Database) *Storage[Item] { + return &Storage[Item]{db: db} +} + +func (s *Storage[Item]) decodeItem(data []byte) (*Item, error) { + var item Item + if err := rlp.DecodeBytes(data, &item); err != nil { + return nil, fmt.Errorf("decoding item: %w", err) + } + return &item, nil +} + +func idxToKey(idx uint64) []byte { + return []byte(fmt.Sprintf("%020d", idx)) +} + +func (s *Storage[Item]) GetContents(_ context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) { + var res []*Item + it := s.db.NewIterator([]byte(""), idxToKey(startingIndex)) + for i := 0; i < int(maxResults); i++ { + if !it.Next() { + break + } + item, err := s.decodeItem(it.Value()) + if err != nil { + return nil, err + } + res = append(res, item) + } + return res, it.Error() +} + +func (s *Storage[Item]) lastItemIdx(context.Context) ([]byte, error) { + return s.db.Get(lastItemIdxKey) +} + +func (s *Storage[Item]) GetLast(ctx context.Context) (*Item, error) { + size, err := s.Length(ctx) + if err != nil { + return nil, err + } + if size == 0 { + return nil, nil + } + lastItemIdx, err := s.lastItemIdx(ctx) + if err != nil { + return nil, fmt.Errorf("getting last item index: %w", err) + } + val, err := s.db.Get(lastItemIdx) + if err != nil { + return nil, err + } + return s.decodeItem(val) +} + +func (s *Storage[Item]) Prune(ctx context.Context, keepStartingAt uint64) error { + cnt, err := s.Length(ctx) + if err != nil { + return err + } + end := idxToKey(keepStartingAt) + it := s.db.NewIterator([]byte{}, idxToKey(0)) + b := s.db.NewBatch() + for it.Next() { + if bytes.Compare(it.Key(), end) >= 0 { + break + } + if err := b.Delete(it.Key()); err != nil { + return fmt.Errorf("deleting key: %w", err) + } + cnt-- + } + if err := b.Put(countKey, []byte(strconv.Itoa(cnt))); err != nil { + return fmt.Errorf("updating length counter: %w", err) + } + return b.Write() +} + +// valueAt gets returns the value at key. If it doesn't exist then it returns +// encoded bytes of nil. 
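The "encoded bytes of nil" that `valueAt` falls back to is just the RLP encoding of a typed nil pointer, which is also what `Put` computes when the caller passes `prev == nil`. A small stand-alone check; the `string` item type is chosen only for the demo:

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/ethereum/go-ethereum/rlp"
)

func main() {
	// What valueAt substitutes for a missing key.
	absent, err := rlp.EncodeToBytes((*string)(nil))
	if err != nil {
		panic(err)
	}
	fmt.Printf("nil *string encodes to %x\n", absent) // 80, the empty RLP string

	// What Put computes from a caller-supplied prev == nil: the two match, so
	// inserting into an empty slot succeeds; any other stored value would
	// trigger the "replacing different item than expected" error.
	var prev *string
	prevEnc, _ := rlp.EncodeToBytes(prev)
	fmt.Println(bytes.Equal(absent, prevEnc)) // true
}
```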
+func (s *Storage[Item]) valueAt(_ context.Context, key []byte) ([]byte, error) { + val, err := s.db.Get(key) + if err != nil { + if errors.Is(err, leveldb.ErrNotFound) { + return rlp.EncodeToBytes((*Item)(nil)) + } + return nil, err + } + return val, nil +} + +func (s *Storage[Item]) Put(ctx context.Context, index uint64, prev *Item, new *Item) error { + key := idxToKey(index) + stored, err := s.valueAt(ctx, key) + if err != nil { + return err + } + prevEnc, err := rlp.EncodeToBytes(prev) + if err != nil { + return fmt.Errorf("encoding previous item: %w", err) + } + if !bytes.Equal(stored, prevEnc) { + return fmt.Errorf("replacing different item than expected at index %v %v %v", index, stored, prevEnc) + } + newEnc, err := rlp.EncodeToBytes(new) + if err != nil { + return fmt.Errorf("encoding new item: %w", err) + } + b := s.db.NewBatch() + cnt, err := s.Length(ctx) + if err != nil { + return err + } + if err := b.Put(key, newEnc); err != nil { + return fmt.Errorf("updating value at: %v: %w", key, err) + } + lastItemIdx, err := s.lastItemIdx(ctx) + if err != nil && !errors.Is(err, leveldb.ErrNotFound) { + return err + } + if errors.Is(err, leveldb.ErrNotFound) { + lastItemIdx = []byte{} + } + if cnt == 0 || bytes.Compare(key, lastItemIdx) > 0 { + if err := b.Put(lastItemIdxKey, key); err != nil { + return fmt.Errorf("updating last item: %w", err) + } + if err := b.Put(countKey, []byte(strconv.Itoa(cnt+1))); err != nil { + return fmt.Errorf("updating length counter: %w", err) + } + } + return b.Write() +} + +func (s *Storage[Item]) Length(context.Context) (int, error) { + val, err := s.db.Get(countKey) + if err != nil { + if errors.Is(err, leveldb.ErrNotFound) { + return 0, nil + } + return 0, err + } + return strconv.Atoi(string(val)) +} + +func (s *Storage[Item]) IsPersistent() bool { + return true +} diff --git a/arbnode/dataposter/redis_storage.go b/arbnode/dataposter/redis/redisstorage.go similarity index 78% rename from arbnode/dataposter/redis_storage.go rename to arbnode/dataposter/redis/redisstorage.go index df3e894539..30370a0673 100644 --- a/arbnode/dataposter/redis_storage.go +++ b/arbnode/dataposter/redis/redisstorage.go @@ -1,7 +1,7 @@ // Copyright 2021-2022, Offchain Labs, Inc. 
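`Put` therefore acts as a compare-and-swap keyed on the previous item's encoding. A usage sketch set up the same way as `storage_test.go` later in this diff; the directory and values are arbitrary:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/offchainlabs/nitro/arbnode/dataposter/leveldb"
)

func main() {
	dir, err := os.MkdirTemp("", "dataposter-demo")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// A real LevelDB is used (as in the tests) so missing keys surface as
	// leveldb.ErrNotFound, which valueAt and Length expect.
	db, err := rawdb.NewLevelDBDatabase(filepath.Join(dir, "level.db"), 0, 0, "default", false)
	if err != nil {
		log.Fatal(err)
	}
	s := leveldb.New[string](db)
	ctx := context.Background()

	v1, v2 := "first", "second"
	// Insert: prev == nil asserts index 0 is still empty.
	if err := s.Put(ctx, 0, nil, &v1); err != nil {
		log.Fatal(err)
	}
	// Replace: accepted because &v1 encodes to the stored bytes.
	if err := s.Put(ctx, 0, &v1, &v2); err != nil {
		log.Fatal(err)
	}
	// Stale replace: rejected, the stored item is now "second".
	fmt.Println(s.Put(ctx, 0, &v1, &v2))
}
```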
// For license information, see https://github.com/nitro/blob/master/LICENSE -package dataposter +package redis import ( "bytes" @@ -11,22 +11,23 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/go-redis/redis/v8" + "github.com/offchainlabs/nitro/arbnode/dataposter/storage" "github.com/offchainlabs/nitro/util/signature" ) -// RedisStorage requires that Item is RLP encodable/decodable -type RedisStorage[Item any] struct { +// Storage requires that Item is RLP encodable/decodable +type Storage[Item any] struct { client redis.UniversalClient signer *signature.SimpleHmac key string } -func NewRedisStorage[Item any](client redis.UniversalClient, key string, signerConf *signature.SimpleHmacConfig) (*RedisStorage[Item], error) { +func NewStorage[Item any](client redis.UniversalClient, key string, signerConf *signature.SimpleHmacConfig) (*Storage[Item], error) { signer, err := signature.NewSimpleHmac(signerConf) if err != nil { return nil, err } - return &RedisStorage[Item]{client, signer, key}, nil + return &Storage[Item]{client, signer, key}, nil } func joinHmacMsg(msg []byte, sig []byte) ([]byte, error) { @@ -36,7 +37,7 @@ func joinHmacMsg(msg []byte, sig []byte) ([]byte, error) { return append(sig, msg...), nil } -func (s *RedisStorage[Item]) peelVerifySignature(data []byte) ([]byte, error) { +func (s *Storage[Item]) peelVerifySignature(data []byte) ([]byte, error) { if len(data) < 32 { return nil, errors.New("data is too short to contain message signature") } @@ -48,7 +49,7 @@ func (s *RedisStorage[Item]) peelVerifySignature(data []byte) ([]byte, error) { return data[32:], nil } -func (s *RedisStorage[Item]) GetContents(ctx context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) { +func (s *Storage[Item]) GetContents(ctx context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) { query := redis.ZRangeArgs{ Key: s.key, ByScore: true, @@ -75,7 +76,7 @@ func (s *RedisStorage[Item]) GetContents(ctx context.Context, startingIndex uint return items, nil } -func (s *RedisStorage[Item]) GetLast(ctx context.Context) (*Item, error) { +func (s *Storage[Item]) GetLast(ctx context.Context) (*Item, error) { query := redis.ZRangeArgs{ Key: s.key, Start: 0, @@ -105,16 +106,14 @@ func (s *RedisStorage[Item]) GetLast(ctx context.Context) (*Item, error) { return ret, nil } -func (s *RedisStorage[Item]) Prune(ctx context.Context, keepStartingAt uint64) error { +func (s *Storage[Item]) Prune(ctx context.Context, keepStartingAt uint64) error { if keepStartingAt > 0 { return s.client.ZRemRangeByScore(ctx, s.key, "-inf", fmt.Sprintf("%v", keepStartingAt-1)).Err() } return nil } -var ErrStorageRace = errors.New("storage race error") - -func (s *RedisStorage[Item]) Put(ctx context.Context, index uint64, prevItem *Item, newItem *Item) error { +func (s *Storage[Item]) Put(ctx context.Context, index uint64, prevItem *Item, newItem *Item) error { if newItem == nil { return fmt.Errorf("tried to insert nil item at index %v", index) } @@ -132,11 +131,11 @@ func (s *RedisStorage[Item]) Put(ctx context.Context, index uint64, prevItem *It pipe := tx.TxPipeline() if len(haveItems) == 0 { if prevItem != nil { - return fmt.Errorf("%w: tried to replace item at index %v but no item exists there", ErrStorageRace, index) + return fmt.Errorf("%w: tried to replace item at index %v but no item exists there", storage.ErrStorageRace, index) } } else if len(haveItems) == 1 { if prevItem == nil { - return fmt.Errorf("%w: tried to insert new item at index %v but an item exists there", 
ErrStorageRace, index) + return fmt.Errorf("%w: tried to insert new item at index %v but an item exists there", storage.ErrStorageRace, index) } verifiedItem, err := s.peelVerifySignature([]byte(haveItems[0])) if err != nil { @@ -147,7 +146,7 @@ func (s *RedisStorage[Item]) Put(ctx context.Context, index uint64, prevItem *It return err } if !bytes.Equal(verifiedItem, prevItemEncoded) { - return fmt.Errorf("%w: replacing different item than expected at index %v", ErrStorageRace, index) + return fmt.Errorf("%w: replacing different item than expected at index %v", storage.ErrStorageRace, index) } err = pipe.ZRem(ctx, s.key, haveItems[0]).Err() if err != nil { @@ -179,7 +178,7 @@ func (s *RedisStorage[Item]) Put(ctx context.Context, index uint64, prevItem *It if errors.Is(err, redis.TxFailedErr) { // Unfortunately, we can't wrap two errors. //nolint:errorlint - err = fmt.Errorf("%w: %v", ErrStorageRace, err.Error()) + err = fmt.Errorf("%w: %v", storage.ErrStorageRace, err.Error()) } return err } @@ -187,7 +186,7 @@ func (s *RedisStorage[Item]) Put(ctx context.Context, index uint64, prevItem *It return s.client.Watch(ctx, action, s.key) } -func (s *RedisStorage[Item]) Length(ctx context.Context) (int, error) { +func (s *Storage[Item]) Length(ctx context.Context) (int, error) { count, err := s.client.ZCount(ctx, s.key, "-inf", "+inf").Result() if err != nil { return 0, err @@ -195,6 +194,6 @@ func (s *RedisStorage[Item]) Length(ctx context.Context) (int, error) { return int(count), nil } -func (s *RedisStorage[Item]) IsPersistent() bool { +func (s *Storage[Item]) IsPersistent() bool { return true } diff --git a/arbnode/dataposter/slice_storage.go b/arbnode/dataposter/slice/slicestorage.go similarity index 63% rename from arbnode/dataposter/slice_storage.go rename to arbnode/dataposter/slice/slicestorage.go index 4364523d99..b0a253086f 100644 --- a/arbnode/dataposter/slice_storage.go +++ b/arbnode/dataposter/slice/slicestorage.go @@ -1,28 +1,30 @@ // Copyright 2021-2022, Offchain Labs, Inc. 
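The Redis store reports `redis.TxFailedErr` as `storage.ErrStorageRace` because it relies on WATCH-based optimistic concurrency: the key is watched, current state is read, and all writes go through a transactional pipeline that only commits if nobody else touched the key in between. A minimal sketch of that pattern with go-redis v8; the key name and address are placeholders:

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // placeholder address

	action := func(tx *redis.Tx) error {
		// Read the watched key inside the transaction callback.
		val, err := tx.Get(ctx, "demo-key").Result()
		if err != nil && err != redis.Nil {
			return err
		}
		// Queue writes; they are only applied if "demo-key" was not modified
		// by another client between WATCH and EXEC.
		pipe := tx.TxPipeline()
		pipe.Set(ctx, "demo-key", val+"x", 0)
		_, err = pipe.Exec(ctx)
		return err // redis.TxFailedErr here signals a lost race
	}
	if err := client.Watch(ctx, action, "demo-key"); err != nil {
		fmt.Println("watch failed:", err)
	}
}
```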
// For license information, see https://github.com/nitro/blob/master/LICENSE -package dataposter +package slice import ( "context" "errors" "fmt" + "reflect" ) -type SliceStorage[Item any] struct { +type Storage[Item any] struct { firstNonce uint64 queue []*Item } -func NewSliceStorage[Item any]() *SliceStorage[Item] { - return &SliceStorage[Item]{} +func NewStorage[Item any]() *Storage[Item] { + return &Storage[Item]{} } -func (s *SliceStorage[Item]) GetContents(ctx context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) { +func (s *Storage[Item]) GetContents(_ context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) { ret := s.queue - if startingIndex >= s.firstNonce+uint64(len(s.queue)) { - ret = nil - } else if startingIndex > s.firstNonce { + if startingIndex >= s.firstNonce+uint64(len(s.queue)) || maxResults == 0 { + return nil, nil + } + if startingIndex > s.firstNonce { ret = ret[startingIndex-s.firstNonce:] } if uint64(len(ret)) > maxResults { @@ -31,14 +33,14 @@ func (s *SliceStorage[Item]) GetContents(ctx context.Context, startingIndex uint return ret, nil } -func (s *SliceStorage[Item]) GetLast(ctx context.Context) (*Item, error) { +func (s *Storage[Item]) GetLast(context.Context) (*Item, error) { if len(s.queue) == 0 { return nil, nil } return s.queue[len(s.queue)-1], nil } -func (s *SliceStorage[Item]) Prune(ctx context.Context, keepStartingAt uint64) error { +func (s *Storage[Item]) Prune(_ context.Context, keepStartingAt uint64) error { if keepStartingAt >= s.firstNonce+uint64(len(s.queue)) { s.queue = nil } else if keepStartingAt >= s.firstNonce { @@ -48,7 +50,7 @@ func (s *SliceStorage[Item]) Prune(ctx context.Context, keepStartingAt uint64) e return nil } -func (s *SliceStorage[Item]) Put(ctx context.Context, index uint64, prevItem *Item, newItem *Item) error { +func (s *Storage[Item]) Put(_ context.Context, index uint64, prevItem *Item, newItem *Item) error { if newItem == nil { return fmt.Errorf("tried to insert nil item at index %v", index) } @@ -68,8 +70,8 @@ func (s *SliceStorage[Item]) Put(ctx context.Context, index uint64, prevItem *It if queueIdx > len(s.queue) { return fmt.Errorf("attempted to set out-of-bounds index %v in queue starting at %v of length %v", index, s.firstNonce, len(s.queue)) } - if prevItem != s.queue[queueIdx] { - return errors.New("prevItem isn't nil but item is just after end of queue") + if !reflect.DeepEqual(prevItem, s.queue[queueIdx]) { + return fmt.Errorf("replacing different item than expected at index: %v: %v %v", index, prevItem, s.queue[queueIdx]) } s.queue[queueIdx] = newItem } else { @@ -78,10 +80,10 @@ func (s *SliceStorage[Item]) Put(ctx context.Context, index uint64, prevItem *It return nil } -func (s *SliceStorage[Item]) Length(ctx context.Context) (int, error) { +func (s *Storage[Item]) Length(context.Context) (int, error) { return len(s.queue), nil } -func (s *SliceStorage[Item]) IsPersistent() bool { +func (s *Storage[Item]) IsPersistent() bool { return false } diff --git a/arbnode/dataposter/storage/storage.go b/arbnode/dataposter/storage/storage.go new file mode 100644 index 0000000000..555f7e1e5d --- /dev/null +++ b/arbnode/dataposter/storage/storage.go @@ -0,0 +1,13 @@ +package storage + +import ( + "errors" +) + +var ( + ErrStorageRace = errors.New("storage race error") + + DataPosterPrefix string = "d" // the prefix for all data poster keys + // TODO(anodar): move everything else from schema.go file to here once + // execution split is complete. 
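The slice storage above now compares `prevItem` with `reflect.DeepEqual` instead of raw pointer equality, since callers generally hold a different pointer to an equal value. A short illustration of the difference, with made-up values:

```go
package main

import (
	"fmt"
	"reflect"
)

func main() {
	// Two distinct pointers to equal strings: what the storage typically sees
	// when a caller rebuilds prevItem rather than reusing the stored pointer.
	a, b := "item", "item"
	pa, pb := &a, &b

	fmt.Println(pa == pb)                  // false: different addresses
	fmt.Println(reflect.DeepEqual(pa, pb)) // true: equal pointed-to values
}
```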
+) diff --git a/arbnode/dataposter/storage_test.go b/arbnode/dataposter/storage_test.go new file mode 100644 index 0000000000..0ef83ed5ba --- /dev/null +++ b/arbnode/dataposter/storage_test.go @@ -0,0 +1,283 @@ +package dataposter + +import ( + "context" + "path" + "strconv" + "testing" + + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/google/go-cmp/cmp" + "github.com/offchainlabs/nitro/arbnode/dataposter/leveldb" + "github.com/offchainlabs/nitro/arbnode/dataposter/redis" + "github.com/offchainlabs/nitro/arbnode/dataposter/slice" + "github.com/offchainlabs/nitro/util/arbmath" + "github.com/offchainlabs/nitro/util/redisutil" + "github.com/offchainlabs/nitro/util/signature" +) + +func newLevelDBStorage[Item any](t *testing.T) *leveldb.Storage[Item] { + t.Helper() + db, err := rawdb.NewLevelDBDatabase(path.Join(t.TempDir(), "level.db"), 0, 0, "default", false) + if err != nil { + t.Fatalf("NewLevelDBDatabase() unexpected error: %v", err) + } + return leveldb.New[Item](db) +} + +func newSliceStorage[Item any]() *slice.Storage[Item] { + return slice.NewStorage[Item]() +} + +func newRedisStorage[Item any](ctx context.Context, t *testing.T) *redis.Storage[Item] { + t.Helper() + redisUrl := redisutil.CreateTestRedis(ctx, t) + client, err := redisutil.RedisClientFromURL(redisUrl) + if err != nil { + t.Fatalf("RedisClientFromURL(%q) unexpected error: %v", redisUrl, err) + } + s, err := redis.NewStorage[Item](client, "", &signature.TestSimpleHmacConfig) + if err != nil { + t.Fatalf("redis.NewStorage() unexpected error: %v", err) + } + return s +} + +// Initializes the QueueStorage. Returns the same object (for convenience). +func initStorage(ctx context.Context, t *testing.T, s QueueStorage[string]) QueueStorage[string] { + t.Helper() + for i := 0; i < 20; i++ { + val := strconv.Itoa(i) + if err := s.Put(ctx, uint64(i), nil, &val); err != nil { + t.Fatalf("Error putting a key/value: %v", err) + } + } + return s +} + +// Returns a map of all empty storages. +func storages(t *testing.T) map[string]QueueStorage[string] { + t.Helper() + return map[string]QueueStorage[string]{ + "levelDB": newLevelDBStorage[string](t), + "slice": newSliceStorage[string](), + "redis": newRedisStorage[string](context.Background(), t), + } +} + +// Returns a map of all initialized storages. +func initStorages(ctx context.Context, t *testing.T) map[string]QueueStorage[string] { + t.Helper() + m := map[string]QueueStorage[string]{} + for k, v := range storages(t) { + m[k] = initStorage(ctx, t, v) + } + return m +} + +func strPtrs(values []string) []*string { + var res []*string + for _, val := range values { + v := val + res = append(res, &v) + } + return res +} + +func TestGetContents(t *testing.T) { + ctx := context.Background() + for name, s := range initStorages(ctx, t) { + for _, tc := range []struct { + desc string + startIdx uint64 + maxResults uint64 + want []*string + }{ + { + desc: "sequence with single digits", + startIdx: 5, + maxResults: 3, + want: strPtrs([]string{"5", "6", "7"}), + }, + { + desc: "corner case of single element", + startIdx: 0, + maxResults: 1, + want: strPtrs([]string{"0"}), + }, + { + desc: "no elements", + startIdx: 3, + maxResults: 0, + want: strPtrs([]string{}), + }, + { + // Making sure it's correctly ordered lexicographically. 
+ desc: "sequence with variable number of digits", + startIdx: 9, + maxResults: 3, + want: strPtrs([]string{"9", "10", "11"}), + }, + { + desc: "max results goes over the last element", + startIdx: 13, + maxResults: 10, + want: strPtrs([]string{"13", "14", "15", "16", "17", "18", "19"}), + }, + } { + t.Run(name+"_"+tc.desc, func(t *testing.T) { + values, err := s.GetContents(ctx, tc.startIdx, tc.maxResults) + if err != nil { + t.Fatalf("GetContents(%d, %d) unexpected error: %v", tc.startIdx, tc.maxResults, err) + } + if diff := cmp.Diff(tc.want, values); diff != "" { + t.Errorf("GetContext(%d, %d) unexpected diff:\n%s", tc.startIdx, tc.maxResults, diff) + } + }) + } + } +} + +func TestGetLast(t *testing.T) { + cnt := 100 + for name, s := range storages(t) { + t.Run(name, func(t *testing.T) { + ctx := context.Background() + for i := 0; i < cnt; i++ { + val := strconv.Itoa(i) + if err := s.Put(ctx, uint64(i), nil, &val); err != nil { + t.Fatalf("Error putting a key/value: %v", err) + } + got, err := s.GetLast(ctx) + if err != nil { + t.Fatalf("Error getting a last element: %v", err) + } + if *got != val { + t.Errorf("GetLast() = %q want %q", *got, val) + } + + } + }) + last := strconv.Itoa(cnt - 1) + t.Run(name+"_update_entries", func(t *testing.T) { + ctx := context.Background() + for i := 0; i < cnt-1; i++ { + prev := strconv.Itoa(i) + newVal := strconv.Itoa(cnt + i) + if err := s.Put(ctx, uint64(i), &prev, &newVal); err != nil { + t.Fatalf("Error putting a key/value: %v, prev: %v, new: %v", err, prev, newVal) + } + got, err := s.GetLast(ctx) + if err != nil { + t.Fatalf("Error getting a last element: %v", err) + } + if *got != last { + t.Errorf("GetLast() = %q want %q", *got, last) + } + gotCnt, err := s.Length(ctx) + if err != nil { + t.Fatalf("Length() unexpected error: %v", err) + } + if gotCnt != cnt { + t.Errorf("Length() = %d want %d", gotCnt, cnt) + } + } + }) + } +} + +func TestPrune(t *testing.T) { + ctx := context.Background() + for _, tc := range []struct { + desc string + pruneFrom uint64 + want []*string + }{ + { + desc: "prune all elements", + pruneFrom: 20, + }, + { + desc: "prune all but one", + pruneFrom: 19, + want: strPtrs([]string{"19"}), + }, + { + desc: "pruning first element", + pruneFrom: 1, + want: strPtrs([]string{"1", "2", "3", "4", "5", "6", "7", + "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19"}), + }, + { + desc: "pruning first 11 elements", + pruneFrom: 11, + want: strPtrs([]string{"11", "12", "13", "14", "15", "16", "17", "18", "19"}), + }, + { + desc: "pruning from higher than biggest index", + pruneFrom: 30, + want: strPtrs([]string{}), + }, + } { + // Storages must be re-initialized in each test-case. 
+ for name, s := range initStorages(ctx, t) { + t.Run(name+"_"+tc.desc, func(t *testing.T) { + if err := s.Prune(ctx, tc.pruneFrom); err != nil { + t.Fatalf("Prune(%d) unexpected error: %v", tc.pruneFrom, err) + } + got, err := s.GetContents(ctx, 0, 20) + if err != nil { + t.Fatalf("GetContents() unexpected error: %v", err) + } + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Errorf("Prune(%d) unexpected diff:\n%s", tc.pruneFrom, diff) + } + }) + } + } +} + +func TestLength(t *testing.T) { + ctx := context.Background() + for _, tc := range []struct { + desc string + pruneFrom uint64 + }{ + { + desc: "not prune any elements", + }, + { + desc: "prune all but one", + pruneFrom: 19, + }, + { + desc: "pruning first element", + pruneFrom: 1, + }, + { + desc: "pruning first 11 elements", + pruneFrom: 11, + }, + { + desc: "pruning from higher than biggest index", + pruneFrom: 30, + }, + } { + // Storages must be re-initialized in each test-case. + for name, s := range initStorages(ctx, t) { + t.Run(name+"_"+tc.desc, func(t *testing.T) { + if err := s.Prune(ctx, tc.pruneFrom); err != nil { + t.Fatalf("Prune(%d) unexpected error: %v", tc.pruneFrom, err) + } + got, err := s.Length(ctx) + if err != nil { + t.Fatalf("Length() unexpected error: %v", err) + } + if want := arbmath.MaxInt(0, 20-int(tc.pruneFrom)); got != want { + t.Errorf("Length() = %d want %d", got, want) + } + }) + } + + } +} diff --git a/arbnode/node.go b/arbnode/node.go index 9178baf65d..8a4f38f28c 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -26,7 +26,6 @@ import ( "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" - "github.com/offchainlabs/nitro/arbnode/execution" "github.com/offchainlabs/nitro/arbnode/resourcemanager" "github.com/offchainlabs/nitro/arbutil" @@ -859,7 +858,7 @@ func createNodeImpl( if txOptsBatchPoster == nil { return nil, errors.New("batchposter, but no TxOpts") } - batchPoster, err = NewBatchPoster(l1Reader, inboxTracker, txStreamer, syncMonitor, func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster }, deployInfo, txOptsBatchPoster, daWriter) + batchPoster, err = NewBatchPoster(rawdb.NewTable(arbDb, BlockValidatorPrefix), l1Reader, inboxTracker, txStreamer, syncMonitor, func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster }, deployInfo, txOptsBatchPoster, daWriter) if err != nil { return nil, err } diff --git a/cmd/nitro/init.go b/cmd/nitro/init.go index fcc27287d0..399555098c 100644 --- a/cmd/nitro/init.go +++ b/cmd/nitro/init.go @@ -41,7 +41,7 @@ import ( "github.com/offchainlabs/nitro/cmd/ipfshelper" "github.com/offchainlabs/nitro/staker" "github.com/offchainlabs/nitro/statetransfer" - flag "github.com/spf13/pflag" + "github.com/spf13/pflag" ) type InitConfig struct { @@ -77,7 +77,7 @@ var InitConfigDefault = InitConfig{ ResetToMsg: -1, } -func InitConfigAddOptions(prefix string, f *flag.FlagSet) { +func InitConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Bool(prefix+".force", InitConfigDefault.Force, "if true: in case database exists init code will be reexecuted and genesis block compared to database") f.String(prefix+".url", InitConfigDefault.Url, "url to download initializtion data - will poll if download fails") f.String(prefix+".download-path", InitConfigDefault.DownloadPath, "path to save temp downloaded file") diff --git a/go-ethereum b/go-ethereum index fcda31cae2..3725b60e04 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit fcda31cae2d5a29699c5c5e58038a72ce0d196ac 
+Subproject commit 3725b60e0494df145672ab67dd3ec18a85a2b5d1 diff --git a/go.mod b/go.mod index fc52f1f763..37ab04ff30 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/codeclysm/extract/v3 v3.0.2 github.com/dgraph-io/badger/v3 v3.2103.2 github.com/ethereum/go-ethereum v1.10.26 + github.com/google/go-cmp v0.5.9 github.com/hashicorp/golang-lru/v2 v2.0.1 github.com/ipfs/go-cid v0.3.2 github.com/ipfs/go-libipfs v0.6.2 diff --git a/go.sum b/go.sum index f351bb9545..ca552ef60a 100644 --- a/go.sum +++ b/go.sum @@ -493,6 +493,7 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= diff --git a/system_tests/batch_poster_test.go b/system_tests/batch_poster_test.go index 8c656cb2d3..6f6c041c41 100644 --- a/system_tests/batch_poster_test.go +++ b/system_tests/batch_poster_test.go @@ -82,7 +82,7 @@ func testBatchPosterParallel(t *testing.T, useRedis bool) { for i := 0; i < parallelBatchPosters; i++ { // Make a copy of the batch poster config so NewBatchPoster calling Validate() on it doesn't race batchPosterConfig := conf.BatchPoster - batchPoster, err := arbnode.NewBatchPoster(nodeA.L1Reader, nodeA.InboxTracker, nodeA.TxStreamer, nodeA.SyncMonitor, func() *arbnode.BatchPosterConfig { return &batchPosterConfig }, nodeA.DeployInfo, &seqTxOpts, nil) + batchPoster, err := arbnode.NewBatchPoster(nil, nodeA.L1Reader, nodeA.InboxTracker, nodeA.TxStreamer, nodeA.SyncMonitor, func() *arbnode.BatchPosterConfig { return &batchPosterConfig }, nodeA.DeployInfo, &seqTxOpts, nil) Require(t, err) batchPoster.Start(ctx) defer batchPoster.StopAndWait()