Skip to content

Commit

Permalink
add compression and num messages (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
rrusso1982 authored and palmerabollo committed Feb 27, 2019
1 parent 08df269 commit 627ddc9
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ Prometheus-kafka-adapter listens for metrics coming from Prometheus and sends th

- `KAFKA_BROKER_LIST`: defines kafka endpoint and port, defaults to `kafka:9092`.
- `KAFKA_TOPIC`: defines kafka topic to be used, defaults to `metrics`.
- `KAFKA_COMPRESSION`: defines the compression type to be used, defaults to `none`.
- `KAFKA_BATCH_NUM_MESSAGES`: defines the number of messages to batch write, defaults to `10000`.
- `SERIALIZATION_FORMAT`: defines the serialization format, can be `json`, `avro-json`, defaults to `json`.
- `PORT`: defines http port to listen, defaults to `8080`, used directly by [gin](https://github.com/gin-gonic/gin).
- `LOG_LEVEL`: defines log level for [`logrus`](https://github.com/sirupsen/logrus), can be `debug`, `info`, `warn`, `error`, `fatal` or `panic`, defaults to `info`.
Expand Down
10 changes: 10 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ var (
Topic: &kafkaTopic,
Partition: kafka.PartitionAny,
}
kafkaCompression = "none"
kafkaBatchNumMessages = "10000"
serializer Serializer
)

Expand All @@ -52,6 +54,14 @@ func init() {
}
}

if value := os.Getenv("KAFKA_COMPRESSION"); value != "" {
kafkaCompression = value
}

if value := os.Getenv("KAFKA_BATCH_NUM_MESSAGES"); value != "" {
kafkaBatchNumMessages = value
}

var err error
serializer, err = parseSerializationFormat(os.Getenv("SERIALIZATION_FORMAT"))
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func main() {

producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": kafkaBrokerList,
"compression.codec": kafkaCompression,
"batch.num.messages": kafkaBatchNumMessages,
"go.batch.producer": true, // Enable batch producer (for increased performance).
"go.delivery.reports": false, // per-message delivery reports to the Events() channel
})
Expand Down

0 comments on commit 627ddc9

Please sign in to comment.