Skip to content

Commit

Permalink
chore(version): update to version 'v0.11.1'.
Browse files Browse the repository at this point in the history
  • Loading branch information
brian-mulier-p committed Sep 11, 2023
2 parents f30bef9 + 679f71e commit fef619b
Show file tree
Hide file tree
Showing 14 changed files with 77 additions and 45 deletions.
31 changes: 9 additions & 22 deletions core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@
import io.kestra.core.services.FlowService;
import io.kestra.core.validations.FlowValidation;
import io.micronaut.core.annotation.Introspected;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.validation.ConstraintViolation;
import javax.validation.ConstraintViolationException;
import javax.validation.Valid;
import javax.validation.constraints.*;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
Expand All @@ -62,17 +62,16 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {
@Pattern(regexp = "[a-z0-9._-]+")
String namespace;

@With
@Min(value = 1)
Integer revision;

String description;

@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
@Schema(implementation = Object.class, anyOf = {List.class, Map.class})
List<Label> labels;


@Valid
List<Input<?>> inputs;

Expand Down Expand Up @@ -318,21 +317,9 @@ public String generateSource() {
}

public Flow toDeleted() {
return new Flow(
this.id,
this.namespace,
this.revision + 1,
this.description,
this.labels,
this.inputs,
this.variables,
this.tasks,
this.errors,
this.listeners,
this.triggers,
this.taskDefaults,
this.disabled,
true
);
return this.toBuilder()
.revision(this.revision + 1)
.deleted(true)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package io.kestra.core.models.flows;

import io.micronaut.core.annotation.Introspected;
import lombok.*;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,16 @@

import io.kestra.core.services.FlowService;
import io.micronaut.core.annotation.Introspected;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;

@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Introspected
@ToString
@Slf4j
public class FlowWithSource extends Flow {
String source;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public List<Execution> computeExecutionsFromFlowTriggers(Execution execution, Li
.flatMap(flow -> flowTriggers(flow).map(trigger -> new FlowWithFlowTrigger(flow, trigger)))
.filter(flowWithFlowTrigger -> conditionService.valid(
flowWithFlowTrigger.getFlow(),
flowWithFlowTrigger.getTrigger().getConditions().stream()
Optional.ofNullable(flowWithFlowTrigger.getTrigger().getConditions()).stream().flatMap(Collection::stream)
.filter(Predicate.not(MultipleCondition.class::isInstance))
.toList(),
conditionService.conditionContext(
Expand All @@ -67,7 +67,7 @@ public List<Execution> computeExecutionsFromFlowTriggers(Execution execution, Li
if (multipleConditionStorage.isPresent()) {
List<FlowWithFlowTriggerAndMultipleCondition> flowWithMultipleConditionsToEvaluate = validTriggersBeforeMultipleConditionEval.stream()
.flatMap(flowWithFlowTrigger ->
flowWithFlowTrigger.getTrigger().getConditions().stream()
Optional.ofNullable(flowWithFlowTrigger.getTrigger().getConditions()).stream().flatMap(Collection::stream)
.filter(MultipleCondition.class::isInstance)
.map(MultipleCondition.class::cast)
.map(multipleCondition -> new FlowWithFlowTriggerAndMultipleCondition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void trigger() throws InterruptedException, TimeoutException {

executionQueue.receive(execution -> {
synchronized (ended) {
if (execution.getState().getCurrent() == State.Type.SUCCESS) {
if (execution.getState().getCurrent() == State.Type.SUCCESS && !execution.getFlowId().equals("trigger-flow-listener-no-condition")) {
if (!ended.containsKey(execution.getId())) {
ended.put(execution.getId(), execution);
countDownLatch.countDown();
Expand Down Expand Up @@ -91,7 +91,7 @@ public void failed() throws InterruptedException, TimeoutException {

executionQueue.receive(execution -> {
synchronized (ended) {
if (execution.getState().getCurrent().isTerminated()) {
if (execution.getState().getCurrent().isTerminated() && !execution.getFlowId().equals("trigger-flow-listener-no-condition")) {
if (!ended.containsKey(execution.getId())) {
ended.put(execution.getId(), execution);
countDownLatch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void restartFailedThenSuccess() throws Exception {

// wait
Execution finishedRestartedExecution = runnerUtils.awaitExecution(
execution -> execution.getState().getCurrent() == State.Type.SUCCESS,
execution -> !execution.getFlowId().equals("trigger-flow-listener-no-condition") && execution.getState().getCurrent() == State.Type.SUCCESS,
throwRunnable(() -> {
Execution restartedExec = executionService.restart(firstExecution, null);
executionQueue.emit(restartedExec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,14 @@ public void multipleCondition() {

@Test
public void self1() {
Flow flow = parse("flows/valids/trigger-multiplecondition-listener.yaml").withRevision(1);
Flow flow = parse("flows/valids/trigger-multiplecondition-listener.yaml").toBuilder().revision(1).build();

assertThat(flowTopologyService.isChild(flow, flow), nullValue());
}

@Test
public void self() {
Flow flow = parse("flows/valids/trigger-flow-listener.yaml").withRevision(1);
Flow flow = parse("flows/valids/trigger-flow-listener.yaml").toBuilder().revision(1).build();

assertThat(flowTopologyService.isChild(flow, flow), nullValue());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package io.kestra.core.tasks.flows;

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
Expand All @@ -16,7 +19,7 @@ public class BadFlowableTest extends AbstractMemoryRunnerTest {
void sequential() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "bad-flowable");

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat("Task runs were: "+ execution.getTaskRunList().stream().map(TaskRun::getTaskId).toList(), execution.getTaskRunList(), hasSize(2));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
}

Expand Down
6 changes: 3 additions & 3 deletions core/src/test/java/io/kestra/core/tasks/flows/PauseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void run(RunnerUtils runnerUtils) throws Exception {
);

execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS,
e -> !e.getFlowId().equals("trigger-flow-listener-no-condition") && e.getState().getCurrent() == State.Type.SUCCESS,
() -> executionQueue.emit(restarted),
Duration.ofSeconds(5)
);
Expand All @@ -81,7 +81,7 @@ public void runDelay(RunnerUtils runnerUtils) throws Exception {
assertThat(execution.getTaskRunList(), hasSize(1));

execution = runnerUtils.awaitExecution(
e -> e.getState().getCurrent() == State.Type.SUCCESS,
e -> !e.getFlowId().equals("trigger-flow-listener-no-condition") && e.getState().getCurrent() == State.Type.SUCCESS,
() -> {},
Duration.ofSeconds(5)
);
Expand All @@ -104,7 +104,7 @@ public void runTimeout(RunnerUtils runnerUtils) throws Exception {
Duration.ofSeconds(5)
);

assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L));
assertThat("Task runs were: " + execution.getTaskRunList().toString(), execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.PAUSED).count(), is(1L));
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.RUNNING).count(), is(1L));
assertThat(execution.getTaskRunList().get(0).getState().getHistories().stream().filter(history -> history.getState() == State.Type.FAILED).count(), is(1L));
assertThat(execution.getTaskRunList(), hasSize(1));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
id: trigger-flow-listener-no-condition
namespace: io.kestra.tests

inputs:
- name: from-parent
type: STRING

tasks:
- id: only-listener
type: io.kestra.core.tasks.debugs.Return
format: "simple return"

triggers:
- id: listen-flow
type: io.kestra.core.models.triggers.types.Flow
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=0.12.0-SNAPSHOT
version=0.11.1

jacksonVersion=2.14.2
micronautVersion=3.9.3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,10 @@ public FlowWithSource update(Flow flow, Flow previous, String flowSource, Flow f

@SneakyThrows
private FlowWithSource save(Flow flow, CrudEventType crudEventType, String flowSource) throws ConstraintViolationException {
if (flow instanceof FlowWithSource) {
flow = ((FlowWithSource) flow).toFlow();
}

// flow exists, return it
Optional<FlowWithSource> exists = this.findByIdWithSource(flow.getNamespace(), flow.getId());
if (exists.isPresent() && exists.get().isUpdatable(flow, flowSource)) {
Expand All @@ -357,9 +361,9 @@ private FlowWithSource save(Flow flow, CrudEventType crudEventType, String flowS
List<FlowWithSource> revisions = this.findRevisions(flow.getNamespace(), flow.getId());

if (revisions.size() > 0) {
flow = flow.withRevision(revisions.get(revisions.size() - 1).getRevision() + 1);
flow = flow.toBuilder().revision(revisions.get(revisions.size() - 1).getRevision() + 1).build();
} else {
flow = flow.withRevision(1);
flow = flow.toBuilder().revision(1).build();
}

Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(flow);
Expand All @@ -376,6 +380,10 @@ private FlowWithSource save(Flow flow, CrudEventType crudEventType, String flowS
@SneakyThrows
@Override
public Flow delete(Flow flow) {
if (flow instanceof FlowWithSource) {
flow = ((FlowWithSource) flow).toFlow();
}

Optional<Flow> revision = this.findById(flow.getNamespace(), flow.getId(), Optional.of(flow.getRevision()));
if (revision.isEmpty()) {
throw new IllegalStateException("Flow " + flow.getId() + " doesn't exists");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ public FlowWithSource update(Flow flow, Flow previous, String flowSource, Flow f
}

private FlowWithSource save(Flow flow, CrudEventType crudEventType, String flowSource) throws ConstraintViolationException {
if (flow instanceof FlowWithSource) {
flow = ((FlowWithSource) flow).toFlow();
}

// flow exists, return it
Optional<Flow> exists = this.findById(flow.getNamespace(), flow.getId());
Optional<String> existsSource = this.findSourceById(flow.getNamespace(), flow.getId());
Expand All @@ -205,9 +209,9 @@ private FlowWithSource save(Flow flow, CrudEventType crudEventType, String flowS
List<FlowWithSource> revisions = this.findRevisions(flow.getNamespace(), flow.getId());

if (revisions.size() > 0) {
flow = flow.withRevision(revisions.get(revisions.size() - 1).getRevision() + 1);
flow = flow.toBuilder().revision(revisions.get(revisions.size() - 1).getRevision() + 1).build();
} else {
flow = flow.withRevision(1);
flow = flow.toBuilder().revision(1).build();
}

this.flows.put(flowId(flow), flow);
Expand All @@ -222,6 +226,10 @@ private FlowWithSource save(Flow flow, CrudEventType crudEventType, String flowS

@Override
public Flow delete(Flow flow) {
if (flow instanceof FlowWithSource) {
flow = ((FlowWithSource) flow).toFlow();
}

if (this.findById(flow.getNamespace(), flow.getId(), Optional.of(flow.getRevision())).isEmpty()) {
throw new IllegalStateException("Flow " + flow.getId() + " doesn't exists");
}
Expand All @@ -232,8 +240,9 @@ public Flow delete(Flow flow) {
this.flows.remove(flowId(deleted));
this.revisions.put(deleted.uid(), deleted);

Flow finalFlow = flow;
ListUtils.emptyOnNull(flow.getTriggers())
.forEach(abstractTrigger -> triggerQueue.delete(Trigger.of(flow, abstractTrigger)));
.forEach(abstractTrigger -> triggerQueue.delete(Trigger.of(finalFlow, abstractTrigger)));

eventPublisher.publishEvent(new CrudEvent<>(flow, CrudEventType.DELETE));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.kestra.core.models.hierarchies.FlowGraph;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.validations.ValidateConstraintViolation;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.tasks.debugs.Return;
import io.kestra.core.tasks.flows.Sequential;
Expand Down Expand Up @@ -251,7 +252,18 @@ void createFlow() {
Flow get = parseFlow(client.toBlocking().retrieve(HttpRequest.GET("/api/v1/flows/" + flow.getNamespace() + "/" + flow.getId()), String.class));
assertThat(get.getId(), is(flow.getId()));
assertThat(get.getInputs().get(0).getName(), is("a"));
}

@Test
void createFlowWithJsonLabels() {
Map<String, Object> flow = JacksonMapper.toMap(generateFlow("io.kestra.unittest", "a"));
flow.put("labels", Map.of("a", "b"));

Flow result = parseFlow(client.toBlocking().retrieve(POST("/api/v1/flows", flow), String.class));

assertThat(result.getId(), is(flow.get("id")));
assertThat(result.getLabels().get(0).key(), is("a"));
assertThat(result.getLabels().get(0).value(), is("b"));
}

@Test
Expand Down

0 comments on commit fef619b

Please sign in to comment.