Skip to content

Commit

Permalink
Lint new changes to main
Browse files Browse the repository at this point in the history
  • Loading branch information
narasaka committed Sep 29, 2023
2 parents a5a3d9a + 04d820a commit 6bea2bf
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 32 deletions.
51 changes: 35 additions & 16 deletions go/cli/mcap/cmd/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package cmd

import (
"container/heap"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -41,12 +43,9 @@ type channelID struct {
}

type mcapMerger struct {
schemas map[schemaID]*mcap.Schema
channels map[channelID]*mcap.Channel
schemaIDs map[schemaID]uint16
channelIDs map[channelID]uint16

outputChannelSchemas map[uint16]uint16
schemaIDs map[schemaID]uint16
channelIDs map[channelID]uint16
schemaIDByHash map[string]uint16

nextChannelID uint16
nextSchemaID uint16
Expand All @@ -55,14 +54,12 @@ type mcapMerger struct {

func newMCAPMerger(opts mergeOpts) *mcapMerger {
return &mcapMerger{
schemas: make(map[schemaID]*mcap.Schema),
channels: make(map[channelID]*mcap.Channel),
schemaIDs: make(map[schemaID]uint16),
channelIDs: make(map[channelID]uint16),
outputChannelSchemas: make(map[uint16]uint16),
nextChannelID: 1,
nextSchemaID: 1,
opts: opts,
schemaIDs: make(map[schemaID]uint16),
channelIDs: make(map[channelID]uint16),
schemaIDByHash: make(map[string]uint16),
nextChannelID: 1,
nextSchemaID: 1,
opts: opts,
}
}

Expand Down Expand Up @@ -98,7 +95,6 @@ func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Chann
MessageEncoding: channel.MessageEncoding,
Metadata: channel.Metadata,
}
m.channels[key] = channel
m.channelIDs[key] = m.nextChannelID
err := w.WriteChannel(newChannel)
if err != nil {
Expand All @@ -108,16 +104,32 @@ func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Chann
return newChannel.ID, nil
}

func getSchemaHash(schema *mcap.Schema) string {
hasher := md5.New()
hasher.Write([]byte(schema.Name))
hasher.Write([]byte(schema.Encoding))
hasher.Write(schema.Data)
hash := hasher.Sum(nil)
return hex.EncodeToString(hash)
}

func (m *mcapMerger) addSchema(w *mcap.Writer, inputID int, schema *mcap.Schema) error {
key := schemaID{inputID, schema.ID}
schemaHash := getSchemaHash(schema)
schemaID, schemaKnown := m.schemaIDByHash[schemaHash]
if schemaKnown {
m.schemaIDs[key] = schemaID
return nil
}

newSchema := &mcap.Schema{
ID: m.nextSchemaID, // substitute the next output schema ID
Name: schema.Name,
Encoding: schema.Encoding,
Data: schema.Data,
}
m.schemas[key] = newSchema
m.schemaIDs[key] = m.nextSchemaID
m.schemaIDByHash[schemaHash] = m.nextSchemaID
err := w.WriteSchema(newSchema)
if err != nil {
return fmt.Errorf("failed to write schema: %w", err)
Expand Down Expand Up @@ -157,6 +169,13 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []namedReader) error {
profiles := make([]string, len(inputs))
pq := utils.NewPriorityQueue(nil)

// Reset struct members
m.schemaIDByHash = make(map[string]uint16)
m.schemaIDs = make(map[schemaID]uint16)
m.channelIDs = make(map[channelID]uint16)
m.nextChannelID = 1
m.nextSchemaID = 1

// for each input reader, initialize an mcap reader and read the first
// message off. Insert the schema and channel into the output with
// renumbered IDs, and load the message (with renumbered IDs) into the
Expand Down
68 changes: 52 additions & 16 deletions go/cli/mcap/cmd/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,19 @@ import (
"github.com/stretchr/testify/assert"
)

func prepInput(t *testing.T, w io.Writer, schemaID uint16, channelID uint16, topic string) {
func prepInput(t *testing.T, w io.Writer, schema *mcap.Schema, channelID uint16, topic string) {
writer, err := mcap.NewWriter(w, &mcap.WriterOptions{
Chunked: true,
})
assert.Nil(t, err)

assert.Nil(t, writer.WriteHeader(&mcap.Header{Profile: "testprofile"}))
if schemaID != 0 {
assert.Nil(t, writer.WriteSchema(&mcap.Schema{
ID: schemaID,
}))
if schema.ID != 0 {
assert.Nil(t, writer.WriteSchema(schema))
}
assert.Nil(t, writer.WriteChannel(&mcap.Channel{
ID: channelID,
SchemaID: schemaID,
SchemaID: schema.ID,
Topic: topic,
}))
for i := 0; i < 100; i++ {
Expand All @@ -42,9 +40,9 @@ func TestMCAPMerging(t *testing.T) {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
buf3 := &bytes.Buffer{}
prepInput(t, buf1, 1, 1, "/foo")
prepInput(t, buf2, 1, 1, "/bar")
prepInput(t, buf3, 1, 1, "/baz")
prepInput(t, buf1, &mcap.Schema{ID: 1}, 1, "/foo")
prepInput(t, buf2, &mcap.Schema{ID: 1}, 1, "/bar")
prepInput(t, buf3, &mcap.Schema{ID: 1}, 1, "/baz")
merger := newMCAPMerger(mergeOpts{
chunked: chunked,
})
Expand Down Expand Up @@ -136,8 +134,8 @@ func TestChannelsWithSameSchema(t *testing.T) {
func TestMultiChannelInput(t *testing.T) {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
prepInput(t, buf1, 1, 1, "/foo")
prepInput(t, buf2, 1, 1, "/bar")
prepInput(t, buf1, &mcap.Schema{ID: 1}, 1, "/foo")
prepInput(t, buf2, &mcap.Schema{ID: 1}, 1, "/bar")
merger := newMCAPMerger(mergeOpts{})
multiChannelInput := &bytes.Buffer{}
inputs := []namedReader{
Expand All @@ -146,7 +144,7 @@ func TestMultiChannelInput(t *testing.T) {
}
assert.Nil(t, merger.mergeInputs(multiChannelInput, inputs))
buf3 := &bytes.Buffer{}
prepInput(t, buf3, 2, 2, "/baz")
prepInput(t, buf3, &mcap.Schema{ID: 2}, 2, "/baz")
output := &bytes.Buffer{}
inputs2 := []namedReader{
{"multiChannelInput", multiChannelInput},
Expand All @@ -172,8 +170,8 @@ func TestMultiChannelInput(t *testing.T) {
func TestSchemalessChannelInput(t *testing.T) {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
prepInput(t, buf1, 0, 1, "/foo")
prepInput(t, buf2, 1, 1, "/bar")
prepInput(t, buf1, &mcap.Schema{ID: 0}, 1, "/foo")
prepInput(t, buf2, &mcap.Schema{ID: 1}, 1, "/bar")
merger := newMCAPMerger(mergeOpts{})
output := &bytes.Buffer{}
inputs := []namedReader{
Expand Down Expand Up @@ -264,7 +262,7 @@ func TestBadInputGivesNamedErrors(t *testing.T) {
"bad magic",
func() *bytes.Buffer {
buf := &bytes.Buffer{}
prepInput(t, buf, 0, 1, "/foo")
prepInput(t, buf, &mcap.Schema{ID: 0}, 1, "/foo")
buf.Bytes()[0] = 0x00
return buf
},
Expand All @@ -274,7 +272,7 @@ func TestBadInputGivesNamedErrors(t *testing.T) {
"bad content",
func() *bytes.Buffer {
buf := &bytes.Buffer{}
prepInput(t, buf, 0, 1, "/foo")
prepInput(t, buf, &mcap.Schema{ID: 0}, 1, "/foo")
for i := 3000; i < 4000; i++ {
buf.Bytes()[i] = 0x00
}
Expand Down Expand Up @@ -302,3 +300,41 @@ func TestBadInputGivesNamedErrors(t *testing.T) {
}
}
}

func TestSameSchemasNotDuplicated(t *testing.T) {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
buf3 := &bytes.Buffer{}
prepInput(t, buf1, &mcap.Schema{ID: 1, Name: "SchemaA"}, 1, "/foo")
prepInput(t, buf2, &mcap.Schema{ID: 1, Name: "SchemaA"}, 1, "/bar")
prepInput(t, buf3, &mcap.Schema{ID: 1, Name: "SchemaB"}, 1, "/baz")
merger := newMCAPMerger(mergeOpts{})
output := &bytes.Buffer{}
inputs := []namedReader{
{"buf1", buf1},
{"buf2", buf2},
{"buf3", buf3},
}
assert.Nil(t, merger.mergeInputs(output, inputs))
// output should now be a well-formed mcap
reader, err := mcap.NewReader(output)
assert.Nil(t, err)
assert.Equal(t, reader.Header().Profile, "testprofile")
it, err := reader.Messages(readopts.UsingIndex(false))
assert.Nil(t, err)
schemas := make(map[uint16]bool)
var schemaNames []string
err = mcap.Range(it, func(schema *mcap.Schema, channel *mcap.Channel, message *mcap.Message) error {
_, ok := schemas[schema.ID]
if !ok {
schemas[schema.ID] = true
schemaNames = append(schemaNames, schema.Name)
}
return nil
})
if err != nil {
die("failed to iterate through schemas: %s", err)
}
assert.Equal(t, 2, len(schemas))
assert.Equal(t, schemaNames, []string{"SchemaA", "SchemaB"})
}

0 comments on commit 6bea2bf

Please sign in to comment.