Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/cli: Coalesce identical channels by default #985

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions go/cli/mcap/cmd/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ const (
FileTypeDB3 FileType = "db3"
)

// Compression format names recognized by convertCmd when it switches on
// convertCompression; each one is mapped onto an mcap.CompressionFormat.
const (
// CompressionFormatLz4 selects mcap.CompressionLZ4.
CompressionFormatLz4 = "lz4"
// CompressionFormatZstd selects mcap.CompressionZSTD.
CompressionFormatZstd = "zstd"
// CompressionFormatNone selects mcap.CompressionNone.
CompressionFormatNone = "none"
)

func checkMagic(path string) (FileType, error) {
f, err := os.Open(path)
if err != nil {
Expand Down Expand Up @@ -89,11 +95,11 @@ var convertCmd = &cobra.Command{

var compressionFormat mcap.CompressionFormat
switch convertCompression {
case "lz4":
case CompressionFormatLz4:
compressionFormat = mcap.CompressionLZ4
case "zstd":
case CompressionFormatZstd:
compressionFormat = mcap.CompressionZSTD
case "none":
case CompressionFormatNone:
compressionFormat = mcap.CompressionNone
}

Expand Down
99 changes: 79 additions & 20 deletions go/cli/mcap/cmd/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"container/heap"
"crypto/md5"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
Expand Down Expand Up @@ -36,6 +37,7 @@ var (
mergeChunked bool
mergeOutputFile string
mergeAllowDuplicateMetadata bool
coalesceChannels string
)

type mergeOpts struct {
Expand All @@ -44,6 +46,7 @@ type mergeOpts struct {
includeCRC bool
chunked bool
allowDuplicateMetadata bool
coalesceChannels string
}

// schemaID uniquely identifies a schema across the inputs.
Expand All @@ -58,27 +61,37 @@ type channelID struct {
channelID uint16
}

// HashSum is an MD5 digest held as a fixed-size byte array. Being an array
// (not a slice) it is comparable, so it is used directly as the key type of
// the schemaIDByHash and channelIDByHash maps.
type HashSum = [md5.Size]byte

type mcapMerger struct {
schemaIDs map[schemaID]uint16
channelIDs map[channelID]uint16
schemaIDByHash map[string]uint16
metadataHashes map[string]bool
metadataNames map[string]bool
nextChannelID uint16
nextSchemaID uint16
opts mergeOpts
schemaIDs map[schemaID]uint16
channelIDs map[channelID]uint16
schemaIDByHash map[HashSum]uint16
channelIDByHash map[HashSum]uint16
metadataHashes map[string]bool
metadataNames map[string]bool
nextChannelID uint16
nextSchemaID uint16
opts mergeOpts
}

// Accepted values of the --coalesce-channels flag.
const (
// AutoCoalescing coalesces channels whose schema, topic, message encoding
// and metadata all match.
AutoCoalescing = "auto"
// ForceCoalescing behaves like AutoCoalescing but leaves channel metadata
// out of the comparison.
ForceCoalescing = "force"
// NoCoalescing disables channel coalescing; addChannel skips the hash
// lookup entirely for this value.
NoCoalescing = "none"
)

func newMCAPMerger(opts mergeOpts) *mcapMerger {
return &mcapMerger{
schemaIDs: make(map[schemaID]uint16),
channelIDs: make(map[channelID]uint16),
schemaIDByHash: make(map[string]uint16),
metadataHashes: make(map[string]bool),
metadataNames: make(map[string]bool),
nextChannelID: 1,
nextSchemaID: 1,
opts: opts,
schemaIDs: make(map[schemaID]uint16),
channelIDs: make(map[channelID]uint16),
schemaIDByHash: make(map[HashSum]uint16),
channelIDByHash: make(map[HashSum]uint16),
metadataHashes: make(map[string]bool),
metadataNames: make(map[string]bool),
nextChannelID: 1,
nextSchemaID: 1,
opts: opts,
}
}

Expand Down Expand Up @@ -132,6 +145,29 @@ func (m *mcapMerger) addMetadata(w *mcap.Writer, metadata *mcap.Metadata) error
return nil
}

// getChannelHash returns an MD5 digest that identifies a channel for the
// purpose of coalescing.
//
// The digest always covers the schema ID, topic and message encoding. When
// coalesceChannels is AutoCoalescing the channel metadata is covered as well;
// when it is ForceCoalescing the metadata is left out. Any other value is
// reported through die.
//
// Two properties are required for the digest to be usable as a map key:
//   - Determinism: metadata entries are hashed in sorted key order, because Go
//     map iteration order is unspecified and would otherwise give identical
//     channels different digests.
//   - Unambiguous framing: every variable-length field is preceded by its
//     length, so that e.g. topic "ab" + encoding "c" does not hash the same as
//     topic "a" + encoding "bc".
func getChannelHash(channel *mcap.Channel, coalesceChannels string) HashSum {
	hasher := md5.New()
	var scratch [4]byte

	binary.LittleEndian.PutUint16(scratch[:2], channel.SchemaID)
	hasher.Write(scratch[:2])

	// writeField feeds a length-prefixed string into the hash.
	writeField := func(s string) {
		binary.LittleEndian.PutUint32(scratch[:], uint32(len(s)))
		hasher.Write(scratch[:])
		hasher.Write([]byte(s))
	}
	writeField(channel.Topic)
	writeField(channel.MessageEncoding)

	switch coalesceChannels {
	case AutoCoalescing: // Include channel metadata in hash
		keys := make([]string, 0, len(channel.Metadata))
		for key := range channel.Metadata {
			keys = append(keys, key)
		}
		// Order the keys so the digest does not depend on map iteration
		// order. Channel metadata is small, so a simple insertion sort is
		// sufficient here.
		for i := 1; i < len(keys); i++ {
			for j := i; j > 0 && keys[j] < keys[j-1]; j-- {
				keys[j], keys[j-1] = keys[j-1], keys[j]
			}
		}
		for _, key := range keys {
			writeField(key)
			writeField(channel.Metadata[key])
		}
	case ForceCoalescing: // Channel metadata is not included in hash
	default:
		die("Invalid value for --coalesce-channels: %s\n", coalesceChannels)
	}

	return HashSum(hasher.Sum(nil))
}

func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Channel) (uint16, error) {
outputSchemaID, ok := m.outputSchemaID(inputID, channel.SchemaID)
if !ok {
Expand All @@ -145,6 +181,17 @@ func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Chann
MessageEncoding: channel.MessageEncoding,
Metadata: channel.Metadata,
}

if m.opts.coalesceChannels != NoCoalescing {
channelHash := getChannelHash(newChannel, m.opts.coalesceChannels)
channelID, channelKnown := m.channelIDByHash[channelHash]
if channelKnown {
m.channelIDs[key] = channelID
return channelID, nil
}
m.channelIDByHash[channelHash] = m.nextChannelID
}

m.channelIDs[key] = m.nextChannelID
err := w.WriteChannel(newChannel)
if err != nil {
Expand All @@ -154,13 +201,12 @@ func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Chann
return newChannel.ID, nil
}

func getSchemaHash(schema *mcap.Schema) string {
func getSchemaHash(schema *mcap.Schema) HashSum {
hasher := md5.New()
hasher.Write([]byte(schema.Name))
hasher.Write([]byte(schema.Encoding))
hasher.Write(schema.Data)
hash := hasher.Sum(nil)
return hex.EncodeToString(hash)
return HashSum(hasher.Sum(nil))
}

func (m *mcapMerger) addSchema(w *mcap.Writer, inputID int, schema *mcap.Schema) error {
Expand Down Expand Up @@ -217,7 +263,8 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []namedReader) error {
pq := utils.NewPriorityQueue(nil)

// Reset struct members
m.schemaIDByHash = make(map[string]uint16)
m.schemaIDByHash = make(map[HashSum]uint16)
m.channelIDByHash = make(map[HashSum]uint16)
m.schemaIDs = make(map[schemaID]uint16)
m.channelIDs = make(map[channelID]uint16)
m.nextChannelID = 1
Expand Down Expand Up @@ -351,6 +398,7 @@ var mergeCmd = &cobra.Command{
includeCRC: mergeIncludeCRC,
chunked: mergeChunked,
allowDuplicateMetadata: mergeAllowDuplicateMetadata,
coalesceChannels: coalesceChannels,
}
merger := newMCAPMerger(opts)
var writer io.Writer
Expand Down Expand Up @@ -415,4 +463,15 @@ func init() {
false,
"Allow duplicate-named metadata records to be merged in the output",
)
mergeCmd.PersistentFlags().StringVarP(
&coalesceChannels,
"coalesce-channels",
"",
"auto",
`channel coalescing behavior (supported: auto, force, none).
- auto: Coalesce channels with matching topic, schema and metadata
- force: Same as auto but ignores metadata
- none: Do not coalesce channels
`,
)
}
Loading
Loading