Skip to content

Commit

Permalink
Merge pull request #99 from quarkiverse/managed_channel
Browse files Browse the repository at this point in the history
use a quarkus managed grpc channel
  • Loading branch information
rmanibus authored Sep 15, 2024
2 parents de77974 + 36124be commit abaf439
Show file tree
Hide file tree
Showing 12 changed files with 320 additions and 38 deletions.
18 changes: 18 additions & 0 deletions docs/modules/ROOT/pages/includes/quarkus-temporal.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,24 @@ endif::add-copy-button-to-env-var[]
|`false`


a|icon:lock[title=Fixed at build time] [[quarkus-temporal_quarkus-temporal-channel-type]]`link:#quarkus-temporal_quarkus-temporal-channel-type[quarkus.temporal.channel-type]`


[.description]
--
either use a channel managed by temporal client (built-in) or use a channel managed by quarkus (quarkus-managed). In this case the channel can be configured using quarkus.grpc.clients.temporal-client.

ifdef::add-copy-button-to-env-var[]
Environment variable: env_var_with_copy_button:+++QUARKUS_TEMPORAL_CHANNEL_TYPE+++[]
endif::add-copy-button-to-env-var[]
ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_TEMPORAL_CHANNEL_TYPE+++`
endif::add-copy-button-to-env-var[]
-- a|
`quarkus-managed`, `built-in`
|`built-in`


a|icon:lock[title=Fixed at build time] [[quarkus-temporal_quarkus-temporal-health-enabled]]`link:#quarkus-temporal_quarkus-temporal-health-enabled[quarkus.temporal.health.enabled]`


Expand Down
4 changes: 4 additions & 0 deletions extension/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc-common-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health-spi</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static io.quarkiverse.temporal.Constants.DEFAULT_WORKER_NAME;
import static io.quarkiverse.temporal.Constants.TEMPORAL_TESTING_CAPABILITY;
import static io.quarkiverse.temporal.config.TemporalBuildtimeConfig.ChannelType.BUILT_IN;
import static io.quarkiverse.temporal.config.TemporalBuildtimeConfig.ChannelType.QUARKUS_MANAGED;
import static io.quarkus.deployment.Capability.OPENTELEMETRY_TRACER;
import static io.quarkus.runtime.metrics.MetricsFactory.MICROMETER;

Expand Down Expand Up @@ -31,6 +33,7 @@
import org.jboss.jandex.DotName;
import org.jboss.jandex.ParameterizedType;

import io.grpc.Channel;
import io.quarkiverse.temporal.OtelRecorder;
import io.quarkiverse.temporal.TemporalActivity;
import io.quarkiverse.temporal.TemporalHealthCheck;
Expand Down Expand Up @@ -60,6 +63,9 @@
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
import io.quarkus.deployment.metrics.MetricsCapabilityBuildItem;
import io.quarkus.deployment.pkg.builditem.ArtifactResultBuildItem;
import io.quarkus.grpc.GrpcClient;
import io.quarkus.grpc.deployment.GrpcClientBuildItem;
import io.quarkus.grpc.deployment.GrpcDotNames;
import io.quarkus.info.GitInfo;
import io.quarkus.runtime.configuration.ConfigurationException;
import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;
Expand Down Expand Up @@ -313,38 +319,65 @@ void produceActivityBeans(
});
}

@BuildStep(onlyIf = EnableQuarkusManagedChannel.class)
GrpcClientBuildItem produceGrpcClient() {
GrpcClientBuildItem grpcClientBuildItem = new GrpcClientBuildItem("temporal-client");
grpcClientBuildItem.addClient(
new GrpcClientBuildItem.ClientInfo(GrpcDotNames.CHANNEL, GrpcClientBuildItem.ClientType.CHANNEL, Set.of()));
return grpcClientBuildItem;
}

@BuildStep(onlyIfNot = EnableMock.class)
@Record(ExecutionTime.RUNTIME_INIT)
WorkflowClientBuildItem recordWorkflowClient(
WorkflowServiceStubsRecorder recorder,
WorkflowClientRecorder clientRecorder,
Optional<MetricsCapabilityBuildItem> metricsCapability) {
SyntheticBeanBuildItem produceWorkflowServiceStubSyntheticBean(TemporalBuildtimeConfig config,
Optional<MetricsCapabilityBuildItem> metricsCapability,
WorkflowServiceStubsRecorder recorder) {

boolean micrometerSupported = metricsCapability.isPresent() && metricsCapability.get().metricsSupported(MICROMETER);

WorkflowServiceStubs workflowServiceStubs = recorder.createWorkflowServiceStubs(micrometerSupported);
return new WorkflowClientBuildItem(
clientRecorder.createWorkflowClient(workflowServiceStubs));
if (BUILT_IN.equals(config.channelType())) {
return SyntheticBeanBuildItem
.configure(WorkflowServiceStubs.class)
.scope(ApplicationScoped.class)
.unremovable()
.defaultBean()
.createWith(recorder.createWorkflowServiceStubs(micrometerSupported))
.setRuntimeInit()
.done();
}

// QUARKUS_MANAGED
return SyntheticBeanBuildItem
.configure(WorkflowServiceStubs.class)
.scope(ApplicationScoped.class)
.unremovable()
.defaultBean()
.addInjectionPoint(ClassType.create(Channel.class),
AnnotationInstance.builder(GrpcClient.class).value("temporal-client").build())
.createWith(recorder.createQuarkusManagedWorkflowServiceStubs(micrometerSupported))
.setRuntimeInit()
.done();

}

@BuildStep
Optional<SyntheticBeanBuildItem> produceWorkflowClientSyntheticBean(
Optional<WorkflowClientBuildItem> workflowClientBuildItem) {

return workflowClientBuildItem
.map(buildItem -> SyntheticBeanBuildItem
.configure(WorkflowClient.class)
.scope(ApplicationScoped.class)
.unremovable()
.defaultBean()
.addInjectionPoint(
ParameterizedType.create(Instance.class, ClassType.create(WorkflowClientInterceptor.class)),
AnnotationInstance.builder(Any.class).build())
.addInjectionPoint(ParameterizedType.create(Instance.class, ClassType.create(ContextPropagator.class)),
AnnotationInstance.builder(Any.class).build())
.createWith(buildItem.workflowClient)
.setRuntimeInit()
.done());
@BuildStep(onlyIfNot = EnableMock.class)
@Record(ExecutionTime.RUNTIME_INIT)
SyntheticBeanBuildItem produceWorkflowClientSyntheticBean(WorkflowClientRecorder clientRecorder) {

return SyntheticBeanBuildItem
.configure(WorkflowClient.class)
.scope(ApplicationScoped.class)
.unremovable()
.defaultBean()
.addInjectionPoint(ClassType.create(WorkflowServiceStubs.class))
.addInjectionPoint(
ParameterizedType.create(Instance.class, ClassType.create(WorkflowClientInterceptor.class)),
AnnotationInstance.builder(Any.class).build())
.addInjectionPoint(ParameterizedType.create(Instance.class, ClassType.create(ContextPropagator.class)),
AnnotationInstance.builder(Any.class).build())
.createWith(clientRecorder.createWorkflowClient())
.setRuntimeInit()
.done();

}

Expand Down Expand Up @@ -489,6 +522,14 @@ Class<?> loadClass(ClassInfo classInfo) {
}
}

public static class EnableQuarkusManagedChannel implements BooleanSupplier {
TemporalBuildtimeConfig config;

public boolean getAsBoolean() {
return !config.enableMock() && config.channelType() == QUARKUS_MANAGED;
}
}

public static class EnableMock implements BooleanSupplier {
TemporalBuildtimeConfig config;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.quarkiverse.temporal.deployment;

import jakarta.inject.Inject;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.temporal.serviceclient.WorkflowServiceStubs;

public class QuarkusManagedChannelConfigPriorityTest {

@RegisterExtension
static final QuarkusUnitTest unitTest = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addAsResource(
new StringAsset(
"quarkus.temporal.start-workers: false\n" +
"quarkus.temporal.channel-type: quarkus-managed\n" +
"quarkus.grpc.clients.temporal-client.host: grpcHost\n" +
"quarkus.temporal.connection.target: customTarget:1234\n"),
"application.properties"));

@Inject
WorkflowServiceStubs serviceStubs;

@Test
public void testQuarkusManagedChannel() {
Assertions.assertNull(serviceStubs.getOptions().getTarget());
Assertions.assertEquals("grpcHost:1234", serviceStubs.getOptions().getChannel().authority());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.quarkiverse.temporal.deployment;

import jakarta.inject.Inject;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.temporal.serviceclient.WorkflowServiceStubs;

public class QuarkusManagedChannelTest {

@RegisterExtension
static final QuarkusUnitTest unitTest = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addAsResource(
new StringAsset(
"quarkus.temporal.start-workers: false\n" +
"quarkus.temporal.channel-type: quarkus-managed\n" +
"quarkus.temporal.connection.target: customTarget:1234\n"),
"application.properties"));

@Inject
WorkflowServiceStubs serviceStubs;

@Test
public void testQuarkusManagedChannel() {
Assertions.assertNull(serviceStubs.getOptions().getTarget());
Assertions.assertEquals("customTarget:1234", serviceStubs.getOptions().getChannel().authority());
}
}
4 changes: 4 additions & 0 deletions extension/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc-common</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package io.quarkiverse.temporal;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import io.smallrye.config.ConfigSourceInterceptor;
import io.smallrye.config.ConfigSourceInterceptorContext;
import io.smallrye.config.ConfigValue;

public class TemporalConfigRelocateInterceptor implements ConfigSourceInterceptor {

@Override
public ConfigValue getValue(ConfigSourceInterceptorContext context, String name) {

if (name.equals("quarkus.grpc.clients.temporal-client.host")) {

ConfigValue host = context.proceed("quarkus.grpc.clients.temporal-client.host");
ConfigValue target = context.proceed("quarkus.temporal.connection.target");
if (host == null && target != null) {
String[] split = target.getValue().split(":");
return target.from()
.withName("quarkus.grpc.clients.temporal-client.host")
.withValue(split[0])
.build();
}
return host;
}

if (name.equals("quarkus.grpc.clients.temporal-client.port")) {

ConfigValue port = context.proceed("quarkus.grpc.clients.temporal-client.port");
ConfigValue target = context.proceed("quarkus.temporal.connection.target");
if (port == null && target != null) {
String[] split = target.getValue().split(":");
return target.from()
.withName("quarkus.grpc.clients.temporal-client.port")
.withValue(split[1])
.build();
}
return port;
}

if (name.equals("quarkus.grpc.clients.temporal-client.test-port")) {

ConfigValue port = context.proceed("quarkus.grpc.clients.temporal-client.test-port");
ConfigValue target = context.proceed("quarkus.temporal.connection.target");
if (port == null && target != null) {
String[] split = target.getValue().split(":");
return target.from()
.withName("quarkus.grpc.clients.temporal-client.test-port")
.withValue(split[1])
.build();
}
return port;
}

return context.proceed(name);
}

@Override
public Iterator<String> iterateNames(ConfigSourceInterceptorContext context) {
Set<String> names = new HashSet<>();
Iterator<String> iterator = context.iterateNames();
while (iterator.hasNext()) {
names.add(iterator.next());
}
names.add("quarkus.grpc.clients.temporal-client.host");
names.add("quarkus.grpc.clients.temporal-client.port");
names.add("quarkus.grpc.clients.temporal-client.test-port");
return names.iterator();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,11 @@ public WorkflowClientOptions createWorkflowClientOptions(
* Creates a new instance of {@link WorkflowClient} using the provided {@link WorkflowServiceStubs},
* context propagators, and telemetry settings.
*
* @param serviceStubs The {@link WorkflowServiceStubs} used to connect to the Temporal service.
* @return A configured {@link WorkflowClient} instance.
*/
public Function<SyntheticCreationalContext<WorkflowClient>, WorkflowClient> createWorkflowClient(
WorkflowServiceStubs serviceStubs) {
return context -> WorkflowClient.newInstance(serviceStubs, createWorkflowClientOptions(context));
public Function<SyntheticCreationalContext<WorkflowClient>, WorkflowClient> createWorkflowClient() {
return context -> WorkflowClient.newInstance(context.getInjectedReference(WorkflowServiceStubs.class),
createWorkflowClientOptions(context));
}

}
Loading

0 comments on commit abaf439

Please sign in to comment.