diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java index 71693705fce..da144c28cd0 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import com.google.common.base.Optional; import com.google.common.collect.Maps; @@ -174,7 +175,7 @@ public static void cancelDagNode(Dag.DagNode dagNodeToCancel, log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getId()); } DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), cancelJobArgs).get(); - sendCancellationEvent(dagNodeToCancel); + sendJobCancellationEvent(dagNodeToCancel); log.info("Cancelled dag node {}, spec_producer_future {}", dagNodeToCancel.getValue().getId(), serializedFuture); } catch (Exception e) { throw new IOException(e); @@ -190,7 +191,7 @@ public static void cancelDag(Dag dag, DagManagementStateStore } } - private static void sendCancellationEvent(Dag.DagNode dagNodeToCancel) { + private static void sendJobCancellationEvent(Dag.DagNode dagNodeToCancel) { JobExecutionPlan jobExecutionPlan = dagNodeToCancel.getValue(); Map jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan); DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata); @@ -357,4 +358,21 @@ private static boolean areAllParentsInCanRun(Dag.DagNode node, Set> canRun) { return node.getParentNodes() == null || canRun.containsAll(node.getParentNodes()); } + + public static String calcFlowStatus(Dag dag) { + Set jobsStatuses = dag.getNodes().stream().map(node -> node.getValue().getExecutionStatus()) + .collect(Collectors.toSet()); + + if (jobsStatuses.contains(FAILED)) { + return TimingEvent.FlowTimings.FLOW_FAILED; + } else if (jobsStatuses.contains(CANCELLED)) { + return TimingEvent.FlowTimings.FLOW_CANCELLED; + } else if (jobsStatuses.contains(PENDING_RESUME)) { + return TimingEvent.FlowTimings.FLOW_PENDING_RESUME; + } else if (jobsStatuses.stream().allMatch(jobStatus -> jobStatus == COMPLETE)) { + return TimingEvent.FlowTimings.FLOW_SUCCEEDED; + } else { + return TimingEvent.FlowTimings.FLOW_RUNNING; + } + } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java index cdfa3d05095..a4afe4e1987 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java @@ -115,13 +115,8 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair dag = DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 1, proxyUser, additionalConfig); setJobStatuses(dag, Collections.singletonList(COMPLETE)); Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_SUCCEEDED, DagProcUtils.calcFlowStatus(dag)); setJobStatuses(dag, Collections.singletonList(FAILED)); Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag)); setJobStatuses(dag, Collections.singletonList(CANCELLED)); Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag)); setJobStatuses(dag, Collections.singletonList(PENDING)); Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag)); setJobStatuses(dag, Collections.singletonList(PENDING_RETRY)); Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag)); setJobStatuses(dag, Collections.singletonList(PENDING_RESUME)); Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_PENDING_RESUME, DagProcUtils.calcFlowStatus(dag)); setJobStatuses(dag, Collections.singletonList(ORCHESTRATED)); Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag)); setJobStatuses(dag, Collections.singletonList(RUNNING)); Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag)); } @Test - public void testIsDagFinishedTwoNodes() throws URISyntaxException { + public void testFlowStatusAndIsDagFinishedTwoNodes() throws URISyntaxException { Dag dag = DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 2, proxyUser, additionalConfig); setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING)); Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag)); setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED)); Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag)); setJobStatuses(dag, Arrays.asList(FAILED, PENDING)); Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag)); setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING)); Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag)); } @Test - public void testIsDagFinishedThreeNodes() throws URISyntaxException { + public void testFlowStatusAndIsDagFinishedThreeNodes() throws URISyntaxException { Dag dag = buildComplexDag3(); setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING)); Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag)); setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED, PENDING)); Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag)); setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, PENDING)); Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag)); } @Test - public void testIsDagFinishedFourNodes() throws URISyntaxException { + public void testFlowStatusAndIsDagFinishedFourNodes() throws URISyntaxException { Dag dag = buildLinearDagOf4Nodes(); setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING, PENDING)); Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag)); setJobStatuses(dag, Arrays.asList(FAILED, PENDING, PENDING, PENDING)); Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag)); setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING, PENDING, PENDING)); Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag)); setJobStatuses(dag, Arrays.asList(PENDING, PENDING, PENDING, PENDING)); Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag)); } @Test - public void testIsDagFinishedMultiNodes() throws URISyntaxException { + public void testFlowStatusAndIsDagFinishedMultiNodes() throws URISyntaxException { Dag dag = buildComplexDag1(); setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE)); Assert.assertTrue(DagProcUtils.isDagFinished(dag)); Collections.shuffle(dag.getNodes()); Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_SUCCEEDED, DagProcUtils.calcFlowStatus(dag)); Dag dag2 = buildComplexDag1(); setJobStatuses(dag2, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING)); Assert.assertFalse(DagProcUtils.isDagFinished(dag2)); Collections.shuffle(dag2.getNodes()); Assert.assertFalse(DagProcUtils.isDagFinished(dag2)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag2)); Dag dag3 = buildComplexDag1(); setJobStatuses(dag3, Arrays.asList(FAILED, COMPLETE, COMPLETE, COMPLETE, PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING)); Assert.assertTrue(DagProcUtils.isDagFinished(dag3)); Collections.shuffle(dag3.getNodes()); Assert.assertTrue(DagProcUtils.isDagFinished(dag3)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag3)); Dag dag4 = buildComplexDag1(); setJobStatuses(dag4, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, CANCELLED, COMPLETE, PENDING, PENDING, PENDING)); Assert.assertFalse(DagProcUtils.isDagFinished(dag4)); Collections.shuffle(dag4.getNodes()); Assert.assertFalse(DagProcUtils.isDagFinished(dag4)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag4)); Dag dag5 = buildComplexDag1(); setJobStatuses(dag5, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, CANCELLED, COMPLETE, COMPLETE, PENDING, PENDING)); Assert.assertTrue(DagProcUtils.isDagFinished(dag5)); Collections.shuffle(dag5.getNodes()); Assert.assertTrue(DagProcUtils.isDagFinished(dag5)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag5)); Dag dag6 = buildComplexDag1(); setJobStatuses(dag6, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, PENDING_RESUME, COMPLETE, COMPLETE, PENDING, PENDING)); Assert.assertFalse(DagProcUtils.isDagFinished(dag6)); Collections.shuffle(dag6.getNodes()); Assert.assertFalse(DagProcUtils.isDagFinished(dag6)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_PENDING_RESUME, DagProcUtils.calcFlowStatus(dag6)); Dag dag7 = buildComplexDag1(); setJobStatuses(dag7, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, PENDING_RETRY, COMPLETE, COMPLETE, PENDING, PENDING)); Assert.assertFalse(DagProcUtils.isDagFinished(dag7)); Collections.shuffle(dag7.getNodes()); Assert.assertFalse(DagProcUtils.isDagFinished(dag7)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag7)); Dag dag8 = buildComplexDag1(); setJobStatuses(dag8, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, RUNNING, COMPLETE, COMPLETE, PENDING, PENDING)); Assert.assertFalse(DagProcUtils.isDagFinished(dag8)); Collections.shuffle(dag8.getNodes()); Assert.assertFalse(DagProcUtils.isDagFinished(dag8)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag8)); Dag dag9 = buildComplexDag1(); setJobStatuses(dag9, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, FAILED, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING, COMPLETE)); Assert.assertFalse(DagProcUtils.isDagFinished(dag9)); Collections.shuffle(dag9.getNodes()); Assert.assertFalse(DagProcUtils.isDagFinished(dag9)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag9)); } @Test - public void testIsDagFinishedWithFinishRunningFailureOptionTwoNodes() throws URISyntaxException { + public void testFlowStatusAndIsDagFinishedWithFinishRunningFailureOptionTwoNodes() throws URISyntaxException { Dag dag = DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 2, proxyUser, additionalConfig); setJobStatuses(dag, Arrays.asList(FAILED, PENDING)); Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag)); setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING)); Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag)); } @Test - public void testIsDagFinishedWithFinishRunningFailureOptionMultiNodes() throws URISyntaxException { + public void testFlowStatusAndIsDagFinishedWithFinishRunningFailureOptionMultiNodes() throws URISyntaxException { Dag dag = buildComplexDagWithFinishRunningFailureOption(); setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, PENDING, PENDING)); Assert.assertTrue(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag)); setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, RUNNING, PENDING)); Assert.assertFalse(DagProcUtils.isDagFinished(dag)); + Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag)); } private void setJobStatuses(Dag dag, List statuses) {