Merge pull request #2660 from OffchainLabs/validator_fetch_batch_fix
Validator recording fix
tsahee authored Sep 13, 2024
2 parents 0903019 + b5fa17d commit 4483e77
Showing 10 changed files with 254 additions and 113 deletions.
11 changes: 11 additions & 0 deletions arbos/arbostypes/incomingmessage.go
@@ -182,6 +182,17 @@ func (msg *L1IncomingMessage) FillInBatchGasCost(batchFetcher FallibleBatchFetcher)
return nil
}

func (msg *L1IncomingMessage) PastBatchesRequired() ([]uint64, error) {
if msg.Header.Kind != L1MessageType_BatchPostingReport {
return nil, nil
}
_, _, _, batchNum, _, _, err := ParseBatchPostingReportMessageFields(bytes.NewReader(msg.L2msg))
if err != nil {
return nil, fmt.Errorf("failed to parse batch posting report: %w", err)
}
return []uint64{batchNum}, nil
}

func ParseIncomingL1Message(rd io.Reader, batchFetcher FallibleBatchFetcher) (*L1IncomingMessage, error) {
var kindBuf [1]byte
_, err := rd.Read(kindBuf[:])
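For context, the new PastBatchesRequired helper reports which earlier batches a message's replay will need (currently only the batch referenced by a batch-posting report). A minimal, hypothetical usage sketch; collectRequiredBatches and fetchBatchData are illustrative names, not part of this commit:

package example

import (
	"fmt"

	"github.com/offchainlabs/nitro/arbos/arbostypes"
)

// collectRequiredBatches resolves every batch that msg's replay depends on.
// fetchBatchData stands in for whatever batch source the caller already has.
func collectRequiredBatches(msg *arbostypes.L1IncomingMessage, fetchBatchData func(uint64) ([]byte, error)) (map[uint64][]byte, error) {
	required := make(map[uint64][]byte)
	batchNums, err := msg.PastBatchesRequired()
	if err != nil {
		return nil, err
	}
	for _, num := range batchNums {
		data, err := fetchBatchData(num)
		if err != nil {
			return nil, fmt.Errorf("fetching batch %d: %w", num, err)
		}
		required[num] = data
	}
	return required, nil
}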
32 changes: 29 additions & 3 deletions execution/gethexec/block_recorder.go
@@ -16,6 +16,7 @@ import (
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/execution"
flag "github.com/spf13/pflag"
)

@@ -25,6 +26,8 @@
// BlockRecorder uses a separate statedatabase from the blockchain.
// Most recent/advanced header we ever computed (lastHdr)
// Hopefully - some recent valid block. For that we always keep one candidate block until it becomes validated.
type BlockRecorder struct {
config *BlockRecorderConfig

recordingDatabase *arbitrum.RecordingDatabase
execEngine *ExecutionEngine

@@ -39,10 +42,33 @@ type BlockRecorder struct {
preparedLock sync.Mutex
}

-func NewBlockRecorder(config *arbitrum.RecordingDatabaseConfig, execEngine *ExecutionEngine, ethDb ethdb.Database) *BlockRecorder {
type BlockRecorderConfig struct {
TrieDirtyCache int `koanf:"trie-dirty-cache"`
TrieCleanCache int `koanf:"trie-clean-cache"`
MaxPrepared int `koanf:"max-prepared"`
}

var DefaultBlockRecorderConfig = BlockRecorderConfig{
TrieDirtyCache: 1024,
TrieCleanCache: 16,
MaxPrepared: 1000,
}

func BlockRecorderConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".trie-dirty-cache", DefaultBlockRecorderConfig.TrieDirtyCache, "like trie-dirty-cache for the separate, recording database (used for validation)")
f.Int(prefix+".trie-clean-cache", DefaultBlockRecorderConfig.TrieCleanCache, "like trie-clean-cache for the separate, recording database (used for validation)")
f.Int(prefix+".max-prepared", DefaultBlockRecorderConfig.MaxPrepared, "max references to store in the recording database")
}

+func NewBlockRecorder(config *BlockRecorderConfig, execEngine *ExecutionEngine, ethDb ethdb.Database) *BlockRecorder {
dbConfig := arbitrum.RecordingDatabaseConfig{
TrieDirtyCache: config.TrieDirtyCache,
TrieCleanCache: config.TrieCleanCache,
}
recorder := &BlockRecorder{
config: config,
execEngine: execEngine,
-recordingDatabase: arbitrum.NewRecordingDatabase(config, ethDb, execEngine.bc),
+recordingDatabase: arbitrum.NewRecordingDatabase(&dbConfig, ethDb, execEngine.bc),
}
execEngine.SetRecorder(recorder)
return recorder
@@ -303,7 +329,7 @@ func (r *BlockRecorder) PrepareForRecord(ctx context.Context, start, end arbutil.MessageIndex)
r.updateLastHdr(header)
hdrNum++
}
-r.preparedAddTrim(references, 1000)
+r.preparedAddTrim(references, r.config.MaxPrepared)
return nil
}

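A hedged sketch of how a call site might construct the recorder with the new config type; newRecorderWithSmallCache is an illustrative helper, not code from this PR, and the override values are arbitrary:

package example

import (
	"github.com/ethereum/go-ethereum/ethdb"

	"github.com/offchainlabs/nitro/execution/gethexec"
)

// newRecorderWithSmallCache shows the new BlockRecorderConfig replacing the old
// arbitrum.RecordingDatabaseConfig at a call site; the overrides are examples only.
func newRecorderWithSmallCache(execEngine *gethexec.ExecutionEngine, db ethdb.Database) *gethexec.BlockRecorder {
	cfg := gethexec.DefaultBlockRecorderConfig // start from the defaults introduced here
	cfg.TrieCleanCache = 8                     // smaller clean-trie cache for the recording database
	cfg.MaxPrepared = 500                      // keep fewer prepared references (previously hardcoded to 1000)
	return gethexec.NewBlockRecorder(&cfg, execEngine, db)
}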
30 changes: 15 additions & 15 deletions execution/gethexec/node.go
@@ -84,19 +84,19 @@ func StylusTargetConfigAddOptions(prefix string, f *flag.FlagSet) {
}

type Config struct {
-ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
-Sequencer SequencerConfig `koanf:"sequencer" reload:"hot"`
-RecordingDatabase arbitrum.RecordingDatabaseConfig `koanf:"recording-database"`
-TxPreChecker TxPreCheckerConfig `koanf:"tx-pre-checker" reload:"hot"`
-Forwarder ForwarderConfig `koanf:"forwarder"`
-ForwardingTarget string `koanf:"forwarding-target"`
-SecondaryForwardingTarget []string `koanf:"secondary-forwarding-target"`
-Caching CachingConfig `koanf:"caching"`
-RPC arbitrum.Config `koanf:"rpc"`
-TxLookupLimit uint64 `koanf:"tx-lookup-limit"`
-EnablePrefetchBlock bool `koanf:"enable-prefetch-block"`
-SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"`
-StylusTarget StylusTargetConfig `koanf:"stylus-target"`
+ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
+Sequencer SequencerConfig `koanf:"sequencer" reload:"hot"`
+RecordingDatabase BlockRecorderConfig `koanf:"recording-database"`
+TxPreChecker TxPreCheckerConfig `koanf:"tx-pre-checker" reload:"hot"`
+Forwarder ForwarderConfig `koanf:"forwarder"`
+ForwardingTarget string `koanf:"forwarding-target"`
+SecondaryForwardingTarget []string `koanf:"secondary-forwarding-target"`
+Caching CachingConfig `koanf:"caching"`
+RPC arbitrum.Config `koanf:"rpc"`
+TxLookupLimit uint64 `koanf:"tx-lookup-limit"`
+EnablePrefetchBlock bool `koanf:"enable-prefetch-block"`
+SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"`
+StylusTarget StylusTargetConfig `koanf:"stylus-target"`

forwardingTarget string
}
@@ -129,7 +129,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
arbitrum.ConfigAddOptions(prefix+".rpc", f)
SequencerConfigAddOptions(prefix+".sequencer", f)
headerreader.AddOptions(prefix+".parent-chain-reader", f)
-arbitrum.RecordingDatabaseConfigAddOptions(prefix+".recording-database", f)
+BlockRecorderConfigAddOptions(prefix+".recording-database", f)
f.String(prefix+".forwarding-target", ConfigDefault.ForwardingTarget, "transaction forwarding target URL, or \"null\" to disable forwarding (iff not sequencer)")
f.StringSlice(prefix+".secondary-forwarding-target", ConfigDefault.SecondaryForwardingTarget, "secondary transaction forwarding target URL")
AddOptionsForNodeForwarderConfig(prefix+".forwarder", f)
@@ -145,7 +145,7 @@ var ConfigDefault = Config{
RPC: arbitrum.DefaultConfig,
Sequencer: DefaultSequencerConfig,
ParentChainReader: headerreader.DefaultConfig,
-RecordingDatabase: arbitrum.DefaultRecordingDatabaseConfig,
+RecordingDatabase: DefaultBlockRecorderConfig,
ForwardingTarget: "",
SecondaryForwardingTarget: []string{},
TxPreChecker: DefaultTxPreCheckerConfig,
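Because the execution Config keeps the recording-database settings under the same "recording-database" koanf key, the new options surface as the same style of CLI flags as before. A rough sketch, with the "execution" prefix and the flag values assumed purely for illustration:

package main

import (
	flag "github.com/spf13/pflag"

	"github.com/offchainlabs/nitro/execution/gethexec"
)

func main() {
	// The "execution" prefix is an assumption; the real prefix is whatever the node
	// passes to ConfigAddOptions when it registers its options.
	f := flag.NewFlagSet("example", flag.ContinueOnError)
	gethexec.ConfigAddOptions("execution", f)
	// The recording-database flags are now registered by BlockRecorderConfigAddOptions:
	_ = f.Parse([]string{
		"--execution.recording-database.trie-dirty-cache=2048",
		"--execution.recording-database.max-prepared=500",
	})
}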
2 changes: 1 addition & 1 deletion go-ethereum
6 changes: 6 additions & 0 deletions pubsub/consumer.go
@@ -77,6 +77,10 @@ func (c *Consumer[Request, Response]) Start(ctx context.Context) {
)
}

func (c *Consumer[Request, Response]) Id() string {
return c.id
}

func (c *Consumer[Request, Response]) StopAndWait() {
c.StopWaiter.StopAndWait()
c.deleteHeartBeat(c.GetParentContext())
@@ -164,10 +168,12 @@ func (c *Consumer[Request, Response]) SetResult(ctx context.Context, messageID string
if err != nil {
return fmt.Errorf("marshaling result: %w", err)
}
log.Debug("consumer: setting result", "cid", c.id, "messageId", messageID)
acquired, err := c.client.SetNX(ctx, messageID, resp, c.cfg.ResponseEntryTimeout).Result()
if err != nil || !acquired {
return fmt.Errorf("setting result for message: %v, error: %w", messageID, err)
}
log.Debug("consumer: xack", "cid", c.id, "messageId", messageID)
if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil {
return fmt.Errorf("acking message: %v, error: %w", messageID, err)
}
12 changes: 8 additions & 4 deletions pubsub/producer.go
@@ -205,35 +205,39 @@ func setMinIdInt(min *[2]uint64, id string) error {
// checkResponses checks iteratively whether response for the promise is ready.
func (p *Producer[Request, Response]) checkResponses(ctx context.Context) time.Duration {
minIdInt := [2]uint64{math.MaxUint64, math.MaxUint64}
log.Debug("redis producer: check responses starting")
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
responded := 0
errored := 0
checked := 0
for id, promise := range p.promises {
if ctx.Err() != nil {
return 0
}
checked++
res, err := p.client.Get(ctx, id).Result()
if err != nil {
errSetId := setMinIdInt(&minIdInt, id)
if errSetId != nil {
log.Error("error setting minId", "err", err)
log.Error("redis producer: error setting minId", "err", err)
return p.cfg.CheckResultInterval
}
if !errors.Is(err, redis.Nil) {
log.Error("Error reading value in redis", "key", id, "error", err)
log.Error("redis producer: Error reading value in redis", "key", id, "error", err)
}
continue
}
var resp Response
if err := json.Unmarshal([]byte(res), &resp); err != nil {
promise.ProduceError(fmt.Errorf("error unmarshalling: %w", err))
log.Error("Error unmarshaling", "value", res, "error", err)
log.Error("redis producer: Error unmarshaling", "value", res, "error", err)
errored++
} else {
promise.Produce(resp)
responded++
}
p.client.Del(ctx, id)
delete(p.promises, id)
}
var trimmed int64
@@ -245,7 +249,7 @@ func (p *Producer[Request, Response]) checkResponses(ctx context.Context) time.Duration {
} else {
trimmed, trimErr = p.client.XTrimMaxLen(ctx, p.redisStream, 0).Result()
}
log.Trace("trimming", "id", minId, "trimmed", trimmed, "responded", responded, "errored", errored, "trim-err", trimErr)
log.Debug("trimming", "id", minId, "trimmed", trimmed, "responded", responded, "errored", errored, "trim-err", trimErr, "checked", checked)
return p.cfg.CheckResultInterval
}

74 changes: 57 additions & 17 deletions staker/block_validator.go
@@ -56,12 +56,12 @@ type BlockValidator struct {
chainCaughtUp bool

// can only be accessed from creation thread or if holding reorg-write
-nextCreateBatch []byte
-nextCreateBatchBlockHash common.Hash
-nextCreateBatchMsgCount arbutil.MessageIndex
-nextCreateBatchReread bool
-nextCreateStartGS validator.GoGlobalState
-nextCreatePrevDelayed uint64
+nextCreateBatch *FullBatchInfo
+nextCreateBatchReread bool
+prevBatchCache map[uint64][]byte
+
+nextCreateStartGS validator.GoGlobalState
+nextCreatePrevDelayed uint64

// can only be accessed from from validation thread or if holding reorg-write
lastValidGS validator.GoGlobalState
@@ -108,6 +108,7 @@ type BlockValidatorConfig struct {
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
RecordingIterLimit uint64 `koanf:"recording-iter-limit"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
BatchCacheLimit uint32 `koanf:"batch-cache-limit"`
CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload
PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
@@ -172,8 +173,9 @@ func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) {
redis.ValidationClientConfigAddOptions(prefix+".redis-validation-client-config", f)
f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of execution rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds")
f.Duration(prefix+".validation-poll", DefaultBlockValidatorConfig.ValidationPoll, "poll time to check validations")
f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (small footprint)")
f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (stores batch-copy per block)")
f.Uint64(prefix+".prerecorded-blocks", DefaultBlockValidatorConfig.PrerecordedBlocks, "record that many blocks ahead of validation (larger footprint)")
f.Uint32(prefix+".batch-cache-limit", DefaultBlockValidatorConfig.BatchCacheLimit, "limit number of old batches to keep in block-validator")
f.String(prefix+".current-module-root", DefaultBlockValidatorConfig.CurrentModuleRoot, "current wasm module root ('current' read from chain, 'latest' from machines/latest dir, or provide hash)")
f.Uint64(prefix+".recording-iter-limit", DefaultBlockValidatorConfig.RecordingIterLimit, "limit on block recordings sent per iteration")
f.String(prefix+".pending-upgrade-module-root", DefaultBlockValidatorConfig.PendingUpgradeModuleRoot, "pending upgrade wasm module root to additionally validate (hash, 'latest' or empty)")
@@ -192,8 +194,9 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{
ValidationServer: rpcclient.DefaultClientConfig,
RedisValidationClientConfig: redis.DefaultValidationClientConfig,
ValidationPoll: time.Second,
-ForwardBlocks: 1024,
+ForwardBlocks: 128,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
BatchCacheLimit: 20,
CurrentModuleRoot: "current",
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
@@ -209,6 +212,7 @@ var TestBlockValidatorConfig = BlockValidatorConfig{
RedisValidationClientConfig: redis.TestValidationClientConfig,
ValidationPoll: 100 * time.Millisecond,
ForwardBlocks: 128,
BatchCacheLimit: 20,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
RecordingIterLimit: 20,
CurrentModuleRoot: "latest",
@@ -271,6 +275,7 @@ func NewBlockValidator(
progressValidationsChan: make(chan struct{}, 1),
config: config,
fatalErr: fatalErr,
prevBatchCache: make(map[uint64][]byte),
}
if !config().Dangerous.ResetBlockValidation {
validated, err := ret.ReadLastValidatedInfo()
@@ -571,33 +576,63 @@ func (v *BlockValidator) createNextValidationEntry(ctx context.Context) (bool, error)
}
if v.nextCreateStartGS.PosInBatch == 0 || v.nextCreateBatchReread {
// new batch
-found, batch, batchBlockHash, count, err := v.readBatch(ctx, v.nextCreateStartGS.Batch)
+found, fullBatchInfo, err := v.readFullBatch(ctx, v.nextCreateStartGS.Batch)
if !found {
return false, err
}
-v.nextCreateBatch = batch
-v.nextCreateBatchBlockHash = batchBlockHash
-v.nextCreateBatchMsgCount = count
+if v.nextCreateBatch != nil {
+v.prevBatchCache[v.nextCreateBatch.Number] = v.nextCreateBatch.PostedData
+}
+v.nextCreateBatch = fullBatchInfo
// #nosec G115
-validatorMsgCountCurrentBatch.Update(int64(count))
+validatorMsgCountCurrentBatch.Update(int64(fullBatchInfo.MsgCount))
batchCacheLimit := v.config().BatchCacheLimit
if len(v.prevBatchCache) > int(batchCacheLimit) {
for num := range v.prevBatchCache {
if num+uint64(batchCacheLimit) < v.nextCreateStartGS.Batch {
delete(v.prevBatchCache, num)
}
}
}
v.nextCreateBatchReread = false
}
endGS := validator.GoGlobalState{
BlockHash: endRes.BlockHash,
SendRoot: endRes.SendRoot,
}
-if pos+1 < v.nextCreateBatchMsgCount {
+if pos+1 < v.nextCreateBatch.MsgCount {
endGS.Batch = v.nextCreateStartGS.Batch
endGS.PosInBatch = v.nextCreateStartGS.PosInBatch + 1
-} else if pos+1 == v.nextCreateBatchMsgCount {
+} else if pos+1 == v.nextCreateBatch.MsgCount {
endGS.Batch = v.nextCreateStartGS.Batch + 1
endGS.PosInBatch = 0
} else {
return false, fmt.Errorf("illegal batch msg count %d pos %d batch %d", v.nextCreateBatchMsgCount, pos, endGS.Batch)
return false, fmt.Errorf("illegal batch msg count %d pos %d batch %d", v.nextCreateBatch.MsgCount, pos, endGS.Batch)
}
chainConfig := v.streamer.ChainConfig()
prevBatchNums, err := msg.Message.PastBatchesRequired()
if err != nil {
return false, err
}
prevBatches := make([]validator.BatchInfo, 0, len(prevBatchNums))
// prevBatchNums are only used for batch reports, each is only used once
for _, batchNum := range prevBatchNums {
data, found := v.prevBatchCache[batchNum]
if found {
delete(v.prevBatchCache, batchNum)
} else {
data, err = v.readPostedBatch(ctx, batchNum)
if err != nil {
return false, err
}
}
prevBatches = append(prevBatches, validator.BatchInfo{
Number: batchNum,
Data: data,
})
}
entry, err := newValidationEntry(
-pos, v.nextCreateStartGS, endGS, msg, v.nextCreateBatch, v.nextCreateBatchBlockHash, v.nextCreatePrevDelayed, chainConfig,
+pos, v.nextCreateStartGS, endGS, msg, v.nextCreateBatch, prevBatches, v.nextCreatePrevDelayed, chainConfig,
)
if err != nil {
return false, err
@@ -997,6 +1032,9 @@ func (v *BlockValidator) UpdateLatestStaked(count arbutil.MessageIndex, globalState
v.nextCreateStartGS = globalState
v.nextCreatePrevDelayed = msg.DelayedMessagesRead
v.nextCreateBatchReread = true
if v.nextCreateBatch != nil {
v.prevBatchCache[v.nextCreateBatch.Number] = v.nextCreateBatch.PostedData
}
v.createdA.Store(countUint64)
}
// under the reorg mutex we don't need atomic access
Expand All @@ -1023,6 +1061,7 @@ func (v *BlockValidator) ReorgToBatchCount(count uint64) {
defer v.reorgMutex.Unlock()
if v.nextCreateStartGS.Batch >= count {
v.nextCreateBatchReread = true
v.prevBatchCache = make(map[uint64][]byte)
}
}

@@ -1063,6 +1102,7 @@ func (v *BlockValidator) Reorg(ctx context.Context, count arbutil.MessageIndex)
v.nextCreateStartGS = buildGlobalState(*res, endPosition)
v.nextCreatePrevDelayed = msg.DelayedMessagesRead
v.nextCreateBatchReread = true
v.prevBatchCache = make(map[uint64][]byte)
countUint64 := uint64(count)
v.createdA.Store(countUint64)
// under the reorg mutex we don't need atomic access
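The batch-cache handling above boils down to a simple eviction rule: once prevBatchCache exceeds batch-cache-limit entries, drop everything more than that many batches behind the batch currently being created. A standalone sketch of that policy (illustrative names, not the actual method):

// trimBatchCache mirrors the trimming loop in createNextValidationEntry: nothing is evicted
// until the cache is over the limit, and then only entries more than `limit` batches behind
// the batch currently being created are dropped.
func trimBatchCache(cache map[uint64][]byte, currentBatch uint64, limit uint32) {
	if len(cache) <= int(limit) {
		return
	}
	for num := range cache {
		if num+uint64(limit) < currentBatch {
			delete(cache, num)
		}
	}
}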