Skip to content

Commit

Permalink
fix templating in combination with proto/avro
Browse files Browse the repository at this point in the history
  • Loading branch information
birdayz committed Jun 3, 2022
1 parent 75a263c commit cc81524
Showing 1 changed file with 39 additions and 32 deletions.
71 changes: 39 additions & 32 deletions cmd/kaf/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,37 +175,6 @@ var produceCmd = &cobra.Command{
}

for data := range out {
if protoType != "" {
if dynamicMessage := reg.MessageForType(protoType); dynamicMessage != nil {
err = dynamicMessage.UnmarshalJSON(data)
if err != nil {
errorExit("Failed to parse input JSON as proto type %v: %v", protoType, err)
}

pb, err := pb.Marshal(dynamicMessage)
if err != nil {
errorExit("Failed to marshal proto: %v", err)
}

data = pb
} else {
errorExit("Failed to load payload proto type")
}
} else if avroSchemaID != -1 {
avro, err := schemaCache.EncodeMessage(avroSchemaID, data)
if err != nil {
errorExit("Failed to encode avro value", err)
}
data = avro
}

var ts time.Time
t, err := time.Parse(time.RFC3339, timestampFlag)
if err != nil {
ts = time.Now()
} else {
ts = t
}

for i := 0; i < repeatFlag; i++ {

Expand All @@ -230,12 +199,50 @@ var produceCmd = &cobra.Command{
input = buf.Bytes()
}

// Encode to..something

var marshaledInput []byte

if protoType != "" {
if dynamicMessage := reg.MessageForType(protoType); dynamicMessage != nil {
err = dynamicMessage.UnmarshalJSON(input)
if err != nil {
errorExit("Failed to parse input JSON as proto type %v: %v", protoType, err)
}

pb, err := pb.Marshal(dynamicMessage)
if err != nil {
errorExit("Failed to marshal proto: %v", err)
}

marshaledInput = pb
} else {
errorExit("Failed to load payload proto type")
}
} else if avroSchemaID != -1 {
avro, err := schemaCache.EncodeMessage(avroSchemaID, data)
if err != nil {
errorExit("Failed to encode avro value", err)
}
marshaledInput = avro
} else {
marshaledInput = input
}

var ts time.Time
t, err := time.Parse(time.RFC3339, timestampFlag)
if err != nil {
ts = time.Now()
} else {
ts = t
}

msg := &sarama.ProducerMessage{
Topic: args[0],
Key: key,
Headers: headers,
Timestamp: ts,
Value: sarama.ByteEncoder(input),
Value: sarama.ByteEncoder(marshaledInput),
}
if partitionFlag != -1 {
msg.Partition = partitionFlag
Expand Down

0 comments on commit cc81524

Please sign in to comment.