Skip to content

Commit

Permalink
add first integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
Loic Hermann authored and Loic Hermann committed Aug 11, 2024
1 parent 73c6f05 commit 4b0d88c
Show file tree
Hide file tree
Showing 13 changed files with 267 additions and 105 deletions.
17 changes: 0 additions & 17 deletions docs/modules/ROOT/pages/includes/quarkus-temporal.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,6 @@ h|Default
a|icon:lock[title=Fixed at build time] [[quarkus-temporal_quarkus-temporal-enable-mock]]`link:#quarkus-temporal_quarkus-temporal-enable-mock[quarkus.temporal.enable-mock]`


[.description]
--
enable mock for testing

ifdef::add-copy-button-to-env-var[]
Environment variable: env_var_with_copy_button:+++QUARKUS_TEMPORAL_ENABLE_MOCK+++[]
endif::add-copy-button-to-env-var[]
ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_TEMPORAL_ENABLE_MOCK+++`
endif::add-copy-button-to-env-var[]
--|boolean
|`false`


a|icon:lock[title=Fixed at build time] [[quarkus-temporal_quarkus-temporal-enable-mock]]`link:#quarkus-temporal_quarkus-temporal-enable-mock[quarkus.temporal.enable-mock]`


[.description]
--
enable mock for testing
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.quarkiverse.temporal.deployment;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -76,13 +78,17 @@ void produceWorkflows(
AnnotationTarget target = instance.target();
Collection<ClassInfo> allKnownImplementors = beanArchiveBuildItem.getIndex()
.getAllKnownImplementors(target.asClass().name());
if (allKnownImplementors.size() != 1) {
throw new IllegalStateException("Workflow " + target.asClass().name() + " must have exactly one implementor");
}
Set<String> seenWorkers = new HashSet<>();
allKnownImplementors.forEach(implementor -> {
AnnotationInstance annotation = implementor.annotation(WORKFLOW_IMPL);
String[] workers = annotation == null ? new String[] { "<default>" }
: annotation.value("workers").asStringArray();

if (!Collections.disjoint(seenWorkers, Arrays.asList(workers))) {
throw new IllegalStateException(
"Workflow " + target.asClass().name() + " has more than one implementor on worker");
}
Collections.addAll(seenWorkers, workers);
producer.produce(new WorkflowImplBuildItem(loadClass(implementor), workers));
});
}
Expand All @@ -97,13 +103,16 @@ void produceActivities(
AnnotationTarget target = instance.target();
Collection<ClassInfo> allKnownImplementors = beanArchiveBuildItem.getIndex()
.getAllKnownImplementors(target.asClass().name());
if (allKnownImplementors.size() != 1) {
throw new IllegalStateException("Activity " + target.asClass().name() + " must have exactly one implementor");
}
Set<String> seenWorkers = new HashSet<>();
allKnownImplementors.forEach(implementor -> {
AnnotationInstance annotation = implementor.annotation(ACTIVITY_IMPL);
String[] workers = annotation == null ? new String[] { "<default>" }
: annotation.value("workers").asStringArray();
if (!Collections.disjoint(seenWorkers, Arrays.asList(workers))) {
throw new IllegalStateException(
"Activity " + target.asClass().name() + " has more than one implementor on worker");
}
Collections.addAll(seenWorkers, workers);
producer.produce(new ActivityImplBuildItem(loadClass(implementor), workers));
});
}
Expand Down
5 changes: 5 additions & 0 deletions integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
<artifactId>quarkus-temporal</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkiverse.temporal</groupId>
<artifactId>quarkus-temporal-test</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkiverse.temporal.it.worker;
package io.quarkiverse.temporal.it.defaultWorker;

import io.quarkiverse.temporal.it.shared.AccountActivity;
import io.temporal.activity.Activity;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.quarkiverse.temporal.it.defaultWorker;

import io.quarkiverse.temporal.it.shared.TransactionDetails;

public class CoreTransactionDetails implements TransactionDetails {

private String sourceAccountId;
private String destinationAccountId;
private String transactionReferenceId;
private int amountToTransfer;

public CoreTransactionDetails() {
// Default constructor is needed for Jackson deserialization
}

public CoreTransactionDetails(String sourceAccountId,
String destinationAccountId,
String transactionReferenceId,
int amountToTransfer) {
this.sourceAccountId = sourceAccountId;
this.destinationAccountId = destinationAccountId;
this.transactionReferenceId = transactionReferenceId;
this.amountToTransfer = amountToTransfer;
}

// MARK: Getter methods

public String getSourceAccountId() {
return sourceAccountId;
}

public String getDestinationAccountId() {
return destinationAccountId;
}

public String getTransactionReferenceId() {
return transactionReferenceId;
}

public int getAmountToTransfer() {
return amountToTransfer;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkiverse.temporal.it.worker;
package io.quarkiverse.temporal.it.defaultWorker;

import java.time.Duration;
import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.quarkiverse.temporal.it.namedWorker;

import io.quarkiverse.temporal.ActivityImpl;
import io.quarkiverse.temporal.it.shared.AccountActivity;
import io.temporal.activity.Activity;

@ActivityImpl(workers = "namedWorker")
public class AccountActivityImpl implements AccountActivity {
// Mock up the withdrawal of an amount of money from the source account
@Override
public void withdraw(String accountId, String referenceId, int amount) {
System.out.printf("\nWithdrawing $%d from account %s.\n[ReferenceId: %s]\n", amount, accountId, referenceId);
System.out.flush();
}

// Mock up the deposit of an amount of money from the destination account
@Override
public void deposit(String accountId, String referenceId, int amount) {
boolean activityShouldSucceed = true;

if (!activityShouldSucceed) {
System.out.println("Deposit failed");
System.out.flush();
throw Activity.wrap(new RuntimeException("Simulated Activity error during deposit of funds"));
}

System.out.printf("\nDepositing $%d into account %s.\n[ReferenceId: %s]\n", amount, accountId, referenceId);
System.out.flush();
}

// Mock up a compensation refund to the source account
@Override
public void refund(String accountId, String referenceId, int amount) {
boolean activityShouldSucceed = true;

if (!activityShouldSucceed) {
System.out.println("Refund failed");
System.out.flush();
throw Activity.wrap(new RuntimeException("Simulated Activity error during refund to source account"));
}

System.out.printf("\nRefunding $%d to account %s.\n[ReferenceId: %s]\n", amount, accountId, referenceId);
System.out.flush();
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkiverse.temporal.it.worker;
package io.quarkiverse.temporal.it.namedWorker;

import io.quarkiverse.temporal.it.shared.TransactionDetails;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package io.quarkiverse.temporal.it.namedWorker;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import io.quarkiverse.temporal.WorkflowImpl;
import io.quarkiverse.temporal.it.shared.AccountActivity;
import io.quarkiverse.temporal.it.shared.MoneyTransferWorkflow;
import io.quarkiverse.temporal.it.shared.TransactionDetails;
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.workflow.Workflow;

@WorkflowImpl(workers = "namedWorker")
public class MoneyTransferWorkflowImpl implements MoneyTransferWorkflow {
private static final String WITHDRAW = "Withdraw";

// RetryOptions specify how to automatically handle retries when Activities fail
private final RetryOptions retryoptions = RetryOptions.newBuilder()
.setInitialInterval(Duration.ofSeconds(1)) // Wait 1 second before first retry
.setMaximumInterval(Duration.ofSeconds(20)) // Do not exceed 20 seconds between retries
.setBackoffCoefficient(2) // Wait 1 second, then 2, then 4, etc
.setMaximumAttempts(5000) // Fail after 5000 attempts
.build();

// ActivityOptions specify the limits on how long an Activity can execute before
// being interrupted by the Orchestration service
private final ActivityOptions defaultActivityOptions = ActivityOptions.newBuilder()
.setRetryOptions(retryoptions) // Apply the RetryOptions defined above
.setStartToCloseTimeout(Duration.ofSeconds(2)) // Max execution time for single Activity
.setScheduleToCloseTimeout(Duration.ofSeconds(5000)) // Entire duration from scheduling to completion including queue time
.build();

private final Map<String, ActivityOptions> perActivityMethodOptions = new HashMap<String, ActivityOptions>() {
{
// A heartbeat time-out is a proof-of life indicator that an activity is still working.
// The 5 second duration used here waits for up to 5 seconds to hear a heartbeat.
// If one is not heard, the Activity fails.
// The `withdraw` method is hard-coded to succeed, so this never happens.
// Use heartbeats for long-lived event-driven applications.
put(WITHDRAW, ActivityOptions.newBuilder().setHeartbeatTimeout(Duration.ofSeconds(5)).build());
}
};

// ActivityStubs enable calls to methods as if the Activity object is local but actually perform an RPC invocation
private final AccountActivity accountActivityStub = Workflow.newActivityStub(AccountActivity.class, defaultActivityOptions,
perActivityMethodOptions);

// The transfer method is the entry point to the Workflow
// Activity method executions can be orchestrated here or from within other Activity methods
@Override
public void transfer(TransactionDetails transaction) {
// Retrieve transaction information from the `transaction` instance
String sourceAccountId = transaction.getSourceAccountId();
String destinationAccountId = transaction.getDestinationAccountId();
String transactionReferenceId = transaction.getTransactionReferenceId();
int amountToTransfer = transaction.getAmountToTransfer();

// Stage 1: Withdraw funds from source
try {
// Launch `withdrawal` Activity
accountActivityStub.withdraw(sourceAccountId, transactionReferenceId, amountToTransfer);
} catch (Exception e) {
// If the withdrawal fails, for any exception, it's caught here
System.out.printf("[%s] Withdrawal of $%d from account %s failed", transactionReferenceId, amountToTransfer,
sourceAccountId);
System.out.flush();

// Transaction ends here
return;
}

// Stage 2: Deposit funds to destination
try {
// Perform `deposit` Activity
accountActivityStub.deposit(destinationAccountId, transactionReferenceId, amountToTransfer);

// The `deposit` was successful
System.out.printf("[%s] Transaction succeeded.\n", transactionReferenceId);
System.out.flush();

// Transaction ends here
return;
} catch (Exception e) {
// If the deposit fails, for any exception, it's caught here
System.out.printf("[%s] Deposit of $%d to account %s failed.\n", transactionReferenceId, amountToTransfer,
destinationAccountId);
System.out.flush();
}

// Continue by compensating with a refund

try {
// Perform `refund` Activity
System.out.printf("[%s] Refunding $%d to account %s.\n", transactionReferenceId, amountToTransfer, sourceAccountId);
System.out.flush();

accountActivityStub.refund(sourceAccountId, transactionReferenceId, amountToTransfer);

// Recovery successful. Transaction ends here
System.out.printf("[%s] Refund to originating account was successful.\n", transactionReferenceId);
System.out.printf("[%s] Transaction is complete. No transfer made.\n", transactionReferenceId);
return;
} catch (Exception e) {
// A recovery mechanism can fail too. Handle any exception here
System.out.printf("[%s] Deposit of $%d to account %s failed. Did not compensate withdrawal.\n",
transactionReferenceId, amountToTransfer, destinationAccountId);
System.out.printf("[%s] Workflow failed.", transactionReferenceId);
System.out.flush();

// Rethrowing the exception causes a Workflow Task failure
throw (e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;

import io.quarkiverse.temporal.it.worker.CoreTransactionDetails;
import io.quarkiverse.temporal.it.defaultWorker.CoreTransactionDetails;

@JsonDeserialize(as = CoreTransactionDetails.class)
public interface TransactionDetails {
Expand Down
Loading

0 comments on commit 4b0d88c

Please sign in to comment.