Skip to content

Commit

Permalink
add consumer proto schema validation for pulsar
Browse files Browse the repository at this point in the history
Signed-off-by: yaron2 <[email protected]>
  • Loading branch information
yaron2 committed Jul 25, 2023
1 parent 9957d69 commit c58dccf
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 6 deletions.
22 changes: 16 additions & 6 deletions pubsub/pulsar/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
redeliveryDelay = "redeliveryDelay"
avroProtocol = "avro"
jsonProtocol = "json"
protoProtocol = "proto"
partitionKey = "partitionKey"

defaultTenant = "public"
Expand All @@ -61,11 +62,12 @@ const (
pulsarToken = "token"
// topicFormat is the format for pulsar, which have a well-defined structure: {persistent|non-persistent}://tenant/namespace/topic,
// see https://pulsar.apache.org/docs/en/concepts-messaging/#topics for details.
topicFormat = "%s://%s/%s/%s"
persistentStr = "persistent"
nonPersistentStr = "non-persistent"
topicJSONSchemaIdentifier = ".jsonschema"
topicAvroSchemaIdentifier = ".avroschema"
topicFormat = "%s://%s/%s/%s"
persistentStr = "persistent"
nonPersistentStr = "non-persistent"
topicJSONSchemaIdentifier = ".jsonschema"
topicAvroSchemaIdentifier = ".avroschema"
topicProtoSchemaIdentifier = ".protoschema"

// defaultBatchingMaxPublishDelay init default for maximum delay to batch messages.
defaultBatchingMaxPublishDelay = 10 * time.Millisecond
Expand Down Expand Up @@ -137,11 +139,17 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
value: v,
}
} else if strings.HasSuffix(k, topicAvroSchemaIdentifier) {
topic := k[:len(k)-len(topicJSONSchemaIdentifier)]
topic := k[:len(k)-len(topicAvroSchemaIdentifier)]
m.internalTopicSchemas[topic] = schemaMetadata{
protocol: avroProtocol,
value: v,
}
} else if strings.HasSuffix(k, topicProtoSchemaIdentifier) {
topic := k[:len(k)-len(topicProtoSchemaIdentifier)]
m.internalTopicSchemas[topic] = schemaMetadata{
protocol: protoProtocol,
value: v,
}
}
}

Expand Down Expand Up @@ -267,6 +275,8 @@ func getPulsarSchema(metadata schemaMetadata) pulsar.Schema {
return pulsar.NewJSONSchema(metadata.value, nil)
case avroProtocol:
return pulsar.NewAvroSchema(metadata.value, nil)
case protoProtocol:
return pulsar.NewProtoSchema(metadata.value, nil)
default:
return nil
}
Expand Down
37 changes: 37 additions & 0 deletions pubsub/pulsar/pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,22 @@ func TestParsePulsarSchemaMetadata(t *testing.T) {
assert.Equal(t, "2", meta.internalTopicSchemas["kenobi.avroschema"].value)
})

t.Run("test proto", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"host": "a",
"obiwan.avroschema": "1",
"kenobi.protoschema.protoschema": "2",
}
meta, err := parsePulsarMetadata(m)

assert.Nil(t, err)
assert.Equal(t, "a", meta.Host)
assert.Len(t, meta.internalTopicSchemas, 2)
assert.Equal(t, "1", meta.internalTopicSchemas["obiwan"].value)
assert.Equal(t, "2", meta.internalTopicSchemas["kenobi.protoschema"].value)
})

t.Run("test combined avro/json", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
Expand All @@ -98,6 +114,27 @@ func TestParsePulsarSchemaMetadata(t *testing.T) {
assert.Equal(t, jsonProtocol, meta.internalTopicSchemas["kenobi"].protocol)
})

t.Run("test combined avro/json/proto", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"host": "a",
"obiwan.avroschema": "1",
"kenobi.jsonschema": "2",
"darth.protoschema": "3",
}
meta, err := parsePulsarMetadata(m)

assert.Nil(t, err)
assert.Equal(t, "a", meta.Host)
assert.Len(t, meta.internalTopicSchemas, 3)
assert.Equal(t, "1", meta.internalTopicSchemas["obiwan"].value)
assert.Equal(t, "2", meta.internalTopicSchemas["kenobi"].value)
assert.Equal(t, "3", meta.internalTopicSchemas["darth"].value)
assert.Equal(t, avroProtocol, meta.internalTopicSchemas["obiwan"].protocol)
assert.Equal(t, jsonProtocol, meta.internalTopicSchemas["kenobi"].protocol)
assert.Equal(t, protoProtocol, meta.internalTopicSchemas["darth"].protocol)
})

t.Run("test funky edge case", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
Expand Down

0 comments on commit c58dccf

Please sign in to comment.