Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support spot instances and job settings. #33

Merged
merged 8 commits into from
Jul 11, 2024
18 changes: 18 additions & 0 deletions pai/api/training_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
CreateTrainingJobRequest,
CreateTrainingJobRequestComputeResource,
CreateTrainingJobRequestComputeResourceInstanceSpec,
CreateTrainingJobRequestComputeResourceSpotSpec,
CreateTrainingJobRequestExperimentConfig,
CreateTrainingJobRequestHyperParameters,
CreateTrainingJobRequestInputChannels,
CreateTrainingJobRequestLabels,
CreateTrainingJobRequestOutputChannels,
CreateTrainingJobRequestScheduler,
CreateTrainingJobRequestSettings,
CreateTrainingJobRequestUserVpc,
CreateTrainingJobResponseBody,
GetTrainingJobRequest,
Expand Down Expand Up @@ -86,8 +88,10 @@ def create(
instance_type,
instance_count,
job_name,
spot_spec: Optional[Dict[str, Any]] = None,
instance_spec: Optional[Dict[str, str]] = None,
resource_id: Optional[str] = None,
resource_type: Optional[str] = None,
hyperparameters: Optional[Dict[str, Any]] = None,
input_channels: Optional[List[Dict[str, Any]]] = None,
output_channels: Optional[List[Dict[str, Any]]] = None,
Expand All @@ -102,6 +106,7 @@ def create(
algorithm_spec: Optional[Dict[str, Any]] = None,
user_vpc_config: Optional[Dict[str, Any]] = None,
experiment_config: Optional[Dict[str, Any]] = None,
settings: Optional[Dict[str, Any]] = None,
) -> str:
"""Create a TrainingJob."""
if algorithm_spec and (
Expand All @@ -126,9 +131,16 @@ def create(
for ch in output_channels
]
if instance_type:
spot_spec = (
CreateTrainingJobRequestComputeResourceSpotSpec().from_map(spot_spec)
if spot_spec
else None
)
compute_resource = CreateTrainingJobRequestComputeResource(
ecs_count=instance_count,
ecs_spec=instance_type,
use_spot_instance=bool(spot_spec),
spot_spec=spot_spec,
)
elif instance_spec:
compute_resource = CreateTrainingJobRequestComputeResource(
Expand Down Expand Up @@ -169,6 +181,7 @@ def create(
compute_resource=compute_resource,
hyper_parameters=hyper_parameters,
input_channels=input_channels,
resource_type=resource_type,
environments=environments,
python_requirements=requirements,
labels=labels,
Expand All @@ -181,6 +194,11 @@ def create(
experiment_config=CreateTrainingJobRequestExperimentConfig().from_map(
experiment_config
),
settings=(
CreateTrainingJobRequestSettings().from_map(settings)
if settings
else None
),
)

resp: CreateTrainingJobResponseBody = self._do_request(
Expand Down
16 changes: 16 additions & 0 deletions pai/estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
DEFAULT_OUTPUT_MODEL_CHANNEL_NAME,
DEFAULT_TENSORBOARD_CHANNEL_NAME,
ExperimentConfig,
ResourceType,
SpotSpec,
UserVpcConfig,
)
from .model import InferenceSpec, Model, ResourceConfig
Expand Down Expand Up @@ -187,11 +189,14 @@ def __init__(
environments: Optional[Dict[str, str]] = None,
requirements: Optional[List[str]] = None,
instance_type: Optional[str] = None,
spot_spec: Optional[SpotSpec] = None,
instance_spec: Optional[Dict] = None,
resource_id: Optional[Dict] = None,
resource_type: Optional[Union[str, ResourceType]] = None,
instance_count: Optional[int] = None,
user_vpc_config: Optional[UserVpcConfig] = None,
experiment_config: Optional[ExperimentConfig] = None,
settings: Optional[Dict[str, Any]] = None,
labels: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
):
Expand Down Expand Up @@ -252,12 +257,18 @@ def __init__(
'package' or 'package==version'. This is similar to the contents of a requirements.txt file used
in Python projects. If requirements.txt is provided in user code directory, requirements
will override the conflict dependencies directly.
resource_type (str, optional): The resource type used to run the training job.
By default, general computing resource is used. If the resource_type is
'Lingjun', Lingjun computing resource is used.
instance_type (str, optional): The machine instance type used to run the
training job. To view the supported machine instance types, please refer
to the document:
https://help.aliyun.com/document_detail/171758.htm#section-55y-4tq-84y.
If the instance_type is "local", the training job is executed locally
using docker.
spot_spec (:class:`pai.job.SpotSpec`, optional): The specification of the spot
instance used to run the training job. If provided, the training job will
use the spot instance to run the training job.
instance_count (int): The number of machines used to run the training job.
user_vpc_config (:class:`pai.estimator.UserVpcConfig`, optional): The VPC
configuration used to enable the training job instance to connect to the
Expand All @@ -270,6 +281,8 @@ def __init__(
training job and the experiment. If provided, the training job will belong
to the specified experiment, in which case the training job will use
artifact_uri of experiment as default output path. Default to None.
settings (dict, optional): A dictionary that represents the additional settings
for job, such as AIMaster configurations.
labels (Dict[str, str], optional): A dictionary that maps label names to
their values. This optional field allows you to provide a set of labels
that will be applied to the training job.
Expand All @@ -287,11 +300,14 @@ def __init__(
instance_type=instance_type,
instance_count=instance_count,
resource_id=resource_id,
resource_type=resource_type,
spot_spec=spot_spec,
instance_spec=instance_spec,
user_vpc_config=user_vpc_config,
max_run_time=max_run_time,
environments=environments,
requirements=requirements,
settings=settings,
labels=labels,
)

Expand Down
6 changes: 6 additions & 0 deletions pai/job/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
InstanceSpec,
ModelRecipeSpec,
OssLocation,
ResourceType,
SpotSpec,
SpotStrategy,
TrainingJob,
TrainingJobStatus,
UriInput,
Expand All @@ -45,4 +48,7 @@
"ExperimentConfig",
"InstanceSpec",
"UriInput",
"SpotSpec",
"ResourceType",
"SpotStrategy",
]
56 changes: 55 additions & 1 deletion pai/job/_training_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import time
import typing
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from typing import Any, Dict, List, Optional, Union

from pydantic import BaseModel, ConfigDict, Field
Expand Down Expand Up @@ -55,6 +56,19 @@ def as_oss_dir_uri(uri: str):
DEFAULT_TENSORBOARD_CHANNEL_NAME = "tensorboard"


class SpotStrategy(str, Enum):
    """Spot instance strategies that can be selected for a training job.

    Members are plain strings (the class mixes in ``str``), so a member
    compares equal to its literal value, e.g.
    ``SpotStrategy.SpotAsPriceGo == "SpotAsPriceGo"``.
    """

    SpotWithPriceLimit = "SpotWithPriceLimit"
    SpotAsPriceGo = "SpotAsPriceGo"

    def __repr__(self) -> str:
        # Render just the raw strategy string instead of the default
        # ``<SpotStrategy.X: 'X'>`` enum representation.
        return str(self.value)


class ResourceType(str, Enum):
    """Kinds of computing resource a training job can be scheduled on.

    Members are plain strings (the class mixes in ``str``), so either the
    enum member or its literal value can be used to look one up, e.g.
    ``ResourceType("Lingjun") is ResourceType.Lingjun``.
    """

    # Lingjun computing resource.
    Lingjun = "Lingjun"
    # General computing resource.
    General = "General"


class BaseAPIModel(BaseModel):

model_config = ConfigDict(
Expand Down Expand Up @@ -275,11 +289,14 @@ class AlgorithmSpec(BaseAPIModel):
)
hyperparameter_definitions: List[HyperParameterDefinition] = Field(
default_factory=list,
alias="HyperParameter",
alias="HyperParameters",
description="Hyperparameter definitions.",
)
job_type: str = Field(default="PyTorchJob")
code_dir: Optional[CodeDir] = Field(None, description="Source code location.")
customization: Optional[Dict[str, Any]] = Field(
None, description="Whether the algorithm supports customize code."
)


class ModelRecipeSpec(BaseAPIModel):
Expand All @@ -300,6 +317,19 @@ class ModelRecipeSpec(BaseAPIModel):
requirements: Optional[List[str]] = None


class SpotSpec(BaseAPIModel):
    """Specification of the spot instance used to run a training job.

    Attributes:
        spot_strategy: Required. One of the :class:`SpotStrategy` values.
        spot_discount_limit: Optional discount limit. It is documented as
            required when ``spot_strategy`` is ``SpotWithPriceLimit``, but no
            validator in this class enforces that.
    """

    spot_strategy: SpotStrategy = Field(
        ...,
        description="Spot instance strategy, support 'SpotWithPriceLimit', 'SpotAsPriceGo'",
    )
    spot_discount_limit: Optional[float] = Field(
        None,
        # Adjacent string literals are concatenated verbatim, so each piece
        # must carry its own trailing space to keep the sentences separated.
        description="Spot instance discount limit, maximum 2 decimal places, "
        "required when spot_strategy is 'SpotWithPriceLimit'. "
        "For example, 0.5 means 50% off the original price.",
    )


class TrainingJob(BaseAPIModel):
"""TrainingJob represents a training job in the PAI service."""

Expand Down Expand Up @@ -542,23 +572,29 @@ def __init__(
instance_spec: Optional[Dict] = None,
instance_count: Optional[int] = None,
resource_id: Optional[Dict] = None,
resource_type: Optional[Union[str, ResourceType]] = None,
spot_spec: Optional[SpotSpec] = None,
environments: Optional[Dict] = None,
requirements: Optional[List[str]] = None,
labels: Optional[Dict[str, str]] = None,
settings: Optional[Dict[str, Any]] = None,
):
self.session = get_default_session()
self._training_jobs = []
self.base_job_name = base_job_name or type(self).__name__.lower()
self.output_path = output_path
self.user_vpc_config = user_vpc_config
self.spot_spec = spot_spec
self.experiment_config = experiment_config
self.max_run_time = max_run_time
self.instance_type = instance_type
self.instance_spec = instance_spec
self.instance_count = instance_count or 1
self.resource_id = resource_id
self.resource_type = ResourceType(resource_type) if resource_type else None
self.environments = environments
self.requirements = requirements
self.settings = settings
self.labels = labels

def wait(self, interval: int = 5, show_logs: bool = True, all_jobs: bool = False):
Expand Down Expand Up @@ -704,6 +740,7 @@ def build_outputs(

return [item.model_dump() for item in res]

# TODO: get arguments, such as VPCConfig, instance_type etc, from self instance.
def _submit(
self,
job_name: str,
Expand All @@ -728,6 +765,20 @@ def _submit(
show_logs: bool = False,
):
session = get_default_session()

if not self.resource_type or self.resource_type == ResourceType.General:
resource_type = None
else:
resource_type = self.resource_type.value

if self.spot_spec:
spot_spec = {
"SpotStrategy": self.spot_spec.spot_strategy.value,
}
if self.spot_spec.spot_discount_limit:
spot_spec["SpotDiscountLimit"] = self.spot_spec.spot_discount_limit
else:
spot_spec = None
training_job_id = session.training_job_api.create(
instance_count=instance_count,
instance_spec=instance_spec.model_dump() if instance_spec else None,
Expand All @@ -738,9 +789,11 @@ def _submit(
if experiment_config and isinstance(experiment_config, ExperimentConfig)
else experiment_config
),
spot_spec=spot_spec,
algorithm_version=algorithm_version,
instance_type=instance_type,
resource_id=resource_id,
resource_type=resource_type,
job_name=job_name,
hyperparameters=hyperparameters,
max_running_in_seconds=max_run_time,
Expand All @@ -751,6 +804,7 @@ def _submit(
user_vpc_config=user_vpc_config,
labels=labels,
environments=environments,
settings=self.settings,
)
training_job = TrainingJob.get(training_job_id)
self._training_jobs.append(training_job)
Expand Down
Loading