This is an example of a Spring Cloud Stream processor using Kafka Streams support.
The example is based on the word count application from the reference documentation. The application has two inputs: one bound as a KStream and the other bound to a message channel.
Go to the root of the repository and run:
docker-compose up -d
./mvnw clean package
java -jar target/kafka-streams-message-channel-0.0.1-SNAPSHOT.jar --spring.cloud.stream.kafka.streams.timeWindow.length=60000
Start a console producer in one terminal:
docker exec -it kafka-mc /opt/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic words
In another terminal, start a console consumer:
docker exec -it kafka-mc /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic counts
Enter some text in the console producer and watch the output in the console consumer.
Also watch the application console for log statements from the StreamListener method of the regular sink.
The default time window is 5 seconds; the run command above overrides it to 60 seconds. You can change it using the following property:
spring.cloud.stream.kafka.streams.timeWindow.length
(value is expressed in milliseconds)
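To illustrate how the window length groups records, the following sketch computes the start of the window that a record timestamp falls into. It is plain Java arithmetic, not the Kafka Streams API: the class and method names are hypothetical, and it assumes windows are aligned to the epoch (time 0) and that timestamps are non-negative.

```java
public class TumblingWindowSketch {

    // Hypothetical helper: start of the fixed-size window containing timestampMs.
    // Assumes windows are aligned to the epoch and timestampMs >= 0.
    static long windowStart(long timestampMs, long lengthMs) {
        return timestampMs - (timestampMs % lengthMs);
    }

    public static void main(String[] args) {
        long lengthMs = 5000L; // the 5-second default mentioned above
        long[] timestamps = {1000L, 4999L, 5000L, 12000L};
        for (long ts : timestamps) {
            long start = windowStart(ts, lengthMs);
            // Each window covers [start, start + lengthMs)
            System.out.println(ts + " ms -> window [" + start + ", " + (start + lengthMs) + ")");
        }
    }
}
```

Under these assumptions, words arriving at 1000 ms and 4999 ms are counted together, while a word arriving at 5000 ms starts a new count in the next window.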
To switch to a hopping window, also set spring.cloud.stream.kafka.streams.timeWindow.advanceBy
(value in milliseconds).
This creates overlapping hopping windows whenever the advance is smaller than the window length.
Here is an example with 2 overlapping windows (a window length of 10 seconds and a hop (advance) of 5 seconds):
java -jar target/kafka-streams-message-channel-0.0.1-SNAPSHOT.jar --spring.cloud.stream.kafka.streams.timeWindow.length=10000 --spring.cloud.stream.kafka.streams.timeWindow.advanceBy=5000
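To show why a length of 10 seconds with an advance of 5 seconds gives 2 overlapping windows, the following sketch lists the start times of every hopping window that contains a given record timestamp. It is a plain Java approximation of the arithmetic, not a call into Kafka Streams: the class and method names are hypothetical, and it assumes windows are aligned to the epoch and that no window starts before time 0.

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindowSketch {

    // Hypothetical helper: start times of all windows [start, start + lengthMs)
    // that contain timestampMs, where window starts are multiples of advanceMs.
    static List<Long> windowStartsFor(long timestampMs, long lengthMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp
        long start = (Math.max(0L, timestampMs - lengthMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // Same values as the command above: length=10000, advanceBy=5000
        System.out.println(windowStartsFor(12000L, 10000L, 5000L)); // prints [5000, 10000]
    }
}
```

With these values, a word produced at 12000 ms belongs to the windows starting at 5000 ms and 10000 ms, so it contributes to two counts. Setting the advance equal to the length yields a single window per record, which is the non-overlapping case.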