Skip to content

Commit

Permalink
Merge pull request #1736 from OffchainLabs/leveldb-based-dataposter
Browse files Browse the repository at this point in the history
Implement leveldb storage for dataposter
  • Loading branch information
PlasmaPower committed Jul 12, 2023
2 parents 274c78a + f13c129 commit f8a9421
Show file tree
Hide file tree
Showing 13 changed files with 544 additions and 53 deletions.
13 changes: 7 additions & 6 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,20 @@ import (
"time"

"github.com/andybalholm/brotli"
flag "github.com/spf13/pflag"
"github.com/spf13/pflag"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"

"github.com/offchainlabs/nitro/arbnode/dataposter"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbutil"
Expand Down Expand Up @@ -105,7 +106,7 @@ func (c *BatchPosterConfig) Validate() error {

type BatchPosterConfigFetcher func() *BatchPosterConfig

func BatchPosterConfigAddOptions(prefix string, f *flag.FlagSet) {
func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultBatchPosterConfig.Enable, "enable posting batches to l1")
f.Bool(prefix+".disable-das-fallback-store-data-on-chain", DefaultBatchPosterConfig.DisableDasFallbackStoreDataOnChain, "If unable to batch to DAS, disable fallback storing data on chain")
f.Int(prefix+".max-size", DefaultBatchPosterConfig.MaxBatchSize, "maximum batch size")
Expand Down Expand Up @@ -162,7 +163,7 @@ var TestBatchPosterConfig = BatchPosterConfig{
L1Wallet: DefaultBatchPosterL1WalletConfig,
}

func NewBatchPoster(l1Reader *headerreader.HeaderReader, inbox *InboxTracker, streamer *TransactionStreamer, syncMonitor *SyncMonitor, config BatchPosterConfigFetcher, deployInfo *chaininfo.RollupAddresses, transactOpts *bind.TransactOpts, daWriter das.DataAvailabilityServiceWriter) (*BatchPoster, error) {
func NewBatchPoster(dataPosterDB ethdb.Database, l1Reader *headerreader.HeaderReader, inbox *InboxTracker, streamer *TransactionStreamer, syncMonitor *SyncMonitor, config BatchPosterConfigFetcher, deployInfo *chaininfo.RollupAddresses, transactOpts *bind.TransactOpts, daWriter das.DataAvailabilityServiceWriter) (*BatchPoster, error) {
seqInbox, err := bridgegen.NewSequencerInbox(deployInfo.SequencerInbox, l1Reader.Client())
if err != nil {
return nil, err
Expand Down Expand Up @@ -205,7 +206,7 @@ func NewBatchPoster(l1Reader *headerreader.HeaderReader, inbox *InboxTracker, st
dataPosterConfigFetcher := func() *dataposter.DataPosterConfig {
return &config().DataPoster
}
b.dataPoster, err = dataposter.NewDataPoster(l1Reader, transactOpts, redisClient, redisLock, dataPosterConfigFetcher, b.getBatchPosterPosition)
b.dataPoster, err = dataposter.NewDataPoster(dataPosterDB, l1Reader, transactOpts, redisClient, redisLock, dataPosterConfigFetcher, b.getBatchPosterPosition)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -809,7 +810,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
if err != nil {
b.building = nil
logLevel := log.Error
if errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, dataposter.ErrStorageRace) {
if errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, storage.ErrStorageRace) {
// Likely the inbox tracker just isn't caught up.
// Let's see if this error disappears naturally.
if b.firstAccErr == (time.Time{}) {
Expand Down
29 changes: 21 additions & 8 deletions arbnode/dataposter/data_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,22 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/go-redis/redis/v8"
"github.com/offchainlabs/nitro/arbnode/dataposter/leveldb"
"github.com/offchainlabs/nitro/arbnode/dataposter/slice"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/signature"
"github.com/offchainlabs/nitro/util/stopwaiter"
flag "github.com/spf13/pflag"
"github.com/spf13/pflag"

redisstorage "github.com/offchainlabs/nitro/arbnode/dataposter/redis"
)

type queuedTransaction[Meta any] struct {
Expand Down Expand Up @@ -55,11 +61,12 @@ type DataPosterConfig struct {
UrgencyGwei float64 `koanf:"urgency-gwei" reload:"hot"`
MinFeeCapGwei float64 `koanf:"min-fee-cap-gwei" reload:"hot"`
MinTipCapGwei float64 `koanf:"min-tip-cap-gwei" reload:"hot"`
EnableLevelDB bool `koanf:"enable-leveldb" reload:"hot"`
}

type DataPosterConfigFetcher func() *DataPosterConfig

func DataPosterConfigAddOptions(prefix string, f *flag.FlagSet) {
func DataPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".replacement-times", DefaultDataPosterConfig.ReplacementTimes, "comma-separated list of durations since first posting to attempt a replace-by-fee")
f.Bool(prefix+".wait-for-l1-finality", DefaultDataPosterConfig.WaitForL1Finality, "only treat a transaction as confirmed after L1 finality has been achieved (recommended)")
f.Uint64(prefix+".max-mempool-transactions", DefaultDataPosterConfig.MaxMempoolTransactions, "the maximum number of transactions to have queued in the mempool at once (0 = unlimited)")
Expand All @@ -68,6 +75,7 @@ func DataPosterConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Float64(prefix+".urgency-gwei", DefaultDataPosterConfig.UrgencyGwei, "the urgency to use for maximum fee cap calculation")
f.Float64(prefix+".min-fee-cap-gwei", DefaultDataPosterConfig.MinFeeCapGwei, "the minimum fee cap to post transactions at")
f.Float64(prefix+".min-tip-cap-gwei", DefaultDataPosterConfig.MinTipCapGwei, "the minimum tip cap to post transactions at")
f.Bool(prefix+".enable-leveldb", DefaultDataPosterConfig.EnableLevelDB, "uses leveldb when enabled")
signature.SimpleHmacConfigAddOptions(prefix+".redis-signer", f)
}

Expand All @@ -78,6 +86,7 @@ var DefaultDataPosterConfig = DataPosterConfig{
UrgencyGwei: 2.,
MaxMempoolTransactions: 64,
MinTipCapGwei: 0.05,
EnableLevelDB: false,
}

var TestDataPosterConfig = DataPosterConfig{
Expand All @@ -88,6 +97,7 @@ var TestDataPosterConfig = DataPosterConfig{
UrgencyGwei: 2.,
MaxMempoolTransactions: 64,
MinTipCapGwei: 0.05,
EnableLevelDB: false,
}

// DataPoster must be RLP serializable and deserializable
Expand All @@ -114,7 +124,7 @@ type AttemptLocker interface {
AttemptLock(context.Context) bool
}

func NewDataPoster[Meta any](headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config DataPosterConfigFetcher, metadataRetriever func(ctx context.Context, blockNum *big.Int) (Meta, error)) (*DataPoster[Meta], error) {
func NewDataPoster[Meta any](db ethdb.Database, headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config DataPosterConfigFetcher, metadataRetriever func(ctx context.Context, blockNum *big.Int) (Meta, error)) (*DataPoster[Meta], error) {
var replacementTimes []time.Duration
var lastReplacementTime time.Duration
for _, s := range strings.Split(config().ReplacementTimes, ",") {
Expand All @@ -134,11 +144,14 @@ func NewDataPoster[Meta any](headerReader *headerreader.HeaderReader, auth *bind
// To avoid special casing "don't replace again", replace in 10 years
replacementTimes = append(replacementTimes, time.Hour*24*365*10)
var queue QueueStorage[queuedTransaction[Meta]]
if redisClient == nil {
queue = NewSliceStorage[queuedTransaction[Meta]]()
} else {
switch {
case config().EnableLevelDB:
queue = leveldb.New[queuedTransaction[Meta]](db)
case redisClient == nil:
queue = slice.NewStorage[queuedTransaction[Meta]]()
default:
var err error
queue, err = NewRedisStorage[queuedTransaction[Meta]](redisClient, "data-poster.queue", &config().RedisSigner)
queue, err = redisstorage.NewStorage[queuedTransaction[Meta]](redisClient, "data-poster.queue", &config().RedisSigner)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -460,7 +473,7 @@ func (p *DataPoster[Meta]) maybeLogError(err error, tx *queuedTransaction[Meta],
return
}
logLevel := log.Error
if errors.Is(err, ErrStorageRace) {
if errors.Is(err, storage.ErrStorageRace) {
p.errorCount[nonce]++
if p.errorCount[nonce] <= maxConsecutiveIntermittentErrors {
logLevel = log.Debug
Expand Down
179 changes: 179 additions & 0 deletions arbnode/dataposter/leveldb/leveldb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright 2021-2023, Offchain Labs, Inc.
// For license information, see https://github.com/nitro/blob/master/LICENSE

package leveldb

import (
"bytes"
"context"
"errors"
"fmt"
"strconv"

"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/syndtr/goleveldb/leveldb"
)

// Storage implements leveldb based storage for batch poster.
type Storage[Item any] struct {
db ethdb.Database
}

var (
// Value at this index holds the *index* of last item.
// Keys that we never want to be accidentally deleted by "Prune()" should be
// lexicographically less than minimum index (that is "0"), hence the prefix
// ".".
lastItemIdxKey = []byte(".last_item_idx_key")
countKey = []byte(".count_key")
)

func New[Item any](db ethdb.Database) *Storage[Item] {
return &Storage[Item]{db: db}
}

func (s *Storage[Item]) decodeItem(data []byte) (*Item, error) {
var item Item
if err := rlp.DecodeBytes(data, &item); err != nil {
return nil, fmt.Errorf("decoding item: %w", err)
}
return &item, nil
}

func idxToKey(idx uint64) []byte {
return []byte(fmt.Sprintf("%020d", idx))
}

func (s *Storage[Item]) GetContents(_ context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) {
var res []*Item
it := s.db.NewIterator([]byte(""), idxToKey(startingIndex))
for i := 0; i < int(maxResults); i++ {
if !it.Next() {
break
}
item, err := s.decodeItem(it.Value())
if err != nil {
return nil, err
}
res = append(res, item)
}
return res, it.Error()
}

func (s *Storage[Item]) lastItemIdx(context.Context) ([]byte, error) {
return s.db.Get(lastItemIdxKey)
}

func (s *Storage[Item]) GetLast(ctx context.Context) (*Item, error) {
size, err := s.Length(ctx)
if err != nil {
return nil, err
}
if size == 0 {
return nil, nil
}
lastItemIdx, err := s.lastItemIdx(ctx)
if err != nil {
return nil, fmt.Errorf("getting last item index: %w", err)
}
val, err := s.db.Get(lastItemIdx)
if err != nil {
return nil, err
}
return s.decodeItem(val)
}

func (s *Storage[Item]) Prune(ctx context.Context, keepStartingAt uint64) error {
cnt, err := s.Length(ctx)
if err != nil {
return err
}
end := idxToKey(keepStartingAt)
it := s.db.NewIterator([]byte{}, idxToKey(0))
b := s.db.NewBatch()
for it.Next() {
if bytes.Compare(it.Key(), end) >= 0 {
break
}
if err := b.Delete(it.Key()); err != nil {
return fmt.Errorf("deleting key: %w", err)
}
cnt--
}
if err := b.Put(countKey, []byte(strconv.Itoa(cnt))); err != nil {
return fmt.Errorf("updating length counter: %w", err)
}
return b.Write()
}

// valueAt gets returns the value at key. If it doesn't exist then it returns
// encoded bytes of nil.
func (s *Storage[Item]) valueAt(_ context.Context, key []byte) ([]byte, error) {
val, err := s.db.Get(key)
if err != nil {
if errors.Is(err, leveldb.ErrNotFound) {
return rlp.EncodeToBytes((*Item)(nil))
}
return nil, err
}
return val, nil
}

func (s *Storage[Item]) Put(ctx context.Context, index uint64, prev *Item, new *Item) error {
key := idxToKey(index)
stored, err := s.valueAt(ctx, key)
if err != nil {
return err
}
prevEnc, err := rlp.EncodeToBytes(prev)
if err != nil {
return fmt.Errorf("encoding previous item: %w", err)
}
if !bytes.Equal(stored, prevEnc) {
return fmt.Errorf("replacing different item than expected at index %v %v %v", index, stored, prevEnc)
}
newEnc, err := rlp.EncodeToBytes(new)
if err != nil {
return fmt.Errorf("encoding new item: %w", err)
}
b := s.db.NewBatch()
cnt, err := s.Length(ctx)
if err != nil {
return err
}
if err := b.Put(key, newEnc); err != nil {
return fmt.Errorf("updating value at: %v: %w", key, err)
}
lastItemIdx, err := s.lastItemIdx(ctx)
if err != nil && !errors.Is(err, leveldb.ErrNotFound) {
return err
}
if errors.Is(err, leveldb.ErrNotFound) {
lastItemIdx = []byte{}
}
if cnt == 0 || bytes.Compare(key, lastItemIdx) > 0 {
if err := b.Put(lastItemIdxKey, key); err != nil {
return fmt.Errorf("updating last item: %w", err)
}
if err := b.Put(countKey, []byte(strconv.Itoa(cnt+1))); err != nil {
return fmt.Errorf("updating length counter: %w", err)
}
}
return b.Write()
}

func (s *Storage[Item]) Length(context.Context) (int, error) {
val, err := s.db.Get(countKey)
if err != nil {
if errors.Is(err, leveldb.ErrNotFound) {
return 0, nil
}
return 0, err
}
return strconv.Atoi(string(val))
}

func (s *Storage[Item]) IsPersistent() bool {
return true
}
Loading

0 comments on commit f8a9421

Please sign in to comment.