Go: add support for custom compression #968

Merged · merged 5 commits · Sep 12, 2023

Changes from 3 commits
2 changes: 1 addition & 1 deletion go/mcap/indexed_message_iterator.go
@@ -237,7 +237,7 @@ func (it *indexedMessageIterator) loadChunk(chunkIndex *ChunkIndex) error {
     return nil
 }
 
-func (it *indexedMessageIterator) Next(p []byte) (*Schema, *Channel, *Message, error) {
+func (it *indexedMessageIterator) Next(_ []byte) (*Schema, *Channel, *Message, error) {
     if !it.hasReadSummarySection {
         err := it.parseSummarySection()
         if err != nil {
28 changes: 24 additions & 4 deletions go/mcap/lexer.go
@@ -158,6 +158,7 @@ type Lexer struct {
     maxRecordSize            int
     maxDecompressedChunkSize int
     attachmentCallback       func(*AttachmentReader) error
+    decompressors            map[CompressionFormat]ResettableReader
 }
 
 // Next returns the next token from the lexer as a byte array. The result will
@@ -302,6 +303,11 @@ func (l *Lexer) Close() {
     if l.decoders.zstd != nil {
         l.decoders.zstd.Close()
     }
+    for _, decompressor := range l.decompressors {
+        if closer, ok := decompressor.(io.Closer); ok {
+            closer.Close()
+        }
+    }
 }
 
 type decoders struct {
@@ -414,15 +420,19 @@ func loadChunk(l *Lexer, recordLen uint64) error {
 
     // remaining bytes in the record are the chunk data
     lr := io.LimitReader(l.reader, int64(recordsLength))
-    switch compression {
-    case CompressionNone:
+    switch {
+    case l.decompressors[compression] != nil: // must be top
+        decoder := l.decompressors[compression]
+        decoder.Reset(lr)
+        l.reader = decoder
+    case compression == CompressionNone:
         l.reader = lr
-    case CompressionZSTD:
+    case compression == CompressionZSTD:
         err = l.setZSTDDecoder(lr)
         if err != nil {
             return err
         }
-    case CompressionLZ4:
+    case compression == CompressionLZ4:
         l.setLZ4Decoder(lr)
     default:
         return fmt.Errorf("unsupported compression: %s", string(compression))
@@ -498,13 +508,20 @@ type LexerOptions struct {
     MaxRecordSize int
     // AttachmentCallback is a function to execute on attachments encountered in the file.
     AttachmentCallback func(*AttachmentReader) error
+    // Decompressors are custom decompressors. Chunks matching the supplied
+    // compression format will be decompressed with the provided
+    // ResettableReader instead of the default implementation. If the
+    // ResettableReader also implements io.Closer, Close will be called on close
+    // of the reader.
+    Decompressors map[CompressionFormat]ResettableReader
 }
 
 // NewLexer returns a new lexer for the given reader.
 func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
     var maxRecordSize, maxDecompressedChunkSize int
     var computeAttachmentCRCs, validateChunkCRCs, emitChunks, emitInvalidChunks, skipMagic bool
     var attachmentCallback func(*AttachmentReader) error
+    var decompressors map[CompressionFormat]ResettableReader
     if len(opts) > 0 {
         validateChunkCRCs = opts[0].ValidateChunkCRCs
         computeAttachmentCRCs = opts[0].ComputeAttachmentCRCs
@@ -514,13 +531,15 @@ func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
         maxRecordSize = opts[0].MaxRecordSize
         maxDecompressedChunkSize = opts[0].MaxDecompressedChunkSize
         attachmentCallback = opts[0].AttachmentCallback
+        decompressors = opts[0].Decompressors
     }
     if !skipMagic {
         err := validateMagic(r)
         if err != nil {
             return nil, err
         }
     }
+
     return &Lexer{
         basereader: r,
         reader:     r,
@@ -532,5 +551,6 @@ func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
         maxRecordSize:            maxRecordSize,
         maxDecompressedChunkSize: maxDecompressedChunkSize,
         attachmentCallback:       attachmentCallback,
+        decompressors:            decompressors,
     }, nil
 }
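For context, here is a minimal sketch of how a caller might wire a custom decompressor through the new LexerOptions field. The surrounding program and the demo.mcap path are illustrative only; the pierrec/lz4 Reader already satisfies ResettableReader out of the box, which is also what the test below relies on.

package main

import (
    "fmt"
    "os"

    "github.com/foxglove/mcap/go/mcap"
    "github.com/pierrec/lz4/v4"
)

func main() {
    f, err := os.Open("demo.mcap") // hypothetical LZ4-compressed MCAP file
    if err != nil {
        panic(err)
    }
    defer f.Close()

    // Register a caller-configured LZ4 reader; the lexer will Reset it onto
    // each chunk's data instead of using its built-in LZ4 path.
    lzr := lz4.NewReader(nil)
    lexer, err := mcap.NewLexer(f, &mcap.LexerOptions{
        Decompressors: map[mcap.CompressionFormat]mcap.ResettableReader{
            mcap.CompressionLZ4: lzr,
        },
    })
    if err != nil {
        panic(err)
    }
    defer lexer.Close()

    for {
        tokenType, _, err := lexer.Next(nil)
        if err != nil {
            break // io.EOF once the file is exhausted
        }
        fmt.Println(tokenType)
    }
}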
38 changes: 38 additions & 0 deletions go/mcap/lexer_test.go
@@ -11,6 +11,7 @@ import (
     "testing"
     "time"
 
+    "github.com/pierrec/lz4/v4"
     "github.com/stretchr/testify/assert"
 )
 
@@ -134,6 +135,43 @@ func TestBadMagic(t *testing.T) {
     }
 }
 
+func TestCustomDecompressor(t *testing.T) {
+    buf := file(
+        header(),
+        chunk(t, CompressionLZ4, true, channelInfo(), message(), message()),
+        chunk(t, CompressionLZ4, true, channelInfo(), message(), message()),
+        attachment(), attachment(),
+        footer(),
+    )
+    lzr := lz4.NewReader(nil)
+    blockCount := 0
+    assert.Nil(t, lzr.Apply(lz4.OnBlockDoneOption(func(size int) {
+        blockCount++
+    })))
+    lexer, err := NewLexer(bytes.NewReader(buf), &LexerOptions{
+        Decompressors: map[CompressionFormat]ResettableReader{
+            CompressionLZ4: lzr,
+        },
+    })
+    assert.Nil(t, err)
+    expected := []TokenType{
+        TokenHeader,
+        TokenChannel,
+        TokenMessage,
+        TokenMessage,
+        TokenChannel,
+        TokenMessage,
+        TokenMessage,
+        TokenFooter,
+    }
+    for i, expectedTokenType := range expected {
+        tokenType, _, err := lexer.Next(nil)
+        assert.Nil(t, err)
+        assert.Equal(t, expectedTokenType, tokenType, fmt.Sprintf("mismatch element %d", i))
+    }
+    assert.Positive(t, blockCount)
+}
+
 func TestReturnsEOFOnSuccessiveCalls(t *testing.T) {
     lexer, err := NewLexer(bytes.NewReader(file()))
     assert.Nil(t, err)
15 changes: 15 additions & 0 deletions go/mcap/resettable.go
@@ -0,0 +1,15 @@
+package mcap
+
+import "io"
+
+// ResettableWriteCloser implements io.WriteCloser and adds a Reset method.
+type ResettableWriteCloser interface {
+    io.WriteCloser
+    Reset(io.Writer)

Review comment (Contributor Author): perhaps Reset should return error? lz4's does not, but zstd's does.

+}
+
+// ResettableReadCloser implements io.ReadCloser and adds a Reset method.

Review comment (Contributor Author): io.Reader

+type ResettableReader interface {
+    io.Reader
+    Reset(io.Reader)
+}
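On the Reset signature question raised in the thread above: because ResettableReader's Reset returns no error, pierrec/lz4's Reader matches the interface directly, while klauspost/compress's zstd.Decoder (whose Reset returns an error) needs a thin adapter. A possible sketch follows; the zstdResettable adapter is illustrative, not part of the library.

package main

import (
    "io"

    "github.com/foxglove/mcap/go/mcap"
    "github.com/klauspost/compress/zstd"
)

// zstdResettable (a hypothetical adapter) wraps *zstd.Decoder so that its
// error-returning Reset fits the error-free Reset(io.Reader) required by
// mcap.ResettableReader. A Reset failure is held and surfaced on the next Read.
type zstdResettable struct {
    *zstd.Decoder
    resetErr error
}

func (z *zstdResettable) Reset(r io.Reader) {
    z.resetErr = z.Decoder.Reset(r)
}

func (z *zstdResettable) Read(p []byte) (int, error) {
    if z.resetErr != nil {
        return 0, z.resetErr
    }
    return z.Decoder.Read(p)
}

// Compile-time check that the adapter satisfies the interface.
var _ mcap.ResettableReader = (*zstdResettable)(nil)

func main() {
    dec, err := zstd.NewReader(nil) // nil source is allowed; Reset attaches real input later
    if err != nil {
        panic(err)
    }
    _ = &zstdResettable{Decoder: dec}
}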
2 changes: 1 addition & 1 deletion go/mcap/resettable_write_closer.go
@@ -23,6 +23,6 @@ func (b bufCloser) Write(p []byte) (int, error) {
     return b.b.Write(p)
 }
 
-func (b bufCloser) Reset(w io.Writer) {
+func (b bufCloser) Reset(_ io.Writer) {
     b.b.Reset()
 }
18 changes: 14 additions & 4 deletions go/mcap/writer.go
@@ -788,6 +788,10 @@ type WriterOptions struct {
     // SkipMagic causes the writer to skip writing magic bytes at the start of
     // the file. This may be useful for writing a partial section of records.
     SkipMagic bool
+
+    // Compressor is a custom compressor. If supplied it will take precedence
+    // over the built-in ones.
+    Compressor ResettableWriteCloser
 }
 
 // Convert an MCAP compression level to the corresponding lz4.CompressionLevel.
@@ -833,20 +837,26 @@ func NewWriter(w io.Writer, opts *WriterOptions) (*Writer, error) {
     compressed := bytes.Buffer{}
     var compressedWriter *countingCRCWriter
     if opts.Chunked {
-        switch opts.Compression {
-        case CompressionZSTD:
+        switch {
+        case opts.Compressor != nil: // must be top
+            if opts.Compression == "" {
Review comment (Contributor): My first reaction is that the interface feels a little strange with "compressor" and "compression" decoupled. I don't see where Compression is used in the case of a custom compressor, but presumably an lz4 writer should always be accompanied by type LZ4. Is Compression required if a compressor is passed? If so, would it make sense to have Compressor contain the compression, and embed the ResettableWriteCloser type?

Review comment (Contributor Author): It's an option, but what I don't really like about it is that it forces all consumers of this to implement their own Compressor type (to expose a Compression() string method), whereas the current interface has a chance of matching the compressors out of the box - although I think this will only be true of one of lz4 and zstd, depending on whether or not Reset() is chosen to send an error out.

Review comment (Contributor Author): If we used a functional options pattern for this it would be better - we could tell the user to pass WithCompressor(compression string, compressor ...).

Review comment (Contributor Author): but we can't make that change here unfortunately
+                return nil, fmt.Errorf("custom compressor requires compression format")
+            }
+            opts.Compressor.Reset(&compressed)
+            compressedWriter = newCountingCRCWriter(opts.Compressor, opts.IncludeCRC)
+        case opts.Compression == CompressionZSTD:
             level := encoderLevelFromZstd(opts.CompressionLevel)
             zw, err := zstd.NewWriter(&compressed, zstd.WithEncoderLevel(level))
             if err != nil {
                 return nil, err
             }
             compressedWriter = newCountingCRCWriter(zw, opts.IncludeCRC)
-        case CompressionLZ4:
+        case opts.Compression == CompressionLZ4:
             level := encoderLevelFromLZ4(opts.CompressionLevel)
             lzw := lz4.NewWriter(&compressed)
             _ = lzw.Apply(lz4.CompressionLevelOption(level))
             compressedWriter = newCountingCRCWriter(lzw, opts.IncludeCRC)
-        case CompressionNone:
+        case opts.Compression == CompressionNone:
             compressedWriter = newCountingCRCWriter(bufCloser{&compressed}, opts.IncludeCRC)
         default:
             return nil, fmt.Errorf("unsupported compression")
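The functional-options alternative floated in the review thread above might look roughly like this. This is a sketch only: WriterOption and WithCompressor do not exist in the library, and adopting them would change NewWriter's signature, which the author notes is off the table for this PR.

package mcap

// WriterOption is a hypothetical functional option for NewWriter.
type WriterOption func(*WriterOptions)

// WithCompressor (hypothetical) pairs a custom compressor with its
// compression format in a single call, so the two cannot be supplied
// inconsistently - the concern raised in the thread above.
func WithCompressor(format CompressionFormat, compressor ResettableWriteCloser) WriterOption {
    return func(o *WriterOptions) {
        o.Compression = format
        o.Compressor = compressor
    }
}

Callers would then write something like NewWriter(buf, WithCompressor(CompressionLZ4, lzw)) instead of populating the Compression and Compressor fields separately, as the test below must.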
63 changes: 63 additions & 0 deletions go/mcap/writer_test.go
@@ -4,9 +4,11 @@ import (
     "bytes"
     "crypto/md5"
     "fmt"
+    "io"
     "testing"
     "time"
 
+    "github.com/pierrec/lz4/v4"
     "github.com/stretchr/testify/assert"
 )
 
@@ -678,3 +680,64 @@ func TestWriteAttachment(t *testing.T) {
         })
     }
 }
+
+func assertReadable(t *testing.T, rs io.ReadSeeker) {
+    reader, err := NewReader(rs)
+    assert.Nil(t, err)
+
+    _, err = reader.Info()
+    assert.Nil(t, err)
+
+    it, err := reader.Messages()
+    assert.Nil(t, err)
+    for {
+        _, _, _, err := it.Next(nil)
+        if err != nil {
+            assert.ErrorIs(t, err, io.EOF)
+            break
+        }
+    }
+}
+
+func TestBYOCompressor(t *testing.T) {
+    buf := &bytes.Buffer{}
+    // example - custom lz4 settings
+    lzw := lz4.NewWriter(nil)
+    blockCount := 0
+    assert.Nil(t, lzw.Apply(lz4.OnBlockDoneOption(func(size int) {
+        blockCount++
+    })))
+
+    writer, err := NewWriter(buf, &WriterOptions{
+        Chunked:     true,
+        ChunkSize:   1024,
+        Compressor:  lzw,
+        Compression: "lz4",
+    })
+    assert.Nil(t, err)
+
+    assert.Nil(t, writer.WriteHeader(&Header{}))
+    assert.Nil(t, writer.WriteSchema(&Schema{
+        ID:       1,
+        Name:     "schema",
+        Encoding: "ros1msg",
+        Data:     []byte{},
+    }))
+    assert.Nil(t, writer.WriteChannel(&Channel{
+        ID:              0,
+        SchemaID:        1,
+        Topic:           "/foo",
+        MessageEncoding: "ros1msg",
+    }))
+
+    for i := 0; i < 100; i++ {
+        assert.Nil(t, writer.WriteMessage(&Message{
+            ChannelID: 0,
+            Sequence:  0,
+            LogTime:   uint64(i),
+        }))
+    }
+    assert.Nil(t, writer.Close())
+    assertReadable(t, bytes.NewReader(buf.Bytes()))
+    assert.Positive(t, blockCount)
+}