From cc815241d916e18932681161352701571c9840b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Br=C3=BCderl?= Date: Fri, 3 Jun 2022 12:09:58 +0200 Subject: [PATCH] fix templating in combination with proto/avro --- cmd/kaf/produce.go | 71 +++++++++++++++++++++++++--------------------- 1 file changed, 39 insertions(+), 32 deletions(-) diff --git a/cmd/kaf/produce.go b/cmd/kaf/produce.go index 3bec358..8ed7cce 100644 --- a/cmd/kaf/produce.go +++ b/cmd/kaf/produce.go @@ -175,37 +175,6 @@ var produceCmd = &cobra.Command{ } for data := range out { - if protoType != "" { - if dynamicMessage := reg.MessageForType(protoType); dynamicMessage != nil { - err = dynamicMessage.UnmarshalJSON(data) - if err != nil { - errorExit("Failed to parse input JSON as proto type %v: %v", protoType, err) - } - - pb, err := pb.Marshal(dynamicMessage) - if err != nil { - errorExit("Failed to marshal proto: %v", err) - } - - data = pb - } else { - errorExit("Failed to load payload proto type") - } - } else if avroSchemaID != -1 { - avro, err := schemaCache.EncodeMessage(avroSchemaID, data) - if err != nil { - errorExit("Failed to encode avro value", err) - } - data = avro - } - - var ts time.Time - t, err := time.Parse(time.RFC3339, timestampFlag) - if err != nil { - ts = time.Now() - } else { - ts = t - } for i := 0; i < repeatFlag; i++ { @@ -230,12 +199,50 @@ var produceCmd = &cobra.Command{ input = buf.Bytes() } + // Encode to..something + + var marshaledInput []byte + + if protoType != "" { + if dynamicMessage := reg.MessageForType(protoType); dynamicMessage != nil { + err = dynamicMessage.UnmarshalJSON(input) + if err != nil { + errorExit("Failed to parse input JSON as proto type %v: %v", protoType, err) + } + + pb, err := pb.Marshal(dynamicMessage) + if err != nil { + errorExit("Failed to marshal proto: %v", err) + } + + marshaledInput = pb + } else { + errorExit("Failed to load payload proto type") + } + } else if avroSchemaID != -1 { + avro, err := schemaCache.EncodeMessage(avroSchemaID, data) + if err != nil { + errorExit("Failed to encode avro value", err) + } + marshaledInput = avro + } else { + marshaledInput = input + } + + var ts time.Time + t, err := time.Parse(time.RFC3339, timestampFlag) + if err != nil { + ts = time.Now() + } else { + ts = t + } + msg := &sarama.ProducerMessage{ Topic: args[0], Key: key, Headers: headers, Timestamp: ts, - Value: sarama.ByteEncoder(input), + Value: sarama.ByteEncoder(marshaledInput), } if partitionFlag != -1 { msg.Partition = partitionFlag