Implement leveldb storage for dataposter #1736

Merged (29 commits, Jul 12, 2023)

Commits
82705c1 draft (anodar, Jul 5, 2023)
345528a implement leveldb storage (anodar, Jul 6, 2023)
8563d05 Use ArbDB table instead of creating new levelDB (anodar, Jul 6, 2023)
1f90062 Rename "leveldb-enable" to "enable-leveldb" (anodar, Jul 6, 2023)
23ec417 cleanup: drop unnecessary emtpy line, remove unused DataPosterPrefix … (anodar, Jul 6, 2023)
dec0776 factor out redis and slice storage into their own packages (anodar, Jul 7, 2023)
c0a8f52 refactor redis and slice storage (drop redis/slice prefixes as they a… (anodar, Jul 7, 2023)
2e6972d Drop pflag to flag alias as it is unnecessary (anodar, Jul 7, 2023)
926b43c Add all storages to tests (redis, slice, leveldb) (anodar, Jul 7, 2023)
b85ea16 Delete leveldb_test.go since it's covered in tests for all storages (anodar, Jul 7, 2023)
268f7c3 draft (anodar, Jul 5, 2023)
21f726c implement leveldb storage (anodar, Jul 6, 2023)
7c56e8e Use ArbDB table instead of creating new levelDB (anodar, Jul 6, 2023)
a29c0a5 Rename "leveldb-enable" to "enable-leveldb" (anodar, Jul 6, 2023)
7fded9d cleanup: drop unnecessary emtpy line, remove unused DataPosterPrefix … (anodar, Jul 6, 2023)
3d0d06d factor out redis and slice storage into their own packages (anodar, Jul 7, 2023)
daf123a refactor redis and slice storage (drop redis/slice prefixes as they a… (anodar, Jul 7, 2023)
b7005dc Drop pflag to flag alias as it is unnecessary (anodar, Jul 7, 2023)
e38ffb0 Add all storages to tests (redis, slice, leveldb) (anodar, Jul 7, 2023)
b0ca3d3 Delete leveldb_test.go since it's covered in tests for all storages (anodar, Jul 7, 2023)
6236027 Move BlockValidatorPrefix back to schema.go (anodar, Jul 11, 2023)
9c80723 Rebase with master (anodar, Jul 11, 2023)
8fe085f Fix bug in slice storage that compares pointers instead of values (anodar, Jul 11, 2023)
fb0b7c7 Add comment to cmp.Diff (anodar, Jul 11, 2023)
cf122ea Fix lint error in test (anodar, Jul 11, 2023)
61cf4a5 Use reflect.DeepEqual instead of cmp.Diff (anodar, Jul 11, 2023)
aec5d79 Handle empty storage correctly (anodar, Jul 11, 2023)
35097b1 Merge branch 'master' into leveldb-based-dataposter (anodar, Jul 11, 2023)
f13c129 gofmt batch_poster (anodar, Jul 11, 2023)

13 changes: 7 additions & 6 deletions arbnode/batch_poster.go
@@ -14,19 +14,20 @@ import (
"time"

"github.com/andybalholm/brotli"
flag "github.com/spf13/pflag"
"github.com/spf13/pflag"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"

"github.com/offchainlabs/nitro/arbnode/dataposter"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbutil"
@@ -105,7 +106,7 @@ func (c *BatchPosterConfig) Validate() error {

type BatchPosterConfigFetcher func() *BatchPosterConfig

- func BatchPosterConfigAddOptions(prefix string, f *flag.FlagSet) {
+ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultBatchPosterConfig.Enable, "enable posting batches to l1")
f.Bool(prefix+".disable-das-fallback-store-data-on-chain", DefaultBatchPosterConfig.DisableDasFallbackStoreDataOnChain, "If unable to batch to DAS, disable fallback storing data on chain")
f.Int(prefix+".max-size", DefaultBatchPosterConfig.MaxBatchSize, "maximum batch size")
@@ -162,7 +163,7 @@ var TestBatchPosterConfig = BatchPosterConfig{
L1Wallet: DefaultBatchPosterL1WalletConfig,
}

- func NewBatchPoster(l1Reader *headerreader.HeaderReader, inbox *InboxTracker, streamer *TransactionStreamer, syncMonitor *SyncMonitor, config BatchPosterConfigFetcher, deployInfo *chaininfo.RollupAddresses, transactOpts *bind.TransactOpts, daWriter das.DataAvailabilityServiceWriter) (*BatchPoster, error) {
+ func NewBatchPoster(dataPosterDB ethdb.Database, l1Reader *headerreader.HeaderReader, inbox *InboxTracker, streamer *TransactionStreamer, syncMonitor *SyncMonitor, config BatchPosterConfigFetcher, deployInfo *chaininfo.RollupAddresses, transactOpts *bind.TransactOpts, daWriter das.DataAvailabilityServiceWriter) (*BatchPoster, error) {
seqInbox, err := bridgegen.NewSequencerInbox(deployInfo.SequencerInbox, l1Reader.Client())
if err != nil {
return nil, err
@@ -205,7 +206,7 @@ func NewBatchPoster(l1Reader *headerreader.HeaderReader, inbox *InboxTracker, st
dataPosterConfigFetcher := func() *dataposter.DataPosterConfig {
return &config().DataPoster
}
- b.dataPoster, err = dataposter.NewDataPoster(l1Reader, transactOpts, redisClient, redisLock, dataPosterConfigFetcher, b.getBatchPosterPosition)
+ b.dataPoster, err = dataposter.NewDataPoster(dataPosterDB, l1Reader, transactOpts, redisClient, redisLock, dataPosterConfigFetcher, b.getBatchPosterPosition)
if err != nil {
return nil, err
}
@@ -809,7 +810,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
if err != nil {
b.building = nil
logLevel := log.Error
- if errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, dataposter.ErrStorageRace) {
+ if errors.Is(err, AccumulatorNotFoundErr) || errors.Is(err, storage.ErrStorageRace) {
// Likely the inbox tracker just isn't caught up.
// Let's see if this error disappears naturally.
if b.firstAccErr == (time.Time{}) {
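NewBatchPoster now takes an ethdb.Database as its first argument and threads it through to the data poster. For context, a hedged sketch of what a caller might pass, assuming the node carves a key-prefixed table out of its existing ArbDB (the commit log says an ArbDB table is used; the prefix string and helper below are illustrative, not code from this PR):

```go
package main

import (
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// dataPosterTable namespaces all dataposter keys inside the node's main
// database: rawdb.NewTable wraps a database so that every key is
// transparently prefixed. The prefix value here is hypothetical.
func dataPosterTable(arbDB ethdb.Database) ethdb.Database {
	return rawdb.NewTable(arbDB, "dataposter.")
}

func main() {
	// The resulting table would be passed as the new first argument to
	// NewBatchPoster / NewDataPoster.
	_ = dataPosterTable(rawdb.NewMemoryDatabase())
}
```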
29 changes: 21 additions & 8 deletions arbnode/dataposter/data_poster.go
@@ -15,16 +15,22 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/go-redis/redis/v8"
"github.com/offchainlabs/nitro/arbnode/dataposter/leveldb"
"github.com/offchainlabs/nitro/arbnode/dataposter/slice"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/signature"
"github.com/offchainlabs/nitro/util/stopwaiter"
flag "github.com/spf13/pflag"
"github.com/spf13/pflag"

redisstorage "github.com/offchainlabs/nitro/arbnode/dataposter/redis"
)

type queuedTransaction[Meta any] struct {
@@ -55,11 +61,12 @@ type DataPosterConfig struct {
UrgencyGwei float64 `koanf:"urgency-gwei" reload:"hot"`
MinFeeCapGwei float64 `koanf:"min-fee-cap-gwei" reload:"hot"`
MinTipCapGwei float64 `koanf:"min-tip-cap-gwei" reload:"hot"`
+ EnableLevelDB bool `koanf:"enable-leveldb" reload:"hot"`
}

type DataPosterConfigFetcher func() *DataPosterConfig

- func DataPosterConfigAddOptions(prefix string, f *flag.FlagSet) {
+ func DataPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".replacement-times", DefaultDataPosterConfig.ReplacementTimes, "comma-separated list of durations since first posting to attempt a replace-by-fee")
f.Bool(prefix+".wait-for-l1-finality", DefaultDataPosterConfig.WaitForL1Finality, "only treat a transaction as confirmed after L1 finality has been achieved (recommended)")
f.Uint64(prefix+".max-mempool-transactions", DefaultDataPosterConfig.MaxMempoolTransactions, "the maximum number of transactions to have queued in the mempool at once (0 = unlimited)")
@@ -68,6 +75,7 @@ func DataPosterConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Float64(prefix+".urgency-gwei", DefaultDataPosterConfig.UrgencyGwei, "the urgency to use for maximum fee cap calculation")
f.Float64(prefix+".min-fee-cap-gwei", DefaultDataPosterConfig.MinFeeCapGwei, "the minimum fee cap to post transactions at")
f.Float64(prefix+".min-tip-cap-gwei", DefaultDataPosterConfig.MinTipCapGwei, "the minimum tip cap to post transactions at")
f.Bool(prefix+".enable-leveldb", DefaultDataPosterConfig.EnableLevelDB, "uses leveldb when enabled")
signature.SimpleHmacConfigAddOptions(prefix+".redis-signer", f)
}

@@ -78,6 +86,7 @@ var DefaultDataPosterConfig = DataPosterConfig{
UrgencyGwei: 2.,
MaxMempoolTransactions: 64,
MinTipCapGwei: 0.05,
+ EnableLevelDB: false,
}

var TestDataPosterConfig = DataPosterConfig{
@@ -88,6 +97,7 @@ var TestDataPosterConfig = DataPosterConfig{
UrgencyGwei: 2.,
MaxMempoolTransactions: 64,
MinTipCapGwei: 0.05,
+ EnableLevelDB: false,
}

// DataPoster must be RLP serializable and deserializable
@@ -114,7 +124,7 @@ type AttemptLocker interface {
AttemptLock(context.Context) bool
}

- func NewDataPoster[Meta any](headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config DataPosterConfigFetcher, metadataRetriever func(ctx context.Context, blockNum *big.Int) (Meta, error)) (*DataPoster[Meta], error) {
+ func NewDataPoster[Meta any](db ethdb.Database, headerReader *headerreader.HeaderReader, auth *bind.TransactOpts, redisClient redis.UniversalClient, redisLock AttemptLocker, config DataPosterConfigFetcher, metadataRetriever func(ctx context.Context, blockNum *big.Int) (Meta, error)) (*DataPoster[Meta], error) {
var replacementTimes []time.Duration
var lastReplacementTime time.Duration
for _, s := range strings.Split(config().ReplacementTimes, ",") {
@@ -134,11 +144,14 @@ func NewDataPoster[Meta any](headerReader *headerreader.HeaderReader, auth *bind
// To avoid special casing "don't replace again", replace in 10 years
replacementTimes = append(replacementTimes, time.Hour*24*365*10)
var queue QueueStorage[queuedTransaction[Meta]]
- if redisClient == nil {
- queue = NewSliceStorage[queuedTransaction[Meta]]()
- } else {
+ switch {
+ case config().EnableLevelDB:
+ queue = leveldb.New[queuedTransaction[Meta]](db)
+ case redisClient == nil:
+ queue = slice.NewStorage[queuedTransaction[Meta]]()
+ default:
var err error
- queue, err = NewRedisStorage[queuedTransaction[Meta]](redisClient, "data-poster.queue", &config().RedisSigner)
+ queue, err = redisstorage.NewStorage[queuedTransaction[Meta]](redisClient, "data-poster.queue", &config().RedisSigner)
if err != nil {
return nil, err
}
@@ -460,7 +473,7 @@ func (p *DataPoster[Meta]) maybeLogError(err error, tx *queuedTransaction[Meta],
return
}
logLevel := log.Error
- if errors.Is(err, ErrStorageRace) {
+ if errors.Is(err, storage.ErrStorageRace) {
p.errorCount[nonce]++
if p.errorCount[nonce] <= maxConsecutiveIntermittentErrors {
logLevel = log.Debug
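The switch above gives the new leveldb backend precedence over redis, with the in-memory slice storage as a fallback when no redis client is configured. A standalone sketch of that precedence (simplified names; not code from this PR):

```go
package main

import "fmt"

// pickBackend mirrors the selection order added in NewDataPoster: leveldb
// wins whenever enable-leveldb is set, the in-memory slice storage is used
// when no redis client was configured, and redis remains the default.
func pickBackend(enableLevelDB, haveRedisClient bool) string {
	switch {
	case enableLevelDB:
		return "leveldb"
	case !haveRedisClient:
		return "slice"
	default:
		return "redis"
	}
}

func main() {
	fmt.Println(pickBackend(true, true))   // leveldb: the flag takes precedence
	fmt.Println(pickBackend(false, false)) // slice
	fmt.Println(pickBackend(false, true))  // redis
}
```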
179 changes: 179 additions & 0 deletions arbnode/dataposter/leveldb/leveldb.go
@@ -0,0 +1,179 @@
// Copyright 2021-2023, Offchain Labs, Inc.
// For license information, see https://github.com/nitro/blob/master/LICENSE

package leveldb

import (
"bytes"
"context"
"errors"
"fmt"
"strconv"

"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/syndtr/goleveldb/leveldb"
)

// Storage implements leveldb-based storage for the batch poster.
type Storage[Item any] struct {
db ethdb.Database
}

var (
// The value stored under this key is the database key (the encoded index)
// of the last item. Keys that must never be deleted accidentally by
// "Prune()" have to sort lexicographically below the minimum index key
// (that is, below "0"), hence the "." prefix.
lastItemIdxKey = []byte(".last_item_idx_key")
countKey = []byte(".count_key")
)

func New[Item any](db ethdb.Database) *Storage[Item] {
return &Storage[Item]{db: db}
}

func (s *Storage[Item]) decodeItem(data []byte) (*Item, error) {
var item Item
if err := rlp.DecodeBytes(data, &item); err != nil {
return nil, fmt.Errorf("decoding item: %w", err)
}
return &item, nil
}

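// idxToKey encodes an index as a fixed-width, zero-padded decimal string
// (a uint64 needs at most 20 digits), so that lexicographic key order in
// the database matches numeric index order.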
func idxToKey(idx uint64) []byte {
return []byte(fmt.Sprintf("%020d", idx))
}

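// GetContents returns up to maxResults stored items in index order,
// starting at startingIndex.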
func (s *Storage[Item]) GetContents(_ context.Context, startingIndex uint64, maxResults uint64) ([]*Item, error) {
var res []*Item
it := s.db.NewIterator([]byte(""), idxToKey(startingIndex))
for i := 0; i < int(maxResults); i++ {
if !it.Next() {
break
}
item, err := s.decodeItem(it.Value())
if err != nil {
return nil, err
}
res = append(res, item)
}
return res, it.Error()
}

func (s *Storage[Item]) lastItemIdx(context.Context) ([]byte, error) {
return s.db.Get(lastItemIdxKey)
}

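// GetLast returns the item with the highest index, or nil, nil when the
// storage is empty.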
func (s *Storage[Item]) GetLast(ctx context.Context) (*Item, error) {
size, err := s.Length(ctx)
if err != nil {
return nil, err
}
if size == 0 {
return nil, nil
}
lastItemIdx, err := s.lastItemIdx(ctx)
if err != nil {
return nil, fmt.Errorf("getting last item index: %w", err)

> Review comment (Collaborator): I think this should return nil, nil if the last item index is missing (i.e. there are no items). We should also think about what happens when all items are pruned. In that case, we probably want to have pruning remove the last item index from the db.
>
> Reply (anodar, author): Done. Checking length: if it's 0, then returning nil, nil. So there's no need to remove the last item index from the db when pruning (the behavior of all public methods stays exactly the same).

}
val, err := s.db.Get(lastItemIdx)
if err != nil {
return nil, err
}
return s.decodeItem(val)
}

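// Prune deletes every queued item whose index is below keepStartingAt and
// updates the stored length counter in the same batch.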
func (s *Storage[Item]) Prune(ctx context.Context, keepStartingAt uint64) error {
cnt, err := s.Length(ctx)
if err != nil {
return err
}
end := idxToKey(keepStartingAt)
it := s.db.NewIterator([]byte{}, idxToKey(0))
b := s.db.NewBatch()
for it.Next() {
if bytes.Compare(it.Key(), end) >= 0 {
break
}
if err := b.Delete(it.Key()); err != nil {
return fmt.Errorf("deleting key: %w", err)
}
cnt--
}
if err := b.Put(countKey, []byte(strconv.Itoa(cnt))); err != nil {
return fmt.Errorf("updating length counter: %w", err)
}
return b.Write()
}

// valueAt returns the value at key. If it doesn't exist then it returns
// encoded bytes of nil.
func (s *Storage[Item]) valueAt(_ context.Context, key []byte) ([]byte, error) {
val, err := s.db.Get(key)
if err != nil {
if errors.Is(err, leveldb.ErrNotFound) {
return rlp.EncodeToBytes((*Item)(nil))
}
return nil, err
}
return val, nil
}

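// Put performs a compare-and-swap at index: the write only goes through
// when the currently stored bytes match the RLP encoding of prev (nil for
// an empty slot). It also advances the last-item index and the length
// counter when the new key extends the queue.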
func (s *Storage[Item]) Put(ctx context.Context, index uint64, prev *Item, new *Item) error {
key := idxToKey(index)
stored, err := s.valueAt(ctx, key)
if err != nil {
return err
}
prevEnc, err := rlp.EncodeToBytes(prev)
if err != nil {
return fmt.Errorf("encoding previous item: %w", err)
}
if !bytes.Equal(stored, prevEnc) {
return fmt.Errorf("replacing different item than expected at index %v %v %v", index, stored, prevEnc)
}
newEnc, err := rlp.EncodeToBytes(new)
if err != nil {
return fmt.Errorf("encoding new item: %w", err)
}
b := s.db.NewBatch()
cnt, err := s.Length(ctx)
if err != nil {
return err
}
if err := b.Put(key, newEnc); err != nil {
return fmt.Errorf("updating value at: %v: %w", key, err)
}
lastItemIdx, err := s.lastItemIdx(ctx)
if err != nil && !errors.Is(err, leveldb.ErrNotFound) {
return err
}
if errors.Is(err, leveldb.ErrNotFound) {
lastItemIdx = []byte{}
}
if cnt == 0 || bytes.Compare(key, lastItemIdx) > 0 {
if err := b.Put(lastItemIdxKey, key); err != nil {
return fmt.Errorf("updating last item: %w", err)
}
if err := b.Put(countKey, []byte(strconv.Itoa(cnt+1))); err != nil {
return fmt.Errorf("updating length counter: %w", err)
}
}
return b.Write()
}

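// Length reports the number of stored items; a missing counter is treated
// as an empty queue.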
func (s *Storage[Item]) Length(context.Context) (int, error) {
val, err := s.db.Get(countKey)
if err != nil {
if errors.Is(err, leveldb.ErrNotFound) {
return 0, nil
}
return 0, err
}
return strconv.Atoi(string(val))
}

func (s *Storage[Item]) IsPersistent() bool {
return true
}
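For illustration, a minimal sketch of how this Storage behaves end to end. Note that the not-found checks above target goleveldb's ErrNotFound, so the sketch opens a real leveldb-backed database via rawdb.NewLevelDBDatabase rather than go-ethereum's in-memory mock (which returns a different not-found error). The item type and values are hypothetical:

```go
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/ethereum/go-ethereum/core/rawdb"

	"github.com/offchainlabs/nitro/arbnode/dataposter/leveldb"
)

// item is a hypothetical queue entry; it only needs to be RLP-encodable.
type item struct {
	Nonce uint64
	Data  []byte
}

func main() {
	ctx := context.Background()
	dir, err := os.MkdirTemp("", "dataposter-demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	db, err := rawdb.NewLevelDBDatabase(dir, 16, 16, "demo", false)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	s := leveldb.New[item](db)

	// Put is a compare-and-swap: prev must match what is stored
	// (nil for an empty slot).
	if err := s.Put(ctx, 0, nil, &item{Nonce: 0, Data: []byte("a")}); err != nil {
		panic(err)
	}
	if err := s.Put(ctx, 1, nil, &item{Nonce: 1, Data: []byte("b")}); err != nil {
		panic(err)
	}

	last, _ := s.GetLast(ctx) // the item at index 1
	n, _ := s.Length(ctx)     // 2
	fmt.Println(last.Nonce, n)

	// Prune drops every index below the argument.
	_ = s.Prune(ctx, 1) // only index 1 remains; Length is now 1
}
```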