Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLI/merge: Avoid duplication of identical schemas #982

Merged
merged 3 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions go/cli/mcap/cmd/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package cmd

import (
"container/heap"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -41,12 +43,9 @@ type channelID struct {
}

type mcapMerger struct {
schemas map[schemaID]*mcap.Schema
channels map[channelID]*mcap.Channel
schemaIDs map[schemaID]uint16
channelIDs map[channelID]uint16

outputChannelSchemas map[uint16]uint16
schemaIdByHash map[string]uint16

nextChannelID uint16
nextSchemaID uint16
Expand All @@ -55,11 +54,9 @@ type mcapMerger struct {

func newMCAPMerger(opts mergeOpts) *mcapMerger {
return &mcapMerger{
schemas: make(map[schemaID]*mcap.Schema),
channels: make(map[channelID]*mcap.Channel),
schemaIDs: make(map[schemaID]uint16),
channelIDs: make(map[channelID]uint16),
outputChannelSchemas: make(map[uint16]uint16),
schemaIdByHash: make(map[string]uint16),
nextChannelID: 1,
nextSchemaID: 1,
opts: opts,
Expand Down Expand Up @@ -98,7 +95,6 @@ func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Chann
MessageEncoding: channel.MessageEncoding,
Metadata: channel.Metadata,
}
m.channels[key] = channel
m.channelIDs[key] = m.nextChannelID
err := w.WriteChannel(newChannel)
if err != nil {
Expand All @@ -108,16 +104,32 @@ func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Chann
return newChannel.ID, nil
}

// getSchemaHash returns a hex-encoded MD5 digest computed from a schema's
// name, encoding and data. The schema ID is not part of the digest, so two
// schemas that differ only by ID produce the same hash.
//
// The name and encoding are written length-prefixed so that field boundaries
// are unambiguous: if the fields were simply concatenated, ("ab", "c") and
// ("a", "bc") would feed identical bytes to the hasher and be treated as the
// same schema. MD5 serves only as a content fingerprint here, not as a
// security measure.
func getSchemaHash(schema *mcap.Schema) string {
	hasher := md5.New()
	// Writes to a hash.Hash never return an error, so results are ignored.
	fmt.Fprintf(hasher, "%d:%s,%d:%s,",
		len(schema.Name), schema.Name,
		len(schema.Encoding), schema.Encoding)
	hasher.Write(schema.Data)
	return hex.EncodeToString(hasher.Sum(nil))
}

func (m *mcapMerger) addSchema(w *mcap.Writer, inputID int, schema *mcap.Schema) (uint16, error) {
key := schemaID{inputID, schema.ID}
schemaHash := getSchemaHash(schema)
schemaId, schemaKnown := m.schemaIdByHash[schemaHash]
if schemaKnown {
m.schemaIDs[key] = schemaId
return schemaId, nil
}

newSchema := &mcap.Schema{
ID: m.nextSchemaID, // substitute the next output schema ID
Name: schema.Name,
Encoding: schema.Encoding,
Data: schema.Data,
}
m.schemas[key] = newSchema
m.schemaIDs[key] = m.nextSchemaID
m.schemaIdByHash[schemaHash] = m.nextSchemaID
err := w.WriteSchema(newSchema)
if err != nil {
return 0, fmt.Errorf("failed to write schema: %w", err)
Expand Down Expand Up @@ -157,6 +169,13 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []namedReader) error {
profiles := make([]string, len(inputs))
pq := utils.NewPriorityQueue(nil)

// Reset struct members
m.schemaIdByHash = make(map[string]uint16)
m.schemaIDs = make(map[schemaID]uint16)
m.channelIDs = make(map[channelID]uint16)
m.nextChannelID = 1
m.nextSchemaID = 1
Comment on lines +172 to +177
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I have to reset schemaIdByHash, since we might have a new writer to which the schemas haven't been written yet.
In general, I don't fully understand the purpose of the mcapMerger struct. Wouldn't a single function be sufficient? I don't think users are supposed to call mcapMerger.addSchema themselves? But maybe I don't fully understand Go's concept of private members yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could definitely be written as a single function. There are no users of mcapMerger since it's private. I don't remember the details about mcapMerger, but I'm pretty sure it only exists to assist with whatever remapping of channel/schema IDs is required, and you could write that either way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you doing this resetting because you anticipate mergeInputs will be called multiple times?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^^ this seems reasonable to me (though I don't think it actually will be called that way), as would writing a single function -- maybe that's why you were asking.

Patch looks good to me generally, thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you doing this resetting because you anticipate mergeInputs will be called multiple times?

It's called multiple times in the tests. The resetting was required to make it succeed.


// for each input reader, initialize an mcap reader and read the first
// message off. Insert the schema and channel into the output with
// renumbered IDs, and load the message (with renumbered IDs) into the
Expand Down
65 changes: 49 additions & 16 deletions go/cli/mcap/cmd/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,19 @@ import (
"github.com/stretchr/testify/assert"
)

func prepInput(t *testing.T, w io.Writer, schemaID uint16, channelID uint16, topic string) {
func prepInput(t *testing.T, w io.Writer, schema *mcap.Schema, channelID uint16, topic string) {
writer, err := mcap.NewWriter(w, &mcap.WriterOptions{
Chunked: true,
})
assert.Nil(t, err)

assert.Nil(t, writer.WriteHeader(&mcap.Header{Profile: "testprofile"}))
if schemaID != 0 {
assert.Nil(t, writer.WriteSchema(&mcap.Schema{
ID: schemaID,
}))
if schema.ID != 0 {
assert.Nil(t, writer.WriteSchema(schema))
}
assert.Nil(t, writer.WriteChannel(&mcap.Channel{
ID: channelID,
SchemaID: schemaID,
SchemaID: schema.ID,
Topic: topic,
}))
for i := 0; i < 100; i++ {
Expand All @@ -42,9 +40,9 @@ func TestMCAPMerging(t *testing.T) {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
buf3 := &bytes.Buffer{}
prepInput(t, buf1, 1, 1, "/foo")
prepInput(t, buf2, 1, 1, "/bar")
prepInput(t, buf3, 1, 1, "/baz")
prepInput(t, buf1, &mcap.Schema{ID:1}, 1, "/foo")
prepInput(t, buf2, &mcap.Schema{ID:1}, 1, "/bar")
prepInput(t, buf3, &mcap.Schema{ID:1}, 1, "/baz")
merger := newMCAPMerger(mergeOpts{
chunked: chunked,
})
Expand Down Expand Up @@ -136,8 +134,8 @@ func TestChannelsWithSameSchema(t *testing.T) {
func TestMultiChannelInput(t *testing.T) {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
prepInput(t, buf1, 1, 1, "/foo")
prepInput(t, buf2, 1, 1, "/bar")
prepInput(t, buf1, &mcap.Schema{ID:1}, 1, "/foo")
prepInput(t, buf2, &mcap.Schema{ID:1}, 1, "/bar")
merger := newMCAPMerger(mergeOpts{})
multiChannelInput := &bytes.Buffer{}
inputs := []namedReader{
Expand All @@ -146,7 +144,7 @@ func TestMultiChannelInput(t *testing.T) {
}
assert.Nil(t, merger.mergeInputs(multiChannelInput, inputs))
buf3 := &bytes.Buffer{}
prepInput(t, buf3, 2, 2, "/baz")
prepInput(t, buf3, &mcap.Schema{ID:2}, 2, "/baz")
output := &bytes.Buffer{}
inputs2 := []namedReader{
{"multiChannelInput", multiChannelInput},
Expand All @@ -172,8 +170,8 @@ func TestMultiChannelInput(t *testing.T) {
func TestSchemalessChannelInput(t *testing.T) {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
prepInput(t, buf1, 0, 1, "/foo")
prepInput(t, buf2, 1, 1, "/bar")
prepInput(t, buf1, &mcap.Schema{ID:0}, 1, "/foo")
prepInput(t, buf2, &mcap.Schema{ID:1}, 1, "/bar")
merger := newMCAPMerger(mergeOpts{})
output := &bytes.Buffer{}
inputs := []namedReader{
Expand Down Expand Up @@ -264,7 +262,7 @@ func TestBadInputGivesNamedErrors(t *testing.T) {
"bad magic",
func() *bytes.Buffer {
buf := &bytes.Buffer{}
prepInput(t, buf, 0, 1, "/foo")
prepInput(t, buf, &mcap.Schema{ID:0}, 1, "/foo")
buf.Bytes()[0] = 0x00
return buf
},
Expand All @@ -274,7 +272,7 @@ func TestBadInputGivesNamedErrors(t *testing.T) {
"bad content",
func() *bytes.Buffer {
buf := &bytes.Buffer{}
prepInput(t, buf, 0, 1, "/foo")
prepInput(t, buf, &mcap.Schema{ID:0}, 1, "/foo")
for i := 3000; i < 4000; i++ {
buf.Bytes()[i] = 0x00
}
Expand Down Expand Up @@ -302,3 +300,38 @@ func TestBadInputGivesNamedErrors(t *testing.T) {
}
}
}

// TestSameSchemasNotDuplicated merges three inputs, two carrying a schema
// named "SchemaA" and one carrying "SchemaB", and verifies that the merged
// output exposes exactly two distinct schemas.
func TestSameSchemasNotDuplicated(t *testing.T) {
	buf1 := &bytes.Buffer{}
	buf2 := &bytes.Buffer{}
	buf3 := &bytes.Buffer{}
	prepInput(t, buf1, &mcap.Schema{ID: 1, Name: "SchemaA"}, 1, "/foo")
	prepInput(t, buf2, &mcap.Schema{ID: 1, Name: "SchemaA"}, 1, "/bar")
	prepInput(t, buf3, &mcap.Schema{ID: 1, Name: "SchemaB"}, 1, "/baz")
	merger := newMCAPMerger(mergeOpts{})
	output := &bytes.Buffer{}
	inputs := []namedReader{
		{"buf1", buf1},
		{"buf2", buf2},
		{"buf3", buf3},
	}
	assert.Nil(t, merger.mergeInputs(output, inputs))

	// output should now be a well-formed mcap
	reader, err := mcap.NewReader(output)
	assert.Nil(t, err)
	assert.Equal(t, "testprofile", reader.Header().Profile)
	it, err := reader.Messages(readopts.UsingIndex(false))
	assert.Nil(t, err)

	// Record each distinct output schema ID once, keeping the names in the
	// order in which they are first encountered.
	seen := make(map[uint16]bool)
	var schemaNames []string
	err = mcap.Range(it, func(schema *mcap.Schema, _ *mcap.Channel, _ *mcap.Message) error {
		if !seen[schema.ID] {
			seen[schema.ID] = true
			schemaNames = append(schemaNames, schema.Name)
		}
		return nil
	})
	// A failure while iterating must fail the test rather than be dropped.
	assert.Nil(t, err)
	assert.Equal(t, 2, len(seen))
	assert.Equal(t, []string{"SchemaA", "SchemaB"}, schemaNames)
}
Loading