Skip to content

Commit

Permalink
lint fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
achim-k committed Oct 9, 2023
1 parent 9254f87 commit 89ac083
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 47 deletions.
48 changes: 27 additions & 21 deletions go/cli/mcap/cmd/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,31 @@ type channelID struct {
}

type mcapMerger struct {
schemaIDs map[schemaID]uint16
channelIDs map[channelID]uint16
schemaIDByHash map[string]uint16
schemaIDs map[schemaID]uint16
channelIDs map[channelID]uint16
schemaIDByHash map[string]uint16
channelIDByHash map[string]uint16

nextChannelID uint16
nextSchemaID uint16
opts mergeOpts
}

const (
AutoCoalescing = "auto"
ForceCoalescing = "force"
NoCoalescing = "none"
)

func newMCAPMerger(opts mergeOpts) *mcapMerger {
return &mcapMerger{
schemaIDs: make(map[schemaID]uint16),
channelIDs: make(map[channelID]uint16),
schemaIDByHash: make(map[string]uint16),
channelIDByHash: make(map[string]uint16),
nextChannelID: 1,
nextSchemaID: 1,
opts: opts,
schemaIDs: make(map[schemaID]uint16),
channelIDs: make(map[channelID]uint16),
schemaIDByHash: make(map[string]uint16),
channelIDByHash: make(map[string]uint16),
nextChannelID: 1,
nextSchemaID: 1,
opts: opts,
}
}

Expand Down Expand Up @@ -96,19 +102,19 @@ func getChannelHash(channel *mcap.Channel, coalesceChannels string) string {
hasher.Write([]byte(channel.MessageEncoding))

switch coalesceChannels {
case "auto": // Include channel metadata in hash
for key, value := range channel.Metadata {
hasher.Write([]byte(key))
hasher.Write([]byte(value))
}
case "force": // Channel metadata is not included in hash
break;
default:
die("Invalid value for --coalesce-channels: %s\n", coalesceChannels)
case AutoCoalescing: // Include channel metadata in hash
for key, value := range channel.Metadata {
hasher.Write([]byte(key))
hasher.Write([]byte(value))
}
case ForceCoalescing: // Channel metadata is not included in hash
break
default:
die("Invalid value for --coalesce-channels: %s\n", coalesceChannels)
}

hash := hasher.Sum(nil)
return hex.EncodeToString(hash[:])
return hex.EncodeToString(hash)
}

func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Channel) (uint16, error) {
Expand All @@ -125,7 +131,7 @@ func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Chann
Metadata: channel.Metadata,
}

if m.opts.coalesceChannels != "none" {
if m.opts.coalesceChannels != NoCoalescing {
channelHash := getChannelHash(newChannel, m.opts.coalesceChannels)
channelID, channelKnown := m.channelIDByHash[channelHash]
if channelKnown {
Expand Down
53 changes: 27 additions & 26 deletions go/cli/mcap/cmd/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ func prepInput(t *testing.T, w io.Writer, schema *mcap.Schema, channel *mcap.Cha
assert.Nil(t, writer.WriteSchema(schema))
}
assert.Nil(t, writer.WriteChannel(&mcap.Channel{
ID: channel.ID,
SchemaID: schema.ID,
Topic: channel.Topic,
ID: channel.ID,
SchemaID: schema.ID,
Topic: channel.Topic,
MessageEncoding: channel.MessageEncoding,
Metadata: channel.Metadata,
Metadata: channel.Metadata,
}))
for i := 0; i < 100; i++ {
assert.Nil(t, writer.WriteMessage(&mcap.Message{
Expand All @@ -42,11 +42,11 @@ func TestMCAPMerging(t *testing.T) {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
buf3 := &bytes.Buffer{}
prepInput(t, buf1, &mcap.Schema{ID:1}, &mcap.Channel{ID:1, Topic: "/foo"})
prepInput(t, buf2, &mcap.Schema{ID:1}, &mcap.Channel{ID:1, Topic: "/bar"})
prepInput(t, buf3, &mcap.Schema{ID:1}, &mcap.Channel{ID:1, Topic: "/baz"})
prepInput(t, buf1, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/foo"})
prepInput(t, buf2, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/bar"})
prepInput(t, buf3, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/baz"})
merger := newMCAPMerger(mergeOpts{
chunked: chunked,
chunked: chunked,
coalesceChannels: "none",
})
output := &bytes.Buffer{}
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestChannelsWithSameSchema(t *testing.T) {
}))
assert.Nil(t, writer.Close())
merger := newMCAPMerger(mergeOpts{
chunked: true,
chunked: true,
coalesceChannels: "none",
})
output := &bytes.Buffer{}
Expand All @@ -138,8 +138,8 @@ func TestChannelsWithSameSchema(t *testing.T) {
func TestMultiChannelInput(t *testing.T) {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
prepInput(t, buf1, &mcap.Schema{ID:1}, &mcap.Channel{ID:1, Topic: "/foo"})
prepInput(t, buf2, &mcap.Schema{ID:1}, &mcap.Channel{ID:1, Topic: "/bar"})
prepInput(t, buf1, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/foo"})
prepInput(t, buf2, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/bar"})
merger := newMCAPMerger(mergeOpts{coalesceChannels: "none"})
multiChannelInput := &bytes.Buffer{}
inputs := []namedReader{
Expand All @@ -148,7 +148,7 @@ func TestMultiChannelInput(t *testing.T) {
}
assert.Nil(t, merger.mergeInputs(multiChannelInput, inputs))
buf3 := &bytes.Buffer{}
prepInput(t, buf3, &mcap.Schema{ID:2}, &mcap.Channel{ID:2, Topic: "/baz"})
prepInput(t, buf3, &mcap.Schema{ID: 2}, &mcap.Channel{ID: 2, Topic: "/baz"})
output := &bytes.Buffer{}
inputs2 := []namedReader{
{"multiChannelInput", multiChannelInput},
Expand All @@ -174,8 +174,8 @@ func TestMultiChannelInput(t *testing.T) {
func TestSchemalessChannelInput(t *testing.T) {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
prepInput(t, buf1, &mcap.Schema{ID:0}, &mcap.Channel{ID:1, Topic: "/foo"})
prepInput(t, buf2, &mcap.Schema{ID:1}, &mcap.Channel{ID:1, Topic: "/bar"})
prepInput(t, buf1, &mcap.Schema{ID: 0}, &mcap.Channel{ID: 1, Topic: "/foo"})
prepInput(t, buf2, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/bar"})
merger := newMCAPMerger(mergeOpts{coalesceChannels: "none"})
output := &bytes.Buffer{}
inputs := []namedReader{
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestBadInputGivesNamedErrors(t *testing.T) {
"bad magic",
func() *bytes.Buffer {
buf := &bytes.Buffer{}
prepInput(t, buf, &mcap.Schema{ID:0}, &mcap.Channel{ID:1, Topic: "/foo"})
prepInput(t, buf, &mcap.Schema{ID: 0}, &mcap.Channel{ID: 1, Topic: "/foo"})
buf.Bytes()[0] = 0x00
return buf
},
Expand All @@ -276,7 +276,7 @@ func TestBadInputGivesNamedErrors(t *testing.T) {
"bad content",
func() *bytes.Buffer {
buf := &bytes.Buffer{}
prepInput(t, buf, &mcap.Schema{ID:0}, &mcap.Channel{ID:1, Topic: "/foo"})
prepInput(t, buf, &mcap.Schema{ID: 0}, &mcap.Channel{ID: 1, Topic: "/foo"})
for i := 3000; i < 4000; i++ {
buf.Bytes()[i] = 0x00
}
Expand All @@ -290,7 +290,7 @@ func TestBadInputGivesNamedErrors(t *testing.T) {
t.Run(fmt.Sprintf("%s chunked %v", c.assertion, chunked), func(t *testing.T) {
buf := c.input()
merger := newMCAPMerger(mergeOpts{
chunked: chunked,
chunked: chunked,
coalesceChannels: "none",
})
inputs := []namedReader{
Expand All @@ -310,9 +310,9 @@ func TestSameSchemasNotDuplicated(t *testing.T) {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
buf3 := &bytes.Buffer{}
prepInput(t, buf1, &mcap.Schema{ID:1, Name: "SchemaA"}, &mcap.Channel{ID:1, Topic: "/foo"})
prepInput(t, buf2, &mcap.Schema{ID:1, Name: "SchemaA"}, &mcap.Channel{ID:1, Topic: "/bar"})
prepInput(t, buf3, &mcap.Schema{ID:1, Name: "SchemaB"}, &mcap.Channel{ID:1, Topic: "/baz"})
prepInput(t, buf1, &mcap.Schema{ID: 1, Name: "SchemaA"}, &mcap.Channel{ID: 1, Topic: "/foo"})
prepInput(t, buf2, &mcap.Schema{ID: 1, Name: "SchemaA"}, &mcap.Channel{ID: 1, Topic: "/bar"})
prepInput(t, buf3, &mcap.Schema{ID: 1, Name: "SchemaB"}, &mcap.Channel{ID: 1, Topic: "/baz"})
merger := newMCAPMerger(mergeOpts{coalesceChannels: "none"})
output := &bytes.Buffer{}
inputs := []namedReader{
Expand Down Expand Up @@ -346,8 +346,8 @@ func TestSameSchemasNotDuplicated(t *testing.T) {

func TestChannelCoalesceBehavior(t *testing.T) {
expectedMsgCountByChannel := map[string]map[uint16]int{
"none": {1: 100, 2: 100, 3: 100, 4: 100},
"auto": {1: 200, 2: 100, 3: 100},
"none": {1: 100, 2: 100, 3: 100, 4: 100},
"auto": {1: 200, 2: 100, 3: 100},
"force": {1: 300, 2: 100},
}

Expand All @@ -356,10 +356,10 @@ func TestChannelCoalesceBehavior(t *testing.T) {
buf2 := &bytes.Buffer{}
buf3 := &bytes.Buffer{}
buf4 := &bytes.Buffer{}
prepInput(t, buf1, &mcap.Schema{ID:1}, &mcap.Channel{ID:1, Topic: "/foo"})
prepInput(t, buf2, &mcap.Schema{ID:1}, &mcap.Channel{ID:2, Topic: "/foo"})
prepInput(t, buf3, &mcap.Schema{ID:1}, &mcap.Channel{ID:3, Topic: "/foo", Metadata: map[string]string{"k": "v"}})
prepInput(t, buf4, &mcap.Schema{ID:1}, &mcap.Channel{ID:4, Topic: "/bar"})
prepInput(t, buf1, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 1, Topic: "/foo"})
prepInput(t, buf2, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 2, Topic: "/foo"})
prepInput(t, buf3, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 3, Topic: "/foo", Metadata: map[string]string{"k": "v"}})
prepInput(t, buf4, &mcap.Schema{ID: 1}, &mcap.Channel{ID: 4, Topic: "/bar"})
output := &bytes.Buffer{}
inputs := []namedReader{
{"buf1", buf1},
Expand All @@ -380,6 +380,7 @@ func TestChannelCoalesceBehavior(t *testing.T) {
messages[channel.ID]++
return nil
})
assert.Nil(t, err)
assert.Equal(t, messagesByChannel, messages)
}
}

0 comments on commit 89ac083

Please sign in to comment.