Skip to content

Commit

Permalink
add unmount method for dsw
Browse files Browse the repository at this point in the history
  • Loading branch information
pitt-liang committed Oct 18, 2024
1 parent 50dc31f commit 07b7e9f
Showing 1 changed file with 82 additions and 13 deletions.
95 changes: 82 additions & 13 deletions pai/dsw.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def mount(
option_type: Optional[OptionType] = None,
) -> str:
"""
Dynamic mount a data source to the DSW Instance.
Dynamic mount a dataset to the DSW Instance.
Args:
source (str): The source to be mounted, can be a dataset id or an OSS uri.
Expand Down Expand Up @@ -109,26 +109,50 @@ def mount(
)


def list_dataset_configs() -> List[Dict[str, Any]]:
def unmount(mount_point: str) -> None:
"""
Unmount a dynamic mount dataset from the DSW Instance.
Dynamic mount dataset is a special dataset in DSW, which is used to mount data sources
dynamically when the instance is running. When unmount it, the dataset will be removed
from the instance.
Args:
mount_point (str): The mount point to be unmounted.
Returns:
None
"""
instance = _default_instance()
instance.unmount(mount_point)


def list_dataset_configs(dynamic_only: bool = False) -> List[Dict[str, Any]]:
"""
List all the datasets available in the DSW Instance.
Args:
dynamic_only (bool): Whether to list only the dynamic mount datasets.
Returns:
list: A list of dataset details.
"""
instance = _default_instance()
datasets = instance._get_instance_info().datasets
if dynamic_only:
datasets = [ds for ds in datasets if ds.dynamic]
return [ds.to_map() for ds in datasets]

return [d.to_map() for d in instance._get_instance_info().datasets]


def default_dynamic_mount_point():
"""Get the default dynamic mount point of the DSW Instance.
def default_dynamic_mount_path():
"""Get the default dynamic mount path of the DSW Instance.
Returns:
str: The default dynamic mount point of the DSW Instance.
str: The default dynamic mount path of the DSW Instance.
"""
instance = _default_instance()
return instance.default_dynamic_mount_point()
return instance.default_dynamic_mount_path()


def get_dynamic_mount_config() -> Dict[str, Any]:
Expand Down Expand Up @@ -164,11 +188,11 @@ def get_dynamic_mount_config(self):
"""
return self._instance_info.dynamic_mount.to_map()

def default_dynamic_mount_point(self) -> Optional[str]:
"""Get the default dynamic mount point of the DSW Instance.
def default_dynamic_mount_path(self) -> Optional[str]:
"""Get the default dynamic mount path of the DSW Instance.
Returns:
str: The default dynamic mount point of the DSW Instance.
str: The default dynamic mount path of the DSW Instance.
"""
if (
not self._instance_info.dynamic_mount.enable
Expand All @@ -183,7 +207,7 @@ def mount(
mount_point: str = None,
options: Union[str] = None,
option_type: Union[OptionType] = None,
):
) -> str:
"""
Dynamic mount a data source to the DSW Instance.
Expand All @@ -195,6 +219,9 @@ def mount(
specified with option_type.
option_type(str): Preset data source mount options, can not be specified with
options.
Returns:
str: The mount point of the data source.
"""
if options and option_type:
raise ValueError(
Expand All @@ -214,7 +241,7 @@ def mount(
)

sess = get_default_session()
default_root_path = self.default_dynamic_mount_point()
default_root_path = self.default_dynamic_mount_path()

if is_oss_uri(source):
obj = OssUriObj(source)
Expand Down Expand Up @@ -262,3 +289,45 @@ def mount(
instance_id=self.instance_id, request=request
)
return mount_point

def unmount(self, mount_point: str) -> None:
"""
Unmount a data source from the DSW Instance.
Args:
mount_point (str): The mount point to be unmounted.
Returns:
None
"""
sess = get_default_session()

resp: GetInstanceResponse = sess._acs_dsw_client.get_instance(self.instance_id)
datasets = [
UpdateInstanceRequestDatasets().from_map(ds.to_map())
for ds in resp.body.datasets
]

unmount_ds = [ds for ds in datasets if ds.mount_path == mount_point]

if not unmount_ds:
raise ValueError(f"Not found dataset to unmount: {mount_point}")
if len(unmount_ds) > 1:
raise RuntimeError(f"Found multiple datasets to unmount: {mount_point}")

dataset = unmount_ds[0]
if not dataset.dynamic:
raise ValueError(f"Dataset is not a dynamic mount point: {mount_point}")

request = UpdateInstanceRequest(
datasets=[
UpdateInstanceRequestDatasets().from_map(ds.to_map())
for ds in resp.body.datasets
if ds.mount_path != mount_point
]
)
if not request.datasets:
request.disassociate_datasets = True
sess._acs_dsw_client.update_instance(
instance_id=self.instance_id, request=request
)

0 comments on commit 07b7e9f

Please sign in to comment.