Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go: add support for custom compression #968

Merged
merged 5 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions go/mcap/resettable_write_closer.go → go/mcap/buf_closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ import (
"io"
)

// resettableWriteCloser is a WriteCloser that supports a Reset method.
type resettableWriteCloser interface {
io.WriteCloser
Reset(io.Writer)
}

// bufCloser wraps a *bytes.Buffer so it can be used where a
// ResettableWriteCloser is required (see Write and Reset below).
type bufCloser struct {
	b *bytes.Buffer // underlying in-memory buffer
}
Expand All @@ -23,6 +17,6 @@ func (b bufCloser) Write(p []byte) (int, error) {
return b.b.Write(p)
}

func (b bufCloser) Reset(w io.Writer) {
// Reset discards the buffer's accumulated contents. The io.Writer
// argument is ignored: bufCloser always writes into its own buffer.
func (b bufCloser) Reset(_ io.Writer) {
	b.b.Reset()
}
4 changes: 2 additions & 2 deletions go/mcap/counting_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

type countingCRCWriter struct {
w resettableWriteCloser
w ResettableWriteCloser
size int64
crc hash.Hash32
computeCRC bool
Expand Down Expand Up @@ -45,7 +45,7 @@ func (c *countingCRCWriter) Write(p []byte) (int, error) {
return c.w.Write(p)
}

func newCountingCRCWriter(w resettableWriteCloser, computeCRC bool) *countingCRCWriter {
func newCountingCRCWriter(w ResettableWriteCloser, computeCRC bool) *countingCRCWriter {
return &countingCRCWriter{
w: w,
crc: crc32.NewIEEE(),
Expand Down
2 changes: 1 addition & 1 deletion go/mcap/indexed_message_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (it *indexedMessageIterator) loadChunk(chunkIndex *ChunkIndex) error {
return nil
}

func (it *indexedMessageIterator) Next(p []byte) (*Schema, *Channel, *Message, error) {
func (it *indexedMessageIterator) Next(_ []byte) (*Schema, *Channel, *Message, error) {
if !it.hasReadSummarySection {
err := it.parseSummarySection()
if err != nil {
Expand Down
31 changes: 27 additions & 4 deletions go/mcap/lexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ type Lexer struct {
maxRecordSize int
maxDecompressedChunkSize int
attachmentCallback func(*AttachmentReader) error
decompressors map[CompressionFormat]ResettableReader
}

// Next returns the next token from the lexer as a byte array. The result will
Expand Down Expand Up @@ -302,6 +303,11 @@ func (l *Lexer) Close() {
if l.decoders.zstd != nil {
l.decoders.zstd.Close()
}
for _, decompressor := range l.decompressors {
if closer, ok := decompressor.(io.Closer); ok {
closer.Close()
}
}
}

type decoders struct {
Expand Down Expand Up @@ -414,15 +420,22 @@ func loadChunk(l *Lexer, recordLen uint64) error {

// remaining bytes in the record are the chunk data
lr := io.LimitReader(l.reader, int64(recordsLength))
switch compression {
case CompressionNone:
switch {
case l.decompressors[compression] != nil: // must be top
decoder := l.decompressors[compression]
err = decoder.Reset(lr)
if err != nil {
return fmt.Errorf("failed to reset custom decompressor: %w", err)
}
l.reader = decoder
case compression == CompressionNone:
l.reader = lr
case CompressionZSTD:
case compression == CompressionZSTD:
err = l.setZSTDDecoder(lr)
if err != nil {
return err
}
case CompressionLZ4:
case compression == CompressionLZ4:
l.setLZ4Decoder(lr)
default:
return fmt.Errorf("unsupported compression: %s", string(compression))
Expand Down Expand Up @@ -498,13 +511,20 @@ type LexerOptions struct {
MaxRecordSize int
// AttachmentCallback is a function to execute on attachments encountered in the file.
AttachmentCallback func(*AttachmentReader) error
// Decompressors are custom decompressors. Chunks matching the supplied
// compression format will be decompressed with the provided
// ResettableReader instead of the default implementation. If the
// ResettableReader also implements io.Closer, Close will be called on close
// of the reader.
Decompressors map[CompressionFormat]ResettableReader
}

// NewLexer returns a new lexer for the given reader.
func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
var maxRecordSize, maxDecompressedChunkSize int
var computeAttachmentCRCs, validateChunkCRCs, emitChunks, emitInvalidChunks, skipMagic bool
var attachmentCallback func(*AttachmentReader) error
var decompressors map[CompressionFormat]ResettableReader
if len(opts) > 0 {
validateChunkCRCs = opts[0].ValidateChunkCRCs
computeAttachmentCRCs = opts[0].ComputeAttachmentCRCs
Expand All @@ -514,13 +534,15 @@ func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
maxRecordSize = opts[0].MaxRecordSize
maxDecompressedChunkSize = opts[0].MaxDecompressedChunkSize
attachmentCallback = opts[0].AttachmentCallback
decompressors = opts[0].Decompressors
}
if !skipMagic {
err := validateMagic(r)
if err != nil {
return nil, err
}
}

return &Lexer{
basereader: r,
reader: r,
Expand All @@ -532,5 +554,6 @@ func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
maxRecordSize: maxRecordSize,
maxDecompressedChunkSize: maxDecompressedChunkSize,
attachmentCallback: attachmentCallback,
decompressors: decompressors,
}, nil
}
47 changes: 47 additions & 0 deletions go/mcap/lexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"
"time"

"github.com/pierrec/lz4/v4"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -134,6 +135,52 @@ func TestBadMagic(t *testing.T) {
}
}

// lzreader adapts *lz4.Reader to the ResettableReader interface:
// lz4's Reset returns no error, so a thin wrapper supplies one.
type lzreader struct {
	*lz4.Reader
}

// Reset points the embedded lz4 reader at a new source stream. It
// always returns nil because lz4.Reader.Reset does not return an error.
func (l lzreader) Reset(r io.Reader) error {
	l.Reader.Reset(r)
	return nil
}

// TestCustomDecompressor verifies that a ResettableReader registered in
// LexerOptions.Decompressors is used to decode matching chunks, by
// counting the lz4 blocks that pass through the custom reader.
func TestCustomDecompressor(t *testing.T) {
	data := file(
		header(),
		chunk(t, CompressionLZ4, true, channelInfo(), message(), message()),
		chunk(t, CompressionLZ4, true, channelInfo(), message(), message()),
		attachment(), attachment(),
		footer(),
	)
	decompressedBlocks := 0
	reader := lz4.NewReader(nil)
	assert.Nil(t, reader.Apply(lz4.OnBlockDoneOption(func(int) {
		decompressedBlocks++
	})))
	lexer, err := NewLexer(bytes.NewReader(data), &LexerOptions{
		Decompressors: map[CompressionFormat]ResettableReader{
			CompressionLZ4: lzreader{reader},
		},
	})
	assert.Nil(t, err)
	wantTokens := []TokenType{
		TokenHeader,
		TokenChannel,
		TokenMessage,
		TokenMessage,
		TokenChannel,
		TokenMessage,
		TokenMessage,
		TokenFooter,
	}
	for i, want := range wantTokens {
		got, _, err := lexer.Next(nil)
		assert.Nil(t, err)
		assert.Equal(t, want, got, fmt.Sprintf("mismatch element %d", i))
	}
	assert.Positive(t, decompressedBlocks)
}

func TestReturnsEOFOnSuccessiveCalls(t *testing.T) {
lexer, err := NewLexer(bytes.NewReader(file()))
assert.Nil(t, err)
Expand Down
15 changes: 15 additions & 0 deletions go/mcap/resettable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package mcap

import "io"

// ResettableWriteCloser implements io.WriteCloser and adds a Reset
// method that redirects subsequent writes to a new underlying writer.
type ResettableWriteCloser interface {
	io.WriteCloser
	Reset(io.Writer)
}

// ResettableReader implements io.Reader and adds a Reset method that
// redirects subsequent reads to a new underlying source, returning an
// error if the reader cannot be reset.
type ResettableReader interface {
	io.Reader
	Reset(io.Reader) error
}
2 changes: 1 addition & 1 deletion go/mcap/version.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package mcap

// Version of the MCAP library.
var Version = "v1.0.3"
var Version = "v1.0.4"
49 changes: 45 additions & 4 deletions go/mcap/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ func (w *Writer) flushActiveChunk() error {
if err != nil {
return err
}

offset += putUint64(w.chunk[offset:], uint64(msglen))
offset += putUint64(w.chunk[offset:], start)
offset += putUint64(w.chunk[offset:], end)
Expand Down Expand Up @@ -740,6 +741,33 @@ const (
CompressionLevelBest
)

// CustomCompressor pairs a user-supplied chunk compressor with the
// CompressionFormat identifier it implements. Supply one via
// WriterOptions.Compressor to override the built-in compressors.
type CustomCompressor interface {
	// Compressor returns the writer used to compress chunk data.
	Compressor() ResettableWriteCloser
	// Compression returns the format identifier for this compressor.
	Compression() CompressionFormat
}

// NewCustomCompressor returns a structure that may be supplied to writer
// options as a custom chunk compressor.
// NewCustomCompressor returns a CustomCompressor pairing the given
// compression format identifier with a user-supplied compressor. The
// result may be supplied to writer options as a custom chunk compressor.
func NewCustomCompressor(compression CompressionFormat, compressor ResettableWriteCloser) CustomCompressor {
	c := customCompressor{
		compressor:  compressor,
		compression: compression,
	}
	return &c
}

// customCompressor is the default CustomCompressor implementation: a
// plain pairing of a format identifier and a compressor.
type customCompressor struct {
	compression CompressionFormat     // format identifier for this compressor
	compressor  ResettableWriteCloser // user-supplied compression stream
}

// Compressor returns the user-supplied compression stream.
func (c *customCompressor) Compressor() ResettableWriteCloser {
	return c.compressor
}

// Compression returns the compression format identifier.
func (c *customCompressor) Compression() CompressionFormat {
	return c.compression
}

// WriterOptions are options for the MCAP Writer.
type WriterOptions struct {
// IncludeCRC specifies whether to compute CRC checksums in the output.
Expand Down Expand Up @@ -788,6 +816,10 @@ type WriterOptions struct {
// SkipMagic causes the writer to skip writing magic bytes at the start of
// the file. This may be useful for writing a partial section of records.
SkipMagic bool

// Compressor is a custom compressor. If supplied it will take precedence
// over the built-in ones.
Compressor CustomCompressor
}

// Convert an MCAP compression level to the corresponding lz4.CompressionLevel.
Expand Down Expand Up @@ -833,20 +865,29 @@ func NewWriter(w io.Writer, opts *WriterOptions) (*Writer, error) {
compressed := bytes.Buffer{}
var compressedWriter *countingCRCWriter
if opts.Chunked {
switch opts.Compression {
case CompressionZSTD:
switch {
case opts.Compressor != nil: // must be top
// override the compression option. We can't check for a mismatch here
// because "none compression" is an empty string.
opts.Compression = opts.Compressor.Compression()
if opts.Compressor.Compression() == "" {
return nil, fmt.Errorf("custom compressor requires compression format")
}
opts.Compressor.Compressor().Reset(&compressed)
compressedWriter = newCountingCRCWriter(opts.Compressor.Compressor(), opts.IncludeCRC)
case opts.Compression == CompressionZSTD:
level := encoderLevelFromZstd(opts.CompressionLevel)
zw, err := zstd.NewWriter(&compressed, zstd.WithEncoderLevel(level))
if err != nil {
return nil, err
}
compressedWriter = newCountingCRCWriter(zw, opts.IncludeCRC)
case CompressionLZ4:
case opts.Compression == CompressionLZ4:
level := encoderLevelFromLZ4(opts.CompressionLevel)
lzw := lz4.NewWriter(&compressed)
_ = lzw.Apply(lz4.CompressionLevelOption(level))
compressedWriter = newCountingCRCWriter(lzw, opts.IncludeCRC)
case CompressionNone:
case opts.Compression == CompressionNone:
compressedWriter = newCountingCRCWriter(bufCloser{&compressed}, opts.IncludeCRC)
default:
return nil, fmt.Errorf("unsupported compression")
Expand Down
62 changes: 62 additions & 0 deletions go/mcap/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"bytes"
"crypto/md5"
"fmt"
"io"
"testing"
"time"

"github.com/pierrec/lz4/v4"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -678,3 +680,63 @@ func TestWriteAttachment(t *testing.T) {
})
}
}

// assertReadable checks that rs holds a well-formed MCAP file by
// loading its info section and draining every message until EOF.
func assertReadable(t *testing.T, rs io.ReadSeeker) {
	reader, err := NewReader(rs)
	assert.Nil(t, err)

	_, err = reader.Info()
	assert.Nil(t, err)

	it, err := reader.Messages()
	assert.Nil(t, err)
	for {
		if _, _, _, err := it.Next(nil); err != nil {
			assert.ErrorIs(t, err, io.EOF)
			return
		}
	}
}

// TestBYOCompressor exercises WriterOptions.Compressor with a custom
// lz4 writer, confirming the output is still a readable MCAP file and
// that the supplied compressor actually processed chunk data.
func TestBYOCompressor(t *testing.T) {
	output := &bytes.Buffer{}
	// example - custom lz4 settings
	compressor := lz4.NewWriter(nil)
	blocksCompressed := 0
	assert.Nil(t, compressor.Apply(lz4.OnBlockDoneOption(func(int) {
		blocksCompressed++
	})))

	writer, err := NewWriter(output, &WriterOptions{
		Chunked:    true,
		ChunkSize:  1024,
		Compressor: NewCustomCompressor("lz4", compressor),
	})
	assert.Nil(t, err)

	assert.Nil(t, writer.WriteHeader(&Header{}))
	assert.Nil(t, writer.WriteSchema(&Schema{
		ID:       1,
		Name:     "schema",
		Encoding: "ros1msg",
		Data:     []byte{},
	}))
	assert.Nil(t, writer.WriteChannel(&Channel{
		ID:              0,
		SchemaID:        1,
		Topic:           "/foo",
		MessageEncoding: "ros1msg",
	}))

	for seq := 0; seq < 100; seq++ {
		assert.Nil(t, writer.WriteMessage(&Message{
			ChannelID: 0,
			Sequence:  0,
			LogTime:   uint64(seq),
		}))
	}
	assert.Nil(t, writer.Close())
	assertReadable(t, bytes.NewReader(output.Bytes()))
	assert.Positive(t, blocksCompressed)
}
Loading