Skip to content

Commit

Permalink
Finally figured it out
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Oct 23, 2024
1 parent 5fc835a commit da15e57
Showing 1 changed file with 49 additions and 48 deletions.
97 changes: 49 additions & 48 deletions go/mcap/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,15 @@ type Writer struct {
// MetadataIndexes created over the course of the recording.
MetadataIndexes []*MetadataIndex

channelIDs []uint16
schemaIDs []uint16
channels map[uint16]*Channel
schemas map[uint16]*Schema
messageIndexes map[uint16]*MessageIndex
w *writeSizer
buf []byte
msg []byte
uncompressed bytes.Buffer
compressedWriter *countingCRCWriter
channelIDs []uint16
schemaIDs []uint16
channels map[uint16]*Channel
schemas map[uint16]*Schema
messageIndexes map[uint16]*MessageIndex
w *writeSizer
buf []byte
msg []byte
uncompressed bytes.Buffer

currentChunkStartTime uint64
currentChunkEndTime uint64
Expand Down Expand Up @@ -420,16 +419,21 @@ func (w *Writer) uncompressedSize() int {
}

func (w *Writer) flushActiveChunk() error {
opts := w.opts
fmt.Printf("chunked: %v, compressor: %v, compression: %v\n", opts.Chunked, opts.Compressor != nil, opts.Compression)
if w.uncompressedSize() == 0 {
return nil
}

var c1 bytes.Buffer
cw, err := w.newCompressedWriter(&c1)
panicif.Err(err)
cw.Write(w.uncompressed.Bytes())
n, err := cw.Write(w.uncompressed.Bytes())
panicif.Err(err)
panicif.NotEq(n, w.uncompressed.Len())
crc := cw.CRC()
cw.Close()
err = cw.Close()
panicif.Err(err)
compressedLen := c1.Len()

uncompressedLen := w.uncompressedSize()
Expand Down Expand Up @@ -468,7 +472,6 @@ func (w *Writer) flushActiveChunk() error {
if err != nil {
return err
}
cw.Reset(io.Discard)
w.uncompressed.Reset()
chunkEndOffset := w.w.Size()

Expand Down Expand Up @@ -863,37 +866,36 @@ func encoderLevelFromZstd(level CompressionLevel) zstd.EncoderLevel {

func (w *Writer) newCompressedWriter(compressed *bytes.Buffer) (compressedWriter *countingCRCWriter, err error) {
opts := w.opts
if opts.Chunked {
switch {
case opts.Compressor != nil: // must be top
// override the compression option. We can't check for a mismatch here
// because "none compression" is an empty string.
opts.Compression = opts.Compressor.Compression()
if opts.Compressor.Compression() == "" {
return nil, fmt.Errorf("custom compressor requires compression format")
}
opts.Compressor.Compressor().Reset(compressed)
compressedWriter = newCountingCRCWriter(opts.Compressor.Compressor(), opts.IncludeCRC)
case opts.Compression == CompressionZSTD:
level := encoderLevelFromZstd(opts.CompressionLevel)
zw, err := zstd.NewWriter(compressed, zstd.WithEncoderLevel(level))
if err != nil {
return nil, err
}
compressedWriter = newCountingCRCWriter(zw, opts.IncludeCRC)
case opts.Compression == CompressionLZ4:
level := encoderLevelFromLZ4(opts.CompressionLevel)
lzw := lz4.NewWriter(compressed)
_ = lzw.Apply(lz4.CompressionLevelOption(level))
compressedWriter = newCountingCRCWriter(lzw, opts.IncludeCRC)
case opts.Compression == CompressionNone:
compressedWriter = newCountingCRCWriter(bufCloser{compressed}, opts.IncludeCRC)
default:
return nil, fmt.Errorf("unsupported compression")
if !opts.Chunked {
panic("this should never get called when chunking isn't enabled")
}
switch {
case opts.Compressor != nil: // must be top
// override the compression option. We can't check for a mismatch here
// because "none compression" is an empty string.
opts.Compression = opts.Compressor.Compression()
if opts.Compressor.Compression() == "" {
return nil, fmt.Errorf("custom compressor requires compression format")
}
if opts.ChunkSize == 0 {
opts.ChunkSize = 1024 * 1024
c := opts.Compressor.NewCompressor()
c.Reset(compressed)
compressedWriter = newCountingCRCWriter(c, opts.IncludeCRC)
case opts.Compression == CompressionZSTD:
level := encoderLevelFromZstd(opts.CompressionLevel)
zw, err := zstd.NewWriter(compressed, zstd.WithEncoderLevel(level))
if err != nil {
return nil, err
}
compressedWriter = newCountingCRCWriter(zw, opts.IncludeCRC)
case opts.Compression == CompressionLZ4:
level := encoderLevelFromLZ4(opts.CompressionLevel)
lzw := lz4.NewWriter(compressed)
_ = lzw.Apply(lz4.CompressionLevelOption(level))
compressedWriter = newCountingCRCWriter(lzw, opts.IncludeCRC)
case opts.Compression == CompressionNone:
compressedWriter = newCountingCRCWriter(bufCloser{compressed}, opts.IncludeCRC)
default:
return nil, fmt.Errorf("unsupported compression")
}
return
}
Expand All @@ -906,7 +908,11 @@ func NewWriter(w io.Writer, opts *WriterOptions) (ret *Writer, err error) {
return nil, err
}
}
// TODO: Check here that compression options are valid?
if opts.ChunkSize == 0 {
opts.ChunkSize = 1024 * 1024
}
// Should we check here that compression options are valid? For now I haven't added it as it's
// going to come up pretty quickly while writing anyway, and introduces some extra complexity.
ret = &Writer{
w: writer,
buf: make([]byte, 32),
Expand All @@ -923,14 +929,9 @@ func NewWriter(w io.Writer, opts *WriterOptions) (ret *Writer, err error) {
},
opts: opts,
}
ret.compressedWriter, err = ret.newCompressedWriter(nil)
panicif.Err(err)
return ret, nil
}

func (w *Writer) uncompressedWriter() io.Writer {
return &w.uncompressed
//return io.MultiWriter(&w.uncompressed, w.compressedWriter)
}

const checkWriter = false

0 comments on commit da15e57

Please sign in to comment.