Skip to content

Commit

Permalink
Add flag for merging metadata
Browse files Browse the repository at this point in the history
Adds a --merge-metadata flag that will cause metadata records to get
merged by the merge command. When supplied,
* metadata records from the inputs that hash identically in
  name+metadata will be deduplicated.
* metadata records will otherwise be merged into the output in a
  deterministic, but not necessarily principled, order. This can cause
  issues if users are using metadata records to effect "overwrites" or
  "edits".

The behavior defaults to false/off.
  • Loading branch information
Wyatt Alt committed Nov 2, 2023
1 parent ed69bb9 commit 9f71938
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 41 deletions.
96 changes: 59 additions & 37 deletions go/cli/mcap/cmd/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"container/heap"
"crypto/md5"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -20,13 +21,15 @@ var (
mergeIncludeCRC bool
mergeChunked bool
mergeOutputFile string
mergeMetadata bool
)

// mergeOpts collects the options that control how the merge command
// combines its input MCAP files into a single output.
type mergeOpts struct {
	compression   string // compression to apply to the output
	chunkSize     int64  // target chunk size for the output
	includeCRC    bool   // include CRCs in the output
	chunked       bool   // chunk the output file
	mergeMetadata bool   // deduplicate and carry metadata records into the output
}

// schemaID uniquely identifies a schema across the inputs.
Expand All @@ -45,17 +48,18 @@ type mcapMerger struct {
schemaIDs map[schemaID]uint16
channelIDs map[channelID]uint16
schemaIDByHash map[string]uint16

nextChannelID uint16
nextSchemaID uint16
opts mergeOpts
metadataHashes map[string]bool
nextChannelID uint16
nextSchemaID uint16
opts mergeOpts
}

func newMCAPMerger(opts mergeOpts) *mcapMerger {
return &mcapMerger{
schemaIDs: make(map[schemaID]uint16),
channelIDs: make(map[channelID]uint16),
schemaIDByHash: make(map[string]uint16),
metadataHashes: make(map[string]bool),
nextChannelID: 1,
nextSchemaID: 1,
opts: opts,
Expand All @@ -81,10 +85,29 @@ func (m *mcapMerger) outputSchemaID(inputID int, inputSchemaID uint16) (uint16,
return v, ok
}

// hashMetadata returns a hex-encoded MD5 digest computed over the
// metadata record's name followed by the JSON serialization of its
// key/value map. Records that hash identically in name+metadata are
// treated as duplicates by the merger.
func hashMetadata(metadata *mcap.Metadata) (string, error) {
	serialized, err := json.Marshal(metadata.Metadata)
	if err != nil {
		return "", err
	}
	digest := md5.New()
	digest.Write([]byte(metadata.Name))
	digest.Write(serialized)
	return hex.EncodeToString(digest.Sum(nil)), nil
}

// addMetadata writes a metadata record to the output, skipping any
// record whose name+metadata hash has already been written. The hash
// set m.metadataHashes is what deduplicates identical metadata records
// across merge inputs; non-identical records are all passed through.
//
// NOTE: the span in the source interleaved the pre-change lines
// (unconditional WriteMetadata and its error message) with the
// post-change ones, assigning err twice and duplicating the return.
// This is the reconstructed post-commit body.
func (m *mcapMerger) addMetadata(w *mcap.Writer, metadata *mcap.Metadata) error {
	hash, err := hashMetadata(metadata)
	if err != nil {
		return fmt.Errorf("failed to compute metadata hash: %w", err)
	}
	if !m.metadataHashes[hash] {
		err := w.WriteMetadata(metadata)
		if err != nil {
			return fmt.Errorf("failed to write metadata: %w", err)
		}
		m.metadataHashes[hash] = true
	}
	return nil
}
Expand Down Expand Up @@ -168,9 +191,6 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []namedReader) error {
if err != nil {
return fmt.Errorf("failed to create writer: %w", err)
}
if err != nil {
return fmt.Errorf("failed to write header: %w", err)
}

iterators := make([]mcap.MessageIterator, len(inputs))
profiles := make([]string, len(inputs))
Expand All @@ -188,29 +208,23 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []namedReader) error {
// renumbered IDs, and load the message (with renumbered IDs) into the
// priority queue.
for inputID, input := range inputs {
err := func() error {
reader, err := mcap.NewReader(input.reader)
if err != nil {
return fmt.Errorf("failed to open reader on %s: %w", input.name, err)
}
defer reader.Close()
profiles[inputID] = reader.Header().Profile
opts := []mcap.ReadOpt{readopts.UsingIndex(false)}
if m.opts.mergeMetadata {
opts = append(opts, mcap.WithMetadataCallback(func(metadata *mcap.Metadata) error {
return m.addMetadata(writer, metadata)
})
}
iterator, err := reader.Messages(opts...)
if err != nil {
return fmt.Errorf("failed to read messages on %s: %w", input.name, err)
}
iterators[inputID] = iterator
return nil
}()
reader, err := mcap.NewReader(input.reader)
if err != nil {
return fmt.Errorf("failed to open reader on %s: %w", input.name, err)
}
defer reader.Close() //nolint:gocritic // we actually want these defered in the loop.
profiles[inputID] = reader.Header().Profile
opts := []mcap.ReadOpt{mcap.UsingIndex(false)}
if m.opts.mergeMetadata {
opts = append(opts, mcap.WithMetadataCallback(func(metadata *mcap.Metadata) error {
return m.addMetadata(writer, metadata)
}))
}
iterator, err := reader.Messages(opts...)
if err != nil {
return err
}
iterators[inputID] = iterator
}
if err := writer.WriteHeader(&mcap.Header{Profile: outputProfile(profiles)}); err != nil {
return err
Expand Down Expand Up @@ -313,10 +327,11 @@ var mergeCmd = &cobra.Command{
readers = append(readers, namedReader{name: arg, reader: f})
}
opts := mergeOpts{
compression: mergeCompression,
chunkSize: mergeChunkSize,
includeCRC: mergeIncludeCRC,
chunked: mergeChunked,
compression: mergeCompression,
chunkSize: mergeChunkSize,
includeCRC: mergeIncludeCRC,
chunked: mergeChunked,
mergeMetadata: mergeMetadata,
}
merger := newMCAPMerger(opts)
var writer io.Writer
Expand Down Expand Up @@ -374,4 +389,11 @@ func init() {
true,
"chunk the output file",
)
mergeCmd.PersistentFlags().BoolVarP(
&mergeMetadata,
"merge-metadata",
"",
false,
"include metadata records in merged output",
)
}
9 changes: 5 additions & 4 deletions go/cli/mcap/cmd/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func TestMCAPMerging(t *testing.T) {
prepInput(t, buf2, &mcap.Schema{ID: 1}, 1, "/bar")
prepInput(t, buf3, &mcap.Schema{ID: 1}, 1, "/baz")
merger := newMCAPMerger(mergeOpts{
chunked: chunked,
chunked: chunked,
mergeMetadata: true,
})
output := &bytes.Buffer{}
inputs := []namedReader{
Expand All @@ -78,10 +79,10 @@ func TestMCAPMerging(t *testing.T) {
assert.Equal(t, 100, messages["/bar"])
assert.Equal(t, 100, messages["/baz"])

// check we got 3 parsable metadatas
// check we got one parsable metadata record
info, err := reader.Info()
assert.Nil(t, err)
assert.Equal(t, 3, len(info.MetadataIndexes))
assert.Equal(t, 1, len(info.MetadataIndexes))
for _, idx := range info.MetadataIndexes {
_, err := reader.GetMetadata(idx.Offset)
assert.Nil(t, err)
Expand Down Expand Up @@ -337,7 +338,7 @@ func TestSameSchemasNotDuplicated(t *testing.T) {
reader, err := mcap.NewReader(output)
assert.Nil(t, err)
assert.Equal(t, reader.Header().Profile, "testprofile")
it, err := reader.Messages(readopts.UsingIndex(false))
it, err := reader.Messages(mcap.UsingIndex(false))
assert.Nil(t, err)
schemas := make(map[uint16]bool)
var schemaNames []string
Expand Down

0 comments on commit 9f71938

Please sign in to comment.