
How to support schema registry by AvroConverter #840

Open
cdmikechen opened this issue Apr 7, 2022 · 2 comments

Comments

@cdmikechen

Issue Guidelines

Please review these questions before submitting an issue.

What version of the Stream Reactor are you reporting this issue for?

3.0.1

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

Yes

Do you have a supported version of the data source/sink, e.g. Cassandra 3.0.9?

Have you read the docs?

https://docs.lenses.io/5.0/integrations/connectors/stream-reactor/sources/mqttsourceconnector/

What is the expected behaviour?

I use JSON by default for Kafka data storage.
I've found that this project supports AvroConverter, and that we can use connect.converter.avro.schemas to specify an Avro schema file. But when pushing data to Kafka as a source connector, we should also be able to use schema registry mode and register the Avro schema with the Schema Registry service.
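For context, my current understanding is that the StreamReactor AvroConverter is selected per source via KCQL and reads its schema from a local file. A minimal sketch, assuming the topic-to-file mapping syntax from the docs (the schema path is illustrative):

```properties
# Select StreamReactor's AvroConverter for the MQTT source via KCQL
connect.mqtt.kcql=INSERT INTO mqtt-device-test SELECT * FROM `$device/update/request/+` WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.AvroConverter`
# Map the MQTT source to a local Avro schema file (path is hypothetical)
connect.converter.avro.schemas=$device/update/request/+=/path/to/device.avsc
```

Nothing here talks to a Schema Registry, which is the gap this issue is about.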

What was observed?

Except for some descriptions in the Kudu connector, I didn't find how to configure this anywhere else.

What is your Connect cluster configuration (connect-avro-distributed.properties)?

What is your connector properties configuration (my-connector.properties)?

{
    "name": "mqtt-source-device",
    "config": {
        "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
        "tasks.max": "1",
        "connect.mqtt.kcql": "INSERT INTO mqtt-device-test SELECT * FROM `$device/update/request/+` WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`",
        "connect.mqtt.client.id": "kafka-connect-mqtt-test",
        "connect.mqtt.hosts": "tcp://xxx.xxx.svc.cluster.local:1883",
        "connect.mqtt.service.quality": "1"
    }
}

Please provide full log files (redact any sensitive information)

No

@ethanttbui

Maybe you can try using the Confluent Avro converter, as in this example?
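For anyone landing here, a minimal sketch of that approach: configure Confluent's AvroConverter at the Kafka Connect level rather than via KCQL. This assumes the Confluent converter jars are on the plugin path, and the registry URL is a placeholder:

```properties
# Confluent's AvroConverter serializes records and registers/looks up
# their schemas in Schema Registry (URL below is a placeholder)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
```

Note the caveat raised in the next comment: if the source converter hands Connect raw bytes, the registered schema will just be a bytes schema, with no field inference.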

@jclarysse

We have a similar question related to Avro events consumed by StreamReactor MQTT source (latest version):

  • Using Confluent's AvroConverter, a new bytes-only schema is created in Schema Registry (no field inference).
  • Using StreamReactor's AvroConverter, it is not possible to pass a Schema Registry URL through avro.schemas: $path must be a valid java.io.File (e.g. /path/to/schema.avsc), otherwise a ConfigException is thrown by this code.

Would it be reasonable and valuable to allow Schema Registry URLs as well? Since the PR will affect all connectors (kafka-connect-common), is there any performance or security concern?
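To make the request concrete, here is the current file-only form next to what a registry-aware form might look like. The URL form is hypothetical and not supported today; the subject path is illustrative:

```properties
# Current: the value after '=' must resolve to a local java.io.File
connect.converter.avro.schemas=my/topic=/path/to/schema.avsc
# Proposed (hypothetical): also accept a Schema Registry subject URL
connect.converter.avro.schemas=my/topic=http://schema-registry:8081/subjects/my-topic-value
```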
