From f53b1b16e29d2b60910002e6e0008bfdc248d1cb Mon Sep 17 00:00:00 2001 From: dk Date: Mon, 16 Sep 2024 13:05:26 +0700 Subject: [PATCH 01/19] [clean] remove kv and sqlite document stores --- packages/syft/src/syft/server/server.py | 25 - .../syft/service/settings/settings_service.py | 3 +- .../syft/src/syft/store/kv_document_store.py | 759 ------------------ .../src/syft/store/sqlite_document_store.py | 493 ------------ .../syft/stores/kv_document_store_test.py | 313 -------- .../tests/syft/stores/store_mocks_test.py | 82 +- 6 files changed, 40 insertions(+), 1635 deletions(-) delete mode 100644 packages/syft/src/syft/store/kv_document_store.py delete mode 100644 packages/syft/src/syft/store/sqlite_document_store.py delete mode 100644 packages/syft/tests/syft/stores/kv_document_store_test.py diff --git a/packages/syft/src/syft/server/server.py b/packages/syft/src/syft/server/server.py index 438997b1ad2..c0bd8ac00b6 100644 --- a/packages/syft/src/syft/server/server.py +++ b/packages/syft/src/syft/server/server.py @@ -94,12 +94,9 @@ from ..store.db.sqlite import SQLiteDBConfig from ..store.db.sqlite import SQLiteDBManager from ..store.db.stash import ObjectStash -from ..store.document_store import StoreConfig from ..store.document_store_errors import NotFoundException from ..store.document_store_errors import StashException from ..store.linked_obj import LinkedObject -from ..store.sqlite_document_store import SQLiteStoreClientConfig -from ..store.sqlite_document_store import SQLiteStoreConfig from ..types.datetime import DATETIME_FORMAT from ..types.errors import SyftException from ..types.result import Result @@ -307,8 +304,6 @@ def __init__( name: str | None = None, id: UID | None = None, signing_key: SyftSigningKey | SigningKey | None = None, - action_store_config: StoreConfig | None = None, - document_store_config: StoreConfig | None = None, db_config: DBConfig | None = None, root_email: str | None = default_root_email, root_username: str | None = default_root_username, @@ -401,12 +396,6 @@ def __init__( if reset: self.remove_temp_dir() - document_store_config = document_store_config or self.get_default_store( - store_type="Document Store", - ) - action_store_config = action_store_config or self.get_default_store( - store_type="Action Store", - ) db_config = DBConfig.from_connection_string(db_url) if db_url else db_config if db_config is None: @@ -510,20 +499,6 @@ def runs_in_docker(self) -> bool: and any("docker" in line for line in open(path)) ) - def get_default_store(self, store_type: str) -> StoreConfig: - path = self.get_temp_dir("db") - file_name: str = f"{self.id}.sqlite" - # if self.dev_mode: - # leave this until the logger shows this in the notebook - # print(f"{store_type}'s SQLite DB path: {path/file_name}") - # logger.debug(f"{store_type}'s SQLite DB path: {path/file_name}") - return SQLiteStoreConfig( - client_config=SQLiteStoreClientConfig( - filename=file_name, - path=path, - ) - ) - def init_blob_storage(self, config: BlobStorageConfig | None = None) -> None: if config is None: client_config = OnDiskBlobStorageClientConfig( diff --git a/packages/syft/src/syft/service/settings/settings_service.py b/packages/syft/src/syft/service/settings/settings_service.py index 10890350e2d..7cb835b79fe 100644 --- a/packages/syft/src/syft/service/settings/settings_service.py +++ b/packages/syft/src/syft/service/settings/settings_service.py @@ -11,7 +11,6 @@ from ...store.db.db import DBManager from ...store.document_store_errors import NotFoundException from ...store.document_store_errors 
import StashException -from ...store.sqlite_document_store import SQLiteStoreConfig from ...types.errors import SyftException from ...types.result import as_result from ...types.syft_metaclass import Empty @@ -415,7 +414,7 @@ def get_server_config( "server_side_type": server.server_side_type, # "port": server.port, "processes": server.processes, - "local_db": isinstance(server.document_store_config, SQLiteStoreConfig), + # "local_db": isinstance(server.document_store_config, SQLiteStoreConfig), "dev_mode": server.dev_mode, "reset": True, # we should be able to get all the objects from migration data "tail": False, diff --git a/packages/syft/src/syft/store/kv_document_store.py b/packages/syft/src/syft/store/kv_document_store.py deleted file mode 100644 index 77bfbbb3297..00000000000 --- a/packages/syft/src/syft/store/kv_document_store.py +++ /dev/null @@ -1,759 +0,0 @@ -# future -from __future__ import annotations - -# stdlib -from collections import defaultdict -from enum import Enum -from typing import Any -from typing import cast - -# third party -from typing_extensions import Self - -# relative -from ..serde.serializable import serializable -from ..server.credentials import SyftVerifyKey -from ..service.action.action_permissions import ActionObjectEXECUTE -from ..service.action.action_permissions import ActionObjectOWNER -from ..service.action.action_permissions import ActionObjectPermission -from ..service.action.action_permissions import ActionObjectREAD -from ..service.action.action_permissions import ActionObjectWRITE -from ..service.action.action_permissions import ActionPermission -from ..service.action.action_permissions import StoragePermission -from ..service.context import AuthedServiceContext -from ..service.response import SyftSuccess -from ..types.errors import SyftException -from ..types.result import as_result -from ..types.syft_object import SyftObject -from ..types.uid import UID -from .document_store import NewBaseStash -from .document_store import PartitionKey -from .document_store import PartitionKeys -from .document_store import QueryKey -from .document_store import QueryKeys -from .document_store import StorePartition - - -@serializable(canonical_name="UniqueKeyCheck", version=1) -class UniqueKeyCheck(Enum): - EMPTY = 0 - MATCHES = 1 - ERROR = 2 - - -class KeyValueBackingStore: - """Key-Value store core logic.""" - - def __setitem__(self, key: Any, value: Any) -> None: - raise NotImplementedError - - def __getitem__(self, key: Any) -> Self: - raise NotImplementedError - - def __repr__(self) -> str: - raise NotImplementedError - - def __len__(self) -> int: - raise NotImplementedError - - def __delitem__(self, key: str) -> None: - raise NotImplementedError - - def clear(self) -> None: - raise NotImplementedError - - def copy(self) -> Self: - raise NotImplementedError - - def update(self, *args: Any, **kwargs: Any) -> None: - raise NotImplementedError - - def keys(self) -> Any: - raise NotImplementedError - - def values(self) -> Any: - raise NotImplementedError - - def items(self) -> Any: - raise NotImplementedError - - def pop(self, *args: Any) -> Self: - raise NotImplementedError - - def __contains__(self, item: Any) -> bool: - raise NotImplementedError - - def __iter__(self) -> Any: - raise NotImplementedError - - -class KeyValueStorePartition(StorePartition): - """Key-Value StorePartition - - Parameters: - `settings`: PartitionSettings - PySyft specific settings - `store_config`: StoreConfig - Backend specific configuration - """ - - @as_result(SyftException) - 
def init_store(self) -> bool: - super().init_store().unwrap() - - try: - self.data = self.store_config.backing_store( - "data", self.settings, self.store_config - ) - self.unique_keys = self.store_config.backing_store( - "unique_keys", self.settings, self.store_config - ) - self.searchable_keys = self.store_config.backing_store( - "searchable_keys", self.settings, self.store_config - ) - - # uid -> set['_permission'] - self.permissions: dict[UID, set[str]] = self.store_config.backing_store( - "permissions", self.settings, self.store_config, ddtype=set - ) - - # uid -> set[''] - self.storage_permissions: dict[UID, set[UID]] = ( - self.store_config.backing_store( - "storage_permissions", - self.settings, - self.store_config, - ddtype=set, - ) - ) - - for partition_key in self.unique_cks: - pk_key = partition_key.key - if pk_key not in self.unique_keys: - self.unique_keys[pk_key] = {} - - for partition_key in self.searchable_cks: - pk_key = partition_key.key - if pk_key not in self.searchable_keys: - self.searchable_keys[pk_key] = defaultdict(list) - except BaseException as e: - raise SyftException.from_exception(e) - - return True - - def __len__(self) -> int: - return len(self.data) - - @as_result(SyftException, KeyError) - def _get( - self, - uid: UID, - credentials: SyftVerifyKey, - has_permission: bool | None = False, - ) -> SyftObject: - # relative - from ..service.action.action_store import ActionObjectREAD - - # if you get something you need READ permission - read_permission = ActionObjectREAD(uid=uid, credentials=credentials) - - if self.has_permission(read_permission) or has_permission: - syft_object = self.data[uid] - return syft_object - raise SyftException(public_message=f"Permission: {read_permission} denied") - - # Potentially thread-unsafe methods. - # CAUTION: - # * Don't use self.lock here. - # * Do not call the public thread-safe methods here(with locking). - # These methods are called from the public thread-safe API, and will hang the process. - - @as_result(SyftException) - def _set( - self, - credentials: SyftVerifyKey, - obj: SyftObject, - add_permissions: list[ActionObjectPermission] | None = None, - add_storage_permission: bool = True, - ignore_duplicates: bool = False, - ) -> SyftObject: - store_query_key: QueryKey = self.settings.store_key.with_obj(obj) - uid = store_query_key.value - write_permission = ActionObjectWRITE(uid=uid, credentials=credentials) - unique_query_keys: QueryKeys = self.settings.unique_keys.with_obj(obj) - store_key_exists = store_query_key.value in self.data - searchable_query_keys = self.settings.searchable_keys.with_obj(obj) - - ck_check = self._check_partition_keys_unique( - unique_query_keys=unique_query_keys - ).unwrap() - - can_write = self.has_permission(write_permission) - - if not store_key_exists and ck_check == UniqueKeyCheck.EMPTY: - # attempt to claim it for writing - can_write = self.take_ownership(uid=uid, credentials=credentials).unwrap() - elif not ignore_duplicates: - keys = ", ".join(f"`{key.key}`" for key in unique_query_keys.all) - raise SyftException( - public_message=f"Duplication Key Error for {obj}.\n" - f"The fields that should be unique are {keys}." 
- ) - else: - # we are not throwing an error, because we are ignoring duplicates - # we are also not writing though - return obj - - if not can_write: - raise SyftException(public_message=f"Permission: {write_permission} denied") - - self._set_data_and_keys( - store_query_key=store_query_key, - unique_query_keys=unique_query_keys, - searchable_query_keys=searchable_query_keys, - obj=obj, - ) - self.data[uid] = obj - - # Add default permissions - if uid not in self.permissions: - self.permissions[uid] = set() - - self.add_permission(ActionObjectREAD(uid=uid, credentials=credentials)) - - if add_permissions is not None: - self.add_permissions(add_permissions) - - if uid not in self.storage_permissions: - self.storage_permissions[uid] = set() - - if add_storage_permission: - self.add_storage_permission( - StoragePermission( - uid=uid, - server_uid=self.server_uid, - ) - ) - return obj - - @as_result(SyftException) - def take_ownership(self, uid: UID, credentials: SyftVerifyKey) -> bool: - if uid in self.permissions or uid in self.data: - raise SyftException(public_message=f"UID: {uid} already owned.") - - # The first person using this UID can claim ownership - self.add_permissions( - [ - ActionObjectOWNER(uid=uid, credentials=credentials), - ActionObjectWRITE(uid=uid, credentials=credentials), - ActionObjectREAD(uid=uid, credentials=credentials), - ActionObjectEXECUTE(uid=uid, credentials=credentials), - ] - ) - - return True - - def add_permission(self, permission: ActionObjectPermission) -> None: - permissions = self.permissions[permission.uid] - permissions.add(permission.permission_string) - self.permissions[permission.uid] = permissions - - def remove_permission(self, permission: ActionObjectPermission) -> None: - permissions = self.permissions[permission.uid] - permissions.remove(permission.permission_string) - self.permissions[permission.uid] = permissions - - def add_permissions(self, permissions: list[ActionObjectPermission]) -> None: - for permission in permissions: - self.add_permission(permission) - - def has_permission(self, permission: ActionObjectPermission) -> bool: - if not isinstance(permission.permission, ActionPermission): - raise Exception(f"ObjectPermission type: {permission.permission} not valid") - - if ( - permission.credentials - and self.root_verify_key.verify == permission.credentials.verify - ): - return True - - if ( - permission.credentials - and self.has_admin_permissions is not None - and self.has_admin_permissions(permission.credentials) - ): - return True - - if ( - permission.uid in self.permissions - and permission.permission_string in self.permissions[permission.uid] - ): - return True - - # 🟡 TODO 14: add ALL_READ, ALL_EXECUTE etc - # third party - if permission.permission == ActionPermission.OWNER: - pass - elif ( - permission.permission == ActionPermission.READ - and ActionObjectPermission( - permission.uid, ActionPermission.ALL_READ - ).permission_string - in self.permissions[permission.uid] - ): - return True - elif permission.permission == ActionPermission.WRITE: - pass - elif permission.permission == ActionPermission.EXECUTE: - pass - - return False - - @as_result(SyftException) - def _get_permissions_for_uid(self, uid: UID) -> set[str]: - if uid in self.permissions: - return self.permissions[uid] - raise SyftException(public_message=f"No permissions found for uid: {uid}") - - @as_result(SyftException) - def get_all_permissions(self) -> dict[UID, set[str]]: - return self.permissions - - def add_storage_permission(self, permission: 
StoragePermission) -> None: - permissions = self.storage_permissions[permission.uid] - permissions.add(permission.server_uid) - self.storage_permissions[permission.uid] = permissions - - def add_storage_permissions(self, permissions: list[StoragePermission]) -> None: - for permission in permissions: - self.add_storage_permission(permission) - - def remove_storage_permission(self, permission: StoragePermission) -> None: - permissions = self.storage_permissions[permission.uid] - permissions.remove(permission.server_uid) - self.storage_permissions[permission.uid] = permissions - - def has_storage_permission(self, permission: StoragePermission | UID) -> bool: - if isinstance(permission, UID): - permission = StoragePermission(uid=permission, server_uid=self.server_uid) - - if permission.uid in self.storage_permissions: - return permission.server_uid in self.storage_permissions[permission.uid] - return False - - @as_result(SyftException) - def _get_storage_permissions_for_uid(self, uid: UID) -> set[UID]: - if uid in self.storage_permissions: - return self.storage_permissions[uid] - raise SyftException( - public_message=f"No storage permissions found for uid: {uid}" - ) - - @as_result(SyftException) - def get_all_storage_permissions(self) -> dict[UID, set[UID]]: - return self.storage_permissions - - @as_result(SyftException) - def _all( - self, - credentials: SyftVerifyKey, - order_by: PartitionKey | None = None, - has_permission: bool | None = False, - ) -> list[NewBaseStash.object_type]: # type: ignore - # this checks permissions - res = [self._get(uid, credentials, has_permission) for uid in self.data.keys()] - result = [x.ok() for x in res if x.is_ok()] - if order_by is not None: - result = sorted(result, key=lambda x: getattr(x, order_by.key, "")) - return result - - @as_result(SyftException) - def _remove_keys( - self, - store_key: QueryKey, - unique_query_keys: QueryKeys, - searchable_query_keys: QueryKeys, - ) -> None: - uqks = unique_query_keys.all - for qk in uqks: - pk_key, pk_value = qk.key, qk.value - ck_col = self.unique_keys[pk_key] - ck_col.pop(store_key.value, None) - self.unique_keys[pk_key] = ck_col - - sqks = searchable_query_keys.all - for qk in sqks: - pk_key, pk_value = qk.key, qk.value - ck_col = self.searchable_keys[pk_key] - if isinstance(pk_value, list): - for pk_value_item in pk_value: - pk_value_str = str(pk_value_item) - if pk_value_str in ck_col and ( - store_key.value in ck_col[pk_value_str] - ): - ck_col[pk_value_str].remove(store_key.value) - else: - if pk_value in ck_col and (store_key.value in ck_col[pk_value]): - ck_col[pk_value].remove(store_key.value) - self.searchable_keys[pk_key] = ck_col - - @as_result(SyftException) - def _find_index_or_search_keys( - self, - credentials: SyftVerifyKey, - index_qks: QueryKeys, - search_qks: QueryKeys, - order_by: PartitionKey | None = None, - ) -> list[SyftObject]: - ids: set | None = None - errors = [] - # third party - if len(index_qks.all) > 0: - index_results = self._get_keys_index(qks=index_qks) - if index_results.is_ok(): - if ids is None: - ids = index_results.ok() if index_results.ok() else set() - ids = cast(set, ids) - ids = ids.intersection(index_results.ok()) - else: - errors.append(index_results.err()) - - search_results = None - if len(search_qks.all) > 0: - search_results = self._find_keys_search(qks=search_qks) - - if search_results.is_ok(): - if ids is None: - ids = search_results.ok() if search_results.ok() else set() - ids = cast(set, ids) - ids = ids.intersection(search_results.ok()) - else: - 
errors.append(search_results.err()) - - if len(errors) > 0: - raise SyftException(public_message=" ".join([str(e) for e in errors])) - - if ids is None: - return [] - - qks: QueryKeys = self.store_query_keys(ids) - return self._get_all_from_store( - credentials=credentials, qks=qks, order_by=order_by - ).unwrap() - - @as_result(SyftException) - def _update( - self, - credentials: SyftVerifyKey, - qk: QueryKey, - obj: SyftObject, - has_permission: bool = False, - overwrite: bool = False, - allow_missing_keys: bool = False, - ) -> SyftObject: - try: - if qk.value not in self.data: - raise SyftException( - public_message=f"No {type(obj)} exists for query key: {qk}" - ) - - if has_permission or self.has_permission( - ActionObjectWRITE(uid=qk.value, credentials=credentials) - ): - _original_obj = self.data[qk.value] - _original_unique_keys = self.settings.unique_keys.with_obj( - _original_obj - ) - if allow_missing_keys: - searchable_keys = PartitionKeys( - pks=[ - x - for x in self.settings.searchable_keys.all - if hasattr(_original_obj, x.key) - ] - ) - _original_searchable_keys = searchable_keys.with_obj(_original_obj) - - else: - _original_searchable_keys = self.settings.searchable_keys.with_obj( - _original_obj - ) - - store_query_key = self.settings.store_key.with_obj(_original_obj) - - # remove old keys - self._remove_keys( - store_key=store_query_key, - unique_query_keys=_original_unique_keys, - searchable_query_keys=_original_searchable_keys, - ) - - # update the object with new data - if overwrite: - # Overwrite existing object and their values - _original_obj = obj - else: - for key, value in obj.to_dict(exclude_empty=True).items(): - if key == "id": - # protected field - continue - setattr(_original_obj, key, value) - - # update data and keys - self._set_data_and_keys( - store_query_key=store_query_key, - unique_query_keys=self.settings.unique_keys.with_obj(_original_obj), - searchable_query_keys=self.settings.searchable_keys.with_obj( - _original_obj - ), - # has been updated - obj=_original_obj, - ) - - # 🟡 TODO 28: Add locking in this transaction - - return _original_obj - else: - raise SyftException( - public_message=f"Failed to update obj {obj}, you have no permission" - ) - - except Exception as e: - raise SyftException.from_exception(e) - - @as_result(SyftException) - def _get_all_from_store( - self, - credentials: SyftVerifyKey, - qks: QueryKeys, - order_by: PartitionKey | None = None, - ) -> list[SyftObject]: - matches = [] - for qk in qks.all: - if qk.value in self.data: - if self.has_permission( - ActionObjectREAD(uid=qk.value, credentials=credentials) - ): - matches.append(self.data[qk.value]) - if order_by is not None: - matches = sorted(matches, key=lambda x: getattr(x, order_by.key, "")) - return matches - - def create(self, obj: SyftObject) -> None: - pass - - @as_result(SyftException) - def _delete( - self, credentials: SyftVerifyKey, qk: QueryKey, has_permission: bool = False - ) -> SyftSuccess: - try: - if has_permission or self.has_permission( - ActionObjectWRITE(uid=qk.value, credentials=credentials) - ): - _obj = self.data.pop(qk.value) - self.permissions.pop(qk.value) - self.storage_permissions.pop(qk.value) - self._delete_unique_keys_for(_obj) - self._delete_search_keys_for(_obj) - return SyftSuccess(message="Deleted") - else: - raise SyftException( - public_message=f"Failed to delete with query key {qk}, you have no permission" - ) - except Exception as e: - raise SyftException( - public_message=f"Failed to delete with query key {qk} with error: {e}" - ) - - 
@as_result(SyftException) - def _delete_unique_keys_for(self, obj: SyftObject) -> SyftSuccess: - for _unique_ck in self.unique_cks: - qk = _unique_ck.with_obj(obj) - unique_keys = self.unique_keys[qk.key] - unique_keys.pop(qk.value, None) - self.unique_keys[qk.key] = unique_keys - return SyftSuccess(message="Deleted") - - @as_result(SyftException) - def _delete_search_keys_for(self, obj: SyftObject) -> SyftSuccess: - for _search_ck in self.searchable_cks: - qk: QueryKey = _search_ck.with_obj(obj) - search_keys: defaultdict = self.searchable_keys[qk.key] - if isinstance(qk.value, list): - for qk_value in qk.value: - search_keys.pop(qk_value, None) - else: - search_keys.pop(qk.value, None) - self.searchable_keys[qk.key] = search_keys - return SyftSuccess(message="Deleted") - - @as_result(SyftException) - def _get_keys_index(self, qks: QueryKeys) -> set[Any]: - try: - # match AND - subsets: list = [] - for qk in qks.all: - subset: set = set() - pk_key, pk_value = qk.key, qk.value - if pk_key not in self.unique_keys: - raise SyftException( - public_message=f"Failed to query index with {qk}" - ) - ck_col = self.unique_keys[pk_key] - if pk_value not in ck_col.keys(): - # must be at least one in all query keys - continue - store_value = ck_col[pk_value] - subsets.append({store_value}) - - if len(subsets) == 0: - return set() - # AND - subset = subsets.pop() - for s in subsets: - subset = subset.intersection(s) - - return subset - except Exception as e: - raise SyftException(public_message=f"Failed to query with {qks}. {e}") - - @as_result(SyftException) - def _find_keys_search(self, qks: QueryKeys) -> set[QueryKey]: - try: - # match AND - subsets = [] - for qk in qks.all: - subset: set = set() - pk_key, pk_value = qk.key, qk.value - if pk_key not in self.searchable_keys: - raise SyftException(public_message=f"Failed to search with {qk}") - ck_col = self.searchable_keys[pk_key] - if qk.type_list: - # 🟡 TODO: change this hacky way to do on to many relationships - # this is when you search a QueryKey which is a list of items - # at the moment its mostly just a List[UID] - # match OR against all keys for this col - # the values of the list will be turned into strings in a single key - matches = set() - for item in pk_value: - for col_key in ck_col.keys(): - if str(item) in col_key: - store_values = ck_col[col_key] - for value in store_values: - matches.add(value) - if len(matches): - subsets.append(matches) - else: - # this is the normal path - if pk_value not in ck_col.keys(): - # must be at least one in all query keys - subsets.append(set()) - continue - store_values = ck_col[pk_value] - subsets.append(set(store_values)) - - if len(subsets) == 0: - return set() - # AND - subset = subsets.pop() - for s in subsets: - subset = subset.intersection(s) - return subset - except Exception as e: - raise SyftException(public_message=f"Failed to query with {qks}. 
{e}") - - @as_result(SyftException) - def _check_partition_keys_unique( - self, unique_query_keys: QueryKeys - ) -> UniqueKeyCheck: - # dont check the store key - qks = [ - x - for x in unique_query_keys.all - if x.partition_key != self.settings.store_key - ] - matches = [] - for qk in qks: - pk_key, pk_value = qk.key, qk.value - if pk_key not in self.unique_keys: - raise SyftException( - public_message=f"pk_key: {pk_key} not in unique_keys: {self.unique_keys.keys()}" - ) - ck_col = self.unique_keys[pk_key] - if pk_value in ck_col: - matches.append(pk_key) - - if len(matches) == 0: - return UniqueKeyCheck.EMPTY - elif len(matches) == len(qks): - return UniqueKeyCheck.MATCHES - - return UniqueKeyCheck.ERROR - - def _set_data_and_keys( - self, - store_query_key: QueryKey, - unique_query_keys: QueryKeys, - searchable_query_keys: QueryKeys, - obj: SyftObject, - ) -> None: - uqks = unique_query_keys.all - - for qk in uqks: - pk_key, pk_value = qk.key, qk.value - ck_col = self.unique_keys[pk_key] - ck_col[pk_value] = store_query_key.value - self.unique_keys[pk_key] = ck_col - - self.unique_keys[store_query_key.key][store_query_key.value] = ( - store_query_key.value - ) - - sqks = searchable_query_keys.all - for qk in sqks: - pk_key, pk_value = qk.key, qk.value - ck_col = self.searchable_keys[pk_key] - if qk.type_list: - # coerce the list of objects to strings for a single key - pk_value = " ".join([str(obj) for obj in pk_value]) - - # check if key is present, then add to existing key - if pk_value in ck_col: - ck_col[pk_value].append(store_query_key.value) - else: - # else create the key with a list - ck_col[pk_value] = [store_query_key.value] - - self.searchable_keys[pk_key] = ck_col - - self.data[store_query_key.value] = obj - - @as_result(SyftException) - def _migrate_data( - self, to_klass: SyftObject, context: AuthedServiceContext, has_permission: bool - ) -> bool: - credentials = context.credentials - has_permission = (credentials == self.root_verify_key) or has_permission - if has_permission: - for key, value in self.data.items(): - try: - migrated_value = value.migrate_to(to_klass.__version__, context) - except Exception: - raise SyftException( - public_message=f"Failed to migrate data to {to_klass} for qk {to_klass.__version__}: {key}" - ) - qk = self.settings.store_key.with_obj(key) - self._update( - credentials, - qk=qk, - obj=migrated_value, - has_permission=has_permission, - overwrite=True, - allow_missing_keys=True, - ).unwrap() - - return True - - raise SyftException( - public_message="You don't have permissions to migrate data." 
- ) diff --git a/packages/syft/src/syft/store/sqlite_document_store.py b/packages/syft/src/syft/store/sqlite_document_store.py deleted file mode 100644 index 82d75d68e6b..00000000000 --- a/packages/syft/src/syft/store/sqlite_document_store.py +++ /dev/null @@ -1,493 +0,0 @@ -# future -from __future__ import annotations - -# stdlib -from collections import defaultdict -from copy import deepcopy -import logging -from pathlib import Path -import sqlite3 -from sqlite3 import Connection -from sqlite3 import Cursor -import tempfile -from typing import Any - -# third party -from pydantic import Field -from pydantic import field_validator -from typing_extensions import Self - -# relative -from ..serde.deserialize import _deserialize -from ..serde.serializable import serializable -from ..serde.serialize import _serialize -from ..types.errors import SyftException -from ..types.result import as_result -from ..types.uid import UID -from ..util.util import thread_ident -from .document_store import DocumentStore -from .document_store import PartitionSettings -from .document_store import StoreClientConfig -from .document_store import StoreConfig -from .kv_document_store import KeyValueBackingStore -from .kv_document_store import KeyValueStorePartition -from .locks import LockingConfig -from .locks import NoLockingConfig -from .locks import SyftLock - -logger = logging.getLogger(__name__) - -# here we can create a single connection per cache_key -# since pytest is concurrent processes, we need to isolate each connection -# by its filename and optionally the thread that its running in -# we keep track of each SQLiteBackingStore init in REF_COUNTS -# when it hits 0 we can close the connection and release the file descriptor -SQLITE_CONNECTION_POOL_DB: dict[str, Connection] = {} -SQLITE_CONNECTION_POOL_CUR: dict[str, Cursor] = {} -REF_COUNTS: dict[str, int] = defaultdict(int) - - -def cache_key(db_name: str) -> str: - return f"{db_name}_{thread_ident()}" - - -def _repr_debug_(value: Any) -> str: - if hasattr(value, "_repr_debug_"): - return str(value._repr_debug_()) - return repr(value) - - -def special_exception_public_message(table_name: str, e: Exception) -> str: - error_msg = ( - str(e) - if not isinstance(e, SyftException) - else e._private_message or e.public_message - ) - - if "disk I/O error" in error_msg: - message = f"Error usually related to concurrent writes. {error_msg}" - return message - - if "Cannot operate on a closed database" in error_msg: - message = ( - "Error usually related to calling self.db.close()" - + f"before last SQLiteBackingStore.__del__ gets called. {error_msg}" - ) - return message - - return error_msg - - -@serializable( - attrs=["index_name", "settings", "store_config"], - canonical_name="SQLiteBackingStore", - version=1, -) -class SQLiteBackingStore(KeyValueBackingStore): - """Core Store logic for the SQLite stores. 
- - Parameters: - `index_name`: str - Index name - `settings`: PartitionSettings - Syft specific settings - `store_config`: SQLiteStoreConfig - Connection Configuration - `ddtype`: Type - Class used as fallback on `get` errors - """ - - def __init__( - self, - index_name: str, - settings: PartitionSettings, - store_config: StoreConfig, - ddtype: type | None = None, - ) -> None: - self.index_name = index_name - self.settings = settings - self.store_config = store_config - self._ddtype = ddtype - if self.store_config.client_config: - self.file_path = self.store_config.client_config.file_path - if store_config.client_config: - self.db_filename = store_config.client_config.filename - - self.lock = SyftLock(NoLockingConfig()) - self.create_table() - REF_COUNTS[cache_key(self.db_filename)] += 1 - self.subs_char = r"?" - - @property - def table_name(self) -> str: - return f"{self.settings.name}_{self.index_name}" - - def _connect(self) -> None: - # SQLite is not thread safe by default so we ensure that each connection - # comes from a different thread. In cases of Uvicorn and other AWSGI servers - # there will be many threads handling incoming requests so we need to ensure - # that different connections are used in each thread. By using a dict for the - # _db and _cur we can ensure they are never shared - - path = Path(self.file_path) - if not path.exists(): - path.parent.mkdir(parents=True, exist_ok=True) - - if self.store_config.client_config: - connection = sqlite3.connect( - self.file_path, - timeout=self.store_config.client_config.timeout, - check_same_thread=False, # do we need this if we use the lock - # check_same_thread=self.store_config.client_config.check_same_thread, - ) - # Set journal mode to WAL. - connection.execute("PRAGMA journal_mode = WAL") - connection.execute("PRAGMA busy_timeout = 5000") - connection.execute("PRAGMA temp_store = 2") - connection.execute("PRAGMA synchronous = 1") - SQLITE_CONNECTION_POOL_DB[cache_key(self.db_filename)] = connection - - def create_table(self) -> None: - try: - with self.lock: - # TODO: add to backing_store an option for "if_exists_ok" - self.cur.execute( - f"create table if not exists {self.table_name} (uid VARCHAR(32) NOT NULL PRIMARY KEY, " # nosec - + "repr TEXT NOT NULL, value BLOB NOT NULL, " # nosec - + "sqltime TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL)" # nosec - ) - self.db.commit() - except Exception as e: - public_message = special_exception_public_message(self.table_name, e) - raise SyftException.from_exception(e, public_message=public_message) - - @property - def db(self) -> Connection: - if cache_key(self.db_filename) not in SQLITE_CONNECTION_POOL_DB: - self._connect() - return SQLITE_CONNECTION_POOL_DB[cache_key(self.db_filename)] - - @property - def cur(self) -> Cursor: - if cache_key(self.db_filename) not in SQLITE_CONNECTION_POOL_CUR: - SQLITE_CONNECTION_POOL_CUR[cache_key(self.db_filename)] = self.db.cursor() - - return SQLITE_CONNECTION_POOL_CUR[cache_key(self.db_filename)] - - def _close(self) -> None: - self._commit() - REF_COUNTS[cache_key(self.db_filename)] -= 1 - if REF_COUNTS[cache_key(self.db_filename)] <= 0: - # once you close it seems like other object references can't re-use the - # same connection - - self.db.close() - db_key = cache_key(self.db_filename) - if db_key in SQLITE_CONNECTION_POOL_CUR: - # NOTE if we don't remove the cursor, the cursor cache_key can clash with a future thread id - del SQLITE_CONNECTION_POOL_CUR[db_key] - del SQLITE_CONNECTION_POOL_DB[cache_key(self.db_filename)] - else: - # don't 
close yet because another SQLiteBackingStore is probably still open - pass - - def _commit(self) -> None: - self.db.commit() - - @staticmethod - @as_result(SyftException) - def _execute( - lock: SyftLock, - cursor: Cursor, - db: Connection, - table_name: str, - sql: str, - args: list[Any] | None, - ) -> Cursor: - with lock: - cur: Cursor | None = None - try: - cur = cursor.execute(sql, args) - except Exception as e: - public_message = special_exception_public_message(table_name, e) - raise SyftException.from_exception(e, public_message=public_message) - - # TODO: Which exception is safe to rollback on - # we should map out some more clear exceptions that can be returned - # rather than halting the program like disk I/O error etc - # self.db.rollback() # Roll back all changes if an exception occurs. - # err = Err(str(e)) - db.commit() # Commit if everything went ok - return cur - - def _set(self, key: UID, value: Any) -> None: - if self._exists(key): - self._update(key, value) - else: - insert_sql = ( - f"insert into {self.table_name} (uid, repr, value) VALUES " # nosec - f"({self.subs_char}, {self.subs_char}, {self.subs_char})" # nosec - ) - data = _serialize(value, to_bytes=True) - self._execute( - self.lock, - self.cur, - self.db, - self.table_name, - insert_sql, - [str(key), _repr_debug_(value), data], - ).unwrap() - - def _update(self, key: UID, value: Any) -> None: - insert_sql = ( - f"update {self.table_name} set uid = {self.subs_char}, " # nosec - f"repr = {self.subs_char}, value = {self.subs_char} " # nosec - f"where uid = {self.subs_char}" # nosec - ) - data = _serialize(value, to_bytes=True) - self._execute( - self.lock, - self.cur, - self.db, - self.table_name, - insert_sql, - [str(key), _repr_debug_(value), data, str(key)], - ).unwrap() - - def _get(self, key: UID) -> Any: - select_sql = ( - f"select * from {self.table_name} where uid = {self.subs_char} " # nosec - "order by sqltime" - ) - cursor = self._execute( - self.lock, self.cur, self.db, self.table_name, select_sql, [str(key)] - ).unwrap(public_message=f"Query {select_sql} failed") - row = cursor.fetchone() - if row is None or len(row) == 0: - raise KeyError(f"{key} not in {type(self)}") - data = row[2] - return _deserialize(data, from_bytes=True) - - def _exists(self, key: UID) -> bool: - select_sql = f"select uid from {self.table_name} where uid = {self.subs_char}" # nosec - cursor = self._execute( - self.lock, self.cur, self.db, self.table_name, select_sql, [str(key)] - ).unwrap() - row = cursor.fetchone() # type: ignore - if row is None: - return False - - return bool(row) - - def _get_all(self) -> Any: - select_sql = f"select * from {self.table_name} order by sqltime" # nosec - keys = [] - data = [] - - cursor = self._execute( - self.lock, self.cur, self.db, self.table_name, select_sql, [] - ).unwrap() - rows = cursor.fetchall() # type: ignore - if not rows: - return {} - - for row in rows: - keys.append(UID(row[0])) - data.append(_deserialize(row[2], from_bytes=True)) - return dict(zip(keys, data)) - - def _get_all_keys(self) -> Any: - select_sql = f"select uid from {self.table_name} order by sqltime" # nosec - - cursor = self._execute( - self.lock, self.cur, self.db, self.table_name, select_sql, [] - ).unwrap() - rows = cursor.fetchall() # type: ignore - if not rows: - return [] - - keys = [UID(row[0]) for row in rows] - return keys - - def _delete(self, key: UID) -> None: - select_sql = f"delete from {self.table_name} where uid = {self.subs_char}" # nosec - self._execute( - self.lock, self.cur, self.db, 
self.table_name, select_sql, [str(key)] - ).unwrap() - - def _delete_all(self) -> None: - select_sql = f"delete from {self.table_name}" # nosec - self._execute( - self.lock, self.cur, self.db, self.table_name, select_sql, [] - ).unwrap() - - def _len(self) -> int: - select_sql = f"select count(uid) from {self.table_name}" # nosec - cursor = self._execute( - self.lock, self.cur, self.db, self.table_name, select_sql, [] - ).unwrap() - cnt = cursor.fetchone()[0] - return cnt - - def __setitem__(self, key: Any, value: Any) -> None: - self._set(key, value) - - def __getitem__(self, key: Any) -> Self: - try: - return self._get(key) - except KeyError as e: - if self._ddtype is not None: - return self._ddtype() - raise e - - def __repr__(self) -> str: - return repr(self._get_all()) - - def __len__(self) -> int: - return self._len() - - def __delitem__(self, key: str) -> None: - self._delete(key) - - def clear(self) -> None: - self._delete_all() - - def copy(self) -> Self: - return deepcopy(self) - - def keys(self) -> Any: - return self._get_all_keys() - - def values(self) -> Any: - return self._get_all().values() - - def items(self) -> Any: - return self._get_all().items() - - def pop(self, key: Any) -> Self: - value = self._get(key) - self._delete(key) - return value - - def __contains__(self, key: Any) -> bool: - return self._exists(key) - - def __iter__(self) -> Any: - return iter(self.keys()) - - def __del__(self) -> None: - try: - self._close() - except Exception as e: - logger.error("Could not close connection", exc_info=e) - - -@serializable(canonical_name="SQLiteStorePartition", version=1) -class SQLiteStorePartition(KeyValueStorePartition): - """SQLite StorePartition - - Parameters: - `settings`: PartitionSettings - PySyft specific settings, used for indexing and partitioning - `store_config`: SQLiteStoreConfig - SQLite specific configuration - """ - - def close(self) -> None: - self.lock.acquire() - try: - # I think we don't want these now, because of the REF_COUNT - # self.data._close() - # self.unique_keys._close() - # self.searchable_keys._close() - pass - except BaseException: - pass - self.lock.release() - - def commit(self) -> None: - self.lock.acquire() - try: - self.data._commit() - self.unique_keys._commit() - self.searchable_keys._commit() - except BaseException: - pass - self.lock.release() - - -# the base document store is already a dict but we can change it later -@serializable(canonical_name="SQLiteDocumentStore", version=1) -class SQLiteDocumentStore(DocumentStore): - """SQLite Document Store - - Parameters: - `store_config`: StoreConfig - SQLite specific configuration, including connection details and client class type. - """ - - partition_type = SQLiteStorePartition - - -@serializable(canonical_name="SQLiteStoreClientConfig", version=1) -class SQLiteStoreClientConfig(StoreClientConfig): - """SQLite connection config - - Parameters: - `filename` : str - Database name - `path` : Path or str - Database folder - `check_same_thread`: bool - If True (default), ProgrammingError will be raised if the database connection is used - by a thread other than the one that created it. If False, the connection may be accessed - in multiple threads; write operations may need to be serialized by the user to avoid - data corruption. - `timeout`: int - How many seconds the connection should wait before raising an exception, if the database - is locked by another connection. 
If another connection opens a transaction to modify the - database, it will be locked until that transaction is committed. Default five seconds. - """ - - filename: str = "syftdb.sqlite" - path: str | Path = Field(default_factory=tempfile.gettempdir) - check_same_thread: bool = True - timeout: int = 5 - - # We need this in addition to Field(default_factory=...) - # so users can still do SQLiteStoreClientConfig(path=None) - @field_validator("path", mode="before") - @classmethod - def __default_path(cls, path: str | Path | None) -> str | Path: - if path is None: - return tempfile.gettempdir() - return path - - @property - def file_path(self) -> Path | None: - return Path(self.path) / self.filename - - -@serializable() -class SQLiteStoreConfig(StoreConfig): - __canonical_name__ = "SQLiteStoreConfig" - """SQLite Store config, used by SQLiteStorePartition - - Parameters: - `client_config`: SQLiteStoreClientConfig - SQLite connection configuration - `store_type`: DocumentStore - Class interacting with QueueStash. Default: SQLiteDocumentStore - `backing_store`: KeyValueBackingStore - The Store core logic. Default: SQLiteBackingStore - locking_config: LockingConfig - The config used for store locking. Available options: - * NoLockingConfig: no locking, ideal for single-thread stores. - * ThreadingLockingConfig: threading-based locking, ideal for same-process in-memory stores. - Defaults to NoLockingConfig. - """ - - client_config: SQLiteStoreClientConfig - store_type: type[DocumentStore] = SQLiteDocumentStore - backing_store: type[KeyValueBackingStore] = SQLiteBackingStore - locking_config: LockingConfig = Field(default_factory=NoLockingConfig) diff --git a/packages/syft/tests/syft/stores/kv_document_store_test.py b/packages/syft/tests/syft/stores/kv_document_store_test.py deleted file mode 100644 index 7271071c4f7..00000000000 --- a/packages/syft/tests/syft/stores/kv_document_store_test.py +++ /dev/null @@ -1,313 +0,0 @@ -# stdlib -from copy import copy -from threading import Thread - -# third party -import pytest - -# syft absolute -from syft.store.document_store import PartitionSettings -from syft.store.document_store import QueryKeys -from syft.store.kv_document_store import KeyValueStorePartition -from syft.types.errors import SyftException -from syft.types.uid import UID - -# relative -from .store_mocks_test import MockObjectType -from .store_mocks_test import MockStoreConfig -from .store_mocks_test import MockSyftObject - - -@pytest.fixture -def kv_store_partition(worker): - store_config = MockStoreConfig() - settings = PartitionSettings(name="test", object_type=MockObjectType) - store = KeyValueStorePartition( - server_uid=worker.id, - root_verify_key=worker.root_client.credentials.verify_key, - settings=settings, - store_config=store_config, - ) - - res = store.init_store() - assert res.is_ok() - - yield store - - -def test_kv_store_partition_sanity(kv_store_partition: KeyValueStorePartition) -> None: - assert hasattr(kv_store_partition, "data") - assert hasattr(kv_store_partition, "unique_keys") - assert hasattr(kv_store_partition, "searchable_keys") - - -def test_kv_store_partition_init_failed(root_verify_key) -> None: - store_config = MockStoreConfig(is_crashed=True) - settings = PartitionSettings(name="test", object_type=MockObjectType) - - with pytest.raises(SyftException): - KeyValueStorePartition( - UID(), root_verify_key, settings=settings, store_config=store_config - ) - - -def test_kv_store_partition_set( - root_verify_key, kv_store_partition: KeyValueStorePartition -) -> None: 
- obj = MockSyftObject(data=1) - res = kv_store_partition.set(root_verify_key, obj, ignore_duplicates=False) - - assert res.is_ok() - assert res.ok() == obj - assert len(kv_store_partition.all(root_verify_key).ok()) == 1 - - res = kv_store_partition.set(root_verify_key, obj, ignore_duplicates=False) - assert res.is_err() - assert len(kv_store_partition.all(root_verify_key).ok()) == 1 - - res = kv_store_partition.set(root_verify_key, obj, ignore_duplicates=True) - assert res.is_ok() - assert len(kv_store_partition.all(root_verify_key).ok()) == 1 - - obj2 = MockSyftObject(data=2) - res = kv_store_partition.set(root_verify_key, obj2, ignore_duplicates=False) - assert res.is_ok() - assert res.ok() == obj2 - assert len(kv_store_partition.all(root_verify_key).ok()) == 2 - - -def test_kv_store_partition_set_backend_fail(root_verify_key) -> None: - store_config = MockStoreConfig(is_crashed=True) - settings = PartitionSettings(name="test", object_type=MockObjectType) - - with pytest.raises(SyftException): - KeyValueStorePartition( - UID(), root_verify_key, settings=settings, store_config=store_config - ) - - -def test_kv_store_partition_delete( - root_verify_key, worker, kv_store_partition: KeyValueStorePartition -) -> None: - objs = [] - for v in range(10): - obj = MockSyftObject(data=v) - kv_store_partition.set(root_verify_key, obj, ignore_duplicates=False) - objs.append(obj) - - assert len(kv_store_partition.all(root_verify_key).ok()) == len(objs) - - # can't delete a random object since it was not added - obj = MockSyftObject(data="bogus") - key = kv_store_partition.settings.store_key.with_obj(obj) - res = kv_store_partition.delete(root_verify_key, key) - assert res.is_err() - assert len(kv_store_partition.all(root_verify_key).ok()) == len(objs) - - # cleanup store - for idx, v in enumerate(objs): - key = kv_store_partition.settings.store_key.with_obj(v) - res = kv_store_partition.delete(root_verify_key, key) - assert res.is_ok() - assert len(kv_store_partition.all(root_verify_key).ok()) == len(objs) - idx - 1 - # check that the corresponding permissions were also deleted - assert ( - len(kv_store_partition.data) - == len(kv_store_partition.permissions) - == len(kv_store_partition.storage_permissions) - ) - - res = kv_store_partition.delete(root_verify_key, key) - assert res.is_err() - assert len(kv_store_partition.all(root_verify_key).ok()) == len(objs) - idx - 1 - assert ( - len(kv_store_partition.data) - == len(kv_store_partition.permissions) - == len(kv_store_partition.storage_permissions) - ) - - assert len(kv_store_partition.all(root_verify_key).ok()) == 0 - - -def test_kv_store_partition_delete_and_recreate( - root_verify_key, worker, kv_store_partition: KeyValueStorePartition -) -> None: - obj = MockSyftObject(data="bogus") - repeats = 5 - # running it multiple items ensures we can recreate it again once its delete from store. 
- for _ in range(repeats): - # Add an object - kv_store_partition.set(root_verify_key, obj, ignore_duplicates=False) - - assert len(kv_store_partition.all(root_verify_key).ok()) == 1 - - # Delete object - key = kv_store_partition.settings.store_key.with_obj(obj) - res = kv_store_partition.delete(root_verify_key, key) - - assert res.is_ok() - assert len(kv_store_partition.all(root_verify_key).ok()) == 0 - assert len(kv_store_partition.data) == len(kv_store_partition.permissions) - - assert len(kv_store_partition.all(root_verify_key).ok()) == 0 - - -def test_kv_store_partition_update( - root_verify_key, kv_store_partition: KeyValueStorePartition -) -> None: - # add item - obj = MockSyftObject(data=1) - kv_store_partition.set(root_verify_key, obj, ignore_duplicates=False) - assert len(kv_store_partition.all(root_verify_key).ok()) == 1 - - # fail to update missing keys - rand_obj = MockSyftObject(data="bogus") - key = kv_store_partition.settings.store_key.with_obj(rand_obj) - res = kv_store_partition.update(root_verify_key, key, obj) - assert res.is_err() - - # update the key multiple times - repeats = 5 - for v in range(repeats): - key = kv_store_partition.settings.store_key.with_obj(obj) - obj_new = MockSyftObject(data=v) - - res = kv_store_partition.update(root_verify_key, key, copy(obj_new)) - assert res.is_ok() - - # The ID should stay the same on update, unly the values are updated. - assert len(kv_store_partition.all(root_verify_key).ok()) == 1 - assert kv_store_partition.all(root_verify_key).ok()[0].id == obj.id - assert kv_store_partition.all(root_verify_key).ok()[0].id != obj_new.id - assert kv_store_partition.all(root_verify_key).ok()[0].data == v - - stored = kv_store_partition.get_all_from_store( - root_verify_key, QueryKeys(qks=[key]) - ) - assert stored.ok()[0].data == v - - -def test_kv_store_partition_set_multithreaded( - root_verify_key, - kv_store_partition: KeyValueStorePartition, -) -> None: - thread_cnt = 3 - repeats = 5 - execution_err = None - - def _kv_cbk(tid: int) -> None: - nonlocal execution_err - for idx in range(repeats): - obj = MockSyftObject(data=idx) - - for _ in range(10): - res = kv_store_partition.set( - root_verify_key, obj, ignore_duplicates=False - ) - if res.is_ok(): - break - - if res.is_err(): - execution_err = res - assert res.is_ok() - - tids = [] - for tid in range(thread_cnt): - thread = Thread(target=_kv_cbk, args=(tid,)) - thread.start() - - tids.append(thread) - - for thread in tids: - thread.join() - - stored = kv_store_partition.all(root_verify_key) - - assert execution_err is None - stored_cnt = len(stored.ok()) - assert stored_cnt == thread_cnt * repeats - - -def test_kv_store_partition_update_multithreaded( - root_verify_key, - kv_store_partition: KeyValueStorePartition, -) -> None: - thread_cnt = 3 - repeats = 5 - - obj = MockSyftObject(data=0) - key = kv_store_partition.settings.store_key.with_obj(obj) - kv_store_partition.set(root_verify_key, obj, ignore_duplicates=False) - execution_err = None - - def _kv_cbk(tid: int) -> None: - nonlocal execution_err - for repeat in range(repeats): - obj = MockSyftObject(data=repeat) - - for _ in range(10): - res = kv_store_partition.update(root_verify_key, key, obj) - if res.is_ok(): - break - - if res.is_err(): - execution_err = res - assert res.is_ok() - - tids = [] - for tid in range(thread_cnt): - thread = Thread(target=_kv_cbk, args=(tid,)) - thread.start() - - tids.append(thread) - - for thread in tids: - thread.join() - - assert execution_err is None - - -def 
test_kv_store_partition_set_delete_multithreaded( - root_verify_key, - kv_store_partition: KeyValueStorePartition, -) -> None: - thread_cnt = 3 - repeats = 5 - execution_err = None - - def _kv_cbk(tid: int) -> None: - nonlocal execution_err - for idx in range(repeats): - obj = MockSyftObject(data=idx) - - for _ in range(10): - res = kv_store_partition.set( - root_verify_key, obj, ignore_duplicates=False - ) - if res.is_ok(): - break - - if res.is_err(): - execution_err = res - assert res.is_ok() - - key = kv_store_partition.settings.store_key.with_obj(obj) - - res = kv_store_partition.delete(root_verify_key, key) - if res.is_err(): - execution_err = res - assert res.is_ok() - - tids = [] - for tid in range(thread_cnt): - thread = Thread(target=_kv_cbk, args=(tid,)) - thread.start() - - tids.append(thread) - - for thread in tids: - thread.join() - - assert execution_err is None - stored_cnt = len(kv_store_partition.all(root_verify_key).ok()) - assert stored_cnt == 0 diff --git a/packages/syft/tests/syft/stores/store_mocks_test.py b/packages/syft/tests/syft/stores/store_mocks_test.py index 8dc8f4e43bc..395b6f4bae0 100644 --- a/packages/syft/tests/syft/stores/store_mocks_test.py +++ b/packages/syft/tests/syft/stores/store_mocks_test.py @@ -4,48 +4,44 @@ # syft absolute from syft.serde.serializable import serializable from syft.store.document_store import DocumentStore -from syft.store.document_store import PartitionSettings -from syft.store.document_store import StoreConfig -from syft.store.kv_document_store import KeyValueBackingStore from syft.types.syft_object import SYFT_OBJECT_VERSION_2 from syft.types.syft_object import SyftObject from syft.types.uid import UID +# @serializable( +# canonical_name="MockKeyValueBackingStore", +# version=1, +# ) +# class MockKeyValueBackingStore(dict, KeyValueBackingStore): +# def __init__( +# self, +# index_name: str, +# settings: PartitionSettings, +# store_config: StoreConfig, +# **kwargs: Any, +# ) -> None: +# super(dict).__init__() +# self._ddtype = kwargs.get("ddtype", None) +# self.is_crashed = store_config.is_crashed -@serializable( - canonical_name="MockKeyValueBackingStore", - version=1, -) -class MockKeyValueBackingStore(dict, KeyValueBackingStore): - def __init__( - self, - index_name: str, - settings: PartitionSettings, - store_config: StoreConfig, - **kwargs: Any, - ) -> None: - super(dict).__init__() - self._ddtype = kwargs.get("ddtype", None) - self.is_crashed = store_config.is_crashed +# def _check_if_crashed(self) -> None: +# if self.is_crashed: +# raise RuntimeError("The backend is down") - def _check_if_crashed(self) -> None: - if self.is_crashed: - raise RuntimeError("The backend is down") +# def __setitem__(self, key: Any, value: Any) -> None: +# self._check_if_crashed() +# value = super().__setitem__(key, value) +# return value - def __setitem__(self, key: Any, value: Any) -> None: - self._check_if_crashed() - value = super().__setitem__(key, value) - return value - - def __getitem__(self, key: Any) -> Any: - try: - self._check_if_crashed() - value = super().__getitem__(key) - return value - except KeyError as e: - if self._ddtype: - return self._ddtype() - raise e +# def __getitem__(self, key: Any) -> Any: +# try: +# self._check_if_crashed() +# value = super().__getitem__(key) +# return value +# except KeyError as e: +# if self._ddtype: +# return self._ddtype() +# raise e @serializable() @@ -68,11 +64,11 @@ class MockSyftObject(SyftObject): data: Any -@serializable() -class MockStoreConfig(StoreConfig): - __canonical_name__ = 
"MockStoreConfig" - __version__ = 1 - store_type: type[DocumentStore] = MockStore - db_name: str = "testing" - backing_store: type[KeyValueBackingStore] = MockKeyValueBackingStore - is_crashed: bool = False +# @serializable() +# class MockStoreConfig(StoreConfig): +# __canonical_name__ = "MockStoreConfig" +# __version__ = 1 +# store_type: type[DocumentStore] = MockStore +# db_name: str = "testing" +# backing_store: type[KeyValueBackingStore] = MockKeyValueBackingStore +# is_crashed: bool = False From 1271b22dd6b0cc3fd199cff5f4f505c68fd5ab96 Mon Sep 17 00:00:00 2001 From: dk Date: Tue, 17 Sep 2024 10:16:46 +0700 Subject: [PATCH 02/19] [syft/protocol] stage protocol changes --- packages/syft/src/syft/protocol/protocol_version.json | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index 326aa7e2c82..bbb60b931dd 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ b/packages/syft/src/syft/protocol/protocol_version.json @@ -73,6 +73,13 @@ "hash": "3a46370205152fa23a7d2bfa47130dbf2e2bc7ef31f6d3fe4c92fd8d683770b5", "action": "add" } + }, + "SQLiteStoreConfig": { + "1": { + "version": 1, + "hash": "ad062a5f863ae84683867d2a6a5e1d4420c010a64b88bc7b392106e33d71ac03", + "action": "remove" + } } } } From 260d72870e9bf7afd2af2328fe8a76039f23e703 Mon Sep 17 00:00:00 2001 From: dk Date: Tue, 17 Sep 2024 10:39:42 +0700 Subject: [PATCH 03/19] [clean] remove `store_mocks_test.py` and relevant types in `data_protocol.py` --- .../syft/src/syft/protocol/data_protocol.py | 8 +- .../tests/syft/stores/store_mocks_test.py | 74 ------------------- 2 files changed, 1 insertion(+), 81 deletions(-) delete mode 100644 packages/syft/tests/syft/stores/store_mocks_test.py diff --git a/packages/syft/src/syft/protocol/data_protocol.py b/packages/syft/src/syft/protocol/data_protocol.py index 0c848585119..2316263112a 100644 --- a/packages/syft/src/syft/protocol/data_protocol.py +++ b/packages/syft/src/syft/protocol/data_protocol.py @@ -36,12 +36,8 @@ IGNORE_TYPES = [ "mock_type", - "MockStore", - "MockSyftObject", - "MockStoreConfig", "MockWrapper", "base_stash_mock_object_type", - "MockKeyValueBackingStore", "MockObjectFromSyftBaseObj", "MockObjectToSyftBaseObj", ] @@ -237,9 +233,7 @@ def diff_state(self, state: dict) -> tuple[dict, dict]: cls, version = serde_properties[7], serde_properties[9] if issubclass(cls, SyftBaseObject): canonical_name = cls.__canonical_name__ - if canonical_name in IGNORE_TYPES or canonical_name.startswith( - "MockSyftObject_" - ): + if canonical_name in IGNORE_TYPES: continue hash_str = DataProtocol._calculate_object_hash(cls) diff --git a/packages/syft/tests/syft/stores/store_mocks_test.py b/packages/syft/tests/syft/stores/store_mocks_test.py deleted file mode 100644 index 395b6f4bae0..00000000000 --- a/packages/syft/tests/syft/stores/store_mocks_test.py +++ /dev/null @@ -1,74 +0,0 @@ -# stdlib -from typing import Any - -# syft absolute -from syft.serde.serializable import serializable -from syft.store.document_store import DocumentStore -from syft.types.syft_object import SYFT_OBJECT_VERSION_2 -from syft.types.syft_object import SyftObject -from syft.types.uid import UID - -# @serializable( -# canonical_name="MockKeyValueBackingStore", -# version=1, -# ) -# class MockKeyValueBackingStore(dict, KeyValueBackingStore): -# def __init__( -# self, -# index_name: str, -# settings: PartitionSettings, -# store_config: StoreConfig, -# **kwargs: Any, -# ) -> None: -# 
super(dict).__init__()
-#         self._ddtype = kwargs.get("ddtype", None)
-#         self.is_crashed = store_config.is_crashed
-
-#     def _check_if_crashed(self) -> None:
-#         if self.is_crashed:
-#             raise RuntimeError("The backend is down")
-
-#     def __setitem__(self, key: Any, value: Any) -> None:
-#         self._check_if_crashed()
-#         value = super().__setitem__(key, value)
-#         return value
-
-#     def __getitem__(self, key: Any) -> Any:
-#         try:
-#             self._check_if_crashed()
-#             value = super().__getitem__(key)
-#             return value
-#         except KeyError as e:
-#             if self._ddtype:
-#                 return self._ddtype()
-#             raise e
-
-
-@serializable()
-class MockObjectType(SyftObject):
-    __canonical_name__ = "mock_type"
-    __version__ = 1
-
-
-@serializable()
-class MockStore(DocumentStore):
-    __canonical_name__ = "MockStore"
-    __version__ = 1
-    pass
-
-
-@serializable()
-class MockSyftObject(SyftObject):
-    __canonical_name__ = f"MockSyftObject_{UID()}"
-    __version__ = SYFT_OBJECT_VERSION_2
-    data: Any
-
-
-# @serializable()
-# class MockStoreConfig(StoreConfig):
-#     __canonical_name__ = "MockStoreConfig"
-#     __version__ = 1
-#     store_type: type[DocumentStore] = MockStore
-#     db_name: str = "testing"
-#     backing_store: type[KeyValueBackingStore] = MockKeyValueBackingStore
-#     is_crashed: bool = False
From f611a788b9eaef2b1548cfd3bfdb00a416555485 Mon Sep 17 00:00:00 2001
From: dk
Date: Tue, 17 Sep 2024 10:51:15 +0700
Subject: [PATCH 04/19] [clean] remove api references for kv and sqlite
 document stores

---
 .../syft.store.kv_document_store.rst          | 31 ---------------
 .../syft.store.sqlite_document_store.rst      | 39 -------------------
 2 files changed, 70 deletions(-)
 delete mode 100644 docs/source/api_reference/syft.store.kv_document_store.rst
 delete mode 100644 docs/source/api_reference/syft.store.sqlite_document_store.rst

diff --git a/docs/source/api_reference/syft.store.kv_document_store.rst b/docs/source/api_reference/syft.store.kv_document_store.rst
deleted file mode 100644
index a5d91177e60..00000000000
--- a/docs/source/api_reference/syft.store.kv_document_store.rst
+++ /dev/null
@@ -1,31 +0,0 @@
-syft.store.kv\_document\_store
-==============================
-
-.. automodule:: syft.store.kv_document_store
-
-   
-   
-   
-
-   
-   
-   
-
-   
-   
-   .. rubric:: Classes
-
-   .. autosummary::
-   
-      KeyValueBackingStore
-      KeyValueStorePartition
-      UniqueKeyCheck
-   
-   
-
-   
-   
-   
-
-   
-   
-   
diff --git a/docs/source/api_reference/syft.store.sqlite_document_store.rst b/docs/source/api_reference/syft.store.sqlite_document_store.rst
deleted file mode 100644
index cdc7ad4f4f0..00000000000
--- a/docs/source/api_reference/syft.store.sqlite_document_store.rst
+++ /dev/null
@@ -1,39 +0,0 @@
-syft.store.sqlite\_document\_store
-==================================
-
-.. automodule:: syft.store.sqlite_document_store
-
-   
-   
-   
-
-   
-   
-   .. rubric:: Functions
-
-   .. autosummary::
-   
-      thread_ident
-   
-   
-
-   
-   
-   .. rubric:: Classes
-
-   .. 
autosummary:: - - SQLiteBackingStore - SQLiteDocumentStore - SQLiteStoreClientConfig - SQLiteStoreConfig - SQLiteStorePartition - - - - - - - - - From 90193050b8ab63eabcdd5a2763449ffd4889ec87 Mon Sep 17 00:00:00 2001 From: dk Date: Tue, 17 Sep 2024 10:52:09 +0700 Subject: [PATCH 05/19] [clean/server] remove unused arg (`local_db`) from Server and relevant places --- packages/grid/backend/grid/core/server.py | 1 - packages/syft/src/syft/orchestra.py | 4 ---- packages/syft/src/syft/server/run.py | 9 --------- packages/syft/src/syft/server/server.py | 3 --- .../syft/src/syft/service/settings/settings_service.py | 1 - tests/integration/orchestra/orchestra_test.py | 3 +-- 6 files changed, 1 insertion(+), 20 deletions(-) diff --git a/packages/grid/backend/grid/core/server.py b/packages/grid/backend/grid/core/server.py index 7d8d011de5d..c6e4568afe7 100644 --- a/packages/grid/backend/grid/core/server.py +++ b/packages/grid/backend/grid/core/server.py @@ -101,7 +101,6 @@ def seaweedfs_config() -> SeaweedFSConfig: server_side_type=server_side_type, enable_warnings=enable_warnings, blob_storage_config=blob_storage_config, - local_db=single_container_mode, queue_config=queue_config, migrate=False, in_memory_workers=settings.INMEMORY_WORKERS, diff --git a/packages/syft/src/syft/orchestra.py b/packages/syft/src/syft/orchestra.py index efed6023ab8..c129afcc5c8 100644 --- a/packages/syft/src/syft/orchestra.py +++ b/packages/syft/src/syft/orchestra.py @@ -171,7 +171,6 @@ def deploy_to_python( tail: bool, dev_mode: bool, processes: int, - local_db: bool, server_side_type: ServerSideType, enable_warnings: bool, n_consumers: int, @@ -248,7 +247,6 @@ def deploy_to_python( server_side_type=server_side_type, ) else: - kwargs["local_db"] = local_db kwargs["thread_workers"] = thread_workers if server_type_enum in worker_classes: worker_class = worker_classes[server_type_enum] @@ -314,7 +312,6 @@ def launch( # worker related inputs port: int | str | None = None, processes: int = 1, # temporary work around for jax in subprocess - local_db: bool = False, dev_mode: bool = False, reset: bool = False, log_level: str | int | None = None, @@ -368,7 +365,6 @@ def launch( tail=tail, dev_mode=dev_mode, processes=processes, - local_db=local_db, server_side_type=server_side_type_enum, enable_warnings=enable_warnings, log_level=log_level, diff --git a/packages/syft/src/syft/server/run.py b/packages/syft/src/syft/server/run.py index a04c6d15abf..3be0fdec341 100644 --- a/packages/syft/src/syft/server/run.py +++ b/packages/syft/src/syft/server/run.py @@ -52,13 +52,6 @@ def run() -> ServerHandle | None: default="True", dest="reset", ) - parser.add_argument( - "--local-db", - help="reset", - type=str, - default="False", - dest="local_db", - ) parser.add_argument( "--processes", help="processing mode", @@ -80,7 +73,6 @@ def run() -> ServerHandle | None: args.dev_mode = str_to_bool(args.dev_mode) args.reset = str_to_bool(args.reset) - args.local_db = str_to_bool(args.local_db) args.tail = str_to_bool(args.tail) server = Orchestra.launch( @@ -90,7 +82,6 @@ def run() -> ServerHandle | None: port=args.port, dev_mode=args.dev_mode, reset=args.reset, - local_db=args.local_db, processes=args.processes, tail=args.tail, ) diff --git a/packages/syft/src/syft/server/server.py b/packages/syft/src/syft/server/server.py index 8a065605fca..93db6d935f9 100644 --- a/packages/syft/src/syft/server/server.py +++ b/packages/syft/src/syft/server/server.py @@ -312,7 +312,6 @@ def __init__( is_subprocess: bool = False, server_type: str | ServerType = 
ServerType.DATASITE,
         deployment_type: str | DeploymentType = "remote",
-        local_db: bool = False,
         reset: bool = False,
         blob_storage_config: BlobStorageConfig | None = None,
         queue_config: QueueConfig | None = None,
@@ -701,7 +700,6 @@ def named(
         name: str,
         processes: int = 0,
         reset: bool = False,
-        local_db: bool = False,
         server_type: str | ServerType = ServerType.DATASITE,
         server_side_type: str | ServerSideType = ServerSideType.HIGH_SIDE,
         deployment_type: str | DeploymentType = "remote",
@@ -732,7 +730,6 @@ def named(
            id=uid,
            signing_key=key,
            processes=processes,
-           local_db=local_db,
            server_type=server_type,
            server_side_type=server_side_type,
            deployment_type=deployment_type,
diff --git a/packages/syft/src/syft/service/settings/settings_service.py b/packages/syft/src/syft/service/settings/settings_service.py
index 7cb835b79fe..d621055fa6c 100644
--- a/packages/syft/src/syft/service/settings/settings_service.py
+++ b/packages/syft/src/syft/service/settings/settings_service.py
@@ -414,7 +414,6 @@ def get_server_config(
         "server_side_type": server.server_side_type,
         # "port": server.port,
         "processes": server.processes,
-        # "local_db": isinstance(server.document_store_config, SQLiteStoreConfig),
         "dev_mode": server.dev_mode,
         "reset": True,  # we should be able to get all the objects from migration data
         "tail": False,
diff --git a/tests/integration/orchestra/orchestra_test.py b/tests/integration/orchestra/orchestra_test.py
index accf1b0065d..cc59d7a7d8b 100644
--- a/tests/integration/orchestra/orchestra_test.py
+++ b/tests/integration/orchestra/orchestra_test.py
@@ -13,7 +13,7 @@
 @pytest.mark.parametrize("server_type", ["datasite", "gateway", "enclave"])
 def test_orchestra_python_local(server_type):
     name = token_hex(8)
-    server = sy.orchestra.launch(name=name, server_type=server_type, local_db=False)
+    server = sy.orchestra.launch(name=name, server_type=server_type)
 
     try:
         assert isinstance(server.python_server, Server)
@@ -32,7 +32,6 @@ def test_orchestra_python_server(server_type):
         name=name,
         port="auto",
         server_type=server_type,
-        local_db=False,
     )
 
     try:
From 21e04433171ec2777a020793b8b4a22e703ea1fe Mon Sep 17 00:00:00 2001
From: dk
Date: Tue, 17 Sep 2024 10:55:53 +0700
Subject: [PATCH 06/19] [clean/tests] remove `local_db` arg when launching
 nodes in tests

---
 tests/integration/conftest.py                          | 2 --
 tests/integration/local/conftest.py                    | 1 -
 tests/integration/local/enclave_local_test.py          | 1 -
 tests/integration/local/gateway_local_test.py          | 1 -
 tests/integration/local/request_multiple_nodes_test.py | 2 --
 tests/integration/local/syft_function_test.py          | 1 -
 6 files changed, 8 deletions(-)

diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index b0e8fdd58dd..40f2f5193e9 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -51,7 +51,6 @@ def full_low_worker(n_consumers: int = 3, create_producer: bool = True) -> Worke
        n_consumers=n_consumers,
        create_producer=create_producer,
        queue_port=None,
-       local_db=False,
        thread_workers=False,
    )
    # startup code here
@@ -71,7 +70,6 @@ def full_high_worker(n_consumers: int = 3, create_producer: bool = True) -> Work
        n_consumers=n_consumers,
        create_producer=create_producer,
        queue_port=None,
-       local_db=False,
        thread_workers=False,
    )
    # startup code here
diff --git a/tests/integration/local/conftest.py b/tests/integration/local/conftest.py
index 8664244e506..03dc343f549 100644
--- a/tests/integration/local/conftest.py
+++ b/tests/integration/local/conftest.py
@@ -27,7 +27,6 @@ def server(server_args: dict[str, Any]) -> 
Generator[ServerHandle, None, None]: "dev_mode": False, "reset": True, "queue_port": None, - "local_db": False, **server_args, } ) diff --git a/tests/integration/local/enclave_local_test.py b/tests/integration/local/enclave_local_test.py index 2275e6a7074..d3377a570dc 100644 --- a/tests/integration/local/enclave_local_test.py +++ b/tests/integration/local/enclave_local_test.py @@ -16,7 +16,6 @@ def test_enclave_root_client_exception(): server_type=sy.ServerType.ENCLAVE, dev_mode=True, reset=True, - local_db=True, ) with pytest.raises(SyftException): enclave_server.login(email="info@openmined.org", password="changethis") diff --git a/tests/integration/local/gateway_local_test.py b/tests/integration/local/gateway_local_test.py index 7c05ceec8e6..48bd790fb79 100644 --- a/tests/integration/local/gateway_local_test.py +++ b/tests/integration/local/gateway_local_test.py @@ -32,7 +32,6 @@ def _launch( server_type=server_type, dev_mode=True, reset=True, - local_db=True, association_request_auto_approval=association_request_auto_approval, port=port, background_tasks=True, diff --git a/tests/integration/local/request_multiple_nodes_test.py b/tests/integration/local/request_multiple_nodes_test.py index c729131afa2..b5443343551 100644 --- a/tests/integration/local/request_multiple_nodes_test.py +++ b/tests/integration/local/request_multiple_nodes_test.py @@ -16,7 +16,6 @@ def server_1(): server_side_type="low", dev_mode=False, reset=True, - local_db=True, create_producer=True, n_consumers=1, queue_port=None, @@ -33,7 +32,6 @@ def server_2(): server_side_type="high", dev_mode=False, reset=True, - local_db=True, create_producer=True, n_consumers=1, queue_port=None, diff --git a/tests/integration/local/syft_function_test.py b/tests/integration/local/syft_function_test.py index 868447b68c6..941285e158a 100644 --- a/tests/integration/local/syft_function_test.py +++ b/tests/integration/local/syft_function_test.py @@ -23,7 +23,6 @@ def server(): n_consumers=3, create_producer=True, queue_port=None, - local_db=False, ) # startup code here yield _server From 6cc25aec8d5c550e4ed515453b095ede3025927b Mon Sep 17 00:00:00 2001 From: dk Date: Tue, 17 Sep 2024 19:55:29 +0700 Subject: [PATCH 07/19] [clean] remove dict document store's api reference --- .../syft.store.dict_document_store.rst | 32 ------------------- 1 file changed, 32 deletions(-) delete mode 100644 docs/source/api_reference/syft.store.dict_document_store.rst diff --git a/docs/source/api_reference/syft.store.dict_document_store.rst b/docs/source/api_reference/syft.store.dict_document_store.rst deleted file mode 100644 index 0d297482041..00000000000 --- a/docs/source/api_reference/syft.store.dict_document_store.rst +++ /dev/null @@ -1,32 +0,0 @@ -syft.store.dict\_document\_store -================================ - -.. automodule:: syft.store.dict_document_store - - - - - - - - - - - - .. rubric:: Classes - - .. 
autosummary:: - - DictBackingStore - DictDocumentStore - DictStoreConfig - DictStorePartition - - - - - - - - - From 949d3369ef0eeda029dd9e7f6493dd948cc55d19 Mon Sep 17 00:00:00 2001 From: dk Date: Wed, 18 Sep 2024 10:38:48 +0700 Subject: [PATCH 08/19] update protocol_version.json --- packages/syft/src/syft/protocol/protocol_version.json | 7 ------- 1 file changed, 7 deletions(-) diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index bbb60b931dd..326aa7e2c82 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ b/packages/syft/src/syft/protocol/protocol_version.json @@ -73,13 +73,6 @@ "hash": "3a46370205152fa23a7d2bfa47130dbf2e2bc7ef31f6d3fe4c92fd8d683770b5", "action": "add" } - }, - "SQLiteStoreConfig": { - "1": { - "version": 1, - "hash": "ad062a5f863ae84683867d2a6a5e1d4420c010a64b88bc7b392106e33d71ac03", - "action": "remove" - } } } } From a6afcdd4e12366f356e50841a21fc1b988c4c25c Mon Sep 17 00:00:00 2001 From: dk Date: Wed, 18 Sep 2024 11:21:23 +0700 Subject: [PATCH 09/19] update protocol_version.json --- packages/syft/src/syft/protocol/protocol_version.json | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index 326aa7e2c82..bbb60b931dd 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ b/packages/syft/src/syft/protocol/protocol_version.json @@ -73,6 +73,13 @@ "hash": "3a46370205152fa23a7d2bfa47130dbf2e2bc7ef31f6d3fe4c92fd8d683770b5", "action": "add" } + }, + "SQLiteStoreConfig": { + "1": { + "version": 1, + "hash": "ad062a5f863ae84683867d2a6a5e1d4420c010a64b88bc7b392106e33d71ac03", + "action": "remove" + } } } } From 8ea19be462f5c524b7b4f93f9fafaaf8de86a2a5 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 19 Sep 2024 12:46:52 +0530 Subject: [PATCH 10/19] add back old mongo and sqlite store config skeletons - update protocol version --- packages/syft/src/syft/__init__.py | 2 + .../src/syft/protocol/protocol_version.json | 46 ++++------- .../syft/src/syft/store/kv_document_store.py | 18 +++++ packages/syft/src/syft/store/mongo_client.py | 7 ++ .../src/syft/store/mongo_document_store.py | 81 +++++++++++++++++++ .../src/syft/store/sqlite_document_store.py | 77 ++++++++++++++++++ 6 files changed, 201 insertions(+), 30 deletions(-) create mode 100644 packages/syft/src/syft/store/kv_document_store.py create mode 100644 packages/syft/src/syft/store/mongo_client.py create mode 100644 packages/syft/src/syft/store/mongo_document_store.py create mode 100644 packages/syft/src/syft/store/sqlite_document_store.py diff --git a/packages/syft/src/syft/__init__.py b/packages/syft/src/syft/__init__.py index 2534f22077e..eda917138db 100644 --- a/packages/syft/src/syft/__init__.py +++ b/packages/syft/src/syft/__init__.py @@ -77,6 +77,8 @@ from .service.user.roles import Roles as roles from .service.user.user_service import UserService from .stable_version import LATEST_STABLE_SYFT +from .store.mongo_document_store import MongoStoreConfig +from .store.sqlite_document_store import SQLiteStoreConfig from .types.errors import SyftException from .types.errors import raises from .types.result import as_result diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index bbb60b931dd..bf85d20a1a9 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ 
b/packages/syft/src/syft/protocol/protocol_version.json @@ -25,34 +25,6 @@ "action": "add" } }, - "MongoDict": { - "1": { - "version": 1, - "hash": "57e36f57eed75e62b29e2bac1295035a9bf2c0e3c56719dac24cb6cc685be00b", - "action": "remove" - } - }, - "MongoStoreConfig": { - "1": { - "version": 1, - "hash": "53342b27d34165b7e2699f8e7ad70d13d125875e6a75e8fa18f5796428f41036", - "action": "remove" - } - }, - "JobItem": { - "1": { - "version": 1, - "hash": "0b32277b7d3b9bdc14a2a51cc9005f8254e7f7b6ec059ddcccbcd681a807afb6", - "action": "remove" - } - }, - "DictStoreConfig": { - "1": { - "version": 1, - "hash": "2e1365c5535fa51c22eef79f67dd6444789bc829c27881367e3050e06e2ffbfe", - "action": "remove" - } - }, "QueueItem": { "2": { "version": 2, @@ -74,10 +46,24 @@ "action": "add" } }, - "SQLiteStoreConfig": { + "MongoDict": { "1": { "version": 1, - "hash": "ad062a5f863ae84683867d2a6a5e1d4420c010a64b88bc7b392106e33d71ac03", + "hash": "57e36f57eed75e62b29e2bac1295035a9bf2c0e3c56719dac24cb6cc685be00b", + "action": "remove" + } + }, + "JobItem": { + "1": { + "version": 1, + "hash": "0b32277b7d3b9bdc14a2a51cc9005f8254e7f7b6ec059ddcccbcd681a807afb6", + "action": "remove" + } + }, + "DictStoreConfig": { + "1": { + "version": 1, + "hash": "2e1365c5535fa51c22eef79f67dd6444789bc829c27881367e3050e06e2ffbfe", "action": "remove" } } diff --git a/packages/syft/src/syft/store/kv_document_store.py b/packages/syft/src/syft/store/kv_document_store.py new file mode 100644 index 00000000000..52f712d842b --- /dev/null +++ b/packages/syft/src/syft/store/kv_document_store.py @@ -0,0 +1,18 @@ +# relative +from .document_store import StorePartition + + +class KeyValueBackingStore: + pass + + +class KeyValueStorePartition(StorePartition): + """Key-Value StorePartition + Parameters: + `settings`: PartitionSettings + PySyft specific settings + `store_config`: StoreConfig + Backend specific configuration + """ + + pass diff --git a/packages/syft/src/syft/store/mongo_client.py b/packages/syft/src/syft/store/mongo_client.py new file mode 100644 index 00000000000..d605ea86e15 --- /dev/null +++ b/packages/syft/src/syft/store/mongo_client.py @@ -0,0 +1,7 @@ +# relative +from ..serde.serializable import serializable + + +@serializable(canonical_name="MongoStoreClientConfig", version=1) +class MongoStoreClientConfig: + pass diff --git a/packages/syft/src/syft/store/mongo_document_store.py b/packages/syft/src/syft/store/mongo_document_store.py new file mode 100644 index 00000000000..80f9ba3a341 --- /dev/null +++ b/packages/syft/src/syft/store/mongo_document_store.py @@ -0,0 +1,81 @@ +# third party +from deprecated import deprecated +from pydantic import Field + +# relative +from ..serde.serializable import serializable +from ..types.syft_object import StorableObjectType +from .document_store import DocumentStore +from .document_store import StoreConfig +from .document_store import StorePartition +from .kv_document_store import KeyValueBackingStore +from .locks import LockingConfig +from .locks import NoLockingConfig +from .mongo_client import MongoStoreClientConfig + + +class MongoBsonObject: + pass + + +class MongoBackingStore(KeyValueBackingStore): + pass + + +@serializable( + attrs=["index_name", "settings", "store_config"], + canonical_name="MongoBackingStore", + version=1, +) +@serializable(attrs=["storage_type"], canonical_name="MongoStorePartition", version=1) +class MongoStorePartition(StorePartition): + """Mongo StorePartition + Parameters: + `settings`: PartitionSettings + PySyft specific settings, used for partitioning and 
indexing. + `store_config`: MongoStoreConfig + Mongo specific configuration + """ + + storage_type: type[StorableObjectType] = MongoBsonObject + + +@serializable(canonical_name="MongoDocumentStore", version=1) +class MongoDocumentStore(DocumentStore): + """Mongo Document Store + Parameters: + `store_config`: MongoStoreConfig + Mongo specific configuration, including connection configuration, database name, or client class type. + """ + + partition_type = MongoStorePartition + + +@deprecated( + version="0.9.1", reason="Use syft.store.db.postgres.PostgresDBConfig instead." +) +@serializable() +class MongoStoreConfig(StoreConfig): + __canonical_name__ = "MongoStoreConfig" + + """Mongo Store configuration + Parameters: + `client_config`: MongoStoreClientConfig + Mongo connection details: hostname, port, user, password etc. + `store_type`: Type[DocumentStore] + The type of the DocumentStore. Default: MongoDocumentStore + `db_name`: str + Database name + locking_config: LockingConfig + The config used for store locking. Available options: + * NoLockingConfig: no locking, ideal for single-thread stores. + * ThreadingLockingConfig: threading-based locking, ideal for same-process in-memory stores. + Defaults to NoLockingConfig. + """ + + client_config: MongoStoreClientConfig + store_type: type[DocumentStore] = MongoDocumentStore + db_name: str = "app" + backing_store: type[KeyValueBackingStore] = MongoBackingStore + # TODO: should use a distributed lock, with RedisLockingConfig + locking_config: LockingConfig = Field(default_factory=NoLockingConfig) diff --git a/packages/syft/src/syft/store/sqlite_document_store.py b/packages/syft/src/syft/store/sqlite_document_store.py new file mode 100644 index 00000000000..7e8b980ca14 --- /dev/null +++ b/packages/syft/src/syft/store/sqlite_document_store.py @@ -0,0 +1,77 @@ +# third party +from pydantic import Field + +# relative +from ..serde.serializable import serializable +from .document_store import DocumentStore +from .document_store import StoreClientConfig +from .document_store import StoreConfig +from .kv_document_store import KeyValueBackingStore +from .kv_document_store import KeyValueStorePartition +from .locks import LockingConfig +from .locks import NoLockingConfig + + +@serializable( + attrs=["index_name", "settings", "store_config"], + canonical_name="SQLiteBackingStore", + version=1, +) +class SQLiteBackingStore(KeyValueBackingStore): + """Core Store logic for the SQLite stores.""" + + pass + + +@serializable(canonical_name="SQLiteStorePartition", version=1) +class SQLiteStorePartition(KeyValueStorePartition): + """SQLite StorePartition + Parameters: + `settings`: PartitionSettings + PySyft specific settings, used for indexing and partitioning + `store_config`: SQLiteStoreConfig + SQLite specific configuration + """ + + +# the base document store is already a dict but we can change it later +@serializable(canonical_name="SQLiteDocumentStore", version=1) +class SQLiteDocumentStore(DocumentStore): + """SQLite Document Store + Parameters: + `store_config`: StoreConfig + SQLite specific configuration, including connection details and client class type. 
+ """ + + partition_type = SQLiteStorePartition + + +@serializable(canonical_name="SQLiteStoreClientConfig", version=1) +class SQLiteStoreClientConfig(StoreClientConfig): + """SQLite connection config""" + + pass + + +@serializable() +class SQLiteStoreConfig(StoreConfig): + __canonical_name__ = "SQLiteStoreConfig" + """SQLite Store config, used by SQLiteStorePartition + Parameters: + `client_config`: SQLiteStoreClientConfig + SQLite connection configuration + `store_type`: DocumentStore + Class interacting with QueueStash. Default: SQLiteDocumentStore + `backing_store`: KeyValueBackingStore + The Store core logic. Default: SQLiteBackingStore + locking_config: LockingConfig + The config used for store locking. Available options: + * NoLockingConfig: no locking, ideal for single-thread stores. + * ThreadingLockingConfig: threading-based locking, ideal for same-process in-memory stores. + Defaults to NoLockingConfig. + """ + + client_config: SQLiteStoreClientConfig + store_type: type[DocumentStore] = SQLiteDocumentStore + backing_store: type[KeyValueBackingStore] = SQLiteBackingStore + locking_config: LockingConfig = Field(default_factory=NoLockingConfig) From 1ef11487b8365a76fee085c4bdf60eed1052ab9b Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 19 Sep 2024 12:59:37 +0530 Subject: [PATCH 11/19] use syft 0.9.1 in tox migration prepare task for k8s --- tox.ini | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tox.ini b/tox.ini index 6a54b437098..3f2835e413e 100644 --- a/tox.ini +++ b/tox.ini @@ -1436,7 +1436,7 @@ passenv=HOME, USER, EXTERNAL_REGISTRY_USERNAME, EXTERNAL_REGISTRY_PASSWORD deps = requests nbmake - syft==0.9.0 + syft==0.9.1 allowlist_externals = bash tox @@ -1449,7 +1449,7 @@ setenv = CLUSTER_NAME = syft-migration-source CLUSTER_HTTP_PORT = {env:SERVER_PORT:8080} MIGRATION_DATA_DIR = {env:MIGRATION_DATA_DIR:{temp_dir}/migration} - LATEST_SYFT_VERSION = 0.9.0 + LATEST_SYFT_VERSION = 0.9.1 commands = bash -c "env; date; k3d version" bash -c "k3d cluster delete ${CLUSTER_NAME} || true" From 1e5736262eae52610e07f35ba3a27263f0747531 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 19 Sep 2024 13:23:21 +0530 Subject: [PATCH 12/19] fix lint Co-authored-by: khoaguin --- packages/syft/src/syft/store/mongo_client.py | 3 ++- packages/syft/src/syft/store/mongo_document_store.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/syft/src/syft/store/mongo_client.py b/packages/syft/src/syft/store/mongo_client.py index d605ea86e15..c8af9b97c75 100644 --- a/packages/syft/src/syft/store/mongo_client.py +++ b/packages/syft/src/syft/store/mongo_client.py @@ -1,7 +1,8 @@ # relative from ..serde.serializable import serializable +from .document_store import StoreClientConfig @serializable(canonical_name="MongoStoreClientConfig", version=1) -class MongoStoreClientConfig: +class MongoStoreClientConfig(StoreClientConfig): pass diff --git a/packages/syft/src/syft/store/mongo_document_store.py b/packages/syft/src/syft/store/mongo_document_store.py index 80f9ba3a341..18eb99d89b4 100644 --- a/packages/syft/src/syft/store/mongo_document_store.py +++ b/packages/syft/src/syft/store/mongo_document_store.py @@ -14,7 +14,7 @@ from .mongo_client import MongoStoreClientConfig -class MongoBsonObject: +class MongoBsonObject(StorableObjectType): pass From cb9a758c90f92bf5a5567986541f241ae91c4074 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 19 Sep 2024 13:34:18 +0530 Subject: [PATCH 13/19] fix lint --- 
packages/syft/src/syft/store/mongo_document_store.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/packages/syft/src/syft/store/mongo_document_store.py b/packages/syft/src/syft/store/mongo_document_store.py index 18eb99d89b4..899ca102f90 100644 --- a/packages/syft/src/syft/store/mongo_document_store.py +++ b/packages/syft/src/syft/store/mongo_document_store.py @@ -1,5 +1,4 @@ # third party -from deprecated import deprecated from pydantic import Field # relative @@ -51,9 +50,6 @@ class MongoDocumentStore(DocumentStore): partition_type = MongoStorePartition -@deprecated( - version="0.9.1", reason="Use syft.store.db.postgres.PostgresDBConfig instead." -) @serializable() class MongoStoreConfig(StoreConfig): __canonical_name__ = "MongoStoreConfig" From a2084bed505e81d36c3556c873b19f98b8e446c9 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 19 Sep 2024 15:22:09 +0530 Subject: [PATCH 14/19] fix serde decorator for MongoDocumentStore and MongoBackingStore add mongo config to base.yaml Co-authored-by: khoaguin --- packages/grid/helm/examples/dev/base.yaml | 8 ++++++++ packages/syft/src/syft/store/mongo_document_store.py | 8 ++++---- tox.ini | 2 +- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/packages/grid/helm/examples/dev/base.yaml b/packages/grid/helm/examples/dev/base.yaml index 3fc1ad5c4da..0df96d4e826 100644 --- a/packages/grid/helm/examples/dev/base.yaml +++ b/packages/grid/helm/examples/dev/base.yaml @@ -28,6 +28,14 @@ postgres: secret: rootPassword: example +# Deprecated, support will be removed in 0.9.3 +mongo: + resourcesPreset: null + resources: null + + secret: + rootPassword: example + seaweedfs: resourcesPreset: null resources: null diff --git a/packages/syft/src/syft/store/mongo_document_store.py b/packages/syft/src/syft/store/mongo_document_store.py index 899ca102f90..e1c68316894 100644 --- a/packages/syft/src/syft/store/mongo_document_store.py +++ b/packages/syft/src/syft/store/mongo_document_store.py @@ -17,15 +17,15 @@ class MongoBsonObject(StorableObjectType): pass -class MongoBackingStore(KeyValueBackingStore): - pass - - @serializable( attrs=["index_name", "settings", "store_config"], canonical_name="MongoBackingStore", version=1, ) +class MongoBackingStore(KeyValueBackingStore): + pass + + @serializable(attrs=["storage_type"], canonical_name="MongoStorePartition", version=1) class MongoStorePartition(StorePartition): """Mongo StorePartition diff --git a/tox.ini b/tox.ini index 3f2835e413e..0681ec32292 100644 --- a/tox.ini +++ b/tox.ini @@ -1465,7 +1465,7 @@ commands = ' ; wait for everything else to be loaded - tox -e dev.k8s.ready -- frontend backend postgres proxy seaweedfs registry + tox -e dev.k8s.ready -- frontend backend mongo proxy seaweedfs registry bash -c 'python -c "import syft as sy; print(\"Migrating from syft version:\", sy.__version__)"' From 5f049f0f1ef5f6d2518534c6437846e85912bd2f Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Thu, 19 Sep 2024 16:11:11 +0530 Subject: [PATCH 15/19] add migrations for SyftWorkerImage - define a new version for SyftWorker --- .../src/syft/protocol/protocol_version.json | 7 +++ .../src/syft/service/worker/worker_image.py | 9 ++++ .../src/syft/service/worker/worker_pool.py | 53 ++++++++++++++++++- 3 files changed, 68 insertions(+), 1 deletion(-) diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index bf85d20a1a9..b47aba7a604 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ 
b/packages/syft/src/syft/protocol/protocol_version.json @@ -66,6 +66,13 @@ "hash": "2e1365c5535fa51c22eef79f67dd6444789bc829c27881367e3050e06e2ffbfe", "action": "remove" } + }, + "SyftWorker": { + "2": { + "version": 2, + "hash": "e996dabbb8ad4ff0bc5d19528077c11f73b9300d810735d367916e4e5b9149b6", + "action": "add" + } } } } diff --git a/packages/syft/src/syft/service/worker/worker_image.py b/packages/syft/src/syft/service/worker/worker_image.py index 99ca3ad6040..17a42ead8bc 100644 --- a/packages/syft/src/syft/service/worker/worker_image.py +++ b/packages/syft/src/syft/service/worker/worker_image.py @@ -1,11 +1,15 @@ # stdlib +# stdlib +from collections.abc import Callable + # relative from ...custom_worker.config import PrebuiltWorkerConfig from ...custom_worker.config import WorkerConfig from ...serde.serializable import serializable from ...server.credentials import SyftVerifyKey from ...types.datetime import DateTime +from ...types.syft_migration import migrate from ...types.syft_object import SYFT_OBJECT_VERSION_1 from ...types.syft_object import SYFT_OBJECT_VERSION_2 from ...types.syft_object import SyftObject @@ -88,3 +92,8 @@ def built_image_tag(self) -> str | None: if self.is_built and self.image_identifier: return self.image_identifier.full_name_with_tag return None + + +@migrate(SyftWorkerImageV1, SyftWorkerImage) +def migrate_syft_worker_image_v1_to_v2() -> list[Callable]: + return [] # no migrations needed at data level, only unique and searchable attributes changed diff --git a/packages/syft/src/syft/service/worker/worker_pool.py b/packages/syft/src/syft/service/worker/worker_pool.py index 4c006f7ac08..fffa0deba70 100644 --- a/packages/syft/src/syft/service/worker/worker_pool.py +++ b/packages/syft/src/syft/service/worker/worker_pool.py @@ -1,4 +1,5 @@ # stdlib +from collections.abc import Callable from enum import Enum from typing import Any from typing import cast @@ -14,12 +15,16 @@ from ...types.datetime import DateTime from ...types.errors import SyftException from ...types.result import as_result +from ...types.syft_migration import migrate from ...types.syft_object import SYFT_OBJECT_VERSION_1 +from ...types.syft_object import SYFT_OBJECT_VERSION_2 from ...types.syft_object import SyftObject from ...types.syft_object import short_uid +from ...types.transforms import TransformContext from ...types.uid import UID from ..response import SyftError from .worker_image import SyftWorkerImage +from .worker_image import SyftWorkerImageV1 @serializable(canonical_name="WorkerStatus", version=1) @@ -44,7 +49,7 @@ class WorkerHealth(Enum): @serializable() -class SyftWorker(SyftObject): +class SyftWorkerV1(SyftObject): __canonical_name__ = "SyftWorker" __version__ = SYFT_OBJECT_VERSION_1 @@ -60,6 +65,36 @@ class SyftWorker(SyftObject): "created_at", ] + id: UID + name: str + container_id: str | None = None + created_at: DateTime = DateTime.now() + healthcheck: WorkerHealth | None = None + status: WorkerStatus + image: SyftWorkerImageV1 | None = None + worker_pool_name: str + consumer_state: ConsumerState = ConsumerState.DETACHED + job_id: UID | None = None + to_be_deleted: bool = False + + +@serializable() +class SyftWorker(SyftObject): + __canonical_name__ = "SyftWorker" + __version__ = SYFT_OBJECT_VERSION_2 + + __attr_unique__ = ["name"] + __attr_searchable__ = ["name", "container_id", "to_be_deleted"] + __repr_attrs__ = [ + "name", + "container_id", + "image", + "status", + "healthcheck", + "worker_pool_name", + "created_at", + ] + id: UID name: str container_id: str | None = 
None @@ -283,3 +318,19 @@ def _get_worker_container_status( container_status, SyftError(message=f"Unknown container status: {container_status}"), ) + + +def migrate_worker_image_v1_to_v2(context: TransformContext) -> TransformContext: + old_image = context["image"] + if isinstance(old_image, SyftWorkerImageV1): + new_image = old_image.migrate_to( + version=SYFT_OBJECT_VERSION_2, + context=context.to_server_context(), + ) + context["image"] = new_image + return context + + +@migrate(SyftWorkerV1, SyftWorker) +def migrate_worker_v1_to_v2() -> list[Callable]: + return [migrate_worker_image_v1_to_v2] From 475ce5464ed53ccbf03c7978a79ad011605f50a4 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Thu, 19 Sep 2024 18:10:22 +0700 Subject: [PATCH 16/19] [CI] add migration test for k8s to CI --- .github/workflows/pr-tests-stack.yml | 143 +++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) diff --git a/.github/workflows/pr-tests-stack.yml b/.github/workflows/pr-tests-stack.yml index 02eff919dea..ff9b66d9d12 100644 --- a/.github/workflows/pr-tests-stack.yml +++ b/.github/workflows/pr-tests-stack.yml @@ -621,6 +621,149 @@ jobs: run: | tox -e migration.test + pr-tests-migrations-k8s: + strategy: + max-parallel: 99 + matrix: + os: [ubuntu-latest] + python-version: ["3.12"] + fail-fast: false + + runs-on: ${{matrix.os}} + + steps: + - name: Permission to home directory + run: | + sudo chown -R $USER:$USER $HOME + - uses: actions/checkout@v4 + - name: Check for file changes + uses: dorny/paths-filter@v3 + id: changes + with: + base: ${{ github.ref }} + token: ${{ github.token }} + filters: .github/file-filters.yml + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + if: steps.changes.outputs.stack == 'true' + with: + python-version: ${{ matrix.python-version }} + + - name: Add K3d Registry + run: | + sudo python ./scripts/patch_hosts.py --add-k3d-registry + + - name: Free Disk Space (Ubuntu) + uses: jlumbroso/free-disk-space@main + with: + tool-cache: true + large-packages: false + + # free 10GB of space + - name: Remove unnecessary files + if: matrix.os == 'ubuntu-latest' + run: | + sudo rm -rf /usr/share/dotnet + sudo rm -rf "$AGENT_TOOLSDIRECTORY" + docker image prune --all --force + docker builder prune --all --force + docker system prune --all --force + + - name: Install pip dependencies + if: steps.changes.outputs.stack == 'true' + run: | + python -m pip install --upgrade pip + pip install uv==0.4.1 tox==4.18.0 tox-uv==1.11.2 + uv --version + + - name: Get uv cache dir + if: steps.changes.outputs.stack == 'true' + id: pip-cache + shell: bash + run: | + echo "dir=$(uv cache dir)" >> $GITHUB_OUTPUT + + - name: Load github cache + uses: actions/cache@v4 + if: steps.changes.outputs.stack == 'true' + with: + path: ${{ steps.pip-cache.outputs.dir }} + key: ${{ runner.os }}-uv-py${{ matrix.python-version }} + restore-keys: | + ${{ runner.os }}-uv-py${{ matrix.python-version }} + + - name: Install kubectl + if: steps.changes.outputs.stack == 'true' + run: | + # cleanup apt version + sudo apt remove kubectl || true + # install kubectl 1.27 + curl -LO https://dl.k8s.io/release/v1.27.2/bin/linux/amd64/kubectl + chmod +x kubectl + sudo install kubectl /usr/local/bin; + + - name: Install helm + if: steps.changes.outputs.stack == 'true' + run: | + # install helm + curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 + chmod 700 get_helm.sh + ./get_helm.sh + + - name: Run Migrations Tests + if: steps.changes.outputs.stack == 'true' + 
timeout-minutes: 60 + env: + GITHUB_CI: true + shell: bash + run: | + K3D_VERSION=v5.6.3 + DEVSPACE_VERSION=v6.3.12 + # install k3d + wget https://github.com/k3d-io/k3d/releases/download/${K3D_VERSION}/k3d-linux-amd64 + mv k3d-linux-amd64 k3d + chmod +x k3d + export PATH=`pwd`:$PATH + k3d version + curl -sSL https://github.com/loft-sh/devspace/releases/download/${DEVSPACE_VERSION}/devspace-linux-amd64 -o ./devspace + chmod +x devspace + devspace version + tox -e migration.k8s.test + + - name: Get current timestamp + id: date + if: failure() + shell: bash + run: echo "date=$(date +%s)" >> $GITHUB_OUTPUT + + - name: Collect logs from k3d + if: steps.changes.outputs.stack == 'true' && failure() + shell: bash + run: | + mkdir -p ./k8s-logs + kubectl describe all -A --context k3d-test-gateway-1 --namespace syft > ./k8s-logs/test-gateway-1-desc-${{ steps.date.outputs.date }}.txt + kubectl describe all -A --context k3d-test-datasite-1 --namespace syft > ./k8s-logs/test-datasite-1-desc-${{ steps.date.outputs.date }}.txt + kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-test-gateway-1 --namespace syft > ./k8s-logs/test-gateway-1-logs-${{ steps.date.outputs.date }}.txt + kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-test-datasite-1 --namespace syft > ./k8s-logs/test-datasite-1-logs-${{ steps.date.outputs.date }}.txt + ls -la ./k8s-logs + + - name: Upload logs to GitHub + uses: actions/upload-artifact@master + if: steps.changes.outputs.stack == 'true' && failure() + with: + name: k8s-logs-notebook-${{ matrix.os }}-${{ steps.date.outputs.date }} + path: ./k8s-logs/ + + - name: Cleanup k3d + if: steps.changes.outputs.stack == 'true' && failure() + shell: bash + run: | + export PATH=`pwd`:$PATH + k3d cluster delete test-gateway-1 || true + k3d cluster delete test-datasite-1 || true + k3d registry delete k3d-registry.localhost || true + pr-tests-notebook-scenario-k8s-sync: strategy: max-parallel: 99 From a3ea25ffb8de94c0b26ce7284cf8639dc9e03149 Mon Sep 17 00:00:00 2001 From: khoaguin Date: Thu, 19 Sep 2024 18:20:01 +0700 Subject: [PATCH 17/19] [CI] change cluster name when collecting logs for migration k8s test --- .github/workflows/pr-tests-stack.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/pr-tests-stack.yml b/.github/workflows/pr-tests-stack.yml index ff9b66d9d12..09dbdfda26b 100644 --- a/.github/workflows/pr-tests-stack.yml +++ b/.github/workflows/pr-tests-stack.yml @@ -742,9 +742,9 @@ jobs: shell: bash run: | mkdir -p ./k8s-logs - kubectl describe all -A --context k3d-test-gateway-1 --namespace syft > ./k8s-logs/test-gateway-1-desc-${{ steps.date.outputs.date }}.txt + kubectl describe all -A --context k3d-syft-migration-source --namespace syft > ./k8s-logs/syft-migration-source-desc-${{ steps.date.outputs.date }}.txt kubectl describe all -A --context k3d-test-datasite-1 --namespace syft > ./k8s-logs/test-datasite-1-desc-${{ steps.date.outputs.date }}.txt - kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-test-gateway-1 --namespace syft > ./k8s-logs/test-gateway-1-logs-${{ steps.date.outputs.date }}.txt + kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-syft-migration-source --namespace syft > ./k8s-logs/syft-migration-source-logs-${{ steps.date.outputs.date }}.txt kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-test-datasite-1 --namespace syft > ./k8s-logs/test-datasite-1-logs-${{ steps.date.outputs.date }}.txt ls -la 
./k8s-logs @@ -760,7 +760,7 @@ jobs: shell: bash run: | export PATH=`pwd`:$PATH - k3d cluster delete test-gateway-1 || true + k3d cluster delete syft-migration-source || true k3d cluster delete test-datasite-1 || true k3d registry delete k3d-registry.localhost || true From f749543a5eeb774acc7f10991702d9bfbc6cf3e4 Mon Sep 17 00:00:00 2001 From: dk Date: Fri, 20 Sep 2024 10:01:25 +0700 Subject: [PATCH 18/19] [CI] stop collecting logs from `k3d-syft-migration-source` cluster since it was deleted [tox] list migration k8s tests in envlist --- .github/workflows/pr-tests-stack.yml | 4 ++-- tox.ini | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pr-tests-stack.yml b/.github/workflows/pr-tests-stack.yml index 09dbdfda26b..1161909779f 100644 --- a/.github/workflows/pr-tests-stack.yml +++ b/.github/workflows/pr-tests-stack.yml @@ -742,9 +742,9 @@ jobs: shell: bash run: | mkdir -p ./k8s-logs - kubectl describe all -A --context k3d-syft-migration-source --namespace syft > ./k8s-logs/syft-migration-source-desc-${{ steps.date.outputs.date }}.txt + # kubectl describe all -A --context k3d-syft-migration-source --namespace syft > ./k8s-logs/syft-migration-source-desc-${{ steps.date.outputs.date }}.txt + # kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-syft-migration-source --namespace syft > ./k8s-logs/syft-migration-source-logs-${{ steps.date.outputs.date }}.txt kubectl describe all -A --context k3d-test-datasite-1 --namespace syft > ./k8s-logs/test-datasite-1-desc-${{ steps.date.outputs.date }}.txt - kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-syft-migration-source --namespace syft > ./k8s-logs/syft-migration-source-logs-${{ steps.date.outputs.date }}.txt kubectl logs -l app.kubernetes.io/name!=random --prefix=true --context k3d-test-datasite-1 --namespace syft > ./k8s-logs/test-datasite-1-logs-${{ steps.date.outputs.date }}.txt ls -la ./k8s-logs diff --git a/tox.ini b/tox.ini index 0681ec32292..7a0bbd89d12 100644 --- a/tox.ini +++ b/tox.ini @@ -46,6 +46,8 @@ envlist = e2e.test.notebook migration.prepare migration.test + migration.k8s.prepare + migration.k8s.test syft.api.snapshot skipsdist = True From 522715223cbe14811d7fe41e86968c852780e38f Mon Sep 17 00:00:00 2001 From: khoaguin Date: Fri, 20 Sep 2024 13:11:37 +0700 Subject: [PATCH 19/19] [CI] skipping pr-tests-migrations-k8s for now --- .github/workflows/pr-tests-stack.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/pr-tests-stack.yml b/.github/workflows/pr-tests-stack.yml index 1161909779f..21ca78a02e2 100644 --- a/.github/workflows/pr-tests-stack.yml +++ b/.github/workflows/pr-tests-stack.yml @@ -622,6 +622,7 @@ jobs: tox -e migration.test pr-tests-migrations-k8s: + if: false # skipping this job for now strategy: max-parallel: 99 matrix:
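
PATCH 15 above is a complete instance of Syft's object-versioning pattern: the old schema is kept as a `V1` class under the same `__canonical_name__`, the replacement class bumps `__version__`, and a `@migrate` hook returns the list of field-level transforms to run. Below is a minimal sketch of the same pattern for a hypothetical object; `MyThing`, its fields, and `fill_label` are invented for illustration, and only the decorator/`TransformContext` mechanics mirror what the `SyftWorker` diff does.

    # Hedged sketch of the versioning + migration pattern from PATCH 15.
    # "MyThing", its fields, and fill_label are hypothetical; the
    # @migrate/TransformContext usage follows the SyftWorker changes above.
    from collections.abc import Callable

    from syft.serde.serializable import serializable
    from syft.types.syft_migration import migrate
    from syft.types.syft_object import SYFT_OBJECT_VERSION_1
    from syft.types.syft_object import SYFT_OBJECT_VERSION_2
    from syft.types.syft_object import SyftObject
    from syft.types.transforms import TransformContext


    @serializable()
    class MyThingV1(SyftObject):
        __canonical_name__ = "MyThing"  # old schema keeps the canonical name
        __version__ = SYFT_OBJECT_VERSION_1

        name: str


    @serializable()
    class MyThing(SyftObject):
        __canonical_name__ = "MyThing"  # same name, bumped version
        __version__ = SYFT_OBJECT_VERSION_2

        name: str
        label: str = ""  # hypothetical field added in v2


    def fill_label(context: TransformContext) -> TransformContext:
        # TransformContext supports item access, as in
        # migrate_worker_image_v1_to_v2 above
        context["label"] = context["name"].lower()
        return context


    @migrate(MyThingV1, MyThing)
    def migrate_mything_v1_to_v2() -> list[Callable]:
        # return the transforms to apply; an empty list (as in
        # migrate_syft_worker_image_v1_to_v2) means no data-level changes
        return [fill_label]

Stored v1 objects can then be upgraded with `obj.migrate_to(version=SYFT_OBJECT_VERSION_2, context=...)`, which is how `migrate_worker_image_v1_to_v2` upgrades the nested `SyftWorkerImage` in PATCH 15.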