Skip to content

Commit

Permalink
Merge pull request #407 from atlanhq/DVX-709
Browse files Browse the repository at this point in the history
 DVX-709: Implement caching for `Connection` and `SourceTag`
  • Loading branch information
Aryamanz29 authored Oct 21, 2024
2 parents d6a6bbd + a5a00b8 commit 0325f2f
Show file tree
Hide file tree
Showing 7 changed files with 1,135 additions and 5 deletions.
176 changes: 176 additions & 0 deletions pyatlan/cache/abstract_asset_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2024 Atlan Pte. Ltd.
from __future__ import annotations

import threading
from abc import ABC, abstractmethod
from typing import Any

from pyatlan.errors import ErrorCode
from pyatlan.model.assets import Asset
from pyatlan.model.enums import AtlanConnectorType


class AbstractAssetCache(ABC):
"""
Base class for reusable components that are common
to all caches, where a cache is populated entry-by-entry.
"""

def __init__(self, client):
self.client = client
self.lock = threading.Lock()
self.name_to_guid = dict()
self.guid_to_asset = dict()
self.qualified_name_to_guid = dict()

@classmethod
@abstractmethod
def get_cache(cls):
"""Abstract method to retreive cache."""

@abstractmethod
def lookup_by_guid(self, guid: str):
"""Abstract method to lookup asset by guid."""

@abstractmethod
def lookup_by_qualified_name(self, qualified_name: str):
"""Abstract method to lookup asset by qualified name."""

@abstractmethod
def lookup_by_name(self, name: Any):
"""Abstract method to lookup asset by name."""

@abstractmethod
def get_name(self, asset: Asset):
"""Abstract method to get name from asset."""

def is_guid_known(self, guid: str) -> bool:
"""
Checks whether the provided Atlan-internal UUID is known.
NOTE: will not refresh the cache itself to determine this.
:param guid: Atlan-internal UUID of the object
:returns: `True` if the object is known, `False` otherwise
"""
return guid in self.guid_to_asset

def is_qualified_name_known(self, qualified_name: str):
"""
Checks whether the provided Atlan-internal ID string is known.
NOTE: will not refresh the cache itself to determine this.
:param qualified_name: Atlan-internal ID string of the object
:returns: `True` if the object is known, `False` otherwise
"""
return qualified_name in self.qualified_name_to_guid

def is_name_known(self, name: str):
"""
Checks whether the provided Atlan-internal ID string is known.
NOTE: will not refresh the cache itself to determine this.
:param name: human-constructable name of the object
:returns: `True` if the object is known, `False` otherwise
"""
return name in self.name_to_guid

def cache(self, asset: Asset):
"""
Add an entry to the cache.
:param asset: to be cached
"""
name = asset and self.get_name(asset)
if not all([name, asset.guid, asset.qualified_name]):
return
self.name_to_guid[name] = asset.guid
self.guid_to_asset[asset.guid] = asset
self.qualified_name_to_guid[asset.qualified_name] = asset.guid

def _get_by_guid(self, guid: str, allow_refresh: bool = True):
"""
Retrieve an asset from the cache by its UUID.
If the asset is not found, it will be looked up and added to the cache.
:param guid: UUID of the asset in Atlan
:returns: the asset (if found)
:raises AtlanError: on any API communication problem if the cache needs to be refreshed
:raises NotFoundError: if the asset cannot be found (does not exist) in Atlan
:raises InvalidRequestError: if no UUID was provided for the asset to retrieve
"""
if not guid:
raise ErrorCode.MISSING_ID.exception_with_parameters()
asset = self.guid_to_asset.get(guid)
if not asset and allow_refresh:
self.lookup_by_guid(guid)
asset = self.guid_to_asset.get(guid)
if not asset:
raise ErrorCode.ASSET_NOT_FOUND_BY_GUID.exception_with_parameters(guid)
return asset

def _get_by_qualified_name(self, qualified_name: str, allow_refresh: bool = True):
"""
Retrieve an asset from the cache by its unique Atlan-internal name.
:param qualified_name: unique Atlan-internal name of the asset
:param allow_refresh: whether to allow a refresh of the cache (`True`) or not (`False`)
:returns: the asset (if found)
:raises AtlanError: on any API communication problem if the cache needs to be refreshed
:raises NotFoundError: if the object cannot be found (does not exist) in Atlan
:raises InvalidRequestError: if no qualified_name was provided for the object to retrieve
"""
if not qualified_name:
raise ErrorCode.MISSING_ID.exception_with_parameters()
guid = self.qualified_name_to_guid.get(qualified_name)
if not guid and allow_refresh:
self.lookup_by_qualified_name(qualified_name)
guid = self.qualified_name_to_guid.get(qualified_name)
if not guid:
raise ErrorCode.ASSET_NOT_FOUND_BY_QN.exception_with_parameters(
qualified_name,
AtlanConnectorType._get_connector_type_from_qualified_name(
qualified_name
),
)
return self._get_by_guid(guid=guid, allow_refresh=False)

def _get_by_name(self, name: AbstractAssetName, allow_refresh: bool = True):
"""
Retrieve an asset from the cache by its uniquely identifiable name.
:param name: uniquely identifiable name of the asset in Atlan
:param allow_refresh: whether to allow a refresh of the cache (`True`) or not (`False`)
:returns: the asset (if found)
:raises AtlanError: on any API communication problem if the cache needs to be refreshed
:raises NotFoundError: if the object cannot be found (does not exist) in Atlan
:raises InvalidRequestError: if no name was provided for the object to retrieve
"""
if not isinstance(name, AbstractAssetName):
raise ErrorCode.MISSING_NAME.exception_with_parameters()
guid = self.name_to_guid.get(str(name))
if not guid and allow_refresh:
self.lookup_by_name(name)
guid = self.name_to_guid.get(str(name))
if not guid:
raise ErrorCode.ASSET_NOT_FOUND_BY_NAME.exception_with_parameters(
name._TYPE_NAME, name
)
return self._get_by_guid(guid=guid, allow_refresh=False)


class AbstractAssetName(ABC):
"""
Base class for reusable components common to all asset names
used by the cache's find methods, such as AssetCache.get_by_name().
"""

_TYPE_NAME = str()

@abstractmethod
def __init__(self):
pass

@abstractmethod
def __str__(self):
pass
204 changes: 204 additions & 0 deletions pyatlan/cache/connection_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2024 Atlan Pte. Ltd.
from __future__ import annotations

import logging
import threading
from typing import Dict, Optional, Union

from pyatlan.cache.abstract_asset_cache import AbstractAssetCache, AbstractAssetName
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import Asset, Connection
from pyatlan.model.enums import AtlanConnectorType
from pyatlan.model.fluent_search import FluentSearch
from pyatlan.model.search import Term

LOGGER = logging.getLogger(__name__)

lock = threading.Lock()


class ConnectionCache(AbstractAssetCache):
"""
Lazily-loaded cache for translating between
a connection's simplified name its details.
- guid = UUID of the connection
for eg: 9c677e77-e01d-40e0-85b7-8ba4cd7d0ea9
- qualified_name = Atlan-internal name of the connection (with epoch)
for eg: default/snowflake/1234567890
- name = simple name of the form {{connectorType}}/{{connectorName}},
for eg: snowflake/development
"""

_SEARCH_FIELDS = [
Connection.NAME,
Connection.STATUS,
Connection.CONNECTOR_NAME,
]
SEARCH_ATTRIBUTES = [field.atlan_field_name for field in _SEARCH_FIELDS]
caches: Dict[int, ConnectionCache] = dict()

def __init__(self, client: AtlanClient):
super().__init__(client)

@classmethod
def get_cache(cls) -> ConnectionCache:
from pyatlan.client.atlan import AtlanClient

with lock:
default_client = AtlanClient.get_default_client()
cache_key = default_client.cache_key
if cache_key not in cls.caches:
cls.caches[cache_key] = ConnectionCache(client=default_client)
return cls.caches[cache_key]

@classmethod
def get_by_guid(cls, guid: str, allow_refresh: bool = True) -> Connection:
"""
Retrieve a connection from the cache by its UUID.
If the asset is not found, it will be looked up and added to the cache.
:param guid: UUID of the connection in Atlan
for eg: 9c677e77-e01d-40e0-85b7-8ba4cd7d0ea9
:returns: connection (if found)
:raises AtlanError: on any API communication problem if the cache needs to be refreshed
:raises NotFoundError: if the connection cannot be found (does not exist) in Atlan
:raises InvalidRequestError: if no UUID was provided for the connection to retrieve
"""
return cls.get_cache()._get_by_guid(guid=guid, allow_refresh=allow_refresh)

@classmethod
def get_by_qualified_name(
cls, qualified_name: str, allow_refresh: bool = True
) -> Connection:
"""
Retrieve a connection from the cache by its unique Atlan-internal name.
:param qualified_name: unique Atlan-internal name of the connection
for eg: default/snowflake/1234567890
:param allow_refresh: whether to allow a refresh of the cache (`True`) or not (`False`)
:param qualified_name: unique Atlan-internal name of the connection
:returns: connection (if found)
:raises AtlanError: on any API communication problem if the cache needs to be refreshed
:raises NotFoundError: if the connection cannot be found (does not exist) in Atlan
:raises InvalidRequestError: if no qualified_name was provided for the connection to retrieve
"""
return cls.get_cache()._get_by_qualified_name(
qualified_name=qualified_name, allow_refresh=allow_refresh
)

@classmethod
def get_by_name(
cls, name: ConnectionName, allow_refresh: bool = True
) -> Connection:
"""
Retrieve an connection from the cache by its uniquely identifiable name.
:param name: uniquely identifiable name of the connection in Atlan
In the form of {{connectorType}}/{{connectorName}}
for eg: snowflake/development
:param allow_refresh: whether to allow a refresh of the cache (`True`) or not (`False`)
:returns: connection (if found)
:raises AtlanError: on any API communication problem if the cache needs to be refreshed
:raises NotFoundError: if the connection cannot be found (does not exist) in Atlan
:raises InvalidRequestError: if no name was provided for the connection to retrieve
"""
return cls.get_cache()._get_by_name(name=name, allow_refresh=allow_refresh)

def lookup_by_guid(self, guid: str) -> None:
if not guid:
return
with self.lock:
response = (
FluentSearch(_includes_on_results=self.SEARCH_ATTRIBUTES)
.where(Term.with_state("ACTIVE"))
.where(Term.with_super_type_names("Asset"))
.where(Connection.GUID.eq(guid))
.execute(self.client)
)
candidate = (response.current_page() and response.current_page()[0]) or None
if candidate and isinstance(candidate, Connection):
self.cache(candidate)

def lookup_by_qualified_name(self, connection_qn: str) -> None:
if not connection_qn:
return
with self.lock:
response = (
FluentSearch(_includes_on_results=self.SEARCH_ATTRIBUTES)
.where(Term.with_state("ACTIVE"))
.where(Term.with_super_type_names("Asset"))
.where(Connection.QUALIFIED_NAME.eq(connection_qn))
.execute(self.client)
)
candidate = (response.current_page() and response.current_page()[0]) or None
if candidate and isinstance(candidate, Connection):
self.cache(candidate)

def lookup_by_name(self, name: ConnectionName) -> None:
if not isinstance(name, ConnectionName):
return
results = self.client.asset.find_connections_by_name(
name=name.name,
connector_type=name.type,
attributes=self.SEARCH_ATTRIBUTES,
)
if not results:
return
if len(results) > 1:
LOGGER.warning(
(
"Found multiple connections of the same type "
"with the same name, caching only the first: %s"
),
name,
)
self.cache(results[0])

def get_name(self, asset: Asset):
if not isinstance(asset, Connection):
return
return str(ConnectionName(asset))


class ConnectionName(AbstractAssetName):
"""
Unique identity for a connection,
in the form: {{type}}/{{name}}
- For eg: snowflake/development
"""

_TYPE_NAME = "Connection"

def __init__(
self,
connection: Union[
str,
Optional[Connection],
] = None,
):
self.name = None
self.type = None

if isinstance(connection, Connection):
self.name = connection.name
self.type = connection.connector_name

elif isinstance(connection, str):
tokens = connection.split("/")
if len(tokens) > 1:
self.type = AtlanConnectorType(tokens[0]) # type: ignore[call-arg]
self.name = connection[len(tokens[0]) + 1 :] # noqa

def __hash__(self):
return hash((self.name, self.type))

def __str__(self):
return f"{self.type}/{self.name}"

def __eq__(self, other):
if isinstance(other, ConnectionName):
return self.name == other.name and self.type == other.type
return False
Loading

0 comments on commit 0325f2f

Please sign in to comment.