diff --git a/orquesta/specs/native/v1/models.py b/orquesta/specs/native/v1/models.py
index 1712dfac..9cf10546 100644
--- a/orquesta/specs/native/v1/models.py
+++ b/orquesta/specs/native/v1/models.py
@@ -382,6 +382,7 @@ def detect_undefined_tasks(self, parent=None):
         # Identify the undefined task in task transitions.
         result = []
         traversed = []
+        queued_task = []
         q = queue.Queue()
 
         for task in self.get_start_tasks():
@@ -412,8 +413,12 @@ def detect_undefined_tasks(self, parent=None):
                         continue
 
                     if self.has_task(next_task_name):
-                        if next_task_name not in RESERVED_TASK_NAMES + traversed:
+                        if (
+                            next_task_name not in RESERVED_TASK_NAMES + traversed
+                            and next_task_name not in queued_task
+                        ):
                             q.put(next_task_name)
+                            queued_task.append(next_task_name)
                     else:
                         entry = {
                             "message": 'The task "%s" is not defined.' % next_task_name,
@@ -523,6 +528,7 @@ def inspect_context(self, parent=None):
         ctxs = {}
         errors = []
         traversed = []
+        task_ctx_map = {}
         parent_ctx = parent.get("ctx", []) if parent else []
         rolling_ctx = list(set(parent_ctx))
         q = queue.Queue()
@@ -592,7 +598,11 @@ def inspect_context(self, parent=None):
                     next_task_spec = self.get_task(next_task_name)
 
                     if not next_task_spec.has_join():
-                        q.put((next_task_name, branch_ctx))
+                        if next_task_name not in task_ctx_map or task_ctx_map[next_task_name] != set(
+                            branch_ctx
+                        ):
+                            q.put((next_task_name, branch_ctx))
+                            task_ctx_map[next_task_name] = set(branch_ctx)
                     else:
                         next_task_ctx = ctxs.get(next_task_name, [])
                         ctxs[next_task_name] = list(set(next_task_ctx + branch_ctx))
diff --git a/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py b/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py
index e4df536a..2bbb95ba 100644
--- a/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py
+++ b/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py
@@ -76,7 +76,7 @@ def test_serialization_function_of_data_size(self):
         conductor.deserialize(conductor.serialize())
 
     def test_runtime_function_of_splits_count(self):
-        num_tasks = 25
+        num_tasks = 75
 
         wf_def = {"input": ["data"], "tasks": {}}
 
@@ -100,7 +100,33 @@ def test_runtime_function_of_splits_count(self):
         t2 = datetime.datetime.utcnow()
 
         delta = t2 - t1
-        self.assertLess(delta.seconds, 3)
+        self.assertLess(delta.seconds, 2)
+
+    def test_inspect_performance(self):
+        num_tasks = 75
+
+        wf_def = {"input": ["data"], "tasks": {}}
+
+        for i in range(1, num_tasks):
+            task_name = "t" + str(i)
+            next_task_name = "t" + str(i + 1)
+            transition = [
+                {"when": "<% succeeded() %>", "do": next_task_name},
+                {"when": "<% failed() %>", "do": next_task_name},
+            ]
+            wf_def["tasks"][task_name] = {"action": "core.noop", "next": transition}
+
+        wf_def["tasks"]["t%d" % num_tasks] = {"action": "core.noop"}
+        wf_def["tasks"]["t%d" % (num_tasks + 1)] = {"action": "core.noop"}
+
+        spec = native_specs.WorkflowSpec(wf_def)
+
+        t1 = datetime.datetime.utcnow()
+        self.assertDictEqual(spec.inspect(), {})
+        t2 = datetime.datetime.utcnow()
+
+        delta = t2 - t1
+        self.assertLess(delta.seconds, 2)
 
 
 class WorkflowConductorWithItemsStressTest(test_base.WorkflowConductorWithItemsTest):
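
Context for the change (an editor's sketch, not part of the patch): both models.py hunks deduplicate the work queue that drives the breadth-first walk over task transitions. Without the guard, a task reachable through several transitions is enqueued once per incoming transition, and the duplicates compound along a chain, which is exactly the shape the new test_inspect_performance builds (two transitions per task). A minimal standalone illustration of the pattern, using a hypothetical graph dict in place of the workflow spec:

```python
import queue

# Hypothetical transition graph: every task has two transitions to its
# successor, mirroring the succeeded()/failed() pair in the new test.
graph = {"t%d" % i: ["t%d" % (i + 1)] * 2 for i in range(1, 75)}
graph["t75"] = []

q = queue.Queue()
queued = set()

q.put("t1")
queued.add("t1")

visited = 0
while not q.empty():
    task = q.get()
    visited += 1
    for next_task in graph[task]:
        # Enqueue each task at most once. Without this guard the sketch's
        # queue entries double at every hop, so t75 alone would be enqueued
        # 2**74 times.
        if next_task not in queued:
            q.put(next_task)
            queued.add(next_task)

print(visited)  # 75, one visit per task
```

The inspect_context hunk uses a slightly weaker guard than plain membership: a task is re-enqueued only when set(branch_ctx) differs from what was last queued for it (task_ctx_map), so genuine context changes still propagate while identical revisits are skipped.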