Skip to content

Commit

Permalink
Fix regression in JSONSchema (de)serialization
Browse files Browse the repository at this point in the history
Fix checking headers in the script
Remove unused error code and recorder error codes
  • Loading branch information
mostafa committed Aug 4, 2022
1 parent 423aacb commit 91cf77c
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 14 deletions.
5 changes: 2 additions & 3 deletions error_codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ const (
failedUnmarshalJSON errCode = 2007
failedValidateJSON errCode = 2008
failedEncodeToJSON errCode = 2009
failedEncodeJSONToBinary errCode = 2010
failedDecodeJSONFromBinary errCode = 2011
failedToUnmarshalSchema errCode = 2012
failedDecodeJSONFromBinary errCode = 2010
failedToUnmarshalSchema errCode = 2011

// producer.
failedWriteMessage errCode = 3000
Expand Down
27 changes: 17 additions & 10 deletions jsonschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,18 @@ func SerializeJSON(
if schemaInfo != nil {
schemaID = schemaInfo.ID()

// Encode the data into JSON and then the wire format
jsonEncodedData, _, err := schemaInfo.Codec().NativeFromTextual(bytesData)
if err != nil {
return nil, NewXk6KafkaError(failedEncodeToJSON,
"Failed to encode data into JSON",
var jsonBytes interface{}
if err := json.Unmarshal(bytesData, &jsonBytes); err != nil {
return nil, NewXk6KafkaError(failedUnmarshalJSON,
"Failed to unmarshal JSON data",
err)
}

bytesData, err = schemaInfo.Codec().BinaryFromNative(nil, jsonEncodedData)
// Encode the data into JSON and then the wire format
err := schemaInfo.JsonSchema().Validate(jsonBytes)
if err != nil {
return nil, NewXk6KafkaError(failedEncodeJSONToBinary,
"Failed to encode JSON data into binary",
return nil, NewXk6KafkaError(failedEncodeToJSON,
"Failed to encode data into JSON",
err)
}
}
Expand Down Expand Up @@ -148,14 +148,21 @@ func DeserializeJSON(
}

if schemaInfo != nil {
var jsonBytes interface{}
if err := json.Unmarshal(bytesDecodedData, &jsonBytes); err != nil {
return nil, NewXk6KafkaError(failedUnmarshalJSON,
"Failed to unmarshal JSON data",
err)
}

// Decode the data from Json
jsonDecodedData, _, err := schemaInfo.Codec().NativeFromBinary(bytesDecodedData)
err := schemaInfo.JsonSchema().Validate(jsonBytes)
if err != nil {
return nil, NewXk6KafkaError(failedDecodeJSONFromBinary,
"Failed to decode data from JSON",
err)
}
return jsonDecodedData, nil
return jsonBytes, nil
}

return bytesDecodedData, nil
Expand Down
2 changes: 1 addition & 1 deletion scripts/test_jsonschema_with_schema_registry.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export default function () {
"Key is correct": (msg) => msg.key.key == "key0",
"Value is correct": (msg) =>
msg.value.firstName == "firstName-0" && msg.value.lastName == "lastName-0",
"Headers are correct": (msg) => msg.headers.length == 0,
"Headers are correct": (msg) => msg.headers.hasOwnProperty("mykey") == false,
"Time is past": (msg) => new Date(msg["time"]) < new Date(),
"Offset is correct": (msg) => msg.offset == 0,
"Partition is correct": (msg) => msg.partition == 0,
Expand Down

0 comments on commit 91cf77c

Please sign in to comment.