Skip to content

Commit

Permalink
enable creation of multiple workers
Browse files Browse the repository at this point in the history
  • Loading branch information
Loic Hermann authored and Loic Hermann committed Aug 11, 2024
1 parent 5ab3617 commit c21a949
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package io.quarkiverse.temporal.deployment;

import org.jboss.jandex.ClassInfo;

import io.quarkus.builder.item.MultiBuildItem;

public final class ActivityImplBuildItem extends MultiBuildItem {

public ActivityImplBuildItem(ClassInfo classInfo) {
this.classInfo = classInfo;
public ActivityImplBuildItem(Class<?> clazz, String[] workers) {
this.clazz = clazz;
this.workers = workers;
}

public final ClassInfo classInfo;
public final Class<?> clazz;
public final String[] workers;

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BooleanSupplier;

import jakarta.enterprise.context.ApplicationScoped;
Expand All @@ -14,18 +18,27 @@
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.DotName;

import io.quarkiverse.temporal.ActivityImpl;
import io.quarkiverse.temporal.WorkerFactoryRecorder;
import io.quarkiverse.temporal.WorkflowClientRecorder;
import io.quarkiverse.temporal.WorkflowImpl;
import io.quarkiverse.temporal.WorkflowServiceStubsRecorder;
import io.quarkiverse.temporal.config.ConnectionRuntimeConfig;
import io.quarkiverse.temporal.config.TemporalBuildtimeConfig;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.BeanArchiveIndexBuildItem;
import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
import io.quarkus.deployment.Capabilities;
import io.quarkus.deployment.annotations.*;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.Consume;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Produce;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.*;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.ServiceStartBuildItem;
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
import io.quarkus.deployment.pkg.builditem.ArtifactResultBuildItem;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.configuration.ConfigurationException;
Expand All @@ -38,6 +51,10 @@

public class TemporalProcessor {

public static final DotName ACTIVITY_IMPL = DotName.createSimple(ActivityImpl.class);

public static final DotName WORKFLOW_IMPL = DotName.createSimple(WorkflowImpl.class);

public static final DotName WORKFLOW_INTERFACE = DotName.createSimple(WorkflowInterface.class);

public static final DotName ACTIVITY_INTERFACE = DotName.createSimple(ActivityInterface.class);
Expand All @@ -63,7 +80,10 @@ void produceWorkflows(
throw new IllegalStateException("Workflow " + target.asClass().name() + " must have exactly one implementor");
}
allKnownImplementors.forEach(implementor -> {
producer.produce(new WorkflowImplBuildItem(implementor));
AnnotationInstance annotation = implementor.annotation(WORKFLOW_IMPL);
String[] workers = annotation == null ? new String[] { "<default>" }
: annotation.value("workers").asStringArray();
producer.produce(new WorkflowImplBuildItem(loadClass(implementor), workers));
});
}
}
Expand All @@ -81,9 +101,45 @@ void produceActivities(
throw new IllegalStateException("Activity " + target.asClass().name() + " must have exactly one implementor");
}
allKnownImplementors.forEach(implementor -> {
producer.produce(new ActivityImplBuildItem(implementor));
AnnotationInstance annotation = implementor.annotation(ACTIVITY_IMPL);
String[] workers = annotation == null ? new String[] { "<default>" }
: annotation.value("workers").asStringArray();
producer.produce(new ActivityImplBuildItem(loadClass(implementor), workers));
});
}
}

@BuildStep
void produceWorkers(
List<WorkflowImplBuildItem> workflowImplBuildItems,
List<ActivityImplBuildItem> activityImplBuildItems,
BuildProducer<WorkerBuildItem> producer) {

Set<String> workers = new HashSet<>();

Map<String, List<Class<?>>> workflowsByWorker = new HashMap<>();

for (WorkflowImplBuildItem workflowImplBuildItem : workflowImplBuildItems) {
for (String worker : workflowImplBuildItem.workers) {
workers.add(worker);
workflowsByWorker.computeIfAbsent(worker, (w) -> new ArrayList<>())
.add(workflowImplBuildItem.clazz);
}
}

Map<String, List<Class<?>>> activitiesByWorker = new HashMap<>();

for (ActivityImplBuildItem activityImplBuildItem : activityImplBuildItems) {
for (String worker : activityImplBuildItem.workers) {
workers.add(worker);
activitiesByWorker.computeIfAbsent(worker, (w) -> new ArrayList<>())
.add(activityImplBuildItem.clazz);
}
}

for (String worker : workers) {
producer.produce(new WorkerBuildItem(worker, workflowsByWorker.get(worker), activitiesByWorker.get(worker)));
}

}

Expand All @@ -93,7 +149,7 @@ void produceActivityBeans(
BuildProducer<AdditionalBeanBuildItem> producer) {
activities.forEach(activity -> {
producer.produce(AdditionalBeanBuildItem.builder()
.addBeanClass(activity.classInfo.name().toString())
.addBeanClass(activity.clazz)
.setDefaultScope(DotName.createSimple(ApplicationScoped.class))
.setUnremovable()
.build());
Expand Down Expand Up @@ -157,25 +213,14 @@ Optional<WorkerFactoryBuildItem> recordWorkflowFactory(
@Consume(ConfigValidatedBuildItem.class)
Optional<InitializedWorkerFactoryBuildItem> setupWorkflowFactory(
Optional<WorkerFactoryBuildItem> workerFactoryBuildItem,
List<WorkflowImplBuildItem> workflowImplBuildItems,
List<ActivityImplBuildItem> activityImplBuildItems,
List<WorkerBuildItem> workerBuildItems,
WorkerFactoryRecorder workerFactoryRecorder) {

return workerFactoryBuildItem.map(buildItem -> {
List<Class<?>> workflows = new ArrayList<>();

for (var workflowBuildItem : workflowImplBuildItems) {
workflows.add(loadClass(workflowBuildItem.classInfo));
}

List<Class<?>> activities = new ArrayList<>();

for (var activityBuildItem : activityImplBuildItems) {
activities.add(loadClass(activityBuildItem.classInfo));
for (WorkerBuildItem workerBuildItem : workerBuildItems) {
workerFactoryRecorder.createWorker(buildItem.workerFactory, workerBuildItem.name,
workerBuildItem.workflows, workerBuildItem.activities);
}

workerFactoryRecorder.createWorker(buildItem.workerFactory, workflows, activities);

return new InitializedWorkerFactoryBuildItem(buildItem.workerFactory);
});

Expand Down Expand Up @@ -221,4 +266,4 @@ public boolean getAsBoolean() {
}
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.quarkiverse.temporal.deployment;

import java.util.List;

import io.quarkus.builder.item.MultiBuildItem;

public final class WorkerBuildItem extends MultiBuildItem {

WorkerBuildItem(String name, List<Class<?>> workflows, List<Class<?>> activities) {
this.name = name;
this.workflows = workflows;
this.activities = activities;
}

final String name;
final List<Class<?>> workflows;
final List<Class<?>> activities;
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package io.quarkiverse.temporal.deployment;

import org.jboss.jandex.ClassInfo;

import io.quarkus.builder.item.MultiBuildItem;

public final class WorkflowImplBuildItem extends MultiBuildItem {

public WorkflowImplBuildItem(ClassInfo classInfo) {
this.classInfo = classInfo;
public WorkflowImplBuildItem(Class<?> clazz, String[] workers) {
this.clazz = clazz;
this.workers = workers;
}

public final ClassInfo classInfo;
public final Class<?> clazz;

public final String[] workers;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ public RuntimeValue<WorkerFactory> createWorkerFactory(WorkflowClient workflowCl
return new RuntimeValue<>(WorkerFactory.newInstance(workflowClient));
}

public void createWorker(RuntimeValue<WorkerFactory> runtimeValue, List<Class<?>> workflows, List<Class<?>> activities) {
public void createWorker(RuntimeValue<WorkerFactory> runtimeValue, String queueName, List<Class<?>> workflows,
List<Class<?>> activities) {
WorkerFactory workerFactory = runtimeValue.getValue();
Worker worker = workerFactory.newWorker("MONEY_TRANSFER_TASK_QUEUE");
Worker worker = workerFactory.newWorker(queueName);
for (var workflow : workflows) {
worker.registerWorkflowImplementationTypes(workflow);
}
Expand Down

0 comments on commit c21a949

Please sign in to comment.