From 0b75f2353d263516cf8b09b09b9dafb1b7de3d0a Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Tue, 12 Sep 2023 08:36:38 -0700 Subject: [PATCH] Address feedback --- ...settable_write_closer.go => buf_closer.go} | 6 --- go/mcap/counting_writer.go | 4 +- go/mcap/lexer.go | 5 ++- go/mcap/lexer_test.go | 11 +++++- go/mcap/resettable.go | 2 +- go/mcap/writer.go | 39 +++++++++++++++++-- go/mcap/writer_test.go | 7 ++-- 7 files changed, 55 insertions(+), 19 deletions(-) rename go/mcap/{resettable_write_closer.go => buf_closer.go} (64%) diff --git a/go/mcap/resettable_write_closer.go b/go/mcap/buf_closer.go similarity index 64% rename from go/mcap/resettable_write_closer.go rename to go/mcap/buf_closer.go index 03d36b2f65..efb833cfbc 100644 --- a/go/mcap/resettable_write_closer.go +++ b/go/mcap/buf_closer.go @@ -5,12 +5,6 @@ import ( "io" ) -// resettableWriteCloser is a WriteCloser that supports a Reset method. -type resettableWriteCloser interface { - io.WriteCloser - Reset(io.Writer) -} - type bufCloser struct { b *bytes.Buffer } diff --git a/go/mcap/counting_writer.go b/go/mcap/counting_writer.go index cf4ef130d7..82a26af3d7 100644 --- a/go/mcap/counting_writer.go +++ b/go/mcap/counting_writer.go @@ -7,7 +7,7 @@ import ( ) type countingCRCWriter struct { - w resettableWriteCloser + w ResettableWriteCloser size int64 crc hash.Hash32 computeCRC bool @@ -45,7 +45,7 @@ func (c *countingCRCWriter) Write(p []byte) (int, error) { return c.w.Write(p) } -func newCountingCRCWriter(w resettableWriteCloser, computeCRC bool) *countingCRCWriter { +func newCountingCRCWriter(w ResettableWriteCloser, computeCRC bool) *countingCRCWriter { return &countingCRCWriter{ w: w, crc: crc32.NewIEEE(), diff --git a/go/mcap/lexer.go b/go/mcap/lexer.go index 6ea49e2c18..188974d8a1 100644 --- a/go/mcap/lexer.go +++ b/go/mcap/lexer.go @@ -423,7 +423,10 @@ func loadChunk(l *Lexer, recordLen uint64) error { switch { case l.decompressors[compression] != nil: // must be top decoder := l.decompressors[compression] - decoder.Reset(lr) + err = decoder.Reset(lr) + if err != nil { + return fmt.Errorf("failed to reset custom decompressor: %w", err) + } l.reader = decoder case compression == CompressionNone: l.reader = lr diff --git a/go/mcap/lexer_test.go b/go/mcap/lexer_test.go index 76ed5dcf73..6d959217ea 100644 --- a/go/mcap/lexer_test.go +++ b/go/mcap/lexer_test.go @@ -135,6 +135,15 @@ func TestBadMagic(t *testing.T) { } } +type lzreader struct { + *lz4.Reader +} + +func (l lzreader) Reset(r io.Reader) error { + l.Reader.Reset(r) + return nil +} + func TestCustomDecompressor(t *testing.T) { buf := file( header(), @@ -150,7 +159,7 @@ func TestCustomDecompressor(t *testing.T) { }))) lexer, err := NewLexer(bytes.NewReader(buf), &LexerOptions{ Decompressors: map[CompressionFormat]ResettableReader{ - CompressionLZ4: lzr, + CompressionLZ4: lzreader{lzr}, }, }) assert.Nil(t, err) diff --git a/go/mcap/resettable.go b/go/mcap/resettable.go index 29a24d6f98..60d2cdeed3 100644 --- a/go/mcap/resettable.go +++ b/go/mcap/resettable.go @@ -11,5 +11,5 @@ type ResettableWriteCloser interface { // ResettableReadCloser implements io.ReadCloser and adds a Reset method. type ResettableReader interface { io.Reader - Reset(io.Reader) + Reset(io.Reader) error } diff --git a/go/mcap/writer.go b/go/mcap/writer.go index 56a21888eb..6d0a74c2f7 100644 --- a/go/mcap/writer.go +++ b/go/mcap/writer.go @@ -446,6 +446,7 @@ func (w *Writer) flushActiveChunk() error { if err != nil { return err } + offset += putUint64(w.chunk[offset:], uint64(msglen)) offset += putUint64(w.chunk[offset:], start) offset += putUint64(w.chunk[offset:], end) @@ -740,6 +741,33 @@ const ( CompressionLevelBest ) +type CustomCompressor interface { + Compressor() ResettableWriteCloser + Compression() CompressionFormat +} + +// NewCustomCompressor returns a structure that may be supplied to writer +// options as a custom chunk compressor. +func NewCustomCompressor(compression CompressionFormat, compressor ResettableWriteCloser) CustomCompressor { + return &customCompressor{ + compression: compression, + compressor: compressor, + } +} + +type customCompressor struct { + compression CompressionFormat + compressor ResettableWriteCloser +} + +func (c *customCompressor) Compressor() ResettableWriteCloser { + return c.compressor +} + +func (c *customCompressor) Compression() CompressionFormat { + return c.compression +} + // WriterOptions are options for the MCAP Writer. type WriterOptions struct { // IncludeCRC specifies whether to compute CRC checksums in the output. @@ -791,7 +819,7 @@ type WriterOptions struct { // Compressor is a custom compressor. If supplied it will take precedence // over the built-in ones. - Compressor ResettableWriteCloser + Compressor CustomCompressor } // Convert an MCAP compression level to the corresponding lz4.CompressionLevel. @@ -839,11 +867,14 @@ func NewWriter(w io.Writer, opts *WriterOptions) (*Writer, error) { if opts.Chunked { switch { case opts.Compressor != nil: // must be top - if opts.Compression == "" { + // override the compression option. We can't check for a mismatch here + // because "none compression" is an empty string. + opts.Compression = opts.Compressor.Compression() + if opts.Compressor.Compression() == "" { return nil, fmt.Errorf("custom compressor requires compression format") } - opts.Compressor.Reset(&compressed) - compressedWriter = newCountingCRCWriter(opts.Compressor, opts.IncludeCRC) + opts.Compressor.Compressor().Reset(&compressed) + compressedWriter = newCountingCRCWriter(opts.Compressor.Compressor(), opts.IncludeCRC) case opts.Compression == CompressionZSTD: level := encoderLevelFromZstd(opts.CompressionLevel) zw, err := zstd.NewWriter(&compressed, zstd.WithEncoderLevel(level)) diff --git a/go/mcap/writer_test.go b/go/mcap/writer_test.go index 8ecd16ff5a..854f452e5f 100644 --- a/go/mcap/writer_test.go +++ b/go/mcap/writer_test.go @@ -709,10 +709,9 @@ func TestBYOCompressor(t *testing.T) { }))) writer, err := NewWriter(buf, &WriterOptions{ - Chunked: true, - ChunkSize: 1024, - Compressor: lzw, - Compression: "lz4", + Chunked: true, + ChunkSize: 1024, + Compressor: NewCustomCompressor("lz4", lzw), }) assert.Nil(t, err)