Skip to content

Commit

Permalink
Remove panicif, tidy up compressor setup
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Oct 24, 2024
1 parent be01a59 commit d196769
Showing 1 changed file with 41 additions and 33 deletions.
74 changes: 41 additions & 33 deletions go/mcap/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"math"
"sort"

"github.com/anacrolix/missinggo/v2/panicif"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"
)
Expand Down Expand Up @@ -414,7 +413,6 @@ func (w *Writer) WriteDataEnd(e *DataEnd) error {
}

func (w *Writer) uncompressedSize() int {
//panicif.NotEq(w.compressedWriter.Size(), int64(w.uncompressed.Len()))
return w.uncompressed.Len()
}

Expand All @@ -423,17 +421,21 @@ func (w *Writer) flushActiveChunk() error {
return nil
}

var c1 bytes.Buffer
cw, err := w.newCompressedWriter(&c1)
panicif.Err(err)
n, err := cw.Write(w.uncompressed.Bytes())
panicif.Err(err)
panicif.NotEq(n, w.uncompressed.Len())
crc := cw.CRC()
var buf bytes.Buffer
cw, err := w.newCompressedWriter(&buf)
if err != nil {
return fmt.Errorf("creating compressed writer: %w", err)
}
_, err = cw.Write(w.uncompressed.Bytes())
if err != nil {
return fmt.Errorf("writing uncompressed data to compressor: %w", err)
}
err = cw.Close()
panicif.Err(err)
compressedLen := c1.Len()
if err != nil {
return fmt.Errorf("finalizing compressor: %w", err)
}

compressedLen := buf.Len()
uncompressedLen := w.uncompressedSize()
// the "top fields" are all fields of the chunk record except for the compressed records.
topFieldsLen := 8 + 8 + 8 + 4 + 4 + len(w.opts.Compression) + 8
Expand All @@ -459,14 +461,14 @@ func (w *Writer) flushActiveChunk() error {
offset += putUint64(w.msg[offset:], start)
offset += putUint64(w.msg[offset:], end)
offset += putUint64(w.msg[offset:], uint64(uncompressedLen))
offset += putUint32(w.msg[offset:], crc)
offset += putUint32(w.msg[offset:], cw.CRC())
offset += putPrefixedString(w.msg[offset:], string(w.opts.Compression))
offset += putUint64(w.msg[offset:], uint64(c1.Len()))
offset += putUint64(w.msg[offset:], uint64(buf.Len()))
_, err = w.w.Write(w.msg[:offset])
if err != nil {
return err
}
_, err = w.w.Write(c1.Bytes())
_, err = w.w.Write(buf.Bytes())
if err != nil {
return err
}
Expand Down Expand Up @@ -750,7 +752,7 @@ const (
)

type CustomCompressor interface {
NewCompressor() ResettableWriteCloser
NewCompressor() (ResettableWriteCloser, error)
Compression() CompressionFormat
}

Expand All @@ -771,8 +773,8 @@ type customCompressor struct {
compressor func() (ResettableWriteCloser, error)
}

func (c *customCompressor) Compressor() ResettableWriteCloser {
return c.compressor
func (c *customCompressor) NewCompressor() (ResettableWriteCloser, error) {
return c.compressor()
}

func (c *customCompressor) Compression() CompressionFormat {
Expand Down Expand Up @@ -865,11 +867,7 @@ func encoderLevelFromZstd(level CompressionLevel) zstd.EncoderLevel {
}
}

func (w *Writer) newCompressedWriter(compressed *bytes.Buffer) (compressedWriter *countingCRCWriter, err error) {
opts := w.opts
if !opts.Chunked {
panic("this should never get called when chunking isn't enabled")
}
func newCompressor(compressed *bytes.Buffer, opts *WriterOptions) (ResettableWriteCloser, error) {
switch {
case opts.Compressor != nil: // must be top
// override the compression option. We can't check for a mismatch here
Expand All @@ -878,27 +876,37 @@ func (w *Writer) newCompressedWriter(compressed *bytes.Buffer) (compressedWriter
if opts.Compressor.Compression() == "" {
return nil, fmt.Errorf("custom compressor requires compression format")
}
c := opts.Compressor.NewCompressor()
c.Reset(compressed)
compressedWriter = newCountingCRCWriter(c, opts.IncludeCRC)
case opts.Compression == CompressionZSTD:
level := encoderLevelFromZstd(opts.CompressionLevel)
zw, err := zstd.NewWriter(compressed, zstd.WithEncoderLevel(level))
c, err := opts.Compressor.NewCompressor()
if err != nil {
return nil, err
}
compressedWriter = newCountingCRCWriter(zw, opts.IncludeCRC)
c.Reset(compressed)
return c, nil
case opts.Compression == CompressionZSTD:
level := encoderLevelFromZstd(opts.CompressionLevel)
return zstd.NewWriter(compressed, zstd.WithEncoderLevel(level))
case opts.Compression == CompressionLZ4:
level := encoderLevelFromLZ4(opts.CompressionLevel)
lzw := lz4.NewWriter(compressed)
_ = lzw.Apply(lz4.CompressionLevelOption(level))
compressedWriter = newCountingCRCWriter(lzw, opts.IncludeCRC)
return lzw, nil
case opts.Compression == CompressionNone:
compressedWriter = newCountingCRCWriter(bufCloser{compressed}, opts.IncludeCRC)
return newCountingCRCWriter(bufCloser{compressed}, opts.IncludeCRC), nil
default:
return nil, fmt.Errorf("unsupported compression")
return nil, fmt.Errorf("unsupported compression: %v", opts.Compression)
}
}

func (w *Writer) newCompressedWriter(compressed *bytes.Buffer) (*countingCRCWriter, error) {
opts := w.opts
if !opts.Chunked {
panic("this should never get called when chunking isn't enabled")
}
compressor, err := newCompressor(compressed, opts)
if err != nil {
return nil, err
}
return
return newCountingCRCWriter(compressor, opts.IncludeCRC), nil
}

// NewWriter returns a new MCAP writer.
Expand Down

0 comments on commit d196769

Please sign in to comment.