Skip to content

Commit

Permalink
Merge pull request #2155 from OffchainLabs/unified-da-provider-interface
Browse files Browse the repository at this point in the history
Unified reader interface for Data Availability providers
  • Loading branch information
ganeshvanahalli committed Feb 23, 2024
2 parents 3e14543 + 514edfe commit 58e4b50
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 44 deletions.
10 changes: 8 additions & 2 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,8 +606,14 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L
ctx: ctx,
client: client,
}

multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, t.das, t.blobReader, arbstate.KeysetValidate)
var daProviders []arbstate.DataAvailabilityProvider
if t.das != nil {
daProviders = append(daProviders, arbstate.NewDAProviderDAS(t.das))
}
if t.blobReader != nil {
daProviders = append(daProviders, arbstate.NewDAProviderBlobReader(t.blobReader))
}
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, daProviders, arbstate.KeysetValidate)
batchMessageCounts := make(map[uint64]arbutil.MessageIndex)
currentpos := prevbatchmeta.MessageCount + 1
for {
Expand Down
151 changes: 112 additions & 39 deletions arbstate/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ const maxZeroheavyDecompressedLen = 101*MaxDecompressedLen/100 + 64
const MaxSegmentsPerSequencerMessage = 100 * 1024
const MinLifetimeSecondsForDataAvailabilityCert = 7 * 24 * 60 * 60 // one week

func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash common.Hash, data []byte, dasReader DataAvailabilityReader, blobReader BlobReader, keysetValidationMode KeysetValidationMode) (*sequencerMessage, error) {
func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash common.Hash, data []byte, daProviders []DataAvailabilityProvider, keysetValidationMode KeysetValidationMode) (*sequencerMessage, error) {
if len(data) < 40 {
return nil, errors.New("sequencer message missing L1 header")
}
Expand All @@ -88,41 +88,30 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash
// Stage 1: Extract the payload from any data availability header.
// It's important that multiple DAS strategies can't both be invoked in the same batch,
// as these headers are validated by the sequencer inbox and not other DASs.
if len(payload) > 0 && IsDASMessageHeaderByte(payload[0]) {
if dasReader == nil {
log.Error("No DAS Reader configured, but sequencer message found with DAS header")
} else {
var err error
payload, err = RecoverPayloadFromDasBatch(ctx, batchNum, data, dasReader, nil, keysetValidationMode)
if err != nil {
return nil, err
}
if payload == nil {
return parsedMsg, nil
// We try to extract payload from the first occuring valid DA provider in the daProviders list
if len(payload) > 0 {
foundDA := false
var err error
for _, provider := range daProviders {
if provider != nil && provider.IsValidHeaderByte(payload[0]) {
payload, err = provider.RecoverPayloadFromBatch(ctx, batchNum, batchBlockHash, data, nil, keysetValidationMode)
if err != nil {
return nil, err
}
if payload == nil {
return parsedMsg, nil
}
foundDA = true
break
}
}
} else if len(payload) > 0 && IsBlobHashesHeaderByte(payload[0]) {
blobHashes := payload[1:]
if len(blobHashes)%len(common.Hash{}) != 0 {
return nil, fmt.Errorf("blob batch data is not a list of hashes as expected")
}
versionedHashes := make([]common.Hash, len(blobHashes)/len(common.Hash{}))
for i := 0; i*32 < len(blobHashes); i += 1 {
copy(versionedHashes[i][:], blobHashes[i*32:(i+1)*32])
}

if blobReader == nil {
return nil, errors.New("blob batch payload was encountered but no BlobReader was configured")
}

kzgBlobs, err := blobReader.GetBlobs(ctx, batchBlockHash, versionedHashes)
if err != nil {
return nil, fmt.Errorf("failed to get blobs: %w", err)
}
payload, err = blobs.DecodeBlobs(kzgBlobs)
if err != nil {
log.Warn("Failed to decode blobs", "batchBlockHash", batchBlockHash, "versionedHashes", versionedHashes, "err", err)
return parsedMsg, nil
if !foundDA {
if IsDASMessageHeaderByte(payload[0]) {
log.Error("No DAS Reader configured, but sequencer message found with DAS header")
} else if IsBlobHashesHeaderByte(payload[0]) {
return nil, errors.New("blob batch payload was encountered but no BlobReader was configured")
}
}
}

Expand Down Expand Up @@ -284,6 +273,92 @@ func RecoverPayloadFromDasBatch(
return payload, nil
}

type DataAvailabilityProvider interface {
// IsValidHeaderByte returns true if the given headerByte has bits corresponding to the DA provider
IsValidHeaderByte(headerByte byte) bool

// RecoverPayloadFromBatch fetches the underlying payload from the DA provider given the batch header information
RecoverPayloadFromBatch(
ctx context.Context,
batchNum uint64,
batchBlockHash common.Hash,
sequencerMsg []byte,
preimages map[arbutil.PreimageType]map[common.Hash][]byte,
keysetValidationMode KeysetValidationMode,
) ([]byte, error)
}

// NewDAProviderDAS is generally meant to be only used by nitro.
// DA Providers should implement methods in the DataAvailabilityProvider interface independently
func NewDAProviderDAS(das DataAvailabilityReader) *dAProviderForDAS {
return &dAProviderForDAS{
das: das,
}
}

type dAProviderForDAS struct {
das DataAvailabilityReader
}

func (d *dAProviderForDAS) IsValidHeaderByte(headerByte byte) bool {
return IsDASMessageHeaderByte(headerByte)
}

func (d *dAProviderForDAS) RecoverPayloadFromBatch(
ctx context.Context,
batchNum uint64,
batchBlockHash common.Hash,
sequencerMsg []byte,
preimages map[arbutil.PreimageType]map[common.Hash][]byte,
keysetValidationMode KeysetValidationMode,
) ([]byte, error) {
return RecoverPayloadFromDasBatch(ctx, batchNum, sequencerMsg, d.das, preimages, keysetValidationMode)
}

// NewDAProviderBlobReader is generally meant to be only used by nitro.
// DA Providers should implement methods in the DataAvailabilityProvider interface independently
func NewDAProviderBlobReader(blobReader BlobReader) *dAProviderForBlobReader {
return &dAProviderForBlobReader{
blobReader: blobReader,
}
}

type dAProviderForBlobReader struct {
blobReader BlobReader
}

func (b *dAProviderForBlobReader) IsValidHeaderByte(headerByte byte) bool {
return IsBlobHashesHeaderByte(headerByte)
}

func (b *dAProviderForBlobReader) RecoverPayloadFromBatch(
ctx context.Context,
batchNum uint64,
batchBlockHash common.Hash,
sequencerMsg []byte,
preimages map[arbutil.PreimageType]map[common.Hash][]byte,
keysetValidationMode KeysetValidationMode,
) ([]byte, error) {
blobHashes := sequencerMsg[41:]
if len(blobHashes)%len(common.Hash{}) != 0 {
return nil, fmt.Errorf("blob batch data is not a list of hashes as expected")
}
versionedHashes := make([]common.Hash, len(blobHashes)/len(common.Hash{}))
for i := 0; i*32 < len(blobHashes); i += 1 {
copy(versionedHashes[i][:], blobHashes[i*32:(i+1)*32])
}
kzgBlobs, err := b.blobReader.GetBlobs(ctx, batchBlockHash, versionedHashes)
if err != nil {
return nil, fmt.Errorf("failed to get blobs: %w", err)
}
payload, err := blobs.DecodeBlobs(kzgBlobs)
if err != nil {
log.Warn("Failed to decode blobs", "batchBlockHash", batchBlockHash, "versionedHashes", versionedHashes, "err", err)
return nil, nil
}
return payload, nil
}

type KeysetValidationMode uint8

const KeysetValidate KeysetValidationMode = 0
Expand All @@ -293,8 +368,7 @@ const KeysetDontValidate KeysetValidationMode = 2
type inboxMultiplexer struct {
backend InboxBackend
delayedMessagesRead uint64
dasReader DataAvailabilityReader
blobReader BlobReader
daProviders []DataAvailabilityProvider
cachedSequencerMessage *sequencerMessage
cachedSequencerMessageNum uint64
cachedSegmentNum uint64
Expand All @@ -304,12 +378,11 @@ type inboxMultiplexer struct {
keysetValidationMode KeysetValidationMode
}

func NewInboxMultiplexer(backend InboxBackend, delayedMessagesRead uint64, dasReader DataAvailabilityReader, blobReader BlobReader, keysetValidationMode KeysetValidationMode) arbostypes.InboxMultiplexer {
func NewInboxMultiplexer(backend InboxBackend, delayedMessagesRead uint64, daProviders []DataAvailabilityProvider, keysetValidationMode KeysetValidationMode) arbostypes.InboxMultiplexer {
return &inboxMultiplexer{
backend: backend,
delayedMessagesRead: delayedMessagesRead,
dasReader: dasReader,
blobReader: blobReader,
daProviders: daProviders,
keysetValidationMode: keysetValidationMode,
}
}
Expand All @@ -331,7 +404,7 @@ func (r *inboxMultiplexer) Pop(ctx context.Context) (*arbostypes.MessageWithMeta
}
r.cachedSequencerMessageNum = r.backend.GetSequencerInboxPosition()
var err error
r.cachedSequencerMessage, err = parseSequencerMessage(ctx, r.cachedSequencerMessageNum, batchBlockHash, bytes, r.dasReader, r.blobReader, r.keysetValidationMode)
r.cachedSequencerMessage, err = parseSequencerMessage(ctx, r.cachedSequencerMessageNum, batchBlockHash, bytes, r.daProviders, r.keysetValidationMode)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion arbstate/inbox_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func FuzzInboxMultiplexer(f *testing.F) {
delayedMessage: delayedMsg,
positionWithinMessage: 0,
}
multiplexer := NewInboxMultiplexer(backend, 0, nil, nil, KeysetValidate)
multiplexer := NewInboxMultiplexer(backend, 0, nil, KeysetValidate)
_, err := multiplexer.Pop(context.TODO())
if err != nil {
panic(err)
Expand Down
7 changes: 6 additions & 1 deletion cmd/replay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,12 @@ func main() {
if backend.GetPositionWithinMessage() > 0 {
keysetValidationMode = arbstate.KeysetDontValidate
}
inboxMultiplexer := arbstate.NewInboxMultiplexer(backend, delayedMessagesRead, dasReader, &BlobPreimageReader{}, keysetValidationMode)
var daProviders []arbstate.DataAvailabilityProvider
if dasReader != nil {
daProviders = append(daProviders, arbstate.NewDAProviderDAS(dasReader))
}
daProviders = append(daProviders, arbstate.NewDAProviderBlobReader(&BlobPreimageReader{}))
inboxMultiplexer := arbstate.NewInboxMultiplexer(backend, delayedMessagesRead, daProviders, keysetValidationMode)
ctx := context.Background()
message, err := inboxMultiplexer.Pop(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion system_tests/state_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func BuildBlock(
if lastBlockHeader != nil {
delayedMessagesRead = lastBlockHeader.Nonce.Uint64()
}
inboxMultiplexer := arbstate.NewInboxMultiplexer(inbox, delayedMessagesRead, nil, nil, arbstate.KeysetValidate)
inboxMultiplexer := arbstate.NewInboxMultiplexer(inbox, delayedMessagesRead, nil, arbstate.KeysetValidate)

ctx := context.Background()
message, err := inboxMultiplexer.Pop(ctx)
Expand Down

0 comments on commit 58e4b50

Please sign in to comment.