Skip to content

Commit

Permalink
[GOBBLIN-2150] calculate flow status correctly (#4048)
Browse files Browse the repository at this point in the history
* calculate flow status correctly
* address review comments
  • Loading branch information
arjun4084346 authored Sep 5, 2024
1 parent 0ea2414 commit f40bb44
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import com.google.common.base.Optional;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -174,7 +175,7 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getId());
}
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), cancelJobArgs).get();
sendCancellationEvent(dagNodeToCancel);
sendJobCancellationEvent(dagNodeToCancel);
log.info("Cancelled dag node {}, spec_producer_future {}", dagNodeToCancel.getValue().getId(), serializedFuture);
} catch (Exception e) {
throw new IOException(e);
Expand All @@ -190,7 +191,7 @@ public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore
}
}

private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeToCancel) {
private static void sendJobCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeToCancel) {
JobExecutionPlan jobExecutionPlan = dagNodeToCancel.getValue();
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
Expand Down Expand Up @@ -357,4 +358,21 @@ private static boolean areAllParentsInCanRun(Dag.DagNode<JobExecutionPlan> node,
Set<Dag.DagNode<JobExecutionPlan>> canRun) {
return node.getParentNodes() == null || canRun.containsAll(node.getParentNodes());
}

/**
 * Derives the flow-level timing event name from the execution statuses of every node in the dag.
 *
 * <p>The distinct job statuses are examined in priority order, and the first match wins:
 * <ol>
 *   <li>any job {@code FAILED} - {@link TimingEvent.FlowTimings#FLOW_FAILED}</li>
 *   <li>any job {@code CANCELLED} - {@link TimingEvent.FlowTimings#FLOW_CANCELLED}</li>
 *   <li>any job {@code PENDING_RESUME} - {@link TimingEvent.FlowTimings#FLOW_PENDING_RESUME}</li>
 *   <li>every job {@code COMPLETE} - {@link TimingEvent.FlowTimings#FLOW_SUCCEEDED}</li>
 *   <li>anything else - {@link TimingEvent.FlowTimings#FLOW_RUNNING}</li>
 * </ol>
 *
 * <p>Note that a dag with no nodes produces an empty status set, for which the "every job complete"
 * check is vacuously true, so {@code FLOW_SUCCEEDED} is returned.
 *
 * @param dag the dag whose nodes' execution statuses are inspected
 * @return one of the {@link TimingEvent.FlowTimings} event names listed above
 */
public static String calcFlowStatus(Dag<JobExecutionPlan> dag) {
  // Only the set of distinct statuses matters; how many jobs share a status is irrelevant.
  Set<ExecutionStatus> distinctStatuses = dag.getNodes()
      .stream()
      .map(dagNode -> dagNode.getValue().getExecutionStatus())
      .collect(Collectors.toSet());

  if (distinctStatuses.contains(FAILED)) {
    return TimingEvent.FlowTimings.FLOW_FAILED;
  }
  if (distinctStatuses.contains(CANCELLED)) {
    return TimingEvent.FlowTimings.FLOW_CANCELLED;
  }
  if (distinctStatuses.contains(PENDING_RESUME)) {
    return TimingEvent.FlowTimings.FLOW_PENDING_RESUME;
  }

  // No failure, cancellation or pending resume: the flow succeeded only if nothing is left unfinished.
  boolean everyJobComplete = distinctStatuses.stream().allMatch(status -> status == COMPLETE);
  return everyJobComplete
      ? TimingEvent.FlowTimings.FLOW_SUCCEEDED
      : TimingEvent.FlowTimings.FLOW_RUNNING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,8 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
dag.setFlowEvent(null);
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, getDagId());
} else if (DagProcUtils.isDagFinished(dag)) {
if (dag.getFlowEvent() == null) {
// If the dag flow event is not set and there are no more jobs running, then it is successful
// also note that `onJobFinish` method does whatever is required to do after job finish, determining a Dag's
// status is not possible on individual job's finish status
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_SUCCEEDED);
}
String flowEvent = dag.getFlowEvent();
String flowEvent = DagProcUtils.calcFlowStatus(dag);
dag.setFlowEvent(flowEvent);
DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag, flowEvent);
if (flowEvent.equals(TimingEvent.FlowTimings.FLOW_SUCCEEDED)) {
// todo - verify if work from PR#3641 is required
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
Expand Down Expand Up @@ -73,162 +74,194 @@ public void testGetJobSpecFromDag() throws Exception {
}

@Test
public void testIsDagFinishedSingleNode() throws URISyntaxException {
public void testFlowStatusAndIsDagFinishedSingleNode() throws URISyntaxException {
Dag<JobExecutionPlan> dag =
DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 1, proxyUser, additionalConfig);

setJobStatuses(dag, Collections.singletonList(COMPLETE));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_SUCCEEDED, DagProcUtils.calcFlowStatus(dag));

setJobStatuses(dag, Collections.singletonList(FAILED));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));

setJobStatuses(dag, Collections.singletonList(CANCELLED));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));

setJobStatuses(dag, Collections.singletonList(PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));

setJobStatuses(dag, Collections.singletonList(PENDING_RETRY));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));

setJobStatuses(dag, Collections.singletonList(PENDING_RESUME));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_PENDING_RESUME, DagProcUtils.calcFlowStatus(dag));

setJobStatuses(dag, Collections.singletonList(ORCHESTRATED));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));

setJobStatuses(dag, Collections.singletonList(RUNNING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
}

@Test
public void testIsDagFinishedTwoNodes() throws URISyntaxException {
public void testFlowStatusAndIsDagFinishedTwoNodes() throws URISyntaxException {
Dag<JobExecutionPlan> dag =
DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 2, proxyUser, additionalConfig);

setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));

setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));

setJobStatuses(dag, Arrays.asList(FAILED, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));

setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
}

@Test
public void testIsDagFinishedThreeNodes() throws URISyntaxException {
public void testFlowStatusAndIsDagFinishedThreeNodes() throws URISyntaxException {
Dag<JobExecutionPlan> dag = buildComplexDag3();

setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));

setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));

setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
}

@Test
public void testIsDagFinishedFourNodes() throws URISyntaxException {
public void testFlowStatusAndIsDagFinishedFourNodes() throws URISyntaxException {
Dag<JobExecutionPlan> dag = buildLinearDagOf4Nodes();

setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));

setJobStatuses(dag, Arrays.asList(FAILED, PENDING, PENDING, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));

setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING, PENDING, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));

setJobStatuses(dag, Arrays.asList(PENDING, PENDING, PENDING, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
}

@Test
public void testIsDagFinishedMultiNodes() throws URISyntaxException {
public void testFlowStatusAndIsDagFinishedMultiNodes() throws URISyntaxException {
Dag<JobExecutionPlan> dag = buildComplexDag1();
setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
Collections.shuffle(dag.getNodes());
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_SUCCEEDED, DagProcUtils.calcFlowStatus(dag));

Dag<JobExecutionPlan> dag2 = buildComplexDag1();
setJobStatuses(dag2, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag2));
Collections.shuffle(dag2.getNodes());
Assert.assertFalse(DagProcUtils.isDagFinished(dag2));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag2));

Dag<JobExecutionPlan> dag3 = buildComplexDag1();
setJobStatuses(dag3, Arrays.asList(FAILED, COMPLETE, COMPLETE, COMPLETE, PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag3));
Collections.shuffle(dag3.getNodes());
Assert.assertTrue(DagProcUtils.isDagFinished(dag3));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag3));

Dag<JobExecutionPlan> dag4 = buildComplexDag1();
setJobStatuses(dag4, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, CANCELLED, COMPLETE, PENDING, PENDING, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag4));
Collections.shuffle(dag4.getNodes());
Assert.assertFalse(DagProcUtils.isDagFinished(dag4));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag4));

Dag<JobExecutionPlan> dag5 = buildComplexDag1();
setJobStatuses(dag5, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, CANCELLED, COMPLETE, COMPLETE, PENDING, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag5));
Collections.shuffle(dag5.getNodes());
Assert.assertTrue(DagProcUtils.isDagFinished(dag5));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag5));

Dag<JobExecutionPlan> dag6 = buildComplexDag1();
setJobStatuses(dag6, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, PENDING_RESUME, COMPLETE, COMPLETE, PENDING, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag6));
Collections.shuffle(dag6.getNodes());
Assert.assertFalse(DagProcUtils.isDagFinished(dag6));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_PENDING_RESUME, DagProcUtils.calcFlowStatus(dag6));

Dag<JobExecutionPlan> dag7 = buildComplexDag1();
setJobStatuses(dag7, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, PENDING_RETRY, COMPLETE, COMPLETE, PENDING, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag7));
Collections.shuffle(dag7.getNodes());
Assert.assertFalse(DagProcUtils.isDagFinished(dag7));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag7));

Dag<JobExecutionPlan> dag8 = buildComplexDag1();
setJobStatuses(dag8, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, RUNNING, COMPLETE, COMPLETE, PENDING, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag8));
Collections.shuffle(dag8.getNodes());
Assert.assertFalse(DagProcUtils.isDagFinished(dag8));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag8));

Dag<JobExecutionPlan> dag9 = buildComplexDag1();
setJobStatuses(dag9, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, FAILED, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING, COMPLETE));
Assert.assertFalse(DagProcUtils.isDagFinished(dag9));
Collections.shuffle(dag9.getNodes());
Assert.assertFalse(DagProcUtils.isDagFinished(dag9));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag9));
}

@Test
public void testIsDagFinishedWithFinishRunningFailureOptionTwoNodes() throws URISyntaxException {
public void testFlowStatusAndIsDagFinishedWithFinishRunningFailureOptionTwoNodes() throws URISyntaxException {
Dag<JobExecutionPlan> dag =
DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 2, proxyUser, additionalConfig);

setJobStatuses(dag, Arrays.asList(FAILED, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));

setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
}

@Test
public void testIsDagFinishedWithFinishRunningFailureOptionMultiNodes() throws URISyntaxException {
public void testFlowStatusAndIsDagFinishedWithFinishRunningFailureOptionMultiNodes() throws URISyntaxException {
Dag<JobExecutionPlan> dag = buildComplexDagWithFinishRunningFailureOption();

setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, PENDING, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));

setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, RUNNING, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
}

private void setJobStatuses(Dag<JobExecutionPlan> dag, List<ExecutionStatus> statuses) {
Expand Down

0 comments on commit f40bb44

Please sign in to comment.