forked from mostafa/xk6-kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
jsonschema.go
149 lines (124 loc) · 5.08 KB
/
jsonschema.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package kafka
import (
"encoding/json"
"github.com/riferrei/srclient"
"github.com/santhosh-tekuri/jsonschema/v5"
"github.com/sirupsen/logrus"
)
const (
JsonSchemaSerializer string = "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer"
JsonSchemaDeserializer string = "io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer"
)
// SerializeJson serializes the data to JSON and adds the wire format to the data and
// returns the serialized data. It uses the given version to retrieve the schema from
// Schema Registry, otherwise it uses the given schema to manually create the codec and
// encode the data. The configuration is used to configure the Schema Registry client.
// The element is used to define the subject. The data should be a string.
func SerializeJson(configuration Configuration, topic string, data interface{}, element Element, schema string, version int) ([]byte, *Xk6KafkaError) {
bytesData := []byte(data.(string))
client := SchemaRegistryClientWithConfiguration(configuration.SchemaRegistry)
subject := topic + "-" + string(element)
var schemaInfo *srclient.Schema
schemaID := 0
var xk6KafkaError *Xk6KafkaError
if schema != "" {
// Schema is provided, so we need to create it and get the schema ID
schemaInfo, xk6KafkaError = CreateSchema(client, subject, schema, srclient.Json)
} else {
// Schema is not provided, so we need to fetch the schema from the Schema Registry
schemaInfo, xk6KafkaError = GetSchema(client, subject, schema, srclient.Json, version)
}
if xk6KafkaError != nil {
logrus.New().WithField("error", xk6KafkaError).Warn("Failed to create or get schema, manually encoding the data")
codec, err := jsonschema.CompileString(subject, schema)
if err != nil {
return nil, NewXk6KafkaError(failedCreateJsonSchemaCodec,
"Failed to create codec for encoding JSON",
err)
}
var jsonBytes interface{}
if err := json.Unmarshal(bytesData, &jsonBytes); err != nil {
return nil, NewXk6KafkaError(failedUnmarshalJson,
"Failed to unmarshal JSON data",
err)
}
if err := codec.Validate(jsonBytes); err != nil {
return nil, NewXk6KafkaError(failedValidateJson,
"Failed to validate JSON data",
err)
}
}
if schemaInfo != nil {
schemaID = schemaInfo.ID()
// Encode the data into JSON and then the wire format
jsonEncodedData, _, err := schemaInfo.Codec().NativeFromTextual(bytesData)
if err != nil {
return nil, NewXk6KafkaError(failedEncodeToJson,
"Failed to encode data into JSON",
err)
}
bytesData, err = schemaInfo.Codec().BinaryFromNative(nil, jsonEncodedData)
if err != nil {
return nil, NewXk6KafkaError(failedEncodeJsonToBinary,
"Failed to encode JSON data into binary",
err)
}
}
return EncodeWireFormat(bytesData, schemaID), nil
}
// DeserializeJson deserializes the data from JSON and returns the decoded data. It
// uses the given version to retrieve the schema from Schema Registry, otherwise it
// uses the given schema to manually create the codec and decode the data. The
// configuration is used to configure the Schema Registry client. The element is
// used to define the subject. The data should be a byte array.
func DeserializeJson(configuration Configuration, topic string, data []byte, element Element, schema string, version int) (interface{}, *Xk6KafkaError) {
bytesDecodedData, err := DecodeWireFormat(data)
if err != nil {
return nil, NewXk6KafkaError(failedDecodeFromWireFormat,
"Failed to remove wire format from the binary data",
err)
}
client := SchemaRegistryClientWithConfiguration(configuration.SchemaRegistry)
subject := topic + "-" + string(element)
var schemaInfo *srclient.Schema
var xk6KafkaError *Xk6KafkaError
if schema != "" {
// Schema is provided, so we need to create it and get the schema ID
schemaInfo, xk6KafkaError = CreateSchema(client, subject, schema, srclient.Json)
} else {
// Schema is not provided, so we need to fetch the schema from the Schema Registry
schemaInfo, xk6KafkaError = GetSchema(client, subject, schema, srclient.Json, version)
}
if xk6KafkaError != nil {
logrus.New().WithField("error", xk6KafkaError).Warn("Failed to create or get schema, manually decoding the data")
codec, err := jsonschema.CompileString(string(element), schema)
if err != nil {
return nil, NewXk6KafkaError(failedCreateJsonSchemaCodec,
"Failed to create codec for decoding JSON data",
err)
}
var jsonBytes interface{}
if err := json.Unmarshal(bytesDecodedData, &jsonBytes); err != nil {
return nil, NewXk6KafkaError(failedUnmarshalJson,
"Failed to unmarshal JSON data",
err)
}
if err := codec.Validate(jsonBytes); err != nil {
return jsonBytes, NewXk6KafkaError(failedValidateJson,
"Failed to validate JSON data, yet returning the data",
err)
}
return jsonBytes, nil
}
if schemaInfo != nil {
// Decode the data from Json
jsonDecodedData, _, err := schemaInfo.Codec().NativeFromBinary(bytesDecodedData)
if err != nil {
return nil, NewXk6KafkaError(failedDecodeJsonFromBinary,
"Failed to decode data from JSON",
err)
}
return jsonDecodedData, nil
}
return bytesDecodedData, nil
}