Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Wyatt Alt committed Sep 12, 2023
1 parent 9d68a25 commit 0b75f23
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 19 deletions.
6 changes: 0 additions & 6 deletions go/mcap/resettable_write_closer.go → go/mcap/buf_closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ import (
"io"
)

// resettableWriteCloser is a WriteCloser that supports a Reset method.
type resettableWriteCloser interface {
io.WriteCloser
Reset(io.Writer)
}

type bufCloser struct {
b *bytes.Buffer
}
Expand Down
4 changes: 2 additions & 2 deletions go/mcap/counting_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

type countingCRCWriter struct {
w resettableWriteCloser
w ResettableWriteCloser
size int64
crc hash.Hash32
computeCRC bool
Expand Down Expand Up @@ -45,7 +45,7 @@ func (c *countingCRCWriter) Write(p []byte) (int, error) {
return c.w.Write(p)
}

func newCountingCRCWriter(w resettableWriteCloser, computeCRC bool) *countingCRCWriter {
func newCountingCRCWriter(w ResettableWriteCloser, computeCRC bool) *countingCRCWriter {
return &countingCRCWriter{
w: w,
crc: crc32.NewIEEE(),
Expand Down
5 changes: 4 additions & 1 deletion go/mcap/lexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,10 @@ func loadChunk(l *Lexer, recordLen uint64) error {
switch {
case l.decompressors[compression] != nil: // must be top
decoder := l.decompressors[compression]
decoder.Reset(lr)
err = decoder.Reset(lr)
if err != nil {
return fmt.Errorf("failed to reset custom decompressor: %w", err)
}
l.reader = decoder
case compression == CompressionNone:
l.reader = lr
Expand Down
11 changes: 10 additions & 1 deletion go/mcap/lexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,15 @@ func TestBadMagic(t *testing.T) {
}
}

type lzreader struct {
*lz4.Reader
}

func (l lzreader) Reset(r io.Reader) error {
l.Reader.Reset(r)
return nil
}

func TestCustomDecompressor(t *testing.T) {
buf := file(
header(),
Expand All @@ -150,7 +159,7 @@ func TestCustomDecompressor(t *testing.T) {
})))
lexer, err := NewLexer(bytes.NewReader(buf), &LexerOptions{
Decompressors: map[CompressionFormat]ResettableReader{
CompressionLZ4: lzr,
CompressionLZ4: lzreader{lzr},
},
})
assert.Nil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion go/mcap/resettable.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ type ResettableWriteCloser interface {
// ResettableReadCloser implements io.ReadCloser and adds a Reset method.
type ResettableReader interface {
io.Reader
Reset(io.Reader)
Reset(io.Reader) error
}
39 changes: 35 additions & 4 deletions go/mcap/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ func (w *Writer) flushActiveChunk() error {
if err != nil {
return err
}

offset += putUint64(w.chunk[offset:], uint64(msglen))
offset += putUint64(w.chunk[offset:], start)
offset += putUint64(w.chunk[offset:], end)
Expand Down Expand Up @@ -740,6 +741,33 @@ const (
CompressionLevelBest
)

type CustomCompressor interface {
Compressor() ResettableWriteCloser
Compression() CompressionFormat
}

// NewCustomCompressor returns a structure that may be supplied to writer
// options as a custom chunk compressor.
func NewCustomCompressor(compression CompressionFormat, compressor ResettableWriteCloser) CustomCompressor {
return &customCompressor{
compression: compression,
compressor: compressor,
}
}

type customCompressor struct {
compression CompressionFormat
compressor ResettableWriteCloser
}

func (c *customCompressor) Compressor() ResettableWriteCloser {
return c.compressor
}

func (c *customCompressor) Compression() CompressionFormat {
return c.compression
}

// WriterOptions are options for the MCAP Writer.
type WriterOptions struct {
// IncludeCRC specifies whether to compute CRC checksums in the output.
Expand Down Expand Up @@ -791,7 +819,7 @@ type WriterOptions struct {

// Compressor is a custom compressor. If supplied it will take precedence
// over the built-in ones.
Compressor ResettableWriteCloser
Compressor CustomCompressor
}

// Convert an MCAP compression level to the corresponding lz4.CompressionLevel.
Expand Down Expand Up @@ -839,11 +867,14 @@ func NewWriter(w io.Writer, opts *WriterOptions) (*Writer, error) {
if opts.Chunked {
switch {
case opts.Compressor != nil: // must be top
if opts.Compression == "" {
// override the compression option. We can't check for a mismatch here
// because "none compression" is an empty string.
opts.Compression = opts.Compressor.Compression()
if opts.Compressor.Compression() == "" {
return nil, fmt.Errorf("custom compressor requires compression format")
}
opts.Compressor.Reset(&compressed)
compressedWriter = newCountingCRCWriter(opts.Compressor, opts.IncludeCRC)
opts.Compressor.Compressor().Reset(&compressed)
compressedWriter = newCountingCRCWriter(opts.Compressor.Compressor(), opts.IncludeCRC)
case opts.Compression == CompressionZSTD:
level := encoderLevelFromZstd(opts.CompressionLevel)
zw, err := zstd.NewWriter(&compressed, zstd.WithEncoderLevel(level))
Expand Down
7 changes: 3 additions & 4 deletions go/mcap/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,10 +709,9 @@ func TestBYOCompressor(t *testing.T) {
})))

writer, err := NewWriter(buf, &WriterOptions{
Chunked: true,
ChunkSize: 1024,
Compressor: lzw,
Compression: "lz4",
Chunked: true,
ChunkSize: 1024,
Compressor: NewCustomCompressor("lz4", lzw),
})
assert.Nil(t, err)

Expand Down

0 comments on commit 0b75f23

Please sign in to comment.