This project is about experimenting with new ideas for the Reactive Messaging for MicroProfile specifications.
The current main focus is on the following aspects:
-
Messages lifecycle (creation, child messages, ...).
-
Message acknowledgment.
-
Dependency injection integration, notably with regard to reactive streams.
All of the code of this project is a prototype. Some pieces of it are advanced, others are not.
Even the prototyped new APIs are not fully polished so far and are likely to benefit from iterations.
The main expected benefits of this project are:
-
Enabling message forks and joins in the reactive flow.
-
Understanding how dependency injection and reactive streams could play together, specifically with the challenge of threads (i.e. in CDI, a NormalScope is thread-bound).
-
Understanding how, if possible, this could be implemented at least with CDI, and also with Quarkus DI.
-
To be used as a basis (API- and implementation-wise) for discussion about integration into the Reactive Messaging for MicroProfile specifications.
This section introduces some basic elements. Additional concepts are introduced in the following sections with more advanced scenarios.
A Message
is exposed as an interface, however it is only implemented by the framework.
There is no sub-classing by connector providers.
A Message
, along with everything it contains, is a simple POJO and can go through reactive streams without any concern.
It can be injected into beans, however it cannot itself have injected fields or method parameters.
Here is the basic API:
public interface Message<T> {
Iterable<Metadata> getMetadata();
<C extends Metadata> C getMetadata(String key);
void addMetadata(Metadata ctx);
T getPayload();
void setPayload(T payload);
...
}
As with the MP-RM v1.0, the Message is parameterized by the payload type. A payload is mutable.
Metadata
is a new addition to the Message
.
It makes it possible to attach contextualized metadata to the message.
As part of the Message
it is a POJO.
This contextualization can be provided by the source connector.
For instance, a Kafka connector will provide a KafkaIncoming
with Kafka specific pieces of information, e.g.:
@MessageScoped
public interface KafkaIncoming extends Metadata {
String KEY = "MY_KAFKA_IMPLEMENTATION";
String topic();
int partition();
long offset();
long timestamp();
TimestampType timestampType();
}
Beyond the source connector, a corporate framework can also provide Metadata(s)
to enrich the message just after its creation, e.g.:
@MessageScoped
public interface EventMetadata extends Metadata {
String getUniqueMessageId();
}
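To make the lookup by key more concrete, here is a minimal, self-contained sketch of how a framework-side implementation could store Metadata by key. It is not the prototype's code: the `key()` method on `Metadata`, the `SimpleMessage` class and the `TopicMetadata` type are assumptions made for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class MessageMetadataSketch {

    // Hypothetical: the text above does not show how a Metadata exposes its key.
    interface Metadata {
        String key();
    }

    // Simplified stand-in for the Message interface described above.
    static final class SimpleMessage<T> {
        private final Map<String, Metadata> metadata = new LinkedHashMap<>();
        private T payload;

        SimpleMessage(T payload) {
            this.payload = payload;
        }

        Iterable<Metadata> getMetadata() {
            return metadata.values();
        }

        @SuppressWarnings("unchecked")
        <C extends Metadata> C getMetadata(String key) {
            // Returns null when no Metadata is registered under the key.
            return (C) metadata.get(key);
        }

        void addMetadata(Metadata md) {
            metadata.put(md.key(), md);
        }

        T getPayload() {
            return payload;
        }

        void setPayload(T payload) {
            this.payload = payload;
        }
    }

    // Hypothetical connector metadata carrying a topic name.
    static final class TopicMetadata implements Metadata {
        static final String KEY = "TOPIC";
        private final String topic;

        TopicMetadata(String topic) {
            this.topic = topic;
        }

        @Override
        public String key() {
            return KEY;
        }

        String topic() {
            return topic;
        }
    }

    static String demo() {
        SimpleMessage<String> message = new SimpleMessage<>("124-value2");
        message.addMetadata(new TopicMetadata("mytopic"));
        TopicMetadata md = message.getMetadata(TopicMetadata.KEY);
        // The payload is mutable, as stated above.
        message.setPayload(message.getPayload().toUpperCase());
        return md.topic() + ":" + message.getPayload();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The typed `getMetadata(String key)` relies on an unchecked cast, so a caller asking for the wrong type under a given key would only fail at the assignment site.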
As with the MP-RM v1.0, a processor can be implemented using the @Incoming
and @Outgoing
annotations with the
following signature:
@Incoming("input_channel")
@Outgoing("business_internal_channel1")
public void stage1(String payload) {
...
}
@Incoming("business_internal_channel1")
@Outgoing("business_internal_channel2")
public void stage2(Message<String> message) {
...
}
What is added to the API is:
The injection of Metadata
in method call:
@Incoming("internal_channel")
@Outgoing("output_channel")
public void example(KafkaIncoming kafkaIncoming, EventMetadata eventMetadata, Message<String> message) {
...
}
The injection, using the regular DI mechanism, of whatever can be injected into the method call (except the payload, although it might be added):
@Inject
private KafkaIncoming kafkaIncoming;
@Inject
private EventMetadata eventMetadata;
@Inject
private Message<String> message;
@Incoming("internal_channel")
@Outgoing("output_channel")
public void example() {
...
}
The two previous examples are equivalent from the user's perspective.
In the previous example, the injected objects are bound to the execution thread, hence they can't be directly used inside a reactive stream, nor in any asynchronous mechanism.
To do so, the developer has to get a direct reference to the POJO instead of its proxied (thread-bound) instance.
This is achieved thanks to the introduction of a new type Async
belonging to the MP-RM framework.
This Async
type is parameterized with the type of instance that is expected, e.g.:
@Inject
private Message<String> message;
@Inject
private Async<Message<String>> asyncMessage;
@Incoming("internal_channel")
@Outgoing("output_channel")
public void example() {
Message<String> pojoMessage = asyncMessage.get(); // non-proxied instance of the message
...
}
The pojoMessage
instance can be sent to any asynchronous processing here.
It is worth noting that the message
injected proxy is referencing the POJO object obtained in pojoMessage
.
In other words, updating the payload through one or the other is equivalent.
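The difference between the proxied reference and the POJO reference can be illustrated with plain Java. In this sketch the "proxy" resolves the current message through a `ThreadLocal`, which is only one possible way for a thread-bound scope to behave. The names `proxied()`, `CURRENT` and `Msg` are hypothetical and the prototype's actual proxy and `Async` implementations may differ.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncReferenceSketch {

    // Simplified mutable message POJO.
    static final class Msg {
        volatile String payload;

        Msg(String payload) {
            this.payload = payload;
        }
    }

    // The message bound to the thread currently running a processor method.
    static final ThreadLocal<Msg> CURRENT = new ThreadLocal<>();

    // Stand-in for a proxied, thread-bound injection point.
    static Msg proxied() {
        Msg bound = CURRENT.get();
        if (bound == null) {
            throw new IllegalStateException("no message bound to this thread");
        }
        return bound;
    }

    static String demo() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CURRENT.set(new Msg("124-value2"));
        try {
            // Comparable to asyncMessage.get(): capture the POJO on the processor thread.
            Msg pojo = proxied();

            // On another thread, the proxy has nothing to resolve.
            String viaProxy = worker.submit(() -> {
                try {
                    return proxied().payload;
                } catch (IllegalStateException e) {
                    return "unbound";
                }
            }).get();

            // The POJO reference stays usable on the other thread.
            worker.submit(() -> { pojo.payload = "pojo-" + pojo.payload; }).get();

            // Back on the processor thread, the proxy sees the update made through the POJO.
            return viaProxy + "," + proxied().payload;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The last line of `demo()` shows the equivalence stated above: the update done through the POJO is visible through the proxy, because both designate the same object.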
This part has not been prototyped so far
The current Emitter<T>
API has two send
methods, one for a payload and the other one for a Message
:
public interface Emitter<T> {
CompletionStage<Void> send(T msg);
<M extends Message<? extends T>> void send(M msg);
...
}
It is proposed to change it to send only a Message
:
public interface Emitter<T> {
void send(Message<T> msg);
...
}
Using Emitter
will fit the regular case as any DI enabled Message will flow.
See below for Message(s)
and their child lifecycle.
Let's dig into more details, and focus on the lifecycle of Message(s)
.
One example can be found in the module kafka-connector-provider
A Message
can be built only by using the framework API.
A source connector can provide its own Metadata
at this time, e.g.:
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);
...
return consumer.toFlowable()
.map(record -> Message.builder()
.addMetadata(new KafkaIncoming(record))
.payload(record.value())
.build());
Here the MessageBuilder.build()
method call returns the POJO that is sent into the reactive stream.
As the Message
is built by the framework, any needed CDI-related initialization can be performed at this time.
Note: The builder also exposes a 'dependencyInjection(boolean activate)' method that overrides the default value for CDI activation (...). This enables higher performance for highly constrained implementations (e.g. for a technical gateway).
As a context has to be created for each Message
when CDI is activated, it has to be deleted at some stage.
This deletion is handled by the CDI implementation of the framework upon message global acknowledgement.
Note: This context is not necessarily a CDI Context
(...).
One example can be found in the module corporate-framework
It is possible to enrich the Message
with additional Metadata(s)
when the message is built.
To do so, the MessageBuilder
will call method(s) annotated with @MessageInitializer
in a bean.
The method will then be able to customize the message and, if needed, get access to other Metadata
as the
bean can be injected.
The injected Metadata
should be the one initially given to the builder to prevent issues with creation order.
public class EventMetadataMessageInitializer {
@Inject
private KafkaIncoming kafkaIncoming;
@Inject
private Message<?> message;
@MessageInitializer
public void initialize(KafkaIncoming directKafkaIncoming) {
EventMetadata ec = new EventMetadataImpl();
ec.setEventKey((String) kafkaIncoming.key());
message.addMetadata(ec);
String payload = (String) message.getPayload();
ec.setUniqueMessageId(payload.split("-")[0]);
}
}
Note: Some additional logic could be added to handle creation order.
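As an illustration of what the builder could do when the message is built, here is a minimal sketch that discovers and calls annotated methods through reflection. It is a simplification under stated assumptions: the annotation is re-declared locally, the message is passed as a method parameter instead of being injected, and `Msg`, `EventIdInitializer` and `build` are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

final class InitializerSketch {

    // Local stand-in for the prototype's annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface MessageInitializer {
    }

    // Simplified message: a payload plus free-form metadata entries.
    static final class Msg {
        final String payload;
        final List<String> metadata = new ArrayList<>();

        Msg(String payload) {
            this.payload = payload;
        }
    }

    // Hypothetical corporate-framework bean enriching each new message.
    static final class EventIdInitializer {
        @MessageInitializer
        public void initialize(Msg message) {
            message.metadata.add("uniqueMessageId=" + message.payload.split("-")[0]);
        }
    }

    // What a builder could do at build time: call every annotated method of every bean.
    static Msg build(String payload, List<Object> initializerBeans) {
        Msg message = new Msg(payload);
        for (Object bean : initializerBeans) {
            for (Method method : bean.getClass().getDeclaredMethods()) {
                if (method.isAnnotationPresent(MessageInitializer.class)) {
                    try {
                        method.setAccessible(true);
                        method.invoke(bean, message);
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException("initializer failed: " + method, e);
                    }
                }
            }
        }
        return message;
    }

    static String demo() {
        Msg message = build("124-value2", List.of(new EventIdInitializer()));
        return message.metadata.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A CDI-based implementation would more likely discover such methods once, at deployment time, rather than scanning on every build as this sketch does.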
Let's assume that a processor is sending several messages built from a single one.
This can either be achieved by using Emitter
or by returning a Publisher<Message>
, or even by a mix of both.
Let's assume that, from a message m0
, the processor is sending (m1
, m1'
).
|-- m1
m0 --> processor --> |
|-- m1'
The naturally expected semantics for these messages is to be children of m0
:
-
Often, the payload is not expected to be shared (while it can be in some cases).
-
Some
Metadata
might be propagated, others not, e.g. the KafkaIncoming
. Some other Metadata
can be built from the previous one (e.g. for OpenTracing span creation using SpanBuilder.asChildOf).
-
Regarding CDI, each new message should have its own specific context.
Here is a first example of how a child message m1
could be created from m0
and propagated:
Emitter<Message<String>> way1;
...
{ ...
// Build a derived EventMetadata:
// * The EventMetadata could contain an OpenTracing id, from which a follow-up span is to be created.
// * The EventMetadata provider would have to also provide this builder.
EventMetadata m1ec = EventMetadata.build(m0.getMetadata(EventMetadata.KEY)).build();
// Create m1 with the same KafkaIncoming and the derived EventMetadata
Message<String> m1 = Message.builder()
.fromParent(m0)
.addMetadata(m1ec)
.payload("processed: " + m0.getPayload())
.build();
way1.send(m1);
...}
However, for creating a child Message
with a new payload a one-liner would be nicer.
To do so, let's introduce the following concepts:
A Metadata
is regarded as immutable.
A MutableMetadata
is a sub-interface of Metadata
for mutable Metadata
and exposes a method createChild()
.
A child-Message
will then be able to use its parent Metadata
instance, except when it is mutable, in which case a child-Metadata
will be created.
Hence, the previous code can be re-written as:
Emitter<Message<String>> way1;
...
{ ...
Message<String> m1 = Message.builder()
.fromParent(m0)
.payload("processed: " + m0.getPayload())
.build();
way1.send(m1);
... }
The MessageBuilder
will automatically propagate the Metadata
into the child-Message
following this logic.
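The propagation rule can be sketched in a few lines. This is only an illustration of the logic described above: `SourceTopic`, `Target` and `propagate` are hypothetical names, and the prototype's builder may handle more cases.

```java
import java.util.ArrayList;
import java.util.List;

final class MetadataPropagationSketch {

    interface Metadata {
    }

    // Mutable metadata must be able to produce an independent child instance.
    interface MutableMetadata extends Metadata {
        MutableMetadata createChild();
    }

    // Immutable: safe to share between parent and child messages.
    static final class SourceTopic implements Metadata {
        final String topic;

        SourceTopic(String topic) {
            this.topic = topic;
        }
    }

    // Mutable: each child message gets its own copy.
    static final class Target implements MutableMetadata {
        String topic;

        Target(String topic) {
            this.topic = topic;
        }

        @Override
        public Target createChild() {
            return new Target(topic);
        }
    }

    // Share immutable instances, derive children from mutable ones.
    static List<Metadata> propagate(List<Metadata> parentMetadata) {
        List<Metadata> childMetadata = new ArrayList<>();
        for (Metadata md : parentMetadata) {
            childMetadata.add(md instanceof MutableMetadata
                    ? ((MutableMetadata) md).createChild()
                    : md);
        }
        return childMetadata;
    }

    static String demo() {
        SourceTopic source = new SourceTopic("in");
        Target target = new Target("out");
        List<Metadata> child = propagate(List.of(source, target));

        Target childTarget = (Target) child.get(1);
        childTarget.topic = "out-child"; // does not affect the parent's Target

        return (child.get(0) == source) + "," + (child.get(1) == target)
                + "," + target.topic + "," + childTarget.topic;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```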
Let's introduce an example of a micro-service to make it more concrete:
Here are the steps:
1: An incoming message is entering a source connector.
2: The source connector is converting it into a Reactive Messaging
Message
(m0
) and publishing it into a channel of the reactive stream.
3 & 3': The business processor has created two messages (m1
and m1'
) and sent them towards different channels.
The flows for m1
and m1'
are not synchronized as they go through two dissociated branches of the reactive stream.
4 & 4': The sink connectors have transformed their input message into technology specific messages and sent them to their destination.
5 & 5': The destinations are acknowledging their messages.
6: The connector acknowledges the incoming m1
message.
7: The message m1
signals towards m0
that it has been acknowledged.
However, at this stage, m0
is not acknowledged as it is waiting for the acknowledgment from m1'
.
6': The connector acknowledges the incoming m1'
message.
7': The message m1'
signals towards m0
that it has been acknowledged.
As, now, both m1
AND m1'
are acknowledged, m0
is also acknowledged.
8: As the source connector was waiting asynchronously for the acknowledgement of the message it has produced, it is now triggered and:
9: The source connector acknowledges its own source.
Hence, the incoming m0
should be acked only when the outgoing m1
,m1'
are acked.
Let's start with a drawing exhibiting the acknowledgment logic for a parent m0
message with a unique child m1
both still
flowing into reactive streams:
Note 1: message.isAcked(), here, means: Is the message globally acked at root level (e.g. from the source connector perspective).
Note 2: m0.stagedAcked()
means m0
is acked at this stage of the process.
However, this doesn't mean that the message is "globally" acked (see below).
Let us introduce these acknowledgement concepts:
-
Message acknowledgement: The message is globally regarded as acknowledged.
-
Message staged-acknowledgement: At each stage of the processing a message can be acknowledged. This acknowledgement can be chained with other ones using
CompletableFuture
that can also depend on other completion conditions (for instance, created with CompletableFuture.allOf
). The staged-level acknowledgements create a chain towards the Message
acknowledgment.
Let us represent two stages with their CompletableFuture
materializing the acknowledgement state of the message:
In stage n:
-
There is a completion link between the
CompletableFuture
associated with the current stage and the message global acknowledgement CompletableFuture
.
-
Then, the completion of the stage will directly propagate towards the global message completion (while not necessarily completing it).
In stage n+1:
-
A new current stage
CompletableFuture
has been created.
-
A new anonymous
CompletableFuture
has been created using CompletableFuture.allOf
from the current stage CompletableFuture
and another CompletableFuture
X
.
-
This
X
CompletableFuture
can be used, for instance, to handle the completion of a database command, a REST call, the acknowledgement of a message sent to an external broker (...).
-
When the anonymous
CompletableFuture
completes, this will propagate towards the global message completion. As described for stage n, this might not necessarily globally acknowledge the message, as previous stages might also have created this kind of additional X
conditions that might not be completed.
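The stage n+1 situation can be sketched directly with `CompletableFuture`. The variable names (`messageAck`, `stageAck`, `x`) are chosen for this illustration and are not taken from the prototype.

```java
import java.util.concurrent.CompletableFuture;

final class StagedAckSketch {

    static String demo() {
        // Global acknowledgement of the message.
        CompletableFuture<Void> messageAck = new CompletableFuture<>();

        // Stage n+1: the current stage future and an additional condition X
        // (for instance a database command or a REST call).
        CompletableFuture<Void> stageAck = new CompletableFuture<>();
        CompletableFuture<Void> x = new CompletableFuture<>();

        // Anonymous future: completes only when both the stage and X are complete,
        // then propagates towards the global acknowledgement.
        CompletableFuture.allOf(stageAck, x)
                .thenRun(() -> messageAck.complete(null));

        stageAck.complete(null);
        boolean afterStage = messageAck.isDone(); // X is still pending

        x.complete(null);
        boolean afterX = messageAck.isDone();     // both conditions are met

        return afterStage + "," + afterX;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch there is a single stage, so completing X is enough to acknowledge the message globally. With several stages, each one could contribute its own conditions to the chain.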
Now let us dig into the completion logic for a parent-child relationship:
Starting with two messages m0
and m1
, let's make m0
the parent of m1
:
The current acknowledgment stage of the Message
m0
should be replaced by a new one stagedAck(m0).1
that will
propagate the acknowledgement towards messageAck(m0)
only when:
-
It is itself completed;
-
AND, the
Message
m1
is also acknowledged.
Note: See AbstractMessageBuilder.setupParentChildLink(List<Message<?>> parents, Message child)
and its U.T.
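The parent-child link can be sketched as follows. This is not the code of `setupParentChildLink`: the `Msg` class, the `childrenAcked` field and the `link` method are hypothetical, and the sketch assumes that links are established before the parent's stage is acknowledged.

```java
import java.util.concurrent.CompletableFuture;

final class ParentChildAckSketch {

    static final class Msg {
        // Global acknowledgement of the message.
        final CompletableFuture<Void> messageAck = new CompletableFuture<>();

        // Conditions that must hold, besides the stage itself, before the global acknowledgement.
        private CompletableFuture<Void> childrenAcked = CompletableFuture.completedFuture(null);

        // Acknowledge the current stage; the global acknowledgement follows
        // once every linked child is globally acknowledged.
        void stagedAck() {
            childrenAcked.thenRun(() -> messageAck.complete(null));
        }
    }

    // Make the parent's global acknowledgement also depend on the child's one.
    static void link(Msg parent, Msg child) {
        parent.childrenAcked = CompletableFuture.allOf(parent.childrenAcked, child.messageAck);
    }

    static String demo() {
        Msg m0 = new Msg();
        Msg m1 = new Msg();
        Msg m1Prime = new Msg();
        link(m0, m1);
        link(m0, m1Prime);

        m0.stagedAck();
        boolean afterOwnStage = m0.messageAck.isDone(); // children still pending

        m1.stagedAck();
        boolean afterM1 = m0.messageAck.isDone();       // m1' still pending

        m1Prime.stagedAck();
        boolean afterBoth = m0.messageAck.isDone();     // all conditions met

        return afterOwnStage + "," + afterM1 + "," + afterBoth;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Applying `link` twice gives the fork case with two children: the parent is globally acknowledged only after its own stage and both children.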
Now, let us consider a "pathological" case:
- (
m1
, m1'
) are actually the same m0
that is re-sent several times.
If so, it is likely that one of these propagations of m0
will be acked first, which will destroy the CDI context.
Hence, if this CDI context should be started afterwards, an exception will be raised from inside the stream (...).
Let's consider the case where a business processor issues one output message from several input ones. This can be seen as a join case.
Let's consider that we have two different source connectors:
The business processor will issue the message mC
from mA
and mB
with the following strategy:
-
For each type of
Metadata
, a new Metadata
should be associated to mC
that has been built from the merge of the instances of the same type coming from mA
and mB
. The merge strategy is typically to be provided by the Metadata
provider.
-
The payload of
mC
has to be provided by the business developer.
-
The
mC
message global acknowledgement is mandatory for the staged acknowledgement of mA
and mB
.
A concrete Metadata
type may or may not naturally support this merge.
Let's consider a Kafka Metadata
.
It may indicate the source Kafka topic from where the message is coming but, in case of a merge,
it might not have the capability to indicate the list of the several source topics:
public interface KafkaIncoming extends Metadata {
...
String topic();
...
}
To achieve this, a new kind of Kafka Metadata
supporting multiple sources might be introduced:
public interface MultiKafkaIncoming extends MutableMetadata {
...
List<KafkaIncoming> getMetadata();
...
}
Ideally, from user perspective, when it is known that there is a single origin it would be wanted to deal with KafkaIncoming
and, when it's not the case, MultiKafkaIncoming
would be used.
Hence, in our example, messages A and B would have a KafkaIncoming
and message C would have a MultiKafkaIncoming
.
This means, the Metadata
provider should expose the capability to merge KafkaIncoming(s)
and MultiKafkaIncoming(s)
towards a MultiKafkaIncoming
.
To achieve this, Metadata
exposes the following method that is to be called by the framework when a child message is created:
public interface Metadata {
...
Metadata merge(Metadata... metadata);
...
}
This method will be called to merge compatible Metadata(s)
.
For instance, a KafkaIncoming
and a MultiKafkaIncoming
are compatible.
However, a KafkaIncoming
and RabbitMQ Metadata
are not compatible.
In order for a set of Metadata
types to indicate their compatibility, the following getMetadataMergeKey
method is also exposed by Metadata
:
public interface Metadata {
...
String getMetadataMergeKey();
...
}
All the Metadata
that are compatible should return the same String
value.
The framework will call the merge
method only for compatible types.
Here is the representation of the merge of two messages coming from a Kafka source and a message coming from a RabbitMQ source:
The two KafkaIncoming
are merged as they return the same value when getMetadataMergeKey()
is called.
The result of the merge is producing a MultiKafkaIncoming
that is also returning the same value for getMetadataMergeKey()
.
The RabbitMQMetadata
is not merged as it returns a value not shared with other Metadata
for getMetadataMergeKey()
.
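The grouping performed by the framework can be sketched as follows. Several points are assumptions made for this illustration: `merge` is taken to combine the receiving instance with the given ones, the merge key values are arbitrary strings, and `KafkaSource`, `MultiKafkaSource`, `RabbitSource` and `mergeByKey` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class MetadataMergeSketch {

    interface Metadata {
        String getMetadataMergeKey();

        // Assumed semantics: merge this instance with the given compatible ones.
        Metadata merge(Metadata... others);
    }

    static final class KafkaSource implements Metadata {
        final String topic;
        KafkaSource(String topic) { this.topic = topic; }
        @Override public String getMetadataMergeKey() { return "kafka"; }
        @Override public Metadata merge(Metadata... others) { return MultiKafkaSource.of(this, others); }
    }

    static final class MultiKafkaSource implements Metadata {
        final List<String> topics;
        MultiKafkaSource(List<String> topics) { this.topics = List.copyOf(topics); }
        @Override public String getMetadataMergeKey() { return "kafka"; }
        @Override public Metadata merge(Metadata... others) { return of(this, others); }

        static MultiKafkaSource of(Metadata first, Metadata... others) {
            List<String> topics = new ArrayList<>();
            collect(first, topics);
            for (Metadata other : others) {
                collect(other, topics);
            }
            return new MultiKafkaSource(topics);
        }

        private static void collect(Metadata md, List<String> into) {
            if (md instanceof KafkaSource) {
                into.add(((KafkaSource) md).topic);
            } else if (md instanceof MultiKafkaSource) {
                into.addAll(((MultiKafkaSource) md).topics);
            } else {
                throw new IllegalArgumentException("incompatible metadata: " + md);
            }
        }
    }

    static final class RabbitSource implements Metadata {
        final String queue;
        RabbitSource(String queue) { this.queue = queue; }
        @Override public String getMetadataMergeKey() { return "rabbitmq"; }
        @Override public Metadata merge(Metadata... others) {
            throw new UnsupportedOperationException("not needed in this sketch");
        }
    }

    // Framework side: group by merge key, and call merge only within a group.
    static List<Metadata> mergeByKey(List<Metadata> all) {
        Map<String, List<Metadata>> groups = new LinkedHashMap<>();
        for (Metadata md : all) {
            groups.computeIfAbsent(md.getMetadataMergeKey(), k -> new ArrayList<>()).add(md);
        }
        List<Metadata> result = new ArrayList<>();
        for (List<Metadata> group : groups.values()) {
            Metadata first = group.get(0);
            result.add(group.size() == 1
                    ? first
                    : first.merge(group.subList(1, group.size()).toArray(new Metadata[0])));
        }
        return result;
    }

    static String demo() {
        List<Metadata> merged = mergeByKey(List.of(
                new KafkaSource("topicA"), new RabbitSource("queue1"), new KafkaSource("topicB")));
        MultiKafkaSource kafka = (MultiKafkaSource) merged.get(0);
        return merged.size() + ":" + kafka.topics + ":" + (merged.get(1) instanceof RabbitSource);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```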
Note: Beyond child message creation, adding a Metadata
into a message may also result in calling this merge logic.
One example can be found in the module reactive-messaging
in MessageImplTest
.
Here is an overview of the life of a message (CDI) with the current implementation:
Here are the main steps:
1- A connector is receiving a message from a queueing system (or a rest call...).
2- The connector is calling the framework MessageBuilder
with the payload and possible Metadata(s)
.
3- The framework is spawning a new MessageScopeContext
unique to this message.
The message secretly keeps an identifier of this context.
4- The newly built message is returned as a POJO to the connector.
5- The connector is pushing this POJO message into the reactive stream.
6- A framework FunctionInvoker
is:
-
receiving the message
-
extracting the hidden context identifier and activating the
MessageScopeContext
-
calling the end-user
Business Processor
(by looking up in the MessageScopeContext
to inject the parameters)
7- The Business Processor
is using its method parameters or CDI proxies to access the Message
objects.
8- The Business Processor
method call returns, and the FunctionInvoker
is suspending the CDI context.
Then the message flows to the output connector...
9- The output connector is sending the message to its destination
(and acknowledging the CompletionStage
in the received Message
)
9+ - When the Message
is acknowledged
(either when the output connector is acking, or later if the CompletionStage
has been intercepted),
the MessageScopeContext
is destroyed.
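The steps above can be sketched with a small registry of contexts keyed by a hidden identifier. This is a deliberately reduced model and not the prototype's implementation: the context only holds the message itself, activation is modelled with a `ThreadLocal`, and all names (`CONTEXTS`, `build`, `invoke`, `currentMessage`) are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

final class MessageScopeLifecycleSketch {

    static final class Msg {
        final long scopeId; // hidden context identifier (step 3)
        final String payload;
        final CompletableFuture<Void> ack = new CompletableFuture<>();

        Msg(long scopeId, String payload) {
            this.scopeId = scopeId;
            this.payload = payload;
        }
    }

    static final Map<Long, Msg> CONTEXTS = new ConcurrentHashMap<>();
    static final AtomicLong NEXT_ID = new AtomicLong();
    static final ThreadLocal<Long> ACTIVE = new ThreadLocal<>();

    // Steps 2 to 4: create the context with the message, destroy it upon acknowledgement.
    static Msg build(String payload) {
        long id = NEXT_ID.getAndIncrement();
        Msg message = new Msg(id, payload);
        CONTEXTS.put(id, message);
        message.ack.thenRun(() -> CONTEXTS.remove(id)); // step 9+
        return message;
    }

    // Steps 6 to 8: activate the context, call the processor, suspend the context.
    static <R> R invoke(Msg message, Supplier<R> processor) {
        ACTIVE.set(message.scopeId);
        try {
            return processor.get();
        } finally {
            ACTIVE.remove();
        }
    }

    // The lookup a proxy would perform on behalf of an injected Message.
    static Msg currentMessage() {
        Long id = ACTIVE.get();
        Msg message = id == null ? null : CONTEXTS.get(id);
        if (message == null) {
            throw new IllegalStateException("no active message scope");
        }
        return message;
    }

    static String demo() {
        Msg message = build("124-value2");
        String result = invoke(message, () -> currentMessage().payload.toUpperCase());
        boolean aliveBeforeAck = CONTEXTS.containsKey(message.scopeId);
        message.ack.complete(null);
        boolean aliveAfterAck = CONTEXTS.containsKey(message.scopeId);
        return result + "," + aliveBeforeAck + "," + aliveAfterAck;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```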
This implementation is storing all the injected instances in a simple class (MessageScopedContext
) that is not a CDI context.
The injected beans (Message
, EventMetadata
...) are hand-made proxies performing a look-up in this MessageScopedContext
class.
Hence, two levels of proxies are used.
This could be improved by wrapping injection targets (see the link to the RestEasy code below).
Beyond that, this implementation is likely to benefit from performance tuning.
Another implementation strategy is possible. For instance, Resteasy is:
-
Wrapping the injection targets: https://github.com/resteasy/Resteasy/blob/master/resteasy-cdi/src/main/java/org/jboss/resteasy/cdi/ResteasyCdiExtension.java#L183
-
And then injects its 'property': https://github.com/resteasy/Resteasy/blob/master/resteasy-cdi/src/main/java/org/jboss/resteasy/cdi/JaxrsInjectionTarget.java#L58
-
The lookup of the injected values being eventually done from a non-cdi object (
ResteasyContext.contextualData
).
It might be good to know the rationale behind the Resteasy technical choices (performance?).
The Quarkus implementation is in the rm-ext
module and makes it possible to use the same API in either bytecode or native mode.
In this section, let us call:
-
CDI injection: Injection with @Inject as with regular CDI or Quarkus DI;
-
Direct injection: Injection performed when calling a method whose parameters are set according to their type (hence not performed by a CDI(-like) container).
-
Dependency injection (DI): Both of the previous ones.
Whatever the implementation, DI is likely to have an impact on performance.
With low/middle-end services (tens of thousands to hundreds of thousands messages/s), the CPU impact might be acceptable. For high-end services (hundreds of thousands to millions messages/s), it might not.
Also, there is a memory impact as each message will have a context associated with it. With reactive, we can consider scenarios with a high number of messages in memory being processed asynchronously (high-bandwidth / high-latency external systems).
Hence, it might be desirable to be able to disengage the DI features when needed.
Dependency injection will provide added value by exposing some message contextual data to the end-user:
-
In a strongly typed way, hence with a clearly defined semantic;
-
Optionally, according to what is chosen to be injected.
Let's take three examples:
- A basic business developer:
As with the current API, they can simply request the payload and process it:
@Incoming("x")
@Outgoing("y")
void process(Payload payload) {
// Business logic
}
- A business developer that would also need some contextual data provided by its corporate framework:
@Incoming("x")
@Outgoing("y")
void process(EventMetadata eventMetadata, Payload payload) {
String eventId = eventMetadata.getUniqueMessageId();
...
}
- A "business" developer, that would be more on the technical side, and would need to implement a
Kafka
specific processing.
Let's consider implementing a Kafka-to-Kafka replicator that will drop messages whose timestamp is too old:
@Channel("output")
Emitter<Message<?>> output;
@Incoming("input")
void process(KafkaIncoming kafkaIncoming, Message<?> message) {
if (kafkaIncoming.timestamp() < /* something */) {
// drop it
} else {
output.send(message);
}
}
Whether injection is performed using CDI injection or direct injection, the benefits are likely to be similar from the user's perspective most of the time.
For instance, these two examples are equivalent:
Example 1:
@Incoming("input")
@Outgoing("output")
public void process(EventMetadata eventMetadata, Payload payload) {
...
}
Example 2:
@Inject
private EventMetadata eventMetadata;
@Incoming("input")
@Outgoing("output")
public void process(Payload payload) {
...
}
However, the capacity to inject using CDI:
-
Is more aligned with JAX-RS along with request-scoped beans.
-
Might be interesting when many different contexts are to be injected.
One example can be found in the module business-app
in the class MyRxJavaProcessor
.
The following processor signature has been prototyped: Publisher<Message<O>> method(Publisher<Message<I>> publisher)
.
This makes it possible to implement a processor with RxJava, e.g.:
public Publisher<Message<String>> stage6(Publisher<Message<String>> publisher) {
return Flowable.fromPublisher(publisher)
.flatMap(message -> {
Message<String> child = Message.<String> builder()
.fromParent(message)
.payload(message.getPayload())
.build();
KafkaTarget target = child.getMetadata(KafkaTarget.KEY);
target.topic(target.topic() + "-child");
return Flowable.fromArray(message, child);
})
.delay(1, TimeUnit.SECONDS);
}
Here, a child message is created with a different Kafka topic target and it is sent with its parent into the reactive stream.
Here, no injection can be used.
However, reusing an injected Emitter
using the Async
mechanism might be possible (to investigate).
It may be useful to reshape the declared topology that is eventually used to build the reactive stream. For instance, each topology node can be:
-
Instrumented with logs or spans before and after a message is processed;
-
Instrumented with performance metrics;
-
Etc.
To do so, the idea is to follow this approach:
-
An actual
Topology
object is built from the annotation-based declaration of the processing topology.
-
A Reactive Stream (or several ones) is built from a
Topology
object.
-
In the basic case, the
Topology
object coming from the annotated methods is directly used to build the Reactive Stream.
-
When a reshaped/instrumented version of the Reactive Stream is wanted, the
Topology
object is first transformed into another modified instance. This last modified instance is used to build the Reactive Stream.
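The approach can be sketched independently of the prototype's classes. In this reduced model, a topology is an ordered list of named nodes, "building the stream" is plain function composition, and the transformation wraps each node with before/after logging. `Node`, `instrument` and `build` are hypothetical names, and the model is linear only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class TopologySketch {

    // A named processing node working on String payloads.
    static final class Node {
        final String name;
        final UnaryOperator<String> function;

        Node(String name, UnaryOperator<String> function) {
            this.name = name;
            this.function = function;
        }
    }

    // Transform a topology into a new, instrumented one; the input is left untouched.
    static List<Node> instrument(List<Node> topology, List<String> log) {
        List<Node> instrumented = new ArrayList<>();
        for (Node node : topology) {
            instrumented.add(new Node(node.name, input -> {
                log.add("before: " + node.name);
                String output = node.function.apply(input);
                log.add("after: " + node.name);
                return output;
            }));
        }
        return instrumented;
    }

    // Stand-in for building the reactive stream: apply the nodes in order.
    static UnaryOperator<String> build(List<Node> topology) {
        return input -> {
            String value = input;
            for (Node node : topology) {
                value = node.function.apply(value);
            }
            return value;
        };
    }

    static String demo() {
        List<Node> declared = List.of(
                new Node("stage1", String::toUpperCase),
                new Node("stage2", s -> s + "!"));
        List<String> log = new ArrayList<>();

        String result = build(instrument(declared, log)).apply("payload");
        return result + "|" + log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the transformation returns a new list, the declared topology can still be used as-is to build a non-instrumented stream.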
In the current first, and limited, implementation the following pieces are used:
- Declared Publisher/Processor/Subscriber can be given a name by using the
NodeName
annotation, e.g.:
@Incoming("input_channel")
@Outgoing("business_internal_channel1")
@NodeName("stage1")
public void stage1(String payload) {
...
}
Refer to the module business-app
in all the processor classes.
- Each Publisher/Processor/Subscriber can be given a pre- and post-processing operation by implementing the
NodeInterceptor
interface, and annotating methods with the Before
and After
annotations, e.g.:
public class LoggerNodeInterceptor implements NodeInterceptor {
private static final Logger logger = LoggerFactory.getLogger(LoggerNodeInterceptor.class);
private String nodeName;
@Inject
private Message<?> message;
@Override
public void setNodeName(String name) {
this.nodeName = name;
}
@Before
public void before() {
logger.info("before: {} message={}", nodeName, message);
}
@After
public void after(Message<?> message) {
logger.info("after: {}", nodeName);
}
}
Refer to the module corporate-framework
in the class LoggerNodeInterceptor
.
A NodeInterceptor
will be given the name of the Publisher/Processor/Subscriber node it is attached to, by the framework calling its setNodeName
method.
Then, its method annotated with @Before
will be called before each Processor/Subscriber is called in the reactive stream.
And, its method annotated with @After
will be called after each Publisher/Processor is called in the reactive stream.
A NodeInterceptor
is injected (either by CDI or direct injection) in the same way a regular processor is.
- An instrumented topology can be created using the
InstrumentedTopologyBuilderVisitor
from a topology
instance, and a reactive stream started as follows:
Topology instrumentedTopology = InstrumentedTopologyBuilderVisitor.build("logger",
() -> new LoggerNodeInterceptor(), container.getBeanManager(), topology);
reactiveStreamFactory.build(instrumentedTopology);
Refer to the module business-app
in the BusinessApp
class for this code.
Refer to the module reactive-streamm
for the InstrumentedTopologyBuilderVisitor
.
This results in the following kind of outcome:
16:23:15.707 [vert.x-eventloop-thread-0] INFO c.a.m.o.r.m.c.f.LoggerNodeInterceptor:28 - before: stage1 message={metadata=[{topic='mytopic', partition='3', offset='0'}, com.amadeus.middleware.odyssey.reactive.messaging.kafka.connector.provider.KafkaTargetImpl@f8fdcdd, {uniqueMessageId='124', key='key2'}], payload=124-value2}
16:23:15.707 [vert.x-eventloop-thread-0] INFO c.a.m.o.r.m.b.a.MyBusinessProcessor:19 - stage1 start
16:23:15.707 [vert.x-eventloop-thread-0] INFO c.a.m.o.r.m.b.a.MyBusinessProcessor:20 - stage1 payload=124-value2
16:23:15.707 [vert.x-eventloop-thread-0] INFO c.a.m.o.r.m.b.a.MyBusinessProcessor:21 - stage1 stop
16:23:15.707 [vert.x-eventloop-thread-0] INFO c.a.m.o.r.m.c.f.LoggerNodeInterceptor:33 - after: stage1
An actual Topology transformation solution could be implemented. The topology is actually a graph, while the prototype only implements linear topologies with simple before/after node instrumentation.
A selection mechanism could be used to only instrument specific nodes according to several characteristics.
MessageInitializer
could be dropped in favor of this, with a selection mechanism that could selectively target Publishers of specific kinds of messages
(i.e. specific kinds of input connectors).
reactive-messaging
: The API customized from the MP-RM, and basic framework implementation.
This is agnostic to any DI framework (however some CDI annotations are used).
cdi-reactive-messaging
: CDI implementation of the framework.
It works with Weld (no other container implementation tested).
However only CDI-spec API is used.
test-cdi-reactive-messaging
: First basic tests of cdi-reactive-messaging
(using Weld).
kafka-connector-provider
: A Kafka connector.
It does not follow the specification but provides an actual Publisher of Kafka messages using the
prototype API.
corporate-framework
: This provides EventMetadata
to the message computed from the KafkaIncoming
and the payload.
business-app
: This is an example of a business application using the framework.
jmh
: First basic jmh performance tests to get an idea of the CDI implementation impact.
rm-ext
: Quarkus extension to implement the framework.
quarkus-app
: Quarkus application similar to business-app
but for Quarkus (both might be merged one day).
The projects can be built using mvn clean install
.
In order to run the business-app
example:
-
A
docker-compose clean
and docker-compose up
should be performed in the kafka-connector-provider
module.
-
In this same module,
KafkaSender.main
should be called to insert messages into Kafka.
Then, BusinessApp.main
can be launched producing the following output:
...
10:41:57.583 [vert.x-eventloop-thread-0] DEBUG c.a.m.e.r.m.c.c.CDIMessageBuilderImpl:43 - new message with scopeid=0
10:41:57.594 [vert.x-eventloop-thread-0] INFO c.a.m.e.r.m.b.a.MyBusinessProcessor:17 - stage1 start
10:41:57.594 [vert.x-eventloop-thread-0] INFO c.a.m.e.r.m.b.a.MyBusinessProcessor:18 - stage1 payload=124-value2
10:41:57.594 [vert.x-eventloop-thread-0] INFO c.a.m.e.r.m.b.a.MyBusinessProcessor:19 - stage1 stop
10:41:57.594 [vert.x-eventloop-thread-0] INFO c.a.m.e.r.m.b.a.MyBusinessProcessor:25 - stage2 start
10:41:57.594 [vert.x-eventloop-thread-0] INFO c.a.m.e.r.m.b.a.MyBusinessProcessor:26 - stage2 payload=124-value2
10:41:57.595 [vert.x-eventloop-thread-0] INFO c.a.m.e.r.m.b.a.MyBusinessProcessor:27 - stage2 stop
10:41:57.595 [vert.x-eventloop-thread-0] INFO c.a.m.e.r.m.b.a.MyBusinessProcessor:33 - stage3 start
10:41:57.595 [vert.x-eventloop-thread-0] INFO c.a.m.e.r.m.b.a.MyBusinessProcessor:34 - stage3 event id=124 payload=124-value2
10:41:57.595 [vert.x-eventloop-thread-0] INFO c.a.m.e.r.m.b.a.MyBusinessProcessor:35 - stage3 stop
10:41:57.597 [vert.x-eventloop-thread-0] INFO c.a.m.e.r.m.b.a.MyAdvancedProcessor:50 - stage4 start
10:41:57.597 [vert.x-eventloop-thread-0] INFO c.a.m.e.r.m.b.a.MyAdvancedProcessor:53 - KafkaIncoming direct={topic='mytopic', partition='3', offset='0'},cdi={topic='mytopic', partition='3', offset='0'}
10:41:57.597 [vert.x-eventloop-thread-0] INFO c.a.m.e.r.m.b.a.MyAdvancedProcessor:54 - EventMetadata direct={uniqueMessageId='124', key='0'},cdi={uniqueMessageId='124', key='0'}
10:41:57.597 [vert.x-eventloop-thread-0] INFO c.a.m.e.r.m.b.a.MyAdvancedProcessor:55 - Message.payload direct=124-value2,cdi=124-value2
10:41:57.597 [vert.x-eventloop-thread-0] INFO c.a.m.e.r.m.b.a.MyAdvancedProcessor:58 - Payload direct=124-value2
10:41:57.598 [vert.x-eventloop-thread-0] INFO c.a.m.e.r.m.b.a.MyAdvancedProcessor:63 - EventMetadata async direct={uniqueMessageId='124', key='0'},cdi={uniqueMessageId='124', key='0'}
10:41:57.598 [vert.x-eventloop-thread-0] INFO c.a.m.e.r.m.b.a.MyAdvancedProcessor:67 - EventMetadata after POJO update direct={uniqueMessageId='pojo-124', key='0'},cdi={uniqueMessageId='pojo-124', key='0'}
10:41:57.598 [vert.x-eventloop-thread-0] DEBUG c.a.m.e.r.m.b.a.MyAdvancedProcessor:71 - KafkaIncoming from message={topic='mytopic', partition='3', offset='0'}
10:41:57.609 [vert.x-eventloop-thread-0] INFO c.a.m.e.r.m.b.a.MyAdvancedProcessor:76 - stage4 stop
...