Skip to content

Commit

Permalink
Go: add support for custom compression (#968)
Browse files Browse the repository at this point in the history
Adds support for supplying custom compressors and decompressors to the Go
lexer and writer.

For the writer, a ResettableWriteCloser is accepted. For the reader, a
map of compression format to ResettableReader is accepted. If a supplied
ResettableReader also implements io.Closer, its Close method is called on
file close.
  • Loading branch information
Wyatt Alt authored Sep 12, 2023
1 parent 1c191e0 commit d9cabbe
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 19 deletions.
8 changes: 1 addition & 7 deletions go/mcap/resettable_write_closer.go → go/mcap/buf_closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ import (
"io"
)

// resettableWriteCloser is a WriteCloser that supports a Reset method.
type resettableWriteCloser interface {
io.WriteCloser
Reset(io.Writer)
}

type bufCloser struct {
b *bytes.Buffer
}
Expand All @@ -23,6 +17,6 @@ func (b bufCloser) Write(p []byte) (int, error) {
return b.b.Write(p)
}

func (b bufCloser) Reset(w io.Writer) {
// Reset discards the internal buffer's contents. The io.Writer argument is
// ignored: bufCloser always writes to its own buffer, and the parameter
// exists only to satisfy the ResettableWriteCloser interface.
func (b bufCloser) Reset(_ io.Writer) {
	b.b.Reset()
}
4 changes: 2 additions & 2 deletions go/mcap/counting_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

type countingCRCWriter struct {
w resettableWriteCloser
w ResettableWriteCloser
size int64
crc hash.Hash32
computeCRC bool
Expand Down Expand Up @@ -45,7 +45,7 @@ func (c *countingCRCWriter) Write(p []byte) (int, error) {
return c.w.Write(p)
}

func newCountingCRCWriter(w resettableWriteCloser, computeCRC bool) *countingCRCWriter {
func newCountingCRCWriter(w ResettableWriteCloser, computeCRC bool) *countingCRCWriter {
return &countingCRCWriter{
w: w,
crc: crc32.NewIEEE(),
Expand Down
2 changes: 1 addition & 1 deletion go/mcap/indexed_message_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (it *indexedMessageIterator) loadChunk(chunkIndex *ChunkIndex) error {
return nil
}

func (it *indexedMessageIterator) Next(p []byte) (*Schema, *Channel, *Message, error) {
func (it *indexedMessageIterator) Next(_ []byte) (*Schema, *Channel, *Message, error) {
if !it.hasReadSummarySection {
err := it.parseSummarySection()
if err != nil {
Expand Down
31 changes: 27 additions & 4 deletions go/mcap/lexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ type Lexer struct {
maxRecordSize int
maxDecompressedChunkSize int
attachmentCallback func(*AttachmentReader) error
decompressors map[CompressionFormat]ResettableReader
}

// Next returns the next token from the lexer as a byte array. The result will
Expand Down Expand Up @@ -302,6 +303,11 @@ func (l *Lexer) Close() {
if l.decoders.zstd != nil {
l.decoders.zstd.Close()
}
for _, decompressor := range l.decompressors {
if closer, ok := decompressor.(io.Closer); ok {
closer.Close()
}
}
}

type decoders struct {
Expand Down Expand Up @@ -414,15 +420,22 @@ func loadChunk(l *Lexer, recordLen uint64) error {

// remaining bytes in the record are the chunk data
lr := io.LimitReader(l.reader, int64(recordsLength))
switch compression {
case CompressionNone:
switch {
case l.decompressors[compression] != nil: // must be top
decoder := l.decompressors[compression]
err = decoder.Reset(lr)
if err != nil {
return fmt.Errorf("failed to reset custom decompressor: %w", err)
}
l.reader = decoder
case compression == CompressionNone:
l.reader = lr
case CompressionZSTD:
case compression == CompressionZSTD:
err = l.setZSTDDecoder(lr)
if err != nil {
return err
}
case CompressionLZ4:
case compression == CompressionLZ4:
l.setLZ4Decoder(lr)
default:
return fmt.Errorf("unsupported compression: %s", string(compression))
Expand Down Expand Up @@ -498,13 +511,20 @@ type LexerOptions struct {
MaxRecordSize int
// AttachmentCallback is a function to execute on attachments encountered in the file.
AttachmentCallback func(*AttachmentReader) error
// Decompressors are custom decompressors. Chunks matching the supplied
// compression format will be decompressed with the provided
// ResettableReader instead of the default implementation. If the
// ResettableReader also implements io.Closer, Close will be called on close
// of the reader.
Decompressors map[CompressionFormat]ResettableReader
}

// NewLexer returns a new lexer for the given reader.
func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
var maxRecordSize, maxDecompressedChunkSize int
var computeAttachmentCRCs, validateChunkCRCs, emitChunks, emitInvalidChunks, skipMagic bool
var attachmentCallback func(*AttachmentReader) error
var decompressors map[CompressionFormat]ResettableReader
if len(opts) > 0 {
validateChunkCRCs = opts[0].ValidateChunkCRCs
computeAttachmentCRCs = opts[0].ComputeAttachmentCRCs
Expand All @@ -514,13 +534,15 @@ func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
maxRecordSize = opts[0].MaxRecordSize
maxDecompressedChunkSize = opts[0].MaxDecompressedChunkSize
attachmentCallback = opts[0].AttachmentCallback
decompressors = opts[0].Decompressors
}
if !skipMagic {
err := validateMagic(r)
if err != nil {
return nil, err
}
}

return &Lexer{
basereader: r,
reader: r,
Expand All @@ -532,5 +554,6 @@ func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
maxRecordSize: maxRecordSize,
maxDecompressedChunkSize: maxDecompressedChunkSize,
attachmentCallback: attachmentCallback,
decompressors: decompressors,
}, nil
}
47 changes: 47 additions & 0 deletions go/mcap/lexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"
"time"

"github.com/pierrec/lz4/v4"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -134,6 +135,52 @@ func TestBadMagic(t *testing.T) {
}
}

// lzreader adapts *lz4.Reader to the ResettableReader interface, whose Reset
// method must return an error.
type lzreader struct {
	*lz4.Reader
}

// Reset redirects the embedded lz4 reader to r. It always returns nil.
func (l lzreader) Reset(r io.Reader) error {
	l.Reader.Reset(r)
	return nil
}

// TestCustomDecompressor verifies that a decompressor registered via
// LexerOptions.Decompressors is used to read matching chunks, by counting
// the blocks the custom lz4 reader decompresses.
func TestCustomDecompressor(t *testing.T) {
	data := file(
		header(),
		chunk(t, CompressionLZ4, true, channelInfo(), message(), message()),
		chunk(t, CompressionLZ4, true, channelInfo(), message(), message()),
		attachment(), attachment(),
		footer(),
	)
	decompressedBlocks := 0
	customLZ4 := lz4.NewReader(nil)
	assert.Nil(t, customLZ4.Apply(lz4.OnBlockDoneOption(func(_ int) {
		decompressedBlocks++
	})))
	opts := &LexerOptions{
		Decompressors: map[CompressionFormat]ResettableReader{
			CompressionLZ4: lzreader{customLZ4},
		},
	}
	lexer, err := NewLexer(bytes.NewReader(data), opts)
	assert.Nil(t, err)
	for i, want := range []TokenType{
		TokenHeader,
		TokenChannel,
		TokenMessage,
		TokenMessage,
		TokenChannel,
		TokenMessage,
		TokenMessage,
		TokenFooter,
	} {
		got, _, err := lexer.Next(nil)
		assert.Nil(t, err)
		assert.Equal(t, want, got, fmt.Sprintf("mismatch element %d", i))
	}
	assert.Positive(t, decompressedBlocks)
}

func TestReturnsEOFOnSuccessiveCalls(t *testing.T) {
lexer, err := NewLexer(bytes.NewReader(file()))
assert.Nil(t, err)
Expand Down
15 changes: 15 additions & 0 deletions go/mcap/resettable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package mcap

import "io"

// ResettableWriteCloser is an io.WriteCloser that can be redirected to a new
// underlying writer via Reset, allowing a single compressor instance to be
// reused across chunks.
type ResettableWriteCloser interface {
	io.WriteCloser
	Reset(io.Writer)
}

// ResettableReader is an io.Reader that can be redirected to a new underlying
// reader via Reset, allowing a single decompressor instance to be reused
// across chunks. If a ResettableReader supplied to the lexer also implements
// io.Closer, Close is called on it when the lexer is closed.
type ResettableReader interface {
	io.Reader
	Reset(io.Reader) error
}
2 changes: 1 addition & 1 deletion go/mcap/version.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package mcap

// Version of the MCAP library.
var Version = "v1.0.3"
var Version = "v1.0.4"
49 changes: 45 additions & 4 deletions go/mcap/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ func (w *Writer) flushActiveChunk() error {
if err != nil {
return err
}

offset += putUint64(w.chunk[offset:], uint64(msglen))
offset += putUint64(w.chunk[offset:], start)
offset += putUint64(w.chunk[offset:], end)
Expand Down Expand Up @@ -740,6 +741,33 @@ const (
CompressionLevelBest
)

// CustomCompressor pairs a chunk compressor with the compression format
// string the writer records for chunks it compresses.
type CustomCompressor interface {
	// Compressor returns the ResettableWriteCloser used to compress chunk data.
	Compressor() ResettableWriteCloser
	// Compression returns the compression format associated with the compressor.
	Compression() CompressionFormat
}

// NewCustomCompressor returns a structure that may be supplied to writer
// options as a custom chunk compressor.
func NewCustomCompressor(compression CompressionFormat, compressor ResettableWriteCloser) CustomCompressor {
	return &customCompressor{
		compression: compression,
		compressor: compressor,
	}
}

// customCompressor is the default CustomCompressor implementation: a plain
// pairing of a compression format with a compressor.
type customCompressor struct {
	compression CompressionFormat
	compressor ResettableWriteCloser
}

// Compressor returns the wrapped ResettableWriteCloser.
func (c *customCompressor) Compressor() ResettableWriteCloser {
	return c.compressor
}

// Compression returns the wrapped compression format.
func (c *customCompressor) Compression() CompressionFormat {
	return c.compression
}

// WriterOptions are options for the MCAP Writer.
type WriterOptions struct {
// IncludeCRC specifies whether to compute CRC checksums in the output.
Expand Down Expand Up @@ -788,6 +816,10 @@ type WriterOptions struct {
// SkipMagic causes the writer to skip writing magic bytes at the start of
// the file. This may be useful for writing a partial section of records.
SkipMagic bool

// Compressor is a custom compressor. If supplied it will take precedence
// over the built-in ones.
Compressor CustomCompressor
}

// Convert an MCAP compression level to the corresponding lz4.CompressionLevel.
Expand Down Expand Up @@ -833,20 +865,29 @@ func NewWriter(w io.Writer, opts *WriterOptions) (*Writer, error) {
compressed := bytes.Buffer{}
var compressedWriter *countingCRCWriter
if opts.Chunked {
switch opts.Compression {
case CompressionZSTD:
switch {
case opts.Compressor != nil: // must be top
// override the compression option. We can't check for a mismatch here
// because "none compression" is an empty string.
opts.Compression = opts.Compressor.Compression()
if opts.Compressor.Compression() == "" {
return nil, fmt.Errorf("custom compressor requires compression format")
}
opts.Compressor.Compressor().Reset(&compressed)
compressedWriter = newCountingCRCWriter(opts.Compressor.Compressor(), opts.IncludeCRC)
case opts.Compression == CompressionZSTD:
level := encoderLevelFromZstd(opts.CompressionLevel)
zw, err := zstd.NewWriter(&compressed, zstd.WithEncoderLevel(level))
if err != nil {
return nil, err
}
compressedWriter = newCountingCRCWriter(zw, opts.IncludeCRC)
case CompressionLZ4:
case opts.Compression == CompressionLZ4:
level := encoderLevelFromLZ4(opts.CompressionLevel)
lzw := lz4.NewWriter(&compressed)
_ = lzw.Apply(lz4.CompressionLevelOption(level))
compressedWriter = newCountingCRCWriter(lzw, opts.IncludeCRC)
case CompressionNone:
case opts.Compression == CompressionNone:
compressedWriter = newCountingCRCWriter(bufCloser{&compressed}, opts.IncludeCRC)
default:
return nil, fmt.Errorf("unsupported compression")
Expand Down
62 changes: 62 additions & 0 deletions go/mcap/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"bytes"
"crypto/md5"
"fmt"
"io"
"testing"
"time"

"github.com/pierrec/lz4/v4"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -678,3 +680,63 @@ func TestWriteAttachment(t *testing.T) {
})
}
}

// assertReadable asserts that rs contains a well-formed MCAP file: its info
// section parses and every message can be iterated through to EOF.
func assertReadable(t *testing.T, rs io.ReadSeeker) {
	r, err := NewReader(rs)
	assert.Nil(t, err)

	_, err = r.Info()
	assert.Nil(t, err)

	iterator, err := r.Messages()
	assert.Nil(t, err)
	for {
		if _, _, _, err := iterator.Next(nil); err != nil {
			// iteration should end with a clean EOF, not a parse error
			assert.ErrorIs(t, err, io.EOF)
			return
		}
	}
}

// TestBYOCompressor exercises the Compressor writer option with a custom lz4
// writer, confirming the output is a readable MCAP file and that the custom
// compressor actually processed chunk data.
func TestBYOCompressor(t *testing.T) {
	output := &bytes.Buffer{}
	// example - custom lz4 settings
	customLZ4 := lz4.NewWriter(nil)
	blocksCompressed := 0
	assert.Nil(t, customLZ4.Apply(lz4.OnBlockDoneOption(func(_ int) {
		blocksCompressed++
	})))

	writer, err := NewWriter(output, &WriterOptions{
		Chunked:    true,
		ChunkSize:  1024,
		Compressor: NewCustomCompressor("lz4", customLZ4),
	})
	assert.Nil(t, err)

	assert.Nil(t, writer.WriteHeader(&Header{}))
	schema := &Schema{
		ID:       1,
		Name:     "schema",
		Encoding: "ros1msg",
		Data:     []byte{},
	}
	assert.Nil(t, writer.WriteSchema(schema))
	channel := &Channel{
		ID:              0,
		SchemaID:        1,
		Topic:           "/foo",
		MessageEncoding: "ros1msg",
	}
	assert.Nil(t, writer.WriteChannel(channel))

	for i := 0; i < 100; i++ {
		assert.Nil(t, writer.WriteMessage(&Message{
			ChannelID: 0,
			Sequence:  0,
			LogTime:   uint64(i),
		}))
	}
	assert.Nil(t, writer.Close())
	assertReadable(t, bytes.NewReader(output.Bytes()))
	assert.Positive(t, blocksCompressed)
}

0 comments on commit d9cabbe

Please sign in to comment.