From af98e5ad97723cb8bb6db4888652d423c20a8df8 Mon Sep 17 00:00:00 2001
From: Wyatt Alt
Date: Thu, 7 Sep 2023 19:18:38 -0700
Subject: [PATCH] Go: add support for custom compression

Adds support for bringing custom compressors and decompressors to the Go
lexer and writer. For the writer, a ResettableWriteCloser is accepted. For
the reader, a map of compression format to ResettableReader is accepted.
If the reader implements io.Closer, we call Close on it when the file is
closed.
---
 go/mcap/Makefile       |  2 +-
 go/mcap/lexer.go       | 28 ++++++++++++++++---
 go/mcap/lexer_test.go  | 38 +++++++++++++++++++++++++
 go/mcap/writer.go      | 25 ++++++++++++-----
 go/mcap/writer_test.go | 63 ++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 144 insertions(+), 12 deletions(-)

diff --git a/go/mcap/Makefile b/go/mcap/Makefile
index 5e1a0e5fff..fb9cbeedde 100644
--- a/go/mcap/Makefile
+++ b/go/mcap/Makefile
@@ -1,5 +1,5 @@
 test:
-	go test ./...
+	go test -cover ./...
 
 bench:
 	go test -benchmem -run=^$$ -count 5 -bench ^BenchmarkLexer/demo -memprofile mem.out -cpuprofile cpu.out
diff --git a/go/mcap/lexer.go b/go/mcap/lexer.go
index 131ba842a7..6ea49e2c18 100644
--- a/go/mcap/lexer.go
+++ b/go/mcap/lexer.go
@@ -158,6 +158,7 @@ type Lexer struct {
 	maxRecordSize            int
 	maxDecompressedChunkSize int
 	attachmentCallback       func(*AttachmentReader) error
+	decompressors            map[CompressionFormat]ResettableReader
 }
 
 // Next returns the next token from the lexer as a byte array. The result will
@@ -302,6 +303,11 @@ func (l *Lexer) Close() {
 	if l.decoders.zstd != nil {
 		l.decoders.zstd.Close()
 	}
+	for _, decompressor := range l.decompressors {
+		if closer, ok := decompressor.(io.Closer); ok {
+			_ = closer.Close()
+		}
+	}
 }
 
 type decoders struct {
@@ -414,15 +420,19 @@ func loadChunk(l *Lexer, recordLen uint64) error {
 
 	// remaining bytes in the record are the chunk data
 	lr := io.LimitReader(l.reader, int64(recordsLength))
-	switch compression {
-	case CompressionNone:
+	switch {
+	case l.decompressors[compression] != nil: // custom decompressors take precedence, so this case must come first
+		decoder := l.decompressors[compression]
+		decoder.Reset(lr)
+		l.reader = decoder
+	case compression == CompressionNone:
 		l.reader = lr
-	case CompressionZSTD:
+	case compression == CompressionZSTD:
 		err = l.setZSTDDecoder(lr)
 		if err != nil {
 			return err
 		}
-	case CompressionLZ4:
+	case compression == CompressionLZ4:
 		l.setLZ4Decoder(lr)
 	default:
 		return fmt.Errorf("unsupported compression: %s", string(compression))
@@ -498,6 +508,12 @@ type LexerOptions struct {
 	MaxRecordSize int
 	// AttachmentCallback is a function to execute on attachments encountered in the file.
 	AttachmentCallback func(*AttachmentReader) error
+	// Decompressors are custom decompressors. Chunks with a matching
+	// compression format will be decompressed with the provided
+	// ResettableReader instead of the default implementation. If the
+	// ResettableReader also implements io.Closer, it will be closed
+	// when the lexer is closed.
+	Decompressors map[CompressionFormat]ResettableReader
 }
 
 // NewLexer returns a new lexer for the given reader.
@@ -505,6 +521,7 @@ func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
 	var maxRecordSize, maxDecompressedChunkSize int
 	var computeAttachmentCRCs, validateChunkCRCs, emitChunks, emitInvalidChunks, skipMagic bool
 	var attachmentCallback func(*AttachmentReader) error
+	var decompressors map[CompressionFormat]ResettableReader
 	if len(opts) > 0 {
 		validateChunkCRCs = opts[0].ValidateChunkCRCs
 		computeAttachmentCRCs = opts[0].ComputeAttachmentCRCs
@@ -514,6 +531,7 @@ func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
 		maxRecordSize = opts[0].MaxRecordSize
 		maxDecompressedChunkSize = opts[0].MaxDecompressedChunkSize
 		attachmentCallback = opts[0].AttachmentCallback
+		decompressors = opts[0].Decompressors
 	}
 	if !skipMagic {
 		err := validateMagic(r)
@@ -521,6 +539,7 @@ func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
 			return nil, err
 		}
 	}
+
 	return &Lexer{
 		basereader: r,
 		reader:     r,
@@ -532,5 +551,6 @@ func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
 		maxRecordSize:            maxRecordSize,
 		maxDecompressedChunkSize: maxDecompressedChunkSize,
 		attachmentCallback:       attachmentCallback,
+		decompressors:            decompressors,
 	}, nil
 }
diff --git a/go/mcap/lexer_test.go b/go/mcap/lexer_test.go
index cc0c7c9824..179fdb31a7 100644
--- a/go/mcap/lexer_test.go
+++ b/go/mcap/lexer_test.go
@@ -11,6 +11,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pierrec/lz4/v4"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -134,6 +135,43 @@ func TestBadMagic(t *testing.T) {
 	}
 }
 
+func TestCustomDecompressor(t *testing.T) {
+	buf := file(
+		header(),
+		chunk(t, CompressionLZ4, true, channelInfo(), message(), message()),
+		chunk(t, CompressionLZ4, true, channelInfo(), message(), message()),
+		attachment(), attachment(),
+		footer(),
+	)
+	lzr := lz4.NewReader(nil)
+	blockCount := 0
+	_ = lzr.Apply(lz4.OnBlockDoneOption(func(size int) {
+		blockCount++
+	}))
+	lexer, err := NewLexer(bytes.NewReader(buf), &LexerOptions{
+		Decompressors: map[CompressionFormat]ResettableReader{
+			CompressionLZ4: lzr,
+		},
+	})
+	assert.Nil(t, err)
+	expected := []TokenType{
+		TokenHeader,
+		TokenChannel,
+		TokenMessage,
+		TokenMessage,
+		TokenChannel,
+		TokenMessage,
+		TokenMessage,
+		TokenFooter,
+	}
+	for i, expectedTokenType := range expected {
+		tokenType, _, err := lexer.Next(nil)
+		assert.Nil(t, err)
+		assert.Equal(t, expectedTokenType, tokenType, fmt.Sprintf("mismatch element %d", i))
+	}
+	assert.Positive(t, blockCount)
+}
+
 func TestReturnsEOFOnSuccessiveCalls(t *testing.T) {
 	lexer, err := NewLexer(bytes.NewReader(file()))
 	assert.Nil(t, err)
diff --git a/go/mcap/writer.go b/go/mcap/writer.go
index ce8119c6a4..b758aabad8 100644
--- a/go/mcap/writer.go
+++ b/go/mcap/writer.go
@@ -788,6 +788,10 @@ type WriterOptions struct {
 	// SkipMagic causes the writer to skip writing magic bytes at the start of
 	// the file. This may be useful for writing a partial section of records.
 	SkipMagic bool
+
+	// Compressor is a custom compressor. If supplied, it takes precedence
+	// over the built-in ones. Compression must still be set.
+	Compressor ResettableWriteCloser
 }
 
 // Convert an MCAP compression level to the corresponding lz4.CompressionLevel.
@@ -833,27 +837,34 @@ func NewWriter(w io.Writer, opts *WriterOptions) (*Writer, error) {
 	compressed := bytes.Buffer{}
 	var compressedWriter *countingCRCWriter
 	if opts.Chunked {
-		switch opts.Compression {
-		case CompressionZSTD:
+		switch {
+		// a custom compressor takes precedence over the built-in ones.
+		case opts.Compressor != nil:
+			if opts.Compression == "" {
+				return nil, fmt.Errorf("custom compressor requires a compression format")
+			}
+			opts.Compressor.Reset(&compressed)
+			compressedWriter = newCountingCRCWriter(opts.Compressor, opts.IncludeCRC)
+		case opts.Compression == CompressionZSTD:
 			level := encoderLevelFromZstd(opts.CompressionLevel)
 			zw, err := zstd.NewWriter(&compressed, zstd.WithEncoderLevel(level))
 			if err != nil {
 				return nil, err
 			}
 			compressedWriter = newCountingCRCWriter(zw, opts.IncludeCRC)
-		case CompressionLZ4:
+		case opts.Compression == CompressionLZ4:
 			level := encoderLevelFromLZ4(opts.CompressionLevel)
 			lzw := lz4.NewWriter(&compressed)
 			_ = lzw.Apply(lz4.CompressionLevelOption(level))
 			compressedWriter = newCountingCRCWriter(lzw, opts.IncludeCRC)
-		case CompressionNone:
+		case opts.Compression == CompressionNone:
 			compressedWriter = newCountingCRCWriter(bufCloser{&compressed}, opts.IncludeCRC)
 		default:
 			return nil, fmt.Errorf("unsupported compression")
 		}
-		if opts.ChunkSize == 0 {
-			opts.ChunkSize = 1024 * 1024
-		}
+	}
+	if opts.ChunkSize == 0 {
+		opts.ChunkSize = 1024 * 1024
 	}
 	return &Writer{
 		w:      writer,
diff --git a/go/mcap/writer_test.go b/go/mcap/writer_test.go
index e105b8ae46..c7a7e1be29 100644
--- a/go/mcap/writer_test.go
+++ b/go/mcap/writer_test.go
@@ -4,9 +4,11 @@ import (
 	"bytes"
 	"crypto/md5"
 	"fmt"
+	"io"
 	"testing"
 	"time"
 
+	"github.com/pierrec/lz4/v4"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -678,3 +680,64 @@ func TestWriteAttachment(t *testing.T) {
 		})
 	}
 }
+
+func assertReadable(t *testing.T, rs io.ReadSeeker) {
+	reader, err := NewReader(rs)
+	assert.Nil(t, err)
+
+	_, err = reader.Info()
+	assert.Nil(t, err)
+
+	it, err := reader.Messages()
+	assert.Nil(t, err)
+	for {
+		_, _, _, err := it.Next(nil)
+		if err != nil {
+			assert.ErrorIs(t, err, io.EOF)
+			break
+		}
+	}
+}
+
+func TestBYOCompressor(t *testing.T) {
+	buf := &bytes.Buffer{}
+	// example: custom lz4 settings
+	lzw := lz4.NewWriter(nil)
+	blockCount := 0
+	_ = lzw.Apply(lz4.OnBlockDoneOption(func(size int) {
+		blockCount++
+	}))
+
+	writer, err := NewWriter(buf, &WriterOptions{
+		Chunked:     true,
+		ChunkSize:   1024,
+		Compressor:  lzw,
+		Compression: CompressionLZ4,
+	})
+	assert.Nil(t, err)
+
+	assert.Nil(t, writer.WriteHeader(&Header{}))
+	assert.Nil(t, writer.WriteSchema(&Schema{
+		ID:       1,
+		Name:     "schema",
+		Encoding: "ros1msg",
+		Data:     []byte{},
+	}))
+	assert.Nil(t, writer.WriteChannel(&Channel{
+		ID:              0,
+		SchemaID:        1,
+		Topic:           "/foo",
+		MessageEncoding: "ros1msg",
+	}))
+
+	for i := 0; i < 100; i++ {
+		assert.Nil(t, writer.WriteMessage(&Message{
+			ChannelID: 0,
+			Sequence:  0,
+			LogTime:   uint64(i),
+		}))
+	}
+	assert.Nil(t, writer.Close())
+	assertReadable(t, bytes.NewReader(buf.Bytes()))
+	assert.Positive(t, blockCount)
+}
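
Usage note (illustrative, not part of the diff): the two options are meant
to be used together. WriterOptions.Compression still names the format
recorded in each chunk (NewWriter rejects a custom Compressor without it),
and LexerOptions.Decompressors is keyed on that same format. A minimal
round-trip sketch using the pierrec/lz4 package the tests already depend
on; the package-main wrapper, log.Fatal error handling, and the lz4 level
are arbitrary example choices:

    package main

    import (
    	"bytes"
    	"log"

    	"github.com/foxglove/mcap/go/mcap"
    	"github.com/pierrec/lz4/v4"
    )

    func main() {
    	buf := &bytes.Buffer{}

    	// Write side: bring a pre-configured lz4 writer. Its destination is
    	// nil because NewWriter resets it onto the internal chunk buffer.
    	lzw := lz4.NewWriter(nil)
    	_ = lzw.Apply(lz4.CompressionLevelOption(lz4.Level9)) // example setting
    	writer, err := mcap.NewWriter(buf, &mcap.WriterOptions{
    		Chunked:     true,
    		Compression: mcap.CompressionLZ4, // required alongside Compressor
    		Compressor:  lzw,
    	})
    	if err != nil {
    		log.Fatal(err)
    	}
    	if err := writer.WriteHeader(&mcap.Header{}); err != nil {
    		log.Fatal(err)
    	}
    	if err := writer.Close(); err != nil {
    		log.Fatal(err)
    	}

    	// Read side: chunks whose compression field matches a map key are
    	// routed through the supplied reader, which the lexer resets for
    	// each chunk.
    	lzr := lz4.NewReader(nil)
    	lexer, err := mcap.NewLexer(bytes.NewReader(buf.Bytes()), &mcap.LexerOptions{
    		Decompressors: map[mcap.CompressionFormat]mcap.ResettableReader{
    			mcap.CompressionLZ4: lzr,
    		},
    	})
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer lexer.Close() // also closes the decompressor if it implements io.Closer
    	for {
    		if _, _, err := lexer.Next(nil); err != nil {
    			break // io.EOF after the footer
    		}
    	}
    }

Because the lexer resets the supplied ResettableReader at every chunk
boundary, a single decompressor instance is reused across chunks, so it
should not be shared between files that are read concurrently.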