From 15a09264afc7b29867e5832a747f745f89def0f5 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Tue, 20 Feb 2024 15:55:13 -0600 Subject: [PATCH 1/3] Unified interface for Data Availability providers --- arbnode/inbox_tracker.go | 7 +- arbstate/inbox.go | 151 +++++++++++++++++++++++--------- arbstate/inbox_fuzz_test.go | 2 +- cmd/replay/main.go | 6 +- system_tests/state_fuzz_test.go | 2 +- 5 files changed, 124 insertions(+), 44 deletions(-) diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index 763ddcc420..f87d75c5e7 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -606,8 +606,11 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L ctx: ctx, client: client, } - - multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, t.das, t.blobReader, arbstate.KeysetValidate) + daProviders := []arbstate.DataAvailabilityProvider{ + arbstate.DAProviderWrapperDAS(t.das), + arbstate.DAProviderWrapperBlobReader(t.blobReader), + } + multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, daProviders, arbstate.KeysetValidate) batchMessageCounts := make(map[uint64]arbutil.MessageIndex) currentpos := prevbatchmeta.MessageCount + 1 for { diff --git a/arbstate/inbox.go b/arbstate/inbox.go index b65c9360c1..95566ba3ce 100644 --- a/arbstate/inbox.go +++ b/arbstate/inbox.go @@ -63,7 +63,7 @@ const maxZeroheavyDecompressedLen = 101*MaxDecompressedLen/100 + 64 const MaxSegmentsPerSequencerMessage = 100 * 1024 const MinLifetimeSecondsForDataAvailabilityCert = 7 * 24 * 60 * 60 // one week -func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash common.Hash, data []byte, dasReader DataAvailabilityReader, blobReader BlobReader, keysetValidationMode KeysetValidationMode) (*sequencerMessage, error) { +func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash common.Hash, data []byte, daProviders []DataAvailabilityProvider, keysetValidationMode KeysetValidationMode) (*sequencerMessage, error) { if len(data) < 40 { return nil, errors.New("sequencer message missing L1 header") } @@ -88,41 +88,30 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash // Stage 1: Extract the payload from any data availability header. // It's important that multiple DAS strategies can't both be invoked in the same batch, // as these headers are validated by the sequencer inbox and not other DASs. - if len(payload) > 0 && IsDASMessageHeaderByte(payload[0]) { - if dasReader == nil { - log.Error("No DAS Reader configured, but sequencer message found with DAS header") - } else { - var err error - payload, err = RecoverPayloadFromDasBatch(ctx, batchNum, data, dasReader, nil, keysetValidationMode) - if err != nil { - return nil, err - } - if payload == nil { - return parsedMsg, nil + // We try to extract payload from the first occuring valid DA provider in the daProviders list + if len(payload) > 0 { + foundDA := false + var err error + for _, provider := range daProviders { + if provider != nil && provider.IsValidHeaderByte(payload[0]) { + payload, err = provider.RecoverPayloadFromBatch(ctx, batchNum, batchBlockHash, data, nil, keysetValidationMode) + if err != nil { + return nil, err + } + if payload == nil { + return parsedMsg, nil + } + foundDA = true + break } } - } else if len(payload) > 0 && IsBlobHashesHeaderByte(payload[0]) { - blobHashes := payload[1:] - if len(blobHashes)%len(common.Hash{}) != 0 { - return nil, fmt.Errorf("blob batch data is not a list of hashes as expected") - } - versionedHashes := make([]common.Hash, len(blobHashes)/len(common.Hash{})) - for i := 0; i*32 < len(blobHashes); i += 1 { - copy(versionedHashes[i][:], blobHashes[i*32:(i+1)*32]) - } - - if blobReader == nil { - return nil, errors.New("blob batch payload was encountered but no BlobReader was configured") - } - kzgBlobs, err := blobReader.GetBlobs(ctx, batchBlockHash, versionedHashes) - if err != nil { - return nil, fmt.Errorf("failed to get blobs: %w", err) - } - payload, err = blobs.DecodeBlobs(kzgBlobs) - if err != nil { - log.Warn("Failed to decode blobs", "batchBlockHash", batchBlockHash, "versionedHashes", versionedHashes, "err", err) - return parsedMsg, nil + if !foundDA { + if IsDASMessageHeaderByte(payload[0]) { + log.Error("No DAS Reader configured, but sequencer message found with DAS header") + } else if IsBlobHashesHeaderByte(payload[0]) { + return nil, errors.New("blob batch payload was encountered but no BlobReader was configured") + } } } @@ -284,6 +273,92 @@ func RecoverPayloadFromDasBatch( return payload, nil } +type DataAvailabilityProvider interface { + // IsValidHeaderByte returns true if the given headerByte has bits corresponding to the DA provider + IsValidHeaderByte(headerByte byte) bool + + // RecoverPayloadFromBatch fetches the underlying payload from the DA provider given the batch header information + RecoverPayloadFromBatch( + ctx context.Context, + batchNum uint64, + batchBlockHash common.Hash, + sequencerMsg []byte, + preimages map[arbutil.PreimageType]map[common.Hash][]byte, + keysetValidationMode KeysetValidationMode, + ) ([]byte, error) +} + +// DAProviderWrapperDAS is generally meant to be only used by nitro. +// DA Providers should implement methods in the DataAvailabilityProvider interface independently +func DAProviderWrapperDAS(das DataAvailabilityReader) *dAProviderForDAS { + return &dAProviderForDAS{ + das: das, + } +} + +type dAProviderForDAS struct { + das DataAvailabilityReader +} + +func (d *dAProviderForDAS) IsValidHeaderByte(headerByte byte) bool { + return IsDASMessageHeaderByte(headerByte) +} + +func (d *dAProviderForDAS) RecoverPayloadFromBatch( + ctx context.Context, + batchNum uint64, + batchBlockHash common.Hash, + sequencerMsg []byte, + preimages map[arbutil.PreimageType]map[common.Hash][]byte, + keysetValidationMode KeysetValidationMode, +) ([]byte, error) { + return RecoverPayloadFromDasBatch(ctx, batchNum, sequencerMsg, d.das, preimages, keysetValidationMode) +} + +// DAProviderWrapperBlobReader is generally meant to be only used by nitro. +// DA Providers should implement methods in the DataAvailabilityProvider interface independently +func DAProviderWrapperBlobReader(blobReader BlobReader) *dAProviderForBlobReader { + return &dAProviderForBlobReader{ + blobReader: blobReader, + } +} + +type dAProviderForBlobReader struct { + blobReader BlobReader +} + +func (b *dAProviderForBlobReader) IsValidHeaderByte(headerByte byte) bool { + return IsBlobHashesHeaderByte(headerByte) +} + +func (b *dAProviderForBlobReader) RecoverPayloadFromBatch( + ctx context.Context, + batchNum uint64, + batchBlockHash common.Hash, + sequencerMsg []byte, + preimages map[arbutil.PreimageType]map[common.Hash][]byte, + keysetValidationMode KeysetValidationMode, +) ([]byte, error) { + blobHashes := sequencerMsg[41:] + if len(blobHashes)%len(common.Hash{}) != 0 { + return nil, fmt.Errorf("blob batch data is not a list of hashes as expected") + } + versionedHashes := make([]common.Hash, len(blobHashes)/len(common.Hash{})) + for i := 0; i*32 < len(blobHashes); i += 1 { + copy(versionedHashes[i][:], blobHashes[i*32:(i+1)*32]) + } + kzgBlobs, err := b.blobReader.GetBlobs(ctx, batchBlockHash, versionedHashes) + if err != nil { + return nil, fmt.Errorf("failed to get blobs: %w", err) + } + payload, err := blobs.DecodeBlobs(kzgBlobs) + if err != nil { + log.Warn("Failed to decode blobs", "batchBlockHash", batchBlockHash, "versionedHashes", versionedHashes, "err", err) + return nil, nil + } + return payload, nil +} + type KeysetValidationMode uint8 const KeysetValidate KeysetValidationMode = 0 @@ -293,8 +368,7 @@ const KeysetDontValidate KeysetValidationMode = 2 type inboxMultiplexer struct { backend InboxBackend delayedMessagesRead uint64 - dasReader DataAvailabilityReader - blobReader BlobReader + daProviders []DataAvailabilityProvider cachedSequencerMessage *sequencerMessage cachedSequencerMessageNum uint64 cachedSegmentNum uint64 @@ -304,12 +378,11 @@ type inboxMultiplexer struct { keysetValidationMode KeysetValidationMode } -func NewInboxMultiplexer(backend InboxBackend, delayedMessagesRead uint64, dasReader DataAvailabilityReader, blobReader BlobReader, keysetValidationMode KeysetValidationMode) arbostypes.InboxMultiplexer { +func NewInboxMultiplexer(backend InboxBackend, delayedMessagesRead uint64, daProviders []DataAvailabilityProvider, keysetValidationMode KeysetValidationMode) arbostypes.InboxMultiplexer { return &inboxMultiplexer{ backend: backend, delayedMessagesRead: delayedMessagesRead, - dasReader: dasReader, - blobReader: blobReader, + daProviders: daProviders, keysetValidationMode: keysetValidationMode, } } @@ -331,7 +404,7 @@ func (r *inboxMultiplexer) Pop(ctx context.Context) (*arbostypes.MessageWithMeta } r.cachedSequencerMessageNum = r.backend.GetSequencerInboxPosition() var err error - r.cachedSequencerMessage, err = parseSequencerMessage(ctx, r.cachedSequencerMessageNum, batchBlockHash, bytes, r.dasReader, r.blobReader, r.keysetValidationMode) + r.cachedSequencerMessage, err = parseSequencerMessage(ctx, r.cachedSequencerMessageNum, batchBlockHash, bytes, r.daProviders, r.keysetValidationMode) if err != nil { return nil, err } diff --git a/arbstate/inbox_fuzz_test.go b/arbstate/inbox_fuzz_test.go index dcf43fd0da..b34c02534b 100644 --- a/arbstate/inbox_fuzz_test.go +++ b/arbstate/inbox_fuzz_test.go @@ -67,7 +67,7 @@ func FuzzInboxMultiplexer(f *testing.F) { delayedMessage: delayedMsg, positionWithinMessage: 0, } - multiplexer := NewInboxMultiplexer(backend, 0, nil, nil, KeysetValidate) + multiplexer := NewInboxMultiplexer(backend, 0, nil, KeysetValidate) _, err := multiplexer.Pop(context.TODO()) if err != nil { panic(err) diff --git a/cmd/replay/main.go b/cmd/replay/main.go index afbd308e91..0fae45a81e 100644 --- a/cmd/replay/main.go +++ b/cmd/replay/main.go @@ -210,7 +210,11 @@ func main() { if backend.GetPositionWithinMessage() > 0 { keysetValidationMode = arbstate.KeysetDontValidate } - inboxMultiplexer := arbstate.NewInboxMultiplexer(backend, delayedMessagesRead, dasReader, &BlobPreimageReader{}, keysetValidationMode) + daProviders := []arbstate.DataAvailabilityProvider{ + arbstate.DAProviderWrapperDAS(dasReader), + arbstate.DAProviderWrapperBlobReader(&BlobPreimageReader{}), + } + inboxMultiplexer := arbstate.NewInboxMultiplexer(backend, delayedMessagesRead, daProviders, keysetValidationMode) ctx := context.Background() message, err := inboxMultiplexer.Pop(ctx) if err != nil { diff --git a/system_tests/state_fuzz_test.go b/system_tests/state_fuzz_test.go index 28bcbec9b4..1b29dca4b9 100644 --- a/system_tests/state_fuzz_test.go +++ b/system_tests/state_fuzz_test.go @@ -41,7 +41,7 @@ func BuildBlock( if lastBlockHeader != nil { delayedMessagesRead = lastBlockHeader.Nonce.Uint64() } - inboxMultiplexer := arbstate.NewInboxMultiplexer(inbox, delayedMessagesRead, nil, nil, arbstate.KeysetValidate) + inboxMultiplexer := arbstate.NewInboxMultiplexer(inbox, delayedMessagesRead, nil, arbstate.KeysetValidate) ctx := context.Background() message, err := inboxMultiplexer.Pop(ctx) From cb1e663557746811c579ccb0b5783352a4fa15e0 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Wed, 21 Feb 2024 21:00:45 -0600 Subject: [PATCH 2/3] address PR comments --- arbnode/inbox_tracker.go | 4 ++-- arbstate/inbox.go | 14 ++++++++++---- cmd/replay/main.go | 4 ++-- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index f87d75c5e7..410965a5f9 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -607,8 +607,8 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L client: client, } daProviders := []arbstate.DataAvailabilityProvider{ - arbstate.DAProviderWrapperDAS(t.das), - arbstate.DAProviderWrapperBlobReader(t.blobReader), + arbstate.NewDAProviderDAS(t.das), + arbstate.NewDAProviderBlobReader(t.blobReader), } multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, daProviders, arbstate.KeysetValidate) batchMessageCounts := make(map[uint64]arbutil.MessageIndex) diff --git a/arbstate/inbox.go b/arbstate/inbox.go index 95566ba3ce..8dbfef2ed6 100644 --- a/arbstate/inbox.go +++ b/arbstate/inbox.go @@ -288,9 +288,12 @@ type DataAvailabilityProvider interface { ) ([]byte, error) } -// DAProviderWrapperDAS is generally meant to be only used by nitro. +// NewDAProviderDAS is generally meant to be only used by nitro. // DA Providers should implement methods in the DataAvailabilityProvider interface independently -func DAProviderWrapperDAS(das DataAvailabilityReader) *dAProviderForDAS { +func NewDAProviderDAS(das DataAvailabilityReader) *dAProviderForDAS { + if das == nil { + return nil + } return &dAProviderForDAS{ das: das, } @@ -315,9 +318,12 @@ func (d *dAProviderForDAS) RecoverPayloadFromBatch( return RecoverPayloadFromDasBatch(ctx, batchNum, sequencerMsg, d.das, preimages, keysetValidationMode) } -// DAProviderWrapperBlobReader is generally meant to be only used by nitro. +// NewDAProviderBlobReader is generally meant to be only used by nitro. // DA Providers should implement methods in the DataAvailabilityProvider interface independently -func DAProviderWrapperBlobReader(blobReader BlobReader) *dAProviderForBlobReader { +func NewDAProviderBlobReader(blobReader BlobReader) *dAProviderForBlobReader { + if blobReader == nil { + return nil + } return &dAProviderForBlobReader{ blobReader: blobReader, } diff --git a/cmd/replay/main.go b/cmd/replay/main.go index 0fae45a81e..11a2526a61 100644 --- a/cmd/replay/main.go +++ b/cmd/replay/main.go @@ -211,8 +211,8 @@ func main() { keysetValidationMode = arbstate.KeysetDontValidate } daProviders := []arbstate.DataAvailabilityProvider{ - arbstate.DAProviderWrapperDAS(dasReader), - arbstate.DAProviderWrapperBlobReader(&BlobPreimageReader{}), + arbstate.NewDAProviderDAS(dasReader), + arbstate.NewDAProviderBlobReader(&BlobPreimageReader{}), } inboxMultiplexer := arbstate.NewInboxMultiplexer(backend, delayedMessagesRead, daProviders, keysetValidationMode) ctx := context.Background() From 514edfeb92e7b9683558f90933c539e799ffc5c4 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli Date: Thu, 22 Feb 2024 15:35:22 -0600 Subject: [PATCH 3/3] bug fix --- arbnode/inbox_tracker.go | 9 ++++++--- arbstate/inbox.go | 6 ------ cmd/replay/main.go | 7 ++++--- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index 410965a5f9..f98f93a3eb 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -606,9 +606,12 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L ctx: ctx, client: client, } - daProviders := []arbstate.DataAvailabilityProvider{ - arbstate.NewDAProviderDAS(t.das), - arbstate.NewDAProviderBlobReader(t.blobReader), + var daProviders []arbstate.DataAvailabilityProvider + if t.das != nil { + daProviders = append(daProviders, arbstate.NewDAProviderDAS(t.das)) + } + if t.blobReader != nil { + daProviders = append(daProviders, arbstate.NewDAProviderBlobReader(t.blobReader)) } multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, daProviders, arbstate.KeysetValidate) batchMessageCounts := make(map[uint64]arbutil.MessageIndex) diff --git a/arbstate/inbox.go b/arbstate/inbox.go index 8dbfef2ed6..49192f9d37 100644 --- a/arbstate/inbox.go +++ b/arbstate/inbox.go @@ -291,9 +291,6 @@ type DataAvailabilityProvider interface { // NewDAProviderDAS is generally meant to be only used by nitro. // DA Providers should implement methods in the DataAvailabilityProvider interface independently func NewDAProviderDAS(das DataAvailabilityReader) *dAProviderForDAS { - if das == nil { - return nil - } return &dAProviderForDAS{ das: das, } @@ -321,9 +318,6 @@ func (d *dAProviderForDAS) RecoverPayloadFromBatch( // NewDAProviderBlobReader is generally meant to be only used by nitro. // DA Providers should implement methods in the DataAvailabilityProvider interface independently func NewDAProviderBlobReader(blobReader BlobReader) *dAProviderForBlobReader { - if blobReader == nil { - return nil - } return &dAProviderForBlobReader{ blobReader: blobReader, } diff --git a/cmd/replay/main.go b/cmd/replay/main.go index 11a2526a61..7ab59fc513 100644 --- a/cmd/replay/main.go +++ b/cmd/replay/main.go @@ -210,10 +210,11 @@ func main() { if backend.GetPositionWithinMessage() > 0 { keysetValidationMode = arbstate.KeysetDontValidate } - daProviders := []arbstate.DataAvailabilityProvider{ - arbstate.NewDAProviderDAS(dasReader), - arbstate.NewDAProviderBlobReader(&BlobPreimageReader{}), + var daProviders []arbstate.DataAvailabilityProvider + if dasReader != nil { + daProviders = append(daProviders, arbstate.NewDAProviderDAS(dasReader)) } + daProviders = append(daProviders, arbstate.NewDAProviderBlobReader(&BlobPreimageReader{})) inboxMultiplexer := arbstate.NewInboxMultiplexer(backend, delayedMessagesRead, daProviders, keysetValidationMode) ctx := context.Background() message, err := inboxMultiplexer.Pop(ctx)