Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
reverted WorkflowExecutor.pushParentWorkflow changes from #2739, adde…
Browse files Browse the repository at this point in the history
…d log when subWorkflowChanged is reset.
  • Loading branch information
aravindanr committed Feb 8, 2022
1 parent 8c7742a commit 16c6342
Showing 1 changed file with 19 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.core.utils.ParametersUtils;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.core.utils.Utils;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
Expand All @@ -56,6 +55,7 @@
import com.google.common.base.Preconditions;

import static com.netflix.conductor.core.exception.ApplicationException.Code.*;
import static com.netflix.conductor.core.utils.Utils.DECIDER_QUEUE;
import static com.netflix.conductor.model.TaskModel.Status.*;

/** Workflow services provider interface */
Expand Down Expand Up @@ -707,7 +707,7 @@ private void retry(WorkflowModel workflow) {
workflow.setReasonForIncompletion(null);
// Add to decider queue
queueDAO.push(
Utils.DECIDER_QUEUE,
DECIDER_QUEUE,
workflow.getWorkflowId(),
workflow.getPriority(),
properties.getWorkflowOffsetTimeout().getSeconds());
Expand Down Expand Up @@ -834,8 +834,7 @@ WorkflowModel completeWorkflow(WorkflowModel workflow) {
LOGGER.debug("Completing workflow execution for {}", workflow.getWorkflowId());

if (workflow.getStatus().equals(WorkflowModel.Status.COMPLETED)) {
queueDAO.remove(
Utils.DECIDER_QUEUE, workflow.getWorkflowId()); // remove from the sweep queue
queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId()); // remove from the sweep queue
executionDAOFacade.removeFromPendingWorkflow(
workflow.getWorkflowName(), workflow.getWorkflowId());
LOGGER.debug("Workflow: {} has already been completed.", workflow.getWorkflowId());
Expand Down Expand Up @@ -1356,6 +1355,11 @@ private void adjustStateIfSubWorkflowChanged(WorkflowModel workflow) {
subWorkflowTask.setSubworkflowChanged(false);
executionDAOFacade.updateTask(subWorkflowTask);

LOGGER.info(
"{} reset subworkflowChanged flag for {}",
workflow.toShortString(),
subWorkflowTask.getTaskId());

// find all terminal and unsuccessful JOIN tasks and set them to IN_PROGRESS
if (workflow.getWorkflowDefinition().containsType(TaskType.TASK_TYPE_JOIN)
|| workflow.getWorkflowDefinition()
Expand Down Expand Up @@ -1428,7 +1432,7 @@ List<String> cancelNonTerminalTasks(WorkflowModel workflow) {
if (erroredTasks.isEmpty()) {
try {
workflowStatusListener.onWorkflowFinalizedIfEnabled(workflow);
queueDAO.remove(Utils.DECIDER_QUEUE, workflow.getWorkflowId());
queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId());
} catch (Exception e) {
LOGGER.error(
"Error removing workflow: {} from decider queue",
Expand Down Expand Up @@ -1483,7 +1487,7 @@ public void pauseWorkflow(String workflowId) {
// remove from the sweep queue
// any exceptions can be ignored, as this is not critical to the pause operation
try {
queueDAO.remove(Utils.DECIDER_QUEUE, workflowId);
queueDAO.remove(DECIDER_QUEUE, workflowId);
} catch (Exception e) {
LOGGER.info(
"[pauseWorkflow] Error removing workflow: {} from decider queue",
Expand All @@ -1510,7 +1514,7 @@ public void resumeWorkflow(String workflowId) {
workflow.setLastRetriedTime(System.currentTimeMillis());
// Add to decider queue
queueDAO.push(
Utils.DECIDER_QUEUE,
DECIDER_QUEUE,
workflow.getWorkflowId(),
workflow.getPriority(),
properties.getWorkflowOffsetTimeout().getSeconds());
Expand Down Expand Up @@ -1841,7 +1845,7 @@ private boolean rerunWF(
}

queueDAO.push(
Utils.DECIDER_QUEUE,
DECIDER_QUEUE,
workflow.getWorkflowId(),
workflow.getPriority(),
properties.getWorkflowOffsetTimeout().getSeconds());
Expand Down Expand Up @@ -1888,7 +1892,7 @@ private boolean rerunWF(
}
// Add to decider queue
queueDAO.push(
Utils.DECIDER_QUEUE,
DECIDER_QUEUE,
workflow.getWorkflowId(),
workflow.getPriority(),
properties.getWorkflowOffsetTimeout().getSeconds());
Expand Down Expand Up @@ -1990,20 +1994,16 @@ private void executeSubworkflowTaskAndSyncData(
WorkflowSystemTask subWorkflowSystemTask =
systemTaskRegistry.get(TaskType.TASK_TYPE_SUB_WORKFLOW);
subWorkflowSystemTask.execute(subWorkflow, subWorkflowTask, this);
// Keep Subworkflow task's data consistent with Subworkflow's.
if (subWorkflowTask.getStatus().isTerminal()
&& subWorkflowTask.getExternalOutputPayloadStoragePath() != null
&& !subWorkflowTask.getOutputData().isEmpty()) {
Map<String, Object> parentWorkflowTaskOutputData = subWorkflowTask.getOutputData();
subWorkflowTask.getOutputData().putAll(parentWorkflowTaskOutputData);
}
}

/** Pushes parent workflow id into the decider queue with a priority. */
private void pushParentWorkflow(String parentWorkflowId) {
if (!queueDAO.containsMessage(Utils.DECIDER_QUEUE, parentWorkflowId)) {
queueDAO.push(Utils.DECIDER_QUEUE, parentWorkflowId, PARENT_WF_PRIORITY, 0);
LOGGER.info("Pushed parent workflow {} to {}", parentWorkflowId, Utils.DECIDER_QUEUE);
if (queueDAO.containsMessage(DECIDER_QUEUE, parentWorkflowId)) {
queueDAO.postpone(DECIDER_QUEUE, parentWorkflowId, PARENT_WF_PRIORITY, 0);
} else {
queueDAO.push(DECIDER_QUEUE, parentWorkflowId, PARENT_WF_PRIORITY, 0);
}

LOGGER.info("Pushed parent workflow {} to {}", parentWorkflowId, DECIDER_QUEUE);
}
}

0 comments on commit 16c6342

Please sign in to comment.