Schema evolution with Confluent Schema Registry

A collection of Spring Boot applications to demonstrate schema evolution using Spring Cloud Stream and Confluent Schema Registry.

All components (producers and consumers) in this sample are configured to use native encoding[1], which ensures they use the Confluent-provided Avro serializers rather than the Avro serializers provided by Spring Cloud Stream. The Confluent serializers leverage the Confluent Schema Registry to validate schemas during record production. An additional benefit is that these components are cross-compatible with external tools that use the Confluent Avro serializers, such as the out-of-the-box kafka-avro-console-consumer and kafka-avro-console-producer tools that ship with Confluent Schema Registry.
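
For reference, native encoding is typically enabled per output binding while the Confluent serializer and the registry location are configured on the binder. The snippet below is only a sketch of that idea; the binding name supply-out-0 is a placeholder and the actual property values used by the sample applications may differ:

spring:
  cloud:
    stream:
      bindings:
        supply-out-0:                 # hypothetical binding name, for illustration only
          destination: sensor-topic
          producer:
            useNativeEncoding: true   # let the Kafka client (not the framework) serialize the payload
      kafka:
        binder:
          producerProperties:
            # Confluent Avro serializer registers/validates the schema against the registry
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://localhost:8081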

Requirement

As a developer, I’d like to design my consumer to be resilient to differing payload schemas, and I want to use Confluent Schema Registry.

Assumptions

There is plenty of literature online about schema evolution, so we are going to skip defining it here. For this sample, we will simply assume there are two producers producing events with different payload schemas and a consumer that is able to consume both payloads.

Building the applications

To build the applications simply execute the following command from the confluent-schema-registry-integration directory:

./mvnw clean install
Note
The apps can be built and run from within an IDE (such as IntelliJ), but you will need to invoke the Maven package goal and then refresh the project, as the Avro Maven plugin needs to execute in order to generate the required model classes - otherwise you will see compile failures due to the missing Sensor class.

Running the applications

Pre-requisites

  • The components have all been built by following the Building the applications steps.

  • Apache Kafka broker available at localhost:9092

  • Confluent Schema Registry available at localhost:8081

Tip
The included Kafka tools can be used to easily start a broker and schema registry locally on the required coordinates

Steps

Make sure the above pre-requisites are satisfied, ensure you are in the confluent-schema-registry-integration directory, and then follow the steps below.

Adjust schema compatibility

Execute the following command to set the schema compatibility mode to NONE on the Confluent schema registry:

curl -X PUT http://localhost:8081/config -d '{"compatibility": "NONE"}' -H "Content-Type:application/json"
Expected output
{"compatibility":"NONE"}
Start the applications
  • Start the consumer in a separate terminal session

cd confluent-schema-registry-integration-consumer
java -jar target/confluent-schema-registry-integration-consumer-4.0.0-SNAPSHOT.jar
  • Start producer1 in a separate terminal session

cd confluent-schema-registry-integration-producer1
java -jar target/confluent-schema-registry-integration-producer1-4.0.0-SNAPSHOT.jar
  • Start producer2 in a separate terminal session

cd confluent-schema-registry-integration-producer2
java -jar target/confluent-schema-registry-integration-producer2-4.0.0-SNAPSHOT.jar
Send sample data

The producer apps each expose a REST endpoint which sends a sample Kafka event when invoked. Execute the following commands to send some sample data.

curl -X POST http://localhost:9009/randomMessage
curl -X POST http://localhost:9010/randomMessage
curl -X POST http://localhost:9009/randomMessage
curl -X POST http://localhost:9009/randomMessage
curl -X POST http://localhost:9010/randomMessage
View consumer output

The consumer should log the results, which will look something like:

{"id": "10fd4598-0a12-4d9b-9512-3e2257d9b713-v1", "internalTemperature": 28.674625, "externalTemperature": 0.0, "acceleration": 5.3196855, "velocity": 41.38155}
{"id": "cc3555b0-0a52-44ef-9a06-89ffda361e0b-v2", "internalTemperature": 8.026814, "externalTemperature": 0.0, "acceleration": 7.5858965, "velocity": 79.71516}
{"id": "5a23d687-093f-4b13-a42f-19842041a96f-v1", "internalTemperature": 31.853527, "externalTemperature": 0.0, "acceleration": 6.006965, "velocity": 19.520967}
{"id": "3fe53513-def6-4376-81b9-a5b332d54fc2-v1", "internalTemperature": 33.353485, "externalTemperature": 0.0, "acceleration": 5.429616, "velocity": 82.439064}
{"id": "5813a495-748c-4485-ac85-cd6ad28a839c-v2", "internalTemperature": 6.988123, "externalTemperature": 0.0, "acceleration": 1.4945298, "velocity": 51.230377}
Note
The id value is appended with -v1 or -v2 indicating if it was sent from producer1 or producer2, respectively.

Alternatively, the Confluent command line tools can be used to view the output with the following command:

<confluent-tools-install-dir>/bin/kafka-avro-console-consumer \
    --topic sensor-topic \
    --bootstrap-server localhost:9092 \
    --from-beginning
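
Depending on how the Confluent tools are configured locally, the schema registry location may need to be passed explicitly to the console consumer via the schema.registry.url property, for example:

<confluent-tools-install-dir>/bin/kafka-avro-console-consumer \
    --topic sensor-topic \
    --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --from-beginning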

What just happened?

The schema evolved on the temperature field: it is now split into two separate fields, internalTemperature and externalTemperature. producer1 produces payloads containing only temperature, whereas producer2 produces payloads containing internalTemperature and externalTemperature.

The consumer is coded against a base schema that includes the split fields.

The consumer app can happily deserialize the payload with internalTemperature and externalTemperature fields. However, when a producer1 payload arrives (which includes only the single temperature field), the schema evolution and compatibility check are automatically applied.

Because each payload also includes the payload version in the header, Spring Cloud Stream, with the help of the Schema Registry server and Avro, performs the schema evolution behind the scenes; the temperature field is automatically mapped to the internalTemperature field.
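
One way this automatic mapping can be expressed is through standard Avro schema resolution: the consumer's reader schema declares the renamed field with an alias pointing at the old field name and provides defaults for fields that older payloads do not carry. The following is only an illustrative sketch of such a reader schema; the namespace, defaults, and exact field list are assumptions and may differ from the Sensor schema shipped with this sample:

{
  "type": "record",
  "name": "Sensor",
  "namespace": "com.example",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "internalTemperature", "type": "float", "default": 0.0, "aliases": ["temperature"]},
    {"name": "externalTemperature", "type": "float", "default": 0.0},
    {"name": "acceleration", "type": "float", "default": 0.0},
    {"name": "velocity", "type": "float", "default": 0.0}
  ]
}

With a reader schema along these lines, a v1 payload's temperature value resolves to internalTemperature via the alias, and externalTemperature falls back to its default because the v1 writer schema does not contain it.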

Using Confluent Control Center

Confluent provides the Confluent Control Center UI - a web-based tool for managing and monitoring Apache Kafka®.

Tip
The included Kafka tools can be used to easily start a Control Center UI locally

Once the UI is running locally and you have followed the Running the applications section:

  • Open the control center at http://localhost:9021 and click on the provided cluster

  • From the vertical menu, select the Topics tab

  • From the list of topics select the sensor-topic (created by the samples)

  • Click on the Schema tab to view the Sensors schema.

Confluent Schema Registry REST API

The Confluent Schema Registry also makes available a REST API for schema related operations.

For example, the http://localhost:8081/subjects endpoint lists the schema names (i.e. subjects) currently defined in the registry. After you have run the samples, you should see a schema subject named sensor-topic-value.
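
For instance, against a local registry the subjects can be listed and the latest registered schema for the topic value fetched as follows (the exact JSON returned depends on the registered schema):

curl http://localhost:8081/subjects
curl http://localhost:8081/subjects/sensor-topic-value/versions/latest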

Schema naming

By default, Confluent uses the TopicNameStrategy to derive the subject name of the message payload schema. The subject name is your topic name (i.e. the value of spring.cloud.stream.bindings.<channel>.destination) suffixed with -value.

This means that by default you can use a single schema per topic. However, the subject naming strategy can be changed to RecordNameStrategy or TopicRecordNameStrategy by setting the following properties:

consumer
spring:
  cloud:
    stream:
      kafka:
        binder:
          consumerProperties:
            value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
producer
spring:
  cloud:
    stream:
      kafka:
        binder:
          producerProperties:
            value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
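
With RecordNameStrategy in effect, the subject is derived from the fully qualified Avro record name rather than from the topic name, so listing the subjects would show an entry such as com.example.Sensor (the namespace here is illustrative; in practice it is whatever the sample's Avro schema declares) instead of sensor-topic-value:

curl http://localhost:8081/subjects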
Note
Currently the Control Center seems to only recognize subjects created with the TopicNameStrategy. If you configure the RecordNameStrategy, the schema will not appear in the UI.

1. By default, message encoding is performed transparently by the framework based on contentType. However, when native encoding is used, this default encoding is disabled and serialization is instead handled by the client library. It is the responsibility of the consumer to use an appropriate decoder to deserialize the inbound message and the responsibility of the producer to use an appropriate encoder to serialize the outbound message.