diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 9e46688772..48089faeba 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -44,7 +44,6 @@ import com.netflix.conductor.core.utils.IDGenerator; import com.netflix.conductor.core.utils.ParametersUtils; import com.netflix.conductor.core.utils.QueueUtils; -import com.netflix.conductor.core.utils.Utils; import com.netflix.conductor.dao.MetadataDAO; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; @@ -56,6 +55,7 @@ import com.google.common.base.Preconditions; import static com.netflix.conductor.core.exception.ApplicationException.Code.*; +import static com.netflix.conductor.core.utils.Utils.DECIDER_QUEUE; import static com.netflix.conductor.model.TaskModel.Status.*; /** Workflow services provider interface */ @@ -707,7 +707,7 @@ private void retry(WorkflowModel workflow) { workflow.setReasonForIncompletion(null); // Add to decider queue queueDAO.push( - Utils.DECIDER_QUEUE, + DECIDER_QUEUE, workflow.getWorkflowId(), workflow.getPriority(), properties.getWorkflowOffsetTimeout().getSeconds()); @@ -834,8 +834,7 @@ WorkflowModel completeWorkflow(WorkflowModel workflow) { LOGGER.debug("Completing workflow execution for {}", workflow.getWorkflowId()); if (workflow.getStatus().equals(WorkflowModel.Status.COMPLETED)) { - queueDAO.remove( - Utils.DECIDER_QUEUE, workflow.getWorkflowId()); // remove from the sweep queue + queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId()); // remove from the sweep queue executionDAOFacade.removeFromPendingWorkflow( workflow.getWorkflowName(), workflow.getWorkflowId()); LOGGER.debug("Workflow: {} has already been completed.", workflow.getWorkflowId()); @@ -1356,6 +1355,11 @@ private void adjustStateIfSubWorkflowChanged(WorkflowModel workflow) { subWorkflowTask.setSubworkflowChanged(false); executionDAOFacade.updateTask(subWorkflowTask); + LOGGER.info( + "{} reset subworkflowChanged flag for {}", + workflow.toShortString(), + subWorkflowTask.getTaskId()); + // find all terminal and unsuccessful JOIN tasks and set them to IN_PROGRESS if (workflow.getWorkflowDefinition().containsType(TaskType.TASK_TYPE_JOIN) || workflow.getWorkflowDefinition() @@ -1428,7 +1432,7 @@ List cancelNonTerminalTasks(WorkflowModel workflow) { if (erroredTasks.isEmpty()) { try { workflowStatusListener.onWorkflowFinalizedIfEnabled(workflow); - queueDAO.remove(Utils.DECIDER_QUEUE, workflow.getWorkflowId()); + queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId()); } catch (Exception e) { LOGGER.error( "Error removing workflow: {} from decider queue", @@ -1483,7 +1487,7 @@ public void pauseWorkflow(String workflowId) { // remove from the sweep queue // any exceptions can be ignored, as this is not critical to the pause operation try { - queueDAO.remove(Utils.DECIDER_QUEUE, workflowId); + queueDAO.remove(DECIDER_QUEUE, workflowId); } catch (Exception e) { LOGGER.info( "[pauseWorkflow] Error removing workflow: {} from decider queue", @@ -1510,7 +1514,7 @@ public void resumeWorkflow(String workflowId) { workflow.setLastRetriedTime(System.currentTimeMillis()); // Add to decider queue queueDAO.push( - Utils.DECIDER_QUEUE, + DECIDER_QUEUE, workflow.getWorkflowId(), workflow.getPriority(), properties.getWorkflowOffsetTimeout().getSeconds()); @@ -1841,7 +1845,7 @@ private boolean rerunWF( } queueDAO.push( - Utils.DECIDER_QUEUE, + DECIDER_QUEUE, workflow.getWorkflowId(), workflow.getPriority(), properties.getWorkflowOffsetTimeout().getSeconds()); @@ -1888,7 +1892,7 @@ private boolean rerunWF( } // Add to decider queue queueDAO.push( - Utils.DECIDER_QUEUE, + DECIDER_QUEUE, workflow.getWorkflowId(), workflow.getPriority(), properties.getWorkflowOffsetTimeout().getSeconds()); @@ -1990,20 +1994,16 @@ private void executeSubworkflowTaskAndSyncData( WorkflowSystemTask subWorkflowSystemTask = systemTaskRegistry.get(TaskType.TASK_TYPE_SUB_WORKFLOW); subWorkflowSystemTask.execute(subWorkflow, subWorkflowTask, this); - // Keep Subworkflow task's data consistent with Subworkflow's. - if (subWorkflowTask.getStatus().isTerminal() - && subWorkflowTask.getExternalOutputPayloadStoragePath() != null - && !subWorkflowTask.getOutputData().isEmpty()) { - Map parentWorkflowTaskOutputData = subWorkflowTask.getOutputData(); - subWorkflowTask.getOutputData().putAll(parentWorkflowTaskOutputData); - } } /** Pushes parent workflow id into the decider queue with a priority. */ private void pushParentWorkflow(String parentWorkflowId) { - if (!queueDAO.containsMessage(Utils.DECIDER_QUEUE, parentWorkflowId)) { - queueDAO.push(Utils.DECIDER_QUEUE, parentWorkflowId, PARENT_WF_PRIORITY, 0); - LOGGER.info("Pushed parent workflow {} to {}", parentWorkflowId, Utils.DECIDER_QUEUE); + if (queueDAO.containsMessage(DECIDER_QUEUE, parentWorkflowId)) { + queueDAO.postpone(DECIDER_QUEUE, parentWorkflowId, PARENT_WF_PRIORITY, 0); + } else { + queueDAO.push(DECIDER_QUEUE, parentWorkflowId, PARENT_WF_PRIORITY, 0); } + + LOGGER.info("Pushed parent workflow {} to {}", parentWorkflowId, DECIDER_QUEUE); } }