Skip to content

Commit

Permalink
go/cli: Coalesce identical channels by default, allow to opt out
Browse files Browse the repository at this point in the history
Changes the default behavior of `mcap merge` to automatically coalesce
identical channels. Channels are considered identical when their topic,
schema, message encoding and metadata equals. One can force coalescing
of channels with different metadata with `--coalesce-channels force`.
Channel coalescing can also be completely disabled with
`--coalesce-channels none`.
  • Loading branch information
achim-k committed Nov 2, 2023
1 parent a164ec1 commit 79d9068
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 38 deletions.
91 changes: 75 additions & 16 deletions go/cli/mcap/cmd/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"container/heap"
"crypto/md5"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
Expand Down Expand Up @@ -36,6 +37,7 @@ var (
mergeChunked bool
mergeOutputFile string
mergeAllowDuplicateMetadata bool
coalesceChannels string
)

type mergeOpts struct {
Expand All @@ -44,6 +46,7 @@ type mergeOpts struct {
includeCRC bool
chunked bool
allowDuplicateMetadata bool
coalesceChannels string
}

// schemaID uniquely identifies a schema across the inputs.
Expand All @@ -59,26 +62,34 @@ type channelID struct {
}

type mcapMerger struct {
schemaIDs map[schemaID]uint16
channelIDs map[channelID]uint16
schemaIDByHash map[string]uint16
metadataHashes map[string]bool
metadataNames map[string]bool
nextChannelID uint16
nextSchemaID uint16
opts mergeOpts
schemaIDs map[schemaID]uint16
channelIDs map[channelID]uint16
schemaIDByHash map[string]uint16
channelIDByHash map[string]uint16
metadataHashes map[string]bool
metadataNames map[string]bool
nextChannelID uint16
nextSchemaID uint16
opts mergeOpts
}

const (
AutoCoalescing = "auto"
ForceCoalescing = "force"
NoCoalescing = "none"
)

func newMCAPMerger(opts mergeOpts) *mcapMerger {
return &mcapMerger{
schemaIDs: make(map[schemaID]uint16),
channelIDs: make(map[channelID]uint16),
schemaIDByHash: make(map[string]uint16),
metadataHashes: make(map[string]bool),
metadataNames: make(map[string]bool),
nextChannelID: 1,
nextSchemaID: 1,
opts: opts,
schemaIDs: make(map[schemaID]uint16),
channelIDs: make(map[channelID]uint16),
schemaIDByHash: make(map[string]uint16),
channelIDByHash: make(map[string]uint16),
metadataHashes: make(map[string]bool),
metadataNames: make(map[string]bool),
nextChannelID: 1,
nextSchemaID: 1,
opts: opts,
}
}

Expand Down Expand Up @@ -132,6 +143,30 @@ func (m *mcapMerger) addMetadata(w *mcap.Writer, metadata *mcap.Metadata) error
return nil
}

func getChannelHash(channel *mcap.Channel, coalesceChannels string) string {
hasher := md5.New()
schemaIDBytes := make([]byte, 2)
binary.LittleEndian.PutUint16(schemaIDBytes, channel.SchemaID)
hasher.Write(schemaIDBytes)
hasher.Write([]byte(channel.Topic))
hasher.Write([]byte(channel.MessageEncoding))

switch coalesceChannels {
case AutoCoalescing: // Include channel metadata in hash
for key, value := range channel.Metadata {
hasher.Write([]byte(key))
hasher.Write([]byte(value))
}
case ForceCoalescing: // Channel metadata is not included in hash
break
default:
die("Invalid value for --coalesce-channels: %s\n", coalesceChannels)
}

hash := hasher.Sum(nil)
return hex.EncodeToString(hash)
}

func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Channel) (uint16, error) {
outputSchemaID, ok := m.outputSchemaID(inputID, channel.SchemaID)
if !ok {
Expand All @@ -145,6 +180,17 @@ func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Chann
MessageEncoding: channel.MessageEncoding,
Metadata: channel.Metadata,
}

if m.opts.coalesceChannels != NoCoalescing {
channelHash := getChannelHash(newChannel, m.opts.coalesceChannels)
channelID, channelKnown := m.channelIDByHash[channelHash]
if channelKnown {
m.channelIDs[key] = channelID
return channelID, nil
}
m.channelIDByHash[channelHash] = m.nextChannelID
}

m.channelIDs[key] = m.nextChannelID
err := w.WriteChannel(newChannel)
if err != nil {
Expand Down Expand Up @@ -218,6 +264,7 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []namedReader) error {

// Reset struct members
m.schemaIDByHash = make(map[string]uint16)
m.channelIDByHash = make(map[string]uint16)
m.schemaIDs = make(map[schemaID]uint16)
m.channelIDs = make(map[channelID]uint16)
m.nextChannelID = 1
Expand Down Expand Up @@ -351,6 +398,7 @@ var mergeCmd = &cobra.Command{
includeCRC: mergeIncludeCRC,
chunked: mergeChunked,
allowDuplicateMetadata: mergeAllowDuplicateMetadata,
coalesceChannels: coalesceChannels,
}
merger := newMCAPMerger(opts)
var writer io.Writer
Expand Down Expand Up @@ -415,4 +463,15 @@ func init() {
false,
"Allow duplicate-named metadata records to be merged in the output",
)
mergeCmd.PersistentFlags().StringVarP(
&coalesceChannels,
"coalesce-channels",
"",
"auto",
`channel coalescing behavior (supported: auto, force, none).
- auto: Coalesce channels with matching topic, schema and metadata
- force: Same as auto but ignores metadata
- none: Do not coalesce channels
`,
)
}
93 changes: 71 additions & 22 deletions go/cli/mcap/cmd/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/stretchr/testify/assert"
)

func prepInput(t *testing.T, w io.Writer, schema *mcap.Schema, channelID uint16, topic string) {
func prepInput(t *testing.T, w io.Writer, schema *mcap.Schema, channel *mcap.Channel) {
writer, err := mcap.NewWriter(w, &mcap.WriterOptions{
Chunked: true,
})
Expand All @@ -21,13 +21,15 @@ func prepInput(t *testing.T, w io.Writer, schema *mcap.Schema, channelID uint16,
assert.Nil(t, writer.WriteSchema(schema))
}
assert.Nil(t, writer.WriteChannel(&mcap.Channel{
ID: channelID,
SchemaID: schema.ID,
Topic: topic,
ID: channel.ID,
SchemaID: schema.ID,
Topic: channel.Topic,
MessageEncoding: channel.MessageEncoding,
Metadata: channel.Metadata,
}))
for i := 0; i < 100; i++ {
assert.Nil(t, writer.WriteMessage(&mcap.Message{
ChannelID: channelID,
ChannelID: channel.ID,
LogTime: uint64(i),
}))
}
Expand All @@ -36,7 +38,7 @@ func prepInput(t *testing.T, w io.Writer, schema *mcap.Schema, channelID uint16,
Name: "a",
Metadata: map[string]string{
"b": "c",
"topic": topic,
"topic": channel.Topic,
},
}))

Expand Down Expand Up @@ -74,11 +76,12 @@ func TestMCAPMerging(t *testing.T) {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
buf3 := &bytes.Buffer{}
prepInput(t, buf1, &mcap.Schema{ID: 1}, 1, "/foo")
prepInput(t, buf2, &mcap.Schema{ID: 1}, 1, "/bar")
prepInput(t, buf3, &mcap.Schema{ID: 1}, 1, "/baz")
prepInput(t, buf1, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/foo"})
prepInput(t, buf2, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/bar"})
prepInput(t, buf3, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/baz"})

c.opts.chunked = chunked
c.opts.coalesceChannels = "none"
merger := newMCAPMerger(c.opts)
output := &bytes.Buffer{}
inputs := []namedReader{
Expand Down Expand Up @@ -163,7 +166,8 @@ func TestChannelsWithSameSchema(t *testing.T) {
}))
assert.Nil(t, writer.Close())
merger := newMCAPMerger(mergeOpts{
chunked: true,
chunked: true,
coalesceChannels: "none",
})
output := &bytes.Buffer{}
assert.Nil(t, merger.mergeInputs(output, []namedReader{{"buf", buf}}))
Expand All @@ -181,10 +185,11 @@ func TestChannelsWithSameSchema(t *testing.T) {
func TestMultiChannelInput(t *testing.T) {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
prepInput(t, buf1, &mcap.Schema{ID: 1}, 1, "/foo")
prepInput(t, buf2, &mcap.Schema{ID: 1}, 1, "/bar")
prepInput(t, buf1, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/foo"})
prepInput(t, buf2, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/bar"})
merger := newMCAPMerger(mergeOpts{
allowDuplicateMetadata: true,
coalesceChannels: "none",
})
multiChannelInput := &bytes.Buffer{}
inputs := []namedReader{
Expand All @@ -193,7 +198,7 @@ func TestMultiChannelInput(t *testing.T) {
}
assert.Nil(t, merger.mergeInputs(multiChannelInput, inputs))
buf3 := &bytes.Buffer{}
prepInput(t, buf3, &mcap.Schema{ID: 2}, 2, "/baz")
prepInput(t, buf3, &mcap.Schema{ID: 2}, &mcap.Channel{ID: 2, Topic: "/baz"})
output := &bytes.Buffer{}
inputs2 := []namedReader{
{"multiChannelInput", multiChannelInput},
Expand All @@ -219,10 +224,11 @@ func TestMultiChannelInput(t *testing.T) {
func TestSchemalessChannelInput(t *testing.T) {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
prepInput(t, buf1, &mcap.Schema{ID: 0}, 1, "/foo")
prepInput(t, buf2, &mcap.Schema{ID: 1}, 1, "/bar")
prepInput(t, buf1, &mcap.Schema{ID: 0}, &mcap.Channel{ID: 1, Topic: "/foo"})
prepInput(t, buf2, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/bar"})
merger := newMCAPMerger(mergeOpts{
allowDuplicateMetadata: true,
coalesceChannels: "none",
})
output := &bytes.Buffer{}
inputs := []namedReader{
Expand Down Expand Up @@ -277,7 +283,7 @@ func TestMultipleSchemalessChannelSingleInput(t *testing.T) {
}))
assert.Nil(t, writer.Close())

merger := newMCAPMerger(mergeOpts{})
merger := newMCAPMerger(mergeOpts{coalesceChannels: "none"})
output := &bytes.Buffer{}
inputs := []namedReader{
{"buf", buf},
Expand Down Expand Up @@ -313,7 +319,7 @@ func TestBadInputGivesNamedErrors(t *testing.T) {
"bad magic",
func() *bytes.Buffer {
buf := &bytes.Buffer{}
prepInput(t, buf, &mcap.Schema{ID: 0}, 1, "/foo")
prepInput(t, buf, &mcap.Schema{ID: 0}, &mcap.Channel{ID: 1, Topic: "/foo"})
buf.Bytes()[0] = 0x00
return buf
},
Expand All @@ -323,7 +329,7 @@ func TestBadInputGivesNamedErrors(t *testing.T) {
"bad content",
func() *bytes.Buffer {
buf := &bytes.Buffer{}
prepInput(t, buf, &mcap.Schema{ID: 0}, 1, "/foo")
prepInput(t, buf, &mcap.Schema{ID: 0}, &mcap.Channel{ID: 1, Topic: "/foo"})
for i := 3000; i < 4000; i++ {
buf.Bytes()[i] = 0x00
}
Expand All @@ -337,7 +343,8 @@ func TestBadInputGivesNamedErrors(t *testing.T) {
t.Run(fmt.Sprintf("%s chunked %v", c.assertion, chunked), func(t *testing.T) {
buf := c.input()
merger := newMCAPMerger(mergeOpts{
chunked: chunked,
chunked: chunked,
coalesceChannels: "none",
})
inputs := []namedReader{
{"filename", buf},
Expand All @@ -356,11 +363,12 @@ func TestSameSchemasNotDuplicated(t *testing.T) {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
buf3 := &bytes.Buffer{}
prepInput(t, buf1, &mcap.Schema{ID: 1, Name: "SchemaA"}, 1, "/foo")
prepInput(t, buf2, &mcap.Schema{ID: 1, Name: "SchemaA"}, 1, "/bar")
prepInput(t, buf3, &mcap.Schema{ID: 1, Name: "SchemaB"}, 1, "/baz")
prepInput(t, buf1, &mcap.Schema{ID: 1, Name: "SchemaA"}, &mcap.Channel{ID: 1, Topic: "/foo"})
prepInput(t, buf2, &mcap.Schema{ID: 1, Name: "SchemaA"}, &mcap.Channel{ID: 1, Topic: "/bar"})
prepInput(t, buf3, &mcap.Schema{ID: 1, Name: "SchemaB"}, &mcap.Channel{ID: 1, Topic: "/baz"})
merger := newMCAPMerger(mergeOpts{
allowDuplicateMetadata: true,
coalesceChannels: "none",
})
output := &bytes.Buffer{}
inputs := []namedReader{
Expand Down Expand Up @@ -391,3 +399,44 @@ func TestSameSchemasNotDuplicated(t *testing.T) {
assert.Equal(t, 2, len(schemas))
assert.Equal(t, schemaNames, []string{"SchemaA", "SchemaB"})
}

func TestChannelCoalesceBehavior(t *testing.T) {
expectedMsgCountByChannel := map[string]map[uint16]int{
"none": {1: 100, 2: 100, 3: 100, 4: 100},
"auto": {1: 200, 2: 100, 3: 100},
"force": {1: 300, 2: 100},
}

for coalesceChannels, messagesByChannel := range expectedMsgCountByChannel {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
buf3 := &bytes.Buffer{}
buf4 := &bytes.Buffer{}
prepInput(t, buf1, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/foo"})
prepInput(t, buf2, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 2, Topic: "/foo"})
prepInput(t, buf3, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 3, Topic: "/foo", Metadata: map[string]string{"k": "v"}})
prepInput(t, buf4, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 4, Topic: "/bar"})
output := &bytes.Buffer{}
inputs := []namedReader{
{"buf1", buf1},
{"buf2", buf2},
{"buf3", buf3},
{"buf4", buf4},
}
merger := newMCAPMerger(mergeOpts{coalesceChannels: coalesceChannels, allowDuplicateMetadata: true})
assert.Nil(t, merger.mergeInputs(output, inputs))
// output should now be a well-formed mcap
reader, err := mcap.NewReader(output)
assert.Nil(t, err)
assert.Equal(t, reader.Header().Profile, "testprofile")
it, err := reader.Messages(mcap.UsingIndex(false))
assert.Nil(t, err)
messages := make(map[uint16]int)
err = mcap.Range(it, func(schema *mcap.Schema, channel *mcap.Channel, message *mcap.Message) error {
messages[channel.ID]++
return nil
})
assert.Nil(t, err)
assert.Equal(t, messagesByChannel, messages)
}
}

0 comments on commit 79d9068

Please sign in to comment.