From 2eefa5a9e6b482a1b388238f6f61fcbac429231c Mon Sep 17 00:00:00 2001 From: miaozhipeng Date: Mon, 24 Apr 2023 17:24:52 +0800 Subject: [PATCH 01/15] fix spark config in role does not take effect Signed-off-by: miaozhipeng --- python/fate_flow/controller/job_controller.py | 2 +- python/fate_flow/scheduler/dsl_parser.py | 5 ++--- python/fate_flow/utils/runtime_conf_parse_util.py | 5 +++-- python/fate_flow/worker/task_executor.py | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/python/fate_flow/controller/job_controller.py b/python/fate_flow/controller/job_controller.py index c43610760..630f7f182 100644 --- a/python/fate_flow/controller/job_controller.py +++ b/python/fate_flow/controller/job_controller.py @@ -57,7 +57,7 @@ def create_job(cls, job_id, role, party_id, job_info): runtime_conf=runtime_conf, train_runtime_conf=train_runtime_conf ) - job_parameters = dsl_parser.get_job_parameters(runtime_conf) + job_parameters = dsl_parser.get_job_parameters(runtime_conf, int(runtime_conf.get("dsl_version", "1"))) schedule_logger(job_id).info('job parameters:{}'.format(job_parameters)) dest_user = job_parameters.get(role, {}).get(party_id, {}).get('user', '') user = {} diff --git a/python/fate_flow/scheduler/dsl_parser.py b/python/fate_flow/scheduler/dsl_parser.py index 8385196c4..a4d404a5c 100644 --- a/python/fate_flow/scheduler/dsl_parser.py +++ b/python/fate_flow/scheduler/dsl_parser.py @@ -968,10 +968,9 @@ def __init__(self): self.version = 1 @staticmethod - def get_job_parameters(runtime_conf): + def get_job_parameters(runtime_conf, conf_version=1): job_parameters = RuntimeConfParserUtil.get_job_parameters(runtime_conf, - conf_version=1) - + conf_version) return job_parameters @staticmethod diff --git a/python/fate_flow/utils/runtime_conf_parse_util.py b/python/fate_flow/utils/runtime_conf_parse_util.py index cc0242b9b..320d9a30f 100644 --- a/python/fate_flow/utils/runtime_conf_parse_util.py +++ b/python/fate_flow/utils/runtime_conf_parse_util.py @@ -539,9 +539,10 @@ def get_input_parameters(cls, submit_dict, components=None): def get_job_parameters(submit_dict): ret = {} job_parameters = submit_dict.get("job_parameters", {}) + component_parameters = submit_dict.get("component_parameters", {}) common_job_parameters = job_parameters.get("common", {}) - role_job_parameters = job_parameters.get("role", {}) - for role in submit_dict["role"]: + role_job_parameters = component_parameters.get("role", {}) + for role in component_parameters["role"]: party_id_list = submit_dict["role"][role] if not role_job_parameters: ret[role] = {party_id: copy.deepcopy(common_job_parameters) for party_id in party_id_list} diff --git a/python/fate_flow/worker/task_executor.py b/python/fate_flow/worker/task_executor.py index c2e4f01f7..777e4a605 100644 --- a/python/fate_flow/worker/task_executor.py +++ b/python/fate_flow/worker/task_executor.py @@ -77,7 +77,7 @@ def _run_(self): train_runtime_conf=job_configuration.train_runtime_conf, pipeline_dsl=None) - job_parameters = dsl_parser.get_job_parameters(job_configuration.runtime_conf) + job_parameters = dsl_parser.get_job_parameters(job_configuration.runtime_conf, int(job_configuration.runtime_conf.get("dsl_version", "1"))) user_name = job_parameters.get(args.role, {}).get(args.party_id, {}).get("user", '') LOGGER.info(f"user name:{user_name}") task_parameters = RunParameters(**task_parameters_conf) From d657923dc2afaa67a7735a803e9b14efbf4ec8b2 Mon Sep 17 00:00:00 2001 From: zhihuiwan <15779896112@163.com> Date: Mon, 22 May 2023 18:03:26 +0800 
Subject: [PATCH 02/15] init deepspeed task Signed-off-by: zhihuiwan <15779896112@163.com> --- conf/job_default_config.yaml | 4 +- python/fate_flow/controller/engine_adapt.py | 25 ++-- .../controller/engine_controller/deepspeed.py | 111 ++++++++++++++++++ .../fate_flow/controller/task_controller.py | 24 +++- python/fate_flow/db/db_models.py | 3 + python/fate_flow/db/job_default_config.py | 2 + python/fate_flow/detection/detector.py | 29 +++++ python/fate_flow/entity/_run_parameters.py | 24 ++++ python/fate_flow/entity/types.py | 5 + python/fate_flow/scheduler/dag_scheduler.py | 2 + python/fate_flow/worker/base_worker.py | 2 + python/fate_flow/worker/task_base_worker.py | 19 +-- python/fate_flow/worker/task_executor.py | 16 ++- 13 files changed, 237 insertions(+), 29 deletions(-) create mode 100644 python/fate_flow/controller/engine_controller/deepspeed.py diff --git a/conf/job_default_config.yaml b/conf/job_default_config.yaml index 94ce9fe05..ae2491d54 100644 --- a/conf/job_default_config.yaml +++ b/conf/job_default_config.yaml @@ -26,4 +26,6 @@ detect_connect_long_retry_count: 2 upload_block_max_bytes: 104857600 # bytes #component output -output_data_summary_count_limit: 100 \ No newline at end of file +output_data_summary_count_limit: 100 + +task_world_size: 2 \ No newline at end of file diff --git a/python/fate_flow/controller/engine_adapt.py b/python/fate_flow/controller/engine_adapt.py index 6787cefb2..b048f1247 100644 --- a/python/fate_flow/controller/engine_adapt.py +++ b/python/fate_flow/controller/engine_adapt.py @@ -14,20 +14,25 @@ # limitations under the License. # from fate_arch.computing import ComputingEngine +from fate_flow.controller.engine_controller.deepspeed import EggrollDeepspeedEngine from fate_flow.controller.engine_controller.eggroll import EggrollEngine from fate_flow.controller.engine_controller.linkis_spark import LinkisSparkEngine from fate_flow.controller.engine_controller.spark import SparkEngine -def build_engine(computing_engine): - if not computing_engine: - return None - if computing_engine in {ComputingEngine.EGGROLL, ComputingEngine.STANDALONE}: - engine_session = EggrollEngine() - elif computing_engine == ComputingEngine.SPARK: - engine_session = SparkEngine() - elif computing_engine == ComputingEngine.LINKIS_SPARK: - engine_session = LinkisSparkEngine() +def build_engine(computing_engine, is_deepspeed=False): + if not is_deepspeed: + if computing_engine in {ComputingEngine.EGGROLL, ComputingEngine.STANDALONE}: + engine_session = EggrollEngine() + elif computing_engine == ComputingEngine.SPARK: + engine_session = SparkEngine() + elif computing_engine == ComputingEngine.LINKIS_SPARK: + engine_session = LinkisSparkEngine() + else: + raise ValueError(f"{computing_engine} is not supported") else: - raise ValueError(f"{computing_engine} is not supported") + if computing_engine in {ComputingEngine.EGGROLL, ComputingEngine.STANDALONE}: + engine_session = EggrollDeepspeedEngine() + else: + raise ValueError(f"deepspeed {computing_engine} is not supported") return engine_session diff --git a/python/fate_flow/controller/engine_controller/deepspeed.py b/python/fate_flow/controller/engine_controller/deepspeed.py new file mode 100644 index 000000000..a92e92059 --- /dev/null +++ b/python/fate_flow/controller/engine_controller/deepspeed.py @@ -0,0 +1,111 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import sys +from abc import ABC + +from fate_flow.controller.engine_controller.engine import EngineABC +from fate_flow.db.db_models import Task +from fate_flow.db.job_default_config import JobDefaultConfig +from fate_flow.entity.run_status import BaseStatus, TaskStatus +from fate_flow.entity.types import WorkerName +from fate_flow.manager.worker_manager import WorkerManager +from fate_flow.utils.log_utils import detect_logger +from fate_flow.worker.task_executor import TaskExecutor + + +class StatusSet(BaseStatus): + NEW = "NEW" + NEW_TIMEOUT = "NEW_TIMEOUT" + ACTIVE = "ACTIVE" + CLOSED = "CLOSED" + KILLED = "KILLED" + ERROR = "ERROR" + FINISHED = "FINISHED" + + +class EndStatus(BaseStatus): + NEW_TIMEOUT = StatusSet.NEW_TIMEOUT + CLOSED = StatusSet.CLOSED + FAILED = StatusSet.KILLED + ERROR = StatusSet.ERROR + FINISHED = StatusSet.FINISHED + + +class EggrollDeepspeedEngine(EngineABC, ABC): + def run(self, task: Task, run_parameters, run_parameters_path, config_dir, log_dir, cwd_dir, **kwargs): + from eggroll.deepspeed.submit import client + worker_id, config_dir, log_dir = WorkerManager.get_process_dirs( + worker_name=WorkerName.TASK_EXECUTOR, + job_id=task.f_job_id, + role=task.f_role, + party_id=task.f_party_id, + task=task + ) + config = run_parameters.to_dict() + session_id, _, command_arguments = WorkerManager.generate_common_cmd(task, config_dir, config, + log_dir, worker_id) + command_arguments.extend(["--is_deepspeed", True]) + environment_variables = {} + files = {} + options = { + "eggroll.container.deepspeed.script.path": sys.modules[TaskExecutor.__module__].__file__ + } + task_conf = run_parameters.role_parameter("task_conf", role=task.f_role, party_id=task.f_party_id) + world_size = task_conf.get(task.f_component_name).get("world_size", JobDefaultConfig.task_world_size) + resource_options = {"timeout_seconds": 0, "resource_exhausted_strategy": "waiting"} + + client = client.DeepspeedJob() + result = client.submit( + world_size=world_size, + command_arguments=command_arguments, + environment_variables=environment_variables, + files=files, + resource_options=resource_options, + options=options) + return {"run_pid": "", "worker_id": worker_id, "cmd": [], "deepspeed_id": result.session_id} + + def kill(self, task): + if task.f_deepspeed_id: + from eggroll.deepspeed.submit import client + client = client.DeepspeedJob(task.f_deepspeed_id) + return client.kill() + + @staticmethod + def _query_status(task): + if task.f_deepspeed_id: + from eggroll.deepspeed.submit import client + client = client.DeepspeedJob(task.f_deepspeed_id) + return client.query_status().status + return StatusSet.NEW + + def query_task_status(self, task): + status = self._query_status(task) + if status in EndStatus.status_list(): + if status in [EndStatus.FINISHED]: + return TaskStatus.SUCCESS + else: + return TaskStatus.FAILED + + def is_alive(self, task: Task): + status = self._query_status(task) + detect_logger(task.f_job_id).info(f"task {task.f_task_id} {task.f_task_version} deepspeed status {status}") + if status in StatusSet.status_list(): + if status in EndStatus.status_list(): + return False 
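+                # otherwise the eggroll job still reports a live status and detection continues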
+ else: + return True + else: + raise RuntimeError(f"task run status: {status}") diff --git a/python/fate_flow/controller/task_controller.py b/python/fate_flow/controller/task_controller.py index 3533ccb4c..a91709025 100644 --- a/python/fate_flow/controller/task_controller.py +++ b/python/fate_flow/controller/task_controller.py @@ -29,7 +29,7 @@ from fate_flow.manager.resource_manager import ResourceManager from fate_flow.operation.job_tracker import Tracker from fate_flow.manager.worker_manager import WorkerManager -from fate_flow.entity.types import TaskCleanResourceType +from fate_flow.entity.types import TaskCleanResourceType, TaskLauncher class TaskController(object): @@ -47,7 +47,9 @@ def create_task(cls, role, party_id, run_on_this_party, task_info): task_info["task_id"] = job_utils.generate_task_id(job_id=task_info["job_id"], component_name=task_info["component_name"]) if task_info.get("task_version") is None: task_info["task_version"] = 0 - + run_parameters_dict = job_utils.get_job_parameters(task_info.get("job_id"), role, party_id) + run_parameters = RunParameters(**run_parameters_dict) + task_info.update({"is_deepspeed": cls.is_deepspeed(run_parameters, role, party_id, task_info["component_name"])}) task = JobSaver.create_task(task_info=task_info) @classmethod @@ -62,7 +64,6 @@ def start_task(cls, job_id, component_name, task_id, task_version, role, party_i :param party_id: :return: """ - job_dsl = job_utils.get_job_dsl(job_id, role, party_id) schedule_logger(job_id).info( f"try to start task {task_id} {task_version} on {role} {party_id} executor subprocess") task_executor_process_start_status = False @@ -89,7 +90,9 @@ def start_task(cls, job_id, component_name, task_id, task_version, role, party_i schedule_logger(job_id).info(f"use computing engine {run_parameters.computing_engine}") task_info["engine_conf"] = {"computing_engine": run_parameters.computing_engine} - backend_engine = build_engine(run_parameters.computing_engine) + backend_engine = build_engine( + run_parameters.computing_engine, + task.f_is_deepspeed) run_info = backend_engine.run(task=task, run_parameters=run_parameters, run_parameters_path=run_parameters_path, @@ -194,7 +197,10 @@ def kill_task(cls, task: Task): kill_status = False try: # kill task executor - backend_engine = build_engine(task.f_engine_conf.get("computing_engine")) + backend_engine = build_engine( + task.f_engine_conf.get("computing_engine"), + task.f_is_deepspeed + ) if backend_engine: backend_engine.kill(task) WorkerManager.kill_task_all_workers(task) @@ -231,3 +237,11 @@ def clean_task(cls, job_id, task_id, task_version, role, party_id, content_type: else: return False + @staticmethod + def is_deepspeed(run_parameters, role, party_id, component_name): + task_conf = run_parameters.role_parameter("task_conf", role=role, party_id=party_id) + if task_conf.get(component_name, {}).get("launcher") == TaskLauncher.DEEPSPEED.value and role != "arbiter": + return True + else: + return False + diff --git a/python/fate_flow/db/db_models.py b/python/fate_flow/db/db_models.py index 9c2c3f62b..d2ccbb4bb 100644 --- a/python/fate_flow/db/db_models.py +++ b/python/fate_flow/db/db_models.py @@ -249,6 +249,9 @@ class Task(DataBaseModel): f_kill_status = BooleanField(default=False) f_error_report = TextField(default="") + f_is_deepspeed = BooleanField(default=False) + f_deepspeed_id = CharField(max_length=200, null=True) + f_start_time = BigIntegerField(null=True) f_start_date = DateTimeField(null=True) f_end_time = BigIntegerField(null=True) diff --git 
a/python/fate_flow/db/job_default_config.py b/python/fate_flow/db/job_default_config.py index e1fe1c5d6..7e781f4b5 100644 --- a/python/fate_flow/db/job_default_config.py +++ b/python/fate_flow/db/job_default_config.py @@ -48,6 +48,8 @@ class JobDefaultConfig(ReloadConfigBase): # component output output_data_summary_count_limit = None + task_world_size = None + @classmethod def load(cls): conf = file_utils.load_yaml_conf(FATE_FLOW_JOB_DEFAULT_CONFIG_PATH) diff --git a/python/fate_flow/detection/detector.py b/python/fate_flow/detection/detector.py index ac34ae88a..f3cb8c8da 100644 --- a/python/fate_flow/detection/detector.py +++ b/python/fate_flow/detection/detector.py @@ -49,6 +49,7 @@ def run_do(self): self.detect_resource_record() self.detect_expired_session() self.detect_dependence_upload_record() + self.detect_deepspeed_task() @classmethod def detect_running_task(cls): @@ -94,6 +95,34 @@ def detect_running_task(cls): finally: detect_logger().info(f"finish detect {count} running task") + @classmethod + def detect_deepspeed_task(cls): + detect_logger().info('start to detect deepspeed running task..') + running_tasks = JobSaver.query_task(party_status=TaskStatus.RUNNING, is_deepspeed=True) + for task in running_tasks: + cls.detect_deepspeed_task_status(task) + detect_logger().info(f'finish detect deepspeed running task {running_tasks}') + + @staticmethod + def detect_deepspeed_task_status(task): + try: + deepspeed_engine = build_engine(task.f_engine_conf.get("computing_engine"), task.f_is_deepspeed) + # query or update + if not deepspeed_engine.is_alive(): + # update task status to end status + status = deepspeed_engine.query_task_status(task) + task_info = { + "job_id": task.f_job_id, + "task_id": task.f_task_id, + "task_version": task.f_task_version, + "role": task.f_role, + "party_id": task.f_party_id, + "party_status": status + } + TaskController.update_task(task_info) + except Exception as e: + detect_logger(task.f_job_id).exception(e) + @classmethod def detect_end_task(cls): detect_logger().info('start to detect end status task..') diff --git a/python/fate_flow/entity/_run_parameters.py b/python/fate_flow/entity/_run_parameters.py index 2d3ac74d1..a3bcfe7a9 100644 --- a/python/fate_flow/entity/_run_parameters.py +++ b/python/fate_flow/entity/_run_parameters.py @@ -13,7 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from copy import deepcopy + from ._base import BaseEntity +from ..utils.runtime_conf_parse_util import RuntimeConfParserUtil class RunParameters(BaseEntity): @@ -45,10 +48,31 @@ def __init__(self, **kwargs): self.assistant_role = None self.map_table_name = None self.map_namespace = None + self.task_conf = {} + self.roles = {} + self.role_parameters = {} for k, v in kwargs.items(): if hasattr(self, k): setattr(self, k, v) + def role_parameter(self, parameter_name, role, party_id): + if role == "local" or int(party_id) == 0: + return {} + if not self.roles: + # compatible with previous versions + return getattr(self, parameter_name) + index = [str(_p) for _p in self.roles.get(role)].index(str(party_id)) + _conf = deepcopy(getattr(self, parameter_name)) + _role = self.role_parameters.get(role, {}).get(str(index), {}).get(parameter_name, {}) + if isinstance(_conf, dict): + # dict + _conf = RuntimeConfParserUtil.merge_dict(_conf, _role) + else: + # int, str, etc. 
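+                # scalar override: a role-scoped value such as "world_size", when set, replaces the common one outright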
+ _conf = _role if _role else _conf + return _conf + + def to_dict(self): d = {} for k, v in self.__dict__.items(): diff --git a/python/fate_flow/entity/types.py b/python/fate_flow/entity/types.py index 20fc6f5ab..7797062c5 100644 --- a/python/fate_flow/entity/types.py +++ b/python/fate_flow/entity/types.py @@ -148,3 +148,8 @@ class TaskCleanResourceType(CustomEnum): class ExternalStorage(CustomEnum): MYSQL = "MYSQL" + + +class TaskLauncher(CustomEnum): + DEFAULT = "default" + DEEPSPEED = "deepspeed" diff --git a/python/fate_flow/scheduler/dag_scheduler.py b/python/fate_flow/scheduler/dag_scheduler.py index 3bb796e84..2947d3713 100644 --- a/python/fate_flow/scheduler/dag_scheduler.py +++ b/python/fate_flow/scheduler/dag_scheduler.py @@ -56,6 +56,8 @@ def submit(cls, submit_job_conf: JobConfigurationBase, job_id: str = None): job_initiator = runtime_conf["initiator"] conf_adapter = JobRuntimeConfigAdapter(runtime_conf) common_job_parameters = conf_adapter.get_common_parameters() + common_job_parameters.roles = runtime_conf["role"] + common_job_parameters.role_parameters = runtime_conf.get("job_parameters", {}).get("role", {}) if common_job_parameters.job_type != "predict": # generate job model info diff --git a/python/fate_flow/worker/base_worker.py b/python/fate_flow/worker/base_worker.py index 8d3f9a7ae..488e60364 100644 --- a/python/fate_flow/worker/base_worker.py +++ b/python/fate_flow/worker/base_worker.py @@ -70,6 +70,8 @@ def __init__(self, **kwargs): # Dependence Upload self.dependence_type = kwargs.get("dependence_type") + self.is_deepspeed = kwargs.get("is_deepspeed") + @staticmethod def load_dict_attr(kwargs: dict, attr_name: str): return load_json_conf(kwargs[attr_name]) if kwargs.get(attr_name) else {} diff --git a/python/fate_flow/worker/task_base_worker.py b/python/fate_flow/worker/task_base_worker.py index 5a264aef3..b5042007b 100644 --- a/python/fate_flow/worker/task_base_worker.py +++ b/python/fate_flow/worker/task_base_worker.py @@ -110,12 +110,13 @@ def _handle_exception(self): self.report_task_info_to_driver() def report_task_info_to_driver(self): - LOGGER.info("report {} {} {} {} {} to driver:\n{}".format( - self.__class__.__name__, - self.report_info["task_id"], - self.report_info["task_version"], - self.report_info["role"], - self.report_info["party_id"], - self.report_info - )) - ControllerClient.report_task(self.report_info) + if not self.args.is_deepspeed: + LOGGER.info("report {} {} {} {} {} to driver:\n{}".format( + self.__class__.__name__, + self.report_info["task_id"], + self.report_info["task_version"], + self.report_info["role"], + self.report_info["party_id"], + self.report_info + )) + ControllerClient.report_task(self.report_info) diff --git a/python/fate_flow/worker/task_executor.py b/python/fate_flow/worker/task_executor.py index c2e4f01f7..e4a461545 100644 --- a/python/fate_flow/worker/task_executor.py +++ b/python/fate_flow/worker/task_executor.py @@ -151,9 +151,10 @@ def _run_(self): if set(roles) == {"local"}: LOGGER.info(f"only local roles, pass init federation") else: - sess.init_federation(federation_session_id=args.federation_session_id, - runtime_conf=component_parameters_on_party, - service_conf=job_parameters.engines_address.get(EngineType.FEDERATION, {})) + if self.is_master: + sess.init_federation(federation_session_id=args.federation_session_id, + runtime_conf=component_parameters_on_party, + service_conf=job_parameters.engines_address.get(EngineType.FEDERATION, {})) LOGGER.info(f'run {args.component_name} {args.task_id} {args.task_version} 
on {args.role} {args.party_id} task') LOGGER.info(f"component parameters on party:\n{json_dumps(component_parameters_on_party, indent=4)}") LOGGER.info(f"task input dsl {task_input_dsl}") @@ -227,7 +228,7 @@ def _run_(self): output_table_list.append({"namespace": persistent_table_namespace, "name": persistent_table_name}) self.log_output_data_table_tracker(args.job_id, input_table_list, output_table_list) - if cpn_output.model: + if cpn_output.model and self.is_master: getattr( tracker_client if predict_tracker_client is None else predict_tracker_client, 'save_component_output_model', @@ -283,6 +284,13 @@ def _run_(self): print(msg) return self.report_info + @property + def is_master(self): + # deepspeed rank 0 + if not os.getenv("GLOBAL_RANK"): + return True + return int(os.getenv("GLOBAL_RANK")) == 0 + @classmethod def log_output_data_table_tracker(cls, job_id, input_table_list, output_table_list): try: From d18bb77d02bb3af28753a6eca130b36e2dc0d1f9 Mon Sep 17 00:00:00 2001 From: zhihuiwan <15779896112@163.com> Date: Mon, 22 May 2023 18:03:34 +0800 Subject: [PATCH 03/15] init deepspeed task Signed-off-by: zhihuiwan <15779896112@163.com> --- python/fate_flow/manager/worker_manager.py | 39 +++++++++++----------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/python/fate_flow/manager/worker_manager.py b/python/fate_flow/manager/worker_manager.py index f22221607..ab06b0fa9 100644 --- a/python/fate_flow/manager/worker_manager.py +++ b/python/fate_flow/manager/worker_manager.py @@ -154,12 +154,6 @@ def start_task_worker(cls, worker_name, task: Task, task_parameters: RunParamete role=task.f_role, party_id=task.f_party_id, task=task) - - session_id = job_utils.generate_session_id(task.f_task_id, task.f_task_version, task.f_role, task.f_party_id) - federation_session_id = job_utils.generate_task_version_id(task.f_task_id, task.f_task_version) - - info_kwargs = {} - specific_cmd = [] if worker_name is WorkerName.TASK_EXECUTOR: from fate_flow.worker.task_executor import TaskExecutor module_file_path = sys.modules[TaskExecutor.__module__].__file__ @@ -171,15 +165,30 @@ def start_task_worker(cls, worker_name, task: Task, task_parameters: RunParamete config = task_parameters.to_dict() config["src_user"] = kwargs.get("src_user") - config_path, result_path = cls.get_config(config_dir=config_dir, config=config, log_dir=log_dir) env = cls.get_env(task.f_job_id, task.f_provider_info) if executable: process_cmd = executable else: process_cmd = [env.get("PYTHON_ENV") or sys.executable or "python3"] + common_cmd = [module_file_path] + common_cmd.extend(cls.generate_common_cmd(task, config_dir, config, log_dir, worker_id)[2]) + process_cmd.extend(common_cmd) + if extra_env: + env.update(extra_env) + schedule_logger(task.f_job_id).info( + f"task {task.f_task_id} {task.f_task_version} on {task.f_role} {task.f_party_id} {worker_name} worker subprocess is ready") + p = process_utils.run_subprocess(job_id=task.f_job_id, config_dir=config_dir, process_cmd=process_cmd, + added_env=env, log_dir=log_dir, cwd_dir=config_dir, process_name=worker_name.value, + process_id=worker_id) + cls.save_worker_info(task=task, worker_name=worker_name, worker_id=worker_id, run_ip=RuntimeConfig.JOB_SERVER_HOST, run_pid=p.pid, config=config, cmd=process_cmd) + return {"run_pid": p.pid, "worker_id": worker_id, "cmd": process_cmd} - common_cmd = [ - module_file_path, + @classmethod + def generate_common_cmd(cls, task, config_dir, config, log_dir, worker_id): + session_id = job_utils.generate_session_id(task.f_task_id, 
task.f_task_version, task.f_role, task.f_party_id) + federation_session_id = job_utils.generate_task_version_id(task.f_task_id, task.f_task_version) + config_path, result_path = cls.get_config(config_dir=config_dir, config=config, log_dir=log_dir) + cmd = [ "--job_id", task.f_job_id, "--component_name", task.f_component_name, "--task_id", task.f_task_id, @@ -197,17 +206,7 @@ def start_task_worker(cls, worker_name, task: Task, task_parameters: RunParamete "--session_id", session_id, "--federation_session_id", federation_session_id ] - process_cmd.extend(common_cmd) - process_cmd.extend(specific_cmd) - if extra_env: - env.update(extra_env) - schedule_logger(task.f_job_id).info( - f"task {task.f_task_id} {task.f_task_version} on {task.f_role} {task.f_party_id} {worker_name} worker subprocess is ready") - p = process_utils.run_subprocess(job_id=task.f_job_id, config_dir=config_dir, process_cmd=process_cmd, - added_env=env, log_dir=log_dir, cwd_dir=config_dir, process_name=worker_name.value, - process_id=worker_id) - cls.save_worker_info(task=task, worker_name=worker_name, worker_id=worker_id, run_ip=RuntimeConfig.JOB_SERVER_HOST, run_pid=p.pid, config=config, cmd=process_cmd, **info_kwargs) - return {"run_pid": p.pid, "worker_id": worker_id, "cmd": process_cmd} + return session_id, federation_session_id, cmd @classmethod def get_process_dirs(cls, worker_name: WorkerName, job_id=None, role=None, party_id=None, task: Task = None): From 92eef0eea190089a4f90b7e386c90c405b726157 Mon Sep 17 00:00:00 2001 From: zhihuiwan <15779896112@163.com> Date: Tue, 23 May 2023 17:49:34 +0800 Subject: [PATCH 04/15] fix deepspeed bug Signed-off-by: zhihuiwan <15779896112@163.com> --- python/fate_flow/apps/worker_app.py | 32 +++++++++++++ .../controller/engine_controller/deepspeed.py | 47 +++++++++++++++++-- python/fate_flow/detection/detector.py | 6 ++- python/fate_flow/settings.py | 2 + python/fate_flow/worker/base_worker.py | 21 ++++++++- python/fate_flow/worker/task_executor.py | 4 +- 6 files changed, 101 insertions(+), 11 deletions(-) create mode 100644 python/fate_flow/apps/worker_app.py diff --git a/python/fate_flow/apps/worker_app.py b/python/fate_flow/apps/worker_app.py new file mode 100644 index 000000000..adecd4565 --- /dev/null +++ b/python/fate_flow/apps/worker_app.py @@ -0,0 +1,32 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import os.path + +from flask import request + +from fate_arch.common.file_utils import load_json_conf +from fate_flow.utils.api_utils import get_json_result + +page_name = "worker" + + +@manager.route('/config/load', methods=['POST']) +def load_config(): + conf_path = request.json.get('config_path') + data = {} + if os.path.exists(conf_path): + data = load_json_conf(conf_path) + return get_json_result(data=data) diff --git a/python/fate_flow/controller/engine_controller/deepspeed.py b/python/fate_flow/controller/engine_controller/deepspeed.py index a92e92059..caf02aff4 100644 --- a/python/fate_flow/controller/engine_controller/deepspeed.py +++ b/python/fate_flow/controller/engine_controller/deepspeed.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import os import sys from abc import ABC @@ -22,7 +23,8 @@ from fate_flow.entity.run_status import BaseStatus, TaskStatus from fate_flow.entity.types import WorkerName from fate_flow.manager.worker_manager import WorkerManager -from fate_flow.utils.log_utils import detect_logger +from fate_flow.utils import log_utils +from fate_flow.utils.log_utils import detect_logger, schedule_logger from fate_flow.worker.task_executor import TaskExecutor @@ -58,6 +60,7 @@ def run(self, task: Task, run_parameters, run_parameters_path, config_dir, log_d session_id, _, command_arguments = WorkerManager.generate_common_cmd(task, config_dir, config, log_dir, worker_id) command_arguments.extend(["--is_deepspeed", True]) + cmd = [str(_c) for _c in command_arguments] environment_variables = {} files = {} options = { @@ -65,17 +68,18 @@ def run(self, task: Task, run_parameters, run_parameters_path, config_dir, log_d } task_conf = run_parameters.role_parameter("task_conf", role=task.f_role, party_id=task.f_party_id) world_size = task_conf.get(task.f_component_name).get("world_size", JobDefaultConfig.task_world_size) - resource_options = {"timeout_seconds": 0, "resource_exhausted_strategy": "waiting"} - + resource_options = {"timeout_seconds": 3000, "resource_exhausted_strategy": "waiting"} + schedule_logger(task.f_job_id).info(f"start submit deepspeed task") + schedule_logger(task.f_job_id).info(f"cmd: {cmd}") client = client.DeepspeedJob() result = client.submit( world_size=world_size, - command_arguments=command_arguments, + command_arguments=cmd, environment_variables=environment_variables, files=files, resource_options=resource_options, options=options) - return {"run_pid": "", "worker_id": worker_id, "cmd": [], "deepspeed_id": result.session_id} + return {"worker_id": worker_id, "cmd": cmd, "deepspeed_id": result.session_id} def kill(self, task): if task.f_deepspeed_id: @@ -91,6 +95,17 @@ def _query_status(task): return client.query_status().status return StatusSet.NEW + @staticmethod + def _download_job(task): + if task.f_deepspeed_id: + from eggroll.deepspeed.submit import client + client = client.DeepspeedJob(task.f_deepspeed_id) + dir_name = os.path.join(log_utils.get_logger_base_dir(), task.f_job_id, task.f_role, task.f_party_id, task.f_component_name) + os.makedirs(dir_name, exist_ok=True) + path = lambda rank: f"{dir_name}/{rank}.zip" + client.download_job_to(rank_to_path=path) + return dir_name + def query_task_status(self, task): status = self._query_status(task) if status in EndStatus.status_list(): @@ -109,3 +124,25 @@ def is_alive(self, task: Task): return True else: raise RuntimeError(f"task run status: {status}") + + def download(self, task): + dir_name = 
self._download_job(task)
+        if dir_name:
+            for file in os.listdir(dir_name):
+                if file.endswith(".zip"):
+                    rank_dir = os.path.join(dir_name, file.split(".zip")[0])
+                    os.makedirs(rank_dir, exist_ok=True)
+                    self.unzip(os.path.join(dir_name, file), extra_dir=rank_dir)
+                    os.remove(os.path.join(dir_name, file))
+
+    @staticmethod
+    def unzip(zip_path, extra_dir):
+        import zipfile
+        zfile = zipfile.ZipFile(zip_path, "r")
+        for name in zfile.namelist():
+            dir_name = os.path.dirname(zip_path)
+            file_path = os.path.join(dir_name, extra_dir, name)
+            os.makedirs(os.path.dirname(file_path), exist_ok=True)
+            data = zfile.read(name)
+            with open(file_path, "w+b") as file:
+                file.write(data)
diff --git a/python/fate_flow/detection/detector.py b/python/fate_flow/detection/detector.py
index f3cb8c8da..897275105 100644
--- a/python/fate_flow/detection/detector.py
+++ b/python/fate_flow/detection/detector.py
@@ -108,9 +108,10 @@ def detect_deepspeed_task_status(task):
         try:
             deepspeed_engine = build_engine(task.f_engine_conf.get("computing_engine"), task.f_is_deepspeed)
             # query or update
-            if not deepspeed_engine.is_alive():
+            if not deepspeed_engine.is_alive(task):
                 # update task status to end status
                 status = deepspeed_engine.query_task_status(task)
+                detect_logger(task.f_job_id).info(f"task status: {status}")
                 task_info = {
                     "job_id": task.f_job_id,
                     "task_id": task.f_task_id,
                     "task_version": task.f_task_version,
                     "role": task.f_role,
                     "party_id": task.f_party_id,
                     "party_status": status
                 }
-                TaskController.update_task(task_info)
+                TaskController.update_task_status(task_info)
+                deepspeed_engine.download_log(task)
         except Exception as e:
             detect_logger(task.f_job_id).exception(e)
diff --git a/python/fate_flow/settings.py b/python/fate_flow/settings.py
index 11d0343e2..b2077859c 100644
--- a/python/fate_flow/settings.py
+++ b/python/fate_flow/settings.py
@@ -162,3 +162,5 @@
 HOOK_SERVER_NAME = get_base_config("hook_server_name")
 
 ENABLE_MODEL_STORE = get_base_config('enable_model_store', False)
+
+REMOTE_LOAD_CONF = True
diff --git a/python/fate_flow/worker/base_worker.py b/python/fate_flow/worker/base_worker.py
index 488e60364..721b645fb 100644
--- a/python/fate_flow/worker/base_worker.py
+++ b/python/fate_flow/worker/base_worker.py
@@ -19,8 +19,11 @@
 import traceback
 import logging
 
+import requests
+
 from fate_arch.common.base_utils import current_timestamp
 from fate_arch.common.file_utils import load_json_conf, dump_json_conf
+from fate_flow.settings import REMOTE_LOAD_CONF
 from fate_flow.utils.log_utils import getLogger, LoggerFactory, exception_to_trace_string
 from fate_flow.db.component_registry import ComponentRegistry
 from fate_flow.db.config_manager import ConfigManager
@@ -74,7 +77,20 @@ def __init__(self, **kwargs):
 
     @staticmethod
     def load_dict_attr(kwargs: dict, attr_name: str):
-        return load_json_conf(kwargs[attr_name]) if kwargs.get(attr_name) else {}
+        if kwargs.get(attr_name):
+            if REMOTE_LOAD_CONF and kwargs.get("is_deepspeed"):
+                url = f'http://{kwargs.get("job_server")}/v1/worker/config/load'
+                try:
+                    _r = requests.post(url, json={"config_path": kwargs[attr_name]}).json()
+                    config = _r.get("data") if _r.get("data") else {}
+                except Exception as e:
+                    LOGGER.exception(e)
+                    config = {}
+            else:
+                config = load_json_conf(kwargs[attr_name])
+        else:
+            config = {}
+        return config
 
 
 class BaseWorker:
@@ -122,7 +138,8 @@ def run(self, **kwargs):
             message = exception_to_trace_string(e)
         finally:
             if self.args and self.args.result:
-                dump_json_conf(result, self.args.result)
+                if not self.args.is_deepspeed:
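+                    # only non-deepspeed workers write the local result file; deepspeed results come back through eggroll and the detector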
dump_json_conf(result, self.args.result) end_time = current_timestamp() LOGGER.info(f"worker {self.__class__.__name__}, process role: {RuntimeConfig.PROCESS_ROLE}, pid: {self.run_pid}, elapsed: {end_time - start_time} ms") if RuntimeConfig.PROCESS_ROLE == ProcessRole.WORKER: diff --git a/python/fate_flow/worker/task_executor.py b/python/fate_flow/worker/task_executor.py index e4a461545..32022de12 100644 --- a/python/fate_flow/worker/task_executor.py +++ b/python/fate_flow/worker/task_executor.py @@ -287,9 +287,9 @@ def _run_(self): @property def is_master(self): # deepspeed rank 0 - if not os.getenv("GLOBAL_RANK"): + if not os.getenv("RANK"): return True - return int(os.getenv("GLOBAL_RANK")) == 0 + return int(os.getenv("RANK")) == 0 @classmethod def log_output_data_table_tracker(cls, job_id, input_table_list, output_table_list): From 7f22f50582c852a56ac74f05f8e54ef7db85bf1a Mon Sep 17 00:00:00 2001 From: zhihuiwan <15779896112@163.com> Date: Tue, 23 May 2023 20:25:03 +0800 Subject: [PATCH 05/15] update deepspeed engine Signed-off-by: zhihuiwan <15779896112@163.com> --- .../controller/engine_controller/deepspeed.py | 24 ++++-- python/fate_flow/deepspeed_client.py | 76 +++++++++++++++++++ python/fate_flow/settings.py | 1 + python/fate_flow/worker/base_worker.py | 3 + python/fate_flow/worker/task_executor.py | 6 +- 5 files changed, 101 insertions(+), 9 deletions(-) create mode 100644 python/fate_flow/deepspeed_client.py diff --git a/python/fate_flow/controller/engine_controller/deepspeed.py b/python/fate_flow/controller/engine_controller/deepspeed.py index caf02aff4..8b3391ace 100644 --- a/python/fate_flow/controller/engine_controller/deepspeed.py +++ b/python/fate_flow/controller/engine_controller/deepspeed.py @@ -23,6 +23,7 @@ from fate_flow.entity.run_status import BaseStatus, TaskStatus from fate_flow.entity.types import WorkerName from fate_flow.manager.worker_manager import WorkerManager +from fate_flow.settings import EXTRA_MODEL_DIR from fate_flow.utils import log_utils from fate_flow.utils.log_utils import detect_logger, schedule_logger from fate_flow.worker.task_executor import TaskExecutor @@ -60,6 +61,7 @@ def run(self, task: Task, run_parameters, run_parameters_path, config_dir, log_d session_id, _, command_arguments = WorkerManager.generate_common_cmd(task, config_dir, config, log_dir, worker_id) command_arguments.extend(["--is_deepspeed", True]) + command_arguments.extend(["--model_path", self.model_path(task)]) cmd = [str(_c) for _c in command_arguments] environment_variables = {} files = {} @@ -83,6 +85,7 @@ def run(self, task: Task, run_parameters, run_parameters_path, config_dir, log_d def kill(self, task): if task.f_deepspeed_id: + schedule_logger(task.f_job_id).info(f"start kill deepspeed task: {task.f_deepspeed_id}") from eggroll.deepspeed.submit import client client = client.DeepspeedJob(task.f_deepspeed_id) return client.kill() @@ -96,15 +99,18 @@ def _query_status(task): return StatusSet.NEW @staticmethod - def _download_job(task): + def _download_job(task, base_dir=None): if task.f_deepspeed_id: + if not base_dir: + base_dir = os.path.join(log_utils.get_logger_base_dir(), task.f_job_id, task.f_role, task.f_party_id, + task.f_component_name) from eggroll.deepspeed.submit import client client = client.DeepspeedJob(task.f_deepspeed_id) - dir_name = os.path.join(log_utils.get_logger_base_dir(), task.f_job_id, task.f_role, task.f_party_id, task.f_component_name) - os.makedirs(dir_name, exist_ok=True) - path = lambda rank: f"{dir_name}/{rank}.zip" + + 
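+        # one archive per rank is fetched: base_dir/0.zip, base_dir/1.zip, ...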
os.makedirs(base_dir, exist_ok=True) + path = lambda rank: f"{base_dir}/{rank}.zip" client.download_job_to(rank_to_path=path) - return dir_name + return base_dir def query_task_status(self, task): status = self._query_status(task) @@ -125,8 +131,8 @@ def is_alive(self, task: Task): else: raise RuntimeError(f"task run status: {status}") - def download(self, task): - dir_name = self._download_job(task) + def download(self, task, base_dir=None): + dir_name = self._download_job(task, base_dir) if dir_name: for file in os.listdir(dir_name): if file.endswith(".zip"): @@ -146,3 +152,7 @@ def unzip(zip_path, extra_dir): data = zfile.read(name) with open(file_path, "w+b") as file: file.write(data) + + @staticmethod + def model_path(task): + return os.path.join(EXTRA_MODEL_DIR, task.f_job_id, task.f_component_name) diff --git a/python/fate_flow/deepspeed_client.py b/python/fate_flow/deepspeed_client.py new file mode 100644 index 000000000..187691fcf --- /dev/null +++ b/python/fate_flow/deepspeed_client.py @@ -0,0 +1,76 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import argparse + +from fate_flow.controller.engine_adapt import build_engine +from fate_flow.operation.job_saver import JobSaver + + +FUNC = ["query_status", "download_log"] + + +def call_fun(func, args): + job_id = args.job_id + role = args.role + party_id = args.party_id + component_name = args.component_name + output_path = args.output_path + engine, task = load_engine(job_id, role, party_id, component_name) + if func == "query_status": + query_status(engine, task) + elif func == "download_log": + download_log(engine, task, output_path) + + +def load_engine(job_id, role, party_id, component_name): + tasks = JobSaver.query_task(job_id=job_id, role=role, party_id=party_id, component_name=component_name) + if tasks: + task = tasks[0] + if task.f_is_deepspeed: + deepspeed_engine = build_engine(task.f_engine_conf.get("computing_engine"), task.f_is_deepspeed) + return deepspeed_engine, task + else: + raise Exception(f"Not is a deepspeed task: job_id[{job_id}], role[{role}], party_id[{party_id}], component_name[{component_name}]") + else: + raise Exception(f"no found task: job_id[{job_id}], role[{role}], party_id[{party_id}], component_name[{component_name}]") + + +def query_status(engine, task): + status = engine._query_status(task) + print(status) + + +def download_log(engine, task, output_path): + engine.download(task, base_dir=output_path) + print(output_path) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('-f', '--function', type=str, + choices=FUNC, + required=True, + help="function to call") + parser.add_argument('-j', '--job_id', required=True, type=str, help="job id") + parser.add_argument('-r', '--role', required=True, type=str, help="role") + parser.add_argument('-p', '--party_id', required=True, type=str, help="party id") + parser.add_argument('-cpn', '--component_name', required=True, type=str, help="component name") + 
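+    # example invocations (hypothetical job id and component name), run from the python/fate_flow directory:
+    #   python deepspeed_client.py -f query_status -j 202305250001 -r guest -p 9999 -cpn nn_0
+    #   python deepspeed_client.py -f download_log -j 202305250001 -r guest -p 9999 -cpn nn_0 -o /tmp/nn_0_logs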
parser.add_argument('-o', '--output_path', required=False, type=str, help="output_path")
+    args = parser.parse_args()
+    config_data = {}
+    config_data.update(dict((k, v) for k, v in vars(args).items() if v is not None))
+
+    call_fun(args.function, args)
diff --git a/python/fate_flow/settings.py b/python/fate_flow/settings.py
index b2077859c..658d9d00d 100644
--- a/python/fate_flow/settings.py
+++ b/python/fate_flow/settings.py
@@ -164,3 +164,4 @@
 ENABLE_MODEL_STORE = get_base_config('enable_model_store', False)
 
 REMOTE_LOAD_CONF = True
+EXTRA_MODEL_DIR = get_fate_flow_directory('model')
diff --git a/python/fate_flow/worker/base_worker.py b/python/fate_flow/worker/base_worker.py
index 721b645fb..55a6f511f 100644
--- a/python/fate_flow/worker/base_worker.py
+++ b/python/fate_flow/worker/base_worker.py
@@ -74,6 +74,7 @@ def __init__(self, **kwargs):
         self.dependence_type = kwargs.get("dependence_type")
 
         self.is_deepspeed = kwargs.get("is_deepspeed")
+        self.model_path = kwargs.get("model_path")
 
     @staticmethod
     def load_dict_attr(kwargs: dict, attr_name: str):
@@ -107,6 +108,8 @@ def run(self, **kwargs):
         self.run_pid = os.getpid()
         try:
             self.args = self.get_args(**kwargs)
+            if self.args.model_path:
+                os.environ["MODEL_PATH"] = self.args.model_path
             RuntimeConfig.init_env()
             RuntimeConfig.set_process_role(ProcessRole(os.getenv("PROCESS_ROLE")))
             if RuntimeConfig.PROCESS_ROLE == ProcessRole.WORKER:
diff --git a/python/fate_flow/worker/task_executor.py b/python/fate_flow/worker/task_executor.py
index 32022de12..2b4a9f9af 100644
--- a/python/fate_flow/worker/task_executor.py
+++ b/python/fate_flow/worker/task_executor.py
@@ -268,8 +268,10 @@ def _run_(self):
                 LOGGER.info("start destroy sessions")
                 sess.destroy_all_sessions()
                 LOGGER.info("destroy all sessions success")
-        except Exception as e:
-            LOGGER.exception(e)
+        except Exception as _e:
+            LOGGER.exception(_e)
+            if self.args.is_deepspeed:
+                raise RuntimeError(_e)
         finally:
             try:
                 self.report_info["end_time"] = current_timestamp()

From f94c14b66d6a4f4e2a59b2af312aee591f57650b Mon Sep 17 00:00:00 2001
From: zhihuiwan <15779896112@163.com>
Date: Thu, 25 May 2023 10:23:28 +0800
Subject: [PATCH 06/15] deepspeed output log and model

Signed-off-by: zhihuiwan <15779896112@163.com>
---
 .../controller/engine_controller/deepspeed.py | 47 ++++++++++++++-----
 .../fate_flow/controller/task_controller.py   | 33 +++++++++----
 python/fate_flow/deepspeed_client.py          | 12 ++++-
 python/fate_flow/detection/detector.py        |  2 +-
 python/fate_flow/worker/base_worker.py        |  2 +
 5 files changed, 72 insertions(+), 24 deletions(-)

diff --git a/python/fate_flow/controller/engine_controller/deepspeed.py b/python/fate_flow/controller/engine_controller/deepspeed.py
index 8b3391ace..86239e0ce 100644
--- a/python/fate_flow/controller/engine_controller/deepspeed.py
+++ b/python/fate_flow/controller/engine_controller/deepspeed.py
@@ -71,7 +71,7 @@ def run(self, task: Task, run_parameters, run_parameters_path, config_dir, log_d
         task_conf = run_parameters.role_parameter("task_conf", role=task.f_role, party_id=task.f_party_id)
         world_size = task_conf.get(task.f_component_name).get("world_size", JobDefaultConfig.task_world_size)
         resource_options = {"timeout_seconds": 3000, "resource_exhausted_strategy": "waiting"}
-        schedule_logger(task.f_job_id).info(f"start submit deepspeed task")
+        schedule_logger(task.f_job_id).info(f"start submit deepspeed task, world size: {world_size}")
         schedule_logger(task.f_job_id).info(f"cmd: {cmd}")
         client = client.DeepspeedJob()
         result = client.submit(
@@ -99,17 +99,15 @@ def 
_query_status(task): return StatusSet.NEW @staticmethod - def _download_job(task, base_dir=None): + def _download_job(task, base_dir, content_type=None): + from eggroll.deepspeed.submit import client + if not content_type: + content_type = client.ContentType.ALL if task.f_deepspeed_id: - if not base_dir: - base_dir = os.path.join(log_utils.get_logger_base_dir(), task.f_job_id, task.f_role, task.f_party_id, - task.f_component_name) - from eggroll.deepspeed.submit import client client = client.DeepspeedJob(task.f_deepspeed_id) - os.makedirs(base_dir, exist_ok=True) path = lambda rank: f"{base_dir}/{rank}.zip" - client.download_job_to(rank_to_path=path) + client.download_job_to(rank_to_path=path, content_type=content_type) return base_dir def query_task_status(self, task): @@ -131,8 +129,11 @@ def is_alive(self, task: Task): else: raise RuntimeError(f"task run status: {status}") - def download(self, task, base_dir=None): - dir_name = self._download_job(task, base_dir) + def download(self, task, base_dir, content_type=None): + from eggroll.deepspeed.submit.client import ContentType + if not content_type: + content_type = ContentType.ALL + dir_name = self._download_job(task, base_dir, content_type) if dir_name: for file in os.listdir(dir_name): if file.endswith(".zip"): @@ -141,6 +142,18 @@ def download(self, task, base_dir=None): self.unzip(os.path.join(dir_name, file), extra_dir=rank_dir) os.remove(os.path.join(dir_name, file)) + def download_log(self, task, path=None): + from eggroll.deepspeed.submit.client import ContentType + if not path: + path = self.log_path(task) + self.download(task, base_dir=path, content_type=ContentType.LOGS) + + def download_model(self, task, path=None): + from eggroll.deepspeed.submit.client import ContentType + if not path: + path = self.model_path(task, download=True) + self.download(task, base_dir=path, content_type=ContentType.MODELS) + @staticmethod def unzip(zip_path, extra_dir): import zipfile @@ -154,5 +167,15 @@ def unzip(zip_path, extra_dir): file.write(data) @staticmethod - def model_path(task): - return os.path.join(EXTRA_MODEL_DIR, task.f_job_id, task.f_component_name) + def model_path(task, download=False): + _p = os.path.join(EXTRA_MODEL_DIR, task.f_job_id, task.f_component_name) + if not download: + # only rank 0 output model + _p = os.path.join(_p, "0") + return _p + + @staticmethod + def log_path(task): + return os.path.join( + log_utils.get_logger_base_dir(), task.f_job_id, task.f_role, task.f_party_id, task.f_component_name + ) diff --git a/python/fate_flow/controller/task_controller.py b/python/fate_flow/controller/task_controller.py index a91709025..3c65bf13c 100644 --- a/python/fate_flow/controller/task_controller.py +++ b/python/fate_flow/controller/task_controller.py @@ -138,6 +138,10 @@ def update_task(cls, task_info): @classmethod def update_task_status(cls, task_info): update_status = JobSaver.update_task_status(task_info=task_info) + task = JobSaver.query_task(task_id=task_info["task_id"], + task_version=task_info["task_version"], + role=task_info["role"], + party_id=task_info["party_id"])[0] if update_status and EndStatus.contains(task_info.get("status")): ResourceManager.return_task_resource(task_info=task_info) cls.clean_task(job_id=task_info["job_id"], @@ -147,19 +151,22 @@ def update_task_status(cls, task_info): party_id=task_info["party_id"], content_type=TaskCleanResourceType.TABLE, is_asynchronous=True) - cls.report_task_to_initiator(task_info=task_info) + cls.report_task_to_initiator(task_info=task_info, task=task) + 
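+            # for deepspeed tasks this pulls the eggroll logs back, plus the trained model when the task succeeded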
cls.callback_task_output(task, task_info.get("status")) return update_status @classmethod - def report_task_to_initiator(cls, task_info): - tasks = JobSaver.query_task(task_id=task_info["task_id"], - task_version=task_info["task_version"], - role=task_info["role"], - party_id=task_info["party_id"]) + def report_task_to_initiator(cls, task_info, task=None): + if not task: + tasks = JobSaver.query_task(task_id=task_info["task_id"], + task_version=task_info["task_version"], + role=task_info["role"], + party_id=task_info["party_id"]) + task = tasks[0] if task_info.get("error_report"): - tasks[0].f_error_report = task_info.get("error_report") - if tasks[0].f_federated_status_collect_type == FederatedCommunicationType.PUSH: - FederatedScheduler.report_task_to_initiator(task=tasks[0]) + task.f_error_report = task_info.get("error_report") + if task.f_federated_status_collect_type == FederatedCommunicationType.PUSH: + FederatedScheduler.report_task_to_initiator(task=task) @classmethod def collect_task(cls, job_id, component_name, task_id, task_version, role, party_id): @@ -245,3 +252,11 @@ def is_deepspeed(run_parameters, role, party_id, component_name): else: return False + @staticmethod + def callback_task_output(task, status): + if EndStatus.contains(status): + if task.f_is_deepspeed: + deepspeed_engine = build_engine(task.f_engine_conf.get("computing_engine"), task.f_is_deepspeed) + deepspeed_engine.download_log(task) + if status == TaskStatus.SUCCESS: + deepspeed_engine.download_model(task) diff --git a/python/fate_flow/deepspeed_client.py b/python/fate_flow/deepspeed_client.py index 187691fcf..d5fe24caa 100644 --- a/python/fate_flow/deepspeed_client.py +++ b/python/fate_flow/deepspeed_client.py @@ -19,7 +19,7 @@ from fate_flow.operation.job_saver import JobSaver -FUNC = ["query_status", "download_log"] +FUNC = ["query_status", "download_log", "download_model"] def call_fun(func, args): @@ -34,6 +34,9 @@ def call_fun(func, args): elif func == "download_log": download_log(engine, task, output_path) + elif func == "download_model": + download_model(engine, task, output_path) + def load_engine(job_id, role, party_id, component_name): tasks = JobSaver.query_task(job_id=job_id, role=role, party_id=party_id, component_name=component_name) @@ -54,7 +57,12 @@ def query_status(engine, task): def download_log(engine, task, output_path): - engine.download(task, base_dir=output_path) + engine.download_log(task, path=output_path) + print(output_path) + + +def download_model(engine, task, output_path): + engine.download_model(task, path=output_path) print(output_path) diff --git a/python/fate_flow/detection/detector.py b/python/fate_flow/detection/detector.py index 897275105..a65ce3702 100644 --- a/python/fate_flow/detection/detector.py +++ b/python/fate_flow/detection/detector.py @@ -121,7 +121,7 @@ def detect_deepspeed_task_status(task): "party_status": status } TaskController.update_task_status(task_info) - deepspeed_engine.download(task) + deepspeed_engine.download_log(task) except Exception as e: detect_logger(task.f_job_id).exception(e) diff --git a/python/fate_flow/worker/base_worker.py b/python/fate_flow/worker/base_worker.py index 55a6f511f..f1550a150 100644 --- a/python/fate_flow/worker/base_worker.py +++ b/python/fate_flow/worker/base_worker.py @@ -147,6 +147,8 @@ def run(self, **kwargs): LOGGER.info(f"worker {self.__class__.__name__}, process role: {RuntimeConfig.PROCESS_ROLE}, pid: {self.run_pid}, elapsed: {end_time - start_time} ms") if RuntimeConfig.PROCESS_ROLE == ProcessRole.WORKER: 
sys.exit(code) + if self.args and self.args.is_deepspeed: + sys.exit(code) else: return code, message, result From cdb78baf7ed4262a1cf3af8e02cdb87bcf503339 Mon Sep 17 00:00:00 2001 From: zhihuiwan <15779896112@163.com> Date: Fri, 26 May 2023 14:14:47 +0800 Subject: [PATCH 07/15] deepspeed output model Signed-off-by: zhihuiwan <15779896112@163.com> --- .../fate_flow/controller/task_controller.py | 23 ++++++++++- python/fate_flow/deepspeed_client.py | 2 +- python/fate_flow/worker/base_worker.py | 1 + python/fate_flow/worker/download_model.py | 39 +++++++++++++++++++ 4 files changed, 62 insertions(+), 3 deletions(-) create mode 100644 python/fate_flow/worker/download_model.py diff --git a/python/fate_flow/controller/task_controller.py b/python/fate_flow/controller/task_controller.py index 3c65bf13c..475dd5da1 100644 --- a/python/fate_flow/controller/task_controller.py +++ b/python/fate_flow/controller/task_controller.py @@ -14,6 +14,8 @@ # limitations under the License. # import os +import sys + from fate_arch.common import FederatedCommunicationType from fate_flow.utils.job_utils import asynchronous_function from fate_flow.utils.log_utils import schedule_logger @@ -21,7 +23,7 @@ from fate_flow.db.db_models import Task from fate_flow.scheduler.federated_scheduler import FederatedScheduler from fate_flow.entity.run_status import TaskStatus, EndStatus -from fate_flow.utils import job_utils +from fate_flow.utils import job_utils, process_utils from fate_flow.operation.job_saver import JobSaver from fate_arch.common.base_utils import json_dumps, current_timestamp from fate_arch.common import base_utils @@ -30,6 +32,7 @@ from fate_flow.operation.job_tracker import Tracker from fate_flow.manager.worker_manager import WorkerManager from fate_flow.entity.types import TaskCleanResourceType, TaskLauncher +from fate_flow.worker.download_model import DownloadModel class TaskController(object): @@ -259,4 +262,20 @@ def callback_task_output(task, status): deepspeed_engine = build_engine(task.f_engine_conf.get("computing_engine"), task.f_is_deepspeed) deepspeed_engine.download_log(task) if status == TaskStatus.SUCCESS: - deepspeed_engine.download_model(task) + # run subprocess to download model + conf_dir = job_utils.get_job_directory(job_id=task.f_job_id) + os.makedirs(conf_dir, exist_ok=True) + process_cmd = [ + sys.executable or 'python3', + sys.modules[DownloadModel.__module__].__file__, + '--job_id', task.f_job_id, + '--role', task.f_role, + '--party_id', task.f_party_id, + '--task_id', task.f_task_id, + '--task_version', task.f_task_version, + '--computing_engine', task.f_engine_conf.get("computing_engine") + ] + process_name = "model_download" + log_dir = job_utils.get_job_log_directory(job_id=task.f_job_id) + process_utils.run_subprocess(job_id=task.f_job_id, config_dir=conf_dir, process_cmd=process_cmd, + log_dir=log_dir, process_name=process_name) diff --git a/python/fate_flow/deepspeed_client.py b/python/fate_flow/deepspeed_client.py index d5fe24caa..9c7f7daa0 100644 --- a/python/fate_flow/deepspeed_client.py +++ b/python/fate_flow/deepspeed_client.py @@ -39,7 +39,7 @@ def call_fun(func, args): def load_engine(job_id, role, party_id, component_name): - tasks = JobSaver.query_task(job_id=job_id, role=role, party_id=party_id, component_name=component_name) + tasks = JobSaver.query_task(job_id=job_id, role=role, party_id=party_id, component_name=component_name, run_on_this_party=True) if tasks: task = tasks[0] if task.f_is_deepspeed: diff --git a/python/fate_flow/worker/base_worker.py 
b/python/fate_flow/worker/base_worker.py index f1550a150..aa7aee279 100644 --- a/python/fate_flow/worker/base_worker.py +++ b/python/fate_flow/worker/base_worker.py @@ -75,6 +75,7 @@ def __init__(self, **kwargs): self.is_deepspeed = kwargs.get("is_deepspeed") self.model_path = kwargs.get("model_path") + self.computing_engine = kwargs.get("computing_engine") @staticmethod def load_dict_attr(kwargs: dict, attr_name: str): diff --git a/python/fate_flow/worker/download_model.py b/python/fate_flow/worker/download_model.py new file mode 100644 index 000000000..9c0eb5bd6 --- /dev/null +++ b/python/fate_flow/worker/download_model.py @@ -0,0 +1,39 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from fate_flow.controller.engine_adapt import build_engine +from fate_flow.operation.job_saver import JobSaver +from fate_flow.utils.log_utils import schedule_logger +from fate_flow.worker.base_worker import BaseWorker + + +class DownloadModel(BaseWorker): + def _run(self): + deepspeed_engine = build_engine(self.args.computing_engine, True) + tasks = JobSaver.query_task( + task_id=self.args.task_id, + task_version=self.args.task_version, + job_id=self.args.job_id, + role=self.args.role, + party_id=self.args.party_id + ) + task = tasks[0] + schedule_logger(self.args.job_id).info("start download model") + deepspeed_engine.download_model(task) + schedule_logger(self.args.job_id).info("download model success") + + +if __name__ == '__main__': + DownloadModel().run() From 0b6b49152e9fe27460f1f6da098a709578b42001 Mon Sep 17 00:00:00 2001 From: zhihuiwan <15779896112@163.com> Date: Fri, 26 May 2023 21:16:36 +0800 Subject: [PATCH 08/15] submit deepspeed task Signed-off-by: zhihuiwan <15779896112@163.com> --- conf/job_default_config.yaml | 3 +- .../controller/engine_controller/deepspeed.py | 64 ++++++++++++++----- python/fate_flow/db/job_default_config.py | 1 + python/fate_flow/utils/deepspeed_utils.py | 45 +++++++++++++ 4 files changed, 97 insertions(+), 16 deletions(-) create mode 100644 python/fate_flow/utils/deepspeed_utils.py diff --git a/conf/job_default_config.yaml b/conf/job_default_config.yaml index ae2491d54..ebfbaef7d 100644 --- a/conf/job_default_config.yaml +++ b/conf/job_default_config.yaml @@ -28,4 +28,5 @@ upload_block_max_bytes: 104857600 # bytes #component output output_data_summary_count_limit: 100 -task_world_size: 2 \ No newline at end of file +task_world_size: 2 +resource_waiting_timeout: 21600000 # ms \ No newline at end of file diff --git a/python/fate_flow/controller/engine_controller/deepspeed.py b/python/fate_flow/controller/engine_controller/deepspeed.py index 86239e0ce..31c004441 100644 --- a/python/fate_flow/controller/engine_controller/deepspeed.py +++ b/python/fate_flow/controller/engine_controller/deepspeed.py @@ -13,10 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +import datetime import os import sys from abc import ABC +from fate_arch.common.base_utils import json_dumps from fate_flow.controller.engine_controller.engine import EngineABC from fate_flow.db.db_models import Task from fate_flow.db.job_default_config import JobDefaultConfig @@ -24,7 +26,8 @@ from fate_flow.entity.types import WorkerName from fate_flow.manager.worker_manager import WorkerManager from fate_flow.settings import EXTRA_MODEL_DIR -from fate_flow.utils import log_utils +from fate_flow.utils import log_utils, job_utils, process_utils +from fate_flow.utils.deepspeed_utils import Submit from fate_flow.utils.log_utils import detect_logger, schedule_logger from fate_flow.worker.task_executor import TaskExecutor @@ -48,8 +51,11 @@ class EndStatus(BaseStatus): class EggrollDeepspeedEngine(EngineABC, ABC): + @staticmethod + def generate_session_id(): + return f"deepspeed_session_{datetime.datetime.now().strftime('%Y%m%d-%H%M%S-%f')}" + def run(self, task: Task, run_parameters, run_parameters_path, config_dir, log_dir, cwd_dir, **kwargs): - from eggroll.deepspeed.submit import client worker_id, config_dir, log_dir = WorkerManager.get_process_dirs( worker_name=WorkerName.TASK_EXECUTOR, job_id=task.f_job_id, @@ -70,18 +76,45 @@ def run(self, task: Task, run_parameters, run_parameters_path, config_dir, log_d } task_conf = run_parameters.role_parameter("task_conf", role=task.f_role, party_id=task.f_party_id) world_size = task_conf.get(task.f_component_name).get("world_size", JobDefaultConfig.task_world_size) - resource_options = {"timeout_seconds": 3000, "resource_exhausted_strategy": "waiting"} - schedule_logger(task.f_job_id).info(f"start submit deepspeed task, world size: {world_size}") - schedule_logger(task.f_job_id).info(f"cmd: {cmd}") - client = client.DeepspeedJob() - result = client.submit( - world_size=world_size, - command_arguments=cmd, - environment_variables=environment_variables, - files=files, - resource_options=resource_options, - options=options) - return {"worker_id": worker_id, "cmd": cmd, "deepspeed_id": result.session_id} + timeout = task_conf.get(task.f_component_name).get("timeout", JobDefaultConfig.resource_waiting_timeout) + resource_options = {"timeout_seconds": timeout, "resource_exhausted_strategy": "waiting"} + submit_conf = { + "world_size": world_size, + "command_arguments": cmd, + "environment_variables": environment_variables, + "files": files, + "resource_options": resource_options, + "options": options + } + config_dir = job_utils.get_task_directory( + task.f_job_id, task.f_role, task.f_party_id, + task.f_component_name, task.f_task_id, str(task.f_task_version) + ) + os.makedirs(config_dir, exist_ok=True) + config_path = os.path.join(config_dir, 'deepspeed_submit.json') + with open(config_path, 'w') as fw: + fw.write(json_dumps(submit_conf)) + session_id = self.generate_session_id() + self.submit(task, config_path, session_id) + return {"worker_id": worker_id, "cmd": cmd, "deepspeed_id": session_id} + + @staticmethod + def submit(task, config_path, session_id): + conf_dir = job_utils.get_job_directory(job_id=task.f_job_id) + os.makedirs(conf_dir, exist_ok=True) + process_cmd = [ + sys.executable or 'python3', + sys.modules[Submit.__module__].__file__, + '--job_id', task.f_job_id, + '--config', config_path, + '--session_id', session_id + ] + process_name = "deepspeed_submit" + log_dir = job_utils.get_job_log_directory(job_id=task.f_job_id) + + p = process_utils.run_subprocess(job_id=task.f_job_id, config_dir=conf_dir, process_cmd=process_cmd, + 
log_dir=log_dir, process_name=process_name) + schedule_logger(task.f_job_id).info(f"run subprocess {p.pid}") def kill(self, task): if task.f_deepspeed_id: @@ -95,7 +128,8 @@ def _query_status(task): if task.f_deepspeed_id: from eggroll.deepspeed.submit import client client = client.DeepspeedJob(task.f_deepspeed_id) - return client.query_status().status + _s = client.query_status().status + return _s if _s else StatusSet.NEW return StatusSet.NEW @staticmethod diff --git a/python/fate_flow/db/job_default_config.py b/python/fate_flow/db/job_default_config.py index 7e781f4b5..665a59aba 100644 --- a/python/fate_flow/db/job_default_config.py +++ b/python/fate_flow/db/job_default_config.py @@ -49,6 +49,7 @@ class JobDefaultConfig(ReloadConfigBase): output_data_summary_count_limit = None task_world_size = None + resource_waiting_timeout = None @classmethod def load(cls): diff --git a/python/fate_flow/utils/deepspeed_utils.py b/python/fate_flow/utils/deepspeed_utils.py new file mode 100644 index 000000000..01b2ba899 --- /dev/null +++ b/python/fate_flow/utils/deepspeed_utils.py @@ -0,0 +1,45 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import os +import sys + +from fate_flow.utils.log_utils import schedule_logger +from fate_flow.worker.base_worker import BaseWorker + + +class Submit(BaseWorker): + def _run(self): + try: + from eggroll.deepspeed.submit import client + client = client.DeepspeedJob(session_id=self.args.session_id) + config = self.args.config + schedule_logger(self.args.job_id).info(f"start submit deepspeed task {self.args.session_id}") + schedule_logger(self.args.job_id).info(f"submit config {config}") + client.submit( + world_size=config.get("world_size"), + command_arguments=config.get("command_arguments"), + environment_variables=config.get("environment_variables"), + files=config.get("files"), + resource_options=config.get("resource_options"), + options=config.get("options") + ) + schedule_logger(self.args.job_id).info(f"submit deepspeed task success") + except Exception as e: + schedule_logger(self.args.job_id).exception(e) + + +if __name__ == "__main__": + Submit().run() From 66052413898c3402e62e88839b3d627e41f25ea9 Mon Sep 17 00:00:00 2001 From: zhihuiwan <15779896112@163.com> Date: Sat, 27 May 2023 16:48:55 +0800 Subject: [PATCH 09/15] fix parameters merge bug Signed-off-by: zhihuiwan <15779896112@163.com> --- python/fate_flow/utils/runtime_conf_parse_util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/fate_flow/utils/runtime_conf_parse_util.py b/python/fate_flow/utils/runtime_conf_parse_util.py index 320d9a30f..fb4bf2b1b 100644 --- a/python/fate_flow/utils/runtime_conf_parse_util.py +++ b/python/fate_flow/utils/runtime_conf_parse_util.py @@ -542,7 +542,7 @@ def get_job_parameters(submit_dict): component_parameters = submit_dict.get("component_parameters", {}) common_job_parameters = job_parameters.get("common", {}) role_job_parameters = component_parameters.get("role", {}) - for role 
in component_parameters["role"]: + for role in submit_dict["role"]: party_id_list = submit_dict["role"][role] if not role_job_parameters: ret[role] = {party_id: copy.deepcopy(common_job_parameters) for party_id in party_id_list} From f1343e4126235386491c4a380510955f1ed89eaf Mon Sep 17 00:00:00 2001 From: zhihuiwan <15779896112@163.com> Date: Tue, 30 May 2023 11:03:17 +0800 Subject: [PATCH 10/15] deepspeed worker Signed-off-by: zhihuiwan <15779896112@163.com> --- .../controller/engine_controller/deepspeed.py | 18 ++++++---- .../fate_flow/controller/task_controller.py | 36 +++++++++---------- python/fate_flow/manager/resource_manager.py | 1 + python/fate_flow/worker/base_worker.py | 13 +++++-- 4 files changed, 41 insertions(+), 27 deletions(-) diff --git a/python/fate_flow/controller/engine_controller/deepspeed.py b/python/fate_flow/controller/engine_controller/deepspeed.py index 31c004441..7cec1ef5e 100644 --- a/python/fate_flow/controller/engine_controller/deepspeed.py +++ b/python/fate_flow/controller/engine_controller/deepspeed.py @@ -22,6 +22,7 @@ from fate_flow.controller.engine_controller.engine import EngineABC from fate_flow.db.db_models import Task from fate_flow.db.job_default_config import JobDefaultConfig +from fate_flow.db.runtime_config import RuntimeConfig from fate_flow.entity.run_status import BaseStatus, TaskStatus from fate_flow.entity.types import WorkerName from fate_flow.manager.worker_manager import WorkerManager @@ -95,8 +96,10 @@ def run(self, task: Task, run_parameters, run_parameters_path, config_dir, log_d with open(config_path, 'w') as fw: fw.write(json_dumps(submit_conf)) session_id = self.generate_session_id() - self.submit(task, config_path, session_id) - return {"worker_id": worker_id, "cmd": cmd, "deepspeed_id": session_id} + process_cmd, pid = self.submit(task, config_path, session_id) + WorkerManager.save_worker_info(task=task, worker_name=WorkerName.TASK_EXECUTOR, worker_id=worker_id, + run_ip=RuntimeConfig.JOB_SERVER_HOST, run_pid=pid, cmd=process_cmd) + return {"worker_id": worker_id, "cmd": cmd, "deepspeed_id": session_id, "run_pid": pid} @staticmethod def submit(task, config_path, session_id): @@ -115,6 +118,7 @@ def submit(task, config_path, session_id): p = process_utils.run_subprocess(job_id=task.f_job_id, config_dir=conf_dir, process_cmd=process_cmd, log_dir=log_dir, process_name=process_name) schedule_logger(task.f_job_id).info(f"run subprocess {p.pid}") + return process_cmd, p.pid def kill(self, task): if task.f_deepspeed_id: @@ -133,7 +137,7 @@ def _query_status(task): return StatusSet.NEW @staticmethod - def _download_job(task, base_dir, content_type=None): + def _download_job(task, base_dir, content_type=None, ranks: list = None): from eggroll.deepspeed.submit import client if not content_type: content_type = client.ContentType.ALL @@ -141,7 +145,7 @@ def _download_job(task, base_dir, content_type=None): client = client.DeepspeedJob(task.f_deepspeed_id) os.makedirs(base_dir, exist_ok=True) path = lambda rank: f"{base_dir}/{rank}.zip" - client.download_job_to(rank_to_path=path, content_type=content_type) + client.download_job_to(rank_to_path=path, content_type=content_type, ranks=ranks) return base_dir def query_task_status(self, task): @@ -163,11 +167,11 @@ def is_alive(self, task: Task): else: raise RuntimeError(f"task run status: {status}") - def download(self, task, base_dir, content_type=None): + def download(self, task, base_dir, content_type=None, ranks=None): from eggroll.deepspeed.submit.client import ContentType if not content_type: 
content_type = ContentType.ALL - dir_name = self._download_job(task, base_dir, content_type) + dir_name = self._download_job(task, base_dir, content_type, ranks) if dir_name: for file in os.listdir(dir_name): if file.endswith(".zip"): @@ -186,7 +190,7 @@ def download_model(self, task, path=None): from eggroll.deepspeed.submit.client import ContentType if not path: path = self.model_path(task, download=True) - self.download(task, base_dir=path, content_type=ContentType.MODELS) + self.download(task, base_dir=path, content_type=ContentType.MODELS, ranks=[0]) @staticmethod def unzip(zip_path, extra_dir): diff --git a/python/fate_flow/controller/task_controller.py b/python/fate_flow/controller/task_controller.py index 475dd5da1..124335a10 100644 --- a/python/fate_flow/controller/task_controller.py +++ b/python/fate_flow/controller/task_controller.py @@ -261,21 +261,21 @@ def callback_task_output(task, status): if task.f_is_deepspeed: deepspeed_engine = build_engine(task.f_engine_conf.get("computing_engine"), task.f_is_deepspeed) deepspeed_engine.download_log(task) - if status == TaskStatus.SUCCESS: - # run subprocess to download model - conf_dir = job_utils.get_job_directory(job_id=task.f_job_id) - os.makedirs(conf_dir, exist_ok=True) - process_cmd = [ - sys.executable or 'python3', - sys.modules[DownloadModel.__module__].__file__, - '--job_id', task.f_job_id, - '--role', task.f_role, - '--party_id', task.f_party_id, - '--task_id', task.f_task_id, - '--task_version', task.f_task_version, - '--computing_engine', task.f_engine_conf.get("computing_engine") - ] - process_name = "model_download" - log_dir = job_utils.get_job_log_directory(job_id=task.f_job_id) - process_utils.run_subprocess(job_id=task.f_job_id, config_dir=conf_dir, process_cmd=process_cmd, - log_dir=log_dir, process_name=process_name) + + # run subprocess to download model + conf_dir = job_utils.get_job_directory(job_id=task.f_job_id) + os.makedirs(conf_dir, exist_ok=True) + process_cmd = [ + sys.executable or 'python3', + sys.modules[DownloadModel.__module__].__file__, + '--job_id', task.f_job_id, + '--role', task.f_role, + '--party_id', task.f_party_id, + '--task_id', task.f_task_id, + '--task_version', task.f_task_version, + '--computing_engine', task.f_engine_conf.get("computing_engine") + ] + process_name = "model_download" + log_dir = job_utils.get_job_log_directory(job_id=task.f_job_id) + process_utils.run_subprocess(job_id=task.f_job_id, config_dir=conf_dir, process_cmd=process_cmd, + log_dir=log_dir, process_name=process_name) diff --git a/python/fate_flow/manager/resource_manager.py b/python/fate_flow/manager/resource_manager.py index 8ec261ed2..f6a5e65fe 100644 --- a/python/fate_flow/manager/resource_manager.py +++ b/python/fate_flow/manager/resource_manager.py @@ -288,6 +288,7 @@ def return_task_resource(cls, task_info): return ResourceManager.resource_for_task(task_info=task_info, operation_type=ResourceOperation.RETURN) @classmethod + @DB.connection_context() def resource_for_task(cls, task_info, operation_type): cores_per_task, memory_per_task = cls.calculate_task_resource(task_info=task_info) schedule_logger(task_info["job_id"]).info(f"cores_per_task:{cores_per_task}, memory_per_task:{memory_per_task}") diff --git a/python/fate_flow/worker/base_worker.py b/python/fate_flow/worker/base_worker.py index aa7aee279..9949f3729 100644 --- a/python/fate_flow/worker/base_worker.py +++ b/python/fate_flow/worker/base_worker.py @@ -112,11 +112,20 @@ def run(self, **kwargs): if self.args.model_path: os.environ["MODEL_PATH"] = 
self.args.model_path RuntimeConfig.init_env() - RuntimeConfig.set_process_role(ProcessRole(os.getenv("PROCESS_ROLE"))) + role = ProcessRole(os.getenv("PROCESS_ROLE")) + append_to_parent_log = True + if self.args.is_deepspeed: + role = ProcessRole(ProcessRole.WORKER.value) + append_to_parent_log = False + RuntimeConfig.set_process_role(role) if RuntimeConfig.PROCESS_ROLE == ProcessRole.WORKER: LoggerFactory.LEVEL = logging.getLevelName(os.getenv("FATE_LOG_LEVEL", "INFO")) + if os.getenv("EGGROLL_CONTAINER_LOGS_DIR"): + # eggroll deepspeed + self.args.parent_log_dir = os.path.dirname(os.getenv("EGGROLL_CONTAINER_LOGS_DIR")) + self.args.log_dir = os.getenv("EGGROLL_CONTAINER_LOGS_DIR") LoggerFactory.set_directory(directory=self.args.log_dir, parent_log_dir=self.args.parent_log_dir, - append_to_parent_log=True, force=True) + append_to_parent_log=append_to_parent_log, force=True) LOGGER.info(f"enter {self.__class__.__name__} worker in subprocess, pid: {self.run_pid}") else: LOGGER.info(f"enter {self.__class__.__name__} worker in driver process, pid: {self.run_pid}") From 50c672f9f08105543cbbc6e898721b7ee9fe97cb Mon Sep 17 00:00:00 2001 From: zhihuiwan <15779896112@163.com> Date: Wed, 31 May 2023 11:13:32 +0800 Subject: [PATCH 11/15] fix bug: hdfs load failed without classpath Signed-off-by: zhihuiwan <15779896112@163.com> --- conf/job_default_config.yaml | 2 ++ python/fate_flow/db/job_default_config.py | 1 + python/fate_flow/utils/process_utils.py | 6 +++++- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/conf/job_default_config.yaml b/conf/job_default_config.yaml index ebfbaef7d..20650d647 100644 --- a/conf/job_default_config.yaml +++ b/conf/job_default_config.yaml @@ -22,6 +22,8 @@ federated_status_collect_type: PUSH detect_connect_max_retry_count: 3 detect_connect_long_retry_count: 2 +task_process_classpath: true + # upload upload_block_max_bytes: 104857600 # bytes diff --git a/python/fate_flow/db/job_default_config.py b/python/fate_flow/db/job_default_config.py index 665a59aba..6a98a9b5a 100644 --- a/python/fate_flow/db/job_default_config.py +++ b/python/fate_flow/db/job_default_config.py @@ -50,6 +50,7 @@ class JobDefaultConfig(ReloadConfigBase): task_world_size = None resource_waiting_timeout = None + task_process_classpath = None @classmethod def load(cls): diff --git a/python/fate_flow/utils/process_utils.py b/python/fate_flow/utils/process_utils.py index c05b8ed25..6d6d0b546 100644 --- a/python/fate_flow/utils/process_utils.py +++ b/python/fate_flow/utils/process_utils.py @@ -17,6 +17,8 @@ import os import subprocess import psutil + +from fate_flow.db.job_default_config import JobDefaultConfig from fate_flow.utils.log_utils import schedule_logger from fate_flow.db.db_models import Task from fate_flow.entity.types import KillProcessRetCode, ProcessRole @@ -52,7 +54,9 @@ def run_subprocess(job_id, config_dir, process_cmd, added_env: dict = None, log_ if name.endswith("PATH") and subprocess_env.get(name) is not None: value += ':' + subprocess_env[name] subprocess_env[name] = value - subprocess_env.pop("CLASSPATH", None) + + if not JobDefaultConfig.task_process_classpath: + subprocess_env.pop("CLASSPATH", None) p = subprocess.Popen(process_cmd, stdout=std, From af2a13ba92354e008a35402555e7b4918ce45046 Mon Sep 17 00:00:00 2001 From: zhihuiwan <15779896112@163.com> Date: Wed, 31 May 2023 14:44:06 +0800 Subject: [PATCH 12/15] update job config Signed-off-by: zhihuiwan <15779896112@163.com> --- conf/job_default_config.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) 
diff --git a/conf/job_default_config.yaml b/conf/job_default_config.yaml index 20650d647..c557893db 100644 --- a/conf/job_default_config.yaml +++ b/conf/job_default_config.yaml @@ -30,5 +30,6 @@ upload_block_max_bytes: 104857600 # bytes #component output output_data_summary_count_limit: 100 +# gpu task_world_size: 2 -resource_waiting_timeout: 21600000 # ms \ No newline at end of file +resource_waiting_timeout: 21600 # s \ No newline at end of file From b46b16730bf7d3326ffc043e82d5c0dca05efd39 Mon Sep 17 00:00:00 2001 From: zhihuiwan <15779896112@163.com> Date: Wed, 31 May 2023 16:16:08 +0800 Subject: [PATCH 13/15] fix submit deepspeed job failed Signed-off-by: zhihuiwan <15779896112@163.com> --- .../controller/engine_controller/deepspeed.py | 6 ++++++ python/fate_flow/utils/deepspeed_utils.py | 18 +++++++++++++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/python/fate_flow/controller/engine_controller/deepspeed.py b/python/fate_flow/controller/engine_controller/deepspeed.py index 7cec1ef5e..cb747dd41 100644 --- a/python/fate_flow/controller/engine_controller/deepspeed.py +++ b/python/fate_flow/controller/engine_controller/deepspeed.py @@ -109,7 +109,13 @@ def submit(task, config_path, session_id): sys.executable or 'python3', sys.modules[Submit.__module__].__file__, '--job_id', task.f_job_id, + '--role', task.f_role, + '--party_id', task.f_party_id, + '--task_id', task.f_task_id, + '--task_version', task.f_task_version, + '--component_name', task.f_component_name, '--config', config_path, + '--job_server', f"{RuntimeConfig.JOB_SERVER_HOST}:{RuntimeConfig.HTTP_PORT}", '--session_id', session_id ] process_name = "deepspeed_submit" diff --git a/python/fate_flow/utils/deepspeed_utils.py b/python/fate_flow/utils/deepspeed_utils.py index 01b2ba899..093ad04fd 100644 --- a/python/fate_flow/utils/deepspeed_utils.py +++ b/python/fate_flow/utils/deepspeed_utils.py @@ -13,9 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -import os -import sys - +from fate_flow.db.runtime_config import RuntimeConfig +from fate_flow.entity.run_status import TaskStatus +from fate_flow.scheduling_apps.client import ControllerClient from fate_flow.utils.log_utils import schedule_logger from fate_flow.worker.base_worker import BaseWorker @@ -38,6 +38,18 @@ def _run(self): ) schedule_logger(self.args.job_id).info(f"submit deepspeed task success") except Exception as e: + task_info = { + "job_id": self.args.job_id, + "role": self.args.role, + "party_id": self.args.party_id, + "task_id": self.args.task_id, + "task_version": self.args.task_version, + "component_name": self.args.component_name, + "party_status": TaskStatus.FAILED, + } + + RuntimeConfig.init_config(JOB_SERVER_HOST=self.args.job_server.split(':')[0], HTTP_PORT=self.args.job_server.split(':')[1]) + ControllerClient.report_task(task_info) schedule_logger(self.args.job_id).exception(e) From 086c8c48073e4f82c93cfb21b9c87aaa0de12666 Mon Sep 17 00:00:00 2001 From: zhihuiwan <15779896112@163.com> Date: Wed, 31 May 2023 22:10:42 +0800 Subject: [PATCH 14/15] update release Signed-off-by: zhihuiwan <15779896112@163.com> --- RELEASE.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/RELEASE.md b/RELEASE.md index d860f21cf..03247c6da 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,3 +1,11 @@ +# Release 1.11.1 +## Major Features and Improvements +* Support distributed training with multiple gpus for FATE-LLM by Eggroll + +## Bug Fixes +* fix hadoop cannot be connected in some scenarios +* fix spark config in role does not take effect + # Release 1.11.0 ## Major Features and Improvements * Add data table preview query interface From 3dd18960db325d589bf1c9d992b9e56355b3158c Mon Sep 17 00:00:00 2001 From: zhihuiwan <15779896112@163.com> Date: Wed, 31 May 2023 22:11:54 +0800 Subject: [PATCH 15/15] update release Signed-off-by: zhihuiwan <15779896112@163.com> --- RELEASE.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/RELEASE.md b/RELEASE.md index 03247c6da..af49e1f17 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -3,8 +3,8 @@ * Support distributed training with multiple gpus for FATE-LLM by Eggroll ## Bug Fixes -* fix hadoop cannot be connected in some scenarios -* fix spark config in role does not take effect +* Fix hadoop connection failures in some scenarios +* Fix spark config in role does not take effect # Release 1.11.0 ## Major Features and Improvements
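
Taken together, patches 07, 08, and 13 apply one recurring pattern: serialize the worker's configuration into the job directory, then launch a BaseWorker subclass (DownloadModel, Submit) as a detached subprocess via sys.executable, so a slow or failing Eggroll call never blocks or kills the scheduler process. The sketch below illustrates that pattern only and is not FATE-Flow's API: the launch_worker helper, its flag names, and the worker.log layout are simplifying assumptions.

    import json
    import os
    import subprocess
    import sys

    def launch_worker(worker_script: str, job_id: str, config: dict,
                      config_dir: str, log_dir: str) -> subprocess.Popen:
        # Persist the worker's configuration first, as the submit step
        # does with deepspeed_submit.json.
        os.makedirs(config_dir, exist_ok=True)
        config_path = os.path.join(config_dir, "worker_config.json")
        with open(config_path, "w") as fw:
            json.dump(config, fw)

        # Use the parent's interpreter, with the same fallback the
        # patches use: sys.executable or 'python3'.
        process_cmd = [
            sys.executable or "python3",
            worker_script,                 # e.g. the DownloadModel module file
            "--job_id", str(job_id),
            "--config", config_path,
        ]

        os.makedirs(log_dir, exist_ok=True)
        log = open(os.path.join(log_dir, "worker.log"), "a")
        # start_new_session detaches the child from the parent's process
        # group, standing in for what process_utils.run_subprocess sets up.
        return subprocess.Popen(process_cmd, stdout=log, stderr=log,
                                start_new_session=True)

The subprocess boundary is also what makes the PATCH 13 error path work: when submission fails inside Eggroll's client, the Submit worker reports TaskStatus.FAILED back to the job server over HTTP (ControllerClient.report_task) rather than raising into a scheduler thread.

On the configuration side, PATCH 08 resolves per-component overrides against the defaults added to conf/job_default_config.yaml (task_world_size, resource_waiting_timeout). A minimal sketch of that lookup, with a DEFAULTS dict standing in for JobDefaultConfig:

    # Defaults mirror job_default_config.yaml after PATCH 12 (seconds).
    DEFAULTS = {"task_world_size": 2, "resource_waiting_timeout": 21600}

    def resolve_submit_options(task_conf: dict, component_name: str) -> dict:
        component_conf = task_conf.get(component_name, {})
        return {
            "world_size": component_conf.get("world_size",
                                             DEFAULTS["task_world_size"]),
            # Wait for GPUs instead of failing fast, up to the timeout.
            "resource_options": {
                "timeout_seconds": component_conf.get(
                    "timeout", DEFAULTS["resource_waiting_timeout"]),
                "resource_exhausted_strategy": "waiting",
            },
        }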