Skip to content

Commit

Permalink
cli/merge: Avoid duplication of identical schemas
Browse files Browse the repository at this point in the history
  • Loading branch information
achim-k committed Sep 28, 2023
1 parent 3a14afb commit e422d7f
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 0 deletions.
28 changes: 28 additions & 0 deletions go/cli/mcap/cmd/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package cmd

import (
"container/heap"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -43,6 +45,7 @@ type channelID struct {
type mcapMerger struct {
schemaIDs map[schemaID]uint16
channelIDs map[channelID]uint16
schemaIdByHash map[string]uint16

nextChannelID uint16
nextSchemaID uint16
Expand All @@ -53,6 +56,7 @@ func newMCAPMerger(opts mergeOpts) *mcapMerger {
return &mcapMerger{
schemaIDs: make(map[schemaID]uint16),
channelIDs: make(map[channelID]uint16),
schemaIdByHash: make(map[string]uint16),
nextChannelID: 1,
nextSchemaID: 1,
opts: opts,
Expand Down Expand Up @@ -100,15 +104,32 @@ func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Chann
return newChannel.ID, nil
}

// getSchemaHash returns a hex-encoded MD5 fingerprint of the schema's name,
// encoding and data. Two schemas yield the same fingerprint only when all
// three fields are identical (barring an MD5 collision); the schema ID is
// deliberately not part of the digest.
//
// NOTE(review): MD5 is used purely as a content fingerprint here, not for
// security. If inputs may be adversarial, a crafted collision could cause two
// different schemas to be treated as one — consider a stronger hash then.
func getSchemaHash(schema *mcap.Schema) string {
	hasher := md5.New()
	// Prefix every field with its byte length so that field boundaries are
	// part of the digest. Plain concatenation would make, for example,
	// (Name "ab", Encoding "c") indistinguishable from (Name "a", Encoding "bc").
	// hash.Hash.Write is documented to never return an error, so the write
	// results are intentionally not checked.
	fmt.Fprintf(hasher, "%d:%s", len(schema.Name), schema.Name)
	fmt.Fprintf(hasher, "%d:%s", len(schema.Encoding), schema.Encoding)
	fmt.Fprintf(hasher, "%d:", len(schema.Data))
	hasher.Write(schema.Data)
	return hex.EncodeToString(hasher.Sum(nil))
}

func (m *mcapMerger) addSchema(w *mcap.Writer, inputID int, schema *mcap.Schema) (uint16, error) {
key := schemaID{inputID, schema.ID}
schemaHash := getSchemaHash(schema)
schemaId, schemaKnown := m.schemaIdByHash[schemaHash]
if schemaKnown {
m.schemaIDs[key] = schemaId
return schemaId, nil
}

newSchema := &mcap.Schema{
ID: m.nextSchemaID, // substitute the next output schema ID
Name: schema.Name,
Encoding: schema.Encoding,
Data: schema.Data,
}
m.schemaIDs[key] = m.nextSchemaID
m.schemaIdByHash[schemaHash] = m.nextSchemaID
err := w.WriteSchema(newSchema)
if err != nil {
return 0, fmt.Errorf("failed to write schema: %w", err)
Expand Down Expand Up @@ -148,6 +169,13 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []namedReader) error {
profiles := make([]string, len(inputs))
pq := utils.NewPriorityQueue(nil)

// Reset the merger's ID lookup tables and ID counters before processing these inputs.
m.schemaIdByHash = make(map[string]uint16)
m.schemaIDs = make(map[schemaID]uint16)
m.channelIDs = make(map[channelID]uint16)
m.nextChannelID = 1
m.nextSchemaID = 1

// for each input reader, initialize an mcap reader and read the first
// message off. Insert the schema and channel into the output with
// renumbered IDs, and load the message (with renumbered IDs) into the
Expand Down
35 changes: 35 additions & 0 deletions go/cli/mcap/cmd/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,38 @@ func TestBadInputGivesNamedErrors(t *testing.T) {
}
}
}

// TestSameSchemasNotDuplicated merges three inputs, two of which carry a
// schema named "SchemaA" and one a schema named "SchemaB", and checks that the
// messages in the merged output reference exactly two distinct schema IDs.
func TestSameSchemasNotDuplicated(t *testing.T) {
	buf1 := &bytes.Buffer{}
	buf2 := &bytes.Buffer{}
	buf3 := &bytes.Buffer{}
	// All three inputs use schema ID 1; only the schema name differs for buf3.
	prepInput(t, buf1, &mcap.Schema{ID: 1, Name: "SchemaA"}, 1, "/foo")
	prepInput(t, buf2, &mcap.Schema{ID: 1, Name: "SchemaA"}, 1, "/bar")
	prepInput(t, buf3, &mcap.Schema{ID: 1, Name: "SchemaB"}, 1, "/baz")
	merger := newMCAPMerger(mergeOpts{})
	output := &bytes.Buffer{}
	inputs := []namedReader{
		{"buf1", buf1},
		{"buf2", buf2},
		{"buf3", buf3},
	}
	assert.Nil(t, merger.mergeInputs(output, inputs))

	// output should now be a well-formed mcap
	reader, err := mcap.NewReader(output)
	assert.Nil(t, err)
	assert.Equal(t, "testprofile", reader.Header().Profile)
	it, err := reader.Messages(readopts.UsingIndex(false))
	assert.Nil(t, err)

	// Record each distinct output schema ID once, remembering the schema
	// names in the order they are first encountered.
	schemas := make(map[uint16]bool)
	var schemaNames []string
	err = mcap.Range(it, func(schema *mcap.Schema, _ *mcap.Channel, _ *mcap.Message) error {
		if !schemas[schema.ID] {
			schemas[schema.ID] = true
			schemaNames = append(schemaNames, schema.Name)
		}
		return nil
	})
	// A failure while iterating must fail the test instead of being dropped.
	assert.Nil(t, err)
	assert.Len(t, schemas, 2)
	assert.Equal(t, []string{"SchemaA", "SchemaB"}, schemaNames)
}

0 comments on commit e422d7f

Please sign in to comment.