diff --git a/README.md b/README.md
index dd7e92d..57cfbb6 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@ PAI Python SDK是阿里云 [机器学习平台 PAI(Platform for Artificial Intel
 
 ## 🔧 安装
 
-使用以下命令安装PAI Python SDK(支持Python版本 \>= 3.6,建议使用Python版本 \>= 3.8):
+使用以下命令安装PAI Python SDK(支持Python版本 \>= 3.8):
 
 ```shell
 python -m pip install alipai
diff --git a/README_EN.md b/README_EN.md
index c5df0e8..851b9f0 100644
--- a/README_EN.md
+++ b/README_EN.md
@@ -7,7 +7,7 @@ The PAI Python SDK is provided by Alibaba Cloud\'s [Platform for Artificial Inte
 
 ## Installation 🔧
 
-Install the PAI Python SDK using the following command, which supports Python versions \>= 3.6 (it is recommended to use Python \>= 3.8):
+Install the PAI Python SDK (requires Python \>= 3.8) using the following command:
 
 ```shell
 python -m pip install alipai
diff --git a/docs/source/quick-tour/installation.rst b/docs/source/quick-tour/installation.rst
index b48464e..181353f 100644
--- a/docs/source/quick-tour/installation.rst
+++ b/docs/source/quick-tour/installation.rst
@@ -5,7 +5,7 @@
 安装
 ------
 
-请通过以下命令安装PAI Python SDK(请使用Python>=3.6)。
+请通过以下命令安装PAI Python SDK(请使用Python>=3.8)。
 
 .. parsed-literal::
 
diff --git a/noxfile.py b/noxfile.py
index 686429b..f74aca9 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -25,7 +25,7 @@
     "PYTHONWARNINGS": "ignore",
 }
 
-UNIT_TEST_PYTHON_VERSIONS = ["3.6", "3.7", "3.8"]
+UNIT_TEST_PYTHON_VERSIONS = ["3.8", "3.9", "3.10"]
 INTEGRATION_TEST_PYTHON_VERSIONS = ["3.8"]
 
 TEST_VENV_BACKEND = os.environ.get("PAI_TEST_VENV_BACKEND", "conda")
@@ -54,9 +54,9 @@ def integration(session: Session):
         pos_args = session.posargs + ["-n", str(os.cpu_count() * 2)]
     else:
         pos_args = session.posargs
-
     session.run(
         "pytest",
+        "-vv",
         "--cov-config=.coveragerc",
         "--cov-append",
         "--cov-report=html",
@@ -77,8 +77,10 @@ def integration(session: Session):
 def unit(session: Session):
     """Run unit test."""
     install_test_dependencies(session=session)
+    # run test cases
     session.run(
         "pytest",
+        "-vv",
         "--cov-config=.coveragerc",
         "--cov-append",
         "--cov-report=html",
@@ -159,6 +161,7 @@ def notebook(session: Session):
 
     session.run(
         "pytest",
+        "-vv",
         "--timeout",
         "3000",
         "--nbmake",
diff --git a/pai/api/entity_base.py b/pai/api/entity_base.py
deleted file mode 100644
index 044ae9a..0000000
--- a/pai/api/entity_base.py
+++ /dev/null
@@ -1,70 +0,0 @@
-# Copyright 2023 Alibaba, Inc. or its affiliates.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
- -from typing import Any, Dict, Optional, Type - -from ..schema.base import BaseAPIResourceSchema -from ..session import Session, get_default_session - - -class EntityBaseMixin(object): - - _schema_cls: Type[BaseAPIResourceSchema] - resource_type: str - - def __init__(self, session: Optional[Session] = None, **kwargs) -> None: - super(EntityBaseMixin, self).__init__() - self._session = session or get_default_session() - - @property - def session(self) -> Session: - return self._session - - @classmethod - def from_api_object(cls, obj_dict: Dict[str, Any], session: Session = None): - """Construct an entity representing the API resource from response. - - Args: - session: Session for the instance. - obj_dict: Response in json - - Returns: - An entity representing the resource. - """ - session = session or get_default_session() - return cls._schema_cls(session=session).load(obj_dict) - - def to_api_object(self) -> Dict[str, Any]: - """Convert the current instance to a dictionary representing an API object. - - Returns: - dict: a dictionary representing the API object. - """ - return self._schema_cls().dump(self) - - def patch_from_api_object(self, api_obj: Dict[str, Any]): - if not api_obj: - raise ValueError("REST API object should not be empty.") - - return self._schema_cls(instance=self).load(api_obj) - - def __repr__(self): - return "{}:{}".format(type(self).__name__, self.id) - - def __str__(self) -> str: - return self.__repr__() - - @property - def resource_api(self): - return self._session.get_api_by_resource(self.resource_type) diff --git a/pai/api/training_job.py b/pai/api/training_job.py index 25a2b5d..f8648fa 100644 --- a/pai/api/training_job.py +++ b/pai/api/training_job.py @@ -141,6 +141,7 @@ def create( else: raise ValueError("Please provide instance_type or instance_spec.") + hyperparameters = hyperparameters or dict() hyper_parameters = [ CreateTrainingJobRequestHyperParameters( name=name, diff --git a/pai/common/configs.py b/pai/common/configs.py deleted file mode 100644 index b7b952c..0000000 --- a/pai/common/configs.py +++ /dev/null @@ -1,55 +0,0 @@ -# Copyright 2024 Alibaba, Inc. or its affiliates. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import List, Optional - - -class UserVpcConfig(object): - """UserVpcConfig is used to give training job access to resources in your VPC.""" - - def __init__( - self, - vpc_id: str, - security_group_id: str, - switch_id: Optional[str] = None, - extended_cidrs: List[str] = None, - ): - """Initialize UserVpcConfig. - - Args: - vpc_id (str): Specifies the ID of the VPC that training job instance - connects to. - security_group_id (str): The ID of the security group that training job - instances belong to. - switch_id (str, optional): The ID of the vSwitch to which the instance - belongs. Defaults to None. - extended_cidrs (List[str], optional): The CIDR blocks configured for the - ENI of the training job instance. 
If it is not specified, the CIDR block - will be configured as the same as the VPC network segmentation, which - means that the training job instance can access all resources in the - VPC. Defaults to None. - """ - - self.vpc_id = vpc_id - self.security_group_id = security_group_id - self.switch_id = switch_id - self.extended_cidrs = extended_cidrs - - def to_dict(self): - return { - "VpcId": self.vpc_id, - "SecurityGroupId": self.security_group_id, - "SwitchId": self.switch_id, - "ExtendedCIDRs": self.extended_cidrs, - } diff --git a/pai/common/utils.py b/pai/common/utils.py index 4ec74b2..a326c51 100644 --- a/pai/common/utils.py +++ b/pai/common/utils.py @@ -23,6 +23,7 @@ import sys import time import warnings +from datetime import datetime from functools import lru_cache from typing import Callable, Dict, List, Optional, Union @@ -127,12 +128,14 @@ def http_user_agent(user_agent: Optional[Union[Dict, str]] = None) -> str: def is_notebook() -> bool: """Return True if current environment is notebook.""" try: + from IPython import get_ipython + shell = get_ipython().__class__.__name__ for parent_cls in shell.__mro__: if parent_cls.__name__ == "ZMQInteractiveShell": return True return False - except NameError: + except (NameError, ImportError): return False @@ -322,3 +325,39 @@ def print_table(headers: List[str], rows: List[List[str]]): def is_package_available(package_name: str) -> bool: """Check if the package is available in the current environment.""" return True if importlib.util.find_spec(package_name) is not None else False + + +def timestamp(sep: str = "-", utc: bool = False) -> str: + """Return a timestamp with millisecond precision. + + Args: + sep: The separator between date and time. + utc: Whether to use UTC time. + + Returns: + str: A timestamp with millisecond precision. + + """ + if utc: + res = datetime.utcnow().strftime("%Y%m%d-%H%M%S-%f")[:-3] + else: + res = datetime.now().strftime("%Y%m%d-%H%M%S-%f")[:-3] + if sep != "-": + res = res.replace("-", sep) + return res + + +def name_from_base(base_name: str, sep: str = "-") -> str: + """Return a name with base_name and timestamp. + + Args: + base_name: The base name of the returned name. + sep: The separator between base_name and timestamp. + + Returns: + str: A name with base_name and timestamp. + + """ + return "{base_name}{sep}{timestamp}".format( + base_name=base_name, sep=sep, timestamp=timestamp(sep=sep, utc=False) + ) diff --git a/pai/estimator.py b/pai/estimator.py index 1b78e98..a3ee149 100644 --- a/pai/estimator.py +++ b/pai/estimator.py @@ -12,55 +12,38 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import distutils.dir_util -import json -import os -import posixpath -import re -import shlex -import shutil -import tempfile -import textwrap -import time import webbrowser from abc import ABCMeta, abstractmethod -from concurrent.futures import ThreadPoolExecutor from datetime import datetime from typing import Any, Dict, List, Optional, Union -from Tea.exceptions import TeaException - -from .api.base import PaginatedResult -from .api.entity_base import EntityBaseMixin -from .common import ProviderAlibabaPAI, git_utils -from .common.configs import UserVpcConfig -from .common.consts import INSTANCE_TYPE_LOCAL_GPU, FileSystemInputScheme, JobType -from .common.docker_utils import ContainerRun, run_container +from .common import git_utils +from .common.consts import FileSystemInputScheme, JobType from .common.logging import get_logger -from .common.oss_utils import OssUriObj, download, is_oss_uri, upload -from .common.utils import ( - is_filesystem_uri, - is_local_run_instance_type, - is_odps_table_uri, - make_list_resource_iterator, - random_str, - retry, - to_plain_text, +from .common.utils import is_local_run_instance_type, make_list_resource_iterator +from .job import ( + AlgorithmSpec, + Channel, + HyperParameterDefinition, + LocalTrainingJob, + TrainingJob, + UriOutput, + _TrainingJobSubmitter, +) +from .job._training_job import ( + DEFAULT_CHECKPOINT_CHANNEL_NAME, + DEFAULT_OUTPUT_MODEL_CHANNEL_NAME, + DEFAULT_TENSORBOARD_CHANNEL_NAME, + ExperimentConfig, + UserVpcConfig, ) -from .exception import UnexpectedStatusException -from .experiment import Experiment, ExperimentConfig from .model import InferenceSpec, Model, ResourceConfig from .predictor import Predictor -from .schema.training_job_schema import TrainingJobSchema from .serializers import SerializerBase from .session import Session, get_default_session logger = get_logger(__name__) -DEFAULT_OUTPUT_MODEL_CHANNEL_NAME = "model" -DEFAULT_CHECKPOINT_CHANNEL_NAME = "checkpoints" -DEFAULT_TENSORBOARD_CHANNEL_NAME = "tensorboard" - class HyperParameterType(object): """Hyperparameter type.""" @@ -184,7 +167,7 @@ def to_input_uri(self): ) -class EstimatorBase(metaclass=ABCMeta): +class EstimatorBase(_TrainingJobSubmitter, metaclass=ABCMeta): """EstimatorBase is the base class for other Estimator classes, such as Estimator. The EstimatorBase class contains common attributes and methods for all estimators, @@ -295,21 +278,22 @@ def __init__( """ self.hyperparameters = hyperparameters or dict() - self.environments = environments - self.requirements = requirements - self.instance_type = instance_type - self.instance_spec = instance_spec - self.resource_id = resource_id - self.instance_count = instance_count if instance_count else 1 - self.max_run_time = max_run_time - self.base_job_name = base_job_name - self.output_path = output_path - self.user_vpc_config = user_vpc_config - self.experiment_config = experiment_config self.checkpoints_path = checkpoints_path self.session = session or get_default_session() - self.labels = labels - self._latest_training_job = None + super().__init__( + base_job_name=base_job_name, + output_path=output_path, + experiment_config=experiment_config, + instance_type=instance_type, + instance_count=instance_count, + resource_id=resource_id, + instance_spec=instance_spec, + user_vpc_config=user_vpc_config, + max_run_time=max_run_time, + environments=environments, + requirements=requirements, + labels=labels, + ) def set_hyperparameters(self, **kwargs): """Set hyperparameters for the training job. 
@@ -319,11 +303,6 @@ def set_hyperparameters(self, **kwargs): """ self.hyperparameters.update(**kwargs) - @property - def latest_training_job(self): - """Return the latest submitted training job.""" - return self._latest_training_job - def _gen_job_display_name(self, job_name=None): """Generate job display name.""" if job_name: @@ -331,150 +310,12 @@ def _gen_job_display_name(self, job_name=None): ts = datetime.now().strftime("%Y%m%d_%H%M%S") return "{}_{}".format(self.base_job_name or "training_job", ts) - def _get_input_uri(self, item: str): - """Get input uri for training_job from given input.""" - if not isinstance(item, (str, FileSystemInputBase)): - raise ValueError(f"Input data of type {type(item)} is not supported.") - - if isinstance(item, FileSystemInputBase): - input_uri = item.to_input_uri() - elif is_oss_uri(item) or is_filesystem_uri(item) or is_odps_table_uri(item): - input_uri = item - elif os.path.exists(item): - store_path = self.session.get_storage_path_by_category("train_data") - input_uri = upload(item, store_path) - else: - raise ValueError( - "Invalid input data, supported inputs are OSS, NAS, MaxCompute " - "table or local path." - ) - - return input_uri - - def _build_input_data_configs( - self, - inputs: Dict[str, Any] = None, - input_channel_defs: Optional[List[Dict[str, str]]] = None, - ) -> List[Dict[str, str]]: - """Build the input data config for the training job.""" - res = [] - - if input_channel_defs: - remains = set(inputs.keys()) - for channel in input_channel_defs: - channel_name = channel["Name"] - channel_required = channel["Required"] - if channel_name in inputs: - input_uri = self._get_input_uri(inputs[channel_name]) - res.append({"Name": channel_name, "InputUri": input_uri}) - remains.remove(channel_name) - elif channel_required: - raise ValueError( - f"Input channel {channel_name} is required but not provided." - " Please check the input channels definition." - ) - if remains: - raise ValueError( - f"Following input channels={list(remains)} are not defined in input" - " channels definition. Please check the input channels definition." - ) - else: - for name, item in inputs.items(): - input_uri = self._get_input_uri(item) - res.append({"Name": name, "InputUri": input_uri}) - - return res - - def _generate_job_base_output_path(self, job_name: str) -> str: - """Generate the base output path for the training job.""" - bucket = self.session.oss_bucket - bucket_name = bucket.bucket_name - # replace non-alphanumeric character in training job name. 
- name = to_plain_text(job_name) - - if self.output_path: - return os.path.join(self.output_path, f"{name}_{random_str(6)}") - else: - job_output_path = self.session.get_storage_path_by_category( - "training_job", f"{name}_{random_str(6)}" - ) - return f"oss://{bucket_name}/{job_output_path}" - - @classmethod - def _get_default_output_channel_defs(cls): - channel_defs = [ - { - "Name": DEFAULT_OUTPUT_MODEL_CHANNEL_NAME, - }, - { - "Name": DEFAULT_CHECKPOINT_CHANNEL_NAME, - }, - { - "Name": DEFAULT_TENSORBOARD_CHANNEL_NAME, - "Properties": { - "ossAppendable": "true", - }, - }, - ] - return channel_defs - - def _build_output_data_configs( - self, job_name: str, output_channel_defs: List[Dict[str, str]] - ) -> List[Dict[str, str]]: - """Build the output data config for the training job.""" - job_base_output_path = self._generate_job_base_output_path(job_name) - - # OSS URI for output channel will be mounted to directory - # "/ml/output/{ChannelName}/" and the output OSS URI should be a "directory" - def as_oss_dir_uri(uri: str): - return uri if uri.endswith("/") else uri + "/" - - res = [] - for ch in output_channel_defs: - # if checkpoint path is provided, use it as the checkpoint channel output. - if ch["Name"] == DEFAULT_CHECKPOINT_CHANNEL_NAME and self.checkpoints_path: - oss_uri = self.checkpoints_path - elif ( - ch["Name"] == DEFAULT_TENSORBOARD_CHANNEL_NAME - and self.experiment_config - ): - continue - elif not self.output_path and self.experiment_config: - continue - else: - oss_uri = as_oss_dir_uri( - posixpath.join(job_base_output_path, ch["Name"]) - ) - res.append( - { - "Name": ch["Name"], - "OutputUri": oss_uri, - } - ) - - return res - @abstractmethod def fit( self, inputs: Dict[str, Any] = None, wait: bool = True, show_logs: bool = True ): """Submit a training job with the given input data.""" - def wait(self, show_logs: bool = True): - """Block until the latest training job is completed. - - Args: - show_logs(bool): Specifies whether to fetch and print the logs produced by - the training job. - - Raises: - RuntimeError: If no training job is submitted. - - """ - if not self._latest_training_job: - raise RuntimeError("Could not find a submitted training job.") - self._latest_training_job.wait(show_logs=show_logs) - def model_data(self) -> str: """Model data output path. @@ -482,18 +323,18 @@ def model_data(self) -> str: str: A string in OSS URI format refers to the output model of the submitted job. """ - if not self._latest_training_job: + if not self.latest_job: raise RuntimeError( "No TrainingJob for the estimator, output model data not found." ) - if not self._latest_training_job.is_succeeded(): + if not self.latest_job.is_succeeded(): logger.warning( "The TrainingJob is currently not in a succeeded status, which means" " that the model data output may not be accessible." ) - return self._latest_training_job.output_path( + return self.latest_job.output_path( channel_name=DEFAULT_OUTPUT_MODEL_CHANNEL_NAME ) @@ -504,14 +345,12 @@ def checkpoints_data(self) -> str: str: A string in OSS URI format refers to the checkpoints of submitted training job. """ - if not self._latest_training_job: + if not self.latest_job: raise RuntimeError( "No TrainingJob for the Estimator, output checkpoints data path " "not found." ) - return self._latest_training_job.output_path( - channel_name=DEFAULT_CHECKPOINT_CHANNEL_NAME - ) + return self.latest_job.output_path(channel_name=DEFAULT_CHECKPOINT_CHANNEL_NAME) def tensorboard_data(self) -> str: """Output TensorBoard logs path. 
@@ -520,12 +359,12 @@ def tensorboard_data(self) -> str: str: A string in OSS URI format refers to the tensorboard log of submitted training job. """ - if not self._latest_training_job: + if not self.latest_job: raise RuntimeError( "No TrainingJob for the Estimator, output TensorBoard logs data path" " not found." ) - return self._latest_training_job.output_path( + return self.latest_job.output_path( channel_name=DEFAULT_TENSORBOARD_CHANNEL_NAME, ) @@ -540,15 +379,15 @@ def tensorboard(self, wait=True): """ from pai.tensorboard import TensorBoard - if not self.latest_training_job: + if not self.latest_job: raise RuntimeError("Could not find a submitted training job.") source_type = "TrainingJob" - if isinstance(self.latest_training_job, _LocalTrainingJob): + if isinstance(self.latest_job, LocalTrainingJob): raise RuntimeError("Local training job does not support tensorboard.") res = self.session.tensorboard_api.list( source_type=source_type, - source_id=self.latest_training_job.training_job_id, + source_id=self.latest_job.training_job_id, ) if res.items: @@ -564,8 +403,8 @@ def tensorboard(self, wait=True): tb = TensorBoard.create( uri=self.tensorboard_data(), wait=wait, - display_name=self._latest_training_job.training_job_name, - source_id=self.latest_training_job.training_job_id, + display_name=self.latest_job.training_job_name, + source_id=self.latest_job.training_job_id, source_type=source_type, session=self.session, ) @@ -871,8 +710,6 @@ def __init__( session=session, ) - self.__uploaded_source_files = None - def training_image_uri(self) -> str: """Return the Docker image to use for training. @@ -893,73 +730,35 @@ def _prepare_for_training(self): ) self.source_dir = updated_args["source_dir"] - def _upload_source_files(self, job_name: str) -> Optional[str]: - """Upload local source files to OSS.""" - if not self.source_dir: - return - - if is_oss_uri(self.source_dir): - return self.source_dir - elif not os.path.exists(self.source_dir): - raise ValueError(f"Source directory {self.source_dir} does not exist.") - # compress the source files to a Tar Gz file and upload to OSS bucket. 
- upload_data_path = self.session.get_storage_path_by_category( - "training_src", to_plain_text(job_name) - ) - self.__uploaded_source_files = upload( - source_path=self.source_dir, - oss_path=upload_data_path, - bucket=self.session.oss_bucket, - is_tar=True, - ) - return self.__uploaded_source_files - - def _build_code_input(self, job_name: str) -> Optional[Dict[str, Any]]: - """Build a dict to represent AlgorithmSpecCodeDir used in the TrainingJob.""" - upload_source_files = self._upload_source_files(job_name) - if not upload_source_files: - return - oss_uri_obj = OssUriObj( - uri=self.session.patch_oss_endpoint(upload_source_files) - ) - - code_dir = { - "LocationType": "oss", - "LocationValue": { - "Bucket": oss_uri_obj.bucket_name, - "Key": oss_uri_obj.object_key, - "Endpoint": oss_uri_obj.endpoint, - }, - } - return code_dir - def _build_algorithm_spec( - self, - code_input, - ) -> Dict[str, Any]: + self, code_input, inputs: Dict[str, Any] + ) -> AlgorithmSpec: """Build a temporary AlgorithmSpec used for submitting the TrainingJob.""" - command = ( - self.command - if isinstance(self.command, list) - else [ - "/bin/sh", - "-c", - self.command, - ] + algorithm_spec = AlgorithmSpec( + command=( + self.command + if isinstance(self.command, list) + else ["sh", "-c", self.command] + ), + image=self.training_image_uri(), + job_type=self.job_type, + metric_definitions=self.metric_definitions, + code_dir=code_input, + output_channels=self._default_training_output_channels(), + input_channels=[ + Channel(name=channel_name, required=False) + for channel_name in inputs.keys() + ], ) - algo_spec = { - "Command": command, - "Image": self.training_image_uri(), - "JobType": self.job_type, - "MetricDefinitions": self.metric_definitions, - "CodeDir": code_input, - "OutputChannels": self._get_default_output_channel_defs(), - } - return algo_spec + return algorithm_spec def fit( - self, inputs: Dict[str, Any] = None, wait: bool = True, show_logs: bool = True - ): + self, + inputs: Dict[str, Any] = None, + wait: bool = True, + show_logs: bool = True, + job_name: Optional[str] = None, + ) -> Union[TrainingJob, LocalTrainingJob]: """Submit a training job with the given input data. Args: @@ -972,78 +771,100 @@ def fit( either succeeded, failed, or stopped. (Default True). show_logs (bool): Specifies whether to show the logs produced by the training job (Default True). + job_name (str, optional): The name of the training job. + + Returns: + :class:`pai.job.TrainingJob` or :class:`pai.job.LocalTrainingJob`: A + submitted training job. + Raises: UnExpectedStatusException: If the training job fails. 
""" inputs = inputs or dict() self._prepare_for_training() - job_name = self._gen_job_display_name() + job_name = self.job_name(job_name=job_name) if is_local_run_instance_type(self.instance_type): - training_job = self._local_run( - job_name=job_name, inputs=inputs, instance_type=self.instance_type + return self._local_run( + job_name=job_name, + inputs=inputs, + instance_type=self.instance_type, + wait=wait, ) - else: - training_job = self._fit(inputs=inputs, job_name=job_name) - self._latest_training_job = training_job - - if wait: - self.wait(show_logs=show_logs) - - def _fit(self, job_name, inputs: Dict[str, Any] = None): - input_configs = self._build_input_data_configs(inputs) - output_configs = self._build_output_data_configs( - job_name, output_channel_defs=self._get_default_output_channel_defs() + return self._fit( + inputs=inputs, job_name=job_name, wait=wait, show_logs=show_logs ) + + def _fit( + self, + job_name, + inputs: Dict[str, Any], + wait: bool = True, + show_logs: bool = True, + ) -> TrainingJob: # prepare input code. - code_input = self._build_code_input(job_name) + code_input = self._build_code_input(job_name, source_dir=self.source_dir) algo_spec = self._build_algorithm_spec( code_input=code_input, + inputs=inputs, + ) + inputs = self.build_inputs( + inputs=inputs, + input_channels=algo_spec.input_channels, ) - training_job_id = self.session.training_job_api.create( - instance_count=self.instance_count, + if self.checkpoints_path: + outputs = {DEFAULT_CHECKPOINT_CHANNEL_NAME: self.checkpoints_path} + else: + outputs = None + + outputs = self.build_outputs( + job_name=job_name, + output_channels=algo_spec.output_channels, + outputs=outputs, + ) + + return self._submit( + job_name=job_name, + algorithm_spec=algo_spec, instance_spec=self.instance_spec, instance_type=self.instance_type, + instance_count=self.instance_count, resource_id=self.resource_id, - job_name=job_name, hyperparameters=self.hyperparameters, environments=self.environments, requirements=self.requirements, - max_running_in_seconds=self.max_run_time, - input_channels=input_configs, - output_channels=output_configs, - algorithm_spec=algo_spec, - user_vpc_config=self.user_vpc_config.to_dict() - if self.user_vpc_config - else None, - experiment_config=self.experiment_config.to_dict() - if self.experiment_config - else None, + max_run_time=self.max_run_time, + inputs=inputs, + outputs=outputs, + user_vpc_config=self.user_vpc_config if self.user_vpc_config else None, + experiment_config=( + self.experiment_config if self.experiment_config else None + ), labels=self.labels, + wait=wait, + show_logs=show_logs, ) - training_job = _TrainingJob.get(training_job_id) - print( - f"View the job detail by accessing the console URI: {training_job.console_uri}" - ) - return training_job def _local_run( self, job_name, instance_type: str, inputs: Dict[str, Any] = None, - ) -> "_LocalTrainingJob": + wait: bool = True, + ) -> "LocalTrainingJob": if self.instance_count > 1: raise RuntimeError("Local training job only supports single instance.") - training_job = _LocalTrainingJob( + training_job = LocalTrainingJob( estimator=self, inputs=inputs, job_name=job_name, instance_type=instance_type, ) training_job.run() + if wait: + training_job.wait() return training_job @@ -1086,7 +907,7 @@ def __init__( algorithm_name: Optional[str] = None, algorithm_version: Optional[str] = None, algorithm_provider: Optional[str] = None, - algorithm_spec: Optional[Dict[str, Any]] = None, + algorithm_spec: Optional[AlgorithmSpec] = None, 
hyperparameters: Optional[Dict[str, Any]] = None, environments: Optional[Dict[str, str]] = None, requirements: Optional[List[str]] = None, @@ -1113,7 +934,7 @@ def __init__( a PAI official algorithm. If not provided, the default provider is user's PAI account. If algorithm name is not provided, this argument will be ignored. - algorithm_spec (Dict[str, Any], optional): A temporary algorithm spec. + algorithm_spec (AlgorithmSpec, optional): A temporary algorithm spec. Required if algorithm_name is not provided. hyperparameters (dict, optional): A dictionary that represents the hyperparameters used in the training job. Default hyperparameters will @@ -1169,7 +990,9 @@ def __init__( algorithm_version=algorithm_version, algorithm_provider=algorithm_provider, ) - self._algo_spec = _algo_version["AlgorithmSpec"] + self._algo_spec = AlgorithmSpec.model_validate( + _algo_version["AlgorithmSpec"] + ) self.algorithm_name = _algo_version["AlgorithmName"] self.algorithm_version = _algo_version["AlgorithmVersion"] self.algorithm_provider = _algo_version["AlgorithmProvider"] @@ -1205,37 +1028,32 @@ def set_hyperparameters(self, **kwargs): super(AlgorithmEstimator, self).set_hyperparameters(**kwargs) @property - def hyperparameter_definitions(self) -> List[Dict[str, Any]]: + def hyperparameter_definitions(self) -> List[HyperParameterDefinition]: """Get the hyperparameter definitions from the algorithm spec.""" - res = self._algo_spec.get("HyperParameters", []) + res = self._algo_spec.hyperparameter_definitions return res @property - def input_channel_definitions(self) -> List[Dict[str, Any]]: + def input_channel_definitions(self) -> List[Channel]: """Get the input channel definitions from the algorithm spec.""" - res = self._algo_spec.get("InputChannels", []) + res = self._algo_spec.input_channels return res @property - def output_channel_definitions(self) -> List[Dict[str, Any]]: + def output_channel_definitions(self) -> List[Channel]: """Get the output channel definitions from the algorithm spec.""" - res = self._algo_spec.get("OutputChannels", []) + res = self._algo_spec.output_channels return res @property def supported_instance_types(self) -> List[str]: """Get the supported instance types from the algorithm spec.""" - res = ( - self._algo_spec["SupportedInstanceTypes"] - if "SupportedInstanceTypes" in self._algo_spec - else [] - ) - return res + return self._algo_spec.supported_instance_types def _check_args( self, algorithm_name: str, - algorithm_spec: Dict[str, Any], + algorithm_spec: Optional[AlgorithmSpec], ): """Check the algorithm_name and algorithm_spec. @@ -1245,7 +1063,7 @@ def _check_args( Args: algorithm_name (str): The name of the algorithm. - algorithm_spec (dict): The algorithm spec. + algorithm_spec (AlgorithmSpec): The algorithm spec. """ if not algorithm_name and not algorithm_spec: raise ValueError( @@ -1382,8 +1200,12 @@ def _get_default_training_instance_type(self) -> str: return machine_spec["InstanceType"] def fit( - self, inputs: Dict[str, Any] = None, wait: bool = True, show_logs: bool = True - ): + self, + inputs: Dict[str, Any] = None, + wait: bool = True, + show_logs: bool = True, + job_name: Optional[str] = None, + ) -> TrainingJob: """Submit a training job with the given input data. Args: @@ -1396,56 +1218,50 @@ def fit( either succeeded, failed, or stopped. (Default True). show_logs (bool): Specifies whether to show the logs produced by the training job (Default True). + job_name (str, optional): The name of the training job. 
+
+        Returns:
+            :class:`pai.job.TrainingJob`: The submitted training job.
+
         Raises:
             UnExpectedStatusException: If the training job fails.
 
         """
-        inputs = inputs or dict()
-        job_name = self._gen_job_display_name()
-        training_job = self._fit(inputs=inputs, job_name=job_name)
-        self._latest_training_job = training_job
-
-        if wait:
-            self.wait(show_logs=show_logs)
-
-    def _fit(self, job_name, inputs: Dict[str, Any] = None):
-        input_configs = self._build_input_data_configs(
-            inputs, input_channel_defs=self.input_channel_definitions
+        job_name = self.job_name(job_name=job_name)
+        input_configs = self.build_inputs(
+            inputs,
+            input_channels=self._algo_spec.input_channels,
         )
-        output_configs = self._build_output_data_configs(
-            job_name, output_channel_defs=self.output_channel_definitions
+        output_configs = self.build_outputs(
+            job_name,
+            output_channels=self._algo_spec.output_channels,
         )
-
-        training_job_id = self.session.training_job_api.create(
+        return self._submit(
             instance_count=self.instance_count,
             instance_type=self.instance_type,
             instance_spec=self.instance_spec,
             resource_id=self.resource_id,
             job_name=job_name,
             hyperparameters=self.hyperparameters,
-            max_running_in_seconds=self.max_run_time,
-            input_channels=input_configs,
-            output_channels=output_configs,
+            max_run_time=self.max_run_time,
+            inputs=input_configs,
+            outputs=output_configs,
             environments=self.environments,
             requirements=self.requirements,
             algorithm_name=self.algorithm_name,
             algorithm_version=self.algorithm_version,
             algorithm_provider=self.algorithm_provider,
             algorithm_spec=self.algorithm_spec,
-            user_vpc_config=self.user_vpc_config.to_dict()
-            if self.user_vpc_config
-            else None,
-            experiment_config=self.experiment_config.to_dict()
-            if self.experiment_config
-            else None,
+            user_vpc_config=(
+                self.user_vpc_config.to_dict() if self.user_vpc_config else None
+            ),
+            experiment_config=(
+                self.experiment_config.to_dict() if self.experiment_config else None
+            ),
             labels=self.labels,
+            wait=wait,
+            show_logs=show_logs,
         )
-        training_job = _TrainingJob.get(training_job_id)
-        print(
-            f"View the job detail by accessing the console URI:"
-            f" {training_job.console_uri}"
-        )
-        return training_job
 
     def get_outputs_data(self) -> Dict[str, str]:
         """Show all outputs data paths.
 
         Returns:
             dict[str, str]: A dictionary of all outputs data paths.
         """
-        if not self._latest_training_job:
+        if not self.latest_job:
             raise RuntimeError(
- ) - - return { - ch["Name"]: ch["OutputUri"] - for ch in self._latest_training_job.output_channels - } - - -_TRAINING_LAUNCH_SCRIPT_TEMPLATE = textwrap.dedent( - """\ -#!/bin/sh - -env - -# change to working directory -if [ -n "$PAI_WORKING_DIR" ]; then - echo "Change to Working Directory", $PAI_WORKING_DIR - mkdir -p $PAI_WORKING_DIR && cd $PAI_WORKING_DIR -fi - -# install requirements -if [ -e "requirements.txt" ]; then - echo "Installing dependencies from requirements.txt" - python -m pip install -r requirements.txt -fi - -echo "User program launching" -echo "-----------------------------------------------------------------" - -sh {0} -""" -) - - -class _TrainingEnv(object): - ENV_PAI_HPS = "PAI_HPS" - ENV_PAI_HPS_PREFIX = "PAI_HPS_" - ENV_PAI_USER_ARGS = "PAI_USER_ARGS" - ENV_PAI_INPUT_PREFIX = "PAI_INPUT_" - ENV_PAI_OUTPUT_PREFIX = "PAI_OUTPUT_" - ENV_PAI_WORKING_DIR = "PAI_WORKING_DIR" - - -class _TrainingJobConfig(object): - WORKING_DIR = "/ml/usercode/" - INPUT_CONFIG_DIR = "/ml/input/config/" - INPUT_DATA_DIR = "/ml/input/data/" - OUTPUT_DIR = "/ml/output/" - - -_ENV_NOT_ALLOWED_CHARS = re.compile(r"[^a-zA-Z0-9_]") - - -class _LocalTrainingJob(object): - """A class that represents a local training job running with docker container.""" - - def __init__( - self, - estimator: Estimator, - inputs: Dict[str, Any], - instance_type: str = None, - temp_dir: str = None, - job_name: str = None, - ): - self.estimator = estimator - self.inputs = inputs - self.tmp_dir = temp_dir or tempfile.mkdtemp() - self.job_name = job_name - self.instance_type = instance_type - logger.info("Local TrainingJob temporary directory: {}".format(self.tmp_dir)) - self._container_run: ContainerRun = None - - def __str__(self): - return self.__repr__() - - def __repr__(self): - if self._container_run: - container = self._container_run.container - container_name, container_id, status = ( - container.name, - container.id, - container.status, + "Could not find a submitted training job. Please submit a training job" + " before calling this method." 
) - else: - container_name, container_id, status = None, None, None - return f"LocalTrainingJob(container_name={container_name}, container_id={container_id}, status={status})" - - @property - def session(self) -> Session: - return self.estimator.session - - def prepare_env(self) -> Dict[str, str]: - """Prepare environment variables for the training job.""" - - # Hyperparameters environment variables - def _normalize_name(name: str) -> str: - # replace all non-alphanumeric characters with underscore - return _ENV_NOT_ALLOWED_CHARS.sub("_", name).upper() - - env = {} - user_args = [] - for name, value in self.estimator.hyperparameters.items(): - env[_TrainingEnv.ENV_PAI_HPS_PREFIX + _normalize_name(name)] = str(value) - user_args.extend(["--" + name, shlex.quote(str(value))]) - env[_TrainingEnv.ENV_PAI_USER_ARGS] = " ".join( - [shlex.quote(v) for v in user_args] - ) - env[_TrainingEnv.ENV_PAI_HPS] = json.dumps( - {name: str(value) for name, value in self.estimator.hyperparameters.items()} - ) - # Environments for input channel - for name, value in self.inputs.items(): - if (is_oss_uri(value) and value.endswith("/")) or os.path.isdir(value): - env[ - _TrainingEnv.ENV_PAI_INPUT_PREFIX + _normalize_name(name) - ] = posixpath.join(_TrainingJobConfig.INPUT_DATA_DIR, name) - else: - file_name = os.path.basename(value) - env[ - _TrainingEnv.ENV_PAI_INPUT_PREFIX + _normalize_name(name) - ] = posixpath.join(_TrainingJobConfig.INPUT_DATA_DIR, name, file_name) - - # Environments for output channel. - # By default, TrainingJob invoked by Estimator will have two output channels: - # 'model' and 'checkpoints' - output_channel = ["model", "checkpoints"] - for name in output_channel: - env[ - _TrainingEnv.ENV_PAI_OUTPUT_PREFIX + _normalize_name(name) - ] = posixpath.join(_TrainingJobConfig.OUTPUT_DIR, name) - - env[_TrainingEnv.ENV_PAI_WORKING_DIR] = _TrainingJobConfig.WORKING_DIR - return env - - def run(self): - """Run estimator job in local with docker.""" - output_model_path = self.output_path() - os.makedirs(output_model_path, exist_ok=True) - volumes = {} - - tmp_dir = tempfile.mkdtemp() - # 1. Prepare source code to directory /ml/usercode - user_code_dir = os.path.join(self.tmp_dir, "user_code") - if is_oss_uri(self.estimator.source_dir): - raise RuntimeError("OSS source code is not supported in local training.") - shutil.copytree(self.estimator.source_dir, user_code_dir) - volumes[user_code_dir] = { - "bind": _TrainingJobConfig.WORKING_DIR, - "mode": "rw", - } - - # 2. Prepare input data for training job. - input_data = self.prepare_input_data() - for host_path, container_path in input_data.items(): - volumes[host_path] = { - "bind": container_path, - "mode": "rw", - } - - # 3. Prepare input config files, such as hyperparameters.json, - # training-job.json, etc. 
- input_config_path = os.path.join(tmp_dir, "config") - os.makedirs(input_config_path, exist_ok=True) - self.prepare_input_config(input_config_path=input_config_path) - volumes[input_config_path] = { - "bind": _TrainingJobConfig.INPUT_CONFIG_DIR, - "mode": "rw", - } - - execution_dir = os.path.join(tmp_dir, "config", "execution") - os.makedirs(execution_dir, exist_ok=True) - command_path = os.path.join(execution_dir, "command.sh") - with open(command_path, "w") as f: - f.write(self.estimator.command) - launch_script_path = os.path.join(input_config_path, "launch.sh") - with open(launch_script_path, "w") as f: - f.write( - _TRAINING_LAUNCH_SCRIPT_TEMPLATE.format( - posixpath.join( - _TrainingJobConfig.INPUT_CONFIG_DIR, "execution/command.sh" - ) - ) - ) - - # 4. Config output model channel - volumes[output_model_path] = { - "bind": posixpath.join(_TrainingJobConfig.OUTPUT_DIR, "model"), - "mode": "rw", - } - - gpu_count = ( - -1 if self.instance_type.strip() == INSTANCE_TYPE_LOCAL_GPU else None - ) - self._container_run = run_container( - environment_variables=self.prepare_env(), - image_uri=self.estimator.image_uri, - entry_point=[ - "/bin/sh", - posixpath.join(_TrainingJobConfig.INPUT_CONFIG_DIR, "launch.sh"), - ], - volumes=volumes, - working_dir=_TrainingJobConfig.WORKING_DIR, - gpu_count=gpu_count, - ) - - def prepare_input_config(self, input_config_path): - """Prepare input config for TrainingJob, such as hyperparameters.json, - trainingjob.json.""" - with open(os.path.join(input_config_path, "hyperparameters.json"), "w") as f: - hps = self.estimator.hyperparameters or dict() - f.write(json.dumps({k: str(v) for k, v in hps.items()})) - - def prepare_input_data(self) -> Dict[str, str]: - """Prepare input data config.""" - input_data_configs = {} - - for name, input_data in self.inputs.items(): - local_channel_path = os.path.join(self.tmp_dir, f"input/data/{name}") - os.makedirs(local_channel_path, exist_ok=True) - input_data_configs[local_channel_path] = posixpath.join( - _TrainingJobConfig.INPUT_DATA_DIR, name - ) - if is_oss_uri(input_data): - oss_uri_obj = OssUriObj(input_data) - oss_bucket = self.session.get_oss_bucket(oss_uri_obj.bucket_name) - os.makedirs(local_channel_path, exist_ok=True) - download( - oss_uri_obj.object_key, - local_path=local_channel_path, - bucket=oss_bucket, - ) - input_data_configs[local_channel_path] = posixpath.join( - _TrainingJobConfig.INPUT_DATA_DIR, name - ) - else: - # If the input data is local files, copy the input data to a - # temporary directory. 
- if not os.path.exists(input_data): - raise ValueError( - "Input data not exists: name={} input_data={}".format( - name, input_data - ) - ) - elif os.path.isdir(input_data): - distutils.dir_util.copy_tree(input_data, local_channel_path) - else: - shutil.copy( - input_data, - os.path.join(local_channel_path, os.path.basename(input_data)), - ) - - return input_data_configs - - def wait(self, show_logs: bool = True): - self._container_run.watch(show_logs=show_logs) - - def output_path(self, channel_name="model"): - return os.path.join(self.tmp_dir, "output", f"{channel_name}/") - - def is_succeeded(self): - """Return True if the training job is succeeded, otherwise return False.""" - return self._container_run.is_succeeded() - - -class TrainingJobStatus(object): - CreateFailed = "CreateFailed" - InitializeFailed = "InitializeFailed" - Succeed = "Succeed" - Failed = "Failed" - Terminated = "Terminated" - Creating = "Creating" - Created = "Created" - Initializing = "Initializing" - Submitted = "Submitted" - Running = "Running" - - @classmethod - def completed_status(cls): - return [ - cls.InitializeFailed, - cls.Succeed, - cls.Failed, - cls.Terminated, + uri_outputs = [ + output + for output in self.latest_job.outputs + if isinstance(output, UriOutput) ] - - @classmethod - def failed_status(cls): - return [ - cls.InitializeFailed, - cls.Failed, - cls.CreateFailed, + extra_outputs = [ + output + for output in self.latest_job.outputs + if not isinstance(output, UriOutput) ] - -class TrainingJobChannel(object): - def __init__(self, dataset_id=None, input_uri=None, name=None): - self.dataset_id = dataset_id - self.input_uri = input_uri - self.name = name - - -class _TrainingJob(EntityBaseMixin): - _schema_cls = TrainingJobSchema - - def __init__( - self, - algorithm_name=None, - algorithm_version="1.0.0", - algorithm_provider=ProviderAlibabaPAI, - hyperparameters: Dict[str, Any] = None, - training_job_name: str = None, - instance_type: str = None, - instance_count: int = None, - output_channels: List[Dict[str, str]] = None, - input_channels: List[Dict[str, str]] = None, - labels: Dict[str, str] = None, - max_running_time_in_seconds: int = None, - experiment_config: Dict[str, str] = None, - description: str = None, - session: Session = None, - **kwargs, - ): - super(_TrainingJob, self).__init__(session=session, **kwargs) - session = session or get_default_session() - self.algorithm_name = algorithm_name - self.algorithm_version = algorithm_version - self.algorithm_provider = algorithm_provider - self.training_job_name = training_job_name - self.description = description - self.labels = labels - self.hyperparameters = hyperparameters - self.input_channels = input_channels - self.output_channels = output_channels - self.instance_type = instance_type - self.instance_count = instance_count - self.max_running_time_in_seconds = max_running_time_in_seconds - self.experiment_config = experiment_config - - # Load only fields - self.create_time = kwargs.pop("create_time", None) - self.modified_time = kwargs.pop("modified_time", None) - self.reason_code = kwargs.pop("reason_code", None) - self.reason_message = kwargs.pop("reason_message", None) - self.status = kwargs.pop("status", None) - self.status_transitions = kwargs.pop("status_transitions", None) - self.training_job_id = kwargs.pop("training_job_id", None) - self.training_job_url = kwargs.pop("training_job_url", None) - - def __repr__(self): - return "TrainingJob(id={})".format(self.training_job_id) - - def __str__(self): - return self.__repr__() - - 
@property - def id(self): - return self.training_job_id - - @classmethod - def get(cls, training_job_id, session: Session = None) -> "_TrainingJob": - session = session or get_default_session() - res = session.training_job_api.get(training_job_id=training_job_id) - return cls.from_api_object(res, session=session) - - @classmethod - def list( - cls, - status=None, - session: Session = None, - page_size=50, - page_number=1, - ): - session = session or get_default_session() - res = session.training_job_api.list( - status=status, page_size=page_size, page_number=page_number - ) - return [cls.from_api_object(item, session=session) for item in res.items] - - def output_path(self, channel_name="model"): - for output_channel in self.output_channels: - if output_channel["Name"] == channel_name: - return output_channel["OutputUri"] - raise RuntimeError( - f"Output channel is not specified: channel_name={channel_name}" - ) - - @property - def console_uri(self): - if not self.training_job_id: - raise ValueError("The TrainingJob is not submitted") - - return self.training_job_url - - def wait(self, interval=2, show_logs: bool = True): - self.session.training_job_api.refresh_entity(self.training_job_id, self) - - if show_logs: - job_log_printer = _TrainingJobLogPrinter( - training_job_id=self.training_job_id, page_size=20, session=self.session - ) - job_log_printer.start() - else: - job_log_printer = None - try: - while not self.is_completed(): - time.sleep(interval) - finally: - if job_log_printer: - job_log_printer.stop(wait=True) - - self._on_job_completed() - - def _on_job_completed(self): - # Print an empty line to separate the training job logs and the following logs - print() - if self.status == TrainingJobStatus.Succeed: - print( - f"Training job ({self.training_job_id}) succeeded, you can check the" - f" logs/metrics/output in the console:\n{self.console_uri}" - ) - elif self.status == TrainingJobStatus.Terminated: - print( - f"Training job is ended with status {self.status}: " - f"reason_code={self.reason_code}, reason_message={self.reason_message}." 
- f"Check the training job in the console:\n{self.console_uri}" - ) - elif self.status in TrainingJobStatus.failed_status(): - print( - f"Training job ({self.training_job_id}) failed, please check the logs" - f" in the console: \n{self.console_uri}" - ) - - message = f"TrainingJob failed: name={self.training_job_name}, " - f"training_job_id={self.training_job_id}, " - f"reason_code={self.reason_code}, status={self.status}, " - f"reason_message={self.reason_message}" - - raise UnexpectedStatusException(message=message, status=self.status) - - def _reload(self): - """Reload the training job from the PAI Service,""" - self.session.training_job_api.refresh_entity(self.training_job_id, self) - - def is_succeeded(self): - """Return True if the training job is succeeded""" - self._reload() - return self.status == TrainingJobStatus.Succeed - - @retry(wait_secs=10) - def is_completed(self): - """Return True if the training job is completed, including failed status""" - if self.status in TrainingJobStatus.completed_status(): - return True - self._reload() - - return self.status in TrainingJobStatus.completed_status() - - -class _TrainingJobLogPrinter(object): - """A class used to print logs for a training job""" - - executor = ThreadPoolExecutor(5) - - def __init__( - self, training_job_id: str, page_size=10, session: Optional[Session] = None - ): - self.training_job_id = training_job_id - self.session = session - self.page_size = page_size - self._future = None - self._stop = False - - def _list_logs_api(self, page_number: int = 1): - try: - res = self.session.training_job_api.list_logs( - self.training_job_id, - page_number=page_number, - page_size=self.page_size, + if extra_outputs: + logger.warning( + "Extra outputs are provided in the training job, but only URI outputs" + " are supported. The extra outputs will be ignored: %s", + extra_outputs, ) - return res - except TeaException as e: - # hack: Backend service may raise an exception when the training job - # instance is not found. - if e.code == "TRAINING_JOB_INSTANCE_NOT_FOUND": - return PaginatedResult(items=[], total_count=0) - else: - raise e - - def _list_logs(self): - page_number, page_offset = 1, 0 - # print training job logs. - while not self._stop: - res = self._list_logs_api(page_number=page_number) - # 1. move to next page - if len(res.items) == self.page_size: - # print new logs starting from page_offset - self._print_logs(logs=res.items[page_offset:]) - page_number += 1 - page_offset = 0 - # 2. stay at the current page. - else: - if len(res.items) > page_offset: - # print new logs starting from page_offset - self._print_logs(logs=res.items[page_offset:]) - page_offset = len(res.items) - time.sleep(1) - - # When _stop is True, wait and print remaining logs. - time.sleep(10) - while True: - res = self._list_logs_api(page_number=page_number) - # There maybe more logs in the next page - if len(res.items) == self.page_size: - self._print_logs(logs=res.items[page_offset:]) - page_number += 1 - page_offset = 0 - # No more logs in the next page. 
- else: - if len(res.items) > page_offset: - self._print_logs(logs=res.items[page_offset:]) - break - - def _print_logs(self, logs: List[str]): - for log in logs: - print(log) - - def start(self): - if self._future: - raise ValueError("The training job log printer is already started") - self._stop = False - self._future = self.executor.submit(self._list_logs) - - def stop(self, wait: bool = True): - self._stop = True - if self._future: - self._future.result() + return {ch.name: ch.output_uri for ch in uri_outputs} diff --git a/pai/experiment.py b/pai/experiment.py index 1087342..7f1f1c0 100644 --- a/pai/experiment.py +++ b/pai/experiment.py @@ -21,28 +21,6 @@ logger = get_logger(__name__) -_default_session = None - - -class ExperimentConfig(object): - """ExperimentConfig is used to configure the experiment to which the job belongs.""" - - def __init__( - self, - experiment_id: str, - ): - """Initialize ExperimentConfig. - Args: - experiment_id (str): Specifies the ID of the experiment that training job instance - belongs to. - """ - self.experiment_id = experiment_id - - def to_dict(self): - return { - "ExperimentId": self.experiment_id, - } - class Experiment(object): """An experiment is a collection of runs. It can be used to compare the @@ -146,6 +124,26 @@ def get(cls, experiment_id: str) -> "Experiment": session=session, ) + @classmethod + def get_by_name( + cls, name: str, session: Optional[Session] = None + ) -> Optional["Experiment"]: + """Get experiment by name. + + Args: + name (str): The name of the experiment. + session (Session): The session to be used. + + Returns: + Experiment: The experiment with the specified name. + + """ + exp = next( + (exp for exp in cls.list(name=name, session=session) if exp.name == name), + None, + ) + return exp + def update( self, name: str, diff --git a/pai/job/__init__.py b/pai/job/__init__.py new file mode 100644 index 0000000..85ae444 --- /dev/null +++ b/pai/job/__init__.py @@ -0,0 +1,48 @@ +# Copyright 2024 Alibaba, Inc. or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._local_training_job import LocalTrainingJob +from ._training_job import ( + AlgorithmSpec, + Channel, + CodeDir, + ExperimentConfig, + HyperParameterDefinition, + InstanceSpec, + ModelTrainingSpec, + OssLocation, + TrainingJob, + TrainingJobStatus, + UriInput, + UriOutput, + UserVpcConfig, + _TrainingJobSubmitter, +) + +__all__ = [ + "TrainingJob", + "ModelTrainingSpec", + "TrainingJobStatus", + "Channel", + "HyperParameterDefinition", + "OssLocation", + "AlgorithmSpec", + "CodeDir", + "LocalTrainingJob", + "UriOutput", + "UserVpcConfig", + "ExperimentConfig", + "InstanceSpec", + "UriInput", +] diff --git a/pai/job/_local_training_job.py b/pai/job/_local_training_job.py new file mode 100644 index 0000000..5f9c1d7 --- /dev/null +++ b/pai/job/_local_training_job.py @@ -0,0 +1,293 @@ +# Copyright 2024 Alibaba, Inc. or its affiliates. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import distutils.dir_util +import json +import os +import posixpath +import re +import shlex +import shutil +import tempfile +import textwrap +import typing +from typing import Any, Dict + +from pai.session import Session + +from ..common.consts import INSTANCE_TYPE_LOCAL_GPU +from ..common.docker_utils import ContainerRun, run_container +from ..common.logging import get_logger +from ..common.oss_utils import OssUriObj, download, is_oss_uri + +if typing.TYPE_CHECKING: + from ..estimator import Estimator + + +logger = get_logger(__name__) + + +class _TrainingEnv(object): + ENV_PAI_HPS = "PAI_HPS" + ENV_PAI_HPS_PREFIX = "PAI_HPS_" + ENV_PAI_USER_ARGS = "PAI_USER_ARGS" + ENV_PAI_INPUT_PREFIX = "PAI_INPUT_" + ENV_PAI_OUTPUT_PREFIX = "PAI_OUTPUT_" + ENV_PAI_WORKING_DIR = "PAI_WORKING_DIR" + + +class _TrainingJobConfig(object): + WORKING_DIR = "/ml/usercode/" + INPUT_CONFIG_DIR = "/ml/input/config/" + INPUT_DATA_DIR = "/ml/input/data/" + OUTPUT_DIR = "/ml/output/" + + +_ENV_NOT_ALLOWED_CHARS = re.compile(r"[^a-zA-Z0-9_]") +_TRAINING_LAUNCH_SCRIPT_TEMPLATE = textwrap.dedent( + """\ +#!/bin/sh + +env + +# change to working directory +if [ -n "$PAI_WORKING_DIR" ]; then + echo "Change to Working Directory", $PAI_WORKING_DIR + mkdir -p $PAI_WORKING_DIR && cd $PAI_WORKING_DIR +fi + +# install requirements +if [ -e "requirements.txt" ]; then + echo "Installing dependencies from requirements.txt" + python -m pip install -r requirements.txt +fi + +echo "User program launching" +echo "-----------------------------------------------------------------" + +sh {0} +""" +) + + +class LocalTrainingJob(object): + """A class that represents a local training job running with docker container.""" + + def __init__( + self, + estimator: "Estimator", + inputs: Dict[str, Any], + instance_type: str = None, + temp_dir: str = None, + job_name: str = None, + ): + self.estimator = estimator + self.inputs = inputs + self.tmp_dir = temp_dir or tempfile.mkdtemp() + self.job_name = job_name + self.instance_type = instance_type + logger.info("Local TrainingJob temporary directory: {}".format(self.tmp_dir)) + self._container_run: ContainerRun = None + + def __str__(self): + return self.__repr__() + + def __repr__(self): + if self._container_run: + container = self._container_run.container + container_name, container_id, status = ( + container.name, + container.id, + container.status, + ) + else: + container_name, container_id, status = None, None, None + return f"LocalTrainingJob(container_name={container_name}, container_id={container_id}, status={status})" + + @property + def session(self) -> Session: + return self.estimator.session + + def prepare_env(self) -> Dict[str, str]: + """Prepare environment variables for the training job.""" + + # Hyperparameters environment variables + def _normalize_name(name: str) -> str: + # replace all non-alphanumeric characters with underscore + return _ENV_NOT_ALLOWED_CHARS.sub("_", name).upper() + + env = {} + user_args = [] + for name, value 
in self.estimator.hyperparameters.items(): + env[_TrainingEnv.ENV_PAI_HPS_PREFIX + _normalize_name(name)] = str(value) + user_args.extend(["--" + name, shlex.quote(str(value))]) + env[_TrainingEnv.ENV_PAI_USER_ARGS] = " ".join( + [shlex.quote(v) for v in user_args] + ) + env[_TrainingEnv.ENV_PAI_HPS] = json.dumps( + {name: str(value) for name, value in self.estimator.hyperparameters.items()} + ) + + # Environments for input channel + for name, value in self.inputs.items(): + if (is_oss_uri(value) and value.endswith("/")) or os.path.isdir(value): + env[ + _TrainingEnv.ENV_PAI_INPUT_PREFIX + _normalize_name(name) + ] = posixpath.join(_TrainingJobConfig.INPUT_DATA_DIR, name) + else: + file_name = os.path.basename(value) + env[ + _TrainingEnv.ENV_PAI_INPUT_PREFIX + _normalize_name(name) + ] = posixpath.join(_TrainingJobConfig.INPUT_DATA_DIR, name, file_name) + + # Environments for output channel. + # By default, TrainingJob invoked by Estimator will have two output channels: + # 'model' and 'checkpoints' + output_channel = ["model", "checkpoints"] + for name in output_channel: + env[ + _TrainingEnv.ENV_PAI_OUTPUT_PREFIX + _normalize_name(name) + ] = posixpath.join(_TrainingJobConfig.OUTPUT_DIR, name) + + env[_TrainingEnv.ENV_PAI_WORKING_DIR] = _TrainingJobConfig.WORKING_DIR + return env + + def run(self): + """Run estimator job in local with docker.""" + output_model_path = self.output_path() + os.makedirs(output_model_path, exist_ok=True) + volumes = {} + + tmp_dir = tempfile.mkdtemp() + # 1. Prepare source code to directory /ml/usercode + user_code_dir = os.path.join(self.tmp_dir, "user_code") + if is_oss_uri(self.estimator.source_dir): + raise RuntimeError("OSS source code is not supported in local training.") + shutil.copytree(self.estimator.source_dir, user_code_dir) + volumes[user_code_dir] = { + "bind": _TrainingJobConfig.WORKING_DIR, + "mode": "rw", + } + + # 2. Prepare input data for training job. + input_data = self.prepare_input_data() + for host_path, container_path in input_data.items(): + volumes[host_path] = { + "bind": container_path, + "mode": "rw", + } + + # 3. Prepare input config files, such as hyperparameters.json, + # training-job.json, etc. + input_config_path = os.path.join(tmp_dir, "config") + os.makedirs(input_config_path, exist_ok=True) + self.prepare_input_config(input_config_path=input_config_path) + volumes[input_config_path] = { + "bind": _TrainingJobConfig.INPUT_CONFIG_DIR, + "mode": "rw", + } + + execution_dir = os.path.join(tmp_dir, "config", "execution") + os.makedirs(execution_dir, exist_ok=True) + command_path = os.path.join(execution_dir, "command.sh") + with open(command_path, "w") as f: + f.write(self.estimator.command) + launch_script_path = os.path.join(input_config_path, "launch.sh") + with open(launch_script_path, "w") as f: + f.write( + _TRAINING_LAUNCH_SCRIPT_TEMPLATE.format( + posixpath.join( + _TrainingJobConfig.INPUT_CONFIG_DIR, "execution/command.sh" + ) + ) + ) + + # 4. 
Config output model channel + volumes[output_model_path] = { + "bind": posixpath.join(_TrainingJobConfig.OUTPUT_DIR, "model"), + "mode": "rw", + } + + gpu_count = ( + -1 if self.instance_type.strip() == INSTANCE_TYPE_LOCAL_GPU else None + ) + self._container_run = run_container( + environment_variables=self.prepare_env(), + image_uri=self.estimator.image_uri, + entry_point=[ + "/bin/sh", + posixpath.join(_TrainingJobConfig.INPUT_CONFIG_DIR, "launch.sh"), + ], + volumes=volumes, + working_dir=_TrainingJobConfig.WORKING_DIR, + gpu_count=gpu_count, + ) + + def prepare_input_config(self, input_config_path): + """Prepare input config for TrainingJob, such as hyperparameters.json, + trainingjob.json.""" + with open(os.path.join(input_config_path, "hyperparameters.json"), "w") as f: + hps = self.estimator.hyperparameters or dict() + f.write(json.dumps({k: str(v) for k, v in hps.items()})) + + def prepare_input_data(self) -> Dict[str, str]: + """Prepare input data config.""" + input_data_configs = {} + + for name, input_data in self.inputs.items(): + local_channel_path = os.path.join(self.tmp_dir, f"input/data/{name}") + os.makedirs(local_channel_path, exist_ok=True) + input_data_configs[local_channel_path] = posixpath.join( + _TrainingJobConfig.INPUT_DATA_DIR, name + ) + if is_oss_uri(input_data): + oss_uri_obj = OssUriObj(input_data) + oss_bucket = self.session.get_oss_bucket(oss_uri_obj.bucket_name) + os.makedirs(local_channel_path, exist_ok=True) + download( + oss_uri_obj.object_key, + local_path=local_channel_path, + bucket=oss_bucket, + ) + input_data_configs[local_channel_path] = posixpath.join( + _TrainingJobConfig.INPUT_DATA_DIR, name + ) + else: + # If the input data is local files, copy the input data to a + # temporary directory. + if not os.path.exists(input_data): + raise ValueError( + "Input data not exists: name={} input_data={}".format( + name, input_data + ) + ) + elif os.path.isdir(input_data): + distutils.dir_util.copy_tree(input_data, local_channel_path) + else: + shutil.copy( + input_data, + os.path.join(local_channel_path, os.path.basename(input_data)), + ) + + return input_data_configs + + def wait(self, show_logs: bool = True): + self._container_run.watch(show_logs=show_logs) + + def output_path(self, channel_name="model"): + return os.path.join(self.tmp_dir, "output", f"{channel_name}/") + + def is_succeeded(self): + """Return True if the training job is succeeded, otherwise return False.""" + return self._container_run.is_succeeded() diff --git a/pai/job/_training_job.py b/pai/job/_training_job.py new file mode 100644 index 0000000..ea2508e --- /dev/null +++ b/pai/job/_training_job.py @@ -0,0 +1,872 @@ +# Copyright 2024 Alibaba, Inc. or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import posixpath +import time +import typing +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Dict, List, Optional, Union + +from pydantic import BaseModel, ConfigDict, Field +from pydantic.alias_generators import to_pascal +from Tea.exceptions import TeaException + +from ..api.base import PaginatedResult +from ..common.consts import StoragePathCategory +from ..common.logging import get_logger +from ..common.oss_utils import OssUriObj, is_oss_uri, upload +from ..common.utils import ( + is_dataset_id, + is_filesystem_uri, + is_odps_table_uri, + name_from_base, + print_table, + random_str, + retry, + to_plain_text, +) +from ..exception import UnexpectedStatusException +from ..session import Session, get_default_session + +if typing.TYPE_CHECKING: + from ..estimator import FileSystemInputBase + +logger = get_logger(__name__) + + +def as_oss_dir_uri(uri: str): + return uri if uri.endswith("/") else uri + "/" + + +DEFAULT_OUTPUT_MODEL_CHANNEL_NAME = "model" +DEFAULT_CHECKPOINT_CHANNEL_NAME = "checkpoints" +DEFAULT_TENSORBOARD_CHANNEL_NAME = "tensorboard" + + +class BaseAPIModel(BaseModel): + + model_config = ConfigDict( + alias_generator=to_pascal, + populate_by_name=True, + ) + + def model_dump(self, **kwargs) -> Dict[str, Any]: + kwargs.update({"by_alias": True, "exclude_none": True}) + return super().model_dump(**kwargs) + + def to_dict(self): + return self.model_dump() + + +class TrainingJobStatus(object): + CreateFailed = "CreateFailed" + InitializeFailed = "InitializeFailed" + Succeed = "Succeed" + Failed = "Failed" + Terminated = "Terminated" + Creating = "Creating" + Created = "Created" + Initializing = "Initializing" + Submitted = "Submitted" + Running = "Running" + + @classmethod + def completed_status(cls): + return [ + cls.InitializeFailed, + cls.Succeed, + cls.Failed, + cls.Terminated, + ] + + @classmethod + def failed_status(cls): + return [ + cls.InitializeFailed, + cls.Failed, + cls.CreateFailed, + ] + + +class UserVpcConfig(BaseAPIModel): + """UserVpcConfig represents the VPC configuration for the training job instance.""" + + vpc_id: str = Field( + ..., + description="Specifies the ID of the VPC that training job instance connects to.", + ) + security_group_id: str = Field( + ..., + description="The ID of the security group that training job instances belong to.", + ) + switch_id: Optional[str] = Field( + None, + description="The ID of the vSwitch to which the instance belongs. Defaults to None.", + ) + extended_cidrs: Optional[List[str]] = Field( + None, + description="The CIDR blocks configured for the ENI of the training job instance. " + "If it is not specified, the CIDR block will be configured as the same as the VPC " + "network segmentation, which means that the training job instance can access all " + "resources in the VPC. 
Defaults to None.", + ) + + +class ExperimentConfig(BaseAPIModel): + """ExperimentConfig is used to configure the experiment to which the job belongs.""" + + experiment_id: str = Field( + ..., + description="Specifies the ID of the experiment that training job instance belongs to.", + ) + + +class OssLocation(BaseAPIModel): + """OSS location.""" + + bucket: str = Field(..., description="OSS bucket name.") + key: str = Field(..., description="Object key in the OSS bucket.") + endpoint: Optional[str] = Field(None, description="OSS service endpoint URL.") + + +class CodeDir(BaseAPIModel): + """Source code location""" + + location_value: Union[OssLocation, Dict[str, Any]] = Field( + ..., description="Location of the code directory." + ) + location_type: str = Field( + ..., description="Type of the code directory location, e.g., OSS." + ) + + +# HyperParameter +class HyperParameter(BaseAPIModel): + """A hyperparameter for a training job.""" + + value: str = Field(..., description="Value of the hyperparameter.") + name: str = Field(..., description="Name of the hyperparameter.") + + +class InstanceSpec(BaseAPIModel): + """Instance resource configuration""" + + memory: str = Field(..., description="Memory allocation for the instance.") + cpu: str = Field(..., alias="CPU", description="CPU allocation for the instance.") + gpu: str = Field(..., alias="GPU", description="GPU allocation for the instance.") + shared_memory: Optional[str] = Field( + None, description="Shared memory allocation, if applicable." + ) + + +class ComputeResource(BaseAPIModel): + """Compute Resource Configuration.""" + + ecs_count: Optional[int] = Field(None, description="Number of ECS instances.") + ecs_spec: Optional[str] = Field(None, description="Specification of ECS instances.") + instance_count: Optional[int] = Field(None, description="Number of instances.") + instance_spec: Optional[InstanceSpec] = Field( + None, description="Specification for instances." + ) + + +# URI Input and Output +class UriInput(BaseAPIModel): + """URI Input for a training job.""" + + name: str = Field(..., description="Name of the input.") + input_uri: str = Field(..., description="URI of the input data.") + + +class UriOutput(BaseAPIModel): + """URI Output for a training job.""" + + name: str = Field(..., description="Name of the output.") + output_uri: str = Field(..., description="URI of the output data.") + + +class DatasetConfig(BaseAPIModel): + """Dataset Configuration""" + + dataset_id: str = Field(..., description="Unique ID of the dataset.") + name: Optional[str] = Field(None, description="Name of the dataset.") + dataset_name: Optional[str] = Field( + None, description="Alternative name of the dataset." + ) + + +class Channel(BaseAPIModel): + """Channel Configuration.""" + + name: str = Field(..., description="Name of the channel.") + description: Optional[str] = Field(None, description="Description of the channel.") + required: Optional[bool] = Field( + None, description="Indicates if the channel is required." + ) + supported_channel_types: Optional[List[str]] = Field( + None, description="Supported types for this channel." + ) + properties: Optional[Dict[str, Any]] = Field( + None, description="Additional properties of the channel." 
+    )
+
+
+# HyperParameter Definition
+class HyperParameterDefinition(BaseAPIModel):
+    """HyperParameter Definition."""
+
+    name: str = Field(..., description="Name of the hyperparameter.")
+    type: Optional[str] = Field(None, description="Type of the hyperparameter.")
+    default_value: Optional[str] = Field(
+        None, description="Default value of the hyperparameter."
+    )
+    description: Optional[str] = Field(
+        None, description="Description of the hyperparameter."
+    )
+    required: bool = Field(
+        False, description="Indicates if the hyperparameter is required."
+    )
+
+
+class SchedulerConfig(BaseAPIModel):
+    max_running_time_in_seconds: Optional[int] = None
+
+
+class MetricDefinition(BaseAPIModel):
+    description: Optional[str] = Field(None, description="Description of the metric.")
+    name: str = Field(..., description="Name of the metric.")
+    regex: str = Field(
+        ..., description="Regular expression used for capturing the metric."
+    )
+
+
+class AlgorithmSpec(BaseAPIModel):
+    """Algorithm Specification."""
+
+    command: List[str] = Field(..., description="Command to run the training job.")
+    image: str = Field(..., description="Docker image for the training job.")
+    supported_channel_types: List[str] = Field(default_factory=list)
+    output_channels: List[Channel] = Field(
+        default_factory=list, description="Output channels."
+    )
+    input_channels: List[Channel] = Field(
+        default_factory=list, description="Input channels."
+    )
+    supports_distributed_training: Optional[bool] = Field(
+        True, description="Whether the algorithm supports distributed training."
+    )
+    supported_instance_types: Optional[List[str]] = Field(
+        None, description="Supported instance types."
+    )
+    metric_definitions: Optional[List[MetricDefinition]] = Field(
+        None, description="Metric definitions."
+    )
+    hyperparameter_definitions: List[HyperParameterDefinition] = Field(
+        default_factory=list,
+        alias="HyperParameter",
+        description="Hyperparameter definitions.",
+    )
+    job_type: str = Field(default="PyTorchJob")
+    code_dir: Optional[CodeDir] = Field(None, description="Source code location.")
+
+
+class ModelTrainingSpec(BaseAPIModel):
+    compute_resource: Optional[ComputeResource] = None
+    hyperparameters: List[HyperParameter] = Field(
+        default_factory=list, alias="HyperParameters"
+    )
+    inputs: List[Union[UriInput, DatasetConfig]] = Field(
+        default_factory=list, alias="InputChannels"
+    )
+    scheduler: Optional[SchedulerConfig] = None
+    supported_instance_types: Optional[List[str]] = None
+    algorithm_spec: Optional[AlgorithmSpec] = None
+    algorithm_version: Optional[str] = None
+    algorithm_provider: Optional[str] = None
+    algorithm_name: Optional[str] = None
+    environments: Optional[Dict[str, str]] = None
+    requirements: Optional[List[str]] = None
+
+
+ModelEvaluationSpec = ModelTrainingSpec
+
+
+class TrainingJob(BaseAPIModel):
+    """TrainingJob represents a training job in the PAI service."""
+
+    algorithm_id: Optional[str] = None
+    algorithm_name: Optional[str] = None
+    algorithm_provider: Optional[str] = None
+    algorithm_version: Optional[str] = None
+    algorithm_spec: Optional[AlgorithmSpec] = None
+    compute_resource: Optional[ComputeResource] = None
+    scheduler: Optional[SchedulerConfig] = None
+    experiment_config: Optional[Dict[str, Any]] = None
+    inputs: List[Union[UriInput, DatasetConfig]] = Field(
+        default_factory=list, alias="InputChannels"
+    )
+    outputs: List[Union[UriOutput, DatasetConfig]] = Field(
+        default_factory=list, alias="OutputChannels"
+    )
+    hyperparameters: List[HyperParameter] = Field(
+        default_factory=list, alias="HyperParameters"
+    )
+    labels: Optional[List[Dict[str, str]]] = Field(default_factory=list)
+    training_job_description: Optional[str] = None
+    training_job_id: Optional[str] = None
+    training_job_name: Optional[str] = None
+    workspace_id: Optional[str] = None
+    training_job_url: Optional[str] = None
+    status: Optional[str] = None
+    reason_code: Optional[str] = None
+    reason_message: Optional[str] = None
+
+    def __hash__(self):
+        return hash(self.training_job_id)
+
+    def __eq__(self, other: "TrainingJob"):
+        return (
+            isinstance(other, TrainingJob)
+            and self.training_job_id == other.training_job_id
+        )
+
+    @property
+    def id(self):
+        return self.training_job_id
+
+    @classmethod
+    def get(cls, training_job_id, session: Session = None) -> "TrainingJob":
+        session = session or get_default_session()
+        res = session.training_job_api.get(training_job_id=training_job_id)
+        return cls.model_validate(res)
+
+    @classmethod
+    def list(
+        cls,
+        status: Optional[str] = None,
+        session: Optional[Session] = None,
+        page_size: int = 50,
+        page_number: int = 1,
+    ):
+        session = session or get_default_session()
+        res = session.training_job_api.list(
+            status=status, page_size=page_size, page_number=page_number
+        )
+        return [cls.model_validate(item) for item in res.items]
+
+    def output_path(self, channel_name="model"):
+        for output_channel in self.outputs:
+            if output_channel.name == channel_name:
+                return output_channel.output_uri
+        raise RuntimeError(
+            f"Output channel is not specified: channel_name={channel_name}"
+        )
+
+    @property
+    def console_uri(self):
+        if not self.training_job_id:
+            raise ValueError("The TrainingJob is not submitted")
+
+        return self.training_job_url
+
+    def wait(self, interval: int = 5, show_logs: bool = True):
+        session = get_default_session()
+        self._refresh_status()
+
+        if show_logs:
+            job_log_printer = _TrainingJobLogPrinter(
+                training_job_id=self.training_job_id, page_size=20, session=session
+            )
+            job_log_printer.start()
+        else:
+            job_log_printer = None
+        try:
+            while not self.is_completed():
+                time.sleep(interval)
+        finally:
+            if job_log_printer:
+                job_log_printer.stop(wait=True)
+
+        self._on_job_completed()
+
+    def _on_job_completed(self):
+        # Print an empty line to separate the training job logs and the following logs
+        print()
+        if self.status == TrainingJobStatus.Succeed:
+            print(
+                f"Training job ({self.training_job_id}) succeeded, you can check the"
+                f" logs/metrics/output in the console:\n{self.console_uri}"
+            )
+        elif self.status == TrainingJobStatus.Terminated:
+            print(
+                f"Training job is ended with status {self.status}: "
+                f"reason_code={self.reason_code}, reason_message={self.reason_message}."
+                f" Check the training job in the console:\n{self.console_uri}"
+            )
+        elif self.status in TrainingJobStatus.failed_status():
+            print(
+                f"Training job ({self.training_job_id}) failed, please check the logs"
+                f" in the console: \n{self.console_uri}"
+            )
+
+            message = (
+                f"TrainingJob failed: name={self.training_job_name}, "
+                f"training_job_id={self.training_job_id}, "
+                f"reason_code={self.reason_code}, status={self.status}, "
+                f"reason_message={self.reason_message}"
+            )
+
+            raise UnexpectedStatusException(message=message, status=self.status)
+
+    def _refresh_status(self):
+        """Reload the training job status from the PAI Service."""
+        session = get_default_session()
+        training_job = type(self).model_validate(
+            session.training_job_api.get(training_job_id=self.training_job_id)
+        )
+        self.status = training_job.status
+
+    def is_succeeded(self):
+        """Return True if the training job succeeded."""
+        self._refresh_status()
+        return self.status == TrainingJobStatus.Succeed
+
+    @retry(wait_secs=10)
+    def is_completed(self):
+        """Return True if the training job is completed, including failed statuses."""
+        if self.status in TrainingJobStatus.completed_status():
+            return True
+        self._refresh_status()
+
+        return self.status in TrainingJobStatus.completed_status()
+
+
+class _TrainingJobLogPrinter(object):
+    """A class used to print logs for a training job."""
+
+    executor = ThreadPoolExecutor(5)
+
+    def __init__(
+        self, training_job_id: str, page_size=10, session: Optional[Session] = None
+    ):
+        self.training_job_id = training_job_id
+        self.session = session
+        self.page_size = page_size
+        self._future = None
+        self._stop = False
+
+    def _list_logs_api(self, page_number: int = 1):
+        try:
+            res = self.session.training_job_api.list_logs(
+                self.training_job_id,
+                page_number=page_number,
+                page_size=self.page_size,
+            )
+            return res
+        except TeaException as e:
+            # hack: Backend service may raise an exception when the training job
+            # instance is not found.
+            if e.code == "TRAINING_JOB_INSTANCE_NOT_FOUND":
+                return PaginatedResult(items=[], total_count=0)
+            else:
+                raise e
+
+    def _list_logs(self):
+        page_number, page_offset = 1, 0
+        # print training job logs.
+        while not self._stop:
+            res = self._list_logs_api(page_number=page_number)
+            # 1. move to next page
+            if len(res.items) == self.page_size:
+                # print new logs starting from page_offset
+                self._print_logs(logs=res.items[page_offset:])
+                page_number += 1
+                page_offset = 0
+            # 2. stay at the current page.
+ else: + if len(res.items) > page_offset: + # print new logs starting from page_offset + self._print_logs(logs=res.items[page_offset:]) + page_offset = len(res.items) + time.sleep(1) + + # When _stop is True, wait and print remaining logs. + time.sleep(10) + while True: + res = self._list_logs_api(page_number=page_number) + # There maybe more logs in the next page + if len(res.items) == self.page_size: + self._print_logs(logs=res.items[page_offset:]) + page_number += 1 + page_offset = 0 + # No more logs in the next page. + else: + if len(res.items) > page_offset: + self._print_logs(logs=res.items[page_offset:]) + break + + def _print_logs(self, logs: List[str]): + for log in logs: + print(log) + + def start(self): + if self._future: + raise ValueError("The training job log printer is already started") + self._stop = False + self._future = self.executor.submit(self._list_logs) + + def stop(self, wait: bool = True): + self._stop = True + if self._future: + self._future.result() + + +class _TrainingJobSubmitter(object): + """A class used to submit a training job to the PAI service.""" + + def __init__( + self, + base_job_name: Optional[str] = None, + output_path: Optional[str] = None, + experiment_config: Optional[ExperimentConfig] = None, + user_vpc_config: Optional[UserVpcConfig] = None, + max_run_time: Optional[int] = None, + instance_type: Optional[str] = None, + instance_spec: Optional[Dict] = None, + instance_count: Optional[int] = None, + resource_id: Optional[Dict] = None, + environments: Optional[Dict] = None, + requirements: Optional[List[str]] = None, + labels: Optional[Dict[str, str]] = None, + ): + self.session = get_default_session() + self._training_jobs = [] + self.base_job_name = base_job_name or type(self).__name__.lower() + self.output_path = output_path + self.user_vpc_config = user_vpc_config + self.experiment_config = experiment_config + self.max_run_time = max_run_time + self.instance_type = instance_type + self.instance_spec = instance_spec + self.instance_count = instance_count or 1 + self.resource_id = resource_id + self.environments = environments + self.requirements = requirements + self.labels = labels + + def wait(self, interval: int = 5, show_logs: bool = True, all_jobs: bool = False): + """Block until the jobs is completed. + + Args: + interval(int): Interval to reload job status + show_logs(bool): Specifies whether to fetch and print the logs produced by + the job. + all_jobs(bool): Wait latest job or wait all jobs in processor, show_logs disabled while + wait all jobs. + + Raises: + RuntimeError: If no job is submitted. 
+ + """ + if all_jobs: + if not self._training_jobs: + raise RuntimeError("Could not find any submitted job.") + remains = set(self._training_jobs) + while remains: + for job in self._training_jobs: + if job in remains and job.is_completed(): + remains.remove(job) + + time.sleep(interval) + self._generate_jobs_report() + else: + latest_job = self.latest_job + if not latest_job: + raise RuntimeError("Could not find a submitted job.") + latest_job.wait(interval=interval, show_logs=show_logs) + return latest_job + + def _generate_jobs_report(self): + """Generate current jobs report and output to stdout""" + print(f"Jobs status report, total jobs count: {len(self._training_jobs)}") + rows = [] + headers = ["JobName", "JobID", "Status"] + for job in self._training_jobs: + rows.append([job.training_job_name, job.id, job.status]) + print_table(headers, rows) + + def job_name(self, job_name: Optional[str] = None): + if job_name: + return job_name + sep = "-" + base_name = self.base_job_name + return name_from_base(base_name, sep) + + def build_inputs( + self, + inputs: Dict[str, Any], + input_channels: List[Channel], + default_inputs: List[Union[DatasetConfig, UriInput]] = None, + ) -> List[Dict[str, str]]: + res = [] + inputs = inputs or dict() + input_channels = input_channels or [] + default_inputs = default_inputs or [] + + input_keys = set(list(inputs.keys()) + [item.name for item in default_inputs]) + + requires = {ch.name for ch in input_channels if ch.required} - input_keys + if requires: + raise ValueError( + "Required input channels are not provided: {}".format( + ",".join(requires) + ) + ) + for name, item in inputs.items(): + input_config = self._get_input_config(name, item) + res.append(input_config.model_dump()) + + for item in default_inputs: + res.append(item.model_dump()) + + return res + + @staticmethod + def _default_training_output_channels() -> List[Channel]: + channels = [ + Channel( + name=DEFAULT_OUTPUT_MODEL_CHANNEL_NAME, + description="Training output models", + required=True, + ), + Channel( + name=DEFAULT_CHECKPOINT_CHANNEL_NAME, + description="Training checkpoints channel", + required=False, + ), + Channel( + name=DEFAULT_TENSORBOARD_CHANNEL_NAME, + properties={"ossAppendable": "true"}, + description="TensorBoard logs channel", + required=False, + ), + ] + + return channels + + def _training_job_base_output(self, job_name): + job_name = to_plain_text(job_name) + if self.output_path: + if not is_oss_uri(self.output_path): + raise ValueError("Output path should be an OSS path.") + return os.path.join(self.output_path, f"{job_name}_{random_str(6)}") + + session = get_default_session() + bucket_name = session.oss_bucket.bucket_name + storage_path = session.get_storage_path_by_category( + StoragePathCategory.TrainingJob, + f"{to_plain_text(job_name)}_{random_str(6)}", + ) + base_output_path = f"oss://{bucket_name}/{storage_path}" + return base_output_path + + def build_outputs( + self, + job_name: str, + output_channels: List[Channel], + outputs: Optional[Dict[str, Any]] = None, + ) -> List[Dict[str, str]]: + base_output_path = self._training_job_base_output(job_name) + res = [] + outputs = outputs or dict() + + for ch in output_channels: + if ch.name in outputs: + output = self._get_output_config(name=ch.name, item=outputs[ch.name]) + else: + output_uri = as_oss_dir_uri(posixpath.join(base_output_path, ch.name)) + output = UriOutput(name=ch.name, output_uri=output_uri) + res.append(output) + + extra_outputs = set(outputs.keys()) - {ch.name for ch in output_channels} + + 
for name in extra_outputs: + output = self._get_output_config( + name=name, + item=outputs[name], + ) + res.append(output) + + return [item.model_dump() for item in res] + + def _submit( + self, + job_name: str, + algorithm_spec: Optional[AlgorithmSpec] = None, + algorithm_name: Optional[str] = None, + algorithm_version: Optional[str] = None, + algorithm_provider: Optional[str] = None, + instance_count: int = 1, + instance_type: Optional[str] = None, + instance_spec: Optional[InstanceSpec] = None, + resource_id: Optional[str] = None, + inputs: Optional[List[Dict[str, Any]]] = None, + outputs: Optional[List[Dict[str, Any]]] = None, + hyperparameters: Optional[Dict[str, str]] = None, + max_run_time: Optional[int] = None, + environments: Optional[Dict[str, str]] = None, + user_vpc_config: Optional[Dict[str, str]] = None, + requirements: Optional[List[str]] = None, + experiment_config: Optional[Dict[str, Any]] = None, + labels: Optional[Dict[str, str]] = None, + wait: bool = True, + show_logs: bool = False, + ): + session = get_default_session() + training_job_id = session.training_job_api.create( + instance_count=instance_count, + instance_spec=instance_spec.model_dump() if instance_spec else None, + algorithm_name=algorithm_name, + algorithm_provider=algorithm_provider, + experiment_config=( + experiment_config.model_dump() + if experiment_config and isinstance(experiment_config, ExperimentConfig) + else experiment_config + ), + algorithm_version=algorithm_version, + instance_type=instance_type, + resource_id=resource_id, + job_name=job_name, + hyperparameters=hyperparameters, + max_running_in_seconds=max_run_time, + input_channels=inputs, + output_channels=outputs, + algorithm_spec=algorithm_spec.model_dump() if algorithm_spec else None, + requirements=requirements, + user_vpc_config=user_vpc_config, + labels=labels, + environments=environments, + ) + training_job = TrainingJob.get(training_job_id) + self._training_jobs.append(training_job) + print( + f"View the job detail by accessing the console URI: {training_job.console_uri}" + ) + if wait: + training_job.wait(show_logs=show_logs) + + @classmethod + def _get_input_config( + cls, name: str, item: Union[str, "FileSystemInputBase", DatasetConfig] + ): + """Get input uri for training_job from given input.""" + from pai.estimator import FileSystemInputBase + + if not isinstance(item, (str, FileSystemInputBase, DatasetConfig)): + raise ValueError(f"Input data of type {type(item)} is not supported.") + + if isinstance(item, FileSystemInputBase): + input_ = UriInput( + name=name, + input_uri=item.to_input_uri(), + ) + elif isinstance(item, DatasetConfig): + input_ = DatasetConfig( + name=name, + dataset_id=item.dataset_id, + ) + elif is_oss_uri(item) or is_filesystem_uri(item) or is_odps_table_uri(item): + input_ = UriInput( + name=name, + input_uri=item, + ) + elif os.path.exists(item): + store_path = Session.get_storage_path_by_category( + StoragePathCategory.InputData + ) + input_ = UriInput(name=name, input_uri=upload(item, store_path)) + elif is_dataset_id(item): + input_ = DatasetConfig( + dataset_id=item, + name=name, + ) + else: + raise ValueError( + "Invalid input data, supported inputs are OSS, NAS, MaxCompute " + "table or local path." 
+ ) + return input_ + + @classmethod + def _get_output_config( + cls, name: str, item: str + ) -> Union[UriOutput, DatasetConfig]: + from pai.estimator import FileSystemInputBase + + if not isinstance(item, (str, FileSystemInputBase, DatasetConfig)): + raise ValueError(f"Output data of type {type(item)} is not supported.") + + if isinstance(item, FileSystemInputBase): + output = UriOutput( + name=name, + output_uri=item.to_input_uri(), + ) + elif isinstance(item, DatasetConfig): + output = DatasetConfig(name=name, dataset_id=item.dataset_id) + elif is_oss_uri(item) or is_filesystem_uri(item) or is_odps_table_uri(item): + output = UriOutput( + name=name, + output_uri=as_oss_dir_uri(item), + ) + else: + raise ValueError( + "Invalid output data, supported outputs are OSS, NAS, MaxCompute " + ) + + return output + + @property + def latest_job(self) -> "TrainingJob": + return self._training_jobs[-1] if self._training_jobs else None + + def _build_code_input( + self, job_name: str, source_dir: Optional[str], code_dest: Optional[str] = None + ) -> Optional[CodeDir]: + """Upload source files to OSS and return the code input for training job.""" + if not source_dir: + return + if is_oss_uri(source_dir): + code_uri = source_dir + elif not os.path.exists(source_dir): + raise ValueError(f"Source directory {source_dir} does not exist.") + else: + code_dest = code_dest or self.session.get_storage_path_by_category( + StoragePathCategory.TrainingSrc, to_plain_text(job_name) + ) + code_uri = upload( + source_path=source_dir, + oss_path=code_dest, + bucket=self.session.oss_bucket, + is_tar=True, + ) + oss_uri_obj = OssUriObj(uri=self.session.patch_oss_endpoint(code_uri)) + code_dir = CodeDir( + location_type="oss", + location_value=OssLocation( + bucket=oss_uri_obj.bucket_name, + key=oss_uri_obj.object_key, + endpoint=oss_uri_obj.endpoint, + ), + ) + + return code_dir diff --git a/pai/model.py b/pai/model.py index c5065b9..52c9a7a 100644 --- a/pai/model.py +++ b/pai/model.py @@ -30,8 +30,7 @@ from oss2 import ObjectIterator from .common import ProviderAlibabaPAI, git_utils -from .common.configs import UserVpcConfig -from .common.consts import INSTANCE_TYPE_LOCAL_GPU, ModelFormat +from .common.consts import INSTANCE_TYPE_LOCAL_GPU, ModelFormat, StoragePathCategory from .common.docker_utils import ContainerRun, run_container from .common.logging import get_logger from .common.oss_utils import OssUriObj, download, is_oss_uri, upload @@ -43,6 +42,7 @@ ) from .exception import DuplicatedMountException from .image import ImageInfo +from .job import InstanceSpec, ModelTrainingSpec, UriInput, UserVpcConfig from .predictor import AsyncPredictor, LocalPredictor, Predictor, ServiceType from .serializers import SerializerBase from .session import Session, get_default_session @@ -276,7 +276,9 @@ def _upload_source_dir(cls, source_dir, session): f"Input source code path should be a directory: {source_dir}." ) - target_dir = session.get_storage_path_by_category(category="inference_src") + target_dir = session.get_storage_path_by_category( + category=StoragePathCategory.InferenceSrc + ) # upload local script data to the OSS bucket. uploaded_source_code = upload( source_dir, @@ -358,7 +360,9 @@ def mount( elif os.path.exists(source): # if source is a local path, upload it to OSS bucket and use OSS URI # as storage source. 
- oss_path = session.get_storage_path_by_category("model_data") + oss_path = session.get_storage_path_by_category( + StoragePathCategory.ModelData + ) oss_uri = upload( source_path=source, oss_path=oss_path, bucket=session.oss_bucket ) @@ -589,12 +593,14 @@ def container_serving_spec( "image": image_uri, "port": port, "script": command, - "env": [ - {"name": key, "value": str(value)} - for key, value in environment_variables.items() - ] - if environment_variables - else [], + "env": ( + [ + {"name": key, "value": str(value)} + for key, value in environment_variables.items() + ] + if environment_variables + else [] + ), } if health_check: @@ -757,7 +763,9 @@ def _upload_model_data(self): elif not os.path.exists(self.model_data): raise RuntimeError(f"Model data path does not exist: {self.model_data}") - dest_oss_path = self.session.get_storage_path_by_category(category="model_data") + dest_oss_path = self.session.get_storage_path_by_category( + category=StoragePathCategory.ModelData + ) upload_model_data = upload( source_path=self.model_data, oss_path=dest_oss_path, @@ -1229,9 +1237,9 @@ def register( framework_type=framework_type, training_spec=training_spec, evaluation_spec=evaluation_spec, - inference_spec=self.inference_spec.to_dict() - if self.inference_spec - else None, + inference_spec=( + self.inference_spec.to_dict() if self.inference_spec else None + ), approval_status=approval_status, metrics=metrics, options=options, @@ -1832,6 +1840,36 @@ def _build_service_config( return inference_spec.to_dict() + def get_training_spec(self, training_method: Optional[str]) -> ModelTrainingSpec: + if type(self)._is_multiple_spec(self.training_spec): + supported_training_methods = list(self.training_spec.keys()) + if training_method and training_method not in supported_training_methods: + raise ValueError( + "The model does not support the given training method:" + f" {training_method}. Supported training methods are:" + f" {supported_training_methods}." + ) + elif training_method: + ts = self.training_spec.get(training_method) + else: + training_method = supported_training_methods[0] + logger.warning( + "The training method is not specified, using the default training" + " method: %s. Supported training methods are: %s.", + training_method, + supported_training_methods, + ) + ts = self.training_spec.get(training_method) + else: + # Does not support training methods. # Use default training spec. + if training_method: + raise ValueError( + "The model does not support choosing training method. Do not" + " specify the training method." + ) + ts = self.training_spec + return ModelTrainingSpec.model_validate(ts) + def get_estimator( self, training_method: Optional[str] = None, @@ -1848,7 +1886,7 @@ def get_estimator( Generate an AlgorithmEstimator object from RegisteredModel's training_spec. Args: - training_method (str, optional): Used to selected the training algorithm + training_method (str, optional): Used to select the training algorithm that supported by the model. If not specified, the default training algorithm will be retrieved from the model version. instance_type (str, optional): The machine instance type used to run the @@ -1881,59 +1919,14 @@ def get_estimator( raise ValueError( "The provided registered model does not contain training spec." ) - ts = self.training_spec - if "AlgorithmSpec" not in ts and "AlgorithmName" not in ts: - # Support choosing training methods. 
- supported_training_methods = list(ts.keys()) - if training_method and training_method not in supported_training_methods: - raise ValueError( - "The model does not support the given training method:" - f" {training_method}. Supported training methods are:" - f" {supported_training_methods}." - ) - elif training_method: - ts = ts.get(training_method) - else: - training_method = supported_training_methods[0] - logger.warning( - "The training method is not specified, using the default training" - " method: %s. Supported training methods are: %s.", - training_method, - supported_training_methods, - ) - ts = ts.get(training_method) - else: - # Does not support training methods. - # Use default training spec. - if training_method: - raise ValueError( - "The model does not support choosing training method. Do not" - " specify the training method." - ) - - if "AlgorithmSpec" not in ts and "AlgorithmName" not in ts: - raise ValueError( - "The provided registered model's training spec does not contain any" - " algorithms." - ) - if "AlgorithmSpec" in ts: - algorithm_spec = ts.get("AlgorithmSpec") - algorithm_name, algorithm_provider, algorithm_version = (None, None, None) - else: - algorithm_name, algorithm_provider, algorithm_version = ( - ts.get("AlgorithmName"), - ts.get("AlgorithmProvider"), - ts.get("AlgorithmVersion"), - ) - algorithm_spec = None - + ts = self.get_training_spec(training_method=training_method) hyperparameters = hyperparameters or {} # TODO: validate the given hyperparameters via algorithm definition - for hp in ts.get("HyperParameters", []): - if hp["Name"] not in hyperparameters: + for hp in ts.hyperparameters: + if hp.name not in hyperparameters: hyperparameters.update( { - hp["Name"]: hp["Value"], + hp.name: hp.value, } ) @@ -1941,18 +1934,42 @@ def get_estimator( base_job_name = f"{self.model_name}_training" if self.model_name else None if not max_run_time: - max_run_time = ts.get("Scheduler", {}).get("MaxRunningTimeInSeconds") + max_run_time = ( + ts.scheduler.max_running_time_in_seconds if ts.scheduler else None + ) - train_compute_resource = ts.get("ComputeResource") + resource_id = kwargs.get("resource_id") instance_spec = kwargs.get("instance_spec") - if train_compute_resource: - instance_type = instance_type or train_compute_resource.get("EcsSpec") + compute_resource = ts.compute_resource + if resource_id: + if instance_type: + logger.warning( + "The instance type is ignored when resource_id is provided." + ) + instance_spec = instance_type or compute_resource.instance_spec + if not instance_spec: + raise ValueError( + "Instance spec is required when resource_id is provided." + ) + instance_spec = InstanceSpec.model_validate(instance_spec) + instance_count = ( + instance_count + or compute_resource.instance_count + or compute_resource.ecs_count + or 1 + ) + else: + if instance_spec: + logger.warning( + "The instance spec is ignored when resource_id is not provided." 
+ ) + instance_type = instance_type or compute_resource.ecs_spec instance_count = ( instance_count - or train_compute_resource.get("EcsCount") - or train_compute_resource.get("InstanceCount") + or compute_resource.ecs_count + or compute_resource.instance_count + or 1 ) - instance_spec = instance_spec or train_compute_resource.get("InstanceSpec") labels = kwargs.pop("labels", dict()) if self.model_provider == ProviderAlibabaPAI: @@ -1969,10 +1986,10 @@ def get_estimator( labels = default_labels return AlgorithmEstimator( - algorithm_name=algorithm_name, - algorithm_version=algorithm_version, - algorithm_provider=algorithm_provider, - algorithm_spec=algorithm_spec, + algorithm_name=ts.algorithm_name, + algorithm_version=ts.algorithm_version, + algorithm_provider=ts.algorithm_provider, + algorithm_spec=ts.algorithm_spec, hyperparameters=hyperparameters, base_job_name=base_job_name, max_run_time=max_run_time, @@ -1984,35 +2001,26 @@ def get_estimator( **kwargs, ) - def get_estimator_inputs(self) -> Dict[str, str]: + def get_estimator_inputs(self, training_method=None) -> Dict[str, Any]: """Get the AlgorithmEstimator's default input channels Get the AlgorithmEstimator's default input channels from RegisteredModel's training_spec. Returns: - dict[str, str]: A dict of input channels. + Dict[str, str]: A dict of input channels. """ - if not self.training_spec: - raise ValueError( - "The provided registered model does not contain training spec." - ) - ts = self.training_spec - if "AlgorithmSpec" not in ts and "AlgorithmName" not in ts: - raise ValueError( - "The provided registered model's training spec does not contain any" - " algorithms." - ) + default_inputs = ( + self.get_training_spec(training_method=training_method).inputs or [] + ) - input_channels = {} - if "InputChannels" in ts: - for i in ts["InputChannels"]: - input_channels.update( - { - i["Name"]: i["InputUri"], - } - ) - return input_channels + ret = {} + for item in default_inputs: + if isinstance(item, UriInput): + ret[item.name] = item.input_uri + else: + ret[item.name] = item + return ret def get_eval_processor( self, @@ -2066,47 +2074,44 @@ def get_eval_processor( raise ValueError( "The provided registered model does not contain evaluation spec." ) - - if "AlgorithmSpec" not in eval_spec: + eval_spec = ModelTrainingSpec.model_validate(eval_spec) + if not eval_spec.algorithm_spec: raise ValueError( - "The provided registered model's evaluation spec does not contain any" - " workload." + "Invalid evaluation spec, the evaluation spec does not contain any" + " configuration for the evaluation job." 
) - workload = eval_spec.get("AlgorithmSpec") + # workload = eval_spec.get("AlgorithmSpec") if not base_job_name: base_job_name = f"{self.model_name}_eval" if self.model_name else None parameters = parameters or dict() - for item in eval_spec.get("HyperParameters"): - name = item["Name"] - value = item["Value"] - if name not in parameters: - parameters[name] = value + for item in eval_spec.hyperparameters: + if item.name not in parameters: + parameters[item.name] = item.value if not max_run_time: - max_run_time = eval_spec.get("Scheduler", {}).get("MaxRunningTimeInSeconds") + max_run_time = eval_spec.scheduler.max_running_time_in_seconds - compute_resource = eval_spec.get("ComputeResource") + compute_resource = eval_spec.compute_resource if compute_resource and (not instance_type or not instance_count): # If instance_type or instance_count is not provided, use the default - instance_type = instance_type or compute_resource.get("EcsSpec") - instance_count = instance_count or compute_resource.get("EcsCount") + instance_type = instance_type or compute_resource.ecs_spec + instance_count = instance_count or compute_resource.ecs_count source_dir = None - code_dir = workload.get("CodeDir") - if code_dir and code_dir.get("LocationType") == "oss": - location = code_dir.get("LocationValue") - oss_path = OssUriObj.from_bucket_key_endpoint( - bucket_name=location.get("Bucket"), - object_key=location.get("Key"), - endpoint=location.get("Endpoint"), - ) - source_dir = oss_path.uri + code_dir = eval_spec.algorithm_spec.code_dir + if code_dir and code_dir.location_type == "oss": + oss_uri_obj = OssUriObj.from_bucket_key_endpoint( + bucket_name=code_dir.location_value.bucket, + object_key=code_dir.location_value.key, + endpoint=code_dir.location_value.endpoint, + ) + source_dir = oss_uri_obj.uri processor = Processor( - image_uri=workload.get("Image"), - command=" ".join(workload.get("Command")), + image_uri=eval_spec.algorithm_spec.image, + command=eval_spec.algorithm_spec.command, source_dir=source_dir, parameters=parameters, max_run_time=max_run_time, @@ -2117,8 +2122,8 @@ def get_eval_processor( user_vpc_config=user_vpc_config, session=self.session, ) - processor.set_input_channel_definitions(workload["InputChannels"]) - processor.set_output_channel_definitions(workload["OutputChannels"]) + processor.set_input_channels(eval_spec.algorithm_spec.input_channels) + processor.set_output_channels(eval_spec.algorithm_spec.output_channels) return processor @@ -2135,17 +2140,17 @@ def get_evaluation_inputs(self) -> Dict[str, Any]: raise ValueError( "The provided registered model does not contain evaluation spec." 
) + eval_spec = ModelTrainingSpec.model_validate(self.evaluation_spec) + inputs = eval_spec.inputs or [] + res = {} - input_channels = {} - if "InputChannels" in self.evaluation_spec: - for i in self.evaluation_spec["InputChannels"]: - input_channels.update( - { - i["Name"]: i.get("InputUri") or i.get("DatasetId"), - } - ) + for item in inputs: + res[item.name] = item.input_uri if isinstance(item, UriInput) else item + return res - return input_channels + @classmethod + def _is_multiple_spec(cls, spec: Dict[str, Any]) -> bool: + return not ("AlgorithmSpec" in spec or "AlgorithmName" in spec) def _get_evaluation_spec(self): """Get the evaluation_spec of the registered model.""" diff --git a/pai/processor.py b/pai/processor.py index a0e2bf5..e73b085 100644 --- a/pai/processor.py +++ b/pai/processor.py @@ -12,130 +12,28 @@ # See the License for the specific language governing permissions and # limitations under the License. import os -import posixpath -import time from datetime import datetime from typing import Any, Dict, List, Optional, Union -from .common.configs import UserVpcConfig from .common.consts import JobType, StoragePathCategory from .common.logging import get_logger -from .common.oss_utils import OssUriObj, is_oss_uri, upload -from .common.utils import ( - experimental, - is_dataset_id, - is_filesystem_uri, - is_odps_table_uri, - random_str, - to_plain_text, +from .common.utils import experimental, random_str, to_plain_text +from .job import ( + AlgorithmSpec, + Channel, + CodeDir, + ExperimentConfig, + TrainingJob, + UserVpcConfig, + _TrainingJobSubmitter, ) -from .estimator import FileSystemInputBase -from .estimator import _TrainingJob as _Job -from .experiment import ExperimentConfig from .session import Session, get_default_session logger = get_logger(__name__) -def build_code_input( - source_dir: str, upload_data_path: str -) -> Optional[Dict[str, Any]]: - """Upload local code and build CodeDir config for job.""" - if not source_dir: - return - - from pai.session import get_default_session - - sess = get_default_session() - - if is_oss_uri(source_dir): - code_oss_uri = source_dir - elif os.path.exists(source_dir): - code_oss_uri = upload( - source_path=source_dir, - oss_path=upload_data_path, - bucket=sess.oss_bucket, - is_tar=True, - ) - else: - raise ValueError(f"Source directory {source_dir} does not exist.") - - code_oss_obj = OssUriObj(uri=sess.patch_oss_endpoint(code_oss_uri)) - res = { - "LocationType": "oss", - "LocationValue": { - "Bucket": code_oss_obj.bucket_name, - "Key": code_oss_obj.object_key, - "Endpoint": code_oss_obj.endpoint, - }, - } - - return res - - -def get_input_channel_config( - item: Optional[Union[str, FileSystemInputBase]] -) -> Dict[str, str]: - """Get channel config from given job input.""" - - if not isinstance(item, (str, FileSystemInputBase)): - raise ValueError(f"Input data of type {type(item)} is not supported.") - - if isinstance(item, FileSystemInputBase): - config = {"InputUri": item.to_input_uri()} - elif is_oss_uri(item) or is_filesystem_uri(item) or is_odps_table_uri(item): - config = {"InputUri": item} - elif is_dataset_id(item): - config = {"DatasetId": item} - elif os.path.exists(item): - store_path = Session.get_storage_path_by_category(StoragePathCategory.InputData) - config = {"InputUri": upload(item, store_path)} - else: - raise ValueError( - "Invalid input data, supported inputs are OSS, NAS, MaxCompute " - "table or local path." 
- ) - - return config - - -def get_output_channel_config( - item: Optional[Union[str, FileSystemInputBase]] -) -> Dict[str, str]: - """Get channel config from given job output.""" - - if not isinstance(item, (str, FileSystemInputBase)): - raise ValueError(f"Output data of type {type(item)} is not supported.") - - # OSS URI for output channel will be mounted to directory - # "/ml/output/{ChannelName}/" and the output OSS URI should be a "directory" - def as_oss_dir_uri(uri: str): - folder_uri = uri if uri.endswith("/") else uri + "/" - if folder_uri != uri: - logger.warning( - f"This output URI {uri} is not in the format of a folder path, " - f"system will automatically use {folder_uri} instead." - ) - return folder_uri - - if isinstance(item, FileSystemInputBase): - config = {"OutputUri": item.to_input_uri()} - elif is_oss_uri(item): - config = {"OutputUri": as_oss_dir_uri(item)} - elif is_filesystem_uri(item) or is_odps_table_uri(item): - config = {"OutputUri": item} - elif is_dataset_id(item): - config = {"DatasetId": item} - else: - raise ValueError( - "Invalid output data, supported inputs are OSS, NAS, MaxCompute table." - ) - - return config - - @experimental -class Processor(object): +class Processor(_TrainingJobSubmitter): def __init__( self, image_uri: str, @@ -267,33 +165,30 @@ def __init__( self.source_dir = source_dir self.job_type = job_type or JobType.PyTorchJob self.parameters = parameters or dict() - self.environments = environments - self.requirements = requirements - self.max_run_time = max_run_time - - self.base_job_name = base_job_name - self.output_path = output_path - - self.instance_type = instance_type - self.instance_count = instance_count or 1 - self.labels = labels - self.user_vpc_config = user_vpc_config - self.experiment_config = experiment_config self.session = session or get_default_session() - self._latest_job = None - self._jobs = [] - - self._input_channel_definitions = None - self._output_channel_definitions = None + self._input_channels = None + self._output_channels = None + super().__init__( + base_job_name=base_job_name, + output_path=output_path, + experiment_config=experiment_config, + instance_type=instance_type, + instance_count=instance_count or 1, + user_vpc_config=user_vpc_config, + max_run_time=max_run_time, + environments=environments, + requirements=requirements, + labels=labels, + ) def run( self, inputs: Dict[str, Any] = None, outputs: Dict[str, Any] = None, wait: bool = True, - show_logs=True, - ): + show_logs: bool = True, + ) -> TrainingJob: """Submit a job with the given input and output channels. Args: @@ -311,6 +206,10 @@ def run( either succeeded, failed, or stopped. (Default True). show_logs (bool): Specifies whether to show the logs produced by the job (Default True). + + Returns: + :class:`pai.job.TrainingJob`: A submitted training job. + Raises: UnExpectedStatusException: If the job fails. 
@@ -319,140 +218,71 @@ def run( outputs = outputs or dict() job_name = self._gen_job_display_name() - job = self._fit(inputs=inputs, outputs=outputs, job_name=job_name) - self._latest_job = job - self._jobs.append(job) - - if wait: - self.wait(show_logs=show_logs) - - def _gen_job_display_name(self, job_name=None): - """Generate job display name.""" - if job_name: - return job_name - ts = datetime.now().strftime("%Y%m%d_%H%M%S") - return "{}_{}".format(self.base_job_name or "processing_job", ts) - - def _build_algorithm_spec(self, code_input) -> Dict[str, Any]: - """Build a temporary AlgorithmSpec used for submitting the Job.""" - command = ( - self.command - if isinstance(self.command, list) - else [ - "/bin/sh", - "-c", - self.command, - ] - ) - - algo_spec = { - "Command": command, - "Image": self.image_uri, - "JobType": self.job_type, - "CodeDir": code_input, - "InputChannels": self._input_channel_definitions or None, - "OutputChannels": self._output_channel_definitions or None, - } - - return algo_spec - - def _fit( - self, job_name, inputs: Dict[str, Any] = None, outputs: Dict[str, Any] = None - ): - output_path = self._get_job_base_output_path(job_name) - upload_path = Session.get_storage_path_by_category( + code_dest = Session.get_storage_path_by_category( StoragePathCategory.ProcessingSrc, to_plain_text(job_name) ) - - input_configs = self._build_input_data_configs(inputs) - output_configs = self._build_output_data_configs(output_path, outputs) - + code_dir = self._build_code_input(job_name, self.source_dir, code_dest) algo_spec = self._build_algorithm_spec( - code_input=build_code_input(self.source_dir, upload_path), + code_input=code_dir, + ) + inputs = self.build_inputs(inputs, input_channels=algo_spec.input_channels) + outputs = self.build_outputs( + job_name=job_name, + output_channels=algo_spec.output_channels, + outputs=outputs, ) - job_id = self.session.training_job_api.create( + return self._submit( instance_count=self.instance_count, instance_type=self.instance_type, job_name=job_name, hyperparameters=self.parameters, environments=self.environments, requirements=self.requirements, - max_running_in_seconds=self.max_run_time, - input_channels=input_configs, - output_channels=output_configs, + max_run_time=self.max_run_time, + inputs=inputs, + outputs=outputs, algorithm_spec=algo_spec, - user_vpc_config=self.user_vpc_config.to_dict() - if self.user_vpc_config - else None, - experiment_config=self.experiment_config.to_dict() - if self.experiment_config - else None, + user_vpc_config=( + self.user_vpc_config.model_dump() if self.user_vpc_config else None + ), + experiment_config=( + self.experiment_config.model_dump() if self.experiment_config else None + ), labels=self.labels, + wait=wait, + show_logs=show_logs, ) - job = _Job.get(job_id) - print(f"View the job {job_id} by accessing the console URI: {job.console_uri}") - return job - def wait(self, interval: int = 2, show_logs: bool = True, all_jobs: bool = False): - """Block until the jobs is completed. - - Args: - interval(int): Interval to reload job status - show_logs(bool): Specifies whether to fetch and print the logs produced by - the job. - all_jobs(bool): Wait latest job or wait all jobs in processor, show_logs disabled while - wait all jobs. 
+ def _gen_job_display_name(self, job_name=None): + """Generate job display name.""" + if job_name: + return job_name + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + return "{}_{}".format(self.base_job_name or "processing_job", ts) - Raises: - RuntimeError: If no job is submitted. + def _build_algorithm_spec(self, code_input: CodeDir) -> AlgorithmSpec: + """Build a temporary AlgorithmSpec used for submitting the Job.""" - """ - if all_jobs: - if not self._jobs: - raise RuntimeError("Could not find any submitted job.") - - remains = set(self._jobs) - while remains: - for job in self._jobs: - if job in remains and job.is_completed(): - remains.remove(job) - - time.sleep(interval) - - self._generate_jobs_report() - else: - if not self._latest_job: - raise RuntimeError("Could not find a submitted job.") - - self._latest_job.wait(interval=interval, show_logs=show_logs) - - def _generate_jobs_report(self): - """Generate current jobs report and output to stdout""" - print(f"Jobs status report, total jobs count: {len(self._jobs)}") - - rows = [] - headers = ["JobName", "JobID", "Status"] - for job in self._jobs: - rows.append([job.training_job_name, job.id, job.status]) - - column_widths = [ - max(len(str(value)) for value in column) for column in zip(headers, *rows) - ] - header_row = " | ".join( - f"{header:<{column_widths[i]}}" for i, header in enumerate(headers) + algorithm_spec = AlgorithmSpec( + command=( + self.command + if isinstance(self.command, list) + else [ + "sh", + "-c", + self.command, + ] + ), + image=self.image_uri, + job_type=self.job_type, + code_dir=code_input, + input_channels=self._input_channels or [], + output_channels=self._output_channels or [], ) + return algorithm_spec - print(header_row) - print("-" * len(header_row)) - for row in rows: - print( - " | ".join( - f"{str(value):<{column_widths[i]}}" for i, value in enumerate(row) - ) - ) - - def _get_job_base_output_path(self, job_name: str) -> str: + def _training_job_base_output(self, job_name: str) -> str: """Generate the base output path for the job.""" bucket_name = self.session.oss_bucket.bucket_name @@ -467,105 +297,22 @@ def _get_job_base_output_path(self, job_name: str) -> str: ) return f"oss://{bucket_name}/{job_output_path}" - def _build_input_data_configs( - self, - inputs: Dict[str, Any] = None, - ) -> List[Dict[str, str]]: - """Build the input data config for jobs.""" - - res = [] - remain_inputs = {} - - if self._input_channel_definitions: - remains = set(inputs.keys()) - for channel in self._input_channel_definitions: - channel_name = channel["Name"] - channel_required = channel["Required"] - channel_config = {"Name": channel_name} - - if channel_name in inputs: - updated_value = get_input_channel_config(inputs[channel_name]) - channel_config.update(updated_value) - res.append(channel_config) - remains.remove(channel_name) - elif channel_required: - raise ValueError( - f"Input channel {channel_name} is required but not provided." - " Please check the input channels definition." - ) - - # follow the rest of user input channels in Processor. - remain_inputs = {channel: inputs[channel] for channel in remains} - - if remains: - logger.warning( - f"Following input channels={list(remains)} are not defined in input" - " channels definition. Please check the input channels definition." 
- ) - - for name, item in remain_inputs.items(): - channel_config = {"Name": name} - updated_value = get_input_channel_config(item) - channel_config.update(updated_value) - res.append(channel_config) - - return res - - def _build_output_data_configs( - self, - output_path: str, - outputs: Dict[str, Any] = None, - ) -> List[Dict[str, str]]: - """Build the output data config for jobs.""" - - res = [] - - if self._output_channel_definitions: - # we create all channel config no matter whether channel is required or not for backward compatibility. - for channel in self._output_channel_definitions: - channel_name = channel["Name"] - channel_config = {"Name": channel_name} - - if channel_name in outputs: - updated_value = get_output_channel_config(outputs[channel_name]) - channel_config.update(updated_value) - else: - output_uri = posixpath.join(output_path, channel["Name"]) - updated_value = get_output_channel_config(output_uri) - channel_config.update(updated_value) - - res.append(channel_config) - else: - for name, item in outputs.items(): - channel_config = {"Name": name} - updated_value = get_output_channel_config(item) - channel_config.update(updated_value) - - res.append(channel_config) - - return res - - @property - def latest_job(self): - """Return the latest submitted processing job.""" - return self._latest_job - def get_outputs_data(self) -> Dict[str, str]: """Show all outputs data paths. Returns: dict[str, str]: A dictionary of all outputs data paths. """ - if not self._latest_job: + if not self.latest_job: raise RuntimeError("Current no Job for the processor.") return { ch["Name"]: ch["OutputUri"] or ch["DatasetId"] - for ch in self._latest_job.output_channels + for ch in self.latest_job.output_channels } - def set_input_channel_definitions(self, definitions: List[Dict[str, Any]]): - self._input_channel_definitions = definitions + def set_input_channels(self, channels: List[Channel]): + self._input_channels = channels - def set_output_channel_definitions(self, definitions: List[Dict[str, Any]]): - self._output_channel_definitions = definitions + def set_output_channels(self, channels: List[Channel]): + self._output_channels = channels diff --git a/pai/schema/__init__.py b/pai/schema/__init__.py deleted file mode 100644 index b42e873..0000000 --- a/pai/schema/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2023 Alibaba, Inc. or its affiliates. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/pai/schema/base.py b/pai/schema/base.py deleted file mode 100644 index eb81c37..0000000 --- a/pai/schema/base.py +++ /dev/null @@ -1,107 +0,0 @@ -# Copyright 2023 Alibaba, Inc. or its affiliates. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at
-#
-# https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from marshmallow import EXCLUDE, Schema, fields, post_dump, pre_load
-
-from ..common.utils import camel_to_snake, snake_to_camel
-
-
-class EntitySchema(Schema):
-    def __init__(self, instance=None, **kwargs):
-        super(EntitySchema, self).__init__(**kwargs)
-        self.instance = instance
-
-
-class BaseAPIResourceSchema(Schema):
-    """Base schema using in API object serialization and deserialization."""
-
-    class Meta(object):
-        unknown = EXCLUDE
-
-    _DefaultFieldsNameMapping = {
-        "GmtCreateTime": "create_time",
-        "GmtModifiedTime": "modified_time",
-    }
-
-    # Mapping API object field name to Python Object/Schema field name..
-    FieldNameMapping = {}
-
-    def __init__(self, instance=None, session=None, **kwargs):
-        super(BaseAPIResourceSchema, self).__init__(**kwargs)
-        self.instance = instance
-        self.session = session
-
-    @pre_load
-    def _filed_name_load_preprocess(self, data, **kwargs):
-        """Input API object preprocess.
-
-        Transform the input data key to entity filed name.
-        """
-        result = dict()
-        for name, value in data.items():
-            if name in self.FieldNameMapping:
-                result[self.FieldNameMapping[name]] = value
-            else:
-                result[camel_to_snake(name)] = value
-        return result
-
-    @post_dump
-    def _filed_name_dump_postprocess(self, data, **kwargs):
-        """Transform output field name to camel case."""
-        filed_name_mapping = self._DefaultFieldsNameMapping.copy()
-        filed_name_mapping.update(self.FieldNameMapping)
-
-        field_mapping_rev = {value: key for key, value in filed_name_mapping.items()}
-        result = dict()
-        for key, value in data.items():
-            if value is None:
-                continue
-            if key in field_mapping_rev:
-                result[field_mapping_rev[key]] = value
-            else:
-                result[snake_to_camel(key)] = value
-        return result
-
-    def make_or_reload(self, instance_cls, data):
-        """Make an instance or reload the instance."""
-        if self.instance:
-            self.instance.__init__(**data)
-            return self.instance
-        else:
-            return instance_cls(session=self.session, **data)
-
-
-class ListOfKVField(fields.Field):
-    """Mapping a List of key, value to a Dict."""
-
-    def _serialize(self, value, attr, obj, **kwargs):
-        res = []
-        if not value:
-            return res
-        for k, v in value.items():
-            res.append(
-                {
-                    "Key": k,
-                    "Value": v,
-                }
-            )
-        return res
-
-    def _deserialize(self, value, attr, data, **kwargs):
-        res = dict()
-        if not value:
-            return res
-        for item in value:
-            res[item["Key"]] = item["Value"]
-        return res
diff --git a/pai/schema/training_job_schema.py b/pai/schema/training_job_schema.py
deleted file mode 100644
index 27b99f9..0000000
--- a/pai/schema/training_job_schema.py
+++ /dev/null
@@ -1,113 +0,0 @@
-# Copyright 2023 Alibaba, Inc. or its affiliates.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from marshmallow import fields, post_load
-
-from .base import BaseAPIResourceSchema, ListOfKVField
-
-
-class HyperparameterField(fields.Field):
-    """Convert between hyperparameters in Dict to hyperparameters in API Object."""
-
-    def _serialize(self, value, attr, obj, **kwargs):
-        res = []
-        if not value:
-            return res
-        for k, v in value.items():
-            res.append(
-                {
-                    "Name": k,
-                    "Value": v,
-                }
-            )
-        return res
-
-    def _deserialize(self, value, attr, data, **kwargs):
-        res = dict()
-        if not value:
-            return res
-        for item in value:
-            res[item["Name"]] = item["Value"]
-        return res
-
-
-class TrainingJobMetricSchema(BaseAPIResourceSchema):
-    name = fields.Str()
-    timestamp = fields.Str()
-    value = fields.Float()
-
-
-class TrainingJobSchedulerSchema(BaseAPIResourceSchema):
-    max_running_time_in_seconds = fields.Int()
-
-
-class TrainingJobChannelSchema(BaseAPIResourceSchema):
-    dataset_id = fields.Str()
-    input_uri = fields.Str()
-    name = fields.Str()
-
-
-class TrainingJobStatusTransitionSchema(BaseAPIResourceSchema):
-    end_time = fields.Str()
-    reason_code = fields.Str()
-    reason_message = fields.Str()
-    start_time = fields.DateTime()
-    status = fields.Str()
-
-
-class TrainingJobSchema(BaseAPIResourceSchema):
-    FieldNameMapping = {
-        "GmtCreateTime": "create_time",
-        "GmtModifiedTime": "modified_time",
-        "TrainingJobDescription": "description",
-    }
-
-    algorithm_name = fields.Str()
-    algorithm_provider = fields.Str()
-    algorithm_version = fields.Str()
-
-    hyperparameters = HyperparameterField()
-    input_channels = fields.List(fields.Dict)
-    output_channels = fields.List(fields.Dict)
-    labels = ListOfKVField()
-    description = fields.Str()
-    training_job_name = fields.Str()
-    scheduler = fields.Dict()
-    compute_resource = fields.Dict()
-    workspace_id = fields.Str()
-    experiment_config = fields.Dict()
-
-    # load only fields
-    latest_metrics = fields.List(fields.Dict)
-    algorithm_id = fields.Str(load_only=True)
-    create_time = fields.DateTime(load_only=True)
-    modified_time = fields.DateTime(load_only=True)
-    reason_code = fields.Str()
-    reason_message = fields.Str()
-    status = fields.Str()
-    status_transitions = fields.List(fields.Dict)
-    training_job_id = fields.Str()
-    training_job_url = fields.Str()
-
-    @post_load
-    def _make(self, data, **kwargs):
-        from pai.estimator import _TrainingJob
-
-        data["instance_count"] = data.get("compute_resource", {}).get("EcsCount")
-        data["instance_type"] = data.get("compute_resource", {}).get("EcsType")
-        data["max_running_time_in_seconds"] = data.get("scheduler", {}).get(
-            "MaxRunningTimeInSeconds"
-        )
-
-        return self.make_or_reload(_TrainingJob, data)
diff --git a/pai/version.py b/pai/version.py
index 023d357..7b413b3 100644
--- a/pai/version.py
+++ b/pai/version.py
@@ -12,4 +12,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-VERSION = "0.4.7.post0"
+VERSION = "0.4.8.dev0"
diff --git a/requirements/requirements.txt b/requirements/requirements.txt
index f58f634..f5bc171 100644
--- a/requirements/requirements.txt
+++ b/requirements/requirements.txt
@@ -1,13 +1,12 @@
 aliyun-python-sdk-core>=2.13.25
 alibabacloud_sts20150401
 importlib-metadata; python_version < "3.8"
-numpy>=1.16.0
+numpy>=1.16.0, <2
 oss2>=2.8.0
 pyodps>=0.11.0
 pyyaml>=5.3.1
 six>=1.15.0
-marshmallow
-marshmallow-oneofschema>=3.0.1
+pydantic>=2.0.1
 eas_prediction>=0.20
 alibabacloud_tea_util>=0.3.6, <1.0.0, !=0.3.9
 alibabacloud_tea_openapi>=0.3.3, <1.0.0
diff --git a/requirements/test-requirements.txt b/requirements/test-requirements.txt
index 6af780b..5693026 100644
--- a/requirements/test-requirements.txt
+++ b/requirements/test-requirements.txt
@@ -6,3 +6,4 @@ mock>=2.0.0
 scikit-learn
 pandas
 docker>=4.4.0
+openai
diff --git a/setup.py b/setup.py
index 43d405d..1dad53c 100644
--- a/setup.py
+++ b/setup.py
@@ -26,7 +26,7 @@ def read_requirements():
 
 setup(
     name="alipai",
-    python_requires=">=3.6",
+    python_requires=">=3.8",
     version=version,
     setup_requires=["setuptools_scm"],
     description="Alibaba Cloud PAI Python SDK",
diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py
index f33208b..d0f4de7 100644
--- a/tests/integration/__init__.py
+++ b/tests/integration/__init__.py
@@ -16,9 +16,7 @@
 
 import logging
 import os
-import time
 import unittest
-from functools import wraps
 
 import oss2
 from odps import ODPS
@@ -136,27 +134,6 @@ def setUpClass(cls):
         if cls.default_session.is_inner:
             PublicMaxComputeTableDataSet.set_dataset_project("pai_inner_project")
         cls.odps_client = cls._init_maxc_client()
-        cls.patch_model_deploy()
-
-    @classmethod
-    def patch_model_deploy(cls):
-        """Hack for model deploy wait for service ready."""
-        from pai.model import ModelBase
-
-        def deco(f):
-            @wraps(f)
-            def _(*args, **kwargs):
-                wait = kwargs.get("wait")
-                res = f(*args, **kwargs)
-                # wait is True which means deploy method should wait until the
-                # prediction service is 'really' ready.
-                if wait:
-                    time.sleep(15)
-                return res
-
-            return _
-
-        ModelBase.deploy = deco(ModelBase.deploy)
 
     @classmethod
     def tearDownClass(cls):
diff --git a/tests/integration/test_estimator.py b/tests/integration/test_estimator.py
index ec0ec98..52e266b 100644
--- a/tests/integration/test_estimator.py
+++ b/tests/integration/test_estimator.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 import os
+import posixpath
 import re
 from unittest import skipUnless
 
@@ -21,8 +22,9 @@
 from pai.common.oss_utils import upload
 from pai.common.utils import random_str
 from pai.estimator import AlgorithmEstimator, Estimator
-from pai.experiment import Experiment, ExperimentConfig
+from pai.experiment import Experiment
 from pai.image import retrieve
+from pai.job._training_job import ExperimentConfig
 from pai.session import get_default_session
 from tests.integration import BaseIntegTestCase
 from tests.integration.utils import t_context
@@ -96,6 +98,35 @@ def test_torch_run(self):
         self.assertIsNotNone(tb.app_uri)
         tb.delete()
 
+    def test_checkpoints(self):
+        sess = get_default_session()
+        torch_image_uri = retrieve("pytorch", framework_version="1.12").image_uri
+        filename = "output.txt"
+        command = (
+            f"echo helloworld > /ml/output/checkpoints/{filename} && echo 'helloworld'"
+        )
+        checkpoint_path = f"oss://{sess.oss_bucket.bucket_name}/sdk-test/test-checkpoints/{random_str(6)}/"
+
+        est = Estimator(
+            image_uri=torch_image_uri,
+            command=command,
+            instance_type="ecs.c6.large",
+            base_job_name="torch_run_",
+            checkpoints_path=checkpoint_path,
+        )
+
+        est.fit(
+            inputs={
+                "training": self.breast_cancer_train_data_uri,
+                "test": self.breast_cancer_test_data_uri,
+            },
+            wait=True,
+        )
+        self.assertEqual(checkpoint_path, est.checkpoints_data())
+        self.assertTrue(
+            self.is_oss_object_exists(posixpath.join(checkpoint_path, filename))
+        )
+
     def test_max_compute_input(self):
         image_uri = retrieve("xgboost", framework_version="latest").image_uri
         est = Estimator(
diff --git a/tests/integration/test_experiment.py b/tests/integration/test_experiment.py
index 537f24a..d204513 100644
--- a/tests/integration/test_experiment.py
+++ b/tests/integration/test_experiment.py
@@ -11,8 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import time
+from pai.common.utils import random_str
 
 from pai.experiment import Experiment
 from pai.tensorboard import TensorBoardStatus
 from tests.integration import BaseIntegTestCase
@@ -29,7 +29,7 @@ def setUp(self):
 
     def test_create(self):
         # Init test data
-        exp_name = "test_experiment_" + str(int(time.time()))
+        exp_name = "test_experiment_" + random_str(10)
         # Test create
         self.experiment = Experiment.create(
             artifact_uri=self.artifact_uri,
@@ -40,7 +40,7 @@ def test_create(self):
         self.assertEqual(self.experiment.tensorboard_data(), expected_tb_path)
 
     def test_update(self):
-        exp_name = "test_experiment_" + str(int(time.time()))
+        exp_name = "test_experiment_" + random_str(10)
         self.experiment = Experiment.create(
             artifact_uri=self.artifact_uri,
             name=exp_name,
@@ -51,7 +51,7 @@ def test_update(self):
         self.assertEqual(self.experiment.name, exp_name)
 
     def test_list(self):
-        exp_name = "test_experiment_" + str(int(time.time()))
+        exp_name = "test_experiment_" + random_str(10)
         self.experiment = Experiment.create(
             artifact_uri=self.artifact_uri,
             name=exp_name,
@@ -63,7 +63,7 @@ def test_list(self):
         self.assertEqual(experiment_names[0], exp_name)
 
     def test_get(self):
-        exp_name = "test_experiment_" + str(int(time.time()))
+        exp_name = "test_experiment_" + random_str(10)
         self.experiment = Experiment.create(
             artifact_uri=self.artifact_uri,
             name=exp_name,
@@ -75,7 +75,7 @@ def test_get(self):
         self.assertEqual(self.experiment.tensorboard_data(), exp1.tensorboard_data())
 
     def test_tensorboard(self):
-        exp_name = "test_experiment_" + str(int(time.time()))
+        exp_name = "test_experiment_" + random_str(10)
         self.experiment = Experiment.create(
             artifact_uri=self.artifact_uri,
             name=exp_name,
@@ -84,8 +84,8 @@ def test_tensorboard(self):
         tb = self.experiment.tensorboard()
         self.assertIsNotNone(tb.app_uri)
         self.assertEqual(tb.status, TensorBoardStatus.Running)
-        tb.stop()
+        tb.delete()
 
     def tearDown(self):
-        if self.experiment:
+        if hasattr(self, "experiment") and self.experiment:
             self.experiment.delete()
diff --git a/tests/integration/test_model/__init__.py b/tests/integration/test_model/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/integration/test_model.py b/tests/integration/test_model/test_model.py
similarity index 98%
rename from tests/integration/test_model.py
rename to tests/integration/test_model/test_model.py
index e2dac19..8393408 100644
--- a/tests/integration/test_model.py
+++ b/tests/integration/test_model/test_model.py
@@ -239,8 +239,8 @@ def tearDownClass(cls):
 
     def test_tmp_algo_rm_train(self):
        """Test training registered model with temporary algorithm"""
         m = RegisteredModel(
-            model_name="easynlp_pai_bert_tiny_zh",
-            model_version="0.1.0",
+            model_name="qwen1.5-0.5b-chat",
+            # model_version="0.1.0",
             model_provider="pai",
         )
@@ -284,6 +284,8 @@ def test_builtin_algo_rm_train(self):
         inputs = m.get_estimator_inputs()
         est.hyperparameters["max_epochs"] = 5
         est.hyperparameters["warmup_epochs"] = 2
+        est.hyperparameters["image_scale"] = "640,640"
+        est.hyperparameters["train_batch_size"] = 8
         est.fit(inputs=inputs)
         outputs_data = est.get_outputs_data()
diff --git a/tests/integration/test_processor.py b/tests/integration/test_processor.py
index 98e5ab5..22057a9 100644
--- a/tests/integration/test_processor.py
+++ b/tests/integration/test_processor.py
@@ -15,8 +15,9 @@
 import os
 
 from pai.common.utils import random_str
-from pai.experiment import Experiment, ExperimentConfig
+from pai.experiment import Experiment
 from pai.image import retrieve
+from pai.job import ExperimentConfig
 from pai.processor import Processor
 from pai.session import get_default_session
 from tests.integration import BaseIntegTestCase
diff --git a/tests/integration/tests_pipeline/test_pipeline_build.py b/tests/integration/tests_pipeline/test_pipeline_build.py
index 3079f6c..f371cc1 100644
--- a/tests/integration/tests_pipeline/test_pipeline_build.py
+++ b/tests/integration/tests_pipeline/test_pipeline_build.py
@@ -135,7 +135,7 @@ def test_conflict_step_names(self):
             },
         )
 
-        with self.assertRaisesRegexp(ValueError, "name conflict") as _:
+        with self.assertRaisesRegex(ValueError, "name conflict") as _:
             _ = Pipeline(steps=[split_step_1, split_step_2])
 
     def test_auto_step_name(self):
@@ -207,7 +207,7 @@ def test_pipeline_cycle_detect(self):
         )
         data_source_step.after(type_transform_step)
 
-        with self.assertRaisesRegexp(ValueError, "Cycle dependency detected") as _:
+        with self.assertRaisesRegex(ValueError, "Cycle dependency detected") as _:
             _ = Pipeline(
                 steps=[type_transform_step, data_source_step],
             )
diff --git a/tests/integration/utils.py b/tests/integration/utils.py
index dbca156..bf6a3c0 100644
--- a/tests/integration/utils.py
+++ b/tests/integration/utils.py
@@ -17,6 +17,7 @@
 import io
 import os
 import shutil
+import subprocess
 import uuid
 from collections import namedtuple
 
@@ -85,7 +86,14 @@ def __init__(self, pai_service_config, oss_config, maxc_config):
 
     @property
     def has_docker(self):
-        return shutil.which("docker") is not None
+        # Check if docker daemon is running
+        return (
+            shutil.which("docker") is not None
+            and subprocess.run(
+                ["docker", "stats"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
+            ).returncode
+            == 0
+        )
 
     @property
     def has_gpu(self):
@@ -170,9 +178,11 @@ def make_resource_name(case_name, resource_type=None, sep="-", time_suffix=True)
                 "sdktest",
                 resource_type,
                 case_name,
-                datetime.datetime.now().isoformat(timespec="seconds")
-                if time_suffix
-                else random_str(10),
+                (
+                    datetime.datetime.now().isoformat(timespec="seconds")
+                    if time_suffix
+                    else random_str(10)
+                ),
             ],
         )
     )
diff --git a/tests/test_data/read_mc_table/run.py b/tests/test_data/read_mc_table/run.py
index faf0a59..915916d 100644
--- a/tests/test_data/read_mc_table/run.py
+++ b/tests/test_data/read_mc_table/run.py
@@ -1,5 +1,6 @@
 import json
 import os
+from itertools import islice
 
 from odps import ODPS
 from odps.accounts import StsAccount
@@ -40,7 +41,7 @@ def read_table():
     o = ODPS(account=account, project=project_name, endpoint=endpoint)
 
     # 读取输入表数据
-    for record in o.read_table(table_name):
+    for record in islice(o.read_table(table_name), 20):
         print(record)
diff --git a/tests/unit/test_pipeline/test_pipeline.py b/tests/unit/test_pipeline/test_pipeline.py
index a09fd6a..c16df4d 100644
--- a/tests/unit/test_pipeline/test_pipeline.py
+++ b/tests/unit/test_pipeline/test_pipeline.py
@@ -50,7 +50,7 @@ def test_io_name_conflict(self):
             },
         )
 
-        with self.assertRaisesRegexp(ValueError, ".*conflict.*") as _:
+        with self.assertRaisesRegex(ValueError, ".*conflict.*") as _:
             step1 = op.as_step(name="step1")
             step2 = op.as_step(name="step2")
             _ = Pipeline(