From 07b7e9ff57f9e0084be247d03ad1f5130258278a Mon Sep 17 00:00:00 2001 From: LiangQuan Date: Thu, 19 Sep 2024 10:43:19 +0800 Subject: [PATCH] add unmount method for dsw --- pai/dsw.py | 95 ++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 82 insertions(+), 13 deletions(-) diff --git a/pai/dsw.py b/pai/dsw.py index 53e797d..eb6d7bb 100644 --- a/pai/dsw.py +++ b/pai/dsw.py @@ -74,7 +74,7 @@ def mount( option_type: Optional[OptionType] = None, ) -> str: """ - Dynamic mount a data source to the DSW Instance. + Dynamic mount a dataset to the DSW Instance. Args: source (str): The source to be mounted, can be a dataset id or an OSS uri. @@ -109,26 +109,50 @@ def mount( ) -def list_dataset_configs() -> List[Dict[str, Any]]: +def unmount(mount_point: str) -> None: + """ + Unmount a dynamic mount dataset from the DSW Instance. + + Dynamic mount dataset is a special dataset in DSW, which is used to mount data sources + dynamically when the instance is running. When unmount it, the dataset will be removed + from the instance. + + + Args: + mount_point (str): The mount point to be unmounted. + + Returns: + None + """ + instance = _default_instance() + instance.unmount(mount_point) + + +def list_dataset_configs(dynamic_only: bool = False) -> List[Dict[str, Any]]: """ List all the datasets available in the DSW Instance. + Args: + dynamic_only (bool): Whether to list only the dynamic mount datasets. + Returns: list: A list of dataset details. """ instance = _default_instance() + datasets = instance._get_instance_info().datasets + if dynamic_only: + datasets = [ds for ds in datasets if ds.dynamic] + return [ds.to_map() for ds in datasets] - return [d.to_map() for d in instance._get_instance_info().datasets] - -def default_dynamic_mount_point(): - """Get the default dynamic mount point of the DSW Instance. +def default_dynamic_mount_path(): + """Get the default dynamic mount path of the DSW Instance. Returns: - str: The default dynamic mount point of the DSW Instance. + str: The default dynamic mount path of the DSW Instance. """ instance = _default_instance() - return instance.default_dynamic_mount_point() + return instance.default_dynamic_mount_path() def get_dynamic_mount_config() -> Dict[str, Any]: @@ -164,11 +188,11 @@ def get_dynamic_mount_config(self): """ return self._instance_info.dynamic_mount.to_map() - def default_dynamic_mount_point(self) -> Optional[str]: - """Get the default dynamic mount point of the DSW Instance. + def default_dynamic_mount_path(self) -> Optional[str]: + """Get the default dynamic mount path of the DSW Instance. Returns: - str: The default dynamic mount point of the DSW Instance. + str: The default dynamic mount path of the DSW Instance. """ if ( not self._instance_info.dynamic_mount.enable @@ -183,7 +207,7 @@ def mount( mount_point: str = None, options: Union[str] = None, option_type: Union[OptionType] = None, - ): + ) -> str: """ Dynamic mount a data source to the DSW Instance. @@ -195,6 +219,9 @@ def mount( specified with option_type. option_type(str): Preset data source mount options, can not be specified with options. + + Returns: + str: The mount point of the data source. """ if options and option_type: raise ValueError( @@ -214,7 +241,7 @@ def mount( ) sess = get_default_session() - default_root_path = self.default_dynamic_mount_point() + default_root_path = self.default_dynamic_mount_path() if is_oss_uri(source): obj = OssUriObj(source) @@ -262,3 +289,45 @@ def mount( instance_id=self.instance_id, request=request ) return mount_point + + def unmount(self, mount_point: str) -> None: + """ + Unmount a data source from the DSW Instance. + + Args: + mount_point (str): The mount point to be unmounted. + + Returns: + None + """ + sess = get_default_session() + + resp: GetInstanceResponse = sess._acs_dsw_client.get_instance(self.instance_id) + datasets = [ + UpdateInstanceRequestDatasets().from_map(ds.to_map()) + for ds in resp.body.datasets + ] + + unmount_ds = [ds for ds in datasets if ds.mount_path == mount_point] + + if not unmount_ds: + raise ValueError(f"Not found dataset to unmount: {mount_point}") + if len(unmount_ds) > 1: + raise RuntimeError(f"Found multiple datasets to unmount: {mount_point}") + + dataset = unmount_ds[0] + if not dataset.dynamic: + raise ValueError(f"Dataset is not a dynamic mount point: {mount_point}") + + request = UpdateInstanceRequest( + datasets=[ + UpdateInstanceRequestDatasets().from_map(ds.to_map()) + for ds in resp.body.datasets + if ds.mount_path != mount_point + ] + ) + if not request.datasets: + request.disassociate_datasets = True + sess._acs_dsw_client.update_instance( + instance_id=self.instance_id, request=request + )