Skip to content

Commit

Permalink
fix(): use same JDBC mapper (#2915)
Browse files Browse the repository at this point in the history
* fix(): use same JDBC mapper

* cleanup

* fix(test): change wiremock unit test port

---------

Co-authored-by: Ludovic DEHON <[email protected]>
  • Loading branch information
Skraye and tchiotludo committed Jan 23, 2024
1 parent 668380e commit 7463224
Show file tree
Hide file tree
Showing 11 changed files with 23 additions and 20 deletions.
6 changes: 4 additions & 2 deletions jdbc/src/main/java/io/kestra/jdbc/AbstractJdbcRepository.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package io.kestra.jdbc;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.DeserializationException;
import io.kestra.core.models.executions.metrics.MetricAggregation;
import io.kestra.core.queues.QueueService;
import io.kestra.core.repositories.ArrayListTotal;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.IdUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.data.model.Pageable;
Expand All @@ -31,6 +31,8 @@
import java.util.stream.IntStream;

public abstract class AbstractJdbcRepository<T> {
protected static final ObjectMapper MAPPER = JdbcMapper.of();

protected final QueueService queueService;

protected final Class<T> cls;
Expand Down Expand Up @@ -163,7 +165,7 @@ public <R extends Record> Instant getDate(R record, String groupByType) {

public T deserialize(String record) {
try {
return JacksonMapper.ofJson().readValue(record, cls);
return MAPPER.readValue(record, cls);
} catch (IOException e) {
throw new DeserializationException(e, record);
}
Expand Down
2 changes: 1 addition & 1 deletion jdbc/src/main/java/io/kestra/jdbc/JdbcMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public abstract class JdbcMapper {

public static ObjectMapper of() {
if (MAPPER == null) {
MAPPER = JacksonMapper.ofJson().copy();
MAPPER = JacksonMapper.ofJson(false).copy();

final SimpleModule module = new SimpleModule();
module.addSerializer(Instant.class, new JsonSerializer<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
@Singleton
@Slf4j
public class JdbcWorkerTriggerResultQueueService {
private final static ObjectMapper MAPPER = JacksonMapper.ofJson();
private final static ObjectMapper MAPPER = JdbcMapper.of();

private final JdbcQueue<WorkerTriggerResult> workerTriggerResultQueue;
@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.runners.SubflowExecution;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.jdbc.JdbcMapper;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import org.jooq.DSLContext;
import org.jooq.Field;
Expand All @@ -18,7 +18,7 @@
import java.util.Optional;

public abstract class AbstractJdbcSubflowExecutionStorage extends AbstractJdbcRepository {
private final static ObjectMapper MAPPER = JacksonMapper.ofJson();
private final static ObjectMapper MAPPER = JdbcMapper.of();
protected io.kestra.jdbc.AbstractJdbcRepository<SubflowExecution<?>> jdbcRepository;

@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down
3 changes: 2 additions & 1 deletion jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.kestra.core.topologies.FlowTopologyService;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.Either;
import io.kestra.jdbc.JdbcMapper;
import io.kestra.jdbc.repository.AbstractJdbcExecutionRepository;
import io.kestra.jdbc.repository.AbstractJdbcFlowTopologyRepository;
import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepository;
Expand Down Expand Up @@ -58,7 +59,7 @@
@JdbcRunnerEnabled
@Slf4j
public class JdbcExecutor implements ExecutorInterface {
private static final ObjectMapper MAPPER = JacksonMapper.ofJson();
private static final ObjectMapper MAPPER = JdbcMapper.of();;

private final ScheduledExecutorService schedulerDelay = Executors.newSingleThreadScheduledExecutor();

Expand Down
8 changes: 4 additions & 4 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.QueueService;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.Either;
import io.kestra.core.utils.ExecutorsUtils;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.JdbcConfiguration;
import io.kestra.jdbc.JdbcMapper;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import io.micronaut.context.ApplicationContext;
Expand Down Expand Up @@ -42,7 +42,7 @@

@Slf4j
public abstract class JdbcQueue<T> implements QueueInterface<T> {
protected static final ObjectMapper mapper = JacksonMapper.ofJson();
protected static final ObjectMapper MAPPER = JdbcMapper.of();

private static ExecutorService poolExecutor;

Expand Down Expand Up @@ -86,7 +86,7 @@ protected Map<Field<Object>, Object> produceFields(String consumerGroup, String
Map<Field<Object>, Object> fields = new HashMap<>();
fields.put(AbstractJdbcRepository.field("type"), this.cls.getName());
fields.put(AbstractJdbcRepository.field("key"), key != null ? key : IdUtils.create());
fields.put(AbstractJdbcRepository.field("value"), JSONB.valueOf(mapper.writeValueAsString(message)));
fields.put(AbstractJdbcRepository.field("value"), JSONB.valueOf(MAPPER.writeValueAsString(message)));

if (consumerGroup != null) {
fields.put(AbstractJdbcRepository.field("consumer_group"), consumerGroup);
Expand Down Expand Up @@ -289,7 +289,7 @@ protected List<Either<T, DeserializationException>> map(Result<Record> fetch) {
return fetch
.map(record -> {
try {
return Either.left(JacksonMapper.ofJson().readValue(record.get("value", String.class), cls));
return Either.left(MAPPER.readValue(record.get("value", String.class), cls));
} catch (JsonProcessingException e) {
return Either.right(new DeserializationException(e, record.get("value", String.class)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import static org.hamcrest.Matchers.*;

@MicronautTest
@WireMockTest(httpPort = 8081)
@WireMockTest(httpPort = 28181)
class BlueprintControllerTest {
@Inject
@Client("/")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import static org.hamcrest.Matchers.is;

// For this controller tests, we replace every Marketplace URLs with http://localhost:8081/{previous-host} to target Wiremock server
@WireMockTest(httpPort = 8081)
@WireMockTest(httpPort = 28181)
class EditorControllerTest extends JdbcH2ControllerTest {
@Inject
@Client("/")
Expand All @@ -36,12 +36,12 @@ private MarketplaceRequestMapper marketplaceRequestMapper() {
return new MarketplaceRequestMapper() {
@Override
public String url(MarketplaceRequestType type) {
return type.getUrl().replaceFirst("https?://", "http://localhost:8081/");
return type.getUrl().replaceFirst("https?://", "http://localhost:28181/");
}

@Override
public String resourceBaseUrl(String publisher) {
return super.resourceBaseUrl(publisher).replaceFirst("https?://", "http://localhost:8081/");
return super.resourceBaseUrl(publisher).replaceFirst("https?://", "http://localhost:28181/");
}
};
}
Expand Down Expand Up @@ -123,7 +123,7 @@ void getResource_ReplaceUrlsForExtensionPath() throws IOException {

private MappingBuilder hasGoodHeaders(MappingBuilder mappingBuilder) {
return mappingBuilder.withHeader("Origin", equalTo("http://localhost:8080"))
.withHeader("Host", equalTo("localhost:8081"))
.withHeader("Host", equalTo("localhost:28181"))
.withHeader("Cookie", absent())
.withHeader("Access-Control-Allow-Origin", absent());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ void downloadFile() throws TimeoutException {
FileMetas.class
).blockingFirst();

assertThat(metas.getSize(), equalTo(3002L));
assertThat(metas.getSize(), greaterThanOrEqualTo(3003L));

String newExecutionId = IdUtils.create();

Expand Down
4 changes: 2 additions & 2 deletions webserver/src/test/resources/__files/extension.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[
"http://localhost:8081/my-publisher.vscode-unpkg.net/my-publisher/my-extension/1.0.0/extension/package.json",
"http://localhost:8081/my-publisher.vscode-unpkg.net/my-publisher/my-extension/1.0.0/extension/package.nls.json"
"http://localhost:28181/my-publisher.vscode-unpkg.net/my-publisher/my-extension/1.0.0/extension/package.json",
"http://localhost:28181/my-publisher.vscode-unpkg.net/my-publisher/my-extension/1.0.0/extension/package.nls.json"
]
2 changes: 1 addition & 1 deletion webserver/src/test/resources/application-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ micronaut:
http:
services:
api:
url: http://localhost:8081
url: http://localhost:28181
server:
cors:
enabled: true
Expand Down

0 comments on commit 7463224

Please sign in to comment.