[Bug]: SchemaParseException: Undefined name with avro union (kafka + schema registry) #32620
Comments
Where does that parse failure originate from? This looks to be a deserialization error in the deserializer itself, which is "below" the level that Beam operates at, so I'm not sure how much we will be able to help.
Agreed to some extent. With a raw Kafka consumer implementation I would simply pass these into the consumer config along with the schema registry config, which would give me specific records that I could then dispatch on by type:

avroConsumeConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
avroConsumeConfigs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

But Beam introduces a wrapper, I guess because it also wants to build the Beam schema alongside parsing the message, and that wrapper uses a separate (and, I suspect, outdated) schema registry implementation which doesn't support this. I see that the Flink Kafka source connector has a thinner shim over the Kafka consumer and so behaves much like the raw consumer client.
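For concreteness, a minimal sketch of that raw-consumer approach; the broker/registry URLs and group id are placeholders, and `ExampleCreated`/`ExampleUpdated` are assumed to be the Avro-generated `SpecificRecord` classes for the referenced schemas:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

public class RawUnionConsumer {
  public static void main(String[] args) {
    Properties avroConsumeConfigs = new Properties();
    avroConsumeConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    avroConsumeConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
    avroConsumeConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // The two settings quoted above:
    avroConsumeConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    avroConsumeConfigs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    avroConsumeConfigs.put("schema.registry.url", "http://localhost:8081");

    // Values come back as Object because the subject's schema is a union.
    try (KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(avroConsumeConfigs)) {
      consumer.subscribe(List.of("Example"));
      while (true) {
        for (ConsumerRecord<String, Object> record : consumer.poll(Duration.ofSeconds(1))) {
          // Branch on the concrete generated type, preserving topic order.
          if (record.value() instanceof ExampleCreated) {
            ExampleCreated created = (ExampleCreated) record.value();
            // ... created-specific handling
          } else if (record.value() instanceof ExampleUpdated) {
            ExampleUpdated updated = (ExampleUpdated) record.value();
            // ... updated-specific handling
          }
        }
      }
    }
  }
}
```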
Can you share more of a stack trace for that parse error?
Sure, attached below is the complete trace for the example above.
Ok, I think this is "below" what Beam can straightforwardly interact with. It looks to me like the Avro schema itself isn't quite complete, in the sense that that particular class isn't defined in it. The parse failure happens while parsing the schema itself, not while using the schema to parse a message.
Yeah, exactly: instead of resolving the schema references, it tries to parse the names directly, which obviously aren't defined.
I don't think it's trying to parse any elements here. This error log comes up during the construction of the transform, where it's trying to create a coder from the serializer. It looks like it's failing to parse the schema using whatever Avro tooling is used to parse an Avro schema.
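To make that concrete, here is a minimal reproduction outside Beam and Kafka entirely: feeding the registry's raw union string to Avro's own parser fails the same way, because nothing in that string defines the named types.

```java
// Standalone illustration (no Beam, no Kafka): Avro's Schema.Parser cannot
// resolve a union whose branches are bare type names, since the string itself
// contains no definitions for them.
import org.apache.avro.Schema;

public class UnionParseDemo {
  public static void main(String[] args) {
    String registrySchema =
        "[\"com.example.schema.ExampleCreated\",\"com.example.schema.ExampleUpdated\"]";
    // Throws org.apache.avro.SchemaParseException: Undefined name: ...
    new Schema.Parser().parse(registrySchema);
  }
}
```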
Yeah, sorry, maybe I wasn't precise with my wording: I meant a union element, not a PCollection element. I agree that it fails while parsing the union; it tries to parse the string literal "com.example.schema.ExampleCreated" instead of resolving the reference against the schema registry to fetch the actual schema. Since CachedSchemaRegistryClient seems to be outdated, should Beam be using it?
I would defer to guidance from Confluent on that one. Do they have a recommended replacement? If so, we could look to use that instead.
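For reference, a hedged sketch of what reference-aware parsing looks like with the current Confluent client APIs, assuming the client is constructed with an AvroSchemaProvider so that parseSchema can resolve the references recorded on the subject (URL and cache size are placeholders):

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class ResolveUnionSchema {
  public static void main(String[] args) throws Exception {
    // Construct the client with an Avro schema provider so it can parse
    // schemas that carry references.
    SchemaRegistryClient client =
        new CachedSchemaRegistryClient(
            "http://localhost:8081",
            100,
            List.<SchemaProvider>of(new AvroSchemaProvider()),
            Map.of());

    SchemaMetadata latest = client.getLatestSchemaMetadata("Example-value");
    // parseSchema receives the references recorded with the subject, so the
    // bare names in the union can be looked up instead of failing to parse.
    Optional<ParsedSchema> parsed =
        client.parseSchema(latest.getSchemaType(), latest.getSchema(), latest.getReferences());
    parsed.ifPresent(p -> System.out.println(((AvroSchema) p).rawSchema()));
  }
}
```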
What happened?
We are attempting to use the Beam Java SDK to consume from a Kafka topic that contains Avro messages of varying schemas which nevertheless need to be correctly ordered. As you can imagine, the different event types require branching logic based on the specific type.
As an example, we have a topic "Example" whose corresponding schema registry subject "Example-value" contains an Avro reference union like the following:
[ com.example.schema.ExampleCreated, com.example.schema.ExampleUpdated ]
What happens, however, is that Beam tries to parse these names directly instead of resolving the union's references, failing with the error in the title (SchemaParseException: Undefined name).
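A minimal sketch of the kind of KafkaIO setup that hits this, assuming Beam's ConfluentSchemaRegistryDeserializerProvider is used to derive the value coder (URLs are placeholders). The exception is thrown while this transform is constructed, when the provider fetches and parses the subject's latest schema, not while decoding records:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadUnionTopic {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    pipeline.apply(
        KafkaIO.<String, GenericRecord>read()
            .withBootstrapServers("localhost:9092")
            .withTopic("Example")
            .withKeyDeserializer(StringDeserializer.class)
            // The provider resolves the value coder from the subject's latest
            // schema; this is where the union string gets parsed.
            .withValueDeserializer(
                ConfluentSchemaRegistryDeserializerProvider.of(
                    "http://localhost:8081", "Example-value")));
    pipeline.run();
  }
}
```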
I referred to this Confluent article: https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/, which states: "Setting use.latest.version to true causes the Avro serializer to look up the latest schema version in the subject (which will be the union) and use that for serialization; otherwise, if set to false, the serializer will look for the event type in the subject and fail to find it."
I am specifying use.latest.version = true; however, after some debugging I can't see it being used anywhere, and the behavior matches what the article describes for when it is disabled.
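A sketch of how the setting can be forwarded; use.latest.version and specific.avro.reader are real Confluent serde config keys, but whether withConsumerConfigUpdates (or any other channel) actually routes them to Beam's schema-resolution path is exactly what appears to be missing:

```java
// Same read as in the sketch above, with the serde settings added. As noted,
// I cannot see use.latest.version being consulted during schema resolution.
KafkaIO.<String, GenericRecord>read()
    .withBootstrapServers("localhost:9092")
    .withTopic("Example")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(
        ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "Example-value"))
    .withConsumerConfigUpdates(
        java.util.Map.<String, Object>of(
            "use.latest.version", true,
            "specific.avro.reader", true));
```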
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components