From 97046a930e2f45e4666ebd56cf179ff98740a58e Mon Sep 17 00:00:00 2001 From: Johan Stokking Date: Wed, 31 Aug 2022 17:57:05 +0200 Subject: [PATCH 1/4] as: Treat normalized payload validation errors as warnings --- .../javascript/javascript.go | 36 +++++++- .../javascript/javascript_test.go | 29 +++++-- .../normalizedpayload/uplink.go | 82 +++++++++++++------ .../normalizedpayload/uplink_internal_test.go | 22 +++++ .../normalizedpayload/uplink_test.go | 51 ++++++++++-- 5 files changed, 179 insertions(+), 41 deletions(-) create mode 100644 pkg/messageprocessors/normalizedpayload/uplink_internal_test.go diff --git a/pkg/messageprocessors/javascript/javascript.go b/pkg/messageprocessors/javascript/javascript.go index 65f77d2ad7..17e14926e5 100644 --- a/pkg/messageprocessors/javascript/javascript.go +++ b/pkg/messageprocessors/javascript/javascript.go @@ -261,6 +261,24 @@ func (h *host) DecodeUplink( return h.decodeUplink(ctx, msg, run) } +func appendValidationErrors(dst []string, measurements []normalizedpayload.ParsedMeasurement) []string { + for i, m := range measurements { + for _, err := range m.ValidationErrors { + var ( + errString string + ttnErr *errors.Error + ) + if errors.As(err, &ttnErr) { + errString = ttnErr.FormatMessage(ttnErr.PublicAttributes()) + } else { + errString = err.Error() + } + dst = append(dst, fmt.Sprintf("measurement %d: %s", i+1, errString)) + } + } + return dst +} + func (*host) decodeUplink( ctx context.Context, msg *ttnpb.ApplicationUplink, @@ -327,11 +345,17 @@ func (*host) decodeUplink( normalizedPayload[i] = pb } // Validate the normalized payload. - _, err := normalizedpayload.Parse(normalizedPayload) + normalizedMeasurements, err := normalizedpayload.Parse(normalizedPayload) if err != nil { return errOutput.WithCause(err) } - msg.NormalizedPayload, msg.NormalizedPayloadWarnings = normalizedPayload, normalized.Warnings + msg.NormalizedPayload = make([]*pbtypes.Struct, len(normalizedMeasurements)) + for i, measurement := range normalizedMeasurements { + msg.NormalizedPayload[i] = measurement.Valid + } + msg.NormalizedPayloadWarnings = make([]string, 0, len(normalized.Warnings)) + msg.NormalizedPayloadWarnings = append(msg.NormalizedPayloadWarnings, normalized.Warnings...) + msg.NormalizedPayloadWarnings = appendValidationErrors(msg.NormalizedPayloadWarnings, normalizedMeasurements) } else { // If the normalizer is not set, the decoder may return already normalized payload. // This is a best effort attempt to parse the decoded payload as normalized payload. @@ -339,9 +363,13 @@ func (*host) decodeUplink( normalizedPayload := []*pbtypes.Struct{ decodedPayload, } - _, err := normalizedpayload.Parse(normalizedPayload) + normalizedMeasurements, err := normalizedpayload.Parse(normalizedPayload) if err == nil { - msg.NormalizedPayload, msg.NormalizedPayloadWarnings = normalizedPayload, nil + msg.NormalizedPayload = make([]*pbtypes.Struct, len(normalizedMeasurements)) + for i, measurement := range normalizedMeasurements { + msg.NormalizedPayload[i] = measurement.Valid + } + msg.NormalizedPayloadWarnings = appendValidationErrors(msg.NormalizedPayloadWarnings, normalizedMeasurements) } } return nil diff --git a/pkg/messageprocessors/javascript/javascript_test.go b/pkg/messageprocessors/javascript/javascript_test.go index 86a1d43c81..745658f6d4 100644 --- a/pkg/messageprocessors/javascript/javascript_test.go +++ b/pkg/messageprocessors/javascript/javascript_test.go @@ -377,7 +377,7 @@ func TestDecodeUplink(t *testing.T) { a.So(message.NormalizedPayload, should.HaveLength, 1) measurements, err := normalizedpayload.Parse(message.NormalizedPayload) a.So(err, should.BeNil) - a.So(measurements[0], should.Resemble, normalizedpayload.Measurement{ + a.So(measurements[0].Measurement, should.Resemble, normalizedpayload.Measurement{ Air: &normalizedpayload.Air{ Temperature: float64Ptr(-21.3), }, @@ -405,7 +405,7 @@ func TestDecodeUplink(t *testing.T) { measurements, err := normalizedpayload.Parse(message.NormalizedPayload) a.So(err, should.BeNil) - a.So(measurements[0], should.Resemble, normalizedpayload.Measurement{ + a.So(measurements[0].Measurement, should.Resemble, normalizedpayload.Measurement{ Air: &normalizedpayload.Air{ Temperature: float64Ptr(-21.3), }, @@ -450,8 +450,12 @@ func TestDecodeUplink(t *testing.T) { }) a.So(message.NormalizedPayload, should.HaveLength, 2) - measurements, err := normalizedpayload.Parse(message.NormalizedPayload) + parsedMeasurements, err := normalizedpayload.Parse(message.NormalizedPayload) a.So(err, should.BeNil) + measurements := make([]normalizedpayload.Measurement, len(parsedMeasurements)) + for i, m := range parsedMeasurements { + measurements[i] = m.Measurement + } a.So(measurements, should.Resemble, []normalizedpayload.Measurement{ { Air: &normalizedpayload.Air{ @@ -539,8 +543,23 @@ func TestDecodeUplink(t *testing.T) { } ` err := host.DecodeUplink(ctx, ids, nil, message, script) - a.So(err, should.NotBeNil) - a.So(errors.IsInvalidArgument(err), should.BeTrue) + a.So(err, should.BeNil) + + a.So(message.NormalizedPayload, should.HaveLength, 1) + parsedMeasurements, err := normalizedpayload.Parse(message.NormalizedPayload) + a.So(err, should.BeNil) + measurements := make([]normalizedpayload.Measurement, len(parsedMeasurements)) + for i, m := range parsedMeasurements { + measurements[i] = m.Measurement + } + a.So(measurements, should.Resemble, []normalizedpayload.Measurement{ + { + Air: &normalizedpayload.Air{}, + }, + }) + a.So(message.NormalizedPayloadWarnings, should.Resemble, []string{ + "measurement 1: `air.temperature` should be equal or greater than `-273.15`", + }) } // The Things Node example. diff --git a/pkg/messageprocessors/normalizedpayload/uplink.go b/pkg/messageprocessors/normalizedpayload/uplink.go index a8fc4fe6fe..54403f130e 100644 --- a/pkg/messageprocessors/normalizedpayload/uplink.go +++ b/pkg/messageprocessors/normalizedpayload/uplink.go @@ -46,34 +46,34 @@ type Measurement struct { var ( errFieldType = errors.DefineInvalidArgument("field_type", "invalid field type of `{path}`") - errFieldMinimum = errors.DefineInvalidArgument( + errFieldMinimum = errors.DefineDataLoss( "field_minimum", "`{path}` should be equal or greater than `{minimum}`", ) //nolint:unused - errFieldExclusiveMinimum = errors.DefineInvalidArgument( + errFieldExclusiveMinimum = errors.DefineDataLoss( "field_exclusive_minimum", "`{path}` should be greater than `{minimum}`", ) - errFieldMaximum = errors.DefineInvalidArgument( + errFieldMaximum = errors.DefineDataLoss( "field_maximum", "`{path}` should be equal or less than `{maximum}`", ) - errFieldExclusiveMaximum = errors.DefineInvalidArgument( + errFieldExclusiveMaximum = errors.DefineDataLoss( "field_exclusive_maximum", "`{path}` should be less than `{maximum}`", ) errUnknownField = errors.DefineInvalidArgument("unknown_field", "unknown field `{path}`") ) -type fieldParser func(dst *Measurement, src *pbtypes.Value, path string) error +type fieldParser func(dst *Measurement, src *pbtypes.Value, path string) []error // object validates that the path is a structure and sets the target to an empty value. func object[T any](selector func(*Measurement) **T) fieldParser { - return func(dst *Measurement, src *pbtypes.Value, path string) error { + return func(dst *Measurement, src *pbtypes.Value, path string) []error { _, ok := src.Kind.(*pbtypes.Value_StructValue) if !ok { - return errFieldType.WithAttributes("path", path) + return []error{errFieldType.WithAttributes("path", path)} } *selector(dst) = new(T) return nil @@ -82,28 +82,28 @@ func object[T any](selector func(*Measurement) **T) fieldParser { type fieldValidator[T any] func(v T, path string) error -func validate[T any](val T, validators []fieldValidator[T], path string) error { +func validate[T any](val T, validators []fieldValidator[T], path string) (errs []error) { for _, v := range validators { if err := v(val, path); err != nil { - return err + errs = append(errs, err) } } - return nil + return errs } // parseTime parses and validates the time. The input value must be RFC3339. func parseTime(selector func(dst *Measurement) **time.Time, vals ...fieldValidator[time.Time]) fieldParser { - return func(dst *Measurement, src *pbtypes.Value, path string) error { + return func(dst *Measurement, src *pbtypes.Value, path string) []error { val, ok := src.Kind.(*pbtypes.Value_StringValue) if !ok { - return errFieldType.WithAttributes("path", path) + return []error{errFieldType.WithAttributes("path", path)} } t, err := time.Parse(time.RFC3339Nano, val.StringValue) if err != nil { - return err + return []error{err} } - if err := validate(t, vals, path); err != nil { - return err + if validateErrs := validate(t, vals, path); len(validateErrs) > 0 { + return validateErrs } *selector(dst) = &t return nil @@ -112,14 +112,14 @@ func parseTime(selector func(dst *Measurement) **time.Time, vals ...fieldValidat // parseNumber parses and validates a number. func parseNumber(selector func(dst *Measurement) **float64, vals ...fieldValidator[float64]) fieldParser { - return func(dst *Measurement, src *pbtypes.Value, path string) error { + return func(dst *Measurement, src *pbtypes.Value, path string) []error { val, ok := src.Kind.(*pbtypes.Value_NumberValue) if !ok { - return errFieldType.WithAttributes("path", path) + return []error{errFieldType.WithAttributes("path", path)} } n := val.NumberValue - if err := validate(n, vals, path); err != nil { - return err + if validateErrs := validate(n, vals, path); len(validateErrs) > 0 { + return validateErrs } *selector(dst) = &n return nil @@ -231,10 +231,22 @@ var fieldParsers = map[string]fieldParser{ ), } +// ParsedMeasurement is the result of parsing measurements with Parse. +type ParsedMeasurement struct { + Measurement + // ValidationErrors contains any errors that occurred during field validation. + ValidationErrors []error + // Valid only contains the valid fields, for which there were no validation errors. + Valid *pbtypes.Struct +} + // Parse parses and validates the measurements. -func Parse(measurements []*pbtypes.Struct) ([]Measurement, error) { - res := make([]Measurement, len(measurements)) +func Parse(measurements []*pbtypes.Struct) ([]ParsedMeasurement, error) { + res := make([]ParsedMeasurement, len(measurements)) for i, src := range measurements { + res[i].Valid = &pbtypes.Struct{ + Fields: make(map[string]*pbtypes.Value), + } err := parse(&res[i], src, "") if err != nil { return nil, err @@ -243,21 +255,41 @@ func Parse(measurements []*pbtypes.Struct) ([]Measurement, error) { return res, nil } -func parse(dst *Measurement, src *pbtypes.Struct, prefix string) error { +func parse(dst *ParsedMeasurement, src *pbtypes.Struct, prefix string) error { for k, v := range src.GetFields() { path := fmt.Sprintf("%s%s", prefix, k) parser, ok := fieldParsers[path] if !ok { return errUnknownField.WithAttributes("path", path) } - if err := parser(dst, v, path); err != nil { - return err + if errs := parser(&dst.Measurement, v, path); errs != nil { + for _, err := range errs { + if !errors.IsDataLoss(err) { + return err + } + dst.ValidationErrors = append(dst.ValidationErrors, err) + } + continue } + validFieldValue := v if s, ok := v.Kind.(*pbtypes.Value_StructValue); ok { - if err := parse(dst, s.StructValue, path+"."); err != nil { + nested := &ParsedMeasurement{ + Measurement: dst.Measurement, + Valid: &pbtypes.Struct{ + Fields: make(map[string]*pbtypes.Value), + }, + } + if err := parse(nested, s.StructValue, path+"."); err != nil { return err } + dst.ValidationErrors = append(dst.ValidationErrors, nested.ValidationErrors...) + validFieldValue = &pbtypes.Value{ + Kind: &pbtypes.Value_StructValue{ + StructValue: nested.Valid, + }, + } } + dst.Valid.Fields[k] = validFieldValue } return nil } diff --git a/pkg/messageprocessors/normalizedpayload/uplink_internal_test.go b/pkg/messageprocessors/normalizedpayload/uplink_internal_test.go new file mode 100644 index 0000000000..a918052a54 --- /dev/null +++ b/pkg/messageprocessors/normalizedpayload/uplink_internal_test.go @@ -0,0 +1,22 @@ +// Copyright © 2022 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package normalizedpayload + +var ( + ErrFieldMinimum = errFieldMinimum + ErrFieldExclusiveMinimum = errFieldExclusiveMinimum + ErrFieldMaximum = errFieldMaximum + ErrFieldExclusiveMaximum = errFieldExclusiveMaximum +) diff --git a/pkg/messageprocessors/normalizedpayload/uplink_test.go b/pkg/messageprocessors/normalizedpayload/uplink_test.go index 5f847f46ad..efa47fa8d1 100644 --- a/pkg/messageprocessors/normalizedpayload/uplink_test.go +++ b/pkg/messageprocessors/normalizedpayload/uplink_test.go @@ -37,10 +37,11 @@ func TestUplink(t *testing.T) { t.Parallel() for _, tc := range []struct { - name string - normalizedPayload []*pbtypes.Struct - expected []normalizedpayload.Measurement - errorAssertion func(error) bool + name string + normalizedPayload []*pbtypes.Struct + expected []normalizedpayload.Measurement + expectedValidationErrors [][]error + errorAssertion func(error) bool }{ { name: "single timestamp", @@ -133,7 +134,19 @@ func TestUplink(t *testing.T) { }, }, }, - errorAssertion: errors.IsInvalidArgument, + expected: []normalizedpayload.Measurement{ + { + Air: &normalizedpayload.Air{}, + }, + }, + expectedValidationErrors: [][]error{ + { + normalizedpayload.ErrFieldMinimum.WithAttributes( + "path", "air.temperature", + "minimum", -273.15, + ), + }, + }, }, { name: "invalid direction", @@ -156,7 +169,19 @@ func TestUplink(t *testing.T) { }, }, }, - errorAssertion: errors.IsInvalidArgument, + expected: []normalizedpayload.Measurement{ + { + Wind: &normalizedpayload.Wind{}, + }, + }, + expectedValidationErrors: [][]error{ + { + normalizedpayload.ErrFieldExclusiveMaximum.WithAttributes( + "path", "wind.direction", + "maximum", 360.0, + ), + }, + }, }, { name: "invalid type", @@ -216,7 +241,19 @@ func TestUplink(t *testing.T) { a.So(tc.errorAssertion(err), should.BeTrue) } else { a.So(err, should.BeNil) - a.So(measurements, should.Resemble, tc.expected) + if !a.So(measurements, should.HaveLength, len(tc.expected)) { + t.FailNow() + } + for i, parsed := range measurements { + if len(parsed.ValidationErrors) > 0 { + a.So(len(tc.expectedValidationErrors), should.BeGreaterThanOrEqualTo, i+1) + a.So(parsed.ValidationErrors, should.HaveLength, len(tc.expectedValidationErrors[i])) + for j, err := range parsed.ValidationErrors { + a.So(err, should.EqualErrorOrDefinition, tc.expectedValidationErrors[i][j]) + } + } + a.So(parsed.Measurement, should.Resemble, tc.expected[i]) + } } }) } From 69231cf003cfa3a51a87874d1691d66d1945b7f7 Mon Sep 17 00:00:00 2001 From: Johan Stokking Date: Wed, 31 Aug 2022 17:57:19 +0200 Subject: [PATCH 2/4] console: Show simulated normalized payload output --- .../payload-formatters-form/test-form/index.js | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/webui/console/components/payload-formatters-form/test-form/index.js b/pkg/webui/console/components/payload-formatters-form/test-form/index.js index be95c4677c..e330adc7de 100644 --- a/pkg/webui/console/components/payload-formatters-form/test-form/index.js +++ b/pkg/webui/console/components/payload-formatters-form/test-form/index.js @@ -86,8 +86,9 @@ const TestForm = props => { uplink, testResult, testResult: { - decoded_payload: payload, - decoded_payload_warnings: warnings, + decoded_payload: decodedPayload, + decoded_payload_warnings: decodedPayloadWarnings, + normalized_payload_warnings: normalizedPayloadWarnings, frm_payload: framePayload, }, } = props @@ -104,8 +105,10 @@ const TestForm = props => { const hasTestError = isBackend(testResult) const hasFatalError = !isOutputError(testResult) - const hasTestWarning = warnings instanceof Array && warnings.length !== 0 - const hasPayload = payload !== undefined + const hasTestWarning = + (decodedPayloadWarnings instanceof Array && decodedPayloadWarnings.length !== 0) || + (normalizedPayloadWarnings instanceof Array && normalizedPayloadWarnings.length !== 0) + const hasPayload = decodedPayload !== undefined const showTestError = hasTestError const showTestWarning = !hasTestError && hasTestWarning @@ -200,7 +203,7 @@ const TestForm = props => { {!showTestError && ( Date: Thu, 1 Sep 2022 10:14:46 +0200 Subject: [PATCH 3/4] as: Omit empty objects in normalized payload --- .../javascript/javascript_test.go | 175 +++++++++++++++--- .../normalizedpayload/uplink.go | 27 +-- .../normalizedpayload/uplink_test.go | 12 +- 3 files changed, 170 insertions(+), 44 deletions(-) diff --git a/pkg/messageprocessors/javascript/javascript_test.go b/pkg/messageprocessors/javascript/javascript_test.go index 745658f6d4..d55a5808c5 100644 --- a/pkg/messageprocessors/javascript/javascript_test.go +++ b/pkg/messageprocessors/javascript/javascript_test.go @@ -367,24 +367,49 @@ func TestDecodeUplink(t *testing.T) { err := host.DecodeUplink(ctx, ids, nil, message, script) a.So(err, should.BeNil) - decodedPayload, err := gogoproto.Map(message.DecodedPayload) - a.So(err, should.BeNil) - a.So(decodedPayload, should.Resemble, map[string]interface{}{ - "temperature": -21.3, + a.So(message.DecodedPayload, should.Resemble, &pbtypes.Struct{ + Fields: map[string]*pbtypes.Value{ + "temperature": { + Kind: &pbtypes.Value_NumberValue{ + NumberValue: -21.3, + }, + }, + }, }) a.So(message.DecodedPayloadWarnings, should.Resemble, []string{"it's cold"}) - a.So(message.NormalizedPayload, should.HaveLength, 1) + a.So(message.NormalizedPayload, should.Resemble, []*pbtypes.Struct{ + { + Fields: map[string]*pbtypes.Value{ + "air": { + Kind: &pbtypes.Value_StructValue{ + StructValue: &pbtypes.Struct{ + Fields: map[string]*pbtypes.Value{ + "temperature": { + Kind: &pbtypes.Value_NumberValue{ + NumberValue: -21.3, + }, + }, + }, + }, + }, + }, + }, + }, + }) + a.So(message.NormalizedPayloadWarnings, should.BeEmpty) + measurements, err := normalizedpayload.Parse(message.NormalizedPayload) a.So(err, should.BeNil) a.So(measurements[0].Measurement, should.Resemble, normalizedpayload.Measurement{ - Air: &normalizedpayload.Air{ + Air: normalizedpayload.Air{ Temperature: float64Ptr(-21.3), }, }) } // Decode a single measurement that is already normalized. + // In the normalized payload, empty objects are omitted. { //nolint:lll script := ` @@ -393,20 +418,65 @@ func TestDecodeUplink(t *testing.T) { data: { air: { temperature: (((input.bytes[0] & 0x80 ? input.bytes[0] - 0x100 : input.bytes[0]) << 8) | input.bytes[1]) / 100 - } + }, + wind: {} } } } ` err := host.DecodeUplink(ctx, ids, nil, message, script) a.So(err, should.BeNil) - a.So(message.NormalizedPayload, should.HaveLength, 1) - a.So(message.DecodedPayload, should.Resemble, message.NormalizedPayload[0]) + + a.So(message.DecodedPayload, should.Resemble, &pbtypes.Struct{ + Fields: map[string]*pbtypes.Value{ + "air": { + Kind: &pbtypes.Value_StructValue{ + StructValue: &pbtypes.Struct{ + Fields: map[string]*pbtypes.Value{ + "temperature": { + Kind: &pbtypes.Value_NumberValue{ + NumberValue: -21.3, + }, + }, + }, + }, + }, + }, + "wind": { + Kind: &pbtypes.Value_StructValue{ + StructValue: &pbtypes.Struct{ + Fields: map[string]*pbtypes.Value{}, + }, + }, + }, + }, + }) + a.So(message.DecodedPayloadWarnings, should.BeEmpty) + a.So(message.NormalizedPayload, should.Resemble, []*pbtypes.Struct{ + { + Fields: map[string]*pbtypes.Value{ + "air": { + Kind: &pbtypes.Value_StructValue{ + StructValue: &pbtypes.Struct{ + Fields: map[string]*pbtypes.Value{ + "temperature": { + Kind: &pbtypes.Value_NumberValue{ + NumberValue: -21.3, + }, + }, + }, + }, + }, + }, + }, + }, + }) + a.So(message.NormalizedPayloadWarnings, should.BeEmpty) measurements, err := normalizedpayload.Parse(message.NormalizedPayload) a.So(err, should.BeNil) a.So(measurements[0].Measurement, should.Resemble, normalizedpayload.Measurement{ - Air: &normalizedpayload.Air{ + Air: normalizedpayload.Air{ Temperature: float64Ptr(-21.3), }, }) @@ -443,13 +513,67 @@ func TestDecodeUplink(t *testing.T) { err := host.DecodeUplink(ctx, ids, nil, message, script) a.So(err, should.BeNil) - decodedPayload, err := gogoproto.Map(message.DecodedPayload) - a.So(err, should.BeNil) - a.So(decodedPayload, should.Resemble, map[string]interface{}{ - "temperatures": []interface{}{-21.3, -20.88}, + a.So(message.DecodedPayload, should.Resemble, &pbtypes.Struct{ + Fields: map[string]*pbtypes.Value{ + "temperatures": { + Kind: &pbtypes.Value_ListValue{ + ListValue: &pbtypes.ListValue{ + Values: []*pbtypes.Value{ + { + Kind: &pbtypes.Value_NumberValue{ + NumberValue: -21.3, + }, + }, + { + Kind: &pbtypes.Value_NumberValue{ + NumberValue: -20.88, + }, + }, + }, + }, + }, + }, + }, }) + a.So(message.DecodedPayloadWarnings, should.BeEmpty) + a.So(message.NormalizedPayload, should.Resemble, []*pbtypes.Struct{ + { + Fields: map[string]*pbtypes.Value{ + "air": { + Kind: &pbtypes.Value_StructValue{ + StructValue: &pbtypes.Struct{ + Fields: map[string]*pbtypes.Value{ + "temperature": { + Kind: &pbtypes.Value_NumberValue{ + NumberValue: -21.3, + }, + }, + }, + }, + }, + }, + }, + }, + { + Fields: map[string]*pbtypes.Value{ + "air": { + Kind: &pbtypes.Value_StructValue{ + StructValue: &pbtypes.Struct{ + Fields: map[string]*pbtypes.Value{ + "temperature": { + Kind: &pbtypes.Value_NumberValue{ + NumberValue: -20.88, + }, + }, + }, + }, + }, + }, + }, + }, + }) + a.So(message.NormalizedPayloadWarnings, should.BeEmpty) - a.So(message.NormalizedPayload, should.HaveLength, 2) parsedMeasurements, err := normalizedpayload.Parse(message.NormalizedPayload) a.So(err, should.BeNil) measurements := make([]normalizedpayload.Measurement, len(parsedMeasurements)) @@ -458,12 +582,12 @@ func TestDecodeUplink(t *testing.T) { } a.So(measurements, should.Resemble, []normalizedpayload.Measurement{ { - Air: &normalizedpayload.Air{ + Air: normalizedpayload.Air{ Temperature: float64Ptr(-21.3), }, }, { - Air: &normalizedpayload.Air{ + Air: normalizedpayload.Air{ Temperature: float64Ptr(-20.88), }, }, @@ -545,7 +669,15 @@ func TestDecodeUplink(t *testing.T) { err := host.DecodeUplink(ctx, ids, nil, message, script) a.So(err, should.BeNil) - a.So(message.NormalizedPayload, should.HaveLength, 1) + a.So(message.NormalizedPayload, should.Resemble, []*pbtypes.Struct{ + { + Fields: map[string]*pbtypes.Value{}, + }, + }) + a.So(message.NormalizedPayloadWarnings, should.Resemble, []string{ + "measurement 1: `air.temperature` should be equal or greater than `-273.15`", + }) + parsedMeasurements, err := normalizedpayload.Parse(message.NormalizedPayload) a.So(err, should.BeNil) measurements := make([]normalizedpayload.Measurement, len(parsedMeasurements)) @@ -553,12 +685,7 @@ func TestDecodeUplink(t *testing.T) { measurements[i] = m.Measurement } a.So(measurements, should.Resemble, []normalizedpayload.Measurement{ - { - Air: &normalizedpayload.Air{}, - }, - }) - a.So(message.NormalizedPayloadWarnings, should.Resemble, []string{ - "measurement 1: `air.temperature` should be equal or greater than `-273.15`", + {}, }) } diff --git a/pkg/messageprocessors/normalizedpayload/uplink.go b/pkg/messageprocessors/normalizedpayload/uplink.go index 54403f130e..bdd12d8c59 100644 --- a/pkg/messageprocessors/normalizedpayload/uplink.go +++ b/pkg/messageprocessors/normalizedpayload/uplink.go @@ -40,8 +40,8 @@ type Wind struct { // Measurement is a measurement. type Measurement struct { Time *time.Time - Air *Air - Wind *Wind + Air Air + Wind Wind } var ( @@ -69,13 +69,13 @@ var ( type fieldParser func(dst *Measurement, src *pbtypes.Value, path string) []error // object validates that the path is a structure and sets the target to an empty value. -func object[T any](selector func(*Measurement) **T) fieldParser { +func object[T any](selector func(*Measurement) *T) fieldParser { return func(dst *Measurement, src *pbtypes.Value, path string) []error { _, ok := src.Kind.(*pbtypes.Value_StructValue) if !ok { return []error{errFieldType.WithAttributes("path", path)} } - *selector(dst) = new(T) + *selector(dst) = *new(T) return nil } } @@ -187,7 +187,7 @@ var fieldParsers = map[string]fieldParser{ }, ), "air": object( - func(dst *Measurement) **Air { + func(dst *Measurement) *Air { return &dst.Air }, ), @@ -212,7 +212,7 @@ var fieldParsers = map[string]fieldParser{ maximum(1100.0), ), "wind": object( - func(dst *Measurement) **Wind { + func(dst *Measurement) *Wind { return &dst.Wind }, ), @@ -271,7 +271,6 @@ func parse(dst *ParsedMeasurement, src *pbtypes.Struct, prefix string) error { } continue } - validFieldValue := v if s, ok := v.Kind.(*pbtypes.Value_StructValue); ok { nested := &ParsedMeasurement{ Measurement: dst.Measurement, @@ -282,14 +281,18 @@ func parse(dst *ParsedMeasurement, src *pbtypes.Struct, prefix string) error { if err := parse(nested, s.StructValue, path+"."); err != nil { return err } + dst.Measurement = nested.Measurement dst.ValidationErrors = append(dst.ValidationErrors, nested.ValidationErrors...) - validFieldValue = &pbtypes.Value{ - Kind: &pbtypes.Value_StructValue{ - StructValue: nested.Valid, - }, + if len(nested.Valid.Fields) > 0 { + dst.Valid.Fields[k] = &pbtypes.Value{ + Kind: &pbtypes.Value_StructValue{ + StructValue: nested.Valid, + }, + } } + } else { + dst.Valid.Fields[k] = v } - dst.Valid.Fields[k] = validFieldValue } return nil } diff --git a/pkg/messageprocessors/normalizedpayload/uplink_test.go b/pkg/messageprocessors/normalizedpayload/uplink_test.go index efa47fa8d1..c78fc01b5d 100644 --- a/pkg/messageprocessors/normalizedpayload/uplink_test.go +++ b/pkg/messageprocessors/normalizedpayload/uplink_test.go @@ -102,12 +102,12 @@ func TestUplink(t *testing.T) { }, expected: []normalizedpayload.Measurement{ { - Air: &normalizedpayload.Air{ + Air: normalizedpayload.Air{ Temperature: float64Ptr(20.42), }, }, { - Air: &normalizedpayload.Air{ + Air: normalizedpayload.Air{ Temperature: float64Ptr(19.61), }, }, @@ -135,9 +135,7 @@ func TestUplink(t *testing.T) { }, }, expected: []normalizedpayload.Measurement{ - { - Air: &normalizedpayload.Air{}, - }, + {}, }, expectedValidationErrors: [][]error{ { @@ -170,9 +168,7 @@ func TestUplink(t *testing.T) { }, }, expected: []normalizedpayload.Measurement{ - { - Wind: &normalizedpayload.Wind{}, - }, + {}, }, expectedValidationErrors: [][]error{ { From 4fe002c68282ecd6e95f8fedbccae7fefc27358d Mon Sep 17 00:00:00 2001 From: Johan Stokking Date: Thu, 1 Sep 2022 12:29:29 +0200 Subject: [PATCH 4/4] as: Support returning no normalized payload --- .../javascript/javascript.go | 3 +- .../javascript/javascript_test.go | 43 +++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/pkg/messageprocessors/javascript/javascript.go b/pkg/messageprocessors/javascript/javascript.go index 17e14926e5..4712d78ebb 100644 --- a/pkg/messageprocessors/javascript/javascript.go +++ b/pkg/messageprocessors/javascript/javascript.go @@ -310,13 +310,14 @@ func (*host) decodeUplink( return errOutput.WithCause(err) } msg.DecodedPayload, msg.DecodedPayloadWarnings = decodedPayload, output.Decoded.Warnings + msg.NormalizedPayload, msg.NormalizedPayloadWarnings = nil, nil if normalized := output.Normalized; normalized != nil { if errs := normalized.Errors; len(errs) > 0 { return errOutputErrors.WithAttributes("errors", strings.Join(errs, ", ")) } if normalized.Data == nil { - return errOutput.New() + return nil } // The returned data can be an array of measurements or a single measurement object. var measurements []map[string]interface{} diff --git a/pkg/messageprocessors/javascript/javascript_test.go b/pkg/messageprocessors/javascript/javascript_test.go index d55a5808c5..b170eb4fa7 100644 --- a/pkg/messageprocessors/javascript/javascript_test.go +++ b/pkg/messageprocessors/javascript/javascript_test.go @@ -640,6 +640,49 @@ func TestDecodeUplink(t *testing.T) { a.So(errors.IsAborted(err), should.BeTrue) } + // Return no normalized payload (data is nil). + { + script := ` + function decodeUplink(input) { + return { + data: { + state: input.bytes[0] + } + } + } + + function normalizeUplink(input) { + return { + data: null + } + } + ` + err := host.DecodeUplink(ctx, ids, nil, message, script) + a.So(err, should.BeNil) + a.So(message.NormalizedPayload, should.BeNil) + a.So(message.NormalizedPayloadWarnings, should.BeEmpty) + } + + // Return no normalized payload (no return value). + { + script := ` + function decodeUplink(input) { + return { + data: { + state: input.bytes[0] + } + } + } + + function normalizeUplink(input) { + } + ` + err := host.DecodeUplink(ctx, ids, nil, message, script) + a.So(err, should.BeNil) + a.So(message.NormalizedPayload, should.BeNil) + a.So(message.NormalizedPayloadWarnings, should.BeEmpty) + } + // Decode and normalize a single measurement with out-of-range value. { message := &ttnpb.ApplicationUplink{