diff --git a/orquesta/specs/native/v1/models.py b/orquesta/specs/native/v1/models.py index 1712dfac..9cf10546 100644 --- a/orquesta/specs/native/v1/models.py +++ b/orquesta/specs/native/v1/models.py @@ -382,6 +382,7 @@ def detect_undefined_tasks(self, parent=None): # Identify the undefined task in task transitions. result = [] traversed = [] + queued_task = [] q = queue.Queue() for task in self.get_start_tasks(): @@ -412,8 +413,12 @@ def detect_undefined_tasks(self, parent=None): continue if self.has_task(next_task_name): - if next_task_name not in RESERVED_TASK_NAMES + traversed: + if ( + next_task_name not in RESERVED_TASK_NAMES + traversed + and next_task_name not in queued_task + ): q.put(next_task_name) + queued_task.append(next_task_name) else: entry = { "message": 'The task "%s" is not defined.' % next_task_name, @@ -523,6 +528,7 @@ def inspect_context(self, parent=None): ctxs = {} errors = [] traversed = [] + task_ctx_map = {} parent_ctx = parent.get("ctx", []) if parent else [] rolling_ctx = list(set(parent_ctx)) q = queue.Queue() @@ -592,7 +598,11 @@ def inspect_context(self, parent=None): next_task_spec = self.get_task(next_task_name) if not next_task_spec.has_join(): - q.put((next_task_name, branch_ctx)) + if next_task_name not in task_ctx_map or task_ctx_map[next_task_name] != set( + branch_ctx + ): + q.put((next_task_name, branch_ctx)) + task_ctx_map[next_task_name] = set(branch_ctx) else: next_task_ctx = ctxs.get(next_task_name, []) ctxs[next_task_name] = list(set(next_task_ctx + branch_ctx)) diff --git a/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py b/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py index e4df536a..1c770783 100644 --- a/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py +++ b/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py @@ -100,7 +100,7 @@ def test_runtime_function_of_splits_count(self): t2 = datetime.datetime.utcnow() delta = t2 - t1 - self.assertLess(delta.seconds, 3) + self.assertLess(delta.seconds, 2) class WorkflowConductorWithItemsStressTest(test_base.WorkflowConductorWithItemsTest):