diff --git a/cmd/main.go b/cmd/main.go index dd38d70..dfcc8e0 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -12,6 +12,7 @@ import ( "github.com/teslamotors/fleet-telemetry/server/airbrake" "github.com/teslamotors/fleet-telemetry/server/monitoring" "github.com/teslamotors/fleet-telemetry/server/streaming" + "github.com/teslamotors/fleet-telemetry/telemetry" ) func main() { @@ -63,7 +64,7 @@ func startServer(config *config.Config, airbrakeNotifier *gobrake.Notifier, logg monitoring.StartServerMetrics(config, logger, registry) } - producerRules, err := config.ConfigureProducers(airbrakeHandler, logger) + dispatchers, producerRules, err := config.ConfigureProducers(airbrakeHandler, logger) if err != nil { return err } @@ -75,5 +76,15 @@ func startServer(config *config.Config, airbrakeNotifier *gobrake.Notifier, logg if server.TLSConfig, err = config.ExtractServiceTLSConfig(logger); err != nil { return err } - return server.ListenAndServeTLS(config.TLS.ServerCert, config.TLS.ServerKey) + if err := server.ListenAndServeTLS(config.TLS.ServerCert, config.TLS.ServerKey); err != nil { + return err + } + for dispatcher, producer := range dispatchers { + // We don't care if this fails. If it does, we'll just continue on. + if err := producer.Close(); err != nil { + logger.ErrorLog("producer_close_error", err, logrus.LogInfo{"dispatcher": dispatcher}) + } + } + logger.ActivityLog("stopped_server", nil) + return nil } diff --git a/config/config.go b/config/config.go index d8b8da7..d23865a 100644 --- a/config/config.go +++ b/config/config.go @@ -249,10 +249,10 @@ func (c *Config) prometheusEnabled() bool { } // ConfigureProducers validates and establishes connections to the producers (kafka/pubsub/logger) -func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *logrus.Logger) (map[string][]telemetry.Producer, error) { +func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *logrus.Logger) (map[telemetry.Dispatcher]telemetry.Producer, map[string][]telemetry.Producer, error) { reliableAckSources, err := c.configureReliableAckSources() if err != nil { - return nil, err + return nil, nil, err } producers := make(map[telemetry.Dispatcher]telemetry.Producer) @@ -267,30 +267,30 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l if _, ok := requiredDispatchers[telemetry.Kafka]; ok { if c.Kafka == nil { - return nil, errors.New("expected Kafka to be configured") + return nil, nil, errors.New("expected Kafka to be configured") } convertKafkaConfig(c.Kafka) kafkaProducer, err := kafka.NewProducer(c.Kafka, c.Namespace, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Kafka], logger) if err != nil { - return nil, err + return nil, nil, err } producers[telemetry.Kafka] = kafkaProducer } if _, ok := requiredDispatchers[telemetry.Pubsub]; ok { if c.Pubsub == nil { - return nil, errors.New("expected Pubsub to be configured") + return nil, nil, errors.New("expected Pubsub to be configured") } googleProducer, err := googlepubsub.NewProducer(c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Pubsub], logger) if err != nil { - return nil, err + return nil, nil, err } producers[telemetry.Pubsub] = googleProducer } if recordNames, ok := requiredDispatchers[telemetry.Kinesis]; ok { if c.Kinesis == nil { - return nil, errors.New("expected Kinesis to be configured") + return nil, nil, errors.New("expected Kinesis to be configured") } maxRetries := 1 if c.Kinesis.MaxRetries != nil { @@ -299,18 +299,18 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l streamMapping := c.CreateKinesisStreamMapping(recordNames) kinesis, err := kinesis.NewProducer(maxRetries, streamMapping, c.Kinesis.OverrideHost, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Kinesis], logger) if err != nil { - return nil, err + return nil, nil, err } producers[telemetry.Kinesis] = kinesis } if _, ok := requiredDispatchers[telemetry.ZMQ]; ok { if c.ZMQ == nil { - return nil, errors.New("expected ZMQ to be configured") + return nil, nil, errors.New("expected ZMQ to be configured") } zmqProducer, err := zmq.NewProducer(context.Background(), c.ZMQ, c.MetricCollector, c.Namespace, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.ZMQ], logger) if err != nil { - return nil, err + return nil, nil, err } producers[telemetry.ZMQ] = zmqProducer } @@ -324,11 +324,11 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l dispatchProducerRules[recordName] = dispatchFuncs if len(dispatchProducerRules[recordName]) == 0 { - return nil, fmt.Errorf("unknown_dispatch_rule record: %v, dispatchRule:%v", recordName, dispatchRules) + return nil, nil, fmt.Errorf("unknown_dispatch_rule record: %v, dispatchRule:%v", recordName, dispatchRules) } } - return dispatchProducerRules, nil + return producers, dispatchProducerRules, nil } func (c *Config) configureReliableAckSources() (map[telemetry.Dispatcher]map[string]interface{}, error) { diff --git a/config/config_test.go b/config/config_test.go index 80b8920..5fa19dc 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -133,7 +133,7 @@ var _ = Describe("Test full application config", func() { config, err := loadTestApplicationConfig(TestSmallConfig) Expect(err).NotTo(HaveOccurred()) - producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) + _, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) Expect(err).NotTo(HaveOccurred()) Expect(producers["V"]).To(HaveLen(1)) @@ -174,7 +174,7 @@ var _ = Describe("Test full application config", func() { config, err := loadTestApplicationConfig(configInput) Expect(err).NotTo(HaveOccurred()) - producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) + _, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) Expect(err).To(MatchError(errMessage)) Expect(producers).To(BeNil()) }, @@ -192,7 +192,7 @@ var _ = Describe("Test full application config", func() { config.Records = map[string][]telemetry.Dispatcher{"V": {"kinesis"}} var err error - producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) + _, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) Expect(err).To(MatchError("expected Kinesis to be configured")) Expect(producers).To(BeNil()) }) @@ -226,7 +226,7 @@ var _ = Describe("Test full application config", func() { log, _ := logrus.NoOpLogger() _ = os.Setenv("PUBSUB_EMULATOR_HOST", "some_url") _ = os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", "some_service_account_path") - _, err := pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) + _, _, err := pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) Expect(err).To(MatchError("pubsub_connect_error pubsub cannot initialize with both emulator and GCP resource")) }) @@ -234,7 +234,7 @@ var _ = Describe("Test full application config", func() { log, _ := logrus.NoOpLogger() _ = os.Setenv("PUBSUB_EMULATOR_HOST", "some_url") var err error - producers, err = pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) + _, producers, err = pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) Expect(err).NotTo(HaveOccurred()) Expect(producers["V"]).NotTo(BeNil()) }) @@ -253,10 +253,10 @@ var _ = Describe("Test full application config", func() { log, _ := logrus.NoOpLogger() config.Records = map[string][]telemetry.Dispatcher{"V": {"zmq"}} var err error - producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) + _, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) Expect(err).To(MatchError("expected ZMQ to be configured")) Expect(producers).To(BeNil()) - producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) + _, producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) Expect(err).To(BeNil()) }) @@ -265,7 +265,7 @@ var _ = Describe("Test full application config", func() { zmqConfig.ZMQ.Addr = "tcp://127.0.0.1:5285" log, _ := logrus.NoOpLogger() var err error - producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) + _, producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log) Expect(err).NotTo(HaveOccurred()) Expect(producers["V"]).NotTo(BeNil()) }) diff --git a/datastore/googlepubsub/publisher.go b/datastore/googlepubsub/publisher.go index 3109a5c..2ff45cfd 100644 --- a/datastore/googlepubsub/publisher.go +++ b/datastore/googlepubsub/publisher.go @@ -115,6 +115,11 @@ func (p *Producer) Produce(entry *telemetry.Record) { } +// Close the producer +func (p *Producer) Close() error { + return p.pubsubClient.Close() +} + // ProcessReliableAck sends to ackChan if reliable ack is configured func (p *Producer) ProcessReliableAck(entry *telemetry.Record) { _, ok := p.reliableAckTxTypes[entry.TxType] diff --git a/datastore/kafka/kafka.go b/datastore/kafka/kafka.go index 6467129..50d66fc 100644 --- a/datastore/kafka/kafka.go +++ b/datastore/kafka/kafka.go @@ -134,6 +134,12 @@ func (p *Producer) handleProducerEvents() { } } +// Close the producer +func (p *Producer) Close() error { + p.kafkaProducer.Close() + return nil +} + // ProcessReliableAck sends to ackChan if reliable ack is configured func (p *Producer) ProcessReliableAck(entry *telemetry.Record) { _, ok := p.reliableAckTxTypes[entry.TxType] diff --git a/datastore/kinesis/kinesis.go b/datastore/kinesis/kinesis.go index ec3a631..80c5ade 100644 --- a/datastore/kinesis/kinesis.go +++ b/datastore/kinesis/kinesis.go @@ -103,6 +103,11 @@ func (p *Producer) Produce(entry *telemetry.Record) { metricsRegistry.byteTotal.Add(int64(entry.Length()), map[string]string{"record_type": entry.TxType}) } +// Close the producer +func (p *Producer) Close() error { + return nil +} + // ProcessReliableAck sends to ackChan if reliable ack is configured func (p *Producer) ProcessReliableAck(entry *telemetry.Record) { _, ok := p.reliableAckTxTypes[entry.TxType] diff --git a/datastore/simple/logger.go b/datastore/simple/logger.go index 11faeed..c808830 100644 --- a/datastore/simple/logger.go +++ b/datastore/simple/logger.go @@ -15,23 +15,28 @@ type Config struct { Verbose bool `json:"verbose"` } -// ProtoLogger is a simple protobuf logger -type ProtoLogger struct { +// Producer is a simple protobuf logger +type Producer struct { Config *Config logger *logrus.Logger } // NewProtoLogger initializes the parameters for protobuf payload logging func NewProtoLogger(config *Config, logger *logrus.Logger) telemetry.Producer { - return &ProtoLogger{Config: config, logger: logger} + return &Producer{Config: config, logger: logger} +} + +// Close the producer +func (p *Producer) Close() error { + return nil } // ProcessReliableAck noop method -func (p *ProtoLogger) ProcessReliableAck(_ *telemetry.Record) { +func (p *Producer) ProcessReliableAck(_ *telemetry.Record) { } // Produce sends the data to the logger -func (p *ProtoLogger) Produce(entry *telemetry.Record) { +func (p *Producer) Produce(entry *telemetry.Record) { data, err := p.recordToLogMap(entry) if err != nil { p.logger.ErrorLog("record_logging_error", err, logrus.LogInfo{"vin": entry.Vin, "txtype": entry.TxType, "metadata": entry.Metadata()}) @@ -41,11 +46,11 @@ func (p *ProtoLogger) Produce(entry *telemetry.Record) { } // ReportError noop method -func (p *ProtoLogger) ReportError(_ string, _ error, _ logrus.LogInfo) { +func (p *Producer) ReportError(_ string, _ error, _ logrus.LogInfo) { } // recordToLogMap converts the data of a record to a map or slice of maps -func (p *ProtoLogger) recordToLogMap(record *telemetry.Record) (interface{}, error) { +func (p *Producer) recordToLogMap(record *telemetry.Record) (interface{}, error) { switch payload := record.GetProtoMessage().(type) { case *protos.Payload: return transformers.PayloadToMap(payload, p.Config.Verbose, p.logger), nil diff --git a/datastore/simple/logger_test.go b/datastore/simple/logger_test.go index 2c93346..47f2e4a 100644 --- a/datastore/simple/logger_test.go +++ b/datastore/simple/logger_test.go @@ -19,7 +19,7 @@ import ( var _ = Describe("ProtoLogger", func() { var ( - protoLogger *simple.ProtoLogger + protoLogger *simple.Producer testLogger *logrus.Logger hook *test.Hook config *simple.Config @@ -28,7 +28,7 @@ var _ = Describe("ProtoLogger", func() { BeforeEach(func() { testLogger, hook = logrus.NoOpLogger() config = &simple.Config{Verbose: false} - protoLogger = simple.NewProtoLogger(config, testLogger).(*simple.ProtoLogger) + protoLogger = simple.NewProtoLogger(config, testLogger).(*simple.Producer) }) Describe("NewProtoLogger", func() { @@ -116,7 +116,7 @@ var _ = Describe("ProtoLogger", func() { Context("when verbose set to true", func() { BeforeEach(func() { config.Verbose = true - protoLogger = simple.NewProtoLogger(config, testLogger).(*simple.ProtoLogger) + protoLogger = simple.NewProtoLogger(config, testLogger).(*simple.Producer) }) It("does not include types in the data", func() { diff --git a/telemetry/producer.go b/telemetry/producer.go index fbc7656..b40d9b5 100644 --- a/telemetry/producer.go +++ b/telemetry/producer.go @@ -29,6 +29,7 @@ func BuildTopicName(namespace, recordName string) string { // Producer handles dispatching data received from the vehicle type Producer interface { + Close() error Produce(entry *Record) ProcessReliableAck(entry *Record) ReportError(message string, err error, logInfo logrus.LogInfo) diff --git a/telemetry/serializer_test.go b/telemetry/serializer_test.go index 8cc9e69..07cf3c1 100644 --- a/telemetry/serializer_test.go +++ b/telemetry/serializer_test.go @@ -18,6 +18,10 @@ type CallbackTester struct { reliableAck int } +func (c *CallbackTester) Close() error { + return nil +} + func (c *CallbackTester) Produce(_ *telemetry.Record) { c.counter++ }