Skip to content

Commit

Permalink
Use delivery channel to get kafka producer metrics (#162)
Browse files Browse the repository at this point in the history
Confluent provides the channel interface to get delivery reports
We should use that to get kafka metrics
Add more metrics for other failure mode
Disable reliable acks since it will not work as expected
  • Loading branch information
agbpatro authored Apr 17, 2024
1 parent f36e2e5 commit 03528fd
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 38 deletions.
6 changes: 5 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,10 @@ func (c *Config) prometheusEnabled() bool {
return false
}

func (c *Config) ReliableAcksDisabled() bool {
return c.ReliableAck == false && c.ReliableAckWorkers == 0
}

// ConfigureProducers validates and establishes connections to the producers (kafka/pubsub/logger)
func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (map[string][]telemetry.Producer, error) {
producers := make(map[telemetry.Dispatcher]telemetry.Producer)
Expand All @@ -262,7 +266,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l
return nil, errors.New("Expected Kafka to be configured")
}
convertKafkaConfig(c.Kafka)
kafkaProducer, err := kafka.NewProducer(c.Kafka, c.Namespace, c.ReliableAckWorkers, c.AckChan, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, logger)
kafkaProducer, err := kafka.NewProducer(c.Kafka, c.Namespace, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, logger)
if err != nil {
return nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions config/config_initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"encoding/json"
"errors"
"flag"
"log"
"os"
Expand Down Expand Up @@ -54,6 +55,10 @@ func loadApplicationConfig(configFilePath string) (*Config, error) {
}
config.MetricCollector = metrics.NewCollector(config.Monitoring, logger)

// TODO disble this check when reliable acks are properly supported
if !config.ReliableAcksDisabled() {
return nil, errors.New("reliable acks not support yet. Unset `reliable_ack` and `reliable_ack_workers` in the config file")
}
config.AckChan = make(chan *telemetry.Record)
return config, err
}
Expand Down
9 changes: 7 additions & 2 deletions config/config_initializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ var _ = Describe("Test application config initialization", func() {
Namespace: "tesla_telemetry",
TLS: &TLS{CAFile: "tesla.ca", ServerCert: "your_own_cert.crt", ServerKey: "your_own_key.key"},
RateLimit: &RateLimit{Enabled: true, MessageLimit: 1000, MessageInterval: 30},
ReliableAck: true,
ReliableAckWorkers: 15,
ReliableAck: false,
ReliableAckWorkers: 0,
Kafka: &confluent.ConfigMap{
"bootstrap.servers": "some.broker1:9093,some.broker1:9093",
"ssl.ca.location": "kafka.ca",
Expand Down Expand Up @@ -73,6 +73,11 @@ var _ = Describe("Test application config initialization", func() {
Expect(loadedConfig).To(Equal(expectedConfig))
})

It("fails when reliable acks are set", func() {
_, err := loadTestApplicationConfig(TestReliableAckConfig)
Expect(err).Should(MatchError("reliable acks not support yet. Unset `reliable_ack` and `reliable_ack_workers` in the config file"))
})

It("returns an error if config is not appropriate", func() {
_, err := loadTestApplicationConfig(BadTopicConfig)
Expect(err).To(MatchError("invalid character '}' looking for beginning of object key string"))
Expand Down
28 changes: 26 additions & 2 deletions config/test_configs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ const TestConfig = `{
"log_level": "info",
"json_log_enable": true,
"namespace": "tesla_telemetry",
"reliable_ack": true,
"reliable_ack_workers": 15,
"kafka": {
"bootstrap.servers": "some.broker1:9093,some.broker1:9093",
"ssl.ca.location": "kafka.ca",
Expand Down Expand Up @@ -61,6 +59,32 @@ const TestSmallConfig = `
}
`

const TestReliableAckConfig = `
{
"host": "127.0.0.1",
"port": 443,
"status_port": 8080,
"namespace": "tesla_telemetry",
"reliable_ack": true,
"reliable_ack_workers": 15,
"kafka": {
"bootstrap.servers": "some.broker1:9093,some.broker1:9093",
"ssl.ca.location": "kafka.ca",
"ssl.certificate.location": "kafka.crt",
"ssl.key.location": "kafka.key",
"queue.buffering.max.messages": 1000000
},
"records": {
"FS": ["kafka"]
},
"tls": {
"ca_file": "tesla.ca",
"server_cert": "your_own_cert.crt",
"server_key": "your_own_key.key"
}
}
`

const TestPubsubConfig = `
{
"host": "127.0.0.1",
Expand Down
23 changes: 12 additions & 11 deletions datastore/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Producer struct {
metricsCollector metrics.MetricCollector
logger *logrus.Logger
airbrakeHandler *airbrake.AirbrakeHandler
reliableAck bool
deliveryChan chan kafka.Event
}

// Metrics stores metrics reported from this package
Expand All @@ -41,8 +41,7 @@ var (
)

// NewProducer establishes the kafka connection and define the dispatch method
func NewProducer(config *kafka.ConfigMap, namespace string, reliableAckWorkers int,
ackChan chan (*telemetry.Record), prometheusEnabled bool, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (telemetry.Producer, error) {
func NewProducer(config *kafka.ConfigMap, namespace string, prometheusEnabled bool, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (telemetry.Producer, error) {
registerMetricsOnce(metricsCollector)

kafkaProducer, err := kafka.NewProducer(config)
Expand All @@ -57,10 +56,10 @@ func NewProducer(config *kafka.ConfigMap, namespace string, reliableAckWorkers i
prometheusEnabled: prometheusEnabled,
logger: logger,
airbrakeHandler: airbrakeHandler,
reliableAck: reliableAckWorkers > 0,
deliveryChan: make(chan kafka.Event),
}

go producer.handleProducerEvents(ackChan)
go producer.handleProducerEvents()
go producer.reportProducerMetrics()
producer.logger.ActivityLog("kafka_registered", logrus.LogInfo{"namespace": namespace})
return producer, nil
Expand All @@ -82,7 +81,7 @@ func (p *Producer) Produce(entry *telemetry.Record) {
// Note: confluent kafka supports the concept of one channel per connection, so we could add those here and get rid of reliableAckWorkers
// ex.: https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/producer_custom_channel_example/producer_custom_channel_example.go#L79
entry.ProduceTime = time.Now()
if err := p.kafkaProducer.Produce(msg, nil); err != nil {
if err := p.kafkaProducer.Produce(msg, p.deliveryChan); err != nil {
p.logError(err)
return
}
Expand All @@ -106,21 +105,23 @@ func headersFromRecord(record *telemetry.Record) (headers []kafka.Header) {
return
}

func (p *Producer) handleProducerEvents(ackChan chan (*telemetry.Record)) {
for e := range p.kafkaProducer.Events() {
func (p *Producer) handleProducerEvents() {
for e := range p.deliveryChan {
switch ev := e.(type) {
case kafka.Error:
p.logError(fmt.Errorf("producer_error %v", ev))
case *kafka.Message:
if ev.TopicPartition.Error != nil {
p.logError(fmt.Errorf("topic_partition_error %v", ev))
continue
}
entry, ok := ev.Opaque.(*telemetry.Record)
if !ok {
p.logError(fmt.Errorf("opaque_record_missing %v", ev))
continue
}
metricsRegistry.producerAckCount.Inc(map[string]string{"record_type": entry.TxType})
metricsRegistry.bytesAckTotal.Add(int64(entry.Length()), map[string]string{"record_type": entry.TxType})
if p.reliableAck {
ackChan <- entry
}
default:
p.logger.ActivityLog("kafka_event_ignored", logrus.LogInfo{"event": ev.String()})
}
Expand Down
20 changes: 0 additions & 20 deletions server/streaming/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,26 +324,6 @@ func (sm SocketManager) ReportMetricBytesPerRecords(recordType string, byteSize
metricsRegistry.recordCount.Inc(map[string]string{"record_type": recordType})
}

// DatastoreAckProcessor records metrics after acking records
func (sm SocketManager) DatastoreAckProcessor(ackChan chan (*telemetry.Record)) {
for record := range ackChan {
durationMs := time.Since(record.ProduceTime) / time.Millisecond

metricsRegistry.kafkaWriteMs.Observe(int64(durationMs), map[string]string{})
metricsRegistry.kafkaWriteBytesTotal.Add(int64(record.Length()), map[string]string{"record_type": record.TxType})
metricsRegistry.kafkaWriteCount.Inc(map[string]string{"record_type": record.TxType})

if record.Serializer != nil && record.Serializer.ReliableAck() {
if socket := sm.registry.GetSocket(record.SocketID); socket != nil {
metricsRegistry.reliableAckCount.Inc(map[string]string{"record_type": record.TxType})
socket.respondToVehicle(record, nil)
} else {
metricsRegistry.reliableAckMissCount.Inc(map[string]string{"record_type": record.TxType})
}
}
}
}

func registerMetricsOnce(metricsCollector metrics.MetricCollector) {
metricsOnce.Do(func() { registerMetrics(metricsCollector) })
}
Expand Down
2 changes: 0 additions & 2 deletions test/integration/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
"log_level": "info",
"json_log_enable": true,
"namespace": "tesla_telemetry",
"reliable_ack": true,
"reliable_ack_workers": 15,
"kafka": {
"bootstrap.servers": "kafka:9092",
"queue.buffering.max.messages": 1000000,
Expand Down

0 comments on commit 03528fd

Please sign in to comment.