
[Bug]: SchemaParseException: Undefined name with avro union (kafka + schema registry) #32620

Open

kirksw opened this issue Oct 1, 2024 · 9 comments


kirksw commented Oct 1, 2024

What happened?

We are attempting to use the Beam Java SDK to consume from a Kafka topic that contains Avro messages with varying schemas, which need to remain correctly ordered. As you can imagine, the downstream logic branches based on the specific event type.

As an example, we have a topic "Example" whose corresponding schema registry subject "Example-value" contains an Avro reference union like the following: [ com.example.schema.ExampleCreated, com.example.schema.ExampleUpdated ]
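
For clarity, the latest version of the subject stores only the union of fully-qualified names as its schema string, with the actual type definitions attached as schema references, roughly like this (the reference subject names here are hypothetical):

Schema (subject "Example-value", latest version):

["com.example.schema.ExampleCreated", "com.example.schema.ExampleUpdated"]

References:

[
  { "name": "com.example.schema.ExampleCreated", "subject": "example-created", "version": 1 },
  { "name": "com.example.schema.ExampleUpdated", "subject": "example-updated", "version": 1 }
]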

What happens, however, is that it tries to find type definitions with these names instead of resolving the union's references, as shown by the error message:

Undefined name: "com.example.schema.ExampleCreated"
org.apache.avro.SchemaParseException: Undefined name: "com.example.schema.ExampleCreated"
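
For what it's worth, the same exception can be reproduced with the Avro parser alone, since the bare union string contains no type definitions for the parser to resolve (a minimal sketch; the schema string below is assumed to be what the registry returns as the subject's latest version):

// Minimal reproduction: parse the subject's latest schema string on its own,
// without resolving its schema references first.
String unionSchema =
        "[\"com.example.schema.ExampleCreated\", \"com.example.schema.ExampleUpdated\"]";
new org.apache.avro.Schema.Parser().parse(unionSchema);
// throws org.apache.avro.SchemaParseException: Undefined name: "com.example.schema.ExampleCreated"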

I referred to this Confluent article: https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/, which states that setting use.latest.version to true causes the Avro serializer to look up the latest schema version in the subject (which will be the union) and use that for serialization; otherwise, if set to false, the serializer will look for the event type in the subject and fail to find it.

I am specifying use.latest.version = true, but after some debugging I can't see it being used anywhere, and the behavior matches what the article describes when the setting is disabled.

// Kafka schema registry client configuration
ImmutableMap<String, Object> schemaRegistryConfig =
        ImmutableMap.<String, Object>builder()
                .put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO")
                .put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG,
                        dotenv.get("KAFKA_SCHEMA_REGISTRY_USERNAME") + ":" + dotenv.get("KAFKA_SCHEMA_REGISTRY_PASSWORD"))
                .put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true)
                .put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false)
                .build();

// Kafka consumer configuration
ImmutableMap<String, Object> kafkaConsumerConfig =
        ImmutableMap.<String, Object>builder()
                .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                .put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
                .put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512")
                .put(SaslConfigs.SASL_JAAS_CONFIG,
                        String.format(
                                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",
                                dotenv.get("KAFKA_SASL_USERNAME"), dotenv.get("KAFKA_SASL_PASSWORD")))
                .build();

// read from the Kafka topic using the schema registry
PCollection<KafkaRecord<String, GenericRecord>> input = p
        .apply(KafkaIO.<String, GenericRecord>read()
                .withBootstrapServers(dotenv.get("KAFKA_BOOTSTRAP_SERVERS"))
                .withTopic(dotenv.get("KAFKA_TOPIC"))
                .withConsumerConfigUpdates(kafkaConsumerConfig)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(
                        dotenv.get("KAFKA_SCHEMA_REGISTRY_URL"), subject, null, schemaRegistryConfig)));

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@johnjcasey (Contributor)

Where does that parse failure originate from? This looks to be a deserialization error in the deserializer itself, which is "below" the level that Beam operates at, so I'm not sure how much we will be able to help.


kirksw commented Oct 29, 2024

Agreed, to some extent. With a raw Kafka consumer implementation I would simply pass these into the consumer config along with the schema registry config, which would give me specific records that I could then handle based on their type:

avroConsumeConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
avroConsumeConfigs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
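
For illustration, the poll loop would then branch on the concrete record type, along these lines (a rough sketch; ExampleCreated and ExampleUpdated are assumed to be the classes generated from the referenced schemas, and the handlers are hypothetical):

// Poll, then branch on the concrete event type produced by the
// specific-record deserializer.
ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, Object> record : records) {
    Object event = record.value();
    if (event instanceof ExampleCreated) {
        handleCreated((ExampleCreated) event);   // hypothetical handler
    } else if (event instanceof ExampleUpdated) {
        handleUpdated((ExampleUpdated) event);   // hypothetical handler
    }
}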

But Beam introduces a wrapper, I suppose because it also wants to build the Beam coder alongside deserializing the message, and this wrapper uses a separate (and, I suspect, outdated) schema registry integration that doesn't support this.

I see that the Flink Kafka source connector has a thinner shim around the Kafka consumer, so it appears to behave much the same as the raw Kafka consumer client.

@johnjcasey (Contributor)

Can you share more of a stack trace for that parse error?


kirksw commented Oct 29, 2024

Sure, attached below is the complete trace for the example above:

Undefined name: "com.example.schema.ExampleCreated"
org.apache.avro.SchemaParseException: Undefined name: "com.example.schema.ExampleCreated"
	at org.apache.avro.Schema.parse(Schema.java:1697)
	at org.apache.avro.Schema.parse(Schema.java:1823)
	at org.apache.avro.Schema$Parser.parse(Schema.java:1471)
	at org.apache.avro.Schema$Parser.parse(Schema.java:1458)
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getAvroSchema(ConfluentSchemaRegistryDeserializerProvider.java:138)
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getCoder(ConfluentSchemaRegistryDeserializerProvider.java:134)
	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.getValueCoder(KafkaIO.java:1878)
	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:1527)
	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:654)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:559)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:507)
	at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
	at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:195)
	at com.example.processor.ExampleProcessor.readFromKafka(ExampleProcessor.java:76)
	at TestSchemaRegistryTransform.testKafkaSchemaTransform(TestSchemaRegistryTransform.java:48)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
	at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
	at jdk.proxy1/jdk.proxy1.$Proxy2.processTestClass(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
	at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:119)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:66)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)

@johnjcasey (Contributor)

OK, I think this is "below" what Beam can straightforwardly interact with. It looks to me like the Avro schema itself isn't quite complete, in that that particular class isn't defined properly. This parse failure happens when trying to parse the schema itself, not when using the schema to parse a message.


kirksw commented Oct 29, 2024

Yeah, exactly: instead of resolving the schema references, it tries to parse the union's elements directly, which obviously aren't defined inline.

@johnjcasey (Contributor)

I don't think it's trying to parse any elements here. This error comes up during construction of the transform, where it's trying to create a coder from the deserializer. It looks like it's failing to parse the schema using whatever Avro tooling is used to parse an Avro schema.
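
Judging by the stack trace, the provider does roughly the following at pipeline construction time (a paraphrased sketch, not the actual Beam source):

// Paraphrasing the trace: getValueCoder() -> getCoder() -> getAvroSchema() -> Schema.Parser.parse()
String rawSchema = schemaRegistryClient.getLatestSchemaMetadata(subject).getSchema();
Schema avroSchema = new Schema.Parser().parse(rawSchema); // throws here for a reference union
Coder<GenericRecord> coder = AvroCoder.of(avroSchema);    // never reached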


kirksw commented Oct 29, 2024

Yeah, sorry, maybe I wasn't precise with my wording; I meant a union element, not a PCollection element. I agree that it fails while parsing the union: it tries to parse the string literal "com.example.schema.ExampleCreated" instead of resolving the reference against the schema registry to get the actual schema.
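
For illustration, something along these lines would resolve the references before parsing (a rough sketch against the Confluent client API; nested references are ignored for brevity):

// Parse each referenced schema first so the parser knows the named types,
// then parse the union itself.
Schema.Parser parser = new Schema.Parser();
SchemaMetadata latest = schemaRegistryClient.getLatestSchemaMetadata(subject);
for (SchemaReference ref : latest.getReferences()) {
    SchemaMetadata referenced =
            schemaRegistryClient.getSchemaMetadata(ref.getSubject(), ref.getVersion());
    parser.parse(referenced.getSchema()); // registers the named type with the parser
}
Schema unionSchema = parser.parse(latest.getSchema()); // names now resolve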

Since the CachedSchemaRegistryClient seems to be outdated, should Beam be using it?

@johnjcasey (Contributor)

I would defer to guidance from Confluent on that one. Do they have a recommended replacement? If so, we could look to use that instead
