Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Wyatt Alt committed Aug 23, 2023
1 parent 5c4c7c4 commit 10c8992
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions go/mcap/indexed_message_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"github.com/pierrec/lz4/v4"
)

const (
chunkBufferGrowthMultiple = 1.2
)

// indexedMessageIterator is an iterator over an indexed mcap read seeker (as
// seeking is required). It makes reads in alternation from the index data
// section, the message index at the end of a chunk, and the chunk's contents.
Expand All @@ -35,7 +39,7 @@ type indexedMessageIterator struct {
lz4Reader *lz4.Reader
hasReadSummarySection bool

compressedChunk []byte
compressedChunkAndMessageIndex []byte
}

// parseIndexSection parses the index section of the file and populates the
Expand Down Expand Up @@ -151,14 +155,15 @@ func (it *indexedMessageIterator) loadChunk(chunkIndex *ChunkIndex) error {
}

compressedChunkLength := chunkIndex.ChunkLength + chunkIndex.MessageIndexLength
if len(it.compressedChunk) < int(compressedChunkLength) {
it.compressedChunk = make([]byte, int(float64(compressedChunkLength)*1.2))
if len(it.compressedChunkAndMessageIndex) < int(compressedChunkLength) {
newSize := int(float64(compressedChunkLength) * chunkBufferGrowthMultiple)
it.compressedChunkAndMessageIndex = make([]byte, newSize)
}
_, err = io.ReadFull(it.rs, it.compressedChunk[:compressedChunkLength])
_, err = io.ReadFull(it.rs, it.compressedChunkAndMessageIndex[:compressedChunkLength])
if err != nil {
return fmt.Errorf("failed to read chunk data: %w", err)
}
parsedChunk, err := ParseChunk(it.compressedChunk[9:chunkIndex.ChunkLength])
parsedChunk, err := ParseChunk(it.compressedChunkAndMessageIndex[9:chunkIndex.ChunkLength])
if err != nil {
return fmt.Errorf("failed to parse chunk: %w", err)
}
Expand Down Expand Up @@ -194,7 +199,7 @@ func (it *indexedMessageIterator) loadChunk(chunkIndex *ChunkIndex) error {
return fmt.Errorf("unsupported compression %s", parsedChunk.Compression)
}
// use the message index to find the messages we want from the chunk
messageIndexSection := it.compressedChunk[chunkIndex.ChunkLength:compressedChunkLength]
messageIndexSection := it.compressedChunkAndMessageIndex[chunkIndex.ChunkLength:compressedChunkLength]
var recordLen uint64
offset := 0
for offset < len(messageIndexSection) {
Expand Down

0 comments on commit 10c8992

Please sign in to comment.