diff --git a/go/mcap/writer.go b/go/mcap/writer.go index 3eb4396b5..06cee1a25 100644 --- a/go/mcap/writer.go +++ b/go/mcap/writer.go @@ -8,7 +8,6 @@ import ( "math" "sort" - "github.com/anacrolix/missinggo/v2/panicif" "github.com/klauspost/compress/zstd" "github.com/pierrec/lz4/v4" ) @@ -414,7 +413,6 @@ func (w *Writer) WriteDataEnd(e *DataEnd) error { } func (w *Writer) uncompressedSize() int { - //panicif.NotEq(w.compressedWriter.Size(), int64(w.uncompressed.Len())) return w.uncompressed.Len() } @@ -423,17 +421,21 @@ func (w *Writer) flushActiveChunk() error { return nil } - var c1 bytes.Buffer - cw, err := w.newCompressedWriter(&c1) - panicif.Err(err) - n, err := cw.Write(w.uncompressed.Bytes()) - panicif.Err(err) - panicif.NotEq(n, w.uncompressed.Len()) - crc := cw.CRC() + var buf bytes.Buffer + cw, err := w.newCompressedWriter(&buf) + if err != nil { + return fmt.Errorf("creating compressed writer: %w", err) + } + _, err = cw.Write(w.uncompressed.Bytes()) + if err != nil { + return fmt.Errorf("writing uncompressed data to compressor: %w", err) + } err = cw.Close() - panicif.Err(err) - compressedLen := c1.Len() + if err != nil { + return fmt.Errorf("finalizing compressor: %w", err) + } + compressedLen := buf.Len() uncompressedLen := w.uncompressedSize() // the "top fields" are all fields of the chunk record except for the compressed records. topFieldsLen := 8 + 8 + 8 + 4 + 4 + len(w.opts.Compression) + 8 @@ -459,14 +461,14 @@ func (w *Writer) flushActiveChunk() error { offset += putUint64(w.msg[offset:], start) offset += putUint64(w.msg[offset:], end) offset += putUint64(w.msg[offset:], uint64(uncompressedLen)) - offset += putUint32(w.msg[offset:], crc) + offset += putUint32(w.msg[offset:], cw.CRC()) offset += putPrefixedString(w.msg[offset:], string(w.opts.Compression)) - offset += putUint64(w.msg[offset:], uint64(c1.Len())) + offset += putUint64(w.msg[offset:], uint64(buf.Len())) _, err = w.w.Write(w.msg[:offset]) if err != nil { return err } - _, err = w.w.Write(c1.Bytes()) + _, err = w.w.Write(buf.Bytes()) if err != nil { return err } @@ -750,7 +752,7 @@ const ( ) type CustomCompressor interface { - NewCompressor() ResettableWriteCloser + NewCompressor() (ResettableWriteCloser, error) Compression() CompressionFormat } @@ -771,8 +773,8 @@ type customCompressor struct { compressor func() (ResettableWriteCloser, error) } -func (c *customCompressor) Compressor() ResettableWriteCloser { - return c.compressor +func (c *customCompressor) NewCompressor() (ResettableWriteCloser, error) { + return c.compressor() } func (c *customCompressor) Compression() CompressionFormat { @@ -865,11 +867,7 @@ func encoderLevelFromZstd(level CompressionLevel) zstd.EncoderLevel { } } -func (w *Writer) newCompressedWriter(compressed *bytes.Buffer) (compressedWriter *countingCRCWriter, err error) { - opts := w.opts - if !opts.Chunked { - panic("this should never get called when chunking isn't enabled") - } +func newCompressor(compressed *bytes.Buffer, opts *WriterOptions) (ResettableWriteCloser, error) { switch { case opts.Compressor != nil: // must be top // override the compression option. We can't check for a mismatch here @@ -878,27 +876,37 @@ func (w *Writer) newCompressedWriter(compressed *bytes.Buffer) (compressedWriter if opts.Compressor.Compression() == "" { return nil, fmt.Errorf("custom compressor requires compression format") } - c := opts.Compressor.NewCompressor() - c.Reset(compressed) - compressedWriter = newCountingCRCWriter(c, opts.IncludeCRC) - case opts.Compression == CompressionZSTD: - level := encoderLevelFromZstd(opts.CompressionLevel) - zw, err := zstd.NewWriter(compressed, zstd.WithEncoderLevel(level)) + c, err := opts.Compressor.NewCompressor() if err != nil { return nil, err } - compressedWriter = newCountingCRCWriter(zw, opts.IncludeCRC) + c.Reset(compressed) + return c, nil + case opts.Compression == CompressionZSTD: + level := encoderLevelFromZstd(opts.CompressionLevel) + return zstd.NewWriter(compressed, zstd.WithEncoderLevel(level)) case opts.Compression == CompressionLZ4: level := encoderLevelFromLZ4(opts.CompressionLevel) lzw := lz4.NewWriter(compressed) _ = lzw.Apply(lz4.CompressionLevelOption(level)) - compressedWriter = newCountingCRCWriter(lzw, opts.IncludeCRC) + return lzw, nil case opts.Compression == CompressionNone: - compressedWriter = newCountingCRCWriter(bufCloser{compressed}, opts.IncludeCRC) + return newCountingCRCWriter(bufCloser{compressed}, opts.IncludeCRC), nil default: - return nil, fmt.Errorf("unsupported compression") + return nil, fmt.Errorf("unsupported compression: %v", opts.Compression) + } +} + +func (w *Writer) newCompressedWriter(compressed *bytes.Buffer) (*countingCRCWriter, error) { + opts := w.opts + if !opts.Chunked { + panic("this should never get called when chunking isn't enabled") + } + compressor, err := newCompressor(compressed, opts) + if err != nil { + return nil, err } - return + return newCountingCRCWriter(compressor, opts.IncludeCRC), nil } // NewWriter returns a new MCAP writer.