Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Epic 1748 doc ref handling #3361

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions connector-runtime/connector-runtime-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
<artifactId>connector-core</artifactId>
</dependency>

<dependency>
<groupId>io.camunda.connector</groupId>
<artifactId>connector-document</artifactId>
</dependency>

<!-- Jackson dependencies defined as provided in the Connector SDK Core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down Expand Up @@ -95,5 +100,11 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.skyscreamer</groupId>
<artifactId>jsonassert</artifactId>
<scope>test</scope>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.camunda.connector.runtime.core.inbound.correlation.InboundCorrelationHandler;
import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails.ValidInboundConnectorDetails;
import io.camunda.connector.runtime.core.secret.SecretProviderAggregator;
import io.camunda.document.factory.DocumentFactory;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Consumer;
Expand All @@ -35,18 +36,21 @@ public class DefaultInboundConnectorContextFactory implements InboundConnectorCo
private final SecretProviderAggregator secretProviderAggregator;
private final ValidationProvider validationProvider;
private final OperateClientAdapter operateClientAdapter;
private final DocumentFactory documentFactory;

public DefaultInboundConnectorContextFactory(
final ObjectMapper mapper,
final InboundCorrelationHandler correlationHandler,
final SecretProviderAggregator secretProviderAggregator,
final ValidationProvider validationProvider,
final OperateClientAdapter operateClientAdapter) {
final OperateClientAdapter operateClientAdapter,
final DocumentFactory documentFactory) {
this.objectMapper = mapper;
this.correlationHandler = correlationHandler;
this.secretProviderAggregator = secretProviderAggregator;
this.validationProvider = validationProvider;
this.operateClientAdapter = operateClientAdapter;
this.documentFactory = documentFactory;
}

@Override
Expand All @@ -60,6 +64,7 @@ public <T extends InboundConnectorExecutable<?>> InboundConnectorContext createC
new InboundConnectorContextImpl(
secretProviderAggregator,
validationProvider,
documentFactory,
connectorDetails,
correlationHandler,
cancellationCallback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
import io.camunda.connector.runtime.core.inbound.correlation.InboundCorrelationHandler;
import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails;
import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails.ValidInboundConnectorDetails;
import io.camunda.document.Document;
import io.camunda.document.factory.DocumentFactory;
import io.camunda.document.factory.DocumentFactoryImpl;
import io.camunda.document.store.DocumentCreationRequest;
import io.camunda.document.store.InMemoryDocumentStore;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -51,20 +56,22 @@ public class InboundConnectorContextImpl extends AbstractConnectorContext
private final ObjectMapper objectMapper;

private final Consumer<Throwable> cancellationCallback;

private Health health = Health.unknown();

private final EvictingQueue<Activity> logs;
private final DocumentFactory documentFactory;
private Health health = Health.unknown();
private Map<String, Object> propertiesWithSecrets;

public InboundConnectorContextImpl(
SecretProvider secretProvider,
ValidationProvider validationProvider,
DocumentFactory documentFactory,
ValidInboundConnectorDetails connectorDetails,
InboundCorrelationHandler correlationHandler,
Consumer<Throwable> cancellationCallback,
ObjectMapper objectMapper,
EvictingQueue logs) {
super(secretProvider, validationProvider);
this.documentFactory = documentFactory;
this.correlationHandler = correlationHandler;
this.connectorDetails = connectorDetails;
this.properties =
Expand All @@ -75,6 +82,25 @@ public InboundConnectorContextImpl(
this.logs = logs;
}

public InboundConnectorContextImpl(
SecretProvider secretProvider,
ValidationProvider validationProvider,
ValidInboundConnectorDetails connectorDetails,
InboundCorrelationHandler correlationHandler,
Consumer<Throwable> cancellationCallback,
ObjectMapper objectMapper,
EvictingQueue logs) {
this(
secretProvider,
validationProvider,
new DocumentFactoryImpl(InMemoryDocumentStore.INSTANCE),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should always expect this parameter to be passed in.

connectorDetails,
correlationHandler,
cancellationCallback,
objectMapper,
logs);
}

@Override
public CorrelationResult correlateWithResult(Object variables) {
try {
Expand Down Expand Up @@ -149,7 +175,10 @@ public List<InboundConnectorElement> connectorElements() {
return connectorDetails.connectorElements();
}

private Map<String, Object> propertiesWithSecrets;
@Override
public Document createDocument(DocumentCreationRequest request) {
return documentFactory.create(request);
}

private Map<String, Object> getPropertiesWithSecrets(Map<String, Object> properties) {
if (propertiesWithSecrets == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.camunda.connector.api.validation.ValidationProvider;
import io.camunda.connector.runtime.core.inbound.correlation.InboundCorrelationHandler;
import io.camunda.connector.runtime.core.inbound.correlation.MessageCorrelationPoint.BoundaryEventCorrelationPoint;
import io.camunda.document.Document;
import io.camunda.document.store.DocumentCreationRequest;
import io.camunda.operate.model.FlowNodeInstance;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -131,6 +133,11 @@ public Queue<Activity> getLogs() {
return inboundContext.getLogs();
}

@Override
public Document createDocument(DocumentCreationRequest request) {
return inboundContext.createDocument(request);
}

@Override
public List<InboundConnectorElement> connectorElements() {
return inboundContext.connectorElements();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.camunda.connector.runtime.core.outbound.ErrorExpressionJobContext.ErrorExpressionJob;
import io.camunda.connector.runtime.core.secret.SecretProviderAggregator;
import io.camunda.connector.runtime.core.secret.SecretProviderDiscovery;
import io.camunda.document.factory.DocumentFactory;
import io.camunda.zeebe.client.api.command.FinalCommandStep;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.response.CompleteJobResponse;
Expand All @@ -58,6 +59,8 @@ public class ConnectorJobHandler implements JobHandler {

protected ValidationProvider validationProvider;

protected DocumentFactory documentFactory;

protected ObjectMapper objectMapper;

/**
Expand All @@ -80,10 +83,12 @@ public ConnectorJobHandler(
final OutboundConnectorFunction call,
final SecretProvider secretProvider,
final ValidationProvider validationProvider,
final DocumentFactory documentFactory,
final ObjectMapper objectMapper) {
this.call = call;
this.secretProvider = secretProvider;
this.validationProvider = validationProvider;
this.documentFactory = documentFactory;
this.objectMapper = objectMapper;
}

Expand Down Expand Up @@ -180,7 +185,8 @@ public void handle(final JobClient client, final ActivatedJob job) {

try {
var context =
new JobHandlerContext(job, getSecretProvider(), validationProvider, objectMapper);
new JobHandlerContext(
job, getSecretProvider(), validationProvider, documentFactory, objectMapper);
var response = call.execute(context);
var responseVariables =
ConnectorHelper.createOutputVariables(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
import io.camunda.connector.api.secret.SecretProvider;
import io.camunda.connector.api.validation.ValidationProvider;
import io.camunda.connector.runtime.core.AbstractConnectorContext;
import io.camunda.document.Document;
import io.camunda.document.factory.DocumentFactory;
import io.camunda.document.factory.DocumentFactoryImpl;
import io.camunda.document.store.DocumentCreationRequest;
import io.camunda.document.store.InMemoryDocumentStore;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import java.util.Objects;
import org.slf4j.Logger;
Expand All @@ -45,19 +50,35 @@ public class JobHandlerContext extends AbstractConnectorContext

private final ObjectMapper objectMapper;
private final JobContext jobContext;
private final DocumentFactory documentFactory;
private String jsonWithSecrets = null;

public JobHandlerContext(
final ActivatedJob job,
final SecretProvider secretProvider,
final ValidationProvider validationProvider,
final DocumentFactory documentFactory,
final ObjectMapper objectMapper) {
super(secretProvider, validationProvider);
this.documentFactory = documentFactory;
this.job = job;
this.objectMapper = objectMapper;
this.jobContext = new ActivatedJobContext(job, this::getJsonReplacedWithSecrets);
}

public JobHandlerContext(
final ActivatedJob job,
final SecretProvider secretProvider,
final ValidationProvider validationProvider,
final ObjectMapper objectMapper) {
this(
job,
secretProvider,
validationProvider,
new DocumentFactoryImpl(InMemoryDocumentStore.INSTANCE),
objectMapper);
}

@Override
public <T> T bindVariables(Class<T> cls) {
var mappedObject = mapJson(cls);
Expand Down Expand Up @@ -110,6 +131,11 @@ public JobContext getJobContext() {
return jobContext;
}

@Override
public Document createDocument(DocumentCreationRequest document) {
return documentFactory.create(document);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.camunda.connector.runtime.core.inbound.correlation.InboundCorrelationHandler;
import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails.ValidInboundConnectorDetails;
import io.camunda.connector.runtime.core.secret.SecretProviderAggregator;
import io.camunda.document.factory.DocumentFactory;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -44,6 +45,7 @@ class DefaultInboundConnectorContextFactoryTest {
@Mock private OperateClientAdapter operateClientAdapter;
@Mock private Consumer<Throwable> cancellationCallback;
@Mock private ValidInboundConnectorDetails newConnector;
@Mock private DocumentFactory documentFactory;
private DefaultInboundConnectorContextFactory factory;

@BeforeEach
Expand All @@ -54,7 +56,8 @@ void setUp() {
correlationHandler,
secretProviderAggregator,
validationProvider,
operateClientAdapter);
operateClientAdapter,
documentFactory);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.camunda.connector.runtime.inbound.state.ProcessStateStore;
import io.camunda.connector.runtime.inbound.state.TenantAwareProcessStateStoreImpl;
import io.camunda.connector.runtime.inbound.webhook.WebhookConnectorRegistry;
import io.camunda.document.factory.DocumentFactory;
import io.camunda.operate.CamundaOperateClient;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.spring.client.metrics.MetricsRecorder;
Expand All @@ -58,6 +59,11 @@ public class InboundConnectorRuntimeConfiguration {
@Value("${camunda.connector.inbound.message.ttl:PT1H}")
private Duration messageTtl;

@Bean
public static InboundConnectorBeanDefinitionProcessor inboundConnectorBeanDefinitionProcessor() {
return new InboundConnectorBeanDefinitionProcessor();
}

@Bean
public ProcessElementContextFactory processElementContextFactory(
ObjectMapper objectMapper,
Expand All @@ -77,24 +83,21 @@ public InboundCorrelationHandler inboundCorrelationHandler(
zeebeClient, feelEngine, metricsRecorder, elementContextFactory, messageTtl);
}

@Bean
public static InboundConnectorBeanDefinitionProcessor inboundConnectorBeanDefinitionProcessor() {
return new InboundConnectorBeanDefinitionProcessor();
}

@Bean
public InboundConnectorContextFactory springInboundConnectorContextFactory(
ObjectMapper mapper,
InboundCorrelationHandler correlationHandler,
SecretProviderAggregator secretProviderAggregator,
@Autowired(required = false) ValidationProvider validationProvider,
OperateClientAdapter operateClientAdapter) {
OperateClientAdapter operateClientAdapter,
DocumentFactory documentFactory) {
return new DefaultInboundConnectorContextFactory(
mapper,
correlationHandler,
secretProviderAggregator,
validationProvider,
operateClientAdapter);
operateClientAdapter,
documentFactory);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import io.camunda.connector.runtime.core.secret.SecretProviderAggregator;
import io.camunda.connector.runtime.outbound.lifecycle.OutboundConnectorAnnotationProcessor;
import io.camunda.connector.runtime.outbound.lifecycle.OutboundConnectorManager;
import io.camunda.document.factory.DocumentFactory;
import io.camunda.document.factory.DocumentFactoryImpl;
import io.camunda.document.store.InMemoryDocumentStore;
import io.camunda.zeebe.spring.client.jobhandling.CommandExceptionHandlingStrategy;
import io.camunda.zeebe.spring.client.jobhandling.JobWorkerManager;
import io.camunda.zeebe.spring.client.metrics.MetricsRecorder;
Expand All @@ -40,13 +43,19 @@ public OutboundConnectorFactory outboundConnectorFactory() {
OutboundConnectorDiscovery.loadConnectorConfigurations());
}

@Bean
public DocumentFactory documentFactory() {
return new DocumentFactoryImpl(InMemoryDocumentStore.INSTANCE);
}

@Bean
public OutboundConnectorManager outboundConnectorManager(
JobWorkerManager jobWorkerManager,
OutboundConnectorFactory connectorFactory,
CommandExceptionHandlingStrategy commandExceptionHandlingStrategy,
SecretProviderAggregator secretProviderAggregator,
@Autowired(required = false) ValidationProvider validationProvider,
DocumentFactory documentFactory,
ObjectMapper objectMapper,
MetricsRecorder metricsRecorder) {
return new OutboundConnectorManager(
Expand All @@ -55,6 +64,7 @@ public OutboundConnectorManager outboundConnectorManager(
commandExceptionHandlingStrategy,
secretProviderAggregator,
validationProvider,
documentFactory,
objectMapper,
metricsRecorder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.camunda.connector.runtime.core.secret.SecretProviderAggregator;
import io.camunda.connector.runtime.metrics.ConnectorMetrics;
import io.camunda.connector.runtime.metrics.ConnectorMetrics.Outbound;
import io.camunda.document.factory.DocumentFactory;
import io.camunda.zeebe.client.api.command.FinalCommandStep;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.worker.JobClient;
Expand All @@ -52,10 +53,16 @@ public SpringConnectorJobHandler(
CommandExceptionHandlingStrategy commandExceptionHandlingStrategy,
SecretProviderAggregator secretProviderAggregator,
ValidationProvider validationProvider,
DocumentFactory documentFactory,
ObjectMapper objectMapper,
OutboundConnectorFunction connectorFunction,
OutboundConnectorConfiguration connectorConfiguration) {
super(connectorFunction, secretProviderAggregator, validationProvider, objectMapper);
super(
connectorFunction,
secretProviderAggregator,
validationProvider,
documentFactory,
objectMapper);
this.metricsRecorder = metricsRecorder;
this.commandExceptionHandlingStrategy = commandExceptionHandlingStrategy;
this.connectorConfiguration = connectorConfiguration;
Expand Down
Loading
Loading