From 80b5b855e289942a5caf7572a72bb54ba80273ca Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Tue, 29 Aug 2023 07:54:56 -0700 Subject: [PATCH] Include metadata in mcap merge Includes metadata records from input files in mcap merge via a new read option. This required a breaking change to read options to avoid a dependency cycle: since I need to supply a callback option to apply to metadata records, the readopts package required awareness of "mcap" while "mcap" required awareness of readopts for configuration. To address this I have moved readopts.go under the mcap package. Users who upgrade the library will need to swap out the package name if they are using any options. --- go/cli/mcap/cmd/cat.go | 9 +- go/cli/mcap/cmd/merge.go | 13 ++- go/cli/mcap/cmd/merge_test.go | 29 ++++- go/conformance/test-read-conformance/main.go | 3 +- go/mcap/indexed_message_iterator.go | 42 +++++++ go/mcap/range_index_heap.go | 12 +- go/mcap/range_index_heap_test.go | 9 +- go/mcap/reader.go | 110 +++++++++++++----- .../readopts.go => reader_options.go} | 20 ++-- go/mcap/reader_test.go | 62 +++++++--- go/mcap/unindexed_message_iterator.go | 15 +++ go/ros/ros2db3_to_mcap_test.go | 3 +- 12 files changed, 239 insertions(+), 88 deletions(-) rename go/mcap/{readopts/readopts.go => reader_options.go} (86%) diff --git a/go/cli/mcap/cmd/cat.go b/go/cli/mcap/cmd/cat.go index ae344a31b9..5788e04650 100644 --- a/go/cli/mcap/cmd/cat.go +++ b/go/cli/mcap/cmd/cat.go @@ -16,7 +16,6 @@ import ( "github.com/foxglove/mcap/go/cli/mcap/utils" "github.com/foxglove/mcap/go/cli/mcap/utils/ros" "github.com/foxglove/mcap/go/mcap" - "github.com/foxglove/mcap/go/mcap/readopts" "github.com/spf13/cobra" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" @@ -166,14 +165,14 @@ func (w *jsonOutputWriter) writeMessage( return nil } -func getReadOpts(useIndex bool) []readopts.ReadOpt { +func getReadOpts(useIndex bool) []mcap.ReadOpt { topics := strings.FieldsFunc(catTopics, func(c rune) bool { return c == ',' }) - opts := []readopts.ReadOpt{readopts.UsingIndex(useIndex), readopts.WithTopics(topics)} + opts := []mcap.ReadOpt{mcap.UsingIndex(useIndex), mcap.WithTopics(topics)} if catStart != 0 { - opts = append(opts, readopts.After(catStart*1e9)) + opts = append(opts, mcap.After(catStart*1e9)) } if catEnd != math.MaxInt64 { - opts = append(opts, readopts.Before(catEnd*1e9)) + opts = append(opts, mcap.Before(catEnd*1e9)) } return opts } diff --git a/go/cli/mcap/cmd/merge.go b/go/cli/mcap/cmd/merge.go index c4e9acdf90..9c4eba3a7b 100644 --- a/go/cli/mcap/cmd/merge.go +++ b/go/cli/mcap/cmd/merge.go @@ -9,7 +9,6 @@ import ( "github.com/foxglove/mcap/go/cli/mcap/utils" "github.com/foxglove/mcap/go/mcap" - "github.com/foxglove/mcap/go/mcap/readopts" "github.com/spf13/cobra" ) @@ -85,6 +84,14 @@ func (m *mcapMerger) outputSchemaID(inputID int, inputSchemaID uint16) (uint16, return v, ok } +func (m *mcapMerger) addMetadata(w *mcap.Writer, metadata *mcap.Metadata) error { + err := w.WriteMetadata(metadata) + if err != nil { + return fmt.Errorf("failed to write metadata: %w", err) + } + return nil +} + func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Channel) (uint16, error) { outputSchemaID, ok := m.outputSchemaID(inputID, channel.SchemaID) if !ok { @@ -168,7 +175,9 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []namedReader) error { } defer reader.Close() profiles[inputID] = reader.Header().Profile - iterator, err := reader.Messages(readopts.UsingIndex(false)) + iterator, err := reader.Messages(mcap.UsingIndex(false), mcap.WithMetadataCallback(func(metadata *mcap.Metadata) error { + return m.addMetadata(writer, metadata) + })) if err != nil { return fmt.Errorf("failed to read messages on %s: %w", input.name, err) } diff --git a/go/cli/mcap/cmd/merge_test.go b/go/cli/mcap/cmd/merge_test.go index f644dbfffb..b6e4fcbdc5 100644 --- a/go/cli/mcap/cmd/merge_test.go +++ b/go/cli/mcap/cmd/merge_test.go @@ -7,7 +7,6 @@ import ( "testing" "github.com/foxglove/mcap/go/mcap" - "github.com/foxglove/mcap/go/mcap/readopts" "github.com/stretchr/testify/assert" ) @@ -34,6 +33,14 @@ func prepInput(t *testing.T, w io.Writer, schemaID uint16, channelID uint16, top LogTime: uint64(i), })) } + + assert.Nil(t, writer.WriteMetadata(&mcap.Metadata{ + Name: "a", + Metadata: map[string]string{ + "b": "c", + }, + })) + assert.Nil(t, writer.Close()) } @@ -57,10 +64,10 @@ func TestMCAPMerging(t *testing.T) { assert.Nil(t, merger.mergeInputs(output, inputs)) // output should now be a well-formed mcap - reader, err := mcap.NewReader(output) + reader, err := mcap.NewReader(bytes.NewReader(output.Bytes())) assert.Nil(t, err) assert.Equal(t, reader.Header().Profile, "testprofile") - it, err := reader.Messages(readopts.UsingIndex(false)) + it, err := reader.Messages(mcap.UsingIndex(false)) assert.Nil(t, err) messages := make(map[string]int) @@ -72,6 +79,16 @@ func TestMCAPMerging(t *testing.T) { assert.Equal(t, 100, messages["/foo"]) assert.Equal(t, 100, messages["/bar"]) assert.Equal(t, 100, messages["/baz"]) + + // check we got 3 parsable metadatas + info, err := reader.Info() + assert.Nil(t, err) + assert.Equal(t, 3, len(info.MetadataIndexes)) + for _, idx := range info.MetadataIndexes { + _, err := reader.GetMetadata(idx.Offset) + assert.Nil(t, err) + } + reader.Close() } } @@ -157,7 +174,7 @@ func TestMultiChannelInput(t *testing.T) { assert.Nil(t, err) defer reader.Close() assert.Equal(t, reader.Header().Profile, "testprofile") - it, err := reader.Messages(readopts.UsingIndex(false)) + it, err := reader.Messages(mcap.UsingIndex(false)) assert.Nil(t, err) messages := make(map[string]int) err = mcap.Range(it, func(schema *mcap.Schema, channel *mcap.Channel, message *mcap.Message) error { @@ -186,7 +203,7 @@ func TestSchemalessChannelInput(t *testing.T) { reader, err := mcap.NewReader(output) assert.Nil(t, err) assert.Equal(t, reader.Header().Profile, "testprofile") - it, err := reader.Messages(readopts.UsingIndex(false)) + it, err := reader.Messages(mcap.UsingIndex(false)) assert.Nil(t, err) messages := make(map[string]int) schemaIDs := make(map[uint16]int) @@ -239,7 +256,7 @@ func TestMultipleSchemalessChannelSingleInput(t *testing.T) { reader, err := mcap.NewReader(output) assert.Nil(t, err) assert.Equal(t, reader.Header().Profile, "testprofile") - it, err := reader.Messages(readopts.UsingIndex(false)) + it, err := reader.Messages(mcap.UsingIndex(false)) assert.Nil(t, err) messages := make(map[string]int) schemaIDs := make(map[uint16]int) diff --git a/go/conformance/test-read-conformance/main.go b/go/conformance/test-read-conformance/main.go index 5925df39b5..9cdae68711 100644 --- a/go/conformance/test-read-conformance/main.go +++ b/go/conformance/test-read-conformance/main.go @@ -13,7 +13,6 @@ import ( "strings" "github.com/foxglove/mcap/go/mcap" - "github.com/foxglove/mcap/go/mcap/readopts" ) var ( @@ -339,7 +338,7 @@ func readIndexed(w io.Writer, filepath string) error { if err != nil { return err } - it, err := reader.Messages(readopts.InOrder(readopts.LogTimeOrder)) + it, err := reader.Messages(mcap.InOrder(mcap.LogTimeOrder)) if err != nil { return err } diff --git a/go/mcap/indexed_message_iterator.go b/go/mcap/indexed_message_iterator.go index ef7ba4b6fa..40869351c5 100644 --- a/go/mcap/indexed_message_iterator.go +++ b/go/mcap/indexed_message_iterator.go @@ -40,6 +40,7 @@ type indexedMessageIterator struct { hasReadSummarySection bool compressedChunkAndMessageIndex []byte + metadataCallback func(*Metadata) error } // parseIndexSection parses the index section of the file and populates the @@ -237,13 +238,54 @@ func (it *indexedMessageIterator) loadChunk(chunkIndex *ChunkIndex) error { return nil } +func readRecord(r io.Reader) (TokenType, []byte, error) { + buf := make([]byte, 9) + _, err := io.ReadFull(r, buf) + if err != nil { + return 0, nil, fmt.Errorf("failed to read record header: %w", err) + } + tokenType := TokenType(buf[0]) + recordLen := binary.LittleEndian.Uint64(buf[1:]) + record := make([]byte, recordLen) + _, err = io.ReadFull(r, record) + if err != nil { + return 0, nil, fmt.Errorf("failed to read record: %w", err) + } + return tokenType, record, nil +} + func (it *indexedMessageIterator) Next(p []byte) (*Schema, *Channel, *Message, error) { if !it.hasReadSummarySection { err := it.parseSummarySection() if err != nil { return nil, nil, nil, err } + // take care of the metadata here + if it.metadataCallback != nil { + for _, idx := range it.metadataIndexes { + _, err = it.rs.Seek(int64(idx.Offset), io.SeekStart) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to seek to metadata: %w", err) + } + tokenType, data, err := readRecord(it.rs) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to read metadata record: %w", err) + } + if tokenType != TokenMetadata { + return nil, nil, nil, fmt.Errorf("expected metadata record, found %v", data) + } + metadata, err := ParseMetadata(data) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to parse metadata record: %w", err) + } + err = it.metadataCallback(metadata) + if err != nil { + return nil, nil, nil, fmt.Errorf("metadata callback failed: %w", err) + } + } + } } + for it.indexHeap.Len() > 0 { ri, err := it.indexHeap.HeapPop() if err != nil { diff --git a/go/mcap/range_index_heap.go b/go/mcap/range_index_heap.go index ca8401a667..284ecaf084 100644 --- a/go/mcap/range_index_heap.go +++ b/go/mcap/range_index_heap.go @@ -3,8 +3,6 @@ package mcap import ( "container/heap" "fmt" - - "github.com/foxglove/mcap/go/mcap/readopts" ) // rangeIndex refers to either a chunk (via the ChunkIndex, with other fields nil) @@ -18,7 +16,7 @@ type rangeIndex struct { // heap of rangeIndex entries, where the entries are sorted by their log time. type rangeIndexHeap struct { indices []rangeIndex - order readopts.ReadOrder + order ReadOrder lastErr error } @@ -26,7 +24,7 @@ type rangeIndexHeap struct { func (h rangeIndexHeap) timestamp(i int) uint64 { ri := h.indices[i] if ri.messageIndexEntry == nil { - if h.order == readopts.ReverseLogTimeOrder { + if h.order == ReverseLogTimeOrder { return ri.chunkIndex.MessageEndTime } return ri.chunkIndex.MessageStartTime @@ -78,14 +76,14 @@ func (h *rangeIndexHeap) Pop() interface{} { // Less is required by `heap.Interface`. func (h *rangeIndexHeap) Less(i, j int) bool { switch h.order { - case readopts.FileOrder: + case FileOrder: return h.filePositionLess(i, j) - case readopts.LogTimeOrder: + case LogTimeOrder: if h.timestamp(i) == h.timestamp(j) { return h.filePositionLess(i, j) } return h.timestamp(i) < h.timestamp(j) - case readopts.ReverseLogTimeOrder: + case ReverseLogTimeOrder: if h.timestamp(i) == h.timestamp(j) { return h.filePositionLess(j, i) } diff --git a/go/mcap/range_index_heap_test.go b/go/mcap/range_index_heap_test.go index e842a7cbb2..67ff2f96bb 100644 --- a/go/mcap/range_index_heap_test.go +++ b/go/mcap/range_index_heap_test.go @@ -4,7 +4,6 @@ import ( "reflect" "testing" - "github.com/foxglove/mcap/go/mcap/readopts" "github.com/stretchr/testify/assert" ) @@ -44,22 +43,22 @@ var rangeIndexHeapTestItems = []rangeIndex{ func TestMessageOrdering(t *testing.T) { cases := []struct { assertion string - order readopts.ReadOrder + order ReadOrder expectedIndexOrder []int }{ { assertion: "read time order forwards", - order: readopts.LogTimeOrder, + order: LogTimeOrder, expectedIndexOrder: []int{0, 1, 2, 3}, }, { assertion: "read time order backwards", - order: readopts.ReverseLogTimeOrder, + order: ReverseLogTimeOrder, expectedIndexOrder: []int{3, 0, 2, 1}, }, { assertion: "read file order", - order: readopts.FileOrder, + order: FileOrder, expectedIndexOrder: []int{0, 2, 1, 3}, }, } diff --git a/go/mcap/reader.go b/go/mcap/reader.go index 5561565d45..34ceb864f2 100644 --- a/go/mcap/reader.go +++ b/go/mcap/reader.go @@ -6,10 +6,10 @@ import ( "fmt" "io" "math" - - "github.com/foxglove/mcap/go/mcap/readopts" ) +var ErrMetadataNotFound = errors.New("metadata not found") + func getPrefixedString(data []byte, offset int) (s string, newoffset int, err error) { if len(data[offset:]) < 4 { return "", 0, io.ErrShortBuffer @@ -60,6 +60,8 @@ type Reader struct { rs io.ReadSeeker header *Header channels map[uint16]*Channel + + info *Info } type MessageIterator interface { @@ -82,64 +84,69 @@ func Range(it MessageIterator, f func(*Schema, *Channel, *Message) error) error } } -func (r *Reader) unindexedIterator(topics []string, start uint64, end uint64) *unindexedMessageIterator { +func (r *Reader) unindexedIterator(opts ReadOptions) *unindexedMessageIterator { topicMap := make(map[string]bool) - for _, topic := range topics { + for _, topic := range opts.Topics { topicMap[topic] = true } r.l.emitChunks = false return &unindexedMessageIterator{ - lexer: r.l, - channels: make(map[uint16]*Channel), - schemas: make(map[uint16]*Schema), - topics: topicMap, - start: start, - end: end, + lexer: r.l, + channels: make(map[uint16]*Channel), + schemas: make(map[uint16]*Schema), + topics: topicMap, + start: uint64(opts.Start), + end: uint64(opts.End), + metadataCallback: opts.MetadataCallback, } } func (r *Reader) indexedMessageIterator( - topics []string, - start uint64, - end uint64, - order readopts.ReadOrder, + opts ReadOptions, ) *indexedMessageIterator { topicMap := make(map[string]bool) - for _, topic := range topics { + for _, topic := range opts.Topics { topicMap[topic] = true } r.l.emitChunks = true return &indexedMessageIterator{ - lexer: r.l, - rs: r.rs, - channels: make(map[uint16]*Channel), - schemas: make(map[uint16]*Schema), - topics: topicMap, - start: start, - end: end, - indexHeap: rangeIndexHeap{order: order}, + lexer: r.l, + rs: r.rs, + channels: make(map[uint16]*Channel), + schemas: make(map[uint16]*Schema), + topics: topicMap, + start: uint64(opts.Start), + end: uint64(opts.End), + indexHeap: rangeIndexHeap{order: opts.Order}, + metadataCallback: opts.MetadataCallback, } } func (r *Reader) Messages( - opts ...readopts.ReadOpt, + opts ...ReadOpt, ) (MessageIterator, error) { - ro := readopts.Default() + options := ReadOptions{ + Start: 0, + End: math.MaxInt64, + Topics: nil, + UseIndex: true, + Order: FileOrder, + } for _, opt := range opts { - err := opt(&ro) + err := opt(&options) if err != nil { return nil, err } } - if ro.UseIndex { + if options.UseIndex { if rs, ok := r.r.(io.ReadSeeker); ok { r.rs = rs } else { return nil, fmt.Errorf("indexed reader requires a seekable reader") } - return r.indexedMessageIterator(ro.Topics, uint64(ro.Start), uint64(ro.End), ro.Order), nil + return r.indexedMessageIterator(options), nil } - return r.unindexedIterator(ro.Topics, uint64(ro.Start), uint64(ro.End)), nil + return r.unindexedIterator(options), nil } // Get the Header record from this MCAP. @@ -150,12 +157,20 @@ func (r *Reader) Header() *Header { // Info scans the summary section to form a structure describing characteristics // of the underlying mcap file. func (r *Reader) Info() (*Info, error) { - it := r.indexedMessageIterator(nil, 0, math.MaxUint64, readopts.FileOrder) + if r.info != nil { + return r.info, nil + } + if r.rs == nil { + return nil, fmt.Errorf("cannot get info from non-seekable reader") + } + it := r.indexedMessageIterator(ReadOptions{ + UseIndex: true, + }) err := it.parseSummarySection() if err != nil { return nil, err } - return &Info{ + info := &Info{ Statistics: it.statistics, Channels: it.channels, ChunkIndexes: it.chunkIndexes, @@ -164,7 +179,38 @@ func (r *Reader) Info() (*Info, error) { Schemas: it.schemas, Footer: it.footer, Header: r.header, - }, nil + } + r.info = info + return info, nil +} + +func (r *Reader) GetMetadata(offset uint64) (*Metadata, error) { + info, err := r.Info() + if err != nil { + return nil, err + } + for _, idx := range info.MetadataIndexes { + if idx.Offset != offset { + continue + } + _, err := r.rs.Seek(int64(idx.Offset), io.SeekStart) + if err != nil { + return nil, err + } + token, data, err := r.l.Next(nil) + if err != nil { + return nil, err + } + if token != TokenMetadata { + return nil, fmt.Errorf("expected metadata record, found %v", data) + } + metadata, err := ParseMetadata(data) + if err != nil { + return nil, fmt.Errorf("failed to parse metadata record: %w", err) + } + return metadata, nil + } + return nil, ErrMetadataNotFound } // Close the reader. diff --git a/go/mcap/readopts/readopts.go b/go/mcap/reader_options.go similarity index 86% rename from go/mcap/readopts/readopts.go rename to go/mcap/reader_options.go index 5de9aa78eb..b0f1c837ae 100644 --- a/go/mcap/readopts/readopts.go +++ b/go/mcap/reader_options.go @@ -1,8 +1,7 @@ -package readopts +package mcap import ( "fmt" - "math" ) type ReadOrder int @@ -19,16 +18,8 @@ type ReadOptions struct { Topics []string UseIndex bool Order ReadOrder -} -func Default() ReadOptions { - return ReadOptions{ - Start: 0, - End: math.MaxInt64, - Topics: nil, - UseIndex: true, - Order: FileOrder, - } + MetadataCallback func(*Metadata) error } type ReadOpt func(*ReadOptions) error @@ -79,3 +70,10 @@ func UsingIndex(useIndex bool) ReadOpt { return nil } } + +func WithMetadataCallback(callback func(*Metadata) error) ReadOpt { + return func(ro *ReadOptions) error { + ro.MetadataCallback = callback + return nil + } +} diff --git a/go/mcap/reader_test.go b/go/mcap/reader_test.go index c6e0b8190c..35a062ee3a 100644 --- a/go/mcap/reader_test.go +++ b/go/mcap/reader_test.go @@ -8,7 +8,6 @@ import ( "os" "testing" - "github.com/foxglove/mcap/go/mcap/readopts" "github.com/stretchr/testify/assert" ) @@ -63,7 +62,7 @@ func TestIndexedReaderBreaksTiesOnChunkOffset(t *testing.T) { reader, err := NewReader(bytes.NewReader(buf.Bytes())) assert.Nil(t, err) - it, err := reader.Messages(readopts.UsingIndex(true)) + it, err := reader.Messages(UsingIndex(true)) assert.Nil(t, err) expectedTopics := []string{"/foo", "/bar"} for i := 0; i < 2; i++ { @@ -273,7 +272,7 @@ func TestMessageReading(t *testing.T) { reader := bytes.NewReader(buf.Bytes()) r, err := NewReader(reader) assert.Nil(t, err) - it, err := r.Messages(readopts.UsingIndex(useIndex)) + it, err := r.Messages(UsingIndex(useIndex)) assert.Nil(t, err) c := 0 for { @@ -296,8 +295,8 @@ func TestMessageReading(t *testing.T) { r, err := NewReader(reader) assert.Nil(t, err) it, err := r.Messages( - readopts.WithTopics([]string{"/test1"}), - readopts.UsingIndex(useIndex), + WithTopics([]string{"/test1"}), + UsingIndex(useIndex), ) assert.Nil(t, err) c := 0 @@ -321,8 +320,8 @@ func TestMessageReading(t *testing.T) { r, err := NewReader(reader) assert.Nil(t, err) it, err := r.Messages( - readopts.WithTopics([]string{"/test1", "/test2"}), - readopts.UsingIndex(useIndex), + WithTopics([]string{"/test1", "/test2"}), + UsingIndex(useIndex), ) assert.Nil(t, err) c := 0 @@ -346,9 +345,9 @@ func TestMessageReading(t *testing.T) { r, err := NewReader(reader) assert.Nil(t, err) it, err := r.Messages( - readopts.After(100), - readopts.Before(200), - readopts.UsingIndex(useIndex), + After(100), + Before(200), + UsingIndex(useIndex), ) assert.Nil(t, err) c := 0 @@ -379,7 +378,7 @@ func TestReaderCounting(t *testing.T) { defer f.Close() r, err := NewReader(f) assert.Nil(t, err) - it, err := r.Messages(readopts.UsingIndex(indexed)) + it, err := r.Messages(UsingIndex(indexed)) assert.Nil(t, err) c := 0 for { @@ -560,7 +559,7 @@ func TestReadingDiagnostics(t *testing.T) { assert.Nil(t, err) r, err := NewReader(f) assert.Nil(t, err) - it, err := r.Messages(readopts.WithTopics([]string{"/diagnostics"})) + it, err := r.Messages(WithTopics([]string{"/diagnostics"})) assert.Nil(t, err) c := 0 for { @@ -574,6 +573,37 @@ func TestReadingDiagnostics(t *testing.T) { assert.Equal(t, 52, c) } +func TestReadingMetadata(t *testing.T) { + buf := &bytes.Buffer{} + writer, err := NewWriter(buf, &WriterOptions{ + Chunked: true, + ChunkSize: 1024, + Compression: "", + }) + assert.Nil(t, err) + assert.Nil(t, writer.WriteHeader(&Header{})) + + expectedMetadata := &Metadata{ + Name: "foo", + Metadata: map[string]string{ + "foo": "bar", + }, + } + assert.Nil(t, writer.WriteMetadata(expectedMetadata)) + assert.Nil(t, writer.Close()) + + reader, err := NewReader(bytes.NewReader(buf.Bytes())) + assert.Nil(t, err) + + info, err := reader.Info() + assert.Nil(t, err) + assert.Equal(t, 1, len(info.MetadataIndexes)) + idx := info.MetadataIndexes[0] + metadata, err := reader.GetMetadata(idx.Offset) + assert.Nil(t, err) + assert.Equal(t, expectedMetadata, metadata) +} + func TestReadingMessageOrderWithOverlappingChunks(t *testing.T) { buf := &bytes.Buffer{} // write an MCAP with two chunks, where in each chunk all messages have ascending timestamps, @@ -634,8 +664,8 @@ func TestReadingMessageOrderWithOverlappingChunks(t *testing.T) { assert.Nil(t, err) it, err := reader.Messages( - readopts.UsingIndex(true), - readopts.InOrder(readopts.LogTimeOrder), + UsingIndex(true), + InOrder(LogTimeOrder), ) assert.Nil(t, err) @@ -655,8 +685,8 @@ func TestReadingMessageOrderWithOverlappingChunks(t *testing.T) { // now try iterating in reverse reverseIt, err := reader.Messages( - readopts.UsingIndex(true), - readopts.InOrder(readopts.ReverseLogTimeOrder), + UsingIndex(true), + InOrder(ReverseLogTimeOrder), ) assert.Nil(t, err) diff --git a/go/mcap/unindexed_message_iterator.go b/go/mcap/unindexed_message_iterator.go index 364ef4e934..839af16a4a 100644 --- a/go/mcap/unindexed_message_iterator.go +++ b/go/mcap/unindexed_message_iterator.go @@ -11,6 +11,8 @@ type unindexedMessageIterator struct { topics map[string]bool start uint64 end uint64 + + metadataCallback func(*Metadata) error } func (it *unindexedMessageIterator) Next(p []byte) (*Schema, *Channel, *Message, error) { @@ -55,6 +57,19 @@ func (it *unindexedMessageIterator) Next(p []byte) (*Schema, *Channel, *Message, schema := it.schemas[channel.SchemaID] return schema, channel, message, nil } + case TokenMetadata: + if it.metadataCallback != nil { + metadata, err := ParseMetadata(record) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to parse metadata: %w", err) + } + err = it.metadataCallback(metadata) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to process metadata: %w", err) + } + } + // we don't emit metadata from the reader, so continue onward + continue default: // skip all other tokens } diff --git a/go/ros/ros2db3_to_mcap_test.go b/go/ros/ros2db3_to_mcap_test.go index 873a91e7eb..3779bfee45 100644 --- a/go/ros/ros2db3_to_mcap_test.go +++ b/go/ros/ros2db3_to_mcap_test.go @@ -9,7 +9,6 @@ import ( "testing" "github.com/foxglove/mcap/go/mcap" - "github.com/foxglove/mcap/go/mcap/readopts" _ "github.com/mattn/go-sqlite3" "github.com/stretchr/testify/assert" ) @@ -64,7 +63,7 @@ func TestDB3MCAPConversion(t *testing.T) { assert.Equal(t, 1, len(info.Channels)) assert.Equal(t, c.expectedTopic, info.Channels[1].Topic) messageCount := 0 - it, err := reader.Messages(readopts.WithTopics([]string{c.expectedTopic})) + it, err := reader.Messages(mcap.WithTopics([]string{c.expectedTopic})) assert.Nil(t, err) for { schema, channel, message, err := it.Next(nil)