diff --git a/orquesta/conducting.py b/orquesta/conducting.py index a0c07d1d..3a096974 100644 --- a/orquesta/conducting.py +++ b/orquesta/conducting.py @@ -628,6 +628,9 @@ def _evaluate_task_actions(self, task): active_items = list(filter(lambda x: x[1]["status"] in statuses.ACTIVE_STATUSES, all_items)) if task["concurrency"] is not None: + # Concurrency below 1 prevents scheduling of tasks. + if task["concurrency"] <= 0: + task["concurrency"] = 1 availability = task["concurrency"] - len(active_items) candidates = list(zip(*notrun_items[:availability])) task["actions"] = list(candidates[0]) if candidates and availability > 0 else [] diff --git a/orquesta/tests/unit/conducting/test_workflow_conductor_with_items.py b/orquesta/tests/unit/conducting/test_workflow_conductor_with_items.py index 4c2d2d9f..65b8883e 100644 --- a/orquesta/tests/unit/conducting/test_workflow_conductor_with_items.py +++ b/orquesta/tests/unit/conducting/test_workflow_conductor_with_items.py @@ -391,6 +391,77 @@ def test_basic_items_list_with_concurrency(self): # Assert the workflow succeeded. self.assertEqual(conductor.get_workflow_status(), statuses.SUCCEEDED) + def test_basic_items_list_with_zero_concurrency(self): + wf_def = """ + version: 1.0 + + vars: + - concurrency: 0 + - xs: + - fee + - fi + - fo + - fum + + tasks: + task1: + with: + items: <% ctx(xs) %> + concurrency: <% ctx(concurrency) %> + action: core.echo message=<% item() %> + next: + - publish: + - items: <% result() %> + + output: + - items: <% ctx(items) %> + """ + + # Set the concurrency to 1 since concurrency 0 is expected to be + # overridden in the Orquesta concurrency scheduling code. + concurrency = 1 + + spec = native_specs.WorkflowSpec(wf_def) + self.assertDictEqual(spec.inspect(), {}) + + conductor = conducting.WorkflowConductor(spec) + conductor.request_workflow_status(statuses.RUNNING) + + # Mock the action execution for each item and assert expected task statuses. + task_route = 0 + task_name = "task1" + task_ctx = {"xs": ["fee", "fi", "fo", "fum"], "concurrency": 0} + + task_action_specs = [ + {"action": "core.echo", "input": {"message": "fee"}, "item_id": 0}, + {"action": "core.echo", "input": {"message": "fi"}, "item_id": 1}, + {"action": "core.echo", "input": {"message": "fo"}, "item_id": 2}, + {"action": "core.echo", "input": {"message": "fum"}, "item_id": 3}, + ] + + mock_ac_ex_statuses = [statuses.SUCCEEDED] * 4 + expected_task_statuses = [statuses.RUNNING] * 3 + [statuses.SUCCEEDED] + expected_workflow_statuses = [statuses.RUNNING] * 3 + [statuses.SUCCEEDED] + + self.assert_task_items( + conductor, + task_name, + task_route, + task_ctx, + task_ctx["xs"], + task_action_specs, + mock_ac_ex_statuses, + expected_task_statuses, + expected_workflow_statuses, + concurrency=concurrency, + ) + + # Assert the task is removed from staging. + self.assertIsNone(conductor.workflow_state.get_staged_task(task_name, task_route)) + + # Assert the workflow succeeded. + self.assertEqual(conductor.get_workflow_status(), statuses.SUCCEEDED) + def test_multiple_items_list(self): wf_def = """ version: 1.0