diff --git a/clients/client-python/gravitino/audit/fileset_data_operation.py b/clients/client-python/gravitino/audit/fileset_data_operation.py index 5f7a5794b7..0428d7111a 100644 --- a/clients/client-python/gravitino/audit/fileset_data_operation.py +++ b/clients/client-python/gravitino/audit/fileset_data_operation.py @@ -28,6 +28,14 @@ class FilesetDataOperation(Enum): """Opens a file. """ + OPEN_AND_WRITE = "OPEN_AND_WRITE" + """Opens a file and writes to it. + """ + + OPEN_AND_APPEND = "OPEN_AND_APPEND" + """Opens a file and appends to it. + """ + APPEND = "APPEND" """Appends some content into a file. """ diff --git a/clients/client-python/gravitino/filesystem/gvfs.py b/clients/client-python/gravitino/filesystem/gvfs.py index 4870ac505f..8d98d0a041 100644 --- a/clients/client-python/gravitino/filesystem/gvfs.py +++ b/clients/client-python/gravitino/filesystem/gvfs.py @@ -21,16 +21,19 @@ import re import fsspec -from cachetools import TTLCache +from cachetools import TTLCache, LRUCache from fsspec import AbstractFileSystem from fsspec.implementations.local import LocalFileSystem from fsspec.implementations.arrow import ArrowFSWrapper from fsspec.utils import infer_storage_options from pyarrow.fs import HadoopFileSystem from readerwriterlock import rwlock -from gravitino.api.catalog import Catalog -from gravitino.api.fileset import Fileset +from gravitino.audit.caller_context import CallerContext, CallerContextHolder +from gravitino.audit.fileset_audit_constants import FilesetAuditConstants +from gravitino.audit.fileset_data_operation import FilesetDataOperation +from gravitino.audit.internal_client_type import InternalClientType from gravitino.auth.simple_auth_provider import SimpleAuthProvider +from gravitino.catalog.fileset_catalog import FilesetCatalog from gravitino.client.gravitino_client import GravitinoClient from gravitino.exceptions.base import GravitinoRuntimeException from gravitino.filesystem.gvfs_config import GVFSConfig @@ -44,39 +47,20 @@ class StorageType(Enum): LOCAL = "file" -class FilesetContext: - """A context object that holds the information about the fileset and the file system which used in +class FilesetContextPair: + """A context object that holds the information about the actual file location and the file system which used in the GravitinoVirtualFileSystem's operations. 
""" - def __init__( - self, - name_identifier: NameIdentifier, - fileset: Fileset, - fs: AbstractFileSystem, - storage_type: StorageType, - actual_path: str, - ): - self._name_identifier = name_identifier - self._fileset = fileset - self._fs = fs - self._storage_type = storage_type - self._actual_path = actual_path - - def get_name_identifier(self): - return self._name_identifier + def __init__(self, actual_file_location: str, filesystem: AbstractFileSystem): + self._actual_file_location = actual_file_location + self._filesystem = filesystem - def get_fileset(self): - return self._fileset + def actual_file_location(self): + return self._actual_file_location - def get_fs(self): - return self._fs - - def get_actual_path(self): - return self._actual_path - - def get_storage_type(self): - return self._storage_type + def filesystem(self): + return self._filesystem class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem): @@ -136,6 +120,8 @@ def __init__( ) self._cache = TTLCache(maxsize=cache_size, ttl=cache_expired_time) self._cache_lock = rwlock.RWLockFair() + self._catalog_cache = LRUCache(maxsize=100) + self._catalog_cache_lock = rwlock.RWLockFair() super().__init__(**kwargs) @@ -160,28 +146,42 @@ def ls(self, path, detail=True, **kwargs): :param kwargs: Extra args :return If details is true, returns a list of file info dicts, else returns a list of file paths """ - context: FilesetContext = self._get_fileset_context(path) + context_pair: FilesetContextPair = self._get_fileset_context( + path, FilesetDataOperation.LIST_STATUS + ) + actual_path = context_pair.actual_file_location() + storage_type = self._recognize_storage_type(actual_path) + pre_process_path: str = self._pre_process_path(path) + identifier: NameIdentifier = self._extract_identifier(pre_process_path) + sub_path: str = self._get_sub_path_from_virtual_path( + identifier, pre_process_path + ) + storage_location: str = actual_path[: len(actual_path) - len(sub_path)] + # return entries with details if detail: - entries = [ - self._convert_actual_info(entry, context) - for entry in context.get_fs().ls( - self._strip_storage_protocol( - context.get_storage_type(), context.get_actual_path() - ), - detail=True, + entries = context_pair.filesystem().ls( + self._strip_storage_protocol(storage_type, actual_path), + detail=True, + ) + virtual_entries = [ + self._convert_actual_info( + entry, storage_location, self._get_virtual_location(identifier) ) + for entry in entries ] - return entries - entries = [ - self._convert_actual_path(entry_path, context) - for entry_path in context.get_fs().ls( - self._strip_storage_protocol( - context.get_storage_type(), context.get_actual_path() - ), - detail=False, + return virtual_entries + # only returns paths + entry_paths = context_pair.filesystem().ls( + self._strip_storage_protocol(storage_type, actual_path), + detail=False, + ) + virtual_entry_paths = [ + self._convert_actual_path( + entry_path, storage_location, self._get_virtual_location(identifier) ) + for entry_path in entry_paths ] - return entries + return virtual_entry_paths def info(self, path, **kwargs): """Get file info. 
@@ -189,13 +189,23 @@ def info(self, path, **kwargs): :param kwargs: Extra args :return A file info dict """ - context: FilesetContext = self._get_fileset_context(path) - actual_info: Dict = context.get_fs().info( - self._strip_storage_protocol( - context.get_storage_type(), context.get_actual_path() - ) + context_pair: FilesetContextPair = self._get_fileset_context( + path, FilesetDataOperation.GET_FILE_STATUS + ) + actual_path = context_pair.actual_file_location() + storage_type = self._recognize_storage_type(actual_path) + pre_process_path: str = self._pre_process_path(path) + identifier: NameIdentifier = self._extract_identifier(pre_process_path) + sub_path: str = self._get_sub_path_from_virtual_path( + identifier, pre_process_path + ) + storage_location: str = actual_path[: len(actual_path) - len(sub_path)] + actual_info: Dict = context_pair.filesystem().info( + self._strip_storage_protocol(storage_type, actual_path) + ) + return self._convert_actual_info( + actual_info, storage_location, self._get_virtual_location(identifier) ) - return self._convert_actual_info(actual_info, context) def exists(self, path, **kwargs): """Check if a file or a directory exists. @@ -203,11 +213,13 @@ def exists(self, path, **kwargs): :param kwargs: Extra args :return If a file or directory exists, it returns True, otherwise False """ - context: FilesetContext = self._get_fileset_context(path) - return context.get_fs().exists( - self._strip_storage_protocol( - context.get_storage_type(), context.get_actual_path() - ) + context_pair: FilesetContextPair = self._get_fileset_context( + path, FilesetDataOperation.EXISTS + ) + actual_path = context_pair.actual_file_location() + storage_type = self._recognize_storage_type(actual_path) + return context_pair.filesystem().exists( + self._strip_storage_protocol(storage_type, actual_path) ) def cp_file(self, path1, path2, **kwargs): @@ -225,24 +237,20 @@ def cp_file(self, path1, path2, **kwargs): f"Destination file path identifier: `{dst_identifier}` should be same with src file path " f"identifier: `{src_identifier}`." ) - src_context: FilesetContext = self._get_fileset_context(src_path) - if self._check_mount_single_file( - src_context.get_fileset(), - src_context.get_fs(), - src_context.get_storage_type(), - ): - raise GravitinoRuntimeException( - f"Cannot cp file of the fileset: {src_identifier} which only mounts to a single file." 
- ) - dst_context: FilesetContext = self._get_fileset_context(dst_path) + src_context_pair: FilesetContextPair = self._get_fileset_context( + src_path, FilesetDataOperation.COPY_FILE + ) + src_actual_path = src_context_pair.actual_file_location() + + dst_context_pair: FilesetContextPair = self._get_fileset_context( + dst_path, FilesetDataOperation.COPY_FILE + ) + dst_actual_path = dst_context_pair.actual_file_location() - src_context.get_fs().cp_file( - self._strip_storage_protocol( - src_context.get_storage_type(), src_context.get_actual_path() - ), - self._strip_storage_protocol( - dst_context.get_storage_type(), dst_context.get_actual_path() - ), + storage_type = self._recognize_storage_type(src_actual_path) + src_context_pair.filesystem().cp_file( + self._strip_storage_protocol(storage_type, src_actual_path), + self._strip_storage_protocol(storage_type, dst_actual_path), ) def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs): @@ -264,39 +272,31 @@ def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs): f"Destination file path identifier: `{dst_identifier}`" f" should be same with src file path identifier: `{src_identifier}`." ) - src_context: FilesetContext = self._get_fileset_context(src_path) - if self._check_mount_single_file( - src_context.get_fileset(), - src_context.get_fs(), - src_context.get_storage_type(), - ): - raise GravitinoRuntimeException( - f"Cannot cp file of the fileset: {src_identifier} which only mounts to a single file." - ) - dst_context: FilesetContext = self._get_fileset_context(dst_path) - if src_context.get_storage_type() == StorageType.HDFS: - src_context.get_fs().mv( - self._strip_storage_protocol( - src_context.get_storage_type(), src_context.get_actual_path() - ), - self._strip_storage_protocol( - dst_context.get_storage_type(), dst_context.get_actual_path() - ), + src_context_pair: FilesetContextPair = self._get_fileset_context( + src_path, FilesetDataOperation.RENAME + ) + src_actual_path = src_context_pair.actual_file_location() + storage_type = self._recognize_storage_type(src_actual_path) + dst_context_pair: FilesetContextPair = self._get_fileset_context( + dst_path, FilesetDataOperation.RENAME + ) + dst_actual_path = dst_context_pair.actual_file_location() + + if storage_type == StorageType.HDFS: + src_context_pair.filesystem().mv( + self._strip_storage_protocol(storage_type, src_actual_path), + self._strip_storage_protocol(storage_type, dst_actual_path), ) - elif src_context.get_storage_type() == StorageType.LOCAL: - src_context.get_fs().mv( - self._strip_storage_protocol( - src_context.get_storage_type(), src_context.get_actual_path() - ), - self._strip_storage_protocol( - dst_context.get_storage_type(), dst_context.get_actual_path() - ), + elif storage_type == StorageType.LOCAL: + src_context_pair.filesystem().mv( + self._strip_storage_protocol(storage_type, src_actual_path), + self._strip_storage_protocol(storage_type, dst_actual_path), recursive, maxdepth, ) else: raise GravitinoRuntimeException( - f"Storage type:{src_context.get_storage_type()} doesn't support now." + f"Storage type:{storage_type} doesn't support now." ) def _rm(self, path): @@ -311,11 +311,13 @@ def rm(self, path, recursive=False, maxdepth=None): When removing a directory, this parameter should be True. :param maxdepth: The maximum depth to remove the directory recursively. 
""" - context: FilesetContext = self._get_fileset_context(path) - context.get_fs().rm( - self._strip_storage_protocol( - context.get_storage_type(), context.get_actual_path() - ), + context_pair: FilesetContextPair = self._get_fileset_context( + path, FilesetDataOperation.DELETE + ) + actual_path = context_pair.actual_file_location() + storage_type = self._recognize_storage_type(actual_path) + context_pair.filesystem().rm( + self._strip_storage_protocol(storage_type, actual_path), recursive, maxdepth, ) @@ -324,11 +326,13 @@ def rm_file(self, path): """Remove a file. :param path: Virtual fileset path """ - context: FilesetContext = self._get_fileset_context(path) - context.get_fs().rm_file( - self._strip_storage_protocol( - context.get_storage_type(), context.get_actual_path() - ) + context_pair: FilesetContextPair = self._get_fileset_context( + path, FilesetDataOperation.DELETE + ) + actual_path = context_pair.actual_file_location() + storage_type = self._recognize_storage_type(actual_path) + context_pair.filesystem().rm_file( + self._strip_storage_protocol(storage_type, actual_path) ) def rmdir(self, path): @@ -337,11 +341,13 @@ def rmdir(self, path): And it will throw an exception if delete a directory which is non-empty for LocalFileSystem. :param path: Virtual fileset path """ - context: FilesetContext = self._get_fileset_context(path) - context.get_fs().rmdir( - self._strip_storage_protocol( - context.get_storage_type(), context.get_actual_path() - ) + context_pair: FilesetContextPair = self._get_fileset_context( + path, FilesetDataOperation.DELETE + ) + actual_path = context_pair.actual_file_location() + storage_type = self._recognize_storage_type(actual_path) + context_pair.filesystem().rmdir( + self._strip_storage_protocol(storage_type, actual_path) ) def open( @@ -362,11 +368,19 @@ def open( :param kwargs: Extra args :return A file-like object from the filesystem """ - context: FilesetContext = self._get_fileset_context(path) - return context.get_fs().open( - self._strip_storage_protocol( - context.get_storage_type(), context.get_actual_path() - ), + if mode in ("w", "wb"): + data_operation = FilesetDataOperation.OPEN_AND_WRITE + elif mode in ("a", "ab"): + data_operation = FilesetDataOperation.OPEN_AND_APPEND + else: + data_operation = FilesetDataOperation.OPEN + context_pair: FilesetContextPair = self._get_fileset_context( + path, data_operation + ) + actual_path = context_pair.actual_file_location() + storage_type = self._recognize_storage_type(actual_path) + return context_pair.filesystem().open( + self._strip_storage_protocol(storage_type, actual_path), mode, block_size, cache_options, @@ -382,11 +396,13 @@ def mkdir(self, path, create_parents=True, **kwargs): :param create_parents: Create parent directories if missing when set to True :param kwargs: Extra args """ - context: FilesetContext = self._get_fileset_context(path) - context.get_fs().mkdir( - self._strip_storage_protocol( - context.get_storage_type(), context.get_actual_path() - ), + context_pair: FilesetContextPair = self._get_fileset_context( + path, FilesetDataOperation.MKDIRS + ) + actual_path = context_pair.actual_file_location() + storage_type = self._recognize_storage_type(actual_path) + context_pair.filesystem().mkdir( + self._strip_storage_protocol(storage_type, actual_path), create_parents, **kwargs, ) @@ -396,11 +412,13 @@ def makedirs(self, path, exist_ok=True): :param path: Virtual fileset path :param exist_ok: Continue if a directory already exists """ - context: FilesetContext = 
self._get_fileset_context(path) - context.get_fs().makedirs( - self._strip_storage_protocol( - context.get_storage_type(), context.get_actual_path() - ), + context_pair: FilesetContextPair = self._get_fileset_context( + path, FilesetDataOperation.MKDIRS + ) + actual_path = context_pair.actual_file_location() + storage_type = self._recognize_storage_type(actual_path) + context_pair.filesystem().makedirs( + self._strip_storage_protocol(storage_type, actual_path), exist_ok, ) @@ -410,15 +428,17 @@ def created(self, path): :param path: Virtual fileset path :return Created time(datetime.datetime) """ - context: FilesetContext = self._get_fileset_context(path) - if context.get_storage_type() == StorageType.LOCAL: - return context.get_fs().created( - self._strip_storage_protocol( - context.get_storage_type(), context.get_actual_path() - ) + context_pair: FilesetContextPair = self._get_fileset_context( + path, FilesetDataOperation.CREATED_TIME + ) + actual_path = context_pair.actual_file_location() + storage_type = self._recognize_storage_type(actual_path) + if storage_type == StorageType.LOCAL: + return context_pair.filesystem().created( + self._strip_storage_protocol(storage_type, actual_path) ) raise GravitinoRuntimeException( - f"Storage type:{context.get_storage_type()} doesn't support now." + f"Storage type:{storage_type} doesn't support now." ) def modified(self, path): @@ -426,11 +446,13 @@ def modified(self, path): :param path: Virtual fileset path :return Modified time(datetime.datetime) """ - context: FilesetContext = self._get_fileset_context(path) - return context.get_fs().modified( - self._strip_storage_protocol( - context.get_storage_type(), context.get_actual_path() - ) + context_pair: FilesetContextPair = self._get_fileset_context( + path, FilesetDataOperation.MODIFIED_TIME + ) + actual_path = context_pair.actual_file_location() + storage_type = self._recognize_storage_type(actual_path) + return context_pair.filesystem().modified( + self._strip_storage_protocol(storage_type, actual_path) ) def cat_file(self, path, start=None, end=None, **kwargs): @@ -441,11 +463,13 @@ def cat_file(self, path, start=None, end=None, **kwargs): :param kwargs: Extra args :return File content """ - context: FilesetContext = self._get_fileset_context(path) - return context.get_fs().cat_file( - self._strip_storage_protocol( - context.get_storage_type(), context.get_actual_path() - ), + context_pair: FilesetContextPair = self._get_fileset_context( + path, FilesetDataOperation.CAT_FILE + ) + actual_path = context_pair.actual_file_location() + storage_type = self._recognize_storage_type(actual_path) + return context_pair.filesystem().cat_file( + self._strip_storage_protocol(storage_type, actual_path), start, end, **kwargs, @@ -465,55 +489,67 @@ def get_file(self, rpath, lpath, callback=None, outfile=None, **kwargs): raise GravitinoRuntimeException( "Doesn't support copy a remote gvfs file to an another remote file." 
) - context: FilesetContext = self._get_fileset_context(rpath) - context.get_fs().get_file( - self._strip_storage_protocol( - context.get_storage_type(), context.get_actual_path() - ), + context_pair: FilesetContextPair = self._get_fileset_context( + rpath, FilesetDataOperation.GET_FILE + ) + actual_path = context_pair.actual_file_location() + storage_type = self._recognize_storage_type(actual_path) + context_pair.filesystem().get_file( + self._strip_storage_protocol(storage_type, actual_path), lpath, **kwargs, ) - def _convert_actual_path(self, path, context: FilesetContext): + def _convert_actual_path( + self, + actual_path: str, + storage_location: str, + virtual_location: str, + ): """Convert an actual path to a virtual path. The virtual path is like `fileset/{catalog}/{schema}/{fileset}/xxx`. - :param path: Actual path - :param context: Fileset context + :param actual_path: Actual path + :param storage_location: Storage location + :param virtual_location: Virtual location :return A virtual path """ - if context.get_storage_type() == StorageType.HDFS: - actual_prefix = infer_storage_options( - context.get_fileset().storage_location() - )["path"] - elif context.get_storage_type() == StorageType.LOCAL: - actual_prefix = context.get_fileset().storage_location()[ - len(f"{StorageType.LOCAL.value}:") : - ] + if storage_location.startswith(f"{StorageType.HDFS.value}://"): + actual_prefix = infer_storage_options(storage_location)["path"] + elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"): + actual_prefix = storage_location[len(f"{StorageType.LOCAL.value}:") :] else: raise GravitinoRuntimeException( - f"Storage type:{context.get_storage_type()} doesn't support now." + f"Storage location:{storage_location} doesn't support now." ) - if not path.startswith(actual_prefix): + if not actual_path.startswith(actual_prefix): raise GravitinoRuntimeException( - f"Path {path} does not start with valid prefix {actual_prefix}." + f"Path {actual_path} does not start with valid prefix {actual_prefix}." ) - virtual_location = self._get_virtual_location(context.get_name_identifier()) + # if the storage location is end with "/", # we should truncate this to avoid replace issues. if actual_prefix.endswith(self.SLASH) and not virtual_location.endswith( self.SLASH ): - return f"{path.replace(actual_prefix[:-1], virtual_location)}" - return f"{path.replace(actual_prefix, virtual_location)}" + return f"{actual_path.replace(actual_prefix[:-1], virtual_location)}" + return f"{actual_path.replace(actual_prefix, virtual_location)}" - def _convert_actual_info(self, entry: Dict, context: FilesetContext): + def _convert_actual_info( + self, + entry: Dict, + storage_location: str, + virtual_location: str, + ): """Convert a file info from an actual entry to a virtual entry. 
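The two branches in `_convert_actual_path` above exist because the Arrow-backed HDFS filesystem reports bare paths such as `/user/fs1/a`, while the storage location is a full URI; the comparable prefix is therefore the URI's path component for HDFS and the `file:`-stripped path for local storage. A standalone sketch of that prefix computation, assuming only these two schemes (`actual_prefix` is a hypothetical helper, not part of the client API):

    from fsspec.utils import infer_storage_options

    def actual_prefix(storage_location: str) -> str:
        if storage_location.startswith("hdfs://"):
            # Keep only the path component of the URI, e.g. "/user/fs1".
            return infer_storage_options(storage_location)["path"]
        if storage_location.startswith("file:/"):
            return storage_location[len("file:"):]
        raise ValueError(f"unsupported storage location: {storage_location}")

    assert actual_prefix("hdfs://cluster:8020/user/fs1") == "/user/fs1"
    assert actual_prefix("file:/tmp/fileset/fs1") == "/tmp/fileset/fs1"
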
:param entry: A dict of the actual file info - :param context: Fileset context + :param storage_location: Storage location + :param virtual_location: Virtual location :return A dict of the virtual file info """ - path = self._convert_actual_path(entry["name"], context) + path = self._convert_actual_path( + entry["name"], storage_location, virtual_location + ) return { "name": path, "size": entry["size"], @@ -521,78 +557,38 @@ def _convert_actual_info(self, entry: Dict, context: FilesetContext): "mtime": entry["mtime"], } - def _get_fileset_context(self, virtual_path: str): + def _get_fileset_context(self, virtual_path: str, operation: FilesetDataOperation): """Get a fileset context from the cache or the Gravitino server :param virtual_path: The virtual path - :return A fileset context + :param operation: The data operation + :return A fileset context pair """ virtual_path: str = self._pre_process_path(virtual_path) identifier: NameIdentifier = self._extract_identifier(virtual_path) - read_lock = self._cache_lock.gen_rlock() - try: - read_lock.acquire() - cache_value: Tuple[Fileset, AbstractFileSystem, StorageType] = ( - self._cache.get(identifier) - ) - if cache_value is not None: - actual_path = self._get_actual_path_by_ident( - identifier, - cache_value[0], - cache_value[1], - cache_value[2], - virtual_path, - ) - return FilesetContext( - identifier, - cache_value[0], - cache_value[1], - cache_value[2], - actual_path, - ) - finally: - read_lock.release() - - write_lock = self._cache_lock.gen_wlock() - try: - write_lock.acquire() - cache_value: Tuple[Fileset, AbstractFileSystem] = self._cache.get( - identifier - ) - if cache_value is not None: - actual_path = self._get_actual_path_by_ident( - identifier, - cache_value[0], - cache_value[1], - cache_value[2], - virtual_path, - ) - return FilesetContext( - identifier, - cache_value[0], - cache_value[1], - cache_value[2], - actual_path, - ) - fileset: Fileset = self._load_fileset_from_server(identifier) - storage_location = fileset.storage_location() - if storage_location.startswith(f"{StorageType.HDFS.value}://"): - fs = ArrowFSWrapper(HadoopFileSystem.from_uri(storage_location)) - storage_type = StorageType.HDFS - elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"): - fs = LocalFileSystem() - storage_type = StorageType.LOCAL - else: - raise GravitinoRuntimeException( - f"Storage under the fileset: `{identifier}` doesn't support now." - ) - actual_path = self._get_actual_path_by_ident( - identifier, fileset, fs, storage_type, virtual_path + catalog_ident: NameIdentifier = NameIdentifier.of( + self._metalake, identifier.namespace().level(1) + ) + fileset_catalog = self._get_fileset_catalog(catalog_ident) + if fileset_catalog is None: + raise GravitinoRuntimeException( + f"Loaded fileset catalog: {catalog_ident} is null." 
) - self._cache[identifier] = (fileset, fs, storage_type) - context = FilesetContext(identifier, fileset, fs, storage_type, actual_path) - return context - finally: - write_lock.release() + sub_path: str = self._get_sub_path_from_virtual_path(identifier, virtual_path) + context = { + FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION: operation.name, + FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE: InternalClientType.PYTHON_GVFS.name, + } + caller_context: CallerContext = CallerContext(context) + CallerContextHolder.set(caller_context) + actual_file_location: ( + str + ) = fileset_catalog.as_fileset_catalog().get_file_location( + NameIdentifier.of(identifier.namespace().level(2), identifier.name()), + sub_path, + ) + return FilesetContextPair( + actual_file_location, self._get_filesystem(actual_file_location) + ) def _extract_identifier(self, path): """Extract the fileset identifier from the path. @@ -613,63 +609,6 @@ def _extract_identifier(self, path): f"path: `{path}` doesn't contains valid identifier." ) - def _load_fileset_from_server(self, identifier: NameIdentifier) -> Fileset: - """Load the fileset from the server. - If the fileset is not found on the server, an `NoSuchFilesetException` exception will be raised. - :param identifier: The fileset identifier - :return The fileset - """ - catalog: Catalog = self._client.load_catalog(identifier.namespace().level(1)) - - return catalog.as_fileset_catalog().load_fileset( - NameIdentifier.of(identifier.namespace().level(2), identifier.name()) - ) - - def _get_actual_path_by_ident( - self, - identifier: NameIdentifier, - fileset: Fileset, - fs: AbstractFileSystem, - storage_type: StorageType, - virtual_path: str, - ): - """Get the actual path by the virtual path and the fileset. - :param identifier: The fileset identifier - :param fileset: The fileset - :param fs: The file system corresponding to the fileset storage location - :param storage_type: The storage type of the fileset storage location - :param virtual_path: The virtual fileset path - :return The actual path. - """ - virtual_location = self._get_virtual_location(identifier) - storage_location = fileset.storage_location() - if self._check_mount_single_file(fileset, fs, storage_type): - if virtual_path != virtual_location: - raise GravitinoRuntimeException( - f"Path: {virtual_path} should be same with the virtual location: {virtual_location}" - " when the fileset only mounts a single file." - ) - return storage_location - # if the storage location ends with "/", - # we should handle the conversion specially - if storage_location.endswith(self.SLASH): - sub_path = virtual_path[len(virtual_location) :] - # For example, if the virtual path is `gvfs://fileset/catalog/schema/test_fileset/ttt`, - # and the storage location is `hdfs://cluster:8020/user/`, - # we should replace `gvfs://fileset/catalog/schema/test_fileset` - # with `hdfs://localhost:8020/user` which truncates the tailing slash. - # If the storage location is `hdfs://cluster:8020/user`, - # we can replace `gvfs://fileset/catalog/schema/test_fileset` - # with `hdfs://localhost:8020/user` directly. 
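The client-side path arithmetic deleted here (single-file mounts, trailing-slash handling) is superseded by a single server round trip: `_get_fileset_context` now records the operation in caller-context headers and lets the catalog resolve the sub-path. A condensed sketch of that flow, assuming a loaded `fileset_catalog` and a `fileset_ident` naming the target fileset:

    from gravitino.audit.caller_context import CallerContext, CallerContextHolder
    from gravitino.audit.fileset_audit_constants import FilesetAuditConstants
    from gravitino.audit.fileset_data_operation import FilesetDataOperation
    from gravitino.audit.internal_client_type import InternalClientType

    headers = {
        FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION: FilesetDataOperation.OPEN.name,
        FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE: InternalClientType.PYTHON_GVFS.name,
    }
    CallerContextHolder.set(CallerContext(headers))
    # One RPC returns the actual storage path for this virtual sub-path; the server
    # now owns the mapping that the removed code below computed locally.
    actual_location = fileset_catalog.as_fileset_catalog().get_file_location(
        fileset_ident, "/sub/path"
    )
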
- if sub_path.startswith(self.SLASH): - new_storage_location = storage_location[:-1] - else: - new_storage_location = storage_location - - # Replace virtual_location with the adjusted storage_location - return virtual_path.replace(virtual_location, new_storage_location, 1) - return virtual_path.replace(virtual_location, storage_location, 1) - @staticmethod def _get_virtual_location(identifier: NameIdentifier): """Get the virtual location of the fileset. @@ -682,20 +621,6 @@ def _get_virtual_location(identifier: NameIdentifier): f"/{identifier.name()}" ) - def _check_mount_single_file( - self, fileset: Fileset, fs: AbstractFileSystem, storage_type: StorageType - ): - """Check if the fileset is mounted a single file. - :param fileset: The fileset - :param fs: The file system corresponding to the fileset storage location - :param storage_type: The storage type of the fileset storage location - :return True the fileset is mounted a single file. - """ - result: Dict = fs.info( - self._strip_storage_protocol(storage_type, fileset.storage_location()) - ) - return result["type"] == "file" - @staticmethod def _pre_process_path(virtual_path): """Pre-process the path. @@ -719,6 +644,28 @@ def _pre_process_path(virtual_path): ) return pre_processed_path + @staticmethod + def _recognize_storage_type(path: str): + """Recognize the storage type by the path. + :param path: The path + :return: The storage type + """ + if path.startswith(f"{StorageType.HDFS.value}://"): + return StorageType.HDFS + if path.startswith(f"{StorageType.LOCAL.value}:/"): + return StorageType.LOCAL + raise GravitinoRuntimeException( + f"Storage type doesn't support now. Path:{path}" + ) + + @staticmethod + def _get_sub_path_from_virtual_path(identifier: NameIdentifier, virtual_path: str): + return virtual_path[ + len( + f"fileset/{identifier.namespace().level(1)}/{identifier.namespace().level(2)}/{identifier.name()}" + ) : + ] + @staticmethod def _strip_storage_protocol(storage_type: StorageType, path: str): """Strip the storage protocol from the path. @@ -739,5 +686,65 @@ def _strip_storage_protocol(storage_type: StorageType, path: str): f"Storage type:{storage_type} doesn't support now." 
) + def _get_fileset_catalog(self, catalog_ident: NameIdentifier): + read_lock = self._catalog_cache_lock.gen_rlock() + try: + read_lock.acquire() + cache_value: Tuple[NameIdentifier, FilesetCatalog] = ( + self._catalog_cache.get(catalog_ident) + ) + if cache_value is not None: + return cache_value + finally: + read_lock.release() + + write_lock = self._catalog_cache_lock.gen_wlock() + try: + write_lock.acquire() + cache_value: Tuple[NameIdentifier, FilesetCatalog] = ( + self._catalog_cache.get(catalog_ident) + ) + if cache_value is not None: + return cache_value + catalog = self._client.load_catalog(catalog_ident.name()) + self._catalog_cache[catalog_ident] = catalog + return catalog + finally: + write_lock.release() + + def _get_filesystem(self, actual_file_location: str): + storage_type = self._recognize_storage_type(actual_file_location) + read_lock = self._cache_lock.gen_rlock() + try: + read_lock.acquire() + cache_value: Tuple[StorageType, AbstractFileSystem] = self._cache.get( + storage_type + ) + if cache_value is not None: + return cache_value + finally: + read_lock.release() + + write_lock = self._cache_lock.gen_wlock() + try: + write_lock.acquire() + cache_value: Tuple[StorageType, AbstractFileSystem] = self._cache.get( + storage_type + ) + if cache_value is not None: + return cache_value + if storage_type == StorageType.HDFS: + fs = ArrowFSWrapper(HadoopFileSystem.from_uri(actual_file_location)) + elif storage_type == StorageType.LOCAL: + fs = LocalFileSystem() + else: + raise GravitinoRuntimeException( + f"Storage type: `{storage_type}` doesn't support now." + ) + self._cache[storage_type] = fs + return fs + finally: + write_lock.release() + fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem) diff --git a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py index 87b6f10231..9116005b84 100644 --- a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py +++ b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py @@ -364,6 +364,10 @@ def test_mv(self): self.assertTrue(fs.exists(mv_new_file)) self.assertTrue(self.hdfs.exists(mv_new_actual_file)) + # test rename without sub path, which should throw an exception + with self.assertRaises(GravitinoRuntimeException): + fs.mv(self.fileset_gvfs_location, self.fileset_gvfs_location + "/test_mv") + def test_rm(self): rm_dir = self.fileset_gvfs_location + "/test_rm" rm_actual_dir = self.fileset_storage_location + "/test_rm" diff --git a/clients/client-python/tests/unittests/test_gvfs_with_local.py b/clients/client-python/tests/unittests/test_gvfs_with_local.py index 3b28941dff..22bdccd8c5 100644 --- a/clients/client-python/tests/unittests/test_gvfs_with_local.py +++ b/clients/client-python/tests/unittests/test_gvfs_with_local.py @@ -14,16 +14,16 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
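The rewritten unit tests below all share one pattern: because path resolution moved server-side, each gvfs call is wrapped in a patch of `FilesetCatalog.get_file_location` returning the storage path that call should resolve to, instead of mocking `load_fileset` once per test. A minimal sketch of the pattern (paths illustrative); `side_effect` supplies per-call results when one operation triggers several resolutions, as `cp_file` and `mv` do:

    from unittest.mock import patch

    with patch(
        "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
        return_value="/tmp/fileset/test_ls",  # every call resolves to this path
    ):
        pass  # gvfs calls under test go here

    with patch(
        "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
        side_effect=["/tmp/src.par", "/tmp/dst.par"],  # successive resolutions
    ):
        pass  # e.g. fs.mv(src_virtual_path, dst_virtual_path)
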
+import base64 +import os # pylint: disable=protected-access,too-many-lines,too-many-locals -import base64 -import os import random import string import time import unittest -from unittest import mock +from datetime import datetime from unittest.mock import patch import pandas @@ -31,16 +31,11 @@ import pyarrow.dataset as dt import pyarrow.parquet as pq from fsspec.implementations.local import LocalFileSystem -from llama_index.core import SimpleDirectoryReader -from gravitino import gvfs, Fileset -from gravitino import NameIdentifier +from gravitino import gvfs, NameIdentifier from gravitino.auth.auth_constants import AuthConstants -from gravitino.dto.audit_dto import AuditDTO -from gravitino.dto.fileset_dto import FilesetDTO -from gravitino.filesystem.gvfs import FilesetContext, StorageType from gravitino.exceptions.base import GravitinoRuntimeException - +from gravitino.filesystem.gvfs_config import GVFSConfig from tests.unittests import mock_base @@ -67,42 +62,30 @@ def tearDown(self) -> None: if local_fs.exists(self._local_base_dir_path): local_fs.rm(self._local_base_dir_path, recursive=True) - @patch( - "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", - return_value=mock_base.mock_load_fileset( - "test_cache", f"{_fileset_dir}/test_cache" - ), - ) def test_cache(self, *mock_methods): - local_fs = LocalFileSystem() fileset_storage_location = f"{self._fileset_dir}/test_cache" fileset_virtual_location = "fileset/fileset_catalog/tmp/test_cache" - local_fs.mkdir(fileset_storage_location) - self.assertTrue(local_fs.exists(fileset_storage_location)) - options = {"cache_size": 1, "cache_expired_time": 2} - fs = gvfs.GravitinoVirtualFileSystem( - server_uri="http://localhost:9090", - metalake_name="metalake_demo", - options=options, - ) - self.assertTrue(fs.exists(fileset_virtual_location)) - # wait 2 seconds - time.sleep(2) - self.assertIsNone( - fs.cache.get( - NameIdentifier.of( - "metalake_demo", "fileset_catalog", "tmp", "test_cache" - ) + actual_path = fileset_storage_location + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path, + ): + local_fs = LocalFileSystem() + local_fs.mkdir(fileset_storage_location) + self.assertTrue(local_fs.exists(fileset_storage_location)) + options = {GVFSConfig.CACHE_SIZE: 1, GVFSConfig.CACHE_EXPIRED_TIME: 1} + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", + metalake_name="metalake_demo", + options=options, + skip_instance_cache=True, ) - ) + self.assertTrue(fs.exists(fileset_virtual_location)) + # wait 2 seconds + time.sleep(2) + self.assertIsNone(fs._cache.get("file:/")) - @patch( - "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", - return_value=mock_base.mock_load_fileset( - "test_simple_auth", f"{_fileset_dir}/test_simple_auth" - ), - ) - def test_simple_auth(self, mock_method1, mock_method2, mock_method3, mock_method4): + def test_simple_auth(self, *mock_methods): options = {"auth_type": "simple"} current_user = ( None if os.environ.get("user.name") is None else os.environ["user.name"] @@ -113,6 +96,7 @@ def test_simple_auth(self, mock_method1, mock_method2, mock_method3, mock_method server_uri="http://localhost:9090", metalake_name="metalake_demo", options=options, + skip_instance_cache=True, ) token = fs._client._rest_client.auth_data_provider.get_token_data() token_string = base64.b64decode( @@ -122,60 +106,59 @@ def test_simple_auth(self, mock_method1, mock_method2, mock_method3, mock_method if current_user is not None: 
os.environ["user.name"] = current_user - @patch( - "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", - return_value=mock_base.mock_load_fileset("test_ls", f"{_fileset_dir}/test_ls"), - ) def test_ls(self, *mock_methods): - local_fs = LocalFileSystem() fileset_storage_location = f"{self._fileset_dir}/test_ls" fileset_virtual_location = "fileset/fileset_catalog/tmp/test_ls" - local_fs.mkdir(fileset_storage_location) - sub_dir_path = f"{fileset_storage_location}/test_1" - local_fs.mkdir(sub_dir_path) - self.assertTrue(local_fs.exists(sub_dir_path)) - sub_file_path = f"{fileset_storage_location}/test_file_1.par" - local_fs.touch(sub_file_path) - self.assertTrue(local_fs.exists(sub_file_path)) - - fs = gvfs.GravitinoVirtualFileSystem( - server_uri="http://localhost:9090", metalake_name="metalake_demo" - ) - self.assertTrue(fs.exists(fileset_virtual_location)) - - # test detail = false - file_list_without_detail = fs.ls(fileset_virtual_location, detail=False) - file_list_without_detail.sort() - self.assertEqual(2, len(file_list_without_detail)) - self.assertEqual( - file_list_without_detail[0], f"{fileset_virtual_location}/test_1" - ) - self.assertEqual( - file_list_without_detail[1], f"{fileset_virtual_location}/test_file_1.par" - ) + actual_path = fileset_storage_location + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path, + ): + local_fs = LocalFileSystem() + local_fs.mkdir(fileset_storage_location) + sub_dir_path = f"{fileset_storage_location}/test_1" + local_fs.mkdir(sub_dir_path) + self.assertTrue(local_fs.exists(sub_dir_path)) + sub_file_path = f"{fileset_storage_location}/test_file_1.par" + local_fs.touch(sub_file_path) + self.assertTrue(local_fs.exists(sub_file_path)) + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:9090", + metalake_name="metalake_demo", + skip_instance_cache=True, + ) + self.assertTrue(fs.exists(fileset_virtual_location)) + + # test detail = false + file_list_without_detail = fs.ls(fileset_virtual_location, detail=False) + file_list_without_detail.sort() + self.assertEqual(2, len(file_list_without_detail)) + self.assertEqual( + file_list_without_detail[0], f"{fileset_virtual_location}/test_1" + ) + self.assertEqual( + file_list_without_detail[1], + f"{fileset_virtual_location}/test_file_1.par", + ) - # test detail = true - file_list_with_detail = fs.ls(fileset_virtual_location, detail=True) - file_list_with_detail.sort(key=lambda x: x["name"]) - self.assertEqual(2, len(file_list_with_detail)) - self.assertEqual( - file_list_with_detail[0]["name"], f"{fileset_virtual_location}/test_1" - ) - self.assertEqual( - file_list_with_detail[1]["name"], - f"{fileset_virtual_location}/test_file_1.par", - ) + # test detail = true + file_list_with_detail = fs.ls(fileset_virtual_location, detail=True) + file_list_with_detail.sort(key=lambda x: x["name"]) + self.assertEqual(2, len(file_list_with_detail)) + self.assertEqual( + file_list_with_detail[0]["name"], f"{fileset_virtual_location}/test_1" + ) + self.assertEqual( + file_list_with_detail[1]["name"], + f"{fileset_virtual_location}/test_file_1.par", + ) - @patch( - "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", - return_value=mock_base.mock_load_fileset( - "test_info", f"{_fileset_dir}/test_info" - ), - ) def test_info(self, *mock_methods): - local_fs = LocalFileSystem() fileset_storage_location = f"{self._fileset_dir}/test_info" fileset_virtual_location = "fileset/fileset_catalog/tmp/test_info" + actual_path = 
fileset_storage_location + local_fs = LocalFileSystem() local_fs.mkdir(fileset_storage_location) sub_dir_path = f"{fileset_storage_location}/test_1" local_fs.mkdir(sub_dir_path) @@ -185,28 +168,39 @@ def test_info(self, *mock_methods): self.assertTrue(local_fs.exists(sub_file_path)) fs = gvfs.GravitinoVirtualFileSystem( - server_uri="http://localhost:9090", metalake_name="metalake_demo" + server_uri="http://localhost:9090", + metalake_name="metalake_demo", + skip_instance_cache=True, ) - self.assertTrue(fs.exists(fileset_virtual_location)) + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path, + ): + self.assertTrue(fs.exists(fileset_virtual_location)) dir_virtual_path = fileset_virtual_location + "/test_1" - dir_info = fs.info(dir_virtual_path) - self.assertEqual(dir_info["name"], dir_virtual_path) + actual_path = fileset_storage_location + "/test_1" + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path, + ): + dir_info = fs.info(dir_virtual_path) + self.assertEqual(dir_info["name"], dir_virtual_path) file_virtual_path = fileset_virtual_location + "/test_file_1.par" - file_info = fs.info(file_virtual_path) - self.assertEqual(file_info["name"], file_virtual_path) - - @patch( - "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", - return_value=mock_base.mock_load_fileset( - "test_exist", f"{_fileset_dir}/test_exist" - ), - ) + actual_path = fileset_storage_location + "/test_file_1.par" + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path, + ): + file_info = fs.info(file_virtual_path) + self.assertEqual(file_info["name"], file_virtual_path) + def test_exist(self, *mock_methods): - local_fs = LocalFileSystem() fileset_storage_location = f"{self._fileset_dir}/test_exist" fileset_virtual_location = "fileset/fileset_catalog/tmp/test_exist" + actual_path = fileset_storage_location + local_fs = LocalFileSystem() local_fs.mkdir(fileset_storage_location) sub_dir_path = f"{fileset_storage_location}/test_1" local_fs.mkdir(sub_dir_path) @@ -216,28 +210,38 @@ def test_exist(self, *mock_methods): self.assertTrue(local_fs.exists(sub_file_path)) fs = gvfs.GravitinoVirtualFileSystem( - server_uri="http://localhost:9090", metalake_name="metalake_demo" + server_uri="http://localhost:9090", + metalake_name="metalake_demo", + skip_instance_cache=True, ) - self.assertTrue(fs.exists(fileset_virtual_location)) + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path, + ): + self.assertTrue(fs.exists(fileset_virtual_location)) dir_virtual_path = fileset_virtual_location + "/test_1" - self.assertTrue(fs.exists(dir_virtual_path)) + actual_path = fileset_storage_location + "/test_1" + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path, + ): + self.assertTrue(fs.exists(dir_virtual_path)) file_virtual_path = fileset_virtual_location + "/test_file_1.par" - self.assertTrue(fs.exists(file_virtual_path)) + actual_path = fileset_storage_location + "/test_file_1.par" + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path, + ): + self.assertTrue(fs.exists(file_virtual_path)) - @patch( - "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", - return_value=mock_base.mock_load_fileset( - "test_cp_file", f"{_fileset_dir}/test_cp_file" - ), - ) def 
test_cp_file(self, *mock_methods): - local_fs = LocalFileSystem() fileset_storage_location = f"{self._fileset_dir}/test_cp_file" - local_fs.mkdir(fileset_storage_location) - fileset_virtual_location = "fileset/fileset_catalog/tmp/test_cp_file" + actual_path = fileset_storage_location + local_fs = LocalFileSystem() + local_fs.mkdir(fileset_storage_location) sub_file_path = f"{fileset_storage_location}/test_file_1.par" local_fs.touch(sub_file_path) self.assertTrue(local_fs.exists(sub_file_path)) @@ -246,19 +250,35 @@ def test_cp_file(self, *mock_methods): f.write(b"test_file_1") fs = gvfs.GravitinoVirtualFileSystem( - server_uri="http://localhost:9090", metalake_name="metalake_demo" + server_uri="http://localhost:9090", + metalake_name="metalake_demo", + skip_instance_cache=True, ) - self.assertTrue(fs.exists(fileset_virtual_location)) + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path, + ): + self.assertTrue(fs.exists(fileset_virtual_location)) file_virtual_path = fileset_virtual_location + "/test_file_1.par" - self.assertTrue(fs.exists(file_virtual_path)) - - cp_file_virtual_path = fileset_virtual_location + "/test_cp_file_1.par" - fs.cp_file(file_virtual_path, cp_file_virtual_path) - self.assertTrue(fs.exists(cp_file_virtual_path)) - with local_fs.open(sub_file_path, "rb") as f: - result = f.read() - self.assertEqual(b"test_file_1", result) + src_actual_path = fileset_storage_location + "/test_file_1.par" + dst_actual_path = fileset_storage_location + "/test_cp_file_1.par" + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + side_effect=[ + src_actual_path, + src_actual_path, + dst_actual_path, + dst_actual_path, + ], + ): + self.assertTrue(fs.exists(file_virtual_path)) + cp_file_virtual_path = fileset_virtual_location + "/test_cp_file_1.par" + fs.cp_file(file_virtual_path, cp_file_virtual_path) + self.assertTrue(fs.exists(cp_file_virtual_path)) + with local_fs.open(sub_file_path, "rb") as f: + result = f.read() + self.assertEqual(b"test_file_1", result) # test invalid dst path cp_file_invalid_virtual_path = ( @@ -267,25 +287,12 @@ def test_cp_file(self, *mock_methods): with self.assertRaises(GravitinoRuntimeException): fs.cp_file(file_virtual_path, cp_file_invalid_virtual_path) - # test mount a single file - local_fs.rm(path=fileset_storage_location, recursive=True) - self.assertFalse(local_fs.exists(fileset_storage_location)) - local_fs.touch(fileset_storage_location) - self.assertTrue(local_fs.exists(fileset_storage_location)) - with self.assertRaises(GravitinoRuntimeException): - fs.cp_file(file_virtual_path, cp_file_virtual_path) - - @patch( - "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", - return_value=mock_base.mock_load_fileset("test_mv", f"{_fileset_dir}/test_mv"), - ) def test_mv(self, *mock_methods): - local_fs = LocalFileSystem() fileset_storage_location = f"{self._fileset_dir}/test_mv" - local_fs.mkdir(fileset_storage_location) - fileset_virtual_location = "fileset/fileset_catalog/tmp/test_mv" - + actual_path = fileset_storage_location + local_fs = LocalFileSystem() + local_fs.mkdir(fileset_storage_location) sub_file_path = f"{fileset_storage_location}/test_file_1.par" local_fs.touch(sub_file_path) self.assertTrue(local_fs.exists(sub_file_path)) @@ -295,27 +302,53 @@ def test_mv(self, *mock_methods): self.assertTrue(local_fs.exists(another_dir_path)) fs = gvfs.GravitinoVirtualFileSystem( - server_uri="http://localhost:9090", metalake_name="metalake_demo" + 
server_uri="http://localhost:9090", + metalake_name="metalake_demo", + skip_instance_cache=True, ) - self.assertTrue(fs.exists(fileset_virtual_location)) + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path, + ): + self.assertTrue(fs.exists(fileset_virtual_location)) file_virtual_path = fileset_virtual_location + "/test_file_1.par" - self.assertTrue(fs.exists(file_virtual_path)) + src_actual_path = fileset_storage_location + "/test_file_1.par" + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=src_actual_path, + ): + self.assertTrue(fs.exists(file_virtual_path)) mv_file_virtual_path = fileset_virtual_location + "/test_cp_file_1.par" - fs.mv(file_virtual_path, mv_file_virtual_path) - self.assertTrue(fs.exists(mv_file_virtual_path)) + dst_actual_path = fileset_storage_location + "/test_cp_file_1.par" + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + side_effect=[src_actual_path, dst_actual_path, dst_actual_path], + ): + fs.mv(file_virtual_path, mv_file_virtual_path) + self.assertTrue(fs.exists(mv_file_virtual_path)) mv_another_dir_virtual_path = ( fileset_virtual_location + "/another_dir/test_file_2.par" ) - fs.mv(mv_file_virtual_path, mv_another_dir_virtual_path) - self.assertTrue(fs.exists(mv_another_dir_virtual_path)) + dst_actual_path1 = fileset_storage_location + "/another_dir/test_file_2.par" + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + side_effect=[dst_actual_path, dst_actual_path1, dst_actual_path1], + ): + fs.mv(mv_file_virtual_path, mv_another_dir_virtual_path) + self.assertTrue(fs.exists(mv_another_dir_virtual_path)) # test not exist dir not_exist_dst_dir_path = fileset_virtual_location + "/not_exist/test_file_2.par" - with self.assertRaises(FileNotFoundError): - fs.mv(path1=mv_another_dir_virtual_path, path2=not_exist_dst_dir_path) + dst_actual_path2 = fileset_storage_location + "/not_exist/test_file_2.par" + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + side_effect=[dst_actual_path1, dst_actual_path2], + ): + with self.assertRaises(FileNotFoundError): + fs.mv(path1=mv_another_dir_virtual_path, path2=not_exist_dst_dir_path) # test invalid dst path mv_file_invalid_virtual_path = ( @@ -324,25 +357,12 @@ def test_mv(self, *mock_methods): with self.assertRaises(GravitinoRuntimeException): fs.mv(path1=file_virtual_path, path2=mv_file_invalid_virtual_path) - # test mount a single file - local_fs.rm(path=fileset_storage_location, recursive=True) - self.assertFalse(local_fs.exists(fileset_storage_location)) - local_fs.touch(fileset_storage_location) - self.assertTrue(local_fs.exists(fileset_storage_location)) - with self.assertRaises(GravitinoRuntimeException): - fs.mv(file_virtual_path, mv_file_virtual_path) - - @patch( - "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", - return_value=mock_base.mock_load_fileset("test_rm", f"{_fileset_dir}/test_rm"), - ) def test_rm(self, *mock_methods): - local_fs = LocalFileSystem() fileset_storage_location = f"{self._fileset_dir}/test_rm" - local_fs.mkdir(fileset_storage_location) - fileset_virtual_location = "fileset/fileset_catalog/tmp/test_rm" - + actual_path = fileset_storage_location + local_fs = LocalFileSystem() + local_fs.mkdir(fileset_storage_location) sub_file_path = f"{fileset_storage_location}/test_file_1.par" local_fs.touch(sub_file_path) self.assertTrue(local_fs.exists(sub_file_path)) @@ 
-352,38 +372,48 @@ def test_rm(self, *mock_methods): self.assertTrue(local_fs.exists(sub_dir_path)) fs = gvfs.GravitinoVirtualFileSystem( - server_uri="http://localhost:9090", metalake_name="metalake_demo" + server_uri="http://localhost:9090", + metalake_name="metalake_demo", + skip_instance_cache=True, ) - self.assertTrue(fs.exists(fileset_virtual_location)) + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path, + ): + self.assertTrue(fs.exists(fileset_virtual_location)) # test delete file file_virtual_path = fileset_virtual_location + "/test_file_1.par" - self.assertTrue(fs.exists(file_virtual_path)) - fs.rm(file_virtual_path) - self.assertFalse(fs.exists(file_virtual_path)) + actual_path1 = fileset_storage_location + "/test_file_1.par" + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path1, + ): + self.assertTrue(fs.exists(file_virtual_path)) + fs.rm(file_virtual_path) + self.assertFalse(fs.exists(file_virtual_path)) # test delete dir with recursive = false dir_virtual_path = fileset_virtual_location + "/sub_dir" - self.assertTrue(fs.exists(dir_virtual_path)) - with self.assertRaises(ValueError): - fs.rm(dir_virtual_path, recursive=False) - - # test delete dir with recursive = true - fs.rm(dir_virtual_path, recursive=True) - self.assertFalse(fs.exists(dir_virtual_path)) - - @patch( - "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", - return_value=mock_base.mock_load_fileset( - "test_rm_file", f"{_fileset_dir}/test_rm_file" - ), - ) + actual_path2 = fileset_storage_location + "/sub_dir" + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path2, + ): + self.assertTrue(fs.exists(dir_virtual_path)) + with self.assertRaises(ValueError): + fs.rm(dir_virtual_path, recursive=False) + + # test delete dir with recursive = true + fs.rm(dir_virtual_path, recursive=True) + self.assertFalse(fs.exists(dir_virtual_path)) + def test_rm_file(self, *mock_methods): - local_fs = LocalFileSystem() fileset_storage_location = f"{self._fileset_dir}/test_rm_file" - local_fs.mkdir(fileset_storage_location) - fileset_virtual_location = "fileset/fileset_catalog/tmp/test_rm_file" + actual_path = fileset_storage_location + local_fs = LocalFileSystem() + local_fs.mkdir(fileset_storage_location) sub_file_path = f"{fileset_storage_location}/test_file_1.par" local_fs.touch(sub_file_path) @@ -394,35 +424,44 @@ def test_rm_file(self, *mock_methods): self.assertTrue(local_fs.exists(sub_dir_path)) fs = gvfs.GravitinoVirtualFileSystem( - server_uri="http://localhost:9090", metalake_name="metalake_demo" + server_uri="http://localhost:9090", + metalake_name="metalake_demo", + skip_instance_cache=True, ) - self.assertTrue(fs.exists(fileset_virtual_location)) + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path, + ): + self.assertTrue(fs.exists(fileset_virtual_location)) # test delete file file_virtual_path = fileset_virtual_location + "/test_file_1.par" - self.assertTrue(fs.exists(file_virtual_path)) - fs.rm_file(file_virtual_path) - self.assertFalse(fs.exists(file_virtual_path)) + actual_path1 = fileset_storage_location + "/test_file_1.par" + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path1, + ): + self.assertTrue(fs.exists(file_virtual_path)) + fs.rm_file(file_virtual_path) + 
self.assertFalse(fs.exists(file_virtual_path)) # test delete dir dir_virtual_path = fileset_virtual_location + "/sub_dir" - self.assertTrue(fs.exists(dir_virtual_path)) - with self.assertRaises((IsADirectoryError, PermissionError)): - fs.rm_file(dir_virtual_path) - - @patch( - "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", - return_value=mock_base.mock_load_fileset( - "test_rmdir", f"{_fileset_dir}/test_rmdir" - ), - ) + actual_path2 = fileset_storage_location + "/sub_dir" + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path2, + ): + self.assertTrue(fs.exists(dir_virtual_path)) + with self.assertRaises((IsADirectoryError, PermissionError)): + fs.rm_file(dir_virtual_path) + def test_rmdir(self, *mock_methods): - local_fs = LocalFileSystem() fileset_storage_location = f"{self._fileset_dir}/test_rmdir" - local_fs.mkdir(fileset_storage_location) - fileset_virtual_location = "fileset/fileset_catalog/tmp/test_rmdir" - + actual_path = fileset_storage_location + local_fs = LocalFileSystem() + local_fs.mkdir(fileset_storage_location) sub_file_path = f"{fileset_storage_location}/test_file_1.par" local_fs.touch(sub_file_path) self.assertTrue(local_fs.exists(sub_file_path)) @@ -432,35 +471,44 @@ def test_rmdir(self, *mock_methods): self.assertTrue(local_fs.exists(sub_dir_path)) fs = gvfs.GravitinoVirtualFileSystem( - server_uri="http://localhost:9090", metalake_name="metalake_demo" + server_uri="http://localhost:9090", + metalake_name="metalake_demo", + skip_instance_cache=True, ) - self.assertTrue(fs.exists(fileset_virtual_location)) + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path, + ): + self.assertTrue(fs.exists(fileset_virtual_location)) # test delete file file_virtual_path = fileset_virtual_location + "/test_file_1.par" - self.assertTrue(fs.exists(file_virtual_path)) - with self.assertRaises(NotADirectoryError): - fs.rmdir(file_virtual_path) + actual_path1 = fileset_storage_location + "/test_file_1.par" + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path1, + ): + self.assertTrue(fs.exists(file_virtual_path)) + with self.assertRaises(NotADirectoryError): + fs.rmdir(file_virtual_path) # test delete dir dir_virtual_path = fileset_virtual_location + "/sub_dir" - self.assertTrue(fs.exists(dir_virtual_path)) - fs.rmdir(dir_virtual_path) - self.assertFalse(fs.exists(dir_virtual_path)) - - @patch( - "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", - return_value=mock_base.mock_load_fileset( - "test_open", f"{_fileset_dir}/test_open" - ), - ) + actual_path2 = fileset_storage_location + "/sub_dir" + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path2, + ): + self.assertTrue(fs.exists(dir_virtual_path)) + fs.rmdir(dir_virtual_path) + self.assertFalse(fs.exists(dir_virtual_path)) + def test_open(self, *mock_methods): - local_fs = LocalFileSystem() fileset_storage_location = f"{self._fileset_dir}/test_open" - local_fs.mkdir(fileset_storage_location) - fileset_virtual_location = "fileset/fileset_catalog/tmp/test_open" - + actual_path = fileset_storage_location + local_fs = LocalFileSystem() + local_fs.mkdir(fileset_storage_location) sub_file_path = f"{fileset_storage_location}/test_file_1.par" local_fs.touch(sub_file_path) self.assertTrue(local_fs.exists(sub_file_path)) @@ -470,168 +518,198 @@ def test_open(self, *mock_methods): 
         self.assertTrue(local_fs.exists(sub_dir_path))

         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090", metalake_name="metalake_demo"
+            server_uri="http://localhost:9090",
+            metalake_name="metalake_demo",
+            skip_instance_cache=True,
         )
-        self.assertTrue(fs.exists(fileset_virtual_location))
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path,
+        ):
+            self.assertTrue(fs.exists(fileset_virtual_location))

         # test open and write file
         file_virtual_path = fileset_virtual_location + "/test_file_1.par"
-        self.assertTrue(fs.exists(file_virtual_path))
-        with fs.open(file_virtual_path, mode="wb") as f:
-            f.write(b"test_open_write")
-        self.assertTrue(fs.info(file_virtual_path)["size"] > 0)
-
-        # test open and read file
-        with fs.open(file_virtual_path, mode="rb") as f:
-            self.assertEqual(b"test_open_write", f.read())
+        actual_path1 = fileset_storage_location + "/test_file_1.par"
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path1,
+        ):
+            self.assertTrue(fs.exists(file_virtual_path))
+            with fs.open(file_virtual_path, mode="wb") as f:
+                f.write(b"test_open_write")
+            self.assertTrue(fs.info(file_virtual_path)["size"] > 0)
+
+            # test open and read file
+            with fs.open(file_virtual_path, mode="rb") as f:
+                self.assertEqual(b"test_open_write", f.read())

         # test open dir
         dir_virtual_path = fileset_virtual_location + "/sub_dir"
-        self.assertTrue(fs.exists(dir_virtual_path))
-        with self.assertRaises(IsADirectoryError):
-            fs.open(dir_virtual_path)
-
-    @patch(
-        "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
-        return_value=mock_base.mock_load_fileset(
-            "test_mkdir", f"{_fileset_dir}/test_mkdir"
-        ),
-    )
+        actual_path2 = fileset_storage_location + "/sub_dir"
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path2,
+        ):
+            self.assertTrue(fs.exists(dir_virtual_path))
+            with self.assertRaises(IsADirectoryError):
+                fs.open(dir_virtual_path)
+
     def test_mkdir(self, *mock_methods):
-        local_fs = LocalFileSystem()
         fileset_storage_location = f"{self._fileset_dir}/test_mkdir"
-        local_fs.mkdir(fileset_storage_location)
-        fileset_virtual_location = "fileset/fileset_catalog/tmp/test_mkdir"
-
+        actual_path = fileset_storage_location
+        local_fs = LocalFileSystem()
+        local_fs.mkdir(fileset_storage_location)
         sub_dir_path = f"{fileset_storage_location}/sub_dir"
         local_fs.mkdirs(sub_dir_path)
         self.assertTrue(local_fs.exists(sub_dir_path))

         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090", metalake_name="metalake_demo"
+            server_uri="http://localhost:9090",
+            metalake_name="metalake_demo",
+            skip_instance_cache=True,
         )
-        self.assertTrue(fs.exists(fileset_virtual_location))
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path,
+        ):
+            self.assertTrue(fs.exists(fileset_virtual_location))

-        # test mkdir dir which exists
-        existed_dir_virtual_path = fileset_virtual_location
-        self.assertTrue(fs.exists(existed_dir_virtual_path))
-        with self.assertRaises(FileExistsError):
-            fs.mkdir(existed_dir_virtual_path)
+            # test mkdir dir which exists
+            existed_dir_virtual_path = fileset_virtual_location
+            self.assertTrue(fs.exists(existed_dir_virtual_path))
+            with self.assertRaises(FileExistsError):
+                fs.mkdir(existed_dir_virtual_path)

         # test mkdir dir with create_parents = false
         parent_not_exist_virtual_path = fileset_virtual_location + "/not_exist/sub_dir"
-        self.assertFalse(fs.exists(parent_not_exist_virtual_path))
-        with self.assertRaises(FileNotFoundError):
-            fs.mkdir(parent_not_exist_virtual_path, create_parents=False)
+        actual_path1 = fileset_storage_location + "/not_exist/sub_dir"
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path1,
+        ):
+            self.assertFalse(fs.exists(parent_not_exist_virtual_path))
+            with self.assertRaises(FileNotFoundError):
+                fs.mkdir(parent_not_exist_virtual_path, create_parents=False)

         # test mkdir dir with create_parents = true
         parent_not_exist_virtual_path2 = fileset_virtual_location + "/not_exist/sub_dir"
-        self.assertFalse(fs.exists(parent_not_exist_virtual_path2))
-        fs.mkdir(parent_not_exist_virtual_path2, create_parents=True)
-        self.assertTrue(fs.exists(parent_not_exist_virtual_path2))
-
-    @patch(
-        "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
-        return_value=mock_base.mock_load_fileset(
-            "test_makedirs", f"{_fileset_dir}/test_makedirs"
-        ),
-    )
+        actual_path2 = fileset_storage_location + "/not_exist/sub_dir"
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path2,
+        ):
+            self.assertFalse(fs.exists(parent_not_exist_virtual_path2))
+            fs.mkdir(parent_not_exist_virtual_path2, create_parents=True)
+            self.assertTrue(fs.exists(parent_not_exist_virtual_path2))
+
     def test_makedirs(self, *mock_methods):
-        local_fs = LocalFileSystem()
         fileset_storage_location = f"{self._fileset_dir}/test_makedirs"
-        local_fs.mkdir(fileset_storage_location)
-        fileset_virtual_location = "fileset/fileset_catalog/tmp/test_makedirs"
-
+        actual_path = fileset_storage_location
+        local_fs = LocalFileSystem()
+        local_fs.mkdir(fileset_storage_location)
         sub_dir_path = f"{fileset_storage_location}/sub_dir"
         local_fs.mkdirs(sub_dir_path)
         self.assertTrue(local_fs.exists(sub_dir_path))

         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090", metalake_name="metalake_demo"
+            server_uri="http://localhost:9090",
+            metalake_name="metalake_demo",
+            skip_instance_cache=True,
         )
-        self.assertTrue(fs.exists(fileset_virtual_location))
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path,
+        ):
+            self.assertTrue(fs.exists(fileset_virtual_location))

-        # test mkdir dir which exists
-        existed_dir_virtual_path = fileset_virtual_location
-        self.assertTrue(fs.exists(existed_dir_virtual_path))
-        with self.assertRaises(FileExistsError):
-            fs.mkdirs(existed_dir_virtual_path)
+            # test mkdir dir which exists
+            existed_dir_virtual_path = fileset_virtual_location
+            self.assertTrue(fs.exists(existed_dir_virtual_path))
+            with self.assertRaises(FileExistsError):
+                fs.mkdirs(existed_dir_virtual_path)

         # test mkdir dir not exist
         parent_not_exist_virtual_path = fileset_virtual_location + "/not_exist/sub_dir"
-        self.assertFalse(fs.exists(parent_not_exist_virtual_path))
-        fs.makedirs(parent_not_exist_virtual_path)
-        self.assertTrue(fs.exists(parent_not_exist_virtual_path))
-
-    @patch(
-        "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
-        return_value=mock_base.mock_load_fileset(
-            "test_created", f"{_fileset_dir}/test_created"
-        ),
-    )
+        actual_path1 = fileset_storage_location + "/not_exist/sub_dir"
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path1,
+        ):
+            self.assertFalse(fs.exists(parent_not_exist_virtual_path))
+            fs.makedirs(parent_not_exist_virtual_path)
+            self.assertTrue(fs.exists(parent_not_exist_virtual_path))
+
     def test_created(self, *mock_methods):
-        local_fs = LocalFileSystem()
         fileset_storage_location = f"{self._fileset_dir}/test_created"
-        local_fs.mkdir(fileset_storage_location)
-        fileset_virtual_location = "fileset/fileset_catalog/tmp/test_created"
-
+        actual_path = fileset_storage_location
+        local_fs = LocalFileSystem()
+        local_fs.mkdir(fileset_storage_location)
         sub_dir_path = f"{fileset_storage_location}/sub_dir"
         local_fs.mkdirs(sub_dir_path)
         self.assertTrue(local_fs.exists(sub_dir_path))

         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090", metalake_name="metalake_demo"
+            server_uri="http://localhost:9090",
+            metalake_name="metalake_demo",
+            skip_instance_cache=True,
         )
-        self.assertTrue(fs.exists(fileset_virtual_location))
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path,
+        ):
+            self.assertTrue(fs.exists(fileset_virtual_location))

         # test getting the created time of an existing dir
         dir_virtual_path = fileset_virtual_location + "/sub_dir"
-        self.assertTrue(fs.exists(dir_virtual_path))
-        self.assertIsNotNone(fs.created(dir_virtual_path))
-
-    @patch(
-        "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
-        return_value=mock_base.mock_load_fileset(
-            "test_modified", f"{_fileset_dir}/test_modified"
-        ),
-    )
+        actual_path1 = fileset_storage_location + "/sub_dir"
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path1,
+        ):
+            self.assertTrue(fs.exists(dir_virtual_path))
+            self.assertIsNotNone(fs.created(dir_virtual_path))
+
     def test_modified(self, *mock_methods):
-        local_fs = LocalFileSystem()
         fileset_storage_location = f"{self._fileset_dir}/test_modified"
-        local_fs.mkdir(fileset_storage_location)
-        fileset_virtual_location = "fileset/fileset_catalog/tmp/test_modified"
-
+        actual_path = fileset_storage_location
+        local_fs = LocalFileSystem()
+        local_fs.mkdir(fileset_storage_location)
         sub_dir_path = f"{fileset_storage_location}/sub_dir"
         local_fs.mkdirs(sub_dir_path)
         self.assertTrue(local_fs.exists(sub_dir_path))

         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090", metalake_name="metalake_demo"
+            server_uri="http://localhost:9090",
+            metalake_name="metalake_demo",
+            skip_instance_cache=True,
         )
-        self.assertTrue(fs.exists(fileset_virtual_location))
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path,
+        ):
+            self.assertTrue(fs.exists(fileset_virtual_location))

         # test getting the modified time of an existing dir
         dir_virtual_path = fileset_virtual_location + "/sub_dir"
-        self.assertTrue(fs.exists(dir_virtual_path))
-        self.assertIsNotNone(fs.modified(dir_virtual_path))
-
-    @patch(
-        "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
-        return_value=mock_base.mock_load_fileset(
-            "test_cat_file", f"{_fileset_dir}/test_cat_file"
-        ),
-    )
+        actual_path1 = fileset_storage_location + "/sub_dir"
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path1,
+        ):
+            self.assertTrue(fs.exists(dir_virtual_path))
+            self.assertIsNotNone(fs.modified(dir_virtual_path))
+
     def test_cat_file(self, *mock_methods):
-        local_fs = LocalFileSystem()
         fileset_storage_location = f"{self._fileset_dir}/test_cat_file"
-        local_fs.mkdir(fileset_storage_location)
-        fileset_virtual_location = "fileset/fileset_catalog/tmp/test_cat_file"
-
+        actual_path = fileset_storage_location
+        local_fs = LocalFileSystem()
+        local_fs.mkdir(fileset_storage_location)
         sub_file_path = f"{fileset_storage_location}/test_file_1.par"
         local_fs.touch(sub_file_path)
         self.assertTrue(local_fs.exists(sub_file_path))
@@ -641,40 +719,49 @@ def test_cat_file(self, *mock_methods):
         self.assertTrue(local_fs.exists(sub_dir_path))

         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090", metalake_name="metalake_demo"
+            server_uri="http://localhost:9090",
+            metalake_name="metalake_demo",
+            skip_instance_cache=True,
         )
-        self.assertTrue(fs.exists(fileset_virtual_location))
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path,
+        ):
+            self.assertTrue(fs.exists(fileset_virtual_location))

         # test open and write file
         file_virtual_path = fileset_virtual_location + "/test_file_1.par"
-        self.assertTrue(fs.exists(file_virtual_path))
-        with fs.open(file_virtual_path, mode="wb") as f:
-            f.write(b"test_cat_file")
-        self.assertTrue(fs.info(file_virtual_path)["size"] > 0)
-
-        # test cat file
-        content = fs.cat_file(file_virtual_path)
-        self.assertEqual(b"test_cat_file", content)
+        actual_path1 = fileset_storage_location + "/test_file_1.par"
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path1,
+        ):
+            self.assertTrue(fs.exists(file_virtual_path))
+            with fs.open(file_virtual_path, mode="wb") as f:
+                f.write(b"test_cat_file")
+            self.assertTrue(fs.info(file_virtual_path)["size"] > 0)
+
+            # test cat file
+            content = fs.cat_file(file_virtual_path)
+            self.assertEqual(b"test_cat_file", content)

         # test cat dir
         dir_virtual_path = fileset_virtual_location + "/sub_dir"
-        self.assertTrue(fs.exists(dir_virtual_path))
-        with self.assertRaises(IsADirectoryError):
-            fs.cat_file(dir_virtual_path)
-
-    @patch(
-        "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
-        return_value=mock_base.mock_load_fileset(
-            "test_get_file", f"{_fileset_dir}/test_get_file"
-        ),
-    )
+        actual_path2 = fileset_storage_location + "/sub_dir"
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path2,
+        ):
+            self.assertTrue(fs.exists(dir_virtual_path))
+            with self.assertRaises(IsADirectoryError):
+                fs.cat_file(dir_virtual_path)
+
     def test_get_file(self, *mock_methods):
-        local_fs = LocalFileSystem()
         fileset_storage_location = f"{self._fileset_dir}/test_get_file"
-        local_fs.mkdir(fileset_storage_location)
-        fileset_virtual_location = "fileset/fileset_catalog/tmp/test_get_file"
-
+        actual_path = fileset_storage_location
+        local_fs = LocalFileSystem()
+        local_fs.mkdir(fileset_storage_location)
         sub_file_path = f"{fileset_storage_location}/test_file_1.par"
         local_fs.touch(sub_file_path)
         self.assertTrue(local_fs.exists(sub_file_path))
@@ -684,30 +771,46 @@ def test_get_file(self, *mock_methods):
         self.assertTrue(local_fs.exists(sub_dir_path))

         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090", metalake_name="metalake_demo"
+            server_uri="http://localhost:9090",
+            metalake_name="metalake_demo",
+            skip_instance_cache=True,
         )
-        self.assertTrue(fs.exists(fileset_virtual_location))
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path,
+        ):
+            self.assertTrue(fs.exists(fileset_virtual_location))

         # test open and write file
         file_virtual_path = fileset_virtual_location + "/test_file_1.par"
-        self.assertTrue(fs.exists(file_virtual_path))
-        with fs.open(file_virtual_path, mode="wb") as f:
-            f.write(b"test_get_file")
-        self.assertTrue(fs.info(file_virtual_path)["size"] > 0)
-
-        # test get file
-        local_path = self._fileset_dir + "/local_file.par"
-        local_fs.touch(local_path)
-        self.assertTrue(local_fs.exists(local_path))
-        fs.get_file(file_virtual_path, local_path)
-        self.assertEqual(b"test_get_file", local_fs.cat_file(local_path))
+        actual_path1 = fileset_storage_location + "/test_file_1.par"
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path1,
+        ):
+            self.assertTrue(fs.exists(file_virtual_path))
+            with fs.open(file_virtual_path, mode="wb") as f:
+                f.write(b"test_get_file")
+            self.assertTrue(fs.info(file_virtual_path)["size"] > 0)
+
+            # test get file
+            local_path = self._fileset_dir + "/local_file.par"
+            local_fs.touch(local_path)
+            self.assertTrue(local_fs.exists(local_path))
+            fs.get_file(file_virtual_path, local_path)
+            self.assertEqual(b"test_get_file", local_fs.cat_file(local_path))

         # test get a dir
         dir_virtual_path = fileset_virtual_location + "/sub_dir"
-        local_path = self._fileset_dir + "/local_dir"
-        self.assertTrue(fs.exists(dir_virtual_path))
-        fs.get_file(dir_virtual_path, local_path)
-        self.assertTrue(local_fs.exists(local_path))
+        actual_path2 = fileset_storage_location + "/sub_dir"
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path2,
+        ):
+            local_path = self._fileset_dir + "/local_dir"
+            self.assertTrue(fs.exists(dir_virtual_path))
+            fs.get_file(dir_virtual_path, local_path)
+            self.assertTrue(local_fs.exists(local_path))

         # test get a file to a remote file
         remote_path = "gvfs://" + fileset_virtual_location + "/test_file_2.par"
@@ -715,100 +818,65 @@ def test_get_file(self, *mock_methods):
             fs.get_file(file_virtual_path, remote_path)

     def test_convert_actual_path(self, *mock_methods):
-        # test convert actual hdfs path
-        audit_dto = AuditDTO(
-            _creator="test",
-            _create_time="2022-01-01T00:00:00Z",
-            _last_modifier="test",
-            _last_modified_time="2024-04-05T10:10:35.218Z",
-        )
-        hdfs_fileset: FilesetDTO = FilesetDTO(
-            _name="test_f1",
-            _comment="",
-            _type=FilesetDTO.Type.MANAGED,
-            _storage_location="hdfs://localhost:8090/fileset/test_f1",
-            _audit=audit_dto,
-            _properties={},
-        )
-        mock_hdfs_context: FilesetContext = FilesetContext(
-            name_identifier=NameIdentifier.of(
-                "test_metalake", "test_catalog", "test_schema", "test_f1"
-            ),
-            storage_type=StorageType.HDFS,
-            fileset=hdfs_fileset,
-            actual_path=hdfs_fileset.storage_location() + "/actual_path",
-            fs=LocalFileSystem(),
-        )
-
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090", metalake_name="metalake_demo"
+            server_uri="http://localhost:9090",
+            metalake_name="metalake_demo",
+            skip_instance_cache=True,
+        )
+        storage_location = "hdfs://localhost:8090/fileset/test_f1"
+        virtual_location = fs._get_virtual_location(
+            NameIdentifier.of("test_metalake", "test_catalog", "test_schema", "test_f1")
         )

         # test actual path not start with storage location
         actual_path = "/not_start_with_storage/ttt"
         with self.assertRaises(GravitinoRuntimeException):
-            fs._convert_actual_path(actual_path, mock_hdfs_context)
+            fs._convert_actual_path(actual_path, storage_location, virtual_location)

         # test actual path start with storage location
         actual_path = "/fileset/test_f1/actual_path"
-        virtual_path = fs._convert_actual_path(actual_path, mock_hdfs_context)
+        virtual_path = fs._convert_actual_path(
+            actual_path, storage_location, virtual_location
+        )
         self.assertEqual(
             "fileset/test_catalog/test_schema/test_f1/actual_path", virtual_path
         )

         # test convert actual local path
-        audit_dto = AuditDTO(
-            _creator="test",
-            _create_time="2022-01-01T00:00:00Z",
-            _last_modifier="test",
-            _last_modified_time="2024-04-05T10:10:35.218Z",
-        )
-        local_fileset: FilesetDTO = FilesetDTO(
-            _name="test_f1",
-            _comment="",
-            _type=FilesetDTO.Type.MANAGED,
-            _storage_location="file:/tmp/fileset/test_f1",
-            _audit=audit_dto,
-            _properties={},
-        )
-        mock_local_context: FilesetContext = FilesetContext(
-            name_identifier=NameIdentifier.of(
-                "test_metalake", "test_catalog", "test_schema", "test_f1"
-            ),
-            storage_type=StorageType.LOCAL,
-            fileset=local_fileset,
-            actual_path=local_fileset.storage_location() + "/actual_path",
-            fs=LocalFileSystem(),
-        )
-
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090", metalake_name="metalake_demo"
+            server_uri="http://localhost:9090",
+            metalake_name="metalake_demo",
+            skip_instance_cache=True,
+        )
+        storage_location = "file:/tmp/fileset/test_f1"
+        virtual_location = fs._get_virtual_location(
+            NameIdentifier.of("test_metalake", "test_catalog", "test_schema", "test_f1")
         )

         # test actual path not start with storage location
         actual_path = "/not_start_with_storage/ttt"
         with self.assertRaises(GravitinoRuntimeException):
-            fs._convert_actual_path(actual_path, mock_local_context)
+            fs._convert_actual_path(actual_path, storage_location, virtual_location)

         # test actual path start with storage location
         actual_path = "/tmp/fileset/test_f1/actual_path"
-        virtual_path = fs._convert_actual_path(actual_path, mock_local_context)
+        virtual_path = fs._convert_actual_path(
+            actual_path, storage_location, virtual_location
+        )
         self.assertEqual(
             "fileset/test_catalog/test_schema/test_f1/actual_path", virtual_path
         )

         # test storage location without "/"
         actual_path = "/tmp/test_convert_actual_path/sub_dir/1.parquet"
-        storage_location1 = "file:/tmp/test_convert_actual_path"
-        mock_fileset1: Fileset = mock.Mock(spec=Fileset)
-        mock_fileset1.storage_location.return_value = storage_location1
-
-        mock_fileset_context1: FilesetContext = mock.Mock(spec=FilesetContext)
-        mock_fileset_context1.get_storage_type.return_value = StorageType.LOCAL
-        mock_fileset_context1.get_name_identifier.return_value = NameIdentifier.of(
-            "test_metalake", "catalog", "schema", "test_convert_actual_path"
+        storage_location = "file:/tmp/test_convert_actual_path"
+        virtual_location = fs._get_virtual_location(
+            NameIdentifier.of(
+                "test_metalake", "catalog", "schema", "test_convert_actual_path"
+            )
         )
-        mock_fileset_context1.get_fileset.return_value = mock_fileset1
-        virtual_path = fs._convert_actual_path(actual_path, mock_fileset_context1)
+        virtual_path = fs._convert_actual_path(
+            actual_path, storage_location, virtual_location
+        )
         self.assertEqual(
             "fileset/catalog/schema/test_convert_actual_path/sub_dir/1.parquet",
             virtual_path,
         )
@@ -816,107 +884,90 @@ def test_convert_actual_path(self, *mock_methods):

         # test storage location with "/"
         actual_path = "/tmp/test_convert_actual_path/sub_dir/1.parquet"
-        storage_location2 = "file:/tmp/test_convert_actual_path/"
-        mock_fileset2: Fileset = mock.Mock(spec=Fileset)
-        mock_fileset2.storage_location.return_value = storage_location2
-
-        mock_fileset_context2: FilesetContext = mock.Mock(spec=FilesetContext)
-        mock_fileset_context2.get_storage_type.return_value = StorageType.LOCAL
-        mock_fileset_context2.get_name_identifier.return_value = NameIdentifier.of(
-            "test_metalake", "catalog", "schema", "test_convert_actual_path"
+        storage_location = "file:/tmp/test_convert_actual_path/"
+        virtual_location = fs._get_virtual_location(
+            NameIdentifier.of(
+                "test_metalake", "catalog", "schema", "test_convert_actual_path"
+            )
         )
-        mock_fileset_context2.get_fileset.return_value = mock_fileset2
-        virtual_path = fs._convert_actual_path(actual_path, mock_fileset_context2)
+        virtual_path = fs._convert_actual_path(
+            actual_path, storage_location, virtual_location
+        )
         self.assertEqual(
             "fileset/catalog/schema/test_convert_actual_path/sub_dir/1.parquet",
             virtual_path,
         )

-    def test_convert_info(self, *mock_methods3):
-        # test convert actual hdfs path
-        audit_dto = AuditDTO(
-            _creator="test",
-            _create_time="2022-01-01T00:00:00Z",
-            _last_modifier="test",
-            _last_modified_time="2024-04-05T10:10:35.218Z",
-        )
-        hdfs_fileset: FilesetDTO = FilesetDTO(
-            _name="test_f1",
-            _comment="",
-            _type=FilesetDTO.Type.MANAGED,
-            _storage_location="hdfs://localhost:8090/fileset/test_f1",
-            _audit=audit_dto,
-            _properties={},
-        )
-        mock_hdfs_context: FilesetContext = FilesetContext(
-            name_identifier=NameIdentifier.of(
-                "test_metalake", "test_catalog", "test_schema", "test_f1"
-            ),
-            storage_type=StorageType.HDFS,
-            fileset=hdfs_fileset,
-            actual_path=hdfs_fileset.storage_location() + "/actual_path",
-            fs=LocalFileSystem(),
-        )
-
+    def test_convert_info(self, *mock_methods):
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090", metalake_name="metalake_demo"
+            server_uri="http://localhost:9090",
+            metalake_name="metalake_demo",
+            skip_instance_cache=True,
         )

         # test actual path not start with storage location
-        actual_path = "/not_start_with_storage/ttt"
+        entry = {
+            "name": "/not_start_with_storage/ttt",
+            "size": 1,
+            "type": "file",
+            "mtime": datetime.now(),
+        }
+        storage_location = "hdfs://localhost:8090/fileset/test_f1"
+        virtual_location = fs._get_virtual_location(
+            NameIdentifier.of("test_metalake", "test_catalog", "test_schema", "test_f1")
+        )
         with self.assertRaises(GravitinoRuntimeException):
-            fs._convert_actual_path(actual_path, mock_hdfs_context)
+            fs._convert_actual_info(entry, storage_location, virtual_location)

         # test actual path start with storage location
-        actual_path = "/fileset/test_f1/actual_path"
-        virtual_path = fs._convert_actual_path(actual_path, mock_hdfs_context)
+        entry = {
+            "name": "/fileset/test_f1/actual_path",
+            "size": 1,
+            "type": "file",
+            "mtime": datetime.now(),
+        }
+        info = fs._convert_actual_info(entry, storage_location, virtual_location)
         self.assertEqual(
-            "fileset/test_catalog/test_schema/test_f1/actual_path", virtual_path
+            "fileset/test_catalog/test_schema/test_f1/actual_path", info["name"]
         )

         # test convert actual local path
-        audit_dto = AuditDTO(
-            _creator="test",
-            _create_time="2022-01-01T00:00:00Z",
-            _last_modifier="test",
-            _last_modified_time="2024-04-05T10:10:35.218Z",
-        )
-        local_fileset: FilesetDTO = FilesetDTO(
-            _name="test_f1",
-            _comment="",
-            _type=FilesetDTO.Type.MANAGED,
-            _storage_location="file:/tmp/fileset/test_f1",
-            _audit=audit_dto,
-            _properties={},
-        )
-        mock_local_context: FilesetContext = FilesetContext(
-            name_identifier=NameIdentifier.of(
-                "test_metalake", "test_catalog", "test_schema", "test_f1"
-            ),
-            storage_type=StorageType.LOCAL,
-            fileset=local_fileset,
-            actual_path=local_fileset.storage_location() + "/actual_path",
-            fs=LocalFileSystem(),
-        )
-
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090", metalake_name="metalake_demo"
+            server_uri="http://localhost:9090",
+            metalake_name="metalake_demo",
+            skip_instance_cache=True,
         )

         # test actual path not start with storage location
-        actual_path = "/not_start_with_storage/ttt"
+        entry = {
+            "name": "/not_start_with_storage/ttt",
+            "size": 1,
+            "type": "file",
+            "mtime": datetime.now(),
+        }
+        storage_location = "file:/tmp/fileset/test_f1"
+        virtual_location = fs._get_virtual_location(
+            NameIdentifier.of("test_metalake", "test_catalog", "test_schema", "test_f1")
+        )
         with self.assertRaises(GravitinoRuntimeException):
-            fs._convert_actual_path(actual_path, mock_local_context)
+            fs._convert_actual_info(entry, storage_location, virtual_location)

         # test actual path start with storage location
-        actual_path = "/tmp/fileset/test_f1/actual_path"
-        virtual_path = fs._convert_actual_path(actual_path, mock_local_context)
+        entry = {
+            "name": "/tmp/fileset/test_f1/actual_path",
+            "size": 1,
+            "type": "file",
+            "mtime": datetime.now(),
+        }
+        info = fs._convert_actual_info(entry, storage_location, virtual_location)
         self.assertEqual(
-            "fileset/test_catalog/test_schema/test_f1/actual_path", virtual_path
+            "fileset/test_catalog/test_schema/test_f1/actual_path", info["name"]
         )

     def test_extract_identifier(self, *mock_methods):
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:9090", metalake_name="metalake_demo"
+            server_uri="http://localhost:9090",
+            metalake_name="metalake_demo",
+            skip_instance_cache=True,
         )
         with self.assertRaises(GravitinoRuntimeException):
             fs._extract_identifier(path=None)
@@ -932,152 +983,93 @@ def test_extract_identifier(self, *mock_methods):
         self.assertEqual("schema", identifier.namespace().level(2))
         self.assertEqual("fileset", identifier.name())

-    @patch(
-        "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
-        return_value=mock_base.mock_load_fileset(
-            "test_pandas", f"{_fileset_dir}/test_pandas"
-        ),
-    )
     def test_pandas(self, *mock_methods):
-        local_fs = LocalFileSystem()
         fileset_storage_location = f"{self._fileset_dir}/test_pandas"
+        fileset_virtual_location = "gvfs://fileset/fileset_catalog/tmp/test_pandas"
+        local_fs = LocalFileSystem()
         local_fs.mkdir(fileset_storage_location)
-        fileset_virtual_location = "gvfs://fileset/fileset_catalog/tmp/test_pandas"

         data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]})
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:8090", metalake_name="test_metalake"
-        )
-        # to parquet
-        data.to_parquet(fileset_virtual_location + "/test.parquet", filesystem=fs)
-        self.assertTrue(local_fs.exists(fileset_storage_location + "/test.parquet"))
-
-        # read parquet
-        ds1 = pandas.read_parquet(
-            path=fileset_virtual_location + "/test.parquet", filesystem=fs
-        )
-        self.assertTrue(data.equals(ds1))
+            server_uri="http://localhost:8090",
+            metalake_name="test_metalake",
+            skip_instance_cache=True,
+        )
+        actual_path = fileset_storage_location + "/test.parquet"
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path,
+        ):
+            # to parquet
+            data.to_parquet(fileset_virtual_location + "/test.parquet", filesystem=fs)
+            self.assertTrue(local_fs.exists(fileset_storage_location + "/test.parquet"))
+
+            # read parquet
+            ds1 = pandas.read_parquet(
+                path=fileset_virtual_location + "/test.parquet", filesystem=fs
+            )
+            self.assertTrue(data.equals(ds1))
         storage_options = {
             "server_uri": "http://localhost:8090",
             "metalake_name": "test_metalake",
         }
-        # to csv
-        data.to_csv(
-            fileset_virtual_location + "/test.csv",
-            index=False,
-            storage_options=storage_options,
-        )
-        self.assertTrue(local_fs.exists(fileset_storage_location + "/test.csv"))
-        # read csv
-        ds2 = pandas.read_csv(
-            fileset_virtual_location + "/test.csv", storage_options=storage_options
-        )
-        self.assertTrue(data.equals(ds2))
+        actual_path1 = fileset_storage_location
+        actual_path2 = fileset_storage_location + "/test.csv"
+
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            side_effect=[actual_path1, actual_path2, actual_path2],
+        ):
+            # to csv
+            data.to_csv(
+                fileset_virtual_location + "/test.csv",
+                index=False,
+                storage_options=storage_options,
+            )
+            self.assertTrue(local_fs.exists(fileset_storage_location + "/test.csv"))
+
+            # read csv
+            ds2 = pandas.read_csv(
+                fileset_virtual_location + "/test.csv", storage_options=storage_options
+            )
+            self.assertTrue(data.equals(ds2))

-    @patch(
-        "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
-        return_value=mock_base.mock_load_fileset(
-            "test_pyarrow", f"{_fileset_dir}/test_pyarrow"
-        ),
-    )
     def test_pyarrow(self, *mock_methods):
-        local_fs = LocalFileSystem()
         fileset_storage_location = f"{self._fileset_dir}/test_pyarrow"
+        fileset_virtual_location = "gvfs://fileset/fileset_catalog/tmp/test_pyarrow"
-        local_fs.mkdir(fileset_storage_location)
-        fileset_virtual_location = "gvfs://fileset/fileset_catalog/tmp/test_pyarrow"
-        data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]})
-        fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:8090", metalake_name="test_metalake"
-        )
-
-        # to parquet
-        data.to_parquet(fileset_virtual_location + "/test.parquet", filesystem=fs)
-        self.assertTrue(local_fs.exists(fileset_storage_location + "/test.parquet"))
-
-        # read as arrow dataset
-        arrow_dataset = dt.dataset(
-            fileset_virtual_location + "/test.parquet", filesystem=fs
-        )
-        arrow_tb_1 = arrow_dataset.to_table()
-
-        arrow_tb_2 = pa.Table.from_pandas(data)
-        self.assertTrue(arrow_tb_1.equals(arrow_tb_2))
-
-        # read as arrow parquet dataset
-        arrow_tb_3 = pq.read_table(
-            fileset_virtual_location + "/test.parquet", filesystem=fs
-        )
-        self.assertTrue(arrow_tb_3.equals(arrow_tb_2))
-
-    @patch(
-        "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset",
-        return_value=mock_base.mock_load_fileset(
-            "test_llama_index", f"{_fileset_dir}/test_llama_index"
-        ),
-    )
-    def test_llama_index(self, *mock_methods):
         local_fs = LocalFileSystem()
-        fileset_storage_location = f"{self._fileset_dir}/test_llama_index"
         local_fs.mkdir(fileset_storage_location)
-
-        fileset_virtual_location = "gvfs://fileset/fileset_catalog/tmp/test_llama_index"
         data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]})
         fs = gvfs.GravitinoVirtualFileSystem(
-            server_uri="http://localhost:8090", metalake_name="test_metalake"
-        )
-
-        storage_options = {
-            "server_uri": "http://localhost:8090",
-            "metalake_name": "test_metalake",
-        }
-        # to csv
-        data.to_csv(
-            fileset_virtual_location + "/test.csv",
-            index=False,
-            storage_options=storage_options,
-        )
-        self.assertTrue(local_fs.exists(fileset_storage_location + "/test.csv"))
+            server_uri="http://localhost:8090",
+            metalake_name="test_metalake",
+            skip_instance_cache=True,
+        )
+        actual_path = fileset_storage_location + "/test.parquet"
+        with patch(
+            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
+            return_value=actual_path,
+        ):
+            # to parquet
+            data.to_parquet(fileset_virtual_location + "/test.parquet", filesystem=fs)
+            self.assertTrue(local_fs.exists(fileset_storage_location + "/test.parquet"))
+
+            # read as arrow dataset
+            arrow_dataset = dt.dataset(
+                fileset_virtual_location + "/test.parquet", filesystem=fs
"/test.parquet", filesystem=fs + ) + arrow_tb_1 = arrow_dataset.to_table() + arrow_tb_2 = pa.Table.from_pandas(data) + self.assertTrue(arrow_tb_1.equals(arrow_tb_2)) - data.to_csv( - fileset_virtual_location + "/sub_dir/test1.csv", - index=False, - storage_options=storage_options, - ) - self.assertTrue( - local_fs.exists(fileset_storage_location + "/sub_dir/test1.csv") - ) + # read as arrow parquet dataset + arrow_tb_3 = pq.read_table( + fileset_virtual_location + "/test.parquet", filesystem=fs + ) + self.assertTrue(arrow_tb_3.equals(arrow_tb_2)) - reader = SimpleDirectoryReader( - input_dir="fileset/fileset_catalog/tmp/test_llama_index", - fs=fs, - recursive=True, # recursively searches all subdirectories - ) - documents = reader.load_data() - self.assertEqual(len(documents), 2) - doc_1 = documents[0] - result_1 = [line.strip().split(", ") for line in doc_1.text.split("\n")] - self.assertEqual(4, len(result_1)) - for row in result_1: - if row[0] == "A": - self.assertEqual(row[1], "20") - elif row[0] == "B": - self.assertEqual(row[1], "21") - elif row[0] == "C": - self.assertEqual(row[1], "19") - elif row[0] == "D": - self.assertEqual(row[1], "18") - - @patch( - "gravitino.catalog.fileset_catalog.FilesetCatalog.load_fileset", - return_value=mock_base.mock_load_fileset( - "test_location_with_tailing_slash", - f"{_fileset_dir}/test_location_with_tailing_slash/", - ), - ) def test_location_with_tailing_slash(self, *mock_methods): - local_fs = LocalFileSystem() # storage location is ending with a "/" fileset_storage_location = ( f"{self._fileset_dir}/test_location_with_tailing_slash/" @@ -1085,6 +1077,7 @@ def test_location_with_tailing_slash(self, *mock_methods): fileset_virtual_location = ( "fileset/fileset_catalog/tmp/test_location_with_tailing_slash" ) + local_fs = LocalFileSystem() local_fs.mkdir(fileset_storage_location) sub_dir_path = f"{fileset_storage_location}test_1" local_fs.mkdir(sub_dir_path) @@ -1093,82 +1086,45 @@ def test_location_with_tailing_slash(self, *mock_methods): local_fs.touch(sub_file_path) self.assertTrue(local_fs.exists(sub_file_path)) + actual_path = fileset_storage_location fs = gvfs.GravitinoVirtualFileSystem( - server_uri="http://localhost:9090", metalake_name="metalake_demo" + server_uri="http://localhost:9090", + metalake_name="metalake_demo", + skip_instance_cache=True, ) - self.assertTrue(fs.exists(fileset_virtual_location)) + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path, + ): + self.assertTrue(fs.exists(fileset_virtual_location)) dir_virtual_path = fileset_virtual_location + "/test_1" - dir_info = fs.info(dir_virtual_path) - self.assertEqual(dir_info["name"], dir_virtual_path) + actual_path1 = fileset_storage_location + "test_1" + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path1, + ): + dir_info = fs.info(dir_virtual_path) + self.assertEqual(dir_info["name"], dir_virtual_path) file_virtual_path = fileset_virtual_location + "/test_1/test_file_1.par" - file_info = fs.info(file_virtual_path) - self.assertEqual(file_info["name"], file_virtual_path) - - file_status = fs.ls(fileset_virtual_location, detail=True) - for status in file_status: - if status["name"].endswith("test_1"): - self.assertEqual(status["name"], dir_virtual_path) - elif status["name"].endswith("test_file_1.par"): - self.assertEqual(status["name"], file_virtual_path) - else: - raise GravitinoRuntimeException("Unexpected file found") - - def 
test_get_actual_path_by_ident(self, *mock_methods): - ident1 = NameIdentifier.of( - "test_metalake", "catalog", "schema", "test_get_actual_path_by_ident" - ) - storage_type = gvfs.StorageType.LOCAL - local_fs = LocalFileSystem() - - fs = gvfs.GravitinoVirtualFileSystem( - server_uri="http://localhost:9090", metalake_name="metalake_demo" - ) - - # test storage location end with "/" - storage_location_1 = f"{self._fileset_dir}/test_get_actual_path_by_ident/" - # virtual path end with "/" - virtual_path1 = "fileset/catalog/schema/test_get_actual_path_by_ident/" - local_fs.mkdir(storage_location_1) - self.assertTrue(local_fs.exists(storage_location_1)) - - mock_fileset1: Fileset = mock.Mock(spec=Fileset) - mock_fileset1.storage_location.return_value = storage_location_1 - - actual_path1 = fs._get_actual_path_by_ident( - ident1, mock_fileset1, local_fs, storage_type, virtual_path1 - ) - self.assertEqual(actual_path1, storage_location_1) - - # virtual path end without "/" - virtual_path2 = "fileset/catalog/schema/test_get_actual_path_by_ident" - actual_path2 = fs._get_actual_path_by_ident( - ident1, mock_fileset1, local_fs, storage_type, virtual_path2 - ) - self.assertEqual(actual_path2, storage_location_1) - - # test storage location end without "/" - ident2 = NameIdentifier.of( - "test_metalake", "catalog", "schema", "test_without_slash" - ) - storage_location_2 = f"{self._fileset_dir}/test_without_slash" - # virtual path end with "/" - virtual_path3 = "fileset/catalog/schema/test_without_slash/" - local_fs.mkdir(storage_location_2) - self.assertTrue(local_fs.exists(storage_location_2)) - - mock_fileset2: Fileset = mock.Mock(spec=Fileset) - mock_fileset2.storage_location.return_value = storage_location_2 - - actual_path3 = fs._get_actual_path_by_ident( - ident2, mock_fileset2, local_fs, storage_type, virtual_path3 - ) - self.assertEqual(actual_path3, f"{storage_location_2}/") - - # virtual path end without "/" - virtual_path4 = "fileset/catalog/schema/test_without_slash" - actual_path4 = fs._get_actual_path_by_ident( - ident2, mock_fileset2, local_fs, storage_type, virtual_path4 - ) - self.assertEqual(actual_path4, storage_location_2) + actual_path2 = fileset_storage_location + "test_1/test_file_1.par" + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path2, + ): + file_info = fs.info(file_virtual_path) + self.assertEqual(file_info["name"], file_virtual_path) + + with patch( + "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location", + return_value=actual_path, + ): + file_status = fs.ls(fileset_virtual_location, detail=True) + for status in file_status: + if status["name"].endswith("test_1"): + self.assertEqual(status["name"], dir_virtual_path) + elif status["name"].endswith("test_file_1.par"): + self.assertEqual(status["name"], file_virtual_path) + else: + raise GravitinoRuntimeException("Unexpected file found") diff --git a/common/src/main/java/org/apache/gravitino/audit/FilesetDataOperation.java b/common/src/main/java/org/apache/gravitino/audit/FilesetDataOperation.java index b76d1f91b3..88ac4d11b0 100644 --- a/common/src/main/java/org/apache/gravitino/audit/FilesetDataOperation.java +++ b/common/src/main/java/org/apache/gravitino/audit/FilesetDataOperation.java @@ -25,6 +25,10 @@ public enum FilesetDataOperation { CREATE, /** Opens a file. */ OPEN, + /** Opens a file and writes to it. */ + OPEN_AND_WRITE, + /** Opens a file and appends to it. */ + OPEN_AND_APPEND, /** Appends some content into a file. 
   APPEND,
   /** Renames a file or a directory. */
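
The mocking pattern the updated Python tests rely on can be reduced to a short sketch. The snippet below is illustrative only, not part of the patch: it would live as a method on the same test class as the tests above, it assumes the suite's class-level mock_base patches (metalake and catalog loading) are in effect, and "test_sketch" is a hypothetical fileset name.

    # Illustrative sketch only; assumes mock_base's class-level patches are
    # active, and "test_sketch" is a hypothetical fileset name.
    from unittest.mock import patch

    from fsspec.implementations.local import LocalFileSystem
    from gravitino.filesystem import gvfs

    def test_exists_with_stubbed_location(self):
        storage_location = f"{self._fileset_dir}/test_sketch"
        virtual_location = "fileset/fileset_catalog/tmp/test_sketch"
        LocalFileSystem().mkdir(storage_location)

        fs = gvfs.GravitinoVirtualFileSystem(
            server_uri="http://localhost:9090",
            metalake_name="metalake_demo",
            skip_instance_cache=True,  # avoid reusing a cached fsspec instance
        )
        # Each gvfs operation resolves its virtual path through
        # FilesetCatalog.get_file_location, so stubbing that one method pins
        # the actual storage path without needing a running Gravitino server.
        with patch(
            "gravitino.catalog.fileset_catalog.FilesetCatalog.get_file_location",
            return_value=storage_location,
        ):
            self.assertTrue(fs.exists(virtual_location))

When one high-level call triggers several resolutions (as in the pandas to_csv/read_csv block above), side_effect=[path1, path2, ...] supplies one actual path per call, in order, instead of a single return_value.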