The microprofile-reactive-messaging-kafka
quickstart demonstrates the use of the MicroProfile Reactive Messaging specification backed by Apache Kafka in {productName}.
MicroProfile Reactive Messaging is a framework for building event-driven, data streaming, and event-sourcing applications using CDI. It lets your application interact using messaging technologies such as Apache Kafka.
The implementation of MicroProfile Reactive Messaging is built on top of MicroProfile Reactive Streams Operators. Reactive Streams Operators extends the Reactive Streams specification by adding operators such as map(), filter() and others.
In this quickstart, we have a CDI bean that demonstrates the functionality of the MicroProfile Reactive Messaging specification. Currently, we support Reactive Messaging 1.0. Connections to external messaging systems such as Apache Kafka are configured via MicroProfile Config. We will also use Reactive Streams Operators to modify the data relayed in these streams in the methods handling the Reactive Messaging streams.
We recommend that you follow the instructions that create the application step by step. However, you can also go right to the completed example which is available in this directory.
Caution
|
Kafka must be running before attempting to deploy the Quickstart application. See Running the Apache Kafka Service for how to do this in your local environment. |
mvn archetype:generate \
-DgroupId=org.wildfly.quickstarts \
-DartifactId=microprofile-reactive-messaging-kafka \
-DinteractiveMode=false \
-DarchetypeGroupId=org.apache.maven.archetypes \
-DarchetypeArtifactId=maven-archetype-webapp
cd microprofile-reactive-messaging-kafka
Open the project in your favourite IDE.
The project needs to be updated to use Java 8 as the minimum:
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
Next set up our dependencies. Add the following section to your
pom.xml
:
<dependencyManagement>
<dependencies>
<!-- importing the ee-with-tools BOM adds specs and other useful artifacts as managed dependencies -->
<dependency>
<groupId>org.wildfly.bom</groupId>
<artifactId>wildfly-ee-with-tools</artifactId>
<version>{versionServerBom}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- importing the microprofile BOM adds MicroProfile specs -->
<dependency>
<groupId>org.wildfly.bom</groupId>
<artifactId>wildfly-microprofile</artifactId>
<version>{versionMicroprofileBom}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
By using BOMs, the majority of dependencies used within this quickstart align with the versions used by the application server.
Now we need to add the dependencies required by the focus of this QuickStart (CDI, MicroProfile Reactive Messaging, Reactive Streams Operators, Reactive Streams and the Apache Kafka Client). Additionally, we need to add dependencies for 'supporting' functionality (JPA, JTA and JAX-RS):
<dependencies>
<!-- Core dependencies -->
<!-- Import the CDI API, we use provided scope as the API is included in WildFly -->
<dependency>
<groupId>jakarta.enterprise</groupId>
<artifactId>jakarta.enterprise.cdi-api</artifactId>
<scope>provided</scope>
</dependency>
<!-- Import the Kafka Client API, we use provided scope as the API is included in WildFly -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<scope>provided</scope>
</dependency>
<!-- Import the SmallRye Kafka API, we use provided scope as the API is included in WildFly -->
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-kafka-api</artifactId>
<scope>provided</scope>
</dependency>
<!-- Import the Reactive Messaging API, we use provided scope as the API is included in WildFly -->
<dependency>
<groupId>org.eclipse.microprofile.reactive.messaging</groupId>
<artifactId>microprofile-reactive-messaging-api</artifactId>
<scope>provided</scope>
</dependency>
<!-- Import the Reactive Streams Operators API, we use provided scope as the API is included in WildFly -->
<dependency>
<groupId>org.eclipse.microprofile.reactive-streams-operators</groupId>
<artifactId>microprofile-reactive-streams-operators-api</artifactId>
<scope>provided</scope>
</dependency>
<!-- Import the Reactive Streams API, we use provided scope as the API is included in WildFly -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<scope>provided</scope>
</dependency>
<!-- 'Supporting' dependencies -->
<!-- Import the Persistence API, we use provided scope as the API is included in WildFly -->
<dependency>
<groupId>jakarta.persistence</groupId>
<artifactId>jakarta.persistence-api</artifactId>
<scope>provided</scope>
</dependency>
<!-- Import the Annotations API, we use provided scope as the API is included in WildFly -->
<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
<scope>provided</scope>
</dependency>
<!-- Import the Transaction API, we use provided scope as the API is included in WildFly -->
<dependency>
<groupId>jakarta.transaction</groupId>
<artifactId>jakarta.transaction-api</artifactId>
<scope>provided</scope>
</dependency>
<!-- Import the JAX-RS API, we use provided scope as the API is included in WildFly -->
<dependency>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
<scope>provided</scope>
</dependency>
<!-- Import the resteasy-jaxrs API, we use provided scope as the API is included in WildFly -->
<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-jaxrs</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
All dependencies have the 'provided' scope.
As we are going to be deploying this application to the {productName} server, let’s also add a maven plugin that will simplify the deployment operations (you can replace the generated build section):
<build>
<!-- Set the name of the archive -->
<finalName>${project.artifactId}</finalName>
<plugins>
<!-- Allows to use mvn wildfly:deploy -->
<plugin>
<groupId>org.wildfly.plugins</groupId>
<artifactId>wildfly-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
Now we are ready to start working with MicroProfile Reactive Messaging.
This section walks you through the steps to write our application. They are:
-
Create a generator for the generated messages. We will create something which mocks a call to an asynchronous resource.
-
Add a data object which wraps the generated messages and adds a timestamp. We will use JPA annotations on this to make it persistable.
-
Create our CDI bean interfacing with the Kafka streams via annotated methods. For the streams that are managed by Kafka we will provide a MicroProfile Config file to configure how to interact with Kafka. It will log, filter and store entries to a RDBMS.
-
Create a CDI bean that will be used to store and retrieve entries from a RDBMS.
-
Create a JAX-RS endpoint to return the data that was stored in the RDBMS to the user.
Copy across the microprofile-reactive-messaging-kafka/src/main/java/org/wildfly/quickstarts/microprofile/reactive/messaging/MockExternalAsyncResource.java
file to your project. This class mocks a call to an asynchronous external resource. The details of how it is implemented are not important for this QuickStart.
MockExternalAsyncResource
has one callable method:
public CompletionStage<String> getNextValue()
The CompletionStage
returned by this method will complete with a String when ready. A String is ready every two seconds. It will emit the following Strings in the given order:
-
Hello
-
World
-
Reactive
-
Messaging
-
with
-
Kafka
After this initial sequence the returned CompletionStage
will complete with a random entry from the above list. A new entry is available every two seconds.
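To make the described behaviour concrete, here is a minimal sketch of how such a mock could be written with JDK classes only. It is not the quickstart's MockExternalAsyncResource: the class name MockAsyncSketch and the configurable delay are assumptions made for illustration, and it delays each call rather than producing values on a fixed two-second schedule. It needs Java 9 or later because of CompletableFuture.delayedExecutor().

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for MockExternalAsyncResource; not the quickstart's code.
final class MockAsyncSketch {
    private static final List<String> WORDS =
            Arrays.asList("Hello", "World", "Reactive", "Messaging", "with", "Kafka");

    private final AtomicInteger calls = new AtomicInteger();
    private final long delayMillis;

    // delayMillis is an assumption; the text above describes a two second interval.
    MockAsyncSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    CompletionStage<String> getNextValue() {
        int call = calls.getAndIncrement();
        // Fixed order for the first six calls, random picks from the list afterwards.
        String next = call < WORDS.size()
                ? WORDS.get(call)
                : WORDS.get(ThreadLocalRandom.current().nextInt(WORDS.size()));
        // Complete the stage only after the delay has elapsed.
        return CompletableFuture.supplyAsync(
                () -> next,
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }
}
```

Calling new MockAsyncSketch(2000).getNextValue() six times in a row yields the strings listed above in order; later calls return an arbitrary entry from the same list.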
Later we will wrap the strings in a TimedEntry
object which contains the String and a timestamp. Since we will be storing it in a database, we add JPA annotations to it:
package org.wildfly.quickstarts.microprofile.reactive.messaging;
import java.sql.Timestamp;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.Id;
@Entity
public class TimedEntry {
private Long id;
private Timestamp time;
private String message;
public TimedEntry() {
}
public TimedEntry(Timestamp time, String message) {
this.time = time;
this.message = message;
}
@Id
@GeneratedValue
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public Timestamp getTime() {
return time;
}
public void setTime(Timestamp time) {
this.time = time;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
@Override
public String toString() {
String s = "TimedEntry{";
if (id != null) {
s += "id=" + id + ", ";
}
s += "time=" + time +
", message='" + message + '\'' +
'}';
return s;
}
}
MicroProfile Reactive Messaging is based on CDI, so all interaction with the MicroProfile Reactive Messaging streams must happen from CDI beans. Note: The beans must have either the @ApplicationScoped
or @Dependent
scopes.
Then within these beans we have a set of methods using the @Incoming
and @Outgoing
annotations from the MicroProfile Reactive Messaging specification to map the underlying streams. For an @Outgoing
annotation its value
specifies the Reactive Messaging stream to send data to, and for an @Incoming
annotation its value
specifies the Reactive Messaging stream to read data from. Although in this QuickStart we are putting all these methods into one CDI bean class, they could be spread over several beans.
The MicroProfile Reactive Messaging specification contains a full list of all the valid method signatures for such @Incoming
/@Outgoing
methods. We will use a few of them, and see how they make different use-cases easier.
Our bean looks as follows, and this is the main focus for the functionality in this QuickStart. We will also be using some other technologies, but they are secondary to this section. Explanations of each method will be given in line.
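package org.wildfly.quickstarts.microprofile.reactive.messaging;
import java.sql.Timestamp;
import java.util.concurrent.CompletionStage;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Publisher;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.api.KafkaMetadataUtil;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;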
@ApplicationScoped
public class MessagingBean {
First we inject our mock asynchronous external data producer, which produces a string every two seconds. We explained this class in a previous section.
@Inject
MockExternalAsyncResource producer;
We inject a bean that will be used to persist entries to a RDBMS later on.
@Inject
DatabaseBean dbBean;
Now we get to the reactive messaging part.
Our first method is a 'producer' method, since it is annotated with the @Outgoing
annotation. It simply relays the output of our mock producer bean to the reactive messaging system. It uses the channel source
(as indicated in the @Outgoing
annotation’s value) as the target stream to send the data to. You can think of 'producer' methods as the entry point to push data into the reactive messaging streams.
@Outgoing("source")
public CompletionStage<String> sendInVm() {
return producer.getNextValue();
}
Next we have a 'processor' method. It is annotated with both @Incoming
and @Outgoing
annotations so it gets data from the reactive messaging streams and then pushes it to another stream. Essentially it simply relays data.
In this case we get data from the source
stream, so it will receive the entries made available by the sendInVm()
method above, and forwards everything onto the filter
stream.
In this case, since we just want to log the strings that were emitted, we are using a simple method signature receiving and returning the raw string provided. The Reactive Messaging implementation has unwrapped the CompletionStage
from the previous method for us.
Note that there is no Kafka involved yet. Since the @Incoming
and @Outgoing
values match up, Reactive Messaging will use internal, in-memory streams.
@Incoming("source")
@Outgoing("filter")
public String logAllMessages(String message) {
System.out.println("Received " + message);
return message;
}
Now we have another 'processor' method. We get the data from the filter
stream (what was relayed by the previous logAllMessages()
method) and forward it on to the sender
stream.
In this method we want to do something a bit more advanced, namely apply a filter to the messages. We use a method receiving and returning a Reactive Stream, in this case we use PublisherBuilder
from the MicroProfile Reactive Streams Operators specification. PublisherBuilder
extends the Publisher
interface from the Reactive Streams specification, and provides us with the filter()
methods we will use here.
Again the Reactive Messaging implementation does all the wrapping for us.
Our filter method tells us to only relay messages that match Hello
or Kafka
, and drop everything else. In other words, later methods in the stream will only receive occurrences of Hello
or Kafka
.
@Incoming("filter")
@Outgoing("sender")
public PublisherBuilder<String> filter(PublisherBuilder<String> messages) {
return messages
.filter(s -> s.equals("Hello") || s.equals("Kafka"));
}
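To see what this predicate lets through without deploying anything, the following sketch applies the same condition to the six strings from the mock producer. It uses java.util.stream as a stand-in for PublisherBuilder, so it only illustrates the filtering logic, not the reactive behaviour; the class name FilterSketch is hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration: java.util.stream stands in for PublisherBuilder here.
final class FilterSketch {
    // Same condition as the lambda in the filter() method above.
    static boolean accepted(String s) {
        return s.equals("Hello") || s.equals("Kafka");
    }

    // Keeps only the messages that later methods in the stream would receive.
    static List<String> relay(List<String> messages) {
        return messages.stream()
                .filter(FilterSketch::accepted)
                .collect(Collectors.toList());
    }
}
```

Passing in Hello, World, Reactive, Messaging, with, Kafka leaves only Hello and Kafka, which is what the sender stream will carry.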
Next we have another 'processor' method, which receives data from the sender
stream and forwards it on to the to-kafka
stream. Its parameter is a simple String
, MicroProfile Reactive Messaging will unwrap the stream from the PublisherBuilder
returned in the previous method and call this next method with the individual entries.
In this method we want to wrap up the data into the TimedEntry
class we defined earlier, so we have a tuple of the message and a timestamp.
Additionally we want to set a Kafka key for the entries so that we can take advantage of Kafka’s querying capabilities (not done in this quickstart). In order to do this, we do the following steps:
-
Create an instance of Message from the MicroProfile Reactive Messaging API. A Message is a simple wrapper around the payload. In our case we use the TimedEntry instance we created.
-
We create the key for the TimedEntry. In this case we just use a hash of the message.
-
Use the OutgoingKafkaRecordMetadata builder to create an instance of OutgoingKafkaRecordMetadata with the key.
-
Next we call KafkaMetadataUtil.writeOutgoingKafkaMetadata() to augment the Message with the OutgoingKafkaRecordMetadata. Note that the Message passed in to KafkaMetadataUtil.writeOutgoingKafkaMetadata() is not modified, we need the returned one.
-
We return the augmented Message to the stream which is backed by Kafka.
@Incoming("sender")
@Outgoing("to-kafka")
public Message<TimedEntry> sendToKafka(String msg) {
// Simpler example for debugging
TimedEntry te = new TimedEntry(new Timestamp(System.currentTimeMillis()), msg);
Message<TimedEntry> m = Message.of(te);
// Just use the hash as the Kafka key for this example
int key = te.getMessage().hashCode();
// Create Metadata containing the Kafka key
OutgoingKafkaRecordMetadata<Integer> md = OutgoingKafkaRecordMetadata
.<Integer>builder()
.withKey(key)
.build();
// The returned message will have the metadata added
return KafkaMetadataUtil.writeOutgoingKafkaMetadata(m, md);
}
Our final method is a 'consumer' method, as it has only an @Incoming
annotation. You can think of this as a 'final destination' for the data in your application. We are using a Message<TimedEntry>
as our method parameter. We are using this signature since we want to access the IncomingKafkaRecordMetadata
, which contains the key we added in the previous method and additional information such as the Kafka partition and topic the message was sent on. Since we are using the signature taking a Message
as the parameter, we need to ack()
the message and return the resulting CompletionStage<Void>
. (If we don’t want to ack the receipt of the message and are not interested in the IncomingKafkaRecordMetadata
, we could have used a simpler signature such as void receiveFromKafka(TimedEntry message)
.)
The method calls through to our dbBean
to store the received data in a RDBMS. We will look at this briefly later.
@Incoming("from-kafka")
public CompletionStage<Void> receiveFromKafka(Message<TimedEntry> message) {
TimedEntry payload = message.getPayload();
IncomingKafkaRecordMetadata<Integer, TimedEntry> md =
KafkaMetadataUtil.readIncomingKafkaMetadata(message).get();
String msg =
"Received from Kafka, storing it in database\n" +
"\t%s\n" +
"\tkey: %d; partition: %d, topic: %s";
msg = String.format(msg, payload, md.getKey(), md.getPartition(), md.getTopic());
System.out.println(msg);
dbBean.store(payload);
return message.ack();
}} // MessagingBean - END
You might have noticed that up to, and including, the sendToKafka()
method the @Incoming.value()
matches the @Outgoing.value()
of the prior method. This indicates that these streams (source
, filter
and sender
) are handled in memory by the Reactive Messaging implementation.
For the last two methods this is different and there is no such pairing. The sendToKafka()
method sends its data to the to-kafka
stream:
...
@Outgoing("to-kafka")
public Message<TimedEntry> sendToKafka(String msg) {
...
}
However, there are no methods annotated with @Incoming("to-kafka")
.
And the receiveFromKafka()
method is expecting to receive data from the from-kafka
stream:
@Incoming("from-kafka")
public CompletionStage<Void> receiveFromKafka(Message<TimedEntry> message) {
...
}
Again, there are no methods annotated with @Outgoing("from-kafka")
.
These 'unpaired' sets of methods indicate that we do not want to use an in-memory stream, and want to use an external system for these streams. If we were to try to deploy the MessagingBean in this state, the application would fail to deploy. To fix this, and tell it what to map onto, we need to provide some configuration.
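The pairing rule can be checked on paper with a small set difference. The sketch below is only an illustration of that reasoning, not how the Reactive Messaging implementation resolves channels; the class name ChannelPairingSketch is hypothetical, and the two sets simply list the annotation values used in MessagingBean.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: lists the channels that have no in-memory counterpart.
final class ChannelPairingSketch {
    // Values of the @Outgoing annotations in MessagingBean.
    static final Set<String> OUTGOING =
            new TreeSet<>(Arrays.asList("source", "filter", "sender", "to-kafka"));
    // Values of the @Incoming annotations in MessagingBean.
    static final Set<String> INCOMING =
            new TreeSet<>(Arrays.asList("source", "filter", "sender", "from-kafka"));

    /** Returns the names in 'channels' that do not appear in 'counterparts'. */
    static Set<String> unpaired(Set<String> channels, Set<String> counterparts) {
        Set<String> result = new TreeSet<>(channels);
        result.removeAll(counterparts);
        return result;
    }
}
```

Here unpaired(OUTGOING, INCOMING) contains only to-kafka and unpaired(INCOMING, OUTGOING) contains only from-kafka, which are exactly the two streams that need connector configuration.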
Note
|
IncomingKafkaRecordMetadata is only available on incoming streams coming from Kafka. OutgoingKafkaRecordMetadata will only have effect on outgoing streams going to Kafka.
|
To map 'unpaired' streams onto Kafka we need to add a MicroProfile Config file to configure these streams.
Create a file called src/main/resources/META-INF/microprofile-config.properties
and add the following:
mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092
mp.messaging.outgoing.to-kafka.connector=smallrye-kafka
mp.messaging.outgoing.to-kafka.topic=testing
mp.messaging.outgoing.to-kafka.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.to-kafka.value.serializer=org.wildfly.quickstarts.microprofile.reactive.messaging.TimedEntrySerializer
# Configure the Kafka source (we read from it)
mp.messaging.incoming.from-kafka.connector=smallrye-kafka
mp.messaging.incoming.from-kafka.topic=testing
mp.messaging.incoming.from-kafka.value.deserializer=org.wildfly.quickstarts.microprofile.reactive.messaging.TimedEntryDeserializer
mp.messaging.incoming.from-kafka.key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
The MicroProfile Reactive Messaging specification mandates the following prefixes:
-
mp.messaging.connector. - used to set overall configuration for your application.
-
mp.messaging.outgoing. - used to configure streams we are writing to from methods annotated with @Outgoing. The next element determines the name of the stream as identified in the @Outgoing annotation, so all the properties starting with mp.messaging.outgoing.to-kafka are used to configure the writing done by the sendToKafka() method which is annotated with @Outgoing("to-kafka").
-
mp.messaging.incoming. - used to configure streams we are reading from in methods annotated with @Incoming. The next element determines the name of the stream as identified in the @Incoming annotation, so all the properties starting with mp.messaging.incoming.from-kafka are used to configure the reading done by the receiveFromKafka() method which is annotated with @Incoming("from-kafka").
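As a reading aid, the naming scheme above can be expressed as a tiny parser that splits a property key into its three parts. This is a sketch for illustration only: ChannelPropertySketch is a hypothetical class, it is not part of MicroProfile Config or SmallRye, and it assumes the stream or connector name contains no dots.

```java
// Hypothetical helper that splits mp.messaging.<kind>.<name>.<attribute> keys.
final class ChannelPropertySketch {
    static final String PREFIX = "mp.messaging.";

    final String kind;      // "connector", "outgoing" or "incoming"
    final String name;      // connector name or stream name, e.g. "to-kafka"
    final String attribute; // the vendor-dependent remainder, e.g. "key.serializer"

    private ChannelPropertySketch(String kind, String name, String attribute) {
        this.kind = kind;
        this.name = name;
        this.attribute = attribute;
    }

    static ChannelPropertySketch parse(String key) {
        if (!key.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a messaging property: " + key);
        }
        // Split into kind, name and everything that follows; assumes no dots in the name.
        String[] parts = key.substring(PREFIX.length()).split("\\.", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Incomplete messaging property: " + key);
        }
        String kind = parts[0];
        if (!kind.equals("connector") && !kind.equals("outgoing") && !kind.equals("incoming")) {
            throw new IllegalArgumentException("Unknown prefix element: " + kind);
        }
        return new ChannelPropertySketch(kind, parts[1], parts[2]);
    }
}
```

For example, mp.messaging.outgoing.to-kafka.key.serializer splits into the kind outgoing, the stream name to-kafka and the attribute key.serializer.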
What comes after these prefixes is vendor dependent. We use the SmallRye implementation of MicroProfile Reactive Messaging.
At the application level, the mp.messaging.connector.smallrye-kafka.bootstrap.servers
property says that all connections to Kafka in this application should go to localhost:9092
. This is not strictly necessary, since this value is the default that would be used if not specified. If we wanted to override this for say the @Outgoing("to-kafka")
annotated sendToKafka()
method we could specify this with a property such as:
mp.messaging.outgoing.to-kafka.bootstrap.servers=otherhost:9092
For the incoming and outgoing properties we can see that they all specify that they should use the smallrye-kafka
connector and that the outgoing one writes to the same topic, testing
, as the incoming one reads from.
We see that the outgoing configuration uses a TimedEntrySerializer
while the incoming one uses TimedEntryDeserializer
for the values. Kafka just deals with byte streams, so we need to tell it how to serialize the data we are sending and how to deserialize the data we are receiving. As seen above, this is configured with properties of the form mp.messaging.outgoing.<stream name>.value.serializer
and mp.messaging.incoming.<stream name>.value.deserializer
. The org.apache.kafka.common.serialization package contains implementations of serializers and deserializers for simple data types and constructs such as maps.
Finally, since the Kafka key is an Integer
, we use IntegerSerializer
and IntegerDeserializer
for the keys. The concept is exactly the same as for the value (de)serializers, but is instead configured with the properties mp.messaging.outgoing.<stream name>.key.serializer
and mp.messaging.incoming.<stream name>.key.deserializer
.
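To give a feel for what "byte streams" means for our Integer key, the sketch below encodes an int as four big-endian bytes and reads it back. To the best of our knowledge this matches the layout produced by Kafka's IntegerSerializer, but treat that as an assumption; the class IntegerKeySketch is hypothetical and uses only java.nio.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of a 4-byte, big-endian encoding of an int key.
final class IntegerKeySketch {
    // ByteBuffer is big-endian by default, so the most significant byte comes first.
    static byte[] toBytes(int key) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(key).array();
    }

    // Reads the four bytes back into the original int.
    static int fromBytes(byte[] data) {
        return ByteBuffer.wrap(data).getInt();
    }
}
```

With the key used in sendToKafka(), fromBytes(toBytes("Hello".hashCode())) returns the same hash again, which is the property a serializer and deserializer pair must have.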
In our case the data we are sending to and receiving from Kafka is not a simple object. It is an object of a class defined in our application, so we need to define our own serialization and deserialization. Luckily, this is easy. We just need to implement the org.apache.kafka.common.serialization.Serializer and org.apache.kafka.common.serialization.Deserializer interfaces.
Here is our TimedEntrySerializer
:
package org.wildfly.quickstarts.microprofile.reactive.messaging;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import org.apache.kafka.common.serialization.Serializer;
public class TimedEntrySerializer implements Serializer<TimedEntry> {
@Override
public byte[] serialize(String topic, TimedEntry data) {
if (data == null) {
return null;
}
try {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(bout);
out.writeLong(data.getTime().getTime());
out.writeUTF(data.getMessage());
out.close();
return bout.toByteArray();
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
And here is our TimedEntryDeserializer
:
package org.wildfly.quickstarts.microprofile.reactive.messaging;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.sql.Timestamp;
import org.apache.kafka.common.serialization.Deserializer;
public class TimedEntryDeserializer implements Deserializer<TimedEntry> {
@Override
public TimedEntry deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))){
Timestamp time = new Timestamp(in.readLong());
String message = in.readUTF();
return new TimedEntry(time, message);
} catch (IOException e){
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
As you can see the serializer writes the time as a Long, and the message as a string, and the deserializer reads them in the same order. Then in our microprofile-config.properties
above we saw how to make Kafka use our classes for serialization and deserialization.
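The write order and read order have to match exactly, which is easy to verify outside Kafka. The following sketch repeats the same ObjectOutputStream and ObjectInputStream calls without the Kafka interfaces or the TimedEntry class, so it can be run on its own; TimedEntryCodecSketch and its method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical stand-alone version of the (de)serialization logic shown above.
final class TimedEntryCodecSketch {
    // Writes the timestamp first and the message second, like TimedEntrySerializer.
    static byte[] encode(long timeMillis, String message) {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bout)) {
            out.writeLong(timeMillis);
            out.writeUTF(message);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bout.toByteArray();
    }

    // Reads the fields in the same order and joins them as "<millis>:<message>".
    static String decode(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            long timeMillis = in.readLong();
            String message = in.readUTF();
            return timeMillis + ":" + message;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Swapping the two read calls in decode() would make it fail or return garbage, which is why the deserializer must mirror the serializer.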
We have covered all the reactive messaging parts, but have missed out how the MessagingBean.receiveFromKafka()
stores data via the DatabaseBean
. This is not the focus of this QuickStart, so we will only briefly mention how this works.
This is the definition of DatabaseBean
:
package org.wildfly.quickstarts.microprofile.reactive.messaging;
import java.util.List;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import jakarta.persistence.TypedQuery;
import jakarta.transaction.Transactional;
@ApplicationScoped
public class DatabaseBean {
@PersistenceContext(unitName = "test")
EntityManager em;
@Transactional
public void store(Object entry) {
em.persist(entry);
}
public List<TimedEntry> loadAllTimedEntries() {
TypedQuery<TimedEntry> query = em.createQuery("SELECT t from TimedEntry t", TimedEntry.class);
List<TimedEntry> result = query.getResultList();
return result;
}
}
It injects an EntityManager
for the persistence context test
, which is defined in the src/main/resources/META-INF/persistence.xml
file:
<?xml version="1.0" encoding="UTF-8"?>
<persistence version="2.2"
xmlns="http://xmlns.jcp.org/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://xmlns.jcp.org/xml/ns/persistence
http://xmlns.jcp.org/xml/ns/persistence/persistence_2_2.xsd">
<persistence-unit name="test">
<jta-data-source>java:jboss/datasources/ExampleDS</jta-data-source>
<properties>
<property name="hibernate.hbm2ddl.auto" value="create-drop"/>
</properties>
</persistence-unit>
</persistence>
The DatabaseBean.store()
method saves the TimedEntry
and the DatabaseBean.loadAllTimedEntries()
method loads all the ones we stored.
It is worth pointing out that the @Incoming
and @Outgoing
annotated methods called by the Reactive Messaging implementation (such as MessagingBean.receiveFromKafka()
) happen outside of user space, so there is no transaction associated with them. We therefore need to annotate the DatabaseBean.store()
method with @Transactional
in order to save our entry to the database.
Finally, we would like to be able to view the data that was stored in the database. To do this we will add a JAX-RS endpoint that queries the database by calling DatabaseBean.loadAllTimedEntries()
.
package org.wildfly.quickstarts.microprofile.reactive.messaging;
import java.util.List;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/")
public class RootResource {
@Inject
DatabaseBean dbBean;
@GET
@Produces(MediaType.TEXT_PLAIN)
public String getRootResponse() {
List<TimedEntry> entries = dbBean.loadAllTimedEntries();
StringBuffer sb = new StringBuffer();
for (TimedEntry t : entries) {
sb.append(t);
sb.append("\n");
}
return sb.toString();
}
}
We expose our JAX-RS application at the context path:
package org.wildfly.quickstarts.microprofile.reactive.messaging;
import jakarta.ws.rs.ApplicationPath;
import jakarta.ws.rs.core.Application;
@ApplicationPath("/")
public class JaxRsApplication extends Application {
}
So far what we have seen is really happening in the back-end with little user interaction.
The MicroProfile Reactive Messaging 2.0 specification adds a @Channel
annotation and an Emitter
interface which makes it easier to send data to MicroProfile Reactive Messaging streams from user initiated code and to receive data from Reactive Messaging streams.
To showcase this functionality we add another CDI bean called UserMessagingBean
:
package org.wildfly.quickstarts.microprofile.reactive.messaging;
import java.util.concurrent.CopyOnWriteArraySet;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.FormParam;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.core.Response;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@ApplicationScoped
public class UserMessagingBean {
@Inject
@Channel("user")
private Emitter<String> emitter;
private BroadcastPublisher<String> broadcastPublisher;
public UserMessagingBean() {
//Needed for CDI spec compliance
//The @Inject annotated one will be used
}
@Inject
public UserMessagingBean(@Channel("user") Publisher<String> receiver) {
this.broadcastPublisher = new BroadcastPublisher(receiver);
}
@PreDestroy
public void destroy() {
broadcastPublisher.close();
}
public Response send(String value) {
System.out.println("Sending " + value);
emitter.send(value);
return Response.accepted().build();
}
public Publisher<String> getPublisher() {
return broadcastPublisher;
}
private class BroadcastPublisher<T> implements Publisher<T> {
// See source code for more details
}
}
Looking at this in more detail, the following field
@Inject
@Channel("user")
private Emitter<String> emitter;
is used to send data to the MicroProfile Reactive Messaging stream called user
, which is done in the following method
public Response send(String value) {
System.out.println("Sending " + value);
emitter.send(value);
return Response.accepted().build();
}
@Inject @Channel
on an Emitter
can be considered similar to an @Outgoing
annotated method but with the data coming from code paths invoked by user interaction. In this case we are not using Kafka to back the stream but if we wanted to, for this example, the MicroProfile Config properties would be prefixed with the mp.messaging.outgoing.user.
prefix.
Next we have the constructor where we inject a Publisher
(We could also have used a MicroProfile Reactive Streams Operators PublisherBuilder
) to define the receiving side.
@Inject
public UserMessagingBean(@Channel("user") Publisher<String> receiver) {
this.broadcastPublisher = new BroadcastPublisher(receiver);
}
We store this injected Publisher
in the broadcastPublisher
field. We will come back to why we are wrapping it in a BroadcastPublisher
in a second. So an @Inject @Channel
on a Publisher
(or PublisherBuilder
) can be considered equivalent to use of the @Incoming
annotation. In this case we are listening to the user
in memory stream so messages sent via the Emitter
will be received on this Publisher
. If instead we wanted to configure it to send via Kafka we would use MicroProfile Config properties prefixed with the mp.messaging.incoming.user.
prefix.
There are a few caveats on this mechanism though:
-
There must be an active Subscription (from the Reactive Streams specification) on the channel before the Emitter.send() method is called.
-
There can only be one Subscription on the injected Publisher. This means that we cannot simply return this Publisher as is via an asynchronous JAX-RS endpoint, as each client request would result in a separate Subscription.
The above two points will hopefully be fixed in a future version of the specification. For the purposes of this quickstart we are bypassing the above limitations by creating the BroadcastPublisher
class and wrapping the original Publisher
in that instead. Note that implementing Publisher
is hard, and that BroadcastPublisher
should be considered a 'proof of concept'. For more details about the BroadcastPublisher
see the source code of the UserMessagingBean
class. In a nutshell what it does is:
-
Its constructor subscribes to the injected Publisher to avoid the first problem.
-
When code subscribes to it, it handles the Subscription on its own level, and forwards on data received from the base Subscription created in the constructor.
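The fan-out idea behind this can be tried with JDK classes alone. The sketch below is not the quickstart's BroadcastPublisher: it uses java.util.concurrent.SubmissionPublisher and the JDK Flow interfaces instead of org.reactivestreams, and the names BroadcastSketch and Collector are hypothetical. It only shows one source feeding several subscribers, each with its own Subscription.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical fan-out demo built on the JDK's SubmissionPublisher (Java 9+).
final class BroadcastSketch {
    /** Records every item it receives and counts down once per item. */
    static final class Collector implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done;

        Collector(int expected) {
            done = new CountDownLatch(expected);
        }

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { received.add(item); done.countDown(); }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    /** Sends the items to two independent subscribers and returns what each one saw. */
    static List<List<String>> broadcast(List<String> items) {
        Collector first = new Collector(items.size());
        Collector second = new Collector(items.size());
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(first);
            publisher.subscribe(second);
            items.forEach(publisher::submit);
            // Delivery is asynchronous, so wait (bounded) for both subscribers.
            first.done.await(5, TimeUnit.SECONDS);
            second.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return Arrays.asList(first.received, second.received);
    }
}
```

Both subscribers end up with the same items in the same order, which is the behaviour each client of the JAX-RS endpoint relies on.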
Finally we have a JAX-RS endpoint
package org.wildfly.quickstarts.microprofile.reactive.messaging;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

import org.jboss.resteasy.annotations.Stream;
import org.reactivestreams.Publisher;

@ApplicationScoped
@Path("/user")
@Produces(MediaType.TEXT_PLAIN)
public class UserResource {
    @Inject
    UserMessagingBean bean;

    @POST
    @Path("{value}")
    @Consumes(MediaType.TEXT_PLAIN)
    public Response send(@PathParam("value") String value) {
        bean.send(value);
        return Response.ok().build();
    }

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @Stream
    public Publisher<String> get() {
        return bean.getPublisher();
    }
}
It simply delegates the values received from POST
requests under /user/<value>
onto the bean which sends them via the Emitter
.
Then GET
requests for /user
return the BroadcastPublisher
to the client, which will then receive the data arriving on the MicroProfile Reactive Messaging channel.
Now you should be able to compile the application and prepare it for deployment.
To run the Apache Kafka service we will use its Docker container. Note that this also requires Apache ZooKeeper, so rather than a simple docker run
command this quickstart directory contains a docker-compose.yaml
file defining the two Docker containers needed. The
file can be found here.
Note
|
If you don’t have Docker installed on your machine please follow the instructions at https://www.docker.com/get-started. |
Run the following command from this folder:
docker-compose up
Note
|
This can take a minute. |
Let’s check that our application works!
-
Make sure the {productName} server is started as described above.
-
{productName} ships with all the modules to run MicroProfile Reactive Messaging applications with Kafka, but the functionality is not enabled out of the box, so we need to enable it. Run:
$ {jbossHomeName}/bin/jboss-cli.sh --connect --file=enable-reactive-messaging.cli
to set everything up. The enable-reactive-messaging.cli
file used can be found here.
-
Open a new terminal and navigate to the root directory of your project.
-
Type the following command to build and deploy the project:
$ mvn clean package wildfly:deploy
Now we should see output in the server console. First, we see the output for the messages sent in the predetermined order:
14:24:39,197 INFO [stdout] (vert.x-eventloop-thread-0) Received from Kafka, storing it in database
14:24:39,197 INFO [stdout] (vert.x-eventloop-thread-0) TimedEntry{time=2021-08-06 14:24:39.183, message='Hello'}
14:24:39,197 INFO [stdout] (vert.x-eventloop-thread-0) key: 69609650; partition: 0, topic: testing
14:24:41,185 INFO [stdout] (pool-22-thread-1) Received world
14:24:43,183 INFO [stdout] (pool-22-thread-1) Received Reactive
14:24:45,183 INFO [stdout] (pool-22-thread-1) Received Messaging
14:24:47,183 INFO [stdout] (pool-22-thread-1) Received with
14:24:49,182 INFO [stdout] (pool-22-thread-1) Received Kafka
14:24:49,188 INFO [stdout] (vert.x-eventloop-thread-0) Received from Kafka, storing it in database
14:24:49,188 INFO [stdout] (vert.x-eventloop-thread-0) TimedEntry{time=2021-08-06 14:24:49.183, message='Kafka'}
14:24:49,188 INFO [stdout] (vert.x-eventloop-thread-0) key: 72255238; partition: 0, topic: testing
14:24:51,184 INFO [stdout] (pool-22-thread-1) Received Kafka
Then we get another section where the messages are sent in randomised order:
14:24:51,184 INFO [stdout] (pool-22-thread-1) Received Kafka
14:24:51,190 INFO [stdout] (vert.x-eventloop-thread-0) Received from Kafka, storing it in database
14:24:51,190 INFO [stdout] (vert.x-eventloop-thread-0) TimedEntry{time=2021-08-06 14:24:51.184, message='Kafka'}
14:24:51,190 INFO [stdout] (vert.x-eventloop-thread-0) key: 72255238; partition: 0, topic: testing
14:24:53,184 INFO [stdout] (pool-22-thread-1) Received world
14:24:55,184 INFO [stdout] (pool-22-thread-1) Received world
14:24:57,184 INFO [stdout] (pool-22-thread-1) Received Reactive
14:24:59,181 INFO [stdout] (pool-22-thread-1) Received Hello
14:24:59,187 INFO [stdout] (vert.x-eventloop-thread-0) Received from Kafka, storing it in database
14:24:59,187 INFO [stdout] (vert.x-eventloop-thread-0) TimedEntry{time=2021-08-06 14:24:59.182, message='Hello'}
14:24:59,187 INFO [stdout] (vert.x-eventloop-thread-0) key: 69609650; partition: 0, topic: testing
In both parts of the log we see that all messages reach the logAllMessages()
method, while only Hello
and Kafka
reach the receiveFromKafka()
method which saves them to the RDBMS.
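The split seen in the logs can be illustrated with a small sketch. The quickstart itself does this with Reactive Streams Operators inside the messaging bean; the version below uses plain java.util.stream as a stand-in, and the class FilterSketch together with its predicate are assumptions inferred from the log output rather than code taken from the quickstart.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of "log everything, forward only some words".
final class FilterSketch {
    // Assumed predicate, inferred from the log output: only these two words are forwarded.
    static boolean forwardToKafka(String word) {
        return word.equalsIgnoreCase("hello") || word.equalsIgnoreCase("kafka");
    }

    static List<String> selectForKafka(List<String> words) {
        return words.stream()
                .peek(w -> System.out.println("Received " + w)) // every word is logged
                .filter(FilterSketch::forwardToKafka)            // only some continue
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = List.of("Hello", "world", "Reactive", "Messaging", "with", "Kafka");
        System.out.println(selectForKafka(words)); // prints "[Hello, Kafka]" after the log lines
    }
}
```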
To inspect what was stored in the database, go to http://localhost:8080/microprofile-reactive-messaging-kafka in your browser and you should see something like:
TimedEntry{id=1, time=2021-08-06 14:24:39.183, message='Hello'}
TimedEntry{id=2, time=2021-08-06 14:24:49.183, message='Kafka'}
TimedEntry{id=3, time=2021-08-06 14:24:51.184, message='Kafka'}
TimedEntry{id=4, time=2021-08-06 14:24:59.182, message='Hello'}
The timestamps of the entries in the browser match the ones we saw in the server logs.
With the application still running, open two terminal windows. Enter the following curl
command in both of them:
$ curl -N http://localhost:8080/microprofile-reactive-messaging-kafka/user
The -N
option disables curl's output buffering, so we see data as soon as it becomes available on the publisher.
In a third terminal window enter the commands:
$ curl -X POST http://localhost:8080/microprofile-reactive-messaging-kafka/user/one
$ curl -X POST http://localhost:8080/microprofile-reactive-messaging-kafka/user/two
$ curl -X POST http://localhost:8080/microprofile-reactive-messaging-kafka/user/three
In the first two terminal windows you should see these entries appear as they are posted:
data: one
data: two
data: three
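If you would rather consume this stream from code than from curl, the lines above need to be turned back into the posted values. The following is a minimal sketch of that step only, assuming the response lines have already been read from the connection; the class SseDataSketch is hypothetical, it handles only the data: field, and unlike a strict Server-Sent Events parser it also returns a trailing event that was not terminated by a blank line.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: extracts the payloads from Server-Sent Events lines.
final class SseDataSketch {
    static List<String> parseEvents(List<String> lines) {
        List<String> events = new ArrayList<>();
        StringBuilder current = null;
        for (String line : lines) {
            if (line.isEmpty()) {
                // A blank line ends the current event.
                if (current != null) {
                    events.add(current.toString());
                    current = null;
                }
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                if (value.startsWith(" ")) value = value.substring(1); // drop one leading space
                if (current == null) current = new StringBuilder(value);
                else current.append('\n').append(value); // multi-line data is joined
            }
            // Other fields (event:, id:, retry:) and comment lines are ignored here.
        }
        if (current != null) events.add(current.toString()); // lenient: keep unterminated event
        return events;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("data: one", "", "data: two", "", "data: three", "");
        System.out.println(parseEvents(lines)); // prints "[one, two, three]"
    }
}
```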