Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Treat normalized payload validation errors as warnings #5752

Merged
merged 4 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions pkg/messageprocessors/javascript/javascript.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,24 @@ func (h *host) DecodeUplink(
return h.decodeUplink(ctx, msg, run)
}

func appendValidationErrors(dst []string, measurements []normalizedpayload.ParsedMeasurement) []string {
for i, m := range measurements {
for _, err := range m.ValidationErrors {
var (
errString string
ttnErr *errors.Error
)
if errors.As(err, &ttnErr) {
errString = ttnErr.FormatMessage(ttnErr.PublicAttributes())
} else {
errString = err.Error()
}
dst = append(dst, fmt.Sprintf("measurement %d: %s", i+1, errString))
}
}
return dst
}

func (*host) decodeUplink(
ctx context.Context,
msg *ttnpb.ApplicationUplink,
Expand Down Expand Up @@ -292,13 +310,14 @@ func (*host) decodeUplink(
return errOutput.WithCause(err)
}
msg.DecodedPayload, msg.DecodedPayloadWarnings = decodedPayload, output.Decoded.Warnings
msg.NormalizedPayload, msg.NormalizedPayloadWarnings = nil, nil

if normalized := output.Normalized; normalized != nil {
if errs := normalized.Errors; len(errs) > 0 {
return errOutputErrors.WithAttributes("errors", strings.Join(errs, ", "))
}
if normalized.Data == nil {
return errOutput.New()
return nil
}
// The returned data can be an array of measurements or a single measurement object.
var measurements []map[string]interface{}
Expand Down Expand Up @@ -327,21 +346,31 @@ func (*host) decodeUplink(
normalizedPayload[i] = pb
}
// Validate the normalized payload.
_, err := normalizedpayload.Parse(normalizedPayload)
normalizedMeasurements, err := normalizedpayload.Parse(normalizedPayload)
if err != nil {
return errOutput.WithCause(err)
}
msg.NormalizedPayload, msg.NormalizedPayloadWarnings = normalizedPayload, normalized.Warnings
msg.NormalizedPayload = make([]*pbtypes.Struct, len(normalizedMeasurements))
for i, measurement := range normalizedMeasurements {
msg.NormalizedPayload[i] = measurement.Valid
}
msg.NormalizedPayloadWarnings = make([]string, 0, len(normalized.Warnings))
msg.NormalizedPayloadWarnings = append(msg.NormalizedPayloadWarnings, normalized.Warnings...)
msg.NormalizedPayloadWarnings = appendValidationErrors(msg.NormalizedPayloadWarnings, normalizedMeasurements)
} else {
// If the normalizer is not set, the decoder may return already normalized payload.
// This is a best effort attempt to parse the decoded payload as normalized payload.
// If that does not return an error, the decoded payload is assumed to be normalized.
normalizedPayload := []*pbtypes.Struct{
decodedPayload,
}
_, err := normalizedpayload.Parse(normalizedPayload)
normalizedMeasurements, err := normalizedpayload.Parse(normalizedPayload)
if err == nil {
msg.NormalizedPayload, msg.NormalizedPayloadWarnings = normalizedPayload, nil
msg.NormalizedPayload = make([]*pbtypes.Struct, len(normalizedMeasurements))
for i, measurement := range normalizedMeasurements {
msg.NormalizedPayload[i] = measurement.Valid
}
msg.NormalizedPayloadWarnings = appendValidationErrors(msg.NormalizedPayloadWarnings, normalizedMeasurements)
}
}
return nil
Expand Down
233 changes: 211 additions & 22 deletions pkg/messageprocessors/javascript/javascript_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,24 +367,49 @@ func TestDecodeUplink(t *testing.T) {
err := host.DecodeUplink(ctx, ids, nil, message, script)
a.So(err, should.BeNil)

decodedPayload, err := gogoproto.Map(message.DecodedPayload)
a.So(err, should.BeNil)
a.So(decodedPayload, should.Resemble, map[string]interface{}{
"temperature": -21.3,
a.So(message.DecodedPayload, should.Resemble, &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{
"temperature": {
Kind: &pbtypes.Value_NumberValue{
NumberValue: -21.3,
},
},
},
})
a.So(message.DecodedPayloadWarnings, should.Resemble, []string{"it's cold"})

a.So(message.NormalizedPayload, should.HaveLength, 1)
a.So(message.NormalizedPayload, should.Resemble, []*pbtypes.Struct{
{
Fields: map[string]*pbtypes.Value{
"air": {
Kind: &pbtypes.Value_StructValue{
StructValue: &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{
"temperature": {
Kind: &pbtypes.Value_NumberValue{
NumberValue: -21.3,
},
},
},
},
},
},
},
},
})
a.So(message.NormalizedPayloadWarnings, should.BeEmpty)

measurements, err := normalizedpayload.Parse(message.NormalizedPayload)
a.So(err, should.BeNil)
a.So(measurements[0], should.Resemble, normalizedpayload.Measurement{
Air: &normalizedpayload.Air{
a.So(measurements[0].Measurement, should.Resemble, normalizedpayload.Measurement{
Air: normalizedpayload.Air{
Temperature: float64Ptr(-21.3),
},
})
}

// Decode a single measurement that is already normalized.
// In the normalized payload, empty objects are omitted.
{
//nolint:lll
script := `
Expand All @@ -393,20 +418,65 @@ func TestDecodeUplink(t *testing.T) {
data: {
air: {
temperature: (((input.bytes[0] & 0x80 ? input.bytes[0] - 0x100 : input.bytes[0]) << 8) | input.bytes[1]) / 100
}
},
wind: {}
}
}
}
`
err := host.DecodeUplink(ctx, ids, nil, message, script)
a.So(err, should.BeNil)
a.So(message.NormalizedPayload, should.HaveLength, 1)
a.So(message.DecodedPayload, should.Resemble, message.NormalizedPayload[0])

a.So(message.DecodedPayload, should.Resemble, &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{
"air": {
Kind: &pbtypes.Value_StructValue{
StructValue: &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{
"temperature": {
Kind: &pbtypes.Value_NumberValue{
NumberValue: -21.3,
},
},
},
},
},
},
"wind": {
Kind: &pbtypes.Value_StructValue{
StructValue: &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{},
},
},
},
},
})
a.So(message.DecodedPayloadWarnings, should.BeEmpty)
a.So(message.NormalizedPayload, should.Resemble, []*pbtypes.Struct{
{
Fields: map[string]*pbtypes.Value{
"air": {
Kind: &pbtypes.Value_StructValue{
StructValue: &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{
"temperature": {
Kind: &pbtypes.Value_NumberValue{
NumberValue: -21.3,
},
},
},
},
},
},
},
},
})
a.So(message.NormalizedPayloadWarnings, should.BeEmpty)

measurements, err := normalizedpayload.Parse(message.NormalizedPayload)
a.So(err, should.BeNil)
a.So(measurements[0], should.Resemble, normalizedpayload.Measurement{
Air: &normalizedpayload.Air{
a.So(measurements[0].Measurement, should.Resemble, normalizedpayload.Measurement{
Air: normalizedpayload.Air{
Temperature: float64Ptr(-21.3),
},
})
Expand Down Expand Up @@ -443,23 +513,81 @@ func TestDecodeUplink(t *testing.T) {
err := host.DecodeUplink(ctx, ids, nil, message, script)
a.So(err, should.BeNil)

decodedPayload, err := gogoproto.Map(message.DecodedPayload)
a.So(err, should.BeNil)
a.So(decodedPayload, should.Resemble, map[string]interface{}{
"temperatures": []interface{}{-21.3, -20.88},
a.So(message.DecodedPayload, should.Resemble, &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{
"temperatures": {
Kind: &pbtypes.Value_ListValue{
ListValue: &pbtypes.ListValue{
Values: []*pbtypes.Value{
{
Kind: &pbtypes.Value_NumberValue{
NumberValue: -21.3,
},
},
{
Kind: &pbtypes.Value_NumberValue{
NumberValue: -20.88,
},
},
},
},
},
},
},
})
a.So(message.DecodedPayloadWarnings, should.BeEmpty)
a.So(message.NormalizedPayload, should.Resemble, []*pbtypes.Struct{
{
Fields: map[string]*pbtypes.Value{
"air": {
Kind: &pbtypes.Value_StructValue{
StructValue: &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{
"temperature": {
Kind: &pbtypes.Value_NumberValue{
NumberValue: -21.3,
},
},
},
},
},
},
},
},
{
Fields: map[string]*pbtypes.Value{
"air": {
Kind: &pbtypes.Value_StructValue{
StructValue: &pbtypes.Struct{
Fields: map[string]*pbtypes.Value{
"temperature": {
Kind: &pbtypes.Value_NumberValue{
NumberValue: -20.88,
},
},
},
},
},
},
},
},
})
a.So(message.NormalizedPayloadWarnings, should.BeEmpty)

a.So(message.NormalizedPayload, should.HaveLength, 2)
measurements, err := normalizedpayload.Parse(message.NormalizedPayload)
parsedMeasurements, err := normalizedpayload.Parse(message.NormalizedPayload)
a.So(err, should.BeNil)
measurements := make([]normalizedpayload.Measurement, len(parsedMeasurements))
for i, m := range parsedMeasurements {
measurements[i] = m.Measurement
}
a.So(measurements, should.Resemble, []normalizedpayload.Measurement{
{
Air: &normalizedpayload.Air{
Air: normalizedpayload.Air{
Temperature: float64Ptr(-21.3),
},
},
{
Air: &normalizedpayload.Air{
Air: normalizedpayload.Air{
Temperature: float64Ptr(-20.88),
},
},
Expand Down Expand Up @@ -512,6 +640,49 @@ func TestDecodeUplink(t *testing.T) {
a.So(errors.IsAborted(err), should.BeTrue)
}

// Return no normalized payload (data is nil).
{
script := `
function decodeUplink(input) {
return {
data: {
state: input.bytes[0]
}
}
}

function normalizeUplink(input) {
return {
data: null
}
}
`
err := host.DecodeUplink(ctx, ids, nil, message, script)
a.So(err, should.BeNil)
a.So(message.NormalizedPayload, should.BeNil)
a.So(message.NormalizedPayloadWarnings, should.BeEmpty)
}

// Return no normalized payload (no return value).
{
script := `
function decodeUplink(input) {
return {
data: {
state: input.bytes[0]
}
}
}

function normalizeUplink(input) {
}
`
err := host.DecodeUplink(ctx, ids, nil, message, script)
a.So(err, should.BeNil)
a.So(message.NormalizedPayload, should.BeNil)
a.So(message.NormalizedPayloadWarnings, should.BeEmpty)
}

// Decode and normalize a single measurement with out-of-range value.
{
message := &ttnpb.ApplicationUplink{
Expand Down Expand Up @@ -539,8 +710,26 @@ func TestDecodeUplink(t *testing.T) {
}
`
err := host.DecodeUplink(ctx, ids, nil, message, script)
a.So(err, should.NotBeNil)
a.So(errors.IsInvalidArgument(err), should.BeTrue)
a.So(err, should.BeNil)

a.So(message.NormalizedPayload, should.Resemble, []*pbtypes.Struct{
{
Fields: map[string]*pbtypes.Value{},
},
})
a.So(message.NormalizedPayloadWarnings, should.Resemble, []string{
"measurement 1: `air.temperature` should be equal or greater than `-273.15`",
})

parsedMeasurements, err := normalizedpayload.Parse(message.NormalizedPayload)
a.So(err, should.BeNil)
measurements := make([]normalizedpayload.Measurement, len(parsedMeasurements))
for i, m := range parsedMeasurements {
measurements[i] = m.Measurement
}
a.So(measurements, should.Resemble, []normalizedpayload.Measurement{
{},
})
}

// The Things Node example.
Expand Down
Loading