diff --git a/RELEASE.md b/RELEASE.md index d860f21cf..af49e1f17 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,3 +1,11 @@ +# Release 1.11.1 +## Major Features and Improvements +* Support distributed training with multiple GPUs for FATE-LLM via Eggroll + +## Bug Fixes +* Fix Hadoop connection failures in some scenarios +* Fix Spark config in role not taking effect + # Release 1.11.0 ## Major Features and Improvements * Add data table preview query interface diff --git a/conf/job_default_config.yaml b/conf/job_default_config.yaml index 94ce9fe05..c557893db 100644 --- a/conf/job_default_config.yaml +++ b/conf/job_default_config.yaml @@ -22,8 +22,14 @@ federated_status_collect_type: PUSH detect_connect_max_retry_count: 3 detect_connect_long_retry_count: 2 +task_process_classpath: true + # upload upload_block_max_bytes: 104857600 # bytes #component output -output_data_summary_count_limit: 100 \ No newline at end of file +output_data_summary_count_limit: 100 + +# gpu +task_world_size: 2 +resource_waiting_timeout: 21600 # s \ No newline at end of file diff --git a/python/fate_flow/apps/worker_app.py b/python/fate_flow/apps/worker_app.py new file mode 100644 index 000000000..adecd4565 --- /dev/null +++ b/python/fate_flow/apps/worker_app.py @@ -0,0 +1,32 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import os.path + +from flask import request + +from fate_arch.common.file_utils import load_json_conf +from fate_flow.utils.api_utils import get_json_result + +page_name = "worker" + + +@manager.route('/config/load', methods=['POST']) +def load_config(): + conf_path = request.json.get('config_path') + data = {} + if os.path.exists(conf_path): + data = load_json_conf(conf_path) + return get_json_result(data=data) diff --git a/python/fate_flow/controller/engine_adapt.py b/python/fate_flow/controller/engine_adapt.py index 6787cefb2..b048f1247 100644 --- a/python/fate_flow/controller/engine_adapt.py +++ b/python/fate_flow/controller/engine_adapt.py @@ -14,20 +14,25 @@ # limitations under the License.
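Review note: the `/v1/worker/config/load` endpoint added in `worker_app.py` above is what `load_dict_attr` in `base_worker.py` (later in this patch) calls, so a DeepSpeed worker container that does not share the submitter's filesystem can fetch its task config over HTTP. A minimal sketch of the exchange; host, port, and path are placeholders:

```python
import requests

# POST the server-side path of the config file; fate_flow loads it and returns
# it inside the usual get_json_result envelope, with the payload under "data".
resp = requests.post(
    "http://127.0.0.1:9380/v1/worker/config/load",
    json={"config_path": "/data/projects/fate/jobs/202303150001/guest/9999/task_parameters.json"},
).json()
config = resp.get("data") or {}
```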
# from fate_arch.computing import ComputingEngine +from fate_flow.controller.engine_controller.deepspeed import EggrollDeepspeedEngine from fate_flow.controller.engine_controller.eggroll import EggrollEngine from fate_flow.controller.engine_controller.linkis_spark import LinkisSparkEngine from fate_flow.controller.engine_controller.spark import SparkEngine -def build_engine(computing_engine): - if not computing_engine: - return None - if computing_engine in {ComputingEngine.EGGROLL, ComputingEngine.STANDALONE}: - engine_session = EggrollEngine() - elif computing_engine == ComputingEngine.SPARK: - engine_session = SparkEngine() - elif computing_engine == ComputingEngine.LINKIS_SPARK: - engine_session = LinkisSparkEngine() +def build_engine(computing_engine, is_deepspeed=False): + if not is_deepspeed: + if computing_engine in {ComputingEngine.EGGROLL, ComputingEngine.STANDALONE}: + engine_session = EggrollEngine() + elif computing_engine == ComputingEngine.SPARK: + engine_session = SparkEngine() + elif computing_engine == ComputingEngine.LINKIS_SPARK: + engine_session = LinkisSparkEngine() + else: + raise ValueError(f"{computing_engine} is not supported") else: - raise ValueError(f"{computing_engine} is not supported") + if computing_engine in {ComputingEngine.EGGROLL, ComputingEngine.STANDALONE}: + engine_session = EggrollDeepspeedEngine() + else: + raise ValueError(f"deepspeed {computing_engine} is not supported") return engine_session diff --git a/python/fate_flow/controller/engine_controller/deepspeed.py b/python/fate_flow/controller/engine_controller/deepspeed.py new file mode 100644 index 000000000..cb747dd41 --- /dev/null +++ b/python/fate_flow/controller/engine_controller/deepspeed.py @@ -0,0 +1,225 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
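Review note: a hedged sketch of the dispatch rules that `build_engine` above now implements (constants from `fate_arch.computing`). Only Eggroll/standalone computing has a DeepSpeed-capable engine, so any other engine combined with `is_deepspeed=True` raises:

```python
from fate_arch.computing import ComputingEngine
from fate_flow.controller.engine_adapt import build_engine

build_engine(ComputingEngine.EGGROLL)                     # -> EggrollEngine
build_engine(ComputingEngine.SPARK)                       # -> SparkEngine
build_engine(ComputingEngine.EGGROLL, is_deepspeed=True)  # -> EggrollDeepspeedEngine
# build_engine(ComputingEngine.SPARK, is_deepspeed=True)  -> ValueError
```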
+# +import datetime +import os +import sys +from abc import ABC + +from fate_arch.common.base_utils import json_dumps +from fate_flow.controller.engine_controller.engine import EngineABC +from fate_flow.db.db_models import Task +from fate_flow.db.job_default_config import JobDefaultConfig +from fate_flow.db.runtime_config import RuntimeConfig +from fate_flow.entity.run_status import BaseStatus, TaskStatus +from fate_flow.entity.types import WorkerName +from fate_flow.manager.worker_manager import WorkerManager +from fate_flow.settings import EXTRA_MODEL_DIR +from fate_flow.utils import log_utils, job_utils, process_utils +from fate_flow.utils.deepspeed_utils import Submit +from fate_flow.utils.log_utils import detect_logger, schedule_logger +from fate_flow.worker.task_executor import TaskExecutor + + +class StatusSet(BaseStatus): + NEW = "NEW" + NEW_TIMEOUT = "NEW_TIMEOUT" + ACTIVE = "ACTIVE" + CLOSED = "CLOSED" + KILLED = "KILLED" + ERROR = "ERROR" + FINISHED = "FINISHED" + + +class EndStatus(BaseStatus): + NEW_TIMEOUT = StatusSet.NEW_TIMEOUT + CLOSED = StatusSet.CLOSED + FAILED = StatusSet.KILLED + ERROR = StatusSet.ERROR + FINISHED = StatusSet.FINISHED + + +class EggrollDeepspeedEngine(EngineABC, ABC): + @staticmethod + def generate_session_id(): + return f"deepspeed_session_{datetime.datetime.now().strftime('%Y%m%d-%H%M%S-%f')}" + + def run(self, task: Task, run_parameters, run_parameters_path, config_dir, log_dir, cwd_dir, **kwargs): + worker_id, config_dir, log_dir = WorkerManager.get_process_dirs( + worker_name=WorkerName.TASK_EXECUTOR, + job_id=task.f_job_id, + role=task.f_role, + party_id=task.f_party_id, + task=task + ) + config = run_parameters.to_dict() + session_id, _, command_arguments = WorkerManager.generate_common_cmd(task, config_dir, config, + log_dir, worker_id) + command_arguments.extend(["--is_deepspeed", True]) + command_arguments.extend(["--model_path", self.model_path(task)]) + cmd = [str(_c) for _c in command_arguments] + environment_variables = {} + files = {} + options = { + "eggroll.container.deepspeed.script.path": sys.modules[TaskExecutor.__module__].__file__ + } + task_conf = run_parameters.role_parameter("task_conf", role=task.f_role, party_id=task.f_party_id) + world_size = task_conf.get(task.f_component_name).get("world_size", JobDefaultConfig.task_world_size) + timeout = task_conf.get(task.f_component_name).get("timeout", JobDefaultConfig.resource_waiting_timeout) + resource_options = {"timeout_seconds": timeout, "resource_exhausted_strategy": "waiting"} + submit_conf = { + "world_size": world_size, + "command_arguments": cmd, + "environment_variables": environment_variables, + "files": files, + "resource_options": resource_options, + "options": options + } + config_dir = job_utils.get_task_directory( + task.f_job_id, task.f_role, task.f_party_id, + task.f_component_name, task.f_task_id, str(task.f_task_version) + ) + os.makedirs(config_dir, exist_ok=True) + config_path = os.path.join(config_dir, 'deepspeed_submit.json') + with open(config_path, 'w') as fw: + fw.write(json_dumps(submit_conf)) + session_id = self.generate_session_id() + process_cmd, pid = self.submit(task, config_path, session_id) + WorkerManager.save_worker_info(task=task, worker_name=WorkerName.TASK_EXECUTOR, worker_id=worker_id, + run_ip=RuntimeConfig.JOB_SERVER_HOST, run_pid=pid, cmd=process_cmd) + return {"worker_id": worker_id, "cmd": cmd, "deepspeed_id": session_id, "run_pid": pid} + + @staticmethod + def submit(task, config_path, session_id): + conf_dir = 
job_utils.get_job_directory(job_id=task.f_job_id) + os.makedirs(conf_dir, exist_ok=True) + process_cmd = [ + sys.executable or 'python3', + sys.modules[Submit.__module__].__file__, + '--job_id', task.f_job_id, + '--role', task.f_role, + '--party_id', task.f_party_id, + '--task_id', task.f_task_id, + '--task_version', task.f_task_version, + '--component_name', task.f_component_name, + '--config', config_path, + '--job_server', f"{RuntimeConfig.JOB_SERVER_HOST}:{RuntimeConfig.HTTP_PORT}", + '--session_id', session_id + ] + process_name = "deepspeed_submit" + log_dir = job_utils.get_job_log_directory(job_id=task.f_job_id) + + p = process_utils.run_subprocess(job_id=task.f_job_id, config_dir=conf_dir, process_cmd=process_cmd, + log_dir=log_dir, process_name=process_name) + schedule_logger(task.f_job_id).info(f"run subprocess {p.pid}") + return process_cmd, p.pid + + def kill(self, task): + if task.f_deepspeed_id: + schedule_logger(task.f_job_id).info(f"start kill deepspeed task: {task.f_deepspeed_id}") + from eggroll.deepspeed.submit import client + client = client.DeepspeedJob(task.f_deepspeed_id) + return client.kill() + + @staticmethod + def _query_status(task): + if task.f_deepspeed_id: + from eggroll.deepspeed.submit import client + client = client.DeepspeedJob(task.f_deepspeed_id) + _s = client.query_status().status + return _s if _s else StatusSet.NEW + return StatusSet.NEW + + @staticmethod + def _download_job(task, base_dir, content_type=None, ranks: list = None): + from eggroll.deepspeed.submit import client + if not content_type: + content_type = client.ContentType.ALL + if task.f_deepspeed_id: + client = client.DeepspeedJob(task.f_deepspeed_id) + os.makedirs(base_dir, exist_ok=True) + path = lambda rank: f"{base_dir}/{rank}.zip" + client.download_job_to(rank_to_path=path, content_type=content_type, ranks=ranks) + return base_dir + + def query_task_status(self, task): + status = self._query_status(task) + if status in EndStatus.status_list(): + if status in [EndStatus.FINISHED]: + return TaskStatus.SUCCESS + else: + return TaskStatus.FAILED + + def is_alive(self, task: Task): + status = self._query_status(task) + detect_logger(task.f_job_id).info(f"task {task.f_task_id} {task.f_task_version} deepspeed status {status}") + if status in StatusSet.status_list(): + if status in EndStatus.status_list(): + return False + else: + return True + else: + raise RuntimeError(f"task run status: {status}") + + def download(self, task, base_dir, content_type=None, ranks=None): + from eggroll.deepspeed.submit.client import ContentType + if not content_type: + content_type = ContentType.ALL + dir_name = self._download_job(task, base_dir, content_type, ranks) + if dir_name: + for file in os.listdir(dir_name): + if file.endswith(".zip"): + rank_dir = os.path.join(dir_name, file.split(".zip")[0]) + os.makedirs(rank_dir, exist_ok=True) + self.unzip(os.path.join(dir_name, file), extra_dir=rank_dir) + os.remove(os.path.join(dir_name, file)) + + def download_log(self, task, path=None): + from eggroll.deepspeed.submit.client import ContentType + if not path: + path = self.log_path(task) + self.download(task, base_dir=path, content_type=ContentType.LOGS) + + def download_model(self, task, path=None): + from eggroll.deepspeed.submit.client import ContentType + if not path: + path = self.model_path(task, download=True) + self.download(task, base_dir=path, content_type=ContentType.MODELS, ranks=[0]) + + @staticmethod + def unzip(zip_path, extra_dir): + import zipfile + zfile = zipfile.ZipFile(zip_path, "r") + 
for name in zfile.namelist(): + dir_name = os.path.dirname(zip_path) + file_path = os.path.join(dir_name, extra_dir, name) + os.makedirs(os.path.dirname(file_path), exist_ok=True) + data = zfile.read(name) + with open(file_path, "w+b") as file: + file.write(data) + + @staticmethod + def model_path(task, download=False): + _p = os.path.join(EXTRA_MODEL_DIR, task.f_job_id, task.f_component_name) + if not download: + # only rank 0 output model + _p = os.path.join(_p, "0") + return _p + + @staticmethod + def log_path(task): + return os.path.join( + log_utils.get_logger_base_dir(), task.f_job_id, task.f_role, task.f_party_id, task.f_component_name + ) diff --git a/python/fate_flow/controller/job_controller.py b/python/fate_flow/controller/job_controller.py index c43610760..630f7f182 100644 --- a/python/fate_flow/controller/job_controller.py +++ b/python/fate_flow/controller/job_controller.py @@ -57,7 +57,7 @@ def create_job(cls, job_id, role, party_id, job_info): runtime_conf=runtime_conf, train_runtime_conf=train_runtime_conf ) - job_parameters = dsl_parser.get_job_parameters(runtime_conf) + job_parameters = dsl_parser.get_job_parameters(runtime_conf, int(runtime_conf.get("dsl_version", "1"))) schedule_logger(job_id).info('job parameters:{}'.format(job_parameters)) dest_user = job_parameters.get(role, {}).get(party_id, {}).get('user', '') user = {} diff --git a/python/fate_flow/controller/task_controller.py b/python/fate_flow/controller/task_controller.py index 3533ccb4c..124335a10 100644 --- a/python/fate_flow/controller/task_controller.py +++ b/python/fate_flow/controller/task_controller.py @@ -14,6 +14,8 @@ # limitations under the License. # import os +import sys + from fate_arch.common import FederatedCommunicationType from fate_flow.utils.job_utils import asynchronous_function from fate_flow.utils.log_utils import schedule_logger @@ -21,7 +23,7 @@ from fate_flow.db.db_models import Task from fate_flow.scheduler.federated_scheduler import FederatedScheduler from fate_flow.entity.run_status import TaskStatus, EndStatus -from fate_flow.utils import job_utils +from fate_flow.utils import job_utils, process_utils from fate_flow.operation.job_saver import JobSaver from fate_arch.common.base_utils import json_dumps, current_timestamp from fate_arch.common import base_utils @@ -29,7 +31,8 @@ from fate_flow.manager.resource_manager import ResourceManager from fate_flow.operation.job_tracker import Tracker from fate_flow.manager.worker_manager import WorkerManager -from fate_flow.entity.types import TaskCleanResourceType +from fate_flow.entity.types import TaskCleanResourceType, TaskLauncher +from fate_flow.worker.download_model import DownloadModel class TaskController(object): @@ -47,7 +50,9 @@ def create_task(cls, role, party_id, run_on_this_party, task_info): task_info["task_id"] = job_utils.generate_task_id(job_id=task_info["job_id"], component_name=task_info["component_name"]) if task_info.get("task_version") is None: task_info["task_version"] = 0 - + run_parameters_dict = job_utils.get_job_parameters(task_info.get("job_id"), role, party_id) + run_parameters = RunParameters(**run_parameters_dict) + task_info.update({"is_deepspeed": cls.is_deepspeed(run_parameters, role, party_id, task_info["component_name"])}) task = JobSaver.create_task(task_info=task_info) @classmethod @@ -62,7 +67,6 @@ def start_task(cls, job_id, component_name, task_id, task_version, role, party_i :param party_id: :return: """ - job_dsl = job_utils.get_job_dsl(job_id, role, party_id) schedule_logger(job_id).info( f"try to 
start task {task_id} {task_version} on {role} {party_id} executor subprocess") task_executor_process_start_status = False @@ -89,7 +93,9 @@ def start_task(cls, job_id, component_name, task_id, task_version, role, party_i schedule_logger(job_id).info(f"use computing engine {run_parameters.computing_engine}") task_info["engine_conf"] = {"computing_engine": run_parameters.computing_engine} - backend_engine = build_engine(run_parameters.computing_engine) + backend_engine = build_engine( + run_parameters.computing_engine, + task.f_is_deepspeed) run_info = backend_engine.run(task=task, run_parameters=run_parameters, run_parameters_path=run_parameters_path, @@ -135,6 +141,10 @@ def update_task(cls, task_info): @classmethod def update_task_status(cls, task_info): update_status = JobSaver.update_task_status(task_info=task_info) + task = JobSaver.query_task(task_id=task_info["task_id"], + task_version=task_info["task_version"], + role=task_info["role"], + party_id=task_info["party_id"])[0] if update_status and EndStatus.contains(task_info.get("status")): ResourceManager.return_task_resource(task_info=task_info) cls.clean_task(job_id=task_info["job_id"], @@ -144,19 +154,22 @@ def update_task_status(cls, task_info): party_id=task_info["party_id"], content_type=TaskCleanResourceType.TABLE, is_asynchronous=True) - cls.report_task_to_initiator(task_info=task_info) + cls.report_task_to_initiator(task_info=task_info, task=task) + cls.callback_task_output(task, task_info.get("status")) return update_status @classmethod - def report_task_to_initiator(cls, task_info): - tasks = JobSaver.query_task(task_id=task_info["task_id"], - task_version=task_info["task_version"], - role=task_info["role"], - party_id=task_info["party_id"]) + def report_task_to_initiator(cls, task_info, task=None): + if not task: + tasks = JobSaver.query_task(task_id=task_info["task_id"], + task_version=task_info["task_version"], + role=task_info["role"], + party_id=task_info["party_id"]) + task = tasks[0] if task_info.get("error_report"): - tasks[0].f_error_report = task_info.get("error_report") - if tasks[0].f_federated_status_collect_type == FederatedCommunicationType.PUSH: - FederatedScheduler.report_task_to_initiator(task=tasks[0]) + task.f_error_report = task_info.get("error_report") + if task.f_federated_status_collect_type == FederatedCommunicationType.PUSH: + FederatedScheduler.report_task_to_initiator(task=task) @classmethod def collect_task(cls, job_id, component_name, task_id, task_version, role, party_id): @@ -194,7 +207,10 @@ def kill_task(cls, task: Task): kill_status = False try: # kill task executor - backend_engine = build_engine(task.f_engine_conf.get("computing_engine")) + backend_engine = build_engine( + task.f_engine_conf.get("computing_engine"), + task.f_is_deepspeed + ) if backend_engine: backend_engine.kill(task) WorkerManager.kill_task_all_workers(task) @@ -231,3 +247,35 @@ def clean_task(cls, job_id, task_id, task_version, role, party_id, content_type: else: return False + @staticmethod + def is_deepspeed(run_parameters, role, party_id, component_name): + task_conf = run_parameters.role_parameter("task_conf", role=role, party_id=party_id) + if task_conf.get(component_name, {}).get("launcher") == TaskLauncher.DEEPSPEED.value and role != "arbiter": + return True + else: + return False + + @staticmethod + def callback_task_output(task, status): + if EndStatus.contains(status): + if task.f_is_deepspeed: + deepspeed_engine = build_engine(task.f_engine_conf.get("computing_engine"), task.f_is_deepspeed) + 
deepspeed_engine.download_log(task) + + # run subprocess to download model + conf_dir = job_utils.get_job_directory(job_id=task.f_job_id) + os.makedirs(conf_dir, exist_ok=True) + process_cmd = [ + sys.executable or 'python3', + sys.modules[DownloadModel.__module__].__file__, + '--job_id', task.f_job_id, + '--role', task.f_role, + '--party_id', task.f_party_id, + '--task_id', task.f_task_id, + '--task_version', task.f_task_version, + '--computing_engine', task.f_engine_conf.get("computing_engine") + ] + process_name = "model_download" + log_dir = job_utils.get_job_log_directory(job_id=task.f_job_id) + process_utils.run_subprocess(job_id=task.f_job_id, config_dir=conf_dir, process_cmd=process_cmd, + log_dir=log_dir, process_name=process_name) diff --git a/python/fate_flow/db/db_models.py b/python/fate_flow/db/db_models.py index 9c2c3f62b..d2ccbb4bb 100644 --- a/python/fate_flow/db/db_models.py +++ b/python/fate_flow/db/db_models.py @@ -249,6 +249,9 @@ class Task(DataBaseModel): f_kill_status = BooleanField(default=False) f_error_report = TextField(default="") + f_is_deepspeed = BooleanField(default=False) + f_deepspeed_id = CharField(max_length=200, null=True) + f_start_time = BigIntegerField(null=True) f_start_date = DateTimeField(null=True) f_end_time = BigIntegerField(null=True) diff --git a/python/fate_flow/db/job_default_config.py b/python/fate_flow/db/job_default_config.py index e1fe1c5d6..6a98a9b5a 100644 --- a/python/fate_flow/db/job_default_config.py +++ b/python/fate_flow/db/job_default_config.py @@ -48,6 +48,10 @@ class JobDefaultConfig(ReloadConfigBase): # component output output_data_summary_count_limit = None + task_world_size = None + resource_waiting_timeout = None + task_process_classpath = None + @classmethod def load(cls): conf = file_utils.load_yaml_conf(FATE_FLOW_JOB_DEFAULT_CONFIG_PATH) diff --git a/python/fate_flow/deepspeed_client.py b/python/fate_flow/deepspeed_client.py new file mode 100644 index 000000000..9c7f7daa0 --- /dev/null +++ b/python/fate_flow/deepspeed_client.py @@ -0,0 +1,84 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
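Review note: hedged invocation examples for the new `deepspeed_client.py` defined below (flag names come from its argparse definition; the job id, party id, component name, and output paths are placeholders):

```python
# Query the Eggroll DeepSpeed session status of a task:
#   python python/fate_flow/deepspeed_client.py -f query_status -j 202303150001 -r guest -p 9999 -cpn nn_0
# Download all ranks' logs, or the rank-0 model, to a local directory:
#   python python/fate_flow/deepspeed_client.py -f download_log -j 202303150001 -r guest -p 9999 -cpn nn_0 -o /tmp/ds_logs
#   python python/fate_flow/deepspeed_client.py -f download_model -j 202303150001 -r guest -p 9999 -cpn nn_0 -o /tmp/ds_model
```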
+# +import argparse + +from fate_flow.controller.engine_adapt import build_engine +from fate_flow.operation.job_saver import JobSaver + + +FUNC = ["query_status", "download_log", "download_model"] + + +def call_fun(func, args): + job_id = args.job_id + role = args.role + party_id = args.party_id + component_name = args.component_name + output_path = args.output_path + engine, task = load_engine(job_id, role, party_id, component_name) + if func == "query_status": + query_status(engine, task) + elif func == "download_log": + download_log(engine, task, output_path) + + elif func == "download_model": + download_model(engine, task, output_path) + + +def load_engine(job_id, role, party_id, component_name): + tasks = JobSaver.query_task(job_id=job_id, role=role, party_id=party_id, component_name=component_name, run_on_this_party=True) + if tasks: + task = tasks[0] + if task.f_is_deepspeed: + deepspeed_engine = build_engine(task.f_engine_conf.get("computing_engine"), task.f_is_deepspeed) + return deepspeed_engine, task + else: + raise Exception(f"Not a deepspeed task: job_id[{job_id}], role[{role}], party_id[{party_id}], component_name[{component_name}]") + else: + raise Exception(f"task not found: job_id[{job_id}], role[{role}], party_id[{party_id}], component_name[{component_name}]") + + +def query_status(engine, task): + status = engine._query_status(task) + print(status) + + +def download_log(engine, task, output_path): + engine.download_log(task, path=output_path) + print(output_path) + + +def download_model(engine, task, output_path): + engine.download_model(task, path=output_path) + print(output_path) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('-f', '--function', type=str, + choices=FUNC, + required=True, + help="function to call") + parser.add_argument('-j', '--job_id', required=True, type=str, help="job id") + parser.add_argument('-r', '--role', required=True, type=str, help="role") + parser.add_argument('-p', '--party_id', required=True, type=str, help="party id") + parser.add_argument('-cpn', '--component_name', required=True, type=str, help="component name") + parser.add_argument('-o', '--output_path', required=False, type=str, help="output_path") + args = parser.parse_args() + config_data = {} + config_data.update(dict((k, v) for k, v in vars(args).items() if v is not None)) + + call_fun(args.function, args) diff --git a/python/fate_flow/detection/detector.py b/python/fate_flow/detection/detector.py index ac34ae88a..a65ce3702 100644 --- a/python/fate_flow/detection/detector.py +++ b/python/fate_flow/detection/detector.py @@ -49,6 +49,7 @@ def run_do(self): self.detect_resource_record() self.detect_expired_session() self.detect_dependence_upload_record() + self.detect_deepspeed_task() @classmethod def detect_running_task(cls): @@ -94,6 +95,36 @@ def detect_running_task(cls): finally: detect_logger().info(f"finish detect {count} running task") + @classmethod + def detect_deepspeed_task(cls): + detect_logger().info('start to detect deepspeed running task..') + running_tasks = JobSaver.query_task(party_status=TaskStatus.RUNNING, is_deepspeed=True) + for task in running_tasks: + cls.detect_deepspeed_task_status(task) + detect_logger().info(f'finish detect deepspeed running task {running_tasks}') + + @staticmethod + def detect_deepspeed_task_status(task): + try: + deepspeed_engine = build_engine(task.f_engine_conf.get("computing_engine"), task.f_is_deepspeed) + # query or update + if not deepspeed_engine.is_alive(task): + # update task 
status to end status + status = deepspeed_engine.query_task_status(task) + detect_logger(task.f_job_id).info(f"task status: {status}") + task_info = { + "job_id": task.f_job_id, + "task_id": task.f_task_id, + "task_version": task.f_task_version, + "role": task.f_role, + "party_id": task.f_party_id, + "party_status": status + } + TaskController.update_task_status(task_info) + deepspeed_engine.download_log(task) + except Exception as e: + detect_logger(task.f_job_id).exception(e) + @classmethod def detect_end_task(cls): detect_logger().info('start to detect end status task..') diff --git a/python/fate_flow/entity/_run_parameters.py b/python/fate_flow/entity/_run_parameters.py index 2d3ac74d1..a3bcfe7a9 100644 --- a/python/fate_flow/entity/_run_parameters.py +++ b/python/fate_flow/entity/_run_parameters.py @@ -13,7 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from copy import deepcopy + from ._base import BaseEntity +from ..utils.runtime_conf_parse_util import RuntimeConfParserUtil class RunParameters(BaseEntity): @@ -45,10 +48,31 @@ def __init__(self, **kwargs): self.assistant_role = None self.map_table_name = None self.map_namespace = None + self.task_conf = {} + self.roles = {} + self.role_parameters = {} for k, v in kwargs.items(): if hasattr(self, k): setattr(self, k, v) + def role_parameter(self, parameter_name, role, party_id): + if role == "local" or int(party_id) == 0: + return {} + if not self.roles: + # compatible with previous versions + return getattr(self, parameter_name) + index = [str(_p) for _p in self.roles.get(role)].index(str(party_id)) + _conf = deepcopy(getattr(self, parameter_name)) + _role = self.role_parameters.get(role, {}).get(str(index), {}).get(parameter_name, {}) + if isinstance(_conf, dict): + # dict + _conf = RuntimeConfParserUtil.merge_dict(_conf, _role) + else: + # int, str, etc. 
+ _conf = _role if _role else _conf + return _conf + + def to_dict(self): d = {} for k, v in self.__dict__.items(): diff --git a/python/fate_flow/entity/types.py b/python/fate_flow/entity/types.py index 20fc6f5ab..7797062c5 100644 --- a/python/fate_flow/entity/types.py +++ b/python/fate_flow/entity/types.py @@ -148,3 +148,8 @@ class TaskCleanResourceType(CustomEnum): class ExternalStorage(CustomEnum): MYSQL = "MYSQL" + + +class TaskLauncher(CustomEnum): + DEFAULT = "default" + DEEPSPEED = "deepspeed" diff --git a/python/fate_flow/manager/resource_manager.py b/python/fate_flow/manager/resource_manager.py index 8ec261ed2..f6a5e65fe 100644 --- a/python/fate_flow/manager/resource_manager.py +++ b/python/fate_flow/manager/resource_manager.py @@ -288,6 +288,7 @@ def return_task_resource(cls, task_info): return ResourceManager.resource_for_task(task_info=task_info, operation_type=ResourceOperation.RETURN) @classmethod + @DB.connection_context() def resource_for_task(cls, task_info, operation_type): cores_per_task, memory_per_task = cls.calculate_task_resource(task_info=task_info) schedule_logger(task_info["job_id"]).info(f"cores_per_task:{cores_per_task}, memory_per_task:{memory_per_task}") diff --git a/python/fate_flow/manager/worker_manager.py b/python/fate_flow/manager/worker_manager.py index f22221607..ab06b0fa9 100644 --- a/python/fate_flow/manager/worker_manager.py +++ b/python/fate_flow/manager/worker_manager.py @@ -154,12 +154,6 @@ def start_task_worker(cls, worker_name, task: Task, task_parameters: RunParamete role=task.f_role, party_id=task.f_party_id, task=task) - - session_id = job_utils.generate_session_id(task.f_task_id, task.f_task_version, task.f_role, task.f_party_id) - federation_session_id = job_utils.generate_task_version_id(task.f_task_id, task.f_task_version) - - info_kwargs = {} - specific_cmd = [] if worker_name is WorkerName.TASK_EXECUTOR: from fate_flow.worker.task_executor import TaskExecutor module_file_path = sys.modules[TaskExecutor.__module__].__file__ @@ -171,15 +165,30 @@ def start_task_worker(cls, worker_name, task: Task, task_parameters: RunParamete config = task_parameters.to_dict() config["src_user"] = kwargs.get("src_user") - config_path, result_path = cls.get_config(config_dir=config_dir, config=config, log_dir=log_dir) env = cls.get_env(task.f_job_id, task.f_provider_info) if executable: process_cmd = executable else: process_cmd = [env.get("PYTHON_ENV") or sys.executable or "python3"] + common_cmd = [module_file_path] + common_cmd.extend(cls.generate_common_cmd(task, config_dir, config, log_dir, worker_id)[2]) + process_cmd.extend(common_cmd) + if extra_env: + env.update(extra_env) + schedule_logger(task.f_job_id).info( + f"task {task.f_task_id} {task.f_task_version} on {task.f_role} {task.f_party_id} {worker_name} worker subprocess is ready") + p = process_utils.run_subprocess(job_id=task.f_job_id, config_dir=config_dir, process_cmd=process_cmd, + added_env=env, log_dir=log_dir, cwd_dir=config_dir, process_name=worker_name.value, + process_id=worker_id) + cls.save_worker_info(task=task, worker_name=worker_name, worker_id=worker_id, run_ip=RuntimeConfig.JOB_SERVER_HOST, run_pid=p.pid, config=config, cmd=process_cmd) + return {"run_pid": p.pid, "worker_id": worker_id, "cmd": process_cmd} - common_cmd = [ - module_file_path, + @classmethod + def generate_common_cmd(cls, task, config_dir, config, log_dir, worker_id): + session_id = job_utils.generate_session_id(task.f_task_id, task.f_task_version, task.f_role, task.f_party_id) + federation_session_id = 
job_utils.generate_task_version_id(task.f_task_id, task.f_task_version) + config_path, result_path = cls.get_config(config_dir=config_dir, config=config, log_dir=log_dir) + cmd = [ "--job_id", task.f_job_id, "--component_name", task.f_component_name, "--task_id", task.f_task_id, @@ -197,17 +206,7 @@ def start_task_worker(cls, worker_name, task: Task, task_parameters: RunParamete "--session_id", session_id, "--federation_session_id", federation_session_id ] - process_cmd.extend(common_cmd) - process_cmd.extend(specific_cmd) - if extra_env: - env.update(extra_env) - schedule_logger(task.f_job_id).info( - f"task {task.f_task_id} {task.f_task_version} on {task.f_role} {task.f_party_id} {worker_name} worker subprocess is ready") - p = process_utils.run_subprocess(job_id=task.f_job_id, config_dir=config_dir, process_cmd=process_cmd, - added_env=env, log_dir=log_dir, cwd_dir=config_dir, process_name=worker_name.value, - process_id=worker_id) - cls.save_worker_info(task=task, worker_name=worker_name, worker_id=worker_id, run_ip=RuntimeConfig.JOB_SERVER_HOST, run_pid=p.pid, config=config, cmd=process_cmd, **info_kwargs) - return {"run_pid": p.pid, "worker_id": worker_id, "cmd": process_cmd} + return session_id, federation_session_id, cmd @classmethod def get_process_dirs(cls, worker_name: WorkerName, job_id=None, role=None, party_id=None, task: Task = None): diff --git a/python/fate_flow/scheduler/dag_scheduler.py b/python/fate_flow/scheduler/dag_scheduler.py index 3bb796e84..2947d3713 100644 --- a/python/fate_flow/scheduler/dag_scheduler.py +++ b/python/fate_flow/scheduler/dag_scheduler.py @@ -56,6 +56,8 @@ def submit(cls, submit_job_conf: JobConfigurationBase, job_id: str = None): job_initiator = runtime_conf["initiator"] conf_adapter = JobRuntimeConfigAdapter(runtime_conf) common_job_parameters = conf_adapter.get_common_parameters() + common_job_parameters.roles = runtime_conf["role"] + common_job_parameters.role_parameters = runtime_conf.get("job_parameters", {}).get("role", {}) if common_job_parameters.job_type != "predict": # generate job model info diff --git a/python/fate_flow/scheduler/dsl_parser.py b/python/fate_flow/scheduler/dsl_parser.py index 8385196c4..a4d404a5c 100644 --- a/python/fate_flow/scheduler/dsl_parser.py +++ b/python/fate_flow/scheduler/dsl_parser.py @@ -968,10 +968,9 @@ def __init__(self): self.version = 1 @staticmethod - def get_job_parameters(runtime_conf): + def get_job_parameters(runtime_conf, conf_version=1): job_parameters = RuntimeConfParserUtil.get_job_parameters(runtime_conf, - conf_version=1) - + conf_version) return job_parameters @staticmethod diff --git a/python/fate_flow/settings.py b/python/fate_flow/settings.py index 11d0343e2..658d9d00d 100644 --- a/python/fate_flow/settings.py +++ b/python/fate_flow/settings.py @@ -162,3 +162,6 @@ HOOK_SERVER_NAME = get_base_config("hook_server_name") ENABLE_MODEL_STORE = get_base_config('enable_model_store', False) + +REMOTE_LOAD_CONF = True +EXTRA_MODEL_DIR = get_fate_flow_directory('model') diff --git a/python/fate_flow/utils/deepspeed_utils.py b/python/fate_flow/utils/deepspeed_utils.py new file mode 100644 index 000000000..093ad04fd --- /dev/null +++ b/python/fate_flow/utils/deepspeed_utils.py @@ -0,0 +1,57 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from fate_flow.db.runtime_config import RuntimeConfig +from fate_flow.entity.run_status import TaskStatus +from fate_flow.scheduling_apps.client import ControllerClient +from fate_flow.utils.log_utils import schedule_logger +from fate_flow.worker.base_worker import BaseWorker + + +class Submit(BaseWorker): + def _run(self): + try: + from eggroll.deepspeed.submit import client + client = client.DeepspeedJob(session_id=self.args.session_id) + config = self.args.config + schedule_logger(self.args.job_id).info(f"start submit deepspeed task {self.args.session_id}") + schedule_logger(self.args.job_id).info(f"submit config {config}") + client.submit( + world_size=config.get("world_size"), + command_arguments=config.get("command_arguments"), + environment_variables=config.get("environment_variables"), + files=config.get("files"), + resource_options=config.get("resource_options"), + options=config.get("options") + ) + schedule_logger(self.args.job_id).info(f"submit deepspeed task success") + except Exception as e: + task_info = { + "job_id": self.args.job_id, + "role": self.args.role, + "party_id": self.args.party_id, + "task_id": self.args.task_id, + "task_version": self.args.task_version, + "component_name": self.args.component_name, + "party_status": TaskStatus.FAILED, + } + + RuntimeConfig.init_config(JOB_SERVER_HOST=self.args.job_server.split(':')[0], HTTP_PORT=self.args.job_server.split(':')[1]) + ControllerClient.report_task(task_info) + schedule_logger(self.args.job_id).exception(e) + + +if __name__ == "__main__": + Submit().run() diff --git a/python/fate_flow/utils/process_utils.py b/python/fate_flow/utils/process_utils.py index c05b8ed25..6d6d0b546 100644 --- a/python/fate_flow/utils/process_utils.py +++ b/python/fate_flow/utils/process_utils.py @@ -17,6 +17,8 @@ import os import subprocess import psutil + +from fate_flow.db.job_default_config import JobDefaultConfig from fate_flow.utils.log_utils import schedule_logger from fate_flow.db.db_models import Task from fate_flow.entity.types import KillProcessRetCode, ProcessRole @@ -52,7 +54,9 @@ def run_subprocess(job_id, config_dir, process_cmd, added_env: dict = None, log_ if name.endswith("PATH") and subprocess_env.get(name) is not None: value += ':' + subprocess_env[name] subprocess_env[name] = value - subprocess_env.pop("CLASSPATH", None) + + if not JobDefaultConfig.task_process_classpath: + subprocess_env.pop("CLASSPATH", None) p = subprocess.Popen(process_cmd, stdout=std, diff --git a/python/fate_flow/utils/runtime_conf_parse_util.py b/python/fate_flow/utils/runtime_conf_parse_util.py index cc0242b9b..fb4bf2b1b 100644 --- a/python/fate_flow/utils/runtime_conf_parse_util.py +++ b/python/fate_flow/utils/runtime_conf_parse_util.py @@ -539,8 +539,9 @@ def get_input_parameters(cls, submit_dict, components=None): def get_job_parameters(submit_dict): ret = {} job_parameters = submit_dict.get("job_parameters", {}) + component_parameters = submit_dict.get("component_parameters", {}) common_job_parameters = job_parameters.get("common", {}) - role_job_parameters = job_parameters.get("role", {}) + role_job_parameters = 
component_parameters.get("role", {}) for role in submit_dict["role"]: party_id_list = submit_dict["role"][role] if not role_job_parameters: diff --git a/python/fate_flow/worker/base_worker.py b/python/fate_flow/worker/base_worker.py index 8d3f9a7ae..9949f3729 100644 --- a/python/fate_flow/worker/base_worker.py +++ b/python/fate_flow/worker/base_worker.py @@ -19,8 +19,11 @@ import traceback import logging +import requests + from fate_arch.common.base_utils import current_timestamp from fate_arch.common.file_utils import load_json_conf, dump_json_conf +from fate_flow.settings import REMOTE_LOAD_CONF from fate_flow.utils.log_utils import getLogger, LoggerFactory, exception_to_trace_string from fate_flow.db.component_registry import ComponentRegistry from fate_flow.db.config_manager import ConfigManager @@ -70,9 +73,26 @@ def __init__(self, **kwargs): # Dependence Upload self.dependence_type = kwargs.get("dependence_type") + self.is_deepspeed = kwargs.get("is_deepspeed") + self.model_path = kwargs.get("model_path") + self.computing_engine = kwargs.get("computing_engine") + @staticmethod def load_dict_attr(kwargs: dict, attr_name: str): - return load_json_conf(kwargs[attr_name]) if kwargs.get(attr_name) else {} + if kwargs.get(attr_name): + if REMOTE_LOAD_CONF and kwargs.get("is_deepspeed"): + url = f'http://{kwargs.get("job_server")}/v1/worker/config/load' + try: + _r = requests.post(url, json={"config_path": kwargs[attr_name]}).json() + config = _r.get("data") if _r.get("data") else {} + except Exception as e: + LOGGER.exception(e) + config = {} + else: + config = load_json_conf(kwargs[attr_name]) + else: + config = {} + return config class BaseWorker: @@ -89,12 +109,23 @@ def run(self, **kwargs): self.run_pid = os.getpid() try: self.args = self.get_args(**kwargs) + if self.args.model_path: + os.environ["MODEL_PATH"] = self.args.model_path RuntimeConfig.init_env() - RuntimeConfig.set_process_role(ProcessRole(os.getenv("PROCESS_ROLE"))) + role = ProcessRole(os.getenv("PROCESS_ROLE")) + append_to_parent_log = True + if self.args.is_deepspeed: + role = ProcessRole(ProcessRole.WORKER.value) + append_to_parent_log = False + RuntimeConfig.set_process_role(role) if RuntimeConfig.PROCESS_ROLE == ProcessRole.WORKER: LoggerFactory.LEVEL = logging.getLevelName(os.getenv("FATE_LOG_LEVEL", "INFO")) + if os.getenv("EGGROLL_CONTAINER_LOGS_DIR"): + # eggroll deepspeed + self.args.parent_log_dir = os.path.dirname(os.getenv("EGGROLL_CONTAINER_LOGS_DIR")) + self.args.log_dir = os.getenv("EGGROLL_CONTAINER_LOGS_DIR") LoggerFactory.set_directory(directory=self.args.log_dir, parent_log_dir=self.args.parent_log_dir, - append_to_parent_log=True, force=True) + append_to_parent_log=append_to_parent_log, force=True) LOGGER.info(f"enter {self.__class__.__name__} worker in subprocess, pid: {self.run_pid}") else: LOGGER.info(f"enter {self.__class__.__name__} worker in driver process, pid: {self.run_pid}") @@ -120,11 +151,14 @@ def run(self, **kwargs): message = exception_to_trace_string(e) finally: if self.args and self.args.result: - dump_json_conf(result, self.args.result) + if not self.args.is_deepspeed: + dump_json_conf(result, self.args.result) end_time = current_timestamp() LOGGER.info(f"worker {self.__class__.__name__}, process role: {RuntimeConfig.PROCESS_ROLE}, pid: {self.run_pid}, elapsed: {end_time - start_time} ms") if RuntimeConfig.PROCESS_ROLE == ProcessRole.WORKER: sys.exit(code) + if self.args and self.args.is_deepspeed: + sys.exit(code) else: return code, message, result diff --git 
a/python/fate_flow/worker/download_model.py b/python/fate_flow/worker/download_model.py new file mode 100644 index 000000000..9c0eb5bd6 --- /dev/null +++ b/python/fate_flow/worker/download_model.py @@ -0,0 +1,39 @@ +# +# Copyright 2019 The FATE Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from fate_flow.controller.engine_adapt import build_engine +from fate_flow.operation.job_saver import JobSaver +from fate_flow.utils.log_utils import schedule_logger +from fate_flow.worker.base_worker import BaseWorker + + +class DownloadModel(BaseWorker): + def _run(self): + deepspeed_engine = build_engine(self.args.computing_engine, True) + tasks = JobSaver.query_task( + task_id=self.args.task_id, + task_version=self.args.task_version, + job_id=self.args.job_id, + role=self.args.role, + party_id=self.args.party_id + ) + task = tasks[0] + schedule_logger(self.args.job_id).info("start download model") + deepspeed_engine.download_model(task) + schedule_logger(self.args.job_id).info("download model success") + + +if __name__ == '__main__': + DownloadModel().run() diff --git a/python/fate_flow/worker/task_base_worker.py b/python/fate_flow/worker/task_base_worker.py index 5a264aef3..b5042007b 100644 --- a/python/fate_flow/worker/task_base_worker.py +++ b/python/fate_flow/worker/task_base_worker.py @@ -110,12 +110,13 @@ def _handle_exception(self): self.report_task_info_to_driver() def report_task_info_to_driver(self): - LOGGER.info("report {} {} {} {} {} to driver:\n{}".format( - self.__class__.__name__, - self.report_info["task_id"], - self.report_info["task_version"], - self.report_info["role"], - self.report_info["party_id"], - self.report_info - )) - ControllerClient.report_task(self.report_info) + if not self.args.is_deepspeed: + LOGGER.info("report {} {} {} {} {} to driver:\n{}".format( + self.__class__.__name__, + self.report_info["task_id"], + self.report_info["task_version"], + self.report_info["role"], + self.report_info["party_id"], + self.report_info + )) + ControllerClient.report_task(self.report_info) diff --git a/python/fate_flow/worker/task_executor.py b/python/fate_flow/worker/task_executor.py index c2e4f01f7..13e126f57 100644 --- a/python/fate_flow/worker/task_executor.py +++ b/python/fate_flow/worker/task_executor.py @@ -77,7 +77,7 @@ def _run_(self): train_runtime_conf=job_configuration.train_runtime_conf, pipeline_dsl=None) - job_parameters = dsl_parser.get_job_parameters(job_configuration.runtime_conf) + job_parameters = dsl_parser.get_job_parameters(job_configuration.runtime_conf, int(job_configuration.runtime_conf.get("dsl_version", "1"))) user_name = job_parameters.get(args.role, {}).get(args.party_id, {}).get("user", '') LOGGER.info(f"user name:{user_name}") task_parameters = RunParameters(**task_parameters_conf) @@ -151,9 +151,10 @@ def _run_(self): if set(roles) == {"local"}: LOGGER.info(f"only local roles, pass init federation") else: - sess.init_federation(federation_session_id=args.federation_session_id, - 
runtime_conf=component_parameters_on_party, - service_conf=job_parameters.engines_address.get(EngineType.FEDERATION, {})) + if self.is_master: + sess.init_federation(federation_session_id=args.federation_session_id, + runtime_conf=component_parameters_on_party, + service_conf=job_parameters.engines_address.get(EngineType.FEDERATION, {})) LOGGER.info(f'run {args.component_name} {args.task_id} {args.task_version} on {args.role} {args.party_id} task') LOGGER.info(f"component parameters on party:\n{json_dumps(component_parameters_on_party, indent=4)}") LOGGER.info(f"task input dsl {task_input_dsl}") @@ -227,7 +228,7 @@ def _run_(self): output_table_list.append({"namespace": persistent_table_namespace, "name": persistent_table_name}) self.log_output_data_table_tracker(args.job_id, input_table_list, output_table_list) - if cpn_output.model: + if cpn_output.model and self.is_master: getattr( tracker_client if predict_tracker_client is None else predict_tracker_client, 'save_component_output_model', @@ -267,8 +268,10 @@ def _run_(self): LOGGER.info("start destroy sessions") sess.destroy_all_sessions() LOGGER.info("destroy all sessions success") - except Exception as e: - LOGGER.exception(e) + except Exception as _e: + LOGGER.exception(_e) + if self.args.is_deepspeed: + raise RuntimeError(_e) finally: try: self.report_info["end_time"] = current_timestamp() @@ -283,6 +286,13 @@ def _run_(self): print(msg) return self.report_info + @property + def is_master(self): + # deepspeed rank 0 + if not os.getenv("RANK"): + return True + return int(os.getenv("RANK")) == 0 + @classmethod def log_output_data_table_tracker(cls, job_id, input_table_list, output_table_list): try:
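Review note: taken together, the pieces above resolve per-component DeepSpeed settings from the job conf. A hedged end-to-end sketch with made-up values (component `nn_0`, party ids); `merge_dict` here is a stand-in for `RuntimeConfParserUtil.merge_dict`, assumed to deep-merge with role-specific values winning:

```python
# job_parameters fragment of a submit conf: "common" carries the shared
# task_conf; "role" carries per-party overrides, as wired up in
# DagScheduler.submit (common_job_parameters.roles / role_parameters).
roles = {"guest": ["9999", "10000"]}
common_task_conf = {"nn_0": {"launcher": "deepspeed", "world_size": 2}}
role_parameters = {"guest": {"1": {"task_conf": {"nn_0": {"world_size": 8}}}}}

def merge_dict(base, override):
    # stand-in for RuntimeConfParserUtil.merge_dict (assumed semantics)
    out = dict(base)
    for k, v in override.items():
        out[k] = merge_dict(out[k], v) if isinstance(v, dict) and isinstance(out.get(k), dict) else v
    return out

# RunParameters.role_parameter("task_conf", role="guest", party_id="10000"):
index = str(roles["guest"].index("10000"))            # -> "1"
override = role_parameters["guest"][index]["task_conf"]
resolved = merge_dict(common_task_conf, override)
assert resolved == {"nn_0": {"launcher": "deepspeed", "world_size": 8}}
# TaskController.is_deepspeed -> True (launcher == "deepspeed", role != "arbiter");
# EggrollDeepspeedEngine.run then reads world_size=8 and, absent a "timeout"
# key, falls back to job_default_config resource_waiting_timeout (21600 s).
```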