diff --git a/elyra/pipeline/pipeline.py b/elyra/pipeline/pipeline.py index e583d993e..e428b6b5c 100644 --- a/elyra/pipeline/pipeline.py +++ b/elyra/pipeline/pipeline.py @@ -336,7 +336,7 @@ def gpu_vendor(self) -> Optional[str]: @property def parallel_count(self) -> Optional[str]: - return self._component_props.get("parallel_count") + return self._component_props.get("parallel_count", 1) def __eq__(self, other: GenericOperation) -> bool: if isinstance(self, other.__class__): diff --git a/elyra/templates/kubeflow/v1/python_dsl_template.jinja2 b/elyra/templates/kubeflow/v1/python_dsl_template.jinja2 index 1904d5f57..9ebce539c 100644 --- a/elyra/templates/kubeflow/v1/python_dsl_template.jinja2 +++ b/elyra/templates/kubeflow/v1/python_dsl_template.jinja2 @@ -34,10 +34,12 @@ def generated_pipeline( {% set task_name = "task_" + workflow_task.escaped_task_id %} # Task for node '{{ workflow_task.name }}' {% set parallel_indent = 0 %} -{% if workflow_task.task_modifiers.parallel_count > 1 %} +{% if 'parallel_count' in workflow_task.task_modifiers and workflow_task.task_modifiers.parallel_count is not none %} +{% if workflow_task.task_modifiers.parallel_count > 1 %} {% set parallel_indent = 4 %} parallel_count = {{workflow_task.task_modifiers.parallel_count}} with kfp.dsl.ParallelFor(list(range(parallel_count))) as rank: +{% endif %} {% endif %} {% filter indent(width=parallel_indent) %} @@ -81,9 +83,11 @@ def generated_pipeline( {% for env_var_name, env_var_value in workflow_task.task_modifiers.env_variables.items() %} {{ task_name }}.add_env_variable(V1EnvVar(name="{{ env_var_name }}", value="{{ env_var_value | string_delimiter_safe }}")) {% endfor %} -{% if workflow_task.task_modifiers.parallel_count > 1 %} +{% if 'parallel_count' in workflow_task.task_modifiers and workflow_task.task_modifiers.parallel_count is not none %} +{% if workflow_task.task_modifiers.parallel_count > 1 %} {{ task_name }}.add_env_variable(V1EnvVar(name="NRANKS", value=str(parallel_count))) {{ task_name }}.add_env_variable(V1EnvVar(name="RANK", value=str(rank))) +{% endif %} {% endif %} {% if workflow_engine == "argo" %} {{ task_name }}.add_env_variable(V1EnvVar( diff --git a/elyra/tests/pipeline/kfp/test_processor_kfp.py b/elyra/tests/pipeline/kfp/test_processor_kfp.py index 0750a15a2..cdf0598b8 100644 --- a/elyra/tests/pipeline/kfp/test_processor_kfp.py +++ b/elyra/tests/pipeline/kfp/test_processor_kfp.py @@ -735,7 +735,7 @@ def test_generate_pipeline_dsl_compile_pipeline_dsl_one_generic_node_pipeline_te # Verify component definition information (see generic_component_definition_template.jinja2) # - property 'name' - assert node_template["name"] == "run-a-file" + assert node_template["name"] == sanitize_label_value(op.name) # - property 'implementation.container.command' assert node_template["container"]["command"] == ["sh", "-c"] # - property 'implementation.container.args' @@ -1416,11 +1416,9 @@ def test_generate_pipeline_dsl_compile_pipeline_dsl_generic_components_data_exch assert len(compiled_spec["spec"]["templates"]) >= 3 template_specs = {} for node_template in compiled_spec["spec"]["templates"]: - if node_template["name"] == compiled_spec["spec"]["entrypoint"] or not node_template["name"].startswith( - "run-a-file" - ): + if node_template["name"] == compiled_spec["spec"]["entrypoint"]: continue - template_specs[node_template["name"]] = node_template + template_specs[sanitize_label_value(node_template["name"])] = node_template # Iterate through sorted operations and verify that their inputs # and outputs are properly represented in their respective template @@ -1430,10 +1428,8 @@ def test_generate_pipeline_dsl_compile_pipeline_dsl_generic_components_data_exch if not op.is_generic: # ignore custom nodes continue - if template_index == 1: - template_name = "run-a-file" - else: - template_name = f"run-a-file-{template_index}" + template_name = sanitize_label_value(op.name) + template_name = template_name.replace("_", "-") # kubernetes does this replace template_index = template_index + 1 # compare outputs if len(op.outputs) > 0: