More control on event publishing (#61)
larousso authored Nov 22, 2023
1 parent 43c5905 commit ae79ce6
Showing 5 changed files with 248 additions and 125 deletions.
6 changes: 2 additions & 4 deletions thoth-core-reactor/build.gradle
@@ -12,9 +12,6 @@ dependencies {
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jacksonVersion")
implementation("fr.maif:functional-json:$functionalJsonVersion")

testImplementation("com.typesafe.akka:akka-testkit_$scalaVersion:$akkaVersion")
testImplementation("com.typesafe.akka:akka-stream-testkit_$scalaVersion:$akkaVersion")
testImplementation("com.typesafe.akka:akka-stream-kafka-testkit_$scalaVersion:$alpakkaKafkaVersion")
testImplementation("org.assertj:assertj-core:3.10.0")
testImplementation("com.h2database:h2:1.4.197")
testImplementation("org.mockito:mockito-core:2.22.0")
@@ -24,7 +21,8 @@ dependencies {
testImplementation("org.junit.vintage:junit-vintage-engine:5.4.2")
testImplementation("net.aichler:jupiter-interface:0.9.1")
testImplementation("org.scalatest:scalatest_$scalaVersion:3.0.8")
testImplementation("org.testcontainers:kafka:1.15.3")
testImplementation("org.testcontainers:kafka:1.18.0")
testImplementation "org.testcontainers:junit-jupiter:1.18.0"
}

test {
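For reference, the new org.testcontainers:junit-jupiter dependency provides the @Testcontainers JUnit 5 extension used by the test support interface added below; with an annotated @Container field the extension can also manage the container lifecycle itself. A generic sketch of that pattern (hypothetical test, not code from this commit):

import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import static org.assertj.core.api.Assertions.assertThat;

@Testcontainers
class KafkaAvailabilityTest { // hypothetical test class, not part of this commit

    // @Container tells the extension to start the container before tests run
    // and stop it afterwards; a static field is shared across all tests in the class.
    @Container
    static final KafkaContainer kafka =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

    @Test
    void brokerIsReachable() {
        assertThat(kafka.getBootstrapServers()).isNotEmpty();
    }
}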
@@ -1,10 +1,10 @@
package fr.maif.reactor.eventsourcing;

-import fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy;
import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventPublisher;
import fr.maif.eventsourcing.EventStore;
+import fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.collection.List;
@@ -80,20 +80,30 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur
                .concatMap(tx -> {
                    LOGGER.info("Replaying not published in DB for {}", topic);
                    ConcurrentReplayStrategy strategy = Objects.isNull(concurrentReplayStrategy) ? WAIT : concurrentReplayStrategy;
-                   return Flux.from(eventStore.loadEventsUnpublished(tx, strategy))
+                   return Flux
+                           .from(eventStore.loadEventsUnpublished(tx, strategy))
                            .transform(publishToKafka(eventStore, Option.some(tx), groupFlow))
                            .doOnNext(logProgressSink::tryEmitNext)
-                           .then(Mono.fromCompletionStage(() -> {
+                           .concatMap(any -> Mono.fromCompletionStage(() -> {
                                LOGGER.info("Replaying events not published in DB is finished for {}", topic);
                                return eventStore.commitOrRollback(Option.none(), tx);
                            }))
                            .doOnError(e -> {
                                eventStore.commitOrRollback(Option.of(e), tx);
                                LOGGER.error("Error replaying non published events to kafka for " + topic, e);
-
-                           });
+                           })
+                           .retryWhen(Retry.backoff(Long.MAX_VALUE, restartInterval)
+                                   .transientErrors(true)
+                                   .maxBackoff(maxRestartInterval)
+                                   .doBeforeRetry(ctx -> {
+                                       LOGGER.error("Error republishing events for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries()), ctx.failure());
+                                   })
+                           )
+                           .collectList()
+                           .map(__ -> Tuple.empty())
+                           .switchIfEmpty(Mono.just(Tuple.empty()));
                })
-               .thenMany(
+               .concatMap(__ ->
                        this.eventsSource.transform(publishToKafka(
                                eventStore,
                                Option.none(),
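The retry policy added above relies on Reactor's Retry.backoff spec. A self-contained sketch of how transientErrors(true) behaves (hypothetical intervals and failure, not this project's configuration):

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

public class RetryBackoffSketch {
    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        Flux.defer(() -> attempts.incrementAndGet() < 4
                        ? Flux.<String>error(new IllegalStateException("broker unavailable"))
                        : Flux.just("published"))
                // Retry indefinitely with exponential backoff, capped at 1s between attempts.
                // transientErrors(true) treats each uninterrupted burst of errors separately,
                // so the backoff restarts from the minimum once the source recovers.
                .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(10))
                        .maxBackoff(Duration.ofSeconds(1))
                        .transientErrors(true)
                        .doBeforeRetry(signal -> System.out.println(
                                "retry " + signal.totalRetries() + ": " + signal.failure().getMessage())))
                .blockFirst();
    }
}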
@@ -116,14 +126,13 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur


    private <TxCtx> Function<Flux<EventEnvelope<E, Meta, Context>>, Flux<EventEnvelope<E, Meta, Context>>> publishToKafka(EventStore<TxCtx, E, Meta, Context> eventStore, Option<TxCtx> tx, Function<Flux<SenderResult<EventEnvelope<E, Meta, Context>>>, Flux<List<SenderResult<EventEnvelope<E, Meta, Context>>>>> groupFlow) {
-       Function<Flux<SenderRecord<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>>, Flux<SenderResult<EventEnvelope<E, Meta, Context>>>> publishToKafkaFlow = it ->
-               kafkaSender.send(it)
-                       .doOnError(e -> {
-                           LOGGER.error("Error publishing to kafka ", e);
-                       });
        return it -> it
                .map(this::toKafkaMessage)
-               .transform(publishToKafkaFlow)
+               .concatMap(events -> {
+                   LOGGER.debug("Sending event {}", events);
+                   return kafkaSender.send(Flux.just(events))
+                           .doOnError(e -> LOGGER.error("Error publishing to kafka ", e));
+               })
                .transform(groupFlow)
                .concatMap(m ->
                        tx.fold(
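Design note: instead of piping the whole stream through a single kafkaSender.send(...) call, each envelope is now sent through its own send(Flux.just(events)) inside concatMap, so the next record is only submitted once the previous result has come back. A standalone sketch of that one-at-a-time pattern with reactor-kafka (broker address and topic are placeholders):

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

public class OrderedSendSketch {
    public static void main(String[] args) {
        SenderOptions<String, String> options = SenderOptions.<String, String>create(Map.of(
                        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")) // assumed broker address
                .withKeySerializer(new StringSerializer())
                .withValueSerializer(new StringSerializer());
        KafkaSender<String, String> sender = KafkaSender.create(options);

        Flux.just("e1", "e2", "e3")
                // One send(...) per element: the next record is only handed to the sender
                // after the previous one is acknowledged, mirroring the concatMap above.
                .concatMap(event -> sender.send(Flux.just(
                        SenderRecord.create(new ProducerRecord<>("demo-topic", event), event))))
                .doOnNext(result -> System.out.println("acked: " + result.correlationMetadata()))
                .blockLast();

        sender.close();
    }
}

This trades batching throughput for strict per-event ordering and per-event error visibility, which matches the replay semantics the surrounding change is after.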
@@ -142,6 +151,7 @@ public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> eve
                .concatMap(t ->
                        Mono.defer(() -> {
                            Sinks.EmitResult emitResult = queue.tryEmitNext(t);
+                           LOGGER.debug("Event publisher {}, {} buffered elements ( capacity = {} ), emitResult = {}, event = {}", topic, queue.scan(Scannable.Attr.BUFFERED), queue.scan(Scannable.Attr.CAPACITY), emitResult, t);
                            if (emitResult.isFailure()) {
                                return Mono.error(new RuntimeException("Error publishing to queue for %s : %s".formatted(topic, emitResult)));
                            } else {
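The queue.scan(...) calls in the new debug line use Reactor's Scannable attributes to read a sink's current buffer occupancy and capacity at runtime. A minimal sketch of that introspection on a bounded sink (illustrative names, not the publisher's actual queue):

import reactor.core.Scannable;
import reactor.core.publisher.Sinks;

public class SinkIntrospectionSketch {
    public static void main(String[] args) {
        // Bounded multicast sink: tryEmitNext fails fast instead of blocking when full.
        Sinks.Many<String> queue = Sinks.many().multicast().onBackpressureBuffer(2);

        for (String event : new String[]{"a", "b", "c"}) {
            Sinks.EmitResult result = queue.tryEmitNext(event);
            // Scannable exposes runtime attributes such as buffered element count and capacity.
            System.out.printf("emit %s -> %s (buffered=%s, capacity=%s)%n",
                    event, result,
                    Scannable.from(queue).scan(Scannable.Attr.BUFFERED),
                    Scannable.from(queue).scan(Scannable.Attr.CAPACITY));
        }
    }
}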
@@ -0,0 +1,102 @@
package fr.maif.reactor;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static io.vavr.API.println;

@Testcontainers
public interface KafkaContainerTest {

    AtomicInteger counter = new AtomicInteger(0);

    KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

    static void startContainer() {
        kafkaContainer.start();
    }

    default String bootstrapServers() {
        return kafkaContainer.getBootstrapServers();
    }

    default Admin adminClient() {
        return Admin.create(Map.of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()
        ));
    }

    default String createTopic() {
        return createTopic("topic-"+counter.incrementAndGet(), 3, 1);
    }

    default String createTopic(String name, int partitions, int replication) {
        try {
            CreateTopicsResult createTopicsResult = adminClient().createTopics(java.util.List.of(new NewTopic(name, partitions, (short) replication)));
            createTopicsResult.all().get(10, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new RuntimeException("Unable to create topic with name " + name, e);
        }
        return name;
    }

    default void deleteTopics() {
        try {
            Set<String> topics = adminClient().listTopics().names().get(5, TimeUnit.SECONDS);
            if (!topics.isEmpty()) {
                println("Deleting " + String.join(",", topics));
                adminClient().deleteTopics(topics).all().get();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    default ReceiverOptions<String, String> receiverDefault() {
        return ReceiverOptions.<String, String>create(Map.of(
                        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()
                ))
                .withKeyDeserializer(new StringDeserializer())
                .withValueDeserializer(new StringDeserializer());
    }

    default SenderOptions<String, String> senderDefault() {
        return SenderOptions.<String, String>create(Map.of(
                        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()
                ))
                .withKeySerializer(new StringSerializer())
                .withValueSerializer(new StringSerializer());
    }

    default void produceString(String topic, String event) {
        KafkaSender.create(senderDefault()).send(Mono.just(
                SenderRecord.create(new ProducerRecord<>(
                        topic, event
                ), null)
        )).collectList().block();
    }

}
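A test class can then opt in to the shared Kafka container by implementing this interface, along the lines of the following (hypothetical test, not part of this commit):

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import fr.maif.reactor.KafkaContainerTest;

class EventPublishingTest implements KafkaContainerTest {

    @BeforeAll
    static void setUp() {
        // Start the container once for the test class; the interface's default
        // methods then point admin clients, producers and consumers at it.
        KafkaContainerTest.startContainer();
    }

    @Test
    void publishesToAFreshTopic() {
        String topic = createTopic(); // "topic-<n>" with 3 partitions, replication 1
        produceString(topic, "hello");
        deleteTopics();
    }
}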