From 79d90689a1216b83d1d6b3ac25a197ca28a695cb Mon Sep 17 00:00:00 2001 From: Hans-Joachim Krauch Date: Mon, 2 Oct 2023 13:16:15 +0200 Subject: [PATCH] go/cli: Coalesce identical channels by default, allow to opt out Changes the default behavior of `mcap merge` to automatically coalesce identical channels. Channels are considered identical when their topic, schema, message encoding and metadata equals. One can force coalescing of channels with different metadata with `--coalesce-channels force`. Channel coalescing can also be completely disabled with `--coalesce-channels none`. --- go/cli/mcap/cmd/merge.go | 91 ++++++++++++++++++++++++++++------ go/cli/mcap/cmd/merge_test.go | 93 ++++++++++++++++++++++++++--------- 2 files changed, 146 insertions(+), 38 deletions(-) diff --git a/go/cli/mcap/cmd/merge.go b/go/cli/mcap/cmd/merge.go index 85d74207af..61106af374 100644 --- a/go/cli/mcap/cmd/merge.go +++ b/go/cli/mcap/cmd/merge.go @@ -3,6 +3,7 @@ package cmd import ( "container/heap" "crypto/md5" + "encoding/binary" "encoding/hex" "encoding/json" "errors" @@ -36,6 +37,7 @@ var ( mergeChunked bool mergeOutputFile string mergeAllowDuplicateMetadata bool + coalesceChannels string ) type mergeOpts struct { @@ -44,6 +46,7 @@ type mergeOpts struct { includeCRC bool chunked bool allowDuplicateMetadata bool + coalesceChannels string } // schemaID uniquely identifies a schema across the inputs. @@ -59,26 +62,34 @@ type channelID struct { } type mcapMerger struct { - schemaIDs map[schemaID]uint16 - channelIDs map[channelID]uint16 - schemaIDByHash map[string]uint16 - metadataHashes map[string]bool - metadataNames map[string]bool - nextChannelID uint16 - nextSchemaID uint16 - opts mergeOpts + schemaIDs map[schemaID]uint16 + channelIDs map[channelID]uint16 + schemaIDByHash map[string]uint16 + channelIDByHash map[string]uint16 + metadataHashes map[string]bool + metadataNames map[string]bool + nextChannelID uint16 + nextSchemaID uint16 + opts mergeOpts } +const ( + AutoCoalescing = "auto" + ForceCoalescing = "force" + NoCoalescing = "none" +) + func newMCAPMerger(opts mergeOpts) *mcapMerger { return &mcapMerger{ - schemaIDs: make(map[schemaID]uint16), - channelIDs: make(map[channelID]uint16), - schemaIDByHash: make(map[string]uint16), - metadataHashes: make(map[string]bool), - metadataNames: make(map[string]bool), - nextChannelID: 1, - nextSchemaID: 1, - opts: opts, + schemaIDs: make(map[schemaID]uint16), + channelIDs: make(map[channelID]uint16), + schemaIDByHash: make(map[string]uint16), + channelIDByHash: make(map[string]uint16), + metadataHashes: make(map[string]bool), + metadataNames: make(map[string]bool), + nextChannelID: 1, + nextSchemaID: 1, + opts: opts, } } @@ -132,6 +143,30 @@ func (m *mcapMerger) addMetadata(w *mcap.Writer, metadata *mcap.Metadata) error return nil } +func getChannelHash(channel *mcap.Channel, coalesceChannels string) string { + hasher := md5.New() + schemaIDBytes := make([]byte, 2) + binary.LittleEndian.PutUint16(schemaIDBytes, channel.SchemaID) + hasher.Write(schemaIDBytes) + hasher.Write([]byte(channel.Topic)) + hasher.Write([]byte(channel.MessageEncoding)) + + switch coalesceChannels { + case AutoCoalescing: // Include channel metadata in hash + for key, value := range channel.Metadata { + hasher.Write([]byte(key)) + hasher.Write([]byte(value)) + } + case ForceCoalescing: // Channel metadata is not included in hash + break + default: + die("Invalid value for --coalesce-channels: %s\n", coalesceChannels) + } + + hash := hasher.Sum(nil) + return hex.EncodeToString(hash) +} + func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Channel) (uint16, error) { outputSchemaID, ok := m.outputSchemaID(inputID, channel.SchemaID) if !ok { @@ -145,6 +180,17 @@ func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Chann MessageEncoding: channel.MessageEncoding, Metadata: channel.Metadata, } + + if m.opts.coalesceChannels != NoCoalescing { + channelHash := getChannelHash(newChannel, m.opts.coalesceChannels) + channelID, channelKnown := m.channelIDByHash[channelHash] + if channelKnown { + m.channelIDs[key] = channelID + return channelID, nil + } + m.channelIDByHash[channelHash] = m.nextChannelID + } + m.channelIDs[key] = m.nextChannelID err := w.WriteChannel(newChannel) if err != nil { @@ -218,6 +264,7 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []namedReader) error { // Reset struct members m.schemaIDByHash = make(map[string]uint16) + m.channelIDByHash = make(map[string]uint16) m.schemaIDs = make(map[schemaID]uint16) m.channelIDs = make(map[channelID]uint16) m.nextChannelID = 1 @@ -351,6 +398,7 @@ var mergeCmd = &cobra.Command{ includeCRC: mergeIncludeCRC, chunked: mergeChunked, allowDuplicateMetadata: mergeAllowDuplicateMetadata, + coalesceChannels: coalesceChannels, } merger := newMCAPMerger(opts) var writer io.Writer @@ -415,4 +463,15 @@ func init() { false, "Allow duplicate-named metadata records to be merged in the output", ) + mergeCmd.PersistentFlags().StringVarP( + &coalesceChannels, + "coalesce-channels", + "", + "auto", + `channel coalescing behavior (supported: auto, force, none). + - auto: Coalesce channels with matching topic, schema and metadata + - force: Same as auto but ignores metadata + - none: Do not coalesce channels +`, + ) } diff --git a/go/cli/mcap/cmd/merge_test.go b/go/cli/mcap/cmd/merge_test.go index fc4e43ed53..3e3e2fedd0 100644 --- a/go/cli/mcap/cmd/merge_test.go +++ b/go/cli/mcap/cmd/merge_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/assert" ) -func prepInput(t *testing.T, w io.Writer, schema *mcap.Schema, channelID uint16, topic string) { +func prepInput(t *testing.T, w io.Writer, schema *mcap.Schema, channel *mcap.Channel) { writer, err := mcap.NewWriter(w, &mcap.WriterOptions{ Chunked: true, }) @@ -21,13 +21,15 @@ func prepInput(t *testing.T, w io.Writer, schema *mcap.Schema, channelID uint16, assert.Nil(t, writer.WriteSchema(schema)) } assert.Nil(t, writer.WriteChannel(&mcap.Channel{ - ID: channelID, - SchemaID: schema.ID, - Topic: topic, + ID: channel.ID, + SchemaID: schema.ID, + Topic: channel.Topic, + MessageEncoding: channel.MessageEncoding, + Metadata: channel.Metadata, })) for i := 0; i < 100; i++ { assert.Nil(t, writer.WriteMessage(&mcap.Message{ - ChannelID: channelID, + ChannelID: channel.ID, LogTime: uint64(i), })) } @@ -36,7 +38,7 @@ func prepInput(t *testing.T, w io.Writer, schema *mcap.Schema, channelID uint16, Name: "a", Metadata: map[string]string{ "b": "c", - "topic": topic, + "topic": channel.Topic, }, })) @@ -74,11 +76,12 @@ func TestMCAPMerging(t *testing.T) { buf1 := &bytes.Buffer{} buf2 := &bytes.Buffer{} buf3 := &bytes.Buffer{} - prepInput(t, buf1, &mcap.Schema{ID: 1}, 1, "/foo") - prepInput(t, buf2, &mcap.Schema{ID: 1}, 1, "/bar") - prepInput(t, buf3, &mcap.Schema{ID: 1}, 1, "/baz") + prepInput(t, buf1, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/foo"}) + prepInput(t, buf2, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/bar"}) + prepInput(t, buf3, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/baz"}) c.opts.chunked = chunked + c.opts.coalesceChannels = "none" merger := newMCAPMerger(c.opts) output := &bytes.Buffer{} inputs := []namedReader{ @@ -163,7 +166,8 @@ func TestChannelsWithSameSchema(t *testing.T) { })) assert.Nil(t, writer.Close()) merger := newMCAPMerger(mergeOpts{ - chunked: true, + chunked: true, + coalesceChannels: "none", }) output := &bytes.Buffer{} assert.Nil(t, merger.mergeInputs(output, []namedReader{{"buf", buf}})) @@ -181,10 +185,11 @@ func TestChannelsWithSameSchema(t *testing.T) { func TestMultiChannelInput(t *testing.T) { buf1 := &bytes.Buffer{} buf2 := &bytes.Buffer{} - prepInput(t, buf1, &mcap.Schema{ID: 1}, 1, "/foo") - prepInput(t, buf2, &mcap.Schema{ID: 1}, 1, "/bar") + prepInput(t, buf1, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/foo"}) + prepInput(t, buf2, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/bar"}) merger := newMCAPMerger(mergeOpts{ allowDuplicateMetadata: true, + coalesceChannels: "none", }) multiChannelInput := &bytes.Buffer{} inputs := []namedReader{ @@ -193,7 +198,7 @@ func TestMultiChannelInput(t *testing.T) { } assert.Nil(t, merger.mergeInputs(multiChannelInput, inputs)) buf3 := &bytes.Buffer{} - prepInput(t, buf3, &mcap.Schema{ID: 2}, 2, "/baz") + prepInput(t, buf3, &mcap.Schema{ID: 2}, &mcap.Channel{ID: 2, Topic: "/baz"}) output := &bytes.Buffer{} inputs2 := []namedReader{ {"multiChannelInput", multiChannelInput}, @@ -219,10 +224,11 @@ func TestMultiChannelInput(t *testing.T) { func TestSchemalessChannelInput(t *testing.T) { buf1 := &bytes.Buffer{} buf2 := &bytes.Buffer{} - prepInput(t, buf1, &mcap.Schema{ID: 0}, 1, "/foo") - prepInput(t, buf2, &mcap.Schema{ID: 1}, 1, "/bar") + prepInput(t, buf1, &mcap.Schema{ID: 0}, &mcap.Channel{ID: 1, Topic: "/foo"}) + prepInput(t, buf2, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/bar"}) merger := newMCAPMerger(mergeOpts{ allowDuplicateMetadata: true, + coalesceChannels: "none", }) output := &bytes.Buffer{} inputs := []namedReader{ @@ -277,7 +283,7 @@ func TestMultipleSchemalessChannelSingleInput(t *testing.T) { })) assert.Nil(t, writer.Close()) - merger := newMCAPMerger(mergeOpts{}) + merger := newMCAPMerger(mergeOpts{coalesceChannels: "none"}) output := &bytes.Buffer{} inputs := []namedReader{ {"buf", buf}, @@ -313,7 +319,7 @@ func TestBadInputGivesNamedErrors(t *testing.T) { "bad magic", func() *bytes.Buffer { buf := &bytes.Buffer{} - prepInput(t, buf, &mcap.Schema{ID: 0}, 1, "/foo") + prepInput(t, buf, &mcap.Schema{ID: 0}, &mcap.Channel{ID: 1, Topic: "/foo"}) buf.Bytes()[0] = 0x00 return buf }, @@ -323,7 +329,7 @@ func TestBadInputGivesNamedErrors(t *testing.T) { "bad content", func() *bytes.Buffer { buf := &bytes.Buffer{} - prepInput(t, buf, &mcap.Schema{ID: 0}, 1, "/foo") + prepInput(t, buf, &mcap.Schema{ID: 0}, &mcap.Channel{ID: 1, Topic: "/foo"}) for i := 3000; i < 4000; i++ { buf.Bytes()[i] = 0x00 } @@ -337,7 +343,8 @@ func TestBadInputGivesNamedErrors(t *testing.T) { t.Run(fmt.Sprintf("%s chunked %v", c.assertion, chunked), func(t *testing.T) { buf := c.input() merger := newMCAPMerger(mergeOpts{ - chunked: chunked, + chunked: chunked, + coalesceChannels: "none", }) inputs := []namedReader{ {"filename", buf}, @@ -356,11 +363,12 @@ func TestSameSchemasNotDuplicated(t *testing.T) { buf1 := &bytes.Buffer{} buf2 := &bytes.Buffer{} buf3 := &bytes.Buffer{} - prepInput(t, buf1, &mcap.Schema{ID: 1, Name: "SchemaA"}, 1, "/foo") - prepInput(t, buf2, &mcap.Schema{ID: 1, Name: "SchemaA"}, 1, "/bar") - prepInput(t, buf3, &mcap.Schema{ID: 1, Name: "SchemaB"}, 1, "/baz") + prepInput(t, buf1, &mcap.Schema{ID: 1, Name: "SchemaA"}, &mcap.Channel{ID: 1, Topic: "/foo"}) + prepInput(t, buf2, &mcap.Schema{ID: 1, Name: "SchemaA"}, &mcap.Channel{ID: 1, Topic: "/bar"}) + prepInput(t, buf3, &mcap.Schema{ID: 1, Name: "SchemaB"}, &mcap.Channel{ID: 1, Topic: "/baz"}) merger := newMCAPMerger(mergeOpts{ allowDuplicateMetadata: true, + coalesceChannels: "none", }) output := &bytes.Buffer{} inputs := []namedReader{ @@ -391,3 +399,44 @@ func TestSameSchemasNotDuplicated(t *testing.T) { assert.Equal(t, 2, len(schemas)) assert.Equal(t, schemaNames, []string{"SchemaA", "SchemaB"}) } + +func TestChannelCoalesceBehavior(t *testing.T) { + expectedMsgCountByChannel := map[string]map[uint16]int{ + "none": {1: 100, 2: 100, 3: 100, 4: 100}, + "auto": {1: 200, 2: 100, 3: 100}, + "force": {1: 300, 2: 100}, + } + + for coalesceChannels, messagesByChannel := range expectedMsgCountByChannel { + buf1 := &bytes.Buffer{} + buf2 := &bytes.Buffer{} + buf3 := &bytes.Buffer{} + buf4 := &bytes.Buffer{} + prepInput(t, buf1, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/foo"}) + prepInput(t, buf2, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 2, Topic: "/foo"}) + prepInput(t, buf3, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 3, Topic: "/foo", Metadata: map[string]string{"k": "v"}}) + prepInput(t, buf4, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 4, Topic: "/bar"}) + output := &bytes.Buffer{} + inputs := []namedReader{ + {"buf1", buf1}, + {"buf2", buf2}, + {"buf3", buf3}, + {"buf4", buf4}, + } + merger := newMCAPMerger(mergeOpts{coalesceChannels: coalesceChannels, allowDuplicateMetadata: true}) + assert.Nil(t, merger.mergeInputs(output, inputs)) + // output should now be a well-formed mcap + reader, err := mcap.NewReader(output) + assert.Nil(t, err) + assert.Equal(t, reader.Header().Profile, "testprofile") + it, err := reader.Messages(mcap.UsingIndex(false)) + assert.Nil(t, err) + messages := make(map[uint16]int) + err = mcap.Range(it, func(schema *mcap.Schema, channel *mcap.Channel, message *mcap.Message) error { + messages[channel.ID]++ + return nil + }) + assert.Nil(t, err) + assert.Equal(t, messagesByChannel, messages) + } +}