From 65f994afb9163544700f87178a348676a026ace0 Mon Sep 17 00:00:00 2001 From: Carlos Date: Tue, 24 Oct 2023 12:33:06 +0200 Subject: [PATCH 1/2] Add test unit for zero concurrency --- orquesta/conducting.py | 3 + .../test_workflow_conductor_with_items.py | 70 +++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/orquesta/conducting.py b/orquesta/conducting.py index 34fb3567..109525c6 100644 --- a/orquesta/conducting.py +++ b/orquesta/conducting.py @@ -629,6 +629,9 @@ def _evaluate_task_actions(self, task): active_items = list(filter(lambda x: x[1]["status"] in statuses.ACTIVE_STATUSES, all_items)) if task["concurrency"] is not None: + # Concurrency below 1 prevents scheduling of tasks. + if task["concurrency"] <= 0: + task["concurrency"] = 1 availability = task["concurrency"] - len(active_items) candidates = list(zip(*notrun_items[:availability])) task["actions"] = list(candidates[0]) if candidates and availability > 0 else [] diff --git a/orquesta/tests/unit/conducting/test_workflow_conductor_with_items.py b/orquesta/tests/unit/conducting/test_workflow_conductor_with_items.py index 4c2d2d9f..f1f389b7 100644 --- a/orquesta/tests/unit/conducting/test_workflow_conductor_with_items.py +++ b/orquesta/tests/unit/conducting/test_workflow_conductor_with_items.py @@ -391,6 +391,76 @@ def test_basic_items_list_with_concurrency(self): # Assert the workflow succeeded. self.assertEqual(conductor.get_workflow_status(), statuses.SUCCEEDED) + def test_basic_items_list_with_zero_concurrency(self): + wf_def = """ + version: 1.0 + + vars: + - concurrency: 0 + - xs: + - fee + - fi + - fo + - fum + + tasks: + task1: + with: + items: <% ctx(xs) %> + concurrency: <% ctx(concurrency) %> + action: core.echo message=<% item() %> + next: + - publish: + - items: <% result() %> + + output: + - items: <% ctx(items) %> + """ + + concurrency = 0 + + spec = native_specs.WorkflowSpec(wf_def) + self.assertDictEqual(spec.inspect(), {}) + + conductor = conducting.WorkflowConductor(spec) + conductor.request_workflow_status(statuses.RUNNING) + + # Mock the action execution for each item and assert expected task statuses. + task_route = 0 + task_name = "task1" + task_ctx = {"xs": ["fee", "fi", "fo", "fum"], "concurrency": 0} + + task_action_specs = [ + {"action": "core.echo", "input": {"message": "fee"}, "item_id": 0}, + {"action": "core.echo", "input": {"message": "fi"}, "item_id": 1}, + {"action": "core.echo", "input": {"message": "fo"}, "item_id": 2}, + {"action": "core.echo", "input": {"message": "fum"}, "item_id": 3}, + ] + + mock_ac_ex_statuses = [statuses.SUCCEEDED] * 4 + expected_task_statuses = [statuses.RUNNING] * 3 + [statuses.SUCCEEDED] + expected_workflow_statuses = [statuses.RUNNING] * 3 + [statuses.SUCCEEDED] + + self.assert_task_items( + conductor, + task_name, + task_route, + task_ctx, + task_ctx["xs"], + task_action_specs, + mock_ac_ex_statuses, + expected_task_statuses, + expected_workflow_statuses, + concurrency=concurrency, + ) + + # Assert the task is removed from staging. + self.assertIsNone(conductor.workflow_state.get_staged_task(task_name, task_route)) + + # Assert the workflow succeeded. + self.assertEqual(conductor.get_workflow_status(), statuses.SUCCEEDED) + + def test_multiple_items_list(self): wf_def = """ version: 1.0 From 5d7bda1abdad06748f716f9957e6e7608d6d2278 Mon Sep 17 00:00:00 2001 From: Carlos Date: Tue, 24 Oct 2023 12:39:45 +0200 Subject: [PATCH 2/2] Set expected concurrency count to 1. --- orquesta/conducting.py | 2 +- .../unit/conducting/test_workflow_conductor_with_items.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/orquesta/conducting.py b/orquesta/conducting.py index 49d711dc..3a096974 100644 --- a/orquesta/conducting.py +++ b/orquesta/conducting.py @@ -630,7 +630,7 @@ def _evaluate_task_actions(self, task): if task["concurrency"] is not None: # Concurrency below 1 prevents scheduling of tasks. if task["concurrency"] <= 0: - task["concurrency"] = 1 + task["concurrency"] = 1 availability = task["concurrency"] - len(active_items) candidates = list(zip(*notrun_items[:availability])) task["actions"] = list(candidates[0]) if candidates and availability > 0 else [] diff --git a/orquesta/tests/unit/conducting/test_workflow_conductor_with_items.py b/orquesta/tests/unit/conducting/test_workflow_conductor_with_items.py index f1f389b7..65b8883e 100644 --- a/orquesta/tests/unit/conducting/test_workflow_conductor_with_items.py +++ b/orquesta/tests/unit/conducting/test_workflow_conductor_with_items.py @@ -417,7 +417,9 @@ def test_basic_items_list_with_zero_concurrency(self): - items: <% ctx(items) %> """ - concurrency = 0 + # Set the concurrency to 1 since concurrency 0 is expected to be + # overridden in the Orquesta concurrency scheduling code. + concurrency = 1 spec = native_specs.WorkflowSpec(wf_def) self.assertDictEqual(spec.inspect(), {}) @@ -460,7 +462,6 @@ def test_basic_items_list_with_zero_concurrency(self): # Assert the workflow succeeded. self.assertEqual(conductor.get_workflow_status(), statuses.SUCCEEDED) - def test_multiple_items_list(self): wf_def = """ version: 1.0