Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
arjun4084346 committed Sep 8, 2024
1 parent 96b36c4 commit 4bb8ae5
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ public void testObservabilityEventFlowFailed() throws IOException, ReflectiveOpe
jobStatusMonitor.shutDown();
}

@Test// (dependsOnMethods = "testObservabilityEventFlowFailed")
@Test
public void testProcessMessageForSkippedEvent() throws IOException, ReflectiveOperationException {
DagManagementStateStore dagManagementStateStore = mock(DagManagementStateStore.class);
KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic8");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
log.info("Submitted job {} for dagId {}", DagUtils.getJobName(dagNode), dagId);
}

public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException {
public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel) throws IOException {
Properties cancelJobArgs = new Properties();
String serializedFuture = null;

Expand Down Expand Up @@ -185,22 +185,31 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
/**
* Emits JOB_SKIPPED GTE for each of the dependent job.
*/
public static void sendSkippedEventForDependentJobs(Dag<JobExecutionPlan> dag, Dag.DagNode<JobExecutionPlan> node,
DagManagementStateStore dagManagementStateStore) throws IOException {
for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
child.getValue().setExecutionStatus(SKIPPED);
dagManagementStateStore.updateDagNode(child);
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), child.getValue());
public static void sendSkippedEventForDependentJobs(Dag<JobExecutionPlan> dag, Dag.DagNode<JobExecutionPlan> node) {
Set<Dag.DagNode<JobExecutionPlan>> dependentJobs = new HashSet<>();
findDependentJobs(dag, node, dependentJobs);
for (Dag.DagNode<JobExecutionPlan> dependentJob : dependentJobs) {
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), dependentJob.getValue());
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata);
}
}

public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore) throws IOException {
private static void findDependentJobs(Dag<JobExecutionPlan> dag,
Dag.DagNode<JobExecutionPlan> node, Set<Dag.DagNode<JobExecutionPlan>> dependentJobs) {
for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
if (!dependentJobs.contains(child)) {
dependentJobs.add(child);
findDependentJobs(dag, child, dependentJobs);
}
}
}

public static void cancelDag(Dag<JobExecutionPlan> dag) throws IOException {
List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes();
log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), DagUtils.generateDagId(dag));

for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
DagProcUtils.cancelDagNode(dagNodeToCancel);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,
log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), getDagId());

for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
DagProcUtils.cancelDagNode(dagNodeToCancel);
}

dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,
log.info("Job exceeded the job start deadline. Killing it now. Job - {}, jobOrchestratedTime - {}, timeOutForJobStart - {}",
DagUtils.getJobName(dagNode), jobOrchestratedTime, timeOutForJobStart);
dagManagementStateStore.getDagManagerMetrics().incrementCountsStartSlaExceeded(dagNode);
DagProcUtils.cancelDagNode(dagNode, dagManagementStateStore);
DagProcUtils.cancelDagNode(dagNode);
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
dag.setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
if (this.shouldKillSpecificJob) {
Optional<Dag.DagNode<JobExecutionPlan>> dagNodeToCancel = dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId).getLeft();
if (dagNodeToCancel.isPresent()) {
DagProcUtils.cancelDagNode(dagNodeToCancel.get(), dagManagementStateStore);
DagProcUtils.cancelDagNode(dagNodeToCancel.get());
} else {
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
log.error("Did not find Dag node with id {}, it might be already cancelled/finished and thus cleaned up from the store.", getDagNodeId());
}
} else {
DagProcUtils.cancelDag(dag.get(), dagManagementStateStore);
DagProcUtils.cancelDag(dag.get());
}
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
// The other ReevaluateDagProc can do that purely out of race condition when the dag is cancelled and ReevaluateDagProcs
// are being processed for dag node kill requests; or when this DagProc ran into some exception after updating the
// status and thus gave the other ReevaluateDagProc sufficient time to delete the dag before being retried.
// This can also happen when a job is cancelled/failed and dag is cleaned; but we are still processing Reevaluate
// dag actions for SKIPPED dependent jobs
log.warn("Dag not found {}", getDagId());
return;
}
Expand Down Expand Up @@ -159,12 +161,12 @@ private void onJobFinish(DagManagementStateStore dagManagementStateStore, Dag.Da
dag.setMessage("Flow failed because job " + jobName + " failed");
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED);
dagManagementStateStore.getDagManagerMetrics().incrementExecutorFailed(dagNode);
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode, dagManagementStateStore);
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode);
break;
case CANCELLED:
case SKIPPED:
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode, dagManagementStateStore);
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode);
break;
case COMPLETE:
dagManagementStateStore.getDagManagerMetrics().incrementExecutorSuccess(dagNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
KAFKA_AUTO_OFFSET_RESET_KEY, KAFKA_AUTO_OFFSET_RESET_SMALLEST));

private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES = ImmutableList
.of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.SKIPPED, ExecutionStatus.PENDING_RESUME,
ExecutionStatus.PENDING_RETRY, ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.COMPLETE,
.of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.PENDING_RESUME, ExecutionStatus.PENDING_RETRY,
ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.SKIPPED, ExecutionStatus.COMPLETE,
ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);

private final JobIssueEventHandler jobIssueEventHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -225,20 +224,5 @@ public void killDagNode() throws IOException, URISyntaxException, InterruptedExc
.submit(eq(TimingEvent.LauncherTimings.JOB_CANCEL), anyMap());
Mockito.verify(this.mockedEventSubmitter, Mockito.times(numOfCancelledFlows))
.submit(eq(TimingEvent.FlowTimings.FLOW_CANCELLED), anyMap());

Assert.assertEquals(dag.getNodes().get(0).getValue().getExecutionStatus(), ExecutionStatus.ORCHESTRATED); // because this was a mocked dag and we launched this job
Assert.assertEquals(dag.getNodes().get(1).getValue().getExecutionStatus(), ExecutionStatus.PENDING); // because this was a mocked dag and we did not launch the job
Assert.assertEquals(dag.getNodes().get(2).getValue().getExecutionStatus(), ExecutionStatus.CANCELLED); // because we cancelled this job
Assert.assertEquals(dag.getNodes().get(3).getValue().getExecutionStatus(), ExecutionStatus.PENDING); // because this was a mocked dag and we did not launch the job
Assert.assertEquals(dag.getNodes().get(4).getValue().getExecutionStatus(), ExecutionStatus.SKIPPED); // because its parent job was cancelled

Assert.assertTrue(dagManagementStateStore.hasRunningJobs(dagId));

dag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
dag.getNodes().get(3).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);

doReturn(new HashSet<>(dag.getNodes())).when(dagManagementStateStore).getDagNodes(any());
Assert.assertFalse(dagManagementStateStore.hasRunningJobs(dagId));
}
}
Loading

0 comments on commit 4bb8ae5

Please sign in to comment.