diff --git a/extension/deployment/src/main/java/io/quarkiverse/temporal/deployment/ActivityImplBuildItem.java b/extension/deployment/src/main/java/io/quarkiverse/temporal/deployment/ActivityImplBuildItem.java index dae774a..c5f2a21 100644 --- a/extension/deployment/src/main/java/io/quarkiverse/temporal/deployment/ActivityImplBuildItem.java +++ b/extension/deployment/src/main/java/io/quarkiverse/temporal/deployment/ActivityImplBuildItem.java @@ -1,14 +1,15 @@ package io.quarkiverse.temporal.deployment; -import org.jboss.jandex.ClassInfo; - import io.quarkus.builder.item.MultiBuildItem; public final class ActivityImplBuildItem extends MultiBuildItem { - public ActivityImplBuildItem(ClassInfo classInfo) { - this.classInfo = classInfo; + public ActivityImplBuildItem(Class clazz, String[] workers) { + this.clazz = clazz; + this.workers = workers; } - public final ClassInfo classInfo; + public final Class clazz; + public final String[] workers; + } diff --git a/extension/deployment/src/main/java/io/quarkiverse/temporal/deployment/TemporalProcessor.java b/extension/deployment/src/main/java/io/quarkiverse/temporal/deployment/TemporalProcessor.java index 766e3f4..29325c2 100644 --- a/extension/deployment/src/main/java/io/quarkiverse/temporal/deployment/TemporalProcessor.java +++ b/extension/deployment/src/main/java/io/quarkiverse/temporal/deployment/TemporalProcessor.java @@ -2,8 +2,12 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.BooleanSupplier; import jakarta.enterprise.context.ApplicationScoped; @@ -14,8 +18,10 @@ import org.jboss.jandex.ClassInfo; import org.jboss.jandex.DotName; +import io.quarkiverse.temporal.ActivityImpl; import io.quarkiverse.temporal.WorkerFactoryRecorder; import io.quarkiverse.temporal.WorkflowClientRecorder; +import io.quarkiverse.temporal.WorkflowImpl; import io.quarkiverse.temporal.WorkflowServiceStubsRecorder; import io.quarkiverse.temporal.config.ConnectionRuntimeConfig; import io.quarkiverse.temporal.config.TemporalBuildtimeConfig; @@ -23,9 +29,16 @@ import io.quarkus.arc.deployment.BeanArchiveIndexBuildItem; import io.quarkus.arc.deployment.SyntheticBeanBuildItem; import io.quarkus.deployment.Capabilities; -import io.quarkus.deployment.annotations.*; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.Consume; +import io.quarkus.deployment.annotations.ExecutionTime; +import io.quarkus.deployment.annotations.Produce; import io.quarkus.deployment.annotations.Record; -import io.quarkus.deployment.builditem.*; +import io.quarkus.deployment.builditem.CombinedIndexBuildItem; +import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.ServiceStartBuildItem; +import io.quarkus.deployment.builditem.ShutdownContextBuildItem; import io.quarkus.deployment.pkg.builditem.ArtifactResultBuildItem; import io.quarkus.runtime.RuntimeValue; import io.quarkus.runtime.configuration.ConfigurationException; @@ -38,6 +51,10 @@ public class TemporalProcessor { + public static final DotName ACTIVITY_IMPL = DotName.createSimple(ActivityImpl.class); + + public static final DotName WORKFLOW_IMPL = DotName.createSimple(WorkflowImpl.class); + public static final DotName WORKFLOW_INTERFACE = DotName.createSimple(WorkflowInterface.class); public static final DotName ACTIVITY_INTERFACE = DotName.createSimple(ActivityInterface.class); @@ -63,7 +80,10 @@ void produceWorkflows( throw new IllegalStateException("Workflow " + target.asClass().name() + " must have exactly one implementor"); } allKnownImplementors.forEach(implementor -> { - producer.produce(new WorkflowImplBuildItem(implementor)); + AnnotationInstance annotation = implementor.annotation(WORKFLOW_IMPL); + String[] workers = annotation == null ? new String[] { "" } + : annotation.value("workers").asStringArray(); + producer.produce(new WorkflowImplBuildItem(loadClass(implementor), workers)); }); } } @@ -81,9 +101,45 @@ void produceActivities( throw new IllegalStateException("Activity " + target.asClass().name() + " must have exactly one implementor"); } allKnownImplementors.forEach(implementor -> { - producer.produce(new ActivityImplBuildItem(implementor)); + AnnotationInstance annotation = implementor.annotation(ACTIVITY_IMPL); + String[] workers = annotation == null ? new String[] { "" } + : annotation.value("workers").asStringArray(); + producer.produce(new ActivityImplBuildItem(loadClass(implementor), workers)); }); } + } + + @BuildStep + void produceWorkers( + List workflowImplBuildItems, + List activityImplBuildItems, + BuildProducer producer) { + + Set workers = new HashSet<>(); + + Map>> workflowsByWorker = new HashMap<>(); + + for (WorkflowImplBuildItem workflowImplBuildItem : workflowImplBuildItems) { + for (String worker : workflowImplBuildItem.workers) { + workers.add(worker); + workflowsByWorker.computeIfAbsent(worker, (w) -> new ArrayList<>()) + .add(workflowImplBuildItem.clazz); + } + } + + Map>> activitiesByWorker = new HashMap<>(); + + for (ActivityImplBuildItem activityImplBuildItem : activityImplBuildItems) { + for (String worker : activityImplBuildItem.workers) { + workers.add(worker); + activitiesByWorker.computeIfAbsent(worker, (w) -> new ArrayList<>()) + .add(activityImplBuildItem.clazz); + } + } + + for (String worker : workers) { + producer.produce(new WorkerBuildItem(worker, workflowsByWorker.get(worker), activitiesByWorker.get(worker))); + } } @@ -93,7 +149,7 @@ void produceActivityBeans( BuildProducer producer) { activities.forEach(activity -> { producer.produce(AdditionalBeanBuildItem.builder() - .addBeanClass(activity.classInfo.name().toString()) + .addBeanClass(activity.clazz) .setDefaultScope(DotName.createSimple(ApplicationScoped.class)) .setUnremovable() .build()); @@ -157,25 +213,14 @@ Optional recordWorkflowFactory( @Consume(ConfigValidatedBuildItem.class) Optional setupWorkflowFactory( Optional workerFactoryBuildItem, - List workflowImplBuildItems, - List activityImplBuildItems, + List workerBuildItems, WorkerFactoryRecorder workerFactoryRecorder) { return workerFactoryBuildItem.map(buildItem -> { - List> workflows = new ArrayList<>(); - - for (var workflowBuildItem : workflowImplBuildItems) { - workflows.add(loadClass(workflowBuildItem.classInfo)); - } - - List> activities = new ArrayList<>(); - - for (var activityBuildItem : activityImplBuildItems) { - activities.add(loadClass(activityBuildItem.classInfo)); + for (WorkerBuildItem workerBuildItem : workerBuildItems) { + workerFactoryRecorder.createWorker(buildItem.workerFactory, workerBuildItem.name, + workerBuildItem.workflows, workerBuildItem.activities); } - - workerFactoryRecorder.createWorker(buildItem.workerFactory, workflows, activities); - return new InitializedWorkerFactoryBuildItem(buildItem.workerFactory); }); @@ -221,4 +266,4 @@ public boolean getAsBoolean() { } } -} \ No newline at end of file +} diff --git a/extension/deployment/src/main/java/io/quarkiverse/temporal/deployment/WorkerBuildItem.java b/extension/deployment/src/main/java/io/quarkiverse/temporal/deployment/WorkerBuildItem.java new file mode 100644 index 0000000..d5b65f8 --- /dev/null +++ b/extension/deployment/src/main/java/io/quarkiverse/temporal/deployment/WorkerBuildItem.java @@ -0,0 +1,18 @@ +package io.quarkiverse.temporal.deployment; + +import java.util.List; + +import io.quarkus.builder.item.MultiBuildItem; + +public final class WorkerBuildItem extends MultiBuildItem { + + WorkerBuildItem(String name, List> workflows, List> activities) { + this.name = name; + this.workflows = workflows; + this.activities = activities; + } + + final String name; + final List> workflows; + final List> activities; +} diff --git a/extension/deployment/src/main/java/io/quarkiverse/temporal/deployment/WorkflowImplBuildItem.java b/extension/deployment/src/main/java/io/quarkiverse/temporal/deployment/WorkflowImplBuildItem.java index 15f9901..5d701bc 100644 --- a/extension/deployment/src/main/java/io/quarkiverse/temporal/deployment/WorkflowImplBuildItem.java +++ b/extension/deployment/src/main/java/io/quarkiverse/temporal/deployment/WorkflowImplBuildItem.java @@ -1,14 +1,15 @@ package io.quarkiverse.temporal.deployment; -import org.jboss.jandex.ClassInfo; - import io.quarkus.builder.item.MultiBuildItem; public final class WorkflowImplBuildItem extends MultiBuildItem { - public WorkflowImplBuildItem(ClassInfo classInfo) { - this.classInfo = classInfo; + public WorkflowImplBuildItem(Class clazz, String[] workers) { + this.clazz = clazz; + this.workers = workers; } - public final ClassInfo classInfo; + public final Class clazz; + + public final String[] workers; } diff --git a/extension/runtime/src/main/java/io/quarkiverse/temporal/WorkerFactoryRecorder.java b/extension/runtime/src/main/java/io/quarkiverse/temporal/WorkerFactoryRecorder.java index e282aa6..fa054cb 100644 --- a/extension/runtime/src/main/java/io/quarkiverse/temporal/WorkerFactoryRecorder.java +++ b/extension/runtime/src/main/java/io/quarkiverse/temporal/WorkerFactoryRecorder.java @@ -18,9 +18,10 @@ public RuntimeValue createWorkerFactory(WorkflowClient workflowCl return new RuntimeValue<>(WorkerFactory.newInstance(workflowClient)); } - public void createWorker(RuntimeValue runtimeValue, List> workflows, List> activities) { + public void createWorker(RuntimeValue runtimeValue, String queueName, List> workflows, + List> activities) { WorkerFactory workerFactory = runtimeValue.getValue(); - Worker worker = workerFactory.newWorker("MONEY_TRANSFER_TASK_QUEUE"); + Worker worker = workerFactory.newWorker(queueName); for (var workflow : workflows) { worker.registerWorkflowImplementationTypes(workflow); }