From d9cabbecd4aad79b38c7a71ba212a1cef4415d53 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Tue, 12 Sep 2023 09:36:46 -0700 Subject: [PATCH] Go: add support for custom compression (#968) Adds support for bringing custom compressors and decompressors to the go lexer and writer. For the writer, a ResettableWriteCloser is accepted. For the reader, a map of compression format to ResettableReader is accepted. If the reader implements io.Closer we'll call that on file close. --- ...settable_write_closer.go => buf_closer.go} | 8 +-- go/mcap/counting_writer.go | 4 +- go/mcap/indexed_message_iterator.go | 2 +- go/mcap/lexer.go | 31 ++++++++-- go/mcap/lexer_test.go | 47 ++++++++++++++ go/mcap/resettable.go | 15 +++++ go/mcap/version.go | 2 +- go/mcap/writer.go | 49 +++++++++++++-- go/mcap/writer_test.go | 62 +++++++++++++++++++ 9 files changed, 201 insertions(+), 19 deletions(-) rename go/mcap/{resettable_write_closer.go => buf_closer.go} (54%) create mode 100644 go/mcap/resettable.go diff --git a/go/mcap/resettable_write_closer.go b/go/mcap/buf_closer.go similarity index 54% rename from go/mcap/resettable_write_closer.go rename to go/mcap/buf_closer.go index 268856511e..efb833cfbc 100644 --- a/go/mcap/resettable_write_closer.go +++ b/go/mcap/buf_closer.go @@ -5,12 +5,6 @@ import ( "io" ) -// resettableWriteCloser is a WriteCloser that supports a Reset method. 
-type resettableWriteCloser interface { - io.WriteCloser - Reset(io.Writer) -} - type bufCloser struct { b *bytes.Buffer } @@ -23,6 +17,6 @@ func (b bufCloser) Write(p []byte) (int, error) { return b.b.Write(p) } -func (b bufCloser) Reset(w io.Writer) { +func (b bufCloser) Reset(_ io.Writer) { b.b.Reset() } diff --git a/go/mcap/counting_writer.go b/go/mcap/counting_writer.go index cf4ef130d7..82a26af3d7 100644 --- a/go/mcap/counting_writer.go +++ b/go/mcap/counting_writer.go @@ -7,7 +7,7 @@ import ( ) type countingCRCWriter struct { - w resettableWriteCloser + w ResettableWriteCloser size int64 crc hash.Hash32 computeCRC bool @@ -45,7 +45,7 @@ func (c *countingCRCWriter) Write(p []byte) (int, error) { return c.w.Write(p) } -func newCountingCRCWriter(w resettableWriteCloser, computeCRC bool) *countingCRCWriter { +func newCountingCRCWriter(w ResettableWriteCloser, computeCRC bool) *countingCRCWriter { return &countingCRCWriter{ w: w, crc: crc32.NewIEEE(), diff --git a/go/mcap/indexed_message_iterator.go b/go/mcap/indexed_message_iterator.go index ef7ba4b6fa..1c75b9e56e 100644 --- a/go/mcap/indexed_message_iterator.go +++ b/go/mcap/indexed_message_iterator.go @@ -237,7 +237,7 @@ func (it *indexedMessageIterator) loadChunk(chunkIndex *ChunkIndex) error { return nil } -func (it *indexedMessageIterator) Next(p []byte) (*Schema, *Channel, *Message, error) { +func (it *indexedMessageIterator) Next(_ []byte) (*Schema, *Channel, *Message, error) { if !it.hasReadSummarySection { err := it.parseSummarySection() if err != nil { diff --git a/go/mcap/lexer.go b/go/mcap/lexer.go index 131ba842a7..188974d8a1 100644 --- a/go/mcap/lexer.go +++ b/go/mcap/lexer.go @@ -158,6 +158,7 @@ type Lexer struct { maxRecordSize int maxDecompressedChunkSize int attachmentCallback func(*AttachmentReader) error + decompressors map[CompressionFormat]ResettableReader } // Next returns the next token from the lexer as a byte array. 
The result will @@ -302,6 +303,11 @@ func (l *Lexer) Close() { if l.decoders.zstd != nil { l.decoders.zstd.Close() } + for _, decompressor := range l.decompressors { + if closer, ok := decompressor.(io.Closer); ok { + closer.Close() + } + } } type decoders struct { @@ -414,15 +420,22 @@ func loadChunk(l *Lexer, recordLen uint64) error { // remaining bytes in the record are the chunk data lr := io.LimitReader(l.reader, int64(recordsLength)) - switch compression { - case CompressionNone: + switch { + case l.decompressors[compression] != nil: // must be top + decoder := l.decompressors[compression] + err = decoder.Reset(lr) + if err != nil { + return fmt.Errorf("failed to reset custom decompressor: %w", err) + } + l.reader = decoder + case compression == CompressionNone: l.reader = lr - case CompressionZSTD: + case compression == CompressionZSTD: err = l.setZSTDDecoder(lr) if err != nil { return err } - case CompressionLZ4: + case compression == CompressionLZ4: l.setLZ4Decoder(lr) default: return fmt.Errorf("unsupported compression: %s", string(compression)) @@ -498,6 +511,12 @@ type LexerOptions struct { MaxRecordSize int // AttachmentCallback is a function to execute on attachments encountered in the file. AttachmentCallback func(*AttachmentReader) error + // Decompressors are custom decompressors. Chunks matching the supplied + // compression format will be decompressed with the provided + // ResettableReader instead of the default implementation. If the + // ResettableReader also implements io.Closer, Close will be called on close + // of the reader. + Decompressors map[CompressionFormat]ResettableReader } // NewLexer returns a new lexer for the given reader. 
@@ -505,6 +524,7 @@ func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) { var maxRecordSize, maxDecompressedChunkSize int var computeAttachmentCRCs, validateChunkCRCs, emitChunks, emitInvalidChunks, skipMagic bool var attachmentCallback func(*AttachmentReader) error + var decompressors map[CompressionFormat]ResettableReader if len(opts) > 0 { validateChunkCRCs = opts[0].ValidateChunkCRCs computeAttachmentCRCs = opts[0].ComputeAttachmentCRCs @@ -514,6 +534,7 @@ func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) { maxRecordSize = opts[0].MaxRecordSize maxDecompressedChunkSize = opts[0].MaxDecompressedChunkSize attachmentCallback = opts[0].AttachmentCallback + decompressors = opts[0].Decompressors } if !skipMagic { err := validateMagic(r) @@ -521,6 +542,7 @@ func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) { return nil, err } } + return &Lexer{ basereader: r, reader: r, @@ -532,5 +554,6 @@ func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) { maxRecordSize: maxRecordSize, maxDecompressedChunkSize: maxDecompressedChunkSize, attachmentCallback: attachmentCallback, + decompressors: decompressors, }, nil } diff --git a/go/mcap/lexer_test.go b/go/mcap/lexer_test.go index cc0c7c9824..6d959217ea 100644 --- a/go/mcap/lexer_test.go +++ b/go/mcap/lexer_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/pierrec/lz4/v4" "github.com/stretchr/testify/assert" ) @@ -134,6 +135,52 @@ func TestBadMagic(t *testing.T) { } } +type lzreader struct { + *lz4.Reader +} + +func (l lzreader) Reset(r io.Reader) error { + l.Reader.Reset(r) + return nil +} + +func TestCustomDecompressor(t *testing.T) { + buf := file( + header(), + chunk(t, CompressionLZ4, true, channelInfo(), message(), message()), + chunk(t, CompressionLZ4, true, channelInfo(), message(), message()), + attachment(), attachment(), + footer(), + ) + lzr := lz4.NewReader(nil) + blockCount := 0 + assert.Nil(t, lzr.Apply(lz4.OnBlockDoneOption(func(size int) { + 
blockCount++ + }))) + lexer, err := NewLexer(bytes.NewReader(buf), &LexerOptions{ + Decompressors: map[CompressionFormat]ResettableReader{ + CompressionLZ4: lzreader{lzr}, + }, + }) + assert.Nil(t, err) + expected := []TokenType{ + TokenHeader, + TokenChannel, + TokenMessage, + TokenMessage, + TokenChannel, + TokenMessage, + TokenMessage, + TokenFooter, + } + for i, expectedTokenType := range expected { + tokenType, _, err := lexer.Next(nil) + assert.Nil(t, err) + assert.Equal(t, expectedTokenType, tokenType, fmt.Sprintf("mismatch element %d", i)) + } + assert.Positive(t, blockCount) +} + func TestReturnsEOFOnSuccessiveCalls(t *testing.T) { lexer, err := NewLexer(bytes.NewReader(file())) assert.Nil(t, err) diff --git a/go/mcap/resettable.go b/go/mcap/resettable.go new file mode 100644 index 0000000000..60d2cdeed3 --- /dev/null +++ b/go/mcap/resettable.go @@ -0,0 +1,15 @@ +package mcap + +import "io" + +// ResettableWriteCloser implements io.WriteCloser and adds a Reset method. +type ResettableWriteCloser interface { + io.WriteCloser + Reset(io.Writer) +} + +// ResettableReader implements io.Reader and adds a Reset method. +type ResettableReader interface { + io.Reader + Reset(io.Reader) error +} diff --git a/go/mcap/version.go b/go/mcap/version.go index cb24bfb1b5..5fe6bb6d87 100644 --- a/go/mcap/version.go +++ b/go/mcap/version.go @@ -1,4 +1,4 @@ package mcap // Version of the MCAP library. 
-var Version = "v1.0.3" +var Version = "v1.0.4" diff --git a/go/mcap/writer.go b/go/mcap/writer.go index ce8119c6a4..6d0a74c2f7 100644 --- a/go/mcap/writer.go +++ b/go/mcap/writer.go @@ -446,6 +446,7 @@ func (w *Writer) flushActiveChunk() error { if err != nil { return err } + offset += putUint64(w.chunk[offset:], uint64(msglen)) offset += putUint64(w.chunk[offset:], start) offset += putUint64(w.chunk[offset:], end) @@ -740,6 +741,33 @@ const ( CompressionLevelBest ) +type CustomCompressor interface { + Compressor() ResettableWriteCloser + Compression() CompressionFormat +} + +// NewCustomCompressor returns a structure that may be supplied to writer +// options as a custom chunk compressor. +func NewCustomCompressor(compression CompressionFormat, compressor ResettableWriteCloser) CustomCompressor { + return &customCompressor{ + compression: compression, + compressor: compressor, + } +} + +type customCompressor struct { + compression CompressionFormat + compressor ResettableWriteCloser +} + +func (c *customCompressor) Compressor() ResettableWriteCloser { + return c.compressor +} + +func (c *customCompressor) Compression() CompressionFormat { + return c.compression +} + // WriterOptions are options for the MCAP Writer. type WriterOptions struct { // IncludeCRC specifies whether to compute CRC checksums in the output. @@ -788,6 +816,10 @@ type WriterOptions struct { // SkipMagic causes the writer to skip writing magic bytes at the start of // the file. This may be useful for writing a partial section of records. SkipMagic bool + + // Compressor is a custom compressor. If supplied it will take precedence + // over the built-in ones. + Compressor CustomCompressor } // Convert an MCAP compression level to the corresponding lz4.CompressionLevel. 
@@ -833,20 +865,29 @@ func NewWriter(w io.Writer, opts *WriterOptions) (*Writer, error) { compressed := bytes.Buffer{} var compressedWriter *countingCRCWriter if opts.Chunked { - switch opts.Compression { - case CompressionZSTD: + switch { + case opts.Compressor != nil: // must be top + // override the compression option. We can't check for a mismatch here + // because "none compression" is an empty string. + opts.Compression = opts.Compressor.Compression() + if opts.Compressor.Compression() == "" { + return nil, fmt.Errorf("custom compressor requires compression format") + } + opts.Compressor.Compressor().Reset(&compressed) + compressedWriter = newCountingCRCWriter(opts.Compressor.Compressor(), opts.IncludeCRC) + case opts.Compression == CompressionZSTD: level := encoderLevelFromZstd(opts.CompressionLevel) zw, err := zstd.NewWriter(&compressed, zstd.WithEncoderLevel(level)) if err != nil { return nil, err } compressedWriter = newCountingCRCWriter(zw, opts.IncludeCRC) - case CompressionLZ4: + case opts.Compression == CompressionLZ4: level := encoderLevelFromLZ4(opts.CompressionLevel) lzw := lz4.NewWriter(&compressed) _ = lzw.Apply(lz4.CompressionLevelOption(level)) compressedWriter = newCountingCRCWriter(lzw, opts.IncludeCRC) - case CompressionNone: + case opts.Compression == CompressionNone: compressedWriter = newCountingCRCWriter(bufCloser{&compressed}, opts.IncludeCRC) default: return nil, fmt.Errorf("unsupported compression") diff --git a/go/mcap/writer_test.go b/go/mcap/writer_test.go index e105b8ae46..854f452e5f 100644 --- a/go/mcap/writer_test.go +++ b/go/mcap/writer_test.go @@ -4,9 +4,11 @@ import ( "bytes" "crypto/md5" "fmt" + "io" "testing" "time" + "github.com/pierrec/lz4/v4" "github.com/stretchr/testify/assert" ) @@ -678,3 +680,63 @@ func TestWriteAttachment(t *testing.T) { }) } } + +func assertReadable(t *testing.T, rs io.ReadSeeker) { + reader, err := NewReader(rs) + assert.Nil(t, err) + + _, err = reader.Info() + assert.Nil(t, err) + + it, err := 
reader.Messages() + assert.Nil(t, err) + for { + _, _, _, err := it.Next(nil) + if err != nil { + assert.ErrorIs(t, err, io.EOF) + break + } + } +} + +func TestBYOCompressor(t *testing.T) { + buf := &bytes.Buffer{} + // example - custom lz4 settings + lzw := lz4.NewWriter(nil) + blockCount := 0 + assert.Nil(t, lzw.Apply(lz4.OnBlockDoneOption(func(size int) { + blockCount++ + }))) + + writer, err := NewWriter(buf, &WriterOptions{ + Chunked: true, + ChunkSize: 1024, + Compressor: NewCustomCompressor("lz4", lzw), + }) + assert.Nil(t, err) + + assert.Nil(t, writer.WriteHeader(&Header{})) + assert.Nil(t, writer.WriteSchema(&Schema{ + ID: 1, + Name: "schema", + Encoding: "ros1msg", + Data: []byte{}, + })) + assert.Nil(t, writer.WriteChannel(&Channel{ + ID: 0, + SchemaID: 1, + Topic: "/foo", + MessageEncoding: "ros1msg", + })) + + for i := 0; i < 100; i++ { + assert.Nil(t, writer.WriteMessage(&Message{ + ChannelID: 0, + Sequence: 0, + LogTime: uint64(i), + })) + } + assert.Nil(t, writer.Close()) + assertReadable(t, bytes.NewReader(buf.Bytes())) + assert.Positive(t, blockCount) +}