Skip to content

Commit

Permalink
Handle logger when transmit_decoded_records is set to true
Browse files Browse the repository at this point in the history
  • Loading branch information
agbpatro committed Oct 30, 2024
1 parent b3d1541 commit 1946228
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 93 deletions.
9 changes: 2 additions & 7 deletions datastore/simple/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (p *ProtoLogger) ProcessReliableAck(entry *telemetry.Record) {
func (p *ProtoLogger) Produce(entry *telemetry.Record) {
data, err := p.recordToLogMap(entry)
if err != nil {
p.logger.ErrorLog("record_logging_error", err, logrus.LogInfo{"vin": entry.Vin, "metadata": entry.Metadata()})
p.logger.ErrorLog("record_logging_error", err, logrus.LogInfo{"vin": entry.Vin, "txtype": entry.TxType, "metadata": entry.Metadata()})
return
}
p.logger.ActivityLog("record_payload", logrus.LogInfo{"vin": entry.Vin, "metadata": entry.Metadata(), "data": data})
Expand All @@ -45,12 +45,7 @@ func (p *ProtoLogger) ReportError(message string, err error, logInfo logrus.LogI

// recordToLogMap converts the data of a record to a map or slice of maps
func (p *ProtoLogger) recordToLogMap(record *telemetry.Record) (interface{}, error) {
payload, err := record.GetProtoMessage()
if err != nil {
return nil, err
}

switch payload := payload.(type) {
switch payload := record.GetProtoMessage().(type) {
case *protos.Payload:
return transformers.PayloadToMap(payload, p.Config.Verbose, p.logger), nil
case *protos.VehicleAlerts:
Expand Down
73 changes: 42 additions & 31 deletions datastore/simple/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/teslamotors/fleet-telemetry/datastore/simple"
logrus "github.com/teslamotors/fleet-telemetry/logger"
"github.com/teslamotors/fleet-telemetry/messages"
"github.com/teslamotors/fleet-telemetry/protos"
"github.com/teslamotors/fleet-telemetry/telemetry"

Expand Down Expand Up @@ -46,7 +47,8 @@ var _ = Describe("ProtoLogger", func() {

Describe("Produce", func() {
var (
record *telemetry.Record
streamMessageBytes []byte
serializer *telemetry.BinarySerializer
)

BeforeEach(func() {
Expand All @@ -71,40 +73,45 @@ var _ = Describe("ProtoLogger", func() {
payloadBytes, err := proto.Marshal(payload)
Expect(err).NotTo(HaveOccurred())

record = &telemetry.Record{
Vin: "TEST123",
PayloadBytes: payloadBytes,
TxType: "V",
}
logger, _ := logrus.NoOpLogger()
serializer = telemetry.NewBinarySerializer(
&telemetry.RequestIdentity{
DeviceID: "TEST123",
SenderID: "vehicle_device.TEST123",
},
map[string][]telemetry.Producer{},
logger,
)
message := messages.StreamMessage{TXID: []byte("1234"), SenderID: []byte("vehicle_device.TEST123"), MessageTopic: []byte("V"), Payload: payloadBytes}
streamMessageBytes, err = message.ToBytes()
Expect(err).NotTo(HaveOccurred())
})

It("logs data", func() {
protoLogger.Produce(record)

lastLog := hook.LastEntry()
Expect(lastLog.Message).To(Equal("record_payload"))
Expect(lastLog.Data).To(HaveKeyWithValue("vin", "TEST123"))
Expect(lastLog.Data).To(HaveKey("data"))

data, ok := lastLog.Data["data"].(map[string]interface{})
Expect(ok).To(BeTrue())
Expect(data).To(Equal(map[string]interface{}{
"VehicleName": "TestVehicle",
"Gear": "ShiftStateD",
"Vin": "TEST123",
"CreatedAt": "1970-01-01T00:00:00Z",
}))
})
DescribeTable("logs data",
func(useDecoded bool) {
record, err := telemetry.NewRecord(serializer, streamMessageBytes, "1", useDecoded)
Expect(err).NotTo(HaveOccurred())
Expect(record).NotTo(BeNil())

It("logs an error when unmarshaling fails", func() {
record.PayloadBytes = []byte("invalid payload")
protoLogger.Produce(record)
protoLogger.Produce(record)

lastLog := hook.LastEntry()
Expect(lastLog.Message).To(Equal("record_logging_error"))
Expect(lastLog.Data).To(HaveKeyWithValue("vin", "TEST123"))
Expect(lastLog.Data).To(HaveKey("metadata"))
})
lastLog := hook.LastEntry()
Expect(lastLog.Message).To(Equal("record_payload"))
Expect(lastLog.Data).To(HaveKeyWithValue("vin", "TEST123"))
Expect(lastLog.Data).To(HaveKey("data"))

data, ok := lastLog.Data["data"].(map[string]interface{})
Expect(ok).To(BeTrue())
Expect(data).To(Equal(map[string]interface{}{
"VehicleName": "TestVehicle",
"Gear": "ShiftStateD",
"Vin": "TEST123",
"CreatedAt": "1970-01-01T00:00:00Z",
}))
},
Entry("record", true),
Entry("decoded record", false),
)

Context("when verbose set to true", func() {
BeforeEach(func() {
Expand All @@ -113,6 +120,10 @@ var _ = Describe("ProtoLogger", func() {
})

It("does not include types in the data", func() {
record, err := telemetry.NewRecord(serializer, streamMessageBytes, "1", true)
Expect(err).NotTo(HaveOccurred())
Expect(record).NotTo(BeNil())

protoLogger.Produce(record)

data, ok := hook.LastEntry().Data["data"].(map[string]interface{})
Expand Down
37 changes: 9 additions & 28 deletions telemetry/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,6 @@ var (
UseEnumNumbers: false,
EmitUnpopulated: true,
Indent: ""}
protobufMap = map[string]func() proto.Message{
"alerts": func() proto.Message {
return &protos.VehicleAlerts{}
},
"errors": func() proto.Message {
return &protos.VehicleErrors{}
},
"V": func() proto.Message {
return &protos.Payload{}
},
"connectivity": func() proto.Message {
return &protos.VehicleConnectivity{}
},
}
scientificNotationFloatRegex = regexp.MustCompile("^[+-]?(\\d*\\.\\d+|\\d+\\.\\d*)([eE][+-]?\\d+)$")
)

Expand All @@ -60,6 +46,7 @@ type Record struct {
PayloadBytes []byte
RawBytes []byte
transmitDecodedRecords bool
protoMessage proto.Message
}

// NewRecord Sanitizes and instantiates a Record from a message
Expand Down Expand Up @@ -158,6 +145,7 @@ func (record *Record) applyProtoRecordTransforms() error {
message.Vin = record.Vin
transformTimestamp(message)
record.PayloadBytes, err = proto.Marshal(message)
record.protoMessage = message
return err
case "errors":
message := &protos.VehicleErrors{}
Expand All @@ -167,6 +155,7 @@ func (record *Record) applyProtoRecordTransforms() error {
}
message.Vin = record.Vin
record.PayloadBytes, err = proto.Marshal(message)
record.protoMessage = message
return err
case "V":
message := &protos.Payload{}
Expand All @@ -178,6 +167,7 @@ func (record *Record) applyProtoRecordTransforms() error {
transformLocation(message)
transformScientificNotation(message)
record.PayloadBytes, err = proto.Marshal(message)
record.protoMessage = message
return err
case "connectivity":
message := &protos.VehicleConnectivity{}
Expand All @@ -186,6 +176,7 @@ func (record *Record) applyProtoRecordTransforms() error {
return err
}
record.PayloadBytes, err = proto.Marshal(message)
record.protoMessage = message
return err
default:
return nil
Expand All @@ -204,24 +195,14 @@ func (record *Record) applyRecordTransforms() error {
return err
}

// GetProtoMessage converts the record to a proto Message
func (record *Record) GetProtoMessage() (proto.Message, error) {
msgFunc, ok := protobufMap[record.TxType]
if !ok {
return nil, fmt.Errorf("no mapping for txType: %s", record.TxType)
}
message := msgFunc()
err := proto.Unmarshal(record.Payload(), message)
return message, err
// GetProtoMessage gets extracted protobuf message
func (record *Record) GetProtoMessage() proto.Message {
return record.protoMessage
}

// ToJSON serializes the record to a JSON data in bytes
func (record *Record) toJSON() ([]byte, error) {
payload, err := record.GetProtoMessage()
if err != nil {
return nil, err
}
return jsonOptions.Marshal(payload)
return jsonOptions.Marshal(record.protoMessage)
}

// transformLocation does a best-effort attempt to convert the Location field to a proper protos.Location
Expand Down
55 changes: 28 additions & 27 deletions telemetry/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package telemetry_test

import (
"crypto/rand"
"fmt"
"sort"
"time"

Expand Down Expand Up @@ -258,32 +259,50 @@ var _ = Describe("Socket handler test", func() {

Describe("GetProtoMessage", func() {
DescribeTable("valid alert types",
func(txType string, input proto.Message, verifyOutput func(proto.Message) bool) {
func(txType string, vin string, input proto.Message, verifyOutput func(proto.Message) bool) {
payloadBytes, err := proto.Marshal(input)
Expect(err).NotTo(HaveOccurred())
record := &telemetry.Record{
TxType: txType,
PayloadBytes: payloadBytes,
}
output, err := record.GetProtoMessage()

message := messages.StreamMessage{TXID: []byte("1234"), DeviceID: []byte(vin), SenderID: []byte(fmt.Sprintf("vehicle_device.%s", vin)), MessageTopic: []byte(txType), Payload: payloadBytes}
recordMsg, err := message.ToBytes()
Expect(err).NotTo(HaveOccurred())

serializer = telemetry.NewBinarySerializer(
&telemetry.RequestIdentity{
DeviceID: vin,
SenderID: fmt.Sprintf("vehicle_device.%s", vin),
},
map[string][]telemetry.Producer{"D4": nil},
logger,
)

record, err := telemetry.NewRecord(serializer, recordMsg, "1", true)
Expect(err).NotTo(HaveOccurred())
output := record.GetProtoMessage()
Expect(verifyOutput(output)).To(BeTrue())
},
Entry("for txType alerts", "alerts", &protos.VehicleAlerts{Vin: "testAlertVin"}, func(msg proto.Message) bool {
Entry("for txType alerts", "alerts", "testAlertVin", &protos.VehicleAlerts{Vin: "testAlertVin"}, func(msg proto.Message) bool {
myMsg, ok := msg.(*protos.VehicleAlerts)
if !ok {
return false
}
return myMsg.GetVin() == "testAlertVin"
}),
Entry("for txType errors", "errors", &protos.VehicleErrors{Vin: "testErrorVin"}, func(msg proto.Message) bool {
Entry("for txType errors", "errors", "testErrorVin", &protos.VehicleErrors{Vin: "testErrorVin"}, func(msg proto.Message) bool {
myMsg, ok := msg.(*protos.VehicleErrors)
if !ok {
return false
}
return myMsg.GetVin() == "testErrorVin"
}),
Entry("for txType V", "V", &protos.Payload{Vin: "testPayloadVIN"}, func(msg proto.Message) bool {
Entry("for txType connectivity", "connectivity", "testConnectivityVin", &protos.VehicleConnectivity{Vin: "testConnectivityVin"}, func(msg proto.Message) bool {
myMsg, ok := msg.(*protos.VehicleConnectivity)
if !ok {
return false
}
return myMsg.GetVin() == "testConnectivityVin"
}),
Entry("for txType V", "V", "testPayloadVIN", &protos.Payload{Vin: "testPayloadVIN"}, func(msg proto.Message) bool {
myMsg, ok := msg.(*protos.Payload)
if !ok {
return false
Expand All @@ -292,14 +311,6 @@ var _ = Describe("Socket handler test", func() {
}),
)

It("errors on unknown txtype", func() {
record := &telemetry.Record{
TxType: "badTxType",
}
_, err := record.GetProtoMessage()
Expect(err).To(MatchError("no mapping for txType: badTxType"))
})

It("json payload returns valid data when transmitDecodedRecords is false", func() {
message := messages.StreamMessage{TXID: []byte("1234"), SenderID: []byte("vehicle_device.42"), MessageTopic: []byte("V"), Payload: generatePayload("cybertruck", "42", nil)}
recordMsg, err := message.ToBytes()
Expand Down Expand Up @@ -333,16 +344,6 @@ var _ = Describe("Socket handler test", func() {
Expect(err).NotTo(HaveOccurred())
Expect(record.Payload()).To(Equal(data))
})

It("returns error on invalid txType", func() {
message := messages.StreamMessage{TXID: []byte("1234"), SenderID: []byte("vehicle_device.42"), MessageTopic: []byte("INVALID"), Payload: generatePayload("cybertruck", "42", nil)}
recordMsg, err := message.ToBytes()
Expect(err).NotTo(HaveOccurred())

record, err := telemetry.NewRecord(serializer, recordMsg, "1", true)
Expect(err).To(MatchError("no mapping for txType: INVALID"))
Expect(record).NotTo(BeNil())
})
})
})

Expand Down

0 comments on commit 1946228

Please sign in to comment.