Skip to content

Commit

Permalink
Include metadata in mcap merge
Browse files Browse the repository at this point in the history
Includes metadata records from input files in mcap merge via a new read
option. This required a breaking change to read options to avoid a
dependency cycle: since I need to supply a callback option to apply to
metadata records, the readopts package required awareness of "mcap"
while "mcap" required awareness of readopts for configuration.

To address this I have moved readopts.go under the mcap package. Users
who upgrade the library will need to swap out the package name if they
are using any options.
  • Loading branch information
Wyatt Alt committed Aug 29, 2023
1 parent c0a4720 commit 80b5b85
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 88 deletions.
9 changes: 4 additions & 5 deletions go/cli/mcap/cmd/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/foxglove/mcap/go/cli/mcap/utils"
"github.com/foxglove/mcap/go/cli/mcap/utils/ros"
"github.com/foxglove/mcap/go/mcap"
"github.com/foxglove/mcap/go/mcap/readopts"
"github.com/spf13/cobra"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -166,14 +165,14 @@ func (w *jsonOutputWriter) writeMessage(
return nil
}

func getReadOpts(useIndex bool) []readopts.ReadOpt {
func getReadOpts(useIndex bool) []mcap.ReadOpt {
topics := strings.FieldsFunc(catTopics, func(c rune) bool { return c == ',' })
opts := []readopts.ReadOpt{readopts.UsingIndex(useIndex), readopts.WithTopics(topics)}
opts := []mcap.ReadOpt{mcap.UsingIndex(useIndex), mcap.WithTopics(topics)}
if catStart != 0 {
opts = append(opts, readopts.After(catStart*1e9))
opts = append(opts, mcap.After(catStart*1e9))
}
if catEnd != math.MaxInt64 {
opts = append(opts, readopts.Before(catEnd*1e9))
opts = append(opts, mcap.Before(catEnd*1e9))
}
return opts
}
Expand Down
13 changes: 11 additions & 2 deletions go/cli/mcap/cmd/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/foxglove/mcap/go/cli/mcap/utils"
"github.com/foxglove/mcap/go/mcap"
"github.com/foxglove/mcap/go/mcap/readopts"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -85,6 +84,14 @@ func (m *mcapMerger) outputSchemaID(inputID int, inputSchemaID uint16) (uint16,
return v, ok
}

func (m *mcapMerger) addMetadata(w *mcap.Writer, metadata *mcap.Metadata) error {
err := w.WriteMetadata(metadata)
if err != nil {
return fmt.Errorf("failed to write metadata: %w", err)
}
return nil
}

func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Channel) (uint16, error) {
outputSchemaID, ok := m.outputSchemaID(inputID, channel.SchemaID)
if !ok {
Expand Down Expand Up @@ -168,7 +175,9 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []namedReader) error {
}
defer reader.Close()
profiles[inputID] = reader.Header().Profile
iterator, err := reader.Messages(readopts.UsingIndex(false))
iterator, err := reader.Messages(mcap.UsingIndex(false), mcap.WithMetadataCallback(func(metadata *mcap.Metadata) error {
return m.addMetadata(writer, metadata)
}))
if err != nil {
return fmt.Errorf("failed to read messages on %s: %w", input.name, err)
}
Expand Down
29 changes: 23 additions & 6 deletions go/cli/mcap/cmd/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"

"github.com/foxglove/mcap/go/mcap"
"github.com/foxglove/mcap/go/mcap/readopts"
"github.com/stretchr/testify/assert"
)

Expand All @@ -34,6 +33,14 @@ func prepInput(t *testing.T, w io.Writer, schemaID uint16, channelID uint16, top
LogTime: uint64(i),
}))
}

assert.Nil(t, writer.WriteMetadata(&mcap.Metadata{
Name: "a",
Metadata: map[string]string{
"b": "c",
},
}))

assert.Nil(t, writer.Close())
}

Expand All @@ -57,10 +64,10 @@ func TestMCAPMerging(t *testing.T) {
assert.Nil(t, merger.mergeInputs(output, inputs))

// output should now be a well-formed mcap
reader, err := mcap.NewReader(output)
reader, err := mcap.NewReader(bytes.NewReader(output.Bytes()))
assert.Nil(t, err)
assert.Equal(t, reader.Header().Profile, "testprofile")
it, err := reader.Messages(readopts.UsingIndex(false))
it, err := reader.Messages(mcap.UsingIndex(false))
assert.Nil(t, err)

messages := make(map[string]int)
Expand All @@ -72,6 +79,16 @@ func TestMCAPMerging(t *testing.T) {
assert.Equal(t, 100, messages["/foo"])
assert.Equal(t, 100, messages["/bar"])
assert.Equal(t, 100, messages["/baz"])

// check we got 3 parsable metadatas
info, err := reader.Info()
assert.Nil(t, err)
assert.Equal(t, 3, len(info.MetadataIndexes))
for _, idx := range info.MetadataIndexes {
_, err := reader.GetMetadata(idx.Offset)
assert.Nil(t, err)
}

reader.Close()
}
}
Expand Down Expand Up @@ -157,7 +174,7 @@ func TestMultiChannelInput(t *testing.T) {
assert.Nil(t, err)
defer reader.Close()
assert.Equal(t, reader.Header().Profile, "testprofile")
it, err := reader.Messages(readopts.UsingIndex(false))
it, err := reader.Messages(mcap.UsingIndex(false))
assert.Nil(t, err)
messages := make(map[string]int)
err = mcap.Range(it, func(schema *mcap.Schema, channel *mcap.Channel, message *mcap.Message) error {
Expand Down Expand Up @@ -186,7 +203,7 @@ func TestSchemalessChannelInput(t *testing.T) {
reader, err := mcap.NewReader(output)
assert.Nil(t, err)
assert.Equal(t, reader.Header().Profile, "testprofile")
it, err := reader.Messages(readopts.UsingIndex(false))
it, err := reader.Messages(mcap.UsingIndex(false))
assert.Nil(t, err)
messages := make(map[string]int)
schemaIDs := make(map[uint16]int)
Expand Down Expand Up @@ -239,7 +256,7 @@ func TestMultipleSchemalessChannelSingleInput(t *testing.T) {
reader, err := mcap.NewReader(output)
assert.Nil(t, err)
assert.Equal(t, reader.Header().Profile, "testprofile")
it, err := reader.Messages(readopts.UsingIndex(false))
it, err := reader.Messages(mcap.UsingIndex(false))
assert.Nil(t, err)
messages := make(map[string]int)
schemaIDs := make(map[uint16]int)
Expand Down
3 changes: 1 addition & 2 deletions go/conformance/test-read-conformance/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"strings"

"github.com/foxglove/mcap/go/mcap"
"github.com/foxglove/mcap/go/mcap/readopts"
)

var (
Expand Down Expand Up @@ -339,7 +338,7 @@ func readIndexed(w io.Writer, filepath string) error {
if err != nil {
return err
}
it, err := reader.Messages(readopts.InOrder(readopts.LogTimeOrder))
it, err := reader.Messages(mcap.InOrder(mcap.LogTimeOrder))
if err != nil {
return err
}
Expand Down
42 changes: 42 additions & 0 deletions go/mcap/indexed_message_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type indexedMessageIterator struct {
hasReadSummarySection bool

compressedChunkAndMessageIndex []byte
metadataCallback func(*Metadata) error
}

// parseIndexSection parses the index section of the file and populates the
Expand Down Expand Up @@ -237,13 +238,54 @@ func (it *indexedMessageIterator) loadChunk(chunkIndex *ChunkIndex) error {
return nil
}

func readRecord(r io.Reader) (TokenType, []byte, error) {
buf := make([]byte, 9)
_, err := io.ReadFull(r, buf)
if err != nil {
return 0, nil, fmt.Errorf("failed to read record header: %w", err)
}
tokenType := TokenType(buf[0])
recordLen := binary.LittleEndian.Uint64(buf[1:])
record := make([]byte, recordLen)
_, err = io.ReadFull(r, record)
if err != nil {
return 0, nil, fmt.Errorf("failed to read record: %w", err)
}
return tokenType, record, nil
}

func (it *indexedMessageIterator) Next(p []byte) (*Schema, *Channel, *Message, error) {
if !it.hasReadSummarySection {
err := it.parseSummarySection()
if err != nil {
return nil, nil, nil, err
}
// take care of the metadata here
if it.metadataCallback != nil {
for _, idx := range it.metadataIndexes {
_, err = it.rs.Seek(int64(idx.Offset), io.SeekStart)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to seek to metadata: %w", err)
}
tokenType, data, err := readRecord(it.rs)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to read metadata record: %w", err)
}
if tokenType != TokenMetadata {
return nil, nil, nil, fmt.Errorf("expected metadata record, found %v", data)
}
metadata, err := ParseMetadata(data)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to parse metadata record: %w", err)
}
err = it.metadataCallback(metadata)
if err != nil {
return nil, nil, nil, fmt.Errorf("metadata callback failed: %w", err)
}
}
}
}

for it.indexHeap.Len() > 0 {
ri, err := it.indexHeap.HeapPop()
if err != nil {
Expand Down
12 changes: 5 additions & 7 deletions go/mcap/range_index_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package mcap
import (
"container/heap"
"fmt"

"github.com/foxglove/mcap/go/mcap/readopts"
)

// rangeIndex refers to either a chunk (via the ChunkIndex, with other fields nil)
Expand All @@ -18,15 +16,15 @@ type rangeIndex struct {
// heap of rangeIndex entries, where the entries are sorted by their log time.
type rangeIndexHeap struct {
indices []rangeIndex
order readopts.ReadOrder
order ReadOrder
lastErr error
}

// key returns the comparison key used for elements in this heap.
func (h rangeIndexHeap) timestamp(i int) uint64 {
ri := h.indices[i]
if ri.messageIndexEntry == nil {
if h.order == readopts.ReverseLogTimeOrder {
if h.order == ReverseLogTimeOrder {
return ri.chunkIndex.MessageEndTime
}
return ri.chunkIndex.MessageStartTime
Expand Down Expand Up @@ -78,14 +76,14 @@ func (h *rangeIndexHeap) Pop() interface{} {
// Less is required by `heap.Interface`.
func (h *rangeIndexHeap) Less(i, j int) bool {
switch h.order {
case readopts.FileOrder:
case FileOrder:
return h.filePositionLess(i, j)
case readopts.LogTimeOrder:
case LogTimeOrder:
if h.timestamp(i) == h.timestamp(j) {
return h.filePositionLess(i, j)
}
return h.timestamp(i) < h.timestamp(j)
case readopts.ReverseLogTimeOrder:
case ReverseLogTimeOrder:
if h.timestamp(i) == h.timestamp(j) {
return h.filePositionLess(j, i)
}
Expand Down
9 changes: 4 additions & 5 deletions go/mcap/range_index_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"reflect"
"testing"

"github.com/foxglove/mcap/go/mcap/readopts"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -44,22 +43,22 @@ var rangeIndexHeapTestItems = []rangeIndex{
func TestMessageOrdering(t *testing.T) {
cases := []struct {
assertion string
order readopts.ReadOrder
order ReadOrder
expectedIndexOrder []int
}{
{
assertion: "read time order forwards",
order: readopts.LogTimeOrder,
order: LogTimeOrder,
expectedIndexOrder: []int{0, 1, 2, 3},
},
{
assertion: "read time order backwards",
order: readopts.ReverseLogTimeOrder,
order: ReverseLogTimeOrder,
expectedIndexOrder: []int{3, 0, 2, 1},
},
{
assertion: "read file order",
order: readopts.FileOrder,
order: FileOrder,
expectedIndexOrder: []int{0, 2, 1, 3},
},
}
Expand Down
Loading

0 comments on commit 80b5b85

Please sign in to comment.