diff --git a/README.md b/README.md index d2269001..4d98771a 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ There is a docker image `telefonica/prometheus-kafka-adapter:1.6.0` [available o Prometheus-kafka-adapter listens for metrics coming from Prometheus and sends them to Kafka. This behaviour can be configured with the following environment variables: - `KAFKA_BROKER_LIST`: defines kafka endpoint and port, defaults to `kafka:9092`. -- `KAFKA_TOPIC`: defines kafka topic to be used, defaults to `metrics`. +- `KAFKA_TOPIC`: defines kafka topic to be used, defaults to `metrics`. Could use go template, labels are passed (as a map) to the template: e.g: `metrics.{{ index . "__name__" }}` to use per-metric topic. Two template functions are available: replace (`{{ index . "__name__" | replace "message" "msg" }}`) and substring (`{{ index . "__name__" | substring 0 5 }}`) - `KAFKA_COMPRESSION`: defines the compression type to be used, defaults to `none`. - `KAFKA_BATCH_NUM_MESSAGES`: defines the number of messages to batch write, defaults to `10000`. - `SERIALIZATION_FORMAT`: defines the serialization format, can be `json`, `avro-json`, defaults to `json`. diff --git a/config.go b/config.go index 27d0e4ca..56ce6302 100644 --- a/config.go +++ b/config.go @@ -16,21 +16,19 @@ package main import ( "os" + "strings" + "text/template" - "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/sirupsen/logrus" ) var ( - kafkaBrokerList = "kafka:9092" - kafkaTopic = "metrics" - basicauth = false - basicauthUsername = "" - basicauthPassword = "" - kafkaPartition = kafka.TopicPartition{ - Topic: &kafkaTopic, - Partition: kafka.PartitionAny, - } + kafkaBrokerList = "kafka:9092" + kafkaTopic = "metrics" + topicTemplate *template.Template + basicauth = false + basicauthUsername = "" + basicauthPassword = "" kafkaCompression = "none" kafkaBatchNumMessages = "10000" kafkaSslClientCertFile = "" @@ -55,11 +53,6 @@ func init() { if value := os.Getenv("KAFKA_TOPIC"); value != "" { kafkaTopic = value - - kafkaPartition = kafka.TopicPartition{ - Topic: &kafkaTopic, - Partition: kafka.PartitionAny, - } } if value := os.Getenv("BASIC_AUTH_USERNAME"); value != "" { @@ -100,6 +93,11 @@ func init() { if err != nil { logrus.WithError(err).Fatalln("couldn't create a metrics serializer") } + + topicTemplate, err = parseTopicTemplate(kafkaTopic) + if err != nil { + logrus.WithError(err).Fatalln("couldn't parse the topic template") + } } func parseLogLevel(value string) logrus.Level { @@ -124,3 +122,24 @@ func parseSerializationFormat(value string) (Serializer, error) { return NewJSONSerializer() } } + +func parseTopicTemplate(tpl string) (*template.Template, error) { + funcMap := template.FuncMap{ + "replace": func(old, new, src string) string { + return strings.Replace(src, old, new, -1) + }, + "substring": func(start, end int, s string) string { + if start < 0 { + start = 0 + } + if end < 0 || end > len(s) { + end = len(s) + } + if start >= end { + panic("template function - substring: start is bigger (or equal) than end. That will produce an empty string.") + } + return s[start:end] + }, + } + return template.New("topic").Funcs(funcMap).Parse(tpl) +} diff --git a/handlers.go b/handlers.go index ec432414..f382961d 100644 --- a/handlers.go +++ b/handlers.go @@ -54,23 +54,30 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin return } - metrics, err := processWriteRequest(&req) + metricsPerTopic, err := processWriteRequest(&req) if err != nil { c.AbortWithStatus(http.StatusInternalServerError) logrus.WithError(err).Error("couldn't process write request") return } - for _, metric := range metrics { - err := producer.Produce(&kafka.Message{ - TopicPartition: kafkaPartition, - Value: metric, - }, nil) + for topic, metrics := range metricsPerTopic { + t := topic + part := kafka.TopicPartition{ + Partition: kafka.PartitionAny, + Topic: &t, + } + for _, metric := range metrics { + err := producer.Produce(&kafka.Message{ + TopicPartition: part, + Value: metric, + }, nil) - if err != nil { - c.AbortWithStatus(http.StatusInternalServerError) - logrus.WithError(err).Error("couldn't produce message in kafka") - return + if err != nil { + c.AbortWithStatus(http.StatusInternalServerError) + logrus.WithError(err).Error("couldn't produce message in kafka") + return + } } } diff --git a/prometheus.go b/prometheus.go index 0cd7ace0..ad232dfa 100644 --- a/prometheus.go +++ b/prometheus.go @@ -19,7 +19,7 @@ import ( "github.com/sirupsen/logrus" ) -func processWriteRequest(req *prompb.WriteRequest) ([][]byte, error) { +func processWriteRequest(req *prompb.WriteRequest) (map[string][][]byte, error) { logrus.WithField("var", req).Debugln() return Serialize(serializer, req) } diff --git a/serializers.go b/serializers.go index 4dac6eec..89b232a1 100644 --- a/serializers.go +++ b/serializers.go @@ -15,6 +15,7 @@ package main import ( + "bytes" "encoding/json" "io/ioutil" "strconv" @@ -33,8 +34,8 @@ type Serializer interface { } // Serialize generates the JSON representation for a given Prometheus metric. -func Serialize(s Serializer, req *prompb.WriteRequest) ([][]byte, error) { - result := [][]byte{} +func Serialize(s Serializer, req *prompb.WriteRequest) (map[string][][]byte, error) { + result := make(map[string][][]byte) for _, ts := range req.Timeseries { labels := make(map[string]string, len(ts.Labels)) @@ -43,6 +44,8 @@ func Serialize(s Serializer, req *prompb.WriteRequest) ([][]byte, error) { labels[string(model.LabelName(l.Name))] = string(model.LabelValue(l.Value)) } + t := topic(labels) + for _, sample := range ts.Samples { epoch := time.Unix(sample.Timestamp/1000, 0).UTC() @@ -58,7 +61,7 @@ func Serialize(s Serializer, req *prompb.WriteRequest) ([][]byte, error) { logrus.WithError(err).Errorln("couldn't marshal timeseries") } - result = append(result, data) + result[t] = append(result[t], data) } } @@ -104,3 +107,11 @@ func NewAvroJSONSerializer(schemaPath string) (*AvroJSONSerializer, error) { codec: codec, }, nil } + +func topic(labels map[string]string) string { + var buf bytes.Buffer + if err := topicTemplate.Execute(&buf, labels); err != nil { + return "" + } + return buf.String() +} diff --git a/serializers_test.go b/serializers_test.go index 4e89af21..3c58a852 100644 --- a/serializers_test.go +++ b/serializers_test.go @@ -42,7 +42,7 @@ func TestSerializeToJSON(t *testing.T) { writeRequest := NewWriteRequest() output, err := Serialize(serializer, writeRequest) - assert.Len(t, output, 2) + assert.Len(t, output["metrics"], 2) assert.Nil(t, err) expectedSamples := []string{ @@ -50,7 +50,7 @@ func TestSerializeToJSON(t *testing.T) { "{\"value\":\"+Inf\",\"timestamp\":\"1970-01-01T00:00:10Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"}}", } - for i, metric := range output { + for i, metric := range output["metrics"] { assert.JSONEqf(t, expectedSamples[i], string(metric[:]), "wrong json serialization found") } } @@ -72,7 +72,7 @@ func TestSerializeToAvro(t *testing.T) { writeRequest := NewWriteRequest() output, err := Serialize(serializer, writeRequest) - assert.Len(t, output, 2) + assert.Len(t, output["metrics"], 2) assert.Nil(t, err) expectedSamples := []string{ @@ -80,11 +80,26 @@ func TestSerializeToAvro(t *testing.T) { "{\"value\":\"+Inf\",\"timestamp\":\"1970-01-01T00:00:10Z\",\"name\":\"foo\",\"labels\":{\"__name__\":\"foo\",\"labelfoo\":\"label-bar\"}}", } - for i, metric := range output { + for i, metric := range output["metrics"] { assert.JSONEqf(t, expectedSamples[i], string(metric[:]), "wrong json serialization found") } } +func TestTemplatedTopic(t *testing.T) { + var err error + topicTemplate, err = parseTopicTemplate("{{ index . \"labelfoo\" | replace \"bar\" \"foo\" | substring 6 -1 }}") + assert.Nil(t, err) + serializer, err := NewJSONSerializer() + assert.Nil(t, err) + + writeRequest := NewWriteRequest() + output, err := Serialize(serializer, writeRequest) + + for k, _ := range output { + assert.Equal(t, "foo", k, "templated topic failed") + } +} + func BenchmarkSerializeToAvroJSON(b *testing.B) { serializer, _ := NewAvroJSONSerializer("schemas/metric.avsc") writeRequest := NewWriteRequest()