Skip to content

Commit

Permalink
feat: add avro support (#16)
Browse files Browse the repository at this point in the history
* feat: add avro support

* chore: add line break

* fix: always get a serializer

* fix: timestamp to epoch

* fix: timestamp example in docs
  • Loading branch information
palmerabollo authored Oct 30, 2018
1 parent 04fbed1 commit 08df269
Show file tree
Hide file tree
Showing 14 changed files with 294 additions and 50 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Dockerfile
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

prometheus-kafka-adapter
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ sudo: required

language: minimal

# TODO use language go, test and build the code after building/publishing the docker image

services:
- docker

Expand Down
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ ADD . /src/prometheus-kafka-adapter

RUN go build -o /prometheus-kafka-adapter

FROM alpine
FROM alpine:3.8

RUN apk add --no-cache librdkafka

COPY --from=build /src/prometheus-kafka-adapter/schemas/metric.avsc /schemas/metric.avsc
COPY --from=build /prometheus-kafka-adapter /

CMD /prometheus-kafka-adapter
36 changes: 25 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,41 @@ We use `prometheus-kafka-adapter` internally at Telefonica for dumping Prometheu

## output

It produces the following messages in a kafka topic:
It is able to write JSON or Avro-JSON messages in a kafka topic, depending on the `SERIALIZATION_FORMAT` configuration variable.

### JSON

```json
{
"__timestamp__": 1234567890,
"__value__": 9876543210,

"__name__": "up",
"job": "federation",

"label1": "value1",
"label2": "value2"
"timestamp": "1970-01-01T00:00:00Z",
"value": "9876543210",
"name": "up",

"labels": {
"__name__": "up",
"label1": "value1",
"label2": "value2"
}
}
```

`__timestamp__` and `__value__` are reserved values, and can't be used as label names. `__name__` defines the name of the metric.
`timestamp` and `value` are reserved values, and can't be used as label names. `__name__` is a special label that defines the name of the metric and is copied as `name` to the top level for convenience.

### Avro JSON

The Avro-JSON serialization is the same. See the [Avro schema](./schemas/metric.avsc).

## configuration

### prometheus-kafka-adapter

There is a docker image `telefonica/prometheus-kafka-adapter:1.1.0-dev-4` [available on Docker Hub](https://hub.docker.com/r/telefonica/prometheus-kafka-adapter/).
There is a docker image `telefonica/prometheus-kafka-adapter:1.1.0` [available on Docker Hub](https://hub.docker.com/r/telefonica/prometheus-kafka-adapter/).

Prometheus-kafka-adapter listens for metrics coming from Prometheus and sends them to Kafka. This behaviour can be configured with the following environment variables:

- `KAFKA_BROKER_LIST`: defines kafka endpoint and port, defaults to `kafka:9092`.
- `KAFKA_TOPIC`: defines kafka topic to be used, defaults to `metrics`.
- `SERIALIZATION_FORMAT`: defines the serialization format, can be `json`, `avro-json`, defaults to `json`.
- `PORT`: defines http port to listen, defaults to `8080`, used directly by [gin](https://github.com/gin-gonic/gin).
- `LOG_LEVEL`: defines log level for [`logrus`](https://github.com/sirupsen/logrus), can be `debug`, `info`, `warn`, `error`, `fatal` or `panic`, defaults to `info`.
- `GIN_MODE`: manage [gin](https://github.com/gin-gonic/gin) debug logging, can be `debug` or `release`.
Expand All @@ -51,6 +58,13 @@ remote_write:
- url: "http://prometheus-kafka-adapter:8080/receive"
```
## development
```
go test
go build
```

## contributing

With issues:
Expand Down
19 changes: 19 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (
Topic: &kafkaTopic,
Partition: kafka.PartitionAny,
}
serializer Serializer
)

func init() {
Expand All @@ -50,6 +51,12 @@ func init() {
Partition: kafka.PartitionAny,
}
}

var err error
serializer, err = parseSerializationFormat(os.Getenv("SERIALIZATION_FORMAT"))
if err != nil {
logrus.WithError(err).Fatalln("couldn't create a metrics serializer")
}
}

func parseLogLevel(value string) logrus.Level {
Expand All @@ -62,3 +69,15 @@ func parseLogLevel(value string) logrus.Level {

return level
}

func parseSerializationFormat(value string) (Serializer, error) {
switch value {
case "json":
return NewJSONSerializer()
case "avro-json":
return NewAvroJSONSerializer("schemas/metric.avsc")
default:
logrus.WithField("serialization-format-value", value).Warningln("invalid serialization format, using json")
return NewJSONSerializer()
}
}
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
module github.com/Telefonica/prometheus-kafka-adapter

require (
github.com/actgardner/gogen-avro v5.1.0+incompatible // indirect
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
github.com/confluentinc/confluent-kafka-go v0.11.4
github.com/containous/traefik v1.7.1
github.com/fatih/structs v1.1.0
github.com/gin-contrib/sse v0.0.0-20170109093832-22d885f9ecc7 // indirect
github.com/gin-gonic/contrib v0.0.0-20180614032058-39cfb9727134
github.com/gin-gonic/gin v1.3.0
github.com/gogo/protobuf v1.1.1
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
github.com/grpc-ecosystem/grpc-gateway v1.5.0 // indirect
github.com/linkedin/goavro v2.1.0+incompatible
github.com/mattn/go-isatty v0.0.4 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/prometheus/client_golang v0.8.0
Expand All @@ -18,6 +21,7 @@ require (
github.com/prometheus/procfs v0.0.0-20180920065004-418d78d0b9a7 // indirect
github.com/prometheus/prometheus v2.4.2+incompatible
github.com/sirupsen/logrus v1.1.0
github.com/stretchr/testify v1.2.2
github.com/ugorji/go/codec v0.0.0-20180927125128-99ea80c8b19a // indirect
golang.org/x/net v0.0.0-20180926154720-4dfa2610cdf3 // indirect
google.golang.org/genproto v0.0.0-20180928223349-c7e5094acea1 // indirect
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/actgardner/gogen-avro v5.1.0+incompatible h1:FifTNNceWAXLIgeLiLaFzLcJ9NyBqh59g113kgOmqvo=
github.com/actgardner/gogen-avro v5.1.0+incompatible/go.mod h1:N2PzqZtS+5w9xxGp2daeykhWdTL0lBiRhbbvkVj4Yd8=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/confluentinc/confluent-kafka-go v0.11.4 h1:uH5doflVcMn+2G/ECv0wxpgmVkvEpTwYFW57V2iLqHo=
github.com/confluentinc/confluent-kafka-go v0.11.4/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/containous/traefik v1.7.1 h1:8fZ0MIRANiu39sBo/sIVy1EV1hRKcdz1Nc1QQjpL5zM=
github.com/containous/traefik v1.7.1/go.mod h1:epDRqge3JzKOhlSWzOpNYEEKXmM6yfN5tPzDGKk3ljo=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/gin-contrib/sse v0.0.0-20170109093832-22d885f9ecc7 h1:AzN37oI0cOS+cougNAV9szl6CVoj2RYwzS3DpUQNtlY=
github.com/gin-contrib/sse v0.0.0-20170109093832-22d885f9ecc7/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s=
github.com/gin-gonic/contrib v0.0.0-20180614032058-39cfb9727134 h1:xgqFZVwmmtWiuq5LUZ/wa34hJR2Dm9NZAH+Cj9a7Hu0=
Expand All @@ -26,10 +31,13 @@ github.com/grpc-ecosystem/grpc-gateway v1.5.0 h1:WcmKMm43DR7RdtlkEXQJyo5ws8iTp98
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/linkedin/goavro v2.1.0+incompatible h1:DV2aUlj2xZiuxQyvag8Dy7zjY69ENjS66bWkSfdpddY=
github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM=
github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.8.0 h1:1921Yw9Gc3iSc4VQh3PIoOqgPCZS7G/4xQNVUp8Mda8=
github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
Expand All @@ -43,6 +51,7 @@ github.com/prometheus/prometheus v2.4.2+incompatible h1:IpbpeZAXsg39pqRThfPHoNRY
github.com/prometheus/prometheus v2.4.2+incompatible/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s=
github.com/sirupsen/logrus v1.1.0 h1:65VZabgUiV9ktjGM5nTq0+YurgTyX+YI2lSSfDjI+qU=
github.com/sirupsen/logrus v1.1.0/go.mod h1:zrgwTnHtNr00buQ1vSptGe8m1f/BbgsPukg8qsT7A+A=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/ugorji/go/codec v0.0.0-20180927125128-99ea80c8b19a h1:BgdofUvNP/srMxiUUpGyZm+WjX/qXpMXdl3edRf1Ta0=
github.com/ugorji/go/codec v0.0.0-20180927125128-99ea80c8b19a/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
Expand Down
8 changes: 4 additions & 4 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/golang/snappy"

"github.com/prometheus/prometheus/prompb"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/prometheus/prompb"
)

func receiveHandler(p *kafka.Producer) func(c *gin.Context) {
func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin.Context) {
return func(c *gin.Context) {
httpRequestsTotal.Add(float64(1))

Expand Down Expand Up @@ -61,11 +61,11 @@ func receiveHandler(p *kafka.Producer) func(c *gin.Context) {
}

for _, metric := range metrics {
err := p.Produce(&kafka.Message{
err := producer.Produce(&kafka.Message{
TopicPartition: kafkaPartition,
Value: metric,
}, nil)

if err != nil {
c.AbortWithStatus(http.StatusInternalServerError)
logrus.WithError(err).Error("couldn't produce message in kafka")
Expand Down
6 changes: 3 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ import (
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/prometheus/client_golang/prometheus"
"github.com/containous/traefik/log"
"github.com/gin-gonic/contrib/ginrus"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

func main() {
log.Info("creating kafka producer")

p, err := kafka.NewProducer(&kafka.ConfigMap{
producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": kafkaBrokerList,
"go.batch.producer": true, // Enable batch producer (for increased performance).
"go.delivery.reports": false, // per-message delivery reports to the Events() channel
Expand All @@ -42,7 +42,7 @@ func main() {

r.Use(ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true), gin.Recovery())

r.POST("/receive", receiveHandler(p))
r.POST("/receive", receiveHandler(producer, serializer))
r.GET("/metrics", gin.WrapH(prometheus.UninstrumentedHandler()))

r.Run()
Expand Down
32 changes: 1 addition & 31 deletions prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,11 @@
package main

import (
"encoding/json"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/sirupsen/logrus"
)

func processWriteRequest(req *prompb.WriteRequest) ([][]byte, error) {
logrus.WithField("var", req).Debugln()
result := [][]byte{}

for _, ts := range req.Timeseries {
labels := make(model.Metric, len(ts.Labels))

for _, l := range ts.Labels {
labels[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}

for _, sample := range ts.Samples {
metric := make(map[string]interface{}, len(labels)+2)
metric["__value__"] = sample.Value
metric["__timestamp__"] = sample.Timestamp

for key, value := range labels {
metric[string(key)] = value
}

data, err := json.Marshal(metric)
if err != nil {
logrus.WithError(err).Errorln("couldn't proccess timeseries")
}

result = append(result, data)
}
}

return result, nil
return Serialize(serializer, req)
}
12 changes: 12 additions & 0 deletions schemas/metric.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"namespace": "io.prometheus",
"type": "record",
"name": "Metric",
"doc:" : "A basic schema for representing Prometheus metrics",
"fields": [
{"name": "timestamp", "type": "string"},
{"name": "value", "type": "string"},
{"name": "name", "type": "string"},
{"name": "labels", "type": { "type": "map", "values": "string"} }
]
}
Loading

0 comments on commit 08df269

Please sign in to comment.