Skip to content

Commit

Permalink
Merge pull request #5752 from TheThingsNetwork/feature/normalized-pay…
Browse files Browse the repository at this point in the history
…load-warnings

Treat normalized payload validation errors as warnings
  • Loading branch information
johanstokking committed Sep 1, 2022
2 parents a2ac877 + 4fe002c commit 4f42b75
Show file tree
Hide file tree
Showing 6 changed files with 385 additions and 72 deletions.
39 changes: 34 additions & 5 deletions pkg/messageprocessors/javascript/javascript.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,24 @@ func (h *host) DecodeUplink(
return h.decodeUplink(ctx, msg, run)
}

func appendValidationErrors(dst []string, measurements []normalizedpayload.ParsedMeasurement) []string {
for i, m := range measurements {
for _, err := range m.ValidationErrors {
var (
errString string
ttnErr *errors.Error
)
if errors.As(err, &ttnErr) {
errString = ttnErr.FormatMessage(ttnErr.PublicAttributes())
} else {
errString = err.Error()
}
dst = append(dst, fmt.Sprintf("measurement %d: %s", i+1, errString))
}
}
return dst
}

func (*host) decodeUplink(
ctx context.Context,
msg *ttnpb.ApplicationUplink,
Expand Down Expand Up @@ -292,13 +310,14 @@ func (*host) decodeUplink(
return errOutput.WithCause(err)
}
msg.DecodedPayload, msg.DecodedPayloadWarnings = decodedPayload, output.Decoded.Warnings
msg.NormalizedPayload, msg.NormalizedPayloadWarnings = nil, nil

if normalized := output.Normalized; normalized != nil {
if errs := normalized.Errors; len(errs) > 0 {
return errOutputErrors.WithAttributes("errors", strings.Join(errs, ", "))
}
if normalized.Data == nil {
return errOutput.New()
return nil
}
// The returned data can be an array of measurements or a single measurement object.
var measurements []map[string]interface{}
Expand Down Expand Up @@ -327,21 +346,31 @@ func (*host) decodeUplink(
normalizedPayload[i] = pb
}
// Validate the normalized payload.
_, err := normalizedpayload.Parse(normalizedPayload)
normalizedMeasurements, err := normalizedpayload.Parse(normalizedPayload)
if err != nil {
return errOutput.WithCause(err)
}
msg.NormalizedPayload, msg.NormalizedPayloadWarnings = normalizedPayload, normalized.Warnings
msg.NormalizedPayload = make([]*pbtypes.Struct, len(normalizedMeasurements))
for i, measurement := range normalizedMeasurements {
msg.NormalizedPayload[i] = measurement.Valid
}
msg.NormalizedPayloadWarnings = make([]string, 0, len(normalized.Warnings))
msg.NormalizedPayloadWarnings = append(msg.NormalizedPayloadWarnings, normalized.Warnings...)
msg.NormalizedPayloadWarnings = appendValidationErrors(msg.NormalizedPayloadWarnings, normalizedMeasurements)
} else {
// If the normalizer is not set, the decoder may return already normalized payload.
// This is a best effort attempt to parse the decoded payload as normalized payload.
// If that does not return an error, the decoded payload is assumed to be normalized.
normalizedPayload := []*pbtypes.Struct{
decodedPayload,
}
_, err := normalizedpayload.Parse(normalizedPayload)
normalizedMeasurements, err := normalizedpayload.Parse(normalizedPayload)
if err == nil {
msg.NormalizedPayload, msg.NormalizedPayloadWarnings = normalizedPayload, nil
msg.NormalizedPayload = make([]*pbtypes.Struct, len(normalizedMeasurements))
for i, measurement := range normalizedMeasurements {
msg.NormalizedPayload[i] = measurement.Valid
}
msg.NormalizedPayloadWarnings = appendValidationErrors(msg.NormalizedPayloadWarnings, normalizedMeasurements)
}
}
return nil
Expand Down
233 changes: 211 additions & 22 deletions pkg/messageprocessors/javascript/javascript_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,24 +367,49 @@ func TestDecodeUplink(t *testing.T) {
err := host.DecodeUplink(ctx, ids, nil, message, script)
a.So(err, should.BeNil)

decodedPayload, err := gogoproto.Map(message.DecodedPayload)
a.So(err, should.BeNil)
a.So(decodedPayload, should.Resemble, map[string]interface{}{
"temperature": -21.3,
a.So(message.DecodedPayload, should.Resemble, &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{
"temperature": {
Kind: &pbtypes.Value_NumberValue{
NumberValue: -21.3,
},
},
},
})
a.So(message.DecodedPayloadWarnings, should.Resemble, []string{"it's cold"})

a.So(message.NormalizedPayload, should.HaveLength, 1)
a.So(message.NormalizedPayload, should.Resemble, []*pbtypes.Struct{
{
Fields: map[string]*pbtypes.Value{
"air": {
Kind: &pbtypes.Value_StructValue{
StructValue: &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{
"temperature": {
Kind: &pbtypes.Value_NumberValue{
NumberValue: -21.3,
},
},
},
},
},
},
},
},
})
a.So(message.NormalizedPayloadWarnings, should.BeEmpty)

measurements, err := normalizedpayload.Parse(message.NormalizedPayload)
a.So(err, should.BeNil)
a.So(measurements[0], should.Resemble, normalizedpayload.Measurement{
Air: &normalizedpayload.Air{
a.So(measurements[0].Measurement, should.Resemble, normalizedpayload.Measurement{
Air: normalizedpayload.Air{
Temperature: float64Ptr(-21.3),
},
})
}

// Decode a single measurement that is already normalized.
// In the normalized payload, empty objects are omitted.
{
//nolint:lll
script := `
Expand All @@ -393,20 +418,65 @@ func TestDecodeUplink(t *testing.T) {
data: {
air: {
temperature: (((input.bytes[0] & 0x80 ? input.bytes[0] - 0x100 : input.bytes[0]) << 8) | input.bytes[1]) / 100
}
},
wind: {}
}
}
}
`
err := host.DecodeUplink(ctx, ids, nil, message, script)
a.So(err, should.BeNil)
a.So(message.NormalizedPayload, should.HaveLength, 1)
a.So(message.DecodedPayload, should.Resemble, message.NormalizedPayload[0])

a.So(message.DecodedPayload, should.Resemble, &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{
"air": {
Kind: &pbtypes.Value_StructValue{
StructValue: &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{
"temperature": {
Kind: &pbtypes.Value_NumberValue{
NumberValue: -21.3,
},
},
},
},
},
},
"wind": {
Kind: &pbtypes.Value_StructValue{
StructValue: &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{},
},
},
},
},
})
a.So(message.DecodedPayloadWarnings, should.BeEmpty)
a.So(message.NormalizedPayload, should.Resemble, []*pbtypes.Struct{
{
Fields: map[string]*pbtypes.Value{
"air": {
Kind: &pbtypes.Value_StructValue{
StructValue: &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{
"temperature": {
Kind: &pbtypes.Value_NumberValue{
NumberValue: -21.3,
},
},
},
},
},
},
},
},
})
a.So(message.NormalizedPayloadWarnings, should.BeEmpty)

measurements, err := normalizedpayload.Parse(message.NormalizedPayload)
a.So(err, should.BeNil)
a.So(measurements[0], should.Resemble, normalizedpayload.Measurement{
Air: &normalizedpayload.Air{
a.So(measurements[0].Measurement, should.Resemble, normalizedpayload.Measurement{
Air: normalizedpayload.Air{
Temperature: float64Ptr(-21.3),
},
})
Expand Down Expand Up @@ -443,23 +513,81 @@ func TestDecodeUplink(t *testing.T) {
err := host.DecodeUplink(ctx, ids, nil, message, script)
a.So(err, should.BeNil)

decodedPayload, err := gogoproto.Map(message.DecodedPayload)
a.So(err, should.BeNil)
a.So(decodedPayload, should.Resemble, map[string]interface{}{
"temperatures": []interface{}{-21.3, -20.88},
a.So(message.DecodedPayload, should.Resemble, &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{
"temperatures": {
Kind: &pbtypes.Value_ListValue{
ListValue: &pbtypes.ListValue{
Values: []*pbtypes.Value{
{
Kind: &pbtypes.Value_NumberValue{
NumberValue: -21.3,
},
},
{
Kind: &pbtypes.Value_NumberValue{
NumberValue: -20.88,
},
},
},
},
},
},
},
})
a.So(message.DecodedPayloadWarnings, should.BeEmpty)
a.So(message.NormalizedPayload, should.Resemble, []*pbtypes.Struct{
{
Fields: map[string]*pbtypes.Value{
"air": {
Kind: &pbtypes.Value_StructValue{
StructValue: &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{
"temperature": {
Kind: &pbtypes.Value_NumberValue{
NumberValue: -21.3,
},
},
},
},
},
},
},
},
{
Fields: map[string]*pbtypes.Value{
"air": {
Kind: &pbtypes.Value_StructValue{
StructValue: &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{
"temperature": {
Kind: &pbtypes.Value_NumberValue{
NumberValue: -20.88,
},
},
},
},
},
},
},
},
})
a.So(message.NormalizedPayloadWarnings, should.BeEmpty)

a.So(message.NormalizedPayload, should.HaveLength, 2)
measurements, err := normalizedpayload.Parse(message.NormalizedPayload)
parsedMeasurements, err := normalizedpayload.Parse(message.NormalizedPayload)
a.So(err, should.BeNil)
measurements := make([]normalizedpayload.Measurement, len(parsedMeasurements))
for i, m := range parsedMeasurements {
measurements[i] = m.Measurement
}
a.So(measurements, should.Resemble, []normalizedpayload.Measurement{
{
Air: &normalizedpayload.Air{
Air: normalizedpayload.Air{
Temperature: float64Ptr(-21.3),
},
},
{
Air: &normalizedpayload.Air{
Air: normalizedpayload.Air{
Temperature: float64Ptr(-20.88),
},
},
Expand Down Expand Up @@ -512,6 +640,49 @@ func TestDecodeUplink(t *testing.T) {
a.So(errors.IsAborted(err), should.BeTrue)
}

// Return no normalized payload (data is nil).
{
script := `
function decodeUplink(input) {
return {
data: {
state: input.bytes[0]
}
}
}
function normalizeUplink(input) {
return {
data: null
}
}
`
err := host.DecodeUplink(ctx, ids, nil, message, script)
a.So(err, should.BeNil)
a.So(message.NormalizedPayload, should.BeNil)
a.So(message.NormalizedPayloadWarnings, should.BeEmpty)
}

// Return no normalized payload (no return value).
{
script := `
function decodeUplink(input) {
return {
data: {
state: input.bytes[0]
}
}
}
function normalizeUplink(input) {
}
`
err := host.DecodeUplink(ctx, ids, nil, message, script)
a.So(err, should.BeNil)
a.So(message.NormalizedPayload, should.BeNil)
a.So(message.NormalizedPayloadWarnings, should.BeEmpty)
}

// Decode and normalize a single measurement with out-of-range value.
{
message := &ttnpb.ApplicationUplink{
Expand Down Expand Up @@ -539,8 +710,26 @@ func TestDecodeUplink(t *testing.T) {
}
`
err := host.DecodeUplink(ctx, ids, nil, message, script)
a.So(err, should.NotBeNil)
a.So(errors.IsInvalidArgument(err), should.BeTrue)
a.So(err, should.BeNil)

a.So(message.NormalizedPayload, should.Resemble, []*pbtypes.Struct{
{
Fields: map[string]*pbtypes.Value{},
},
})
a.So(message.NormalizedPayloadWarnings, should.Resemble, []string{
"measurement 1: `air.temperature` should be equal or greater than `-273.15`",
})

parsedMeasurements, err := normalizedpayload.Parse(message.NormalizedPayload)
a.So(err, should.BeNil)
measurements := make([]normalizedpayload.Measurement, len(parsedMeasurements))
for i, m := range parsedMeasurements {
measurements[i] = m.Measurement
}
a.So(measurements, should.Resemble, []normalizedpayload.Measurement{
{},
})
}

// The Things Node example.
Expand Down
Loading

0 comments on commit 4f42b75

Please sign in to comment.