diff --git a/go/cli/mcap/cmd/merge.go b/go/cli/mcap/cmd/merge.go index 81446c78b..62ec3faa3 100644 --- a/go/cli/mcap/cmd/merge.go +++ b/go/cli/mcap/cmd/merge.go @@ -46,9 +46,9 @@ type channelID struct { } type mcapMerger struct { - schemaIDs map[schemaID]uint16 - channelIDs map[channelID]uint16 - schemaIDByHash map[string]uint16 + schemaIDs map[schemaID]uint16 + channelIDs map[channelID]uint16 + schemaIDByHash map[string]uint16 channelIDByHash map[string]uint16 nextChannelID uint16 @@ -56,15 +56,21 @@ type mcapMerger struct { opts mergeOpts } +const ( + AutoCoalescing = "auto" + ForceCoalescing = "force" + NoCoalescing = "none" +) + func newMCAPMerger(opts mergeOpts) *mcapMerger { return &mcapMerger{ - schemaIDs: make(map[schemaID]uint16), - channelIDs: make(map[channelID]uint16), - schemaIDByHash: make(map[string]uint16), - channelIDByHash: make(map[string]uint16), - nextChannelID: 1, - nextSchemaID: 1, - opts: opts, + schemaIDs: make(map[schemaID]uint16), + channelIDs: make(map[channelID]uint16), + schemaIDByHash: make(map[string]uint16), + channelIDByHash: make(map[string]uint16), + nextChannelID: 1, + nextSchemaID: 1, + opts: opts, } } @@ -96,19 +102,19 @@ func getChannelHash(channel *mcap.Channel, coalesceChannels string) string { hasher.Write([]byte(channel.MessageEncoding)) switch coalesceChannels { - case "auto": // Include channel metadata in hash - for key, value := range channel.Metadata { - hasher.Write([]byte(key)) - hasher.Write([]byte(value)) - } - case "force": // Channel metadata is not included in hash - break; - default: - die("Invalid value for --coalesce-channels: %s\n", coalesceChannels) + case AutoCoalescing: // Include channel metadata in hash + for key, value := range channel.Metadata { + hasher.Write([]byte(key)) + hasher.Write([]byte(value)) + } + case ForceCoalescing: // Channel metadata is not included in hash + break + default: + die("Invalid value for --coalesce-channels: %s\n", coalesceChannels) } hash := hasher.Sum(nil) - return hex.EncodeToString(hash[:]) + return hex.EncodeToString(hash) } func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Channel) (uint16, error) { @@ -125,7 +131,7 @@ func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Chann Metadata: channel.Metadata, } - if m.opts.coalesceChannels != "none" { + if m.opts.coalesceChannels != NoCoalescing { channelHash := getChannelHash(newChannel, m.opts.coalesceChannels) channelID, channelKnown := m.channelIDByHash[channelHash] if channelKnown { diff --git a/go/cli/mcap/cmd/merge_test.go b/go/cli/mcap/cmd/merge_test.go index b80c236ea..7df1ec9ca 100644 --- a/go/cli/mcap/cmd/merge_test.go +++ b/go/cli/mcap/cmd/merge_test.go @@ -22,11 +22,11 @@ func prepInput(t *testing.T, w io.Writer, schema *mcap.Schema, channel *mcap.Cha assert.Nil(t, writer.WriteSchema(schema)) } assert.Nil(t, writer.WriteChannel(&mcap.Channel{ - ID: channel.ID, - SchemaID: schema.ID, - Topic: channel.Topic, + ID: channel.ID, + SchemaID: schema.ID, + Topic: channel.Topic, MessageEncoding: channel.MessageEncoding, - Metadata: channel.Metadata, + Metadata: channel.Metadata, })) for i := 0; i < 100; i++ { assert.Nil(t, writer.WriteMessage(&mcap.Message{ @@ -42,11 +42,11 @@ func TestMCAPMerging(t *testing.T) { buf1 := &bytes.Buffer{} buf2 := &bytes.Buffer{} buf3 := &bytes.Buffer{} - prepInput(t, buf1, &mcap.Schema{ID:1}, &mcap.Channel{ID:1, Topic: "/foo"}) - prepInput(t, buf2, &mcap.Schema{ID:1}, &mcap.Channel{ID:1, Topic: "/bar"}) - prepInput(t, buf3, &mcap.Schema{ID:1}, &mcap.Channel{ID:1, Topic: "/baz"}) + prepInput(t, buf1, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/foo"}) + prepInput(t, buf2, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/bar"}) + prepInput(t, buf3, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/baz"}) merger := newMCAPMerger(mergeOpts{ - chunked: chunked, + chunked: chunked, coalesceChannels: "none", }) output := &bytes.Buffer{} @@ -119,7 +119,7 @@ func TestChannelsWithSameSchema(t *testing.T) { })) assert.Nil(t, writer.Close()) merger := newMCAPMerger(mergeOpts{ - chunked: true, + chunked: true, coalesceChannels: "none", }) output := &bytes.Buffer{} @@ -138,8 +138,8 @@ func TestChannelsWithSameSchema(t *testing.T) { func TestMultiChannelInput(t *testing.T) { buf1 := &bytes.Buffer{} buf2 := &bytes.Buffer{} - prepInput(t, buf1, &mcap.Schema{ID:1}, &mcap.Channel{ID:1, Topic: "/foo"}) - prepInput(t, buf2, &mcap.Schema{ID:1}, &mcap.Channel{ID:1, Topic: "/bar"}) + prepInput(t, buf1, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/foo"}) + prepInput(t, buf2, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/bar"}) merger := newMCAPMerger(mergeOpts{coalesceChannels: "none"}) multiChannelInput := &bytes.Buffer{} inputs := []namedReader{ @@ -148,7 +148,7 @@ func TestMultiChannelInput(t *testing.T) { } assert.Nil(t, merger.mergeInputs(multiChannelInput, inputs)) buf3 := &bytes.Buffer{} - prepInput(t, buf3, &mcap.Schema{ID:2}, &mcap.Channel{ID:2, Topic: "/baz"}) + prepInput(t, buf3, &mcap.Schema{ID: 2}, &mcap.Channel{ID: 2, Topic: "/baz"}) output := &bytes.Buffer{} inputs2 := []namedReader{ {"multiChannelInput", multiChannelInput}, @@ -174,8 +174,8 @@ func TestMultiChannelInput(t *testing.T) { func TestSchemalessChannelInput(t *testing.T) { buf1 := &bytes.Buffer{} buf2 := &bytes.Buffer{} - prepInput(t, buf1, &mcap.Schema{ID:0}, &mcap.Channel{ID:1, Topic: "/foo"}) - prepInput(t, buf2, &mcap.Schema{ID:1}, &mcap.Channel{ID:1, Topic: "/bar"}) + prepInput(t, buf1, &mcap.Schema{ID: 0}, &mcap.Channel{ID: 1, Topic: "/foo"}) + prepInput(t, buf2, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/bar"}) merger := newMCAPMerger(mergeOpts{coalesceChannels: "none"}) output := &bytes.Buffer{} inputs := []namedReader{ @@ -266,7 +266,7 @@ func TestBadInputGivesNamedErrors(t *testing.T) { "bad magic", func() *bytes.Buffer { buf := &bytes.Buffer{} - prepInput(t, buf, &mcap.Schema{ID:0}, &mcap.Channel{ID:1, Topic: "/foo"}) + prepInput(t, buf, &mcap.Schema{ID: 0}, &mcap.Channel{ID: 1, Topic: "/foo"}) buf.Bytes()[0] = 0x00 return buf }, @@ -276,7 +276,7 @@ func TestBadInputGivesNamedErrors(t *testing.T) { "bad content", func() *bytes.Buffer { buf := &bytes.Buffer{} - prepInput(t, buf, &mcap.Schema{ID:0}, &mcap.Channel{ID:1, Topic: "/foo"}) + prepInput(t, buf, &mcap.Schema{ID: 0}, &mcap.Channel{ID: 1, Topic: "/foo"}) for i := 3000; i < 4000; i++ { buf.Bytes()[i] = 0x00 } @@ -290,7 +290,7 @@ func TestBadInputGivesNamedErrors(t *testing.T) { t.Run(fmt.Sprintf("%s chunked %v", c.assertion, chunked), func(t *testing.T) { buf := c.input() merger := newMCAPMerger(mergeOpts{ - chunked: chunked, + chunked: chunked, coalesceChannels: "none", }) inputs := []namedReader{ @@ -310,9 +310,9 @@ func TestSameSchemasNotDuplicated(t *testing.T) { buf1 := &bytes.Buffer{} buf2 := &bytes.Buffer{} buf3 := &bytes.Buffer{} - prepInput(t, buf1, &mcap.Schema{ID:1, Name: "SchemaA"}, &mcap.Channel{ID:1, Topic: "/foo"}) - prepInput(t, buf2, &mcap.Schema{ID:1, Name: "SchemaA"}, &mcap.Channel{ID:1, Topic: "/bar"}) - prepInput(t, buf3, &mcap.Schema{ID:1, Name: "SchemaB"}, &mcap.Channel{ID:1, Topic: "/baz"}) + prepInput(t, buf1, &mcap.Schema{ID: 1, Name: "SchemaA"}, &mcap.Channel{ID: 1, Topic: "/foo"}) + prepInput(t, buf2, &mcap.Schema{ID: 1, Name: "SchemaA"}, &mcap.Channel{ID: 1, Topic: "/bar"}) + prepInput(t, buf3, &mcap.Schema{ID: 1, Name: "SchemaB"}, &mcap.Channel{ID: 1, Topic: "/baz"}) merger := newMCAPMerger(mergeOpts{coalesceChannels: "none"}) output := &bytes.Buffer{} inputs := []namedReader{ @@ -346,8 +346,8 @@ func TestSameSchemasNotDuplicated(t *testing.T) { func TestChannelCoalesceBehavior(t *testing.T) { expectedMsgCountByChannel := map[string]map[uint16]int{ - "none": {1: 100, 2: 100, 3: 100, 4: 100}, - "auto": {1: 200, 2: 100, 3: 100}, + "none": {1: 100, 2: 100, 3: 100, 4: 100}, + "auto": {1: 200, 2: 100, 3: 100}, "force": {1: 300, 2: 100}, } @@ -356,10 +356,10 @@ func TestChannelCoalesceBehavior(t *testing.T) { buf2 := &bytes.Buffer{} buf3 := &bytes.Buffer{} buf4 := &bytes.Buffer{} - prepInput(t, buf1, &mcap.Schema{ID:1}, &mcap.Channel{ID:1, Topic: "/foo"}) - prepInput(t, buf2, &mcap.Schema{ID:1}, &mcap.Channel{ID:2, Topic: "/foo"}) - prepInput(t, buf3, &mcap.Schema{ID:1}, &mcap.Channel{ID:3, Topic: "/foo", Metadata: map[string]string{"k": "v"}}) - prepInput(t, buf4, &mcap.Schema{ID:1}, &mcap.Channel{ID:4, Topic: "/bar"}) + prepInput(t, buf1, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/foo"}) + prepInput(t, buf2, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 2, Topic: "/foo"}) + prepInput(t, buf3, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 3, Topic: "/foo", Metadata: map[string]string{"k": "v"}}) + prepInput(t, buf4, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 4, Topic: "/bar"}) output := &bytes.Buffer{} inputs := []namedReader{ {"buf1", buf1}, @@ -380,6 +380,7 @@ func TestChannelCoalesceBehavior(t *testing.T) { messages[channel.ID]++ return nil }) + assert.Nil(t, err) assert.Equal(t, messagesByChannel, messages) } }