Skip to content

Commit

Permalink
More control on event publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
larousso committed Nov 23, 2023
1 parent 569ac54 commit 42fce41
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 7 deletions.
1 change: 1 addition & 0 deletions thoth-core-reactor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies {
testImplementation("org.scalatest:scalatest_$scalaVersion:3.0.8")
testImplementation("org.testcontainers:kafka:1.18.0")
testImplementation "org.testcontainers:junit-jupiter:1.18.0"
testImplementation "ch.qos.logback:logback-classic:1.4.8"
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur
eventStore.commitOrRollback(Option.of(e), tx);
LOGGER.error("Error replaying non published events to kafka for " + topic, e);
})
.retryWhen(Retry.backoff(Long.MAX_VALUE, restartInterval)
.retryWhen(Retry.backoff(10, restartInterval)
.transientErrors(true)
.maxBackoff(maxRestartInterval)
.doBeforeRetry(ctx -> {
Expand All @@ -101,6 +101,7 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur
)
.collectList()
.map(__ -> Tuple.empty())
.onErrorReturn(Tuple.empty())
.switchIfEmpty(Mono.just(Tuple.empty()));
})
.concatMap(__ ->
Expand Down Expand Up @@ -173,7 +174,7 @@ public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> eve

@Override
public void close() throws IOException {
if (Objects.nonNull(killSwitch)) {
if (Objects.nonNull(killSwitch) && !killSwitch.isDisposed()) {
try {
this.killSwitch.dispose();
} catch (UnsupportedOperationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@ public void eventConsumption() throws IOException, InterruptedException {
.receive()
.map(ConsumerRecord::value)
.map(KafkaEventPublisherTest::deserialize)
.doOnNext(elt -> System.out.println("next : "+elt))
.take(3)
.doOnNext(elt -> System.out.println("Group : " + elt))
.doOnError(e -> e.printStackTrace())
.map(e -> {
println(e);
return e;
Expand Down Expand Up @@ -165,8 +162,6 @@ public void eventConsumptionWithEventFromDb() throws IOException, InterruptedExc
)
.receive()
.map(ConsumerRecord::value)
.doOnNext(elt -> System.out.println(elt))
.doOnError(e -> e.printStackTrace())
.take(6)
.timeout(Duration.of(20, ChronoUnit.SECONDS))
.collectList()
Expand Down
18 changes: 18 additions & 0 deletions thoth-core-reactor/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%logger{15} - %message%n%xException{10}</pattern>
</encoder>
</appender>

<appender name="ASYNCSTDOUT" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="STDOUT" />
</appender>
<logger name="org.apache.kafka" level="WARN" />
<logger name="org.testcontainers.utility" level="WARN" />
<logger name="fr.maif.reactor" level="DEBUG" />
<root level="INFO">
<appender-ref ref="ASYNCSTDOUT" />
</root>

</configuration>

0 comments on commit 42fce41

Please sign in to comment.