diff --git a/pyatlan/model/core.py b/pyatlan/model/core.py index 8160cec8..59582d1f 100644 --- a/pyatlan/model/core.py +++ b/pyatlan/model/core.py @@ -1,5 +1,7 @@ # SPDX-License-Identifier: Apache-2.0 # Copyright 2022 Atlan Pte. Ltd. +from __future__ import annotations + import json from abc import ABC from typing import TYPE_CHECKING @@ -251,8 +253,44 @@ def __init__(self, *args, **kwargs): self._source_tag_attachements = [ SourceTagAttachment(**source_tag["attributes"]) for source_tag in self.attributes[attr_id] + if isinstance(source_tag, dict) and source_tag.get("attributes") ] + @classmethod + def of( + cls, + atlan_tag_name: AtlanTagName, + entity_guid: Optional[str] = None, + source_tag_attachment: Optional[SourceTagAttachment] = None, + ) -> AtlanTag: + from pyatlan.cache.atlan_tag_cache import AtlanTagCache + + """ + Construct an Atlan tag assignment for a specific entity. + + :param atlan_tag_name: human-readable name of the Atlan tag + :param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned + :param source_tag_attachment: (optional) source-specific details for the tag + :return: an Atlan tag assignment with default settings for propagation and a specific entity assignment + """ + tag = AtlanTag( + type_name=atlan_tag_name, + propagate=True, + remove_propagations_on_entity_delete=True, + restrict_propagation_through_lineage=False, + restrict_propagation_through_hierarchy=False, + ) + if entity_guid: + tag.entity_guid = entity_guid + tag.entity_status = EntityStatus.ACTIVE + if source_tag_attachment: + source_tag_attr_id = ( + AtlanTagCache.get_source_tags_attr_id(atlan_tag_name.id) or "" + ) + tag.attributes = {source_tag_attr_id: [source_tag_attachment]} + tag._source_tag_attachements.append(source_tag_attachment) + return tag + class AtlanTags(AtlanObject): __root__: List[AtlanTag] = Field( diff --git a/pyatlan/model/structs.py b/pyatlan/model/structs.py index 702c9f6a..009ec36a 100644 --- a/pyatlan/model/structs.py +++ b/pyatlan/model/structs.py @@ -4,17 +4,21 @@ from __future__ import annotations from datetime import datetime -from typing import Any, Dict, List, Optional, Set, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Union from pydantic.v1 import BaseModel, Extra, Field, root_validator from pyatlan.model.enums import ( + AtlanConnectorType, BadgeComparisonOperator, BadgeConditionColor, SourceCostUnitType, ) from pyatlan.model.utils import to_camel_case -from pyatlan.utils import validate_required_fields +from pyatlan.utils import select_optional_set_fields, validate_required_fields + +if TYPE_CHECKING: + from pyatlan.cache.source_tag_cache import SourceTagName class AtlanObject(BaseModel): @@ -161,6 +165,133 @@ class SourceTagAttachment(AtlanObject): source_tag_sync_timestamp: Optional[datetime] = Field(default=None, description="") source_tag_sync_error: Optional[str] = Field(default=None, description="") + @classmethod + def by_name( + cls, + name: SourceTagName, + source_tag_values: List[SourceTagAttachmentValue], + source_tag_sync_timestamp: Optional[datetime] = None, + is_source_tag_synced: Optional[bool] = None, + source_tag_sync_error: Optional[str] = None, + ): + from pyatlan.cache.source_tag_cache import SourceTagCache + + """ + Create a source-synced tag attachment with + a particular value when the attachment is synced to the source. + + :param client: connectivity to an Atlan tenant + :param name: unique name of the source tag in Atlan + :param source_tag_values: value of the tag attachment from the source + :param is_source_tag_synced: whether the tag attachment has been synced at the source (True) or not (False) + :param source_tag_sync_timestamp: time (epoch) when the tag attachment was synced at the source, in milliseconds + :param source_tag_sync_error: error message if the tag attachment sync at the source failed + :returns: a SourceTagAttachment with the provided information + :raises AtlanError: on any error communicating via the underlying APIs + :raises NotFoundError: if the source-synced tag cannot be resolved + """ + tag = SourceTagCache.get_by_name(name) + tag_connector_name = AtlanConnectorType._get_connector_type_from_qualified_name( + tag.qualified_name or "" + ) + return cls.of( + source_tag_name=tag.name, + source_tag_qualified_name=tag.qualified_name, + source_tag_guid=tag.guid, + source_tag_connector_name=tag_connector_name, + source_tag_values=source_tag_values, + **select_optional_set_fields( + dict( + is_source_tag_synced=is_source_tag_synced, + source_tag_sync_timestamp=source_tag_sync_timestamp, + source_tag_sync_error=source_tag_sync_error, + ) + ), + ) + + @classmethod + def by_qualified_name( + cls, + source_tag_qualified_name: str, + source_tag_values: List[SourceTagAttachmentValue], + source_tag_sync_timestamp: Optional[datetime] = None, + is_source_tag_synced: Optional[bool] = None, + source_tag_sync_error: Optional[str] = None, + ): + from pyatlan.cache.source_tag_cache import SourceTagCache + + """ + Create a source-synced tag attachment with a particular value when the attachment is synced to the source. + + :param client: connectivity to an Atlan tenant + :param source_tag_qualified_name: unique name of the source tag in Atlan + :param source_tag_values: value of the tag attachment from the source + :param is_source_tag_synced: whether the tag attachment has been synced at the source (True) or not (False) + :param source_tag_sync_timestamp: time (epoch) when the tag attachment was synced at the source, in milliseconds + :param source_tag_sync_error: error message if the tag attachment sync at the source failed + :returns: a SourceTagAttachment with the provided information + :raises AtlanError: on any error communicating via the underlying APIs + :raises NotFoundError: if the source-synced tag cannot be resolved + """ + tag = SourceTagCache.get_by_qualified_name(source_tag_qualified_name) + tag_connector_name = AtlanConnectorType._get_connector_type_from_qualified_name( + source_tag_qualified_name or "" + ) + return cls.of( + source_tag_name=tag.name, + source_tag_qualified_name=source_tag_qualified_name, + source_tag_guid=tag.guid, + source_tag_connector_name=tag_connector_name, + source_tag_values=source_tag_values, + **select_optional_set_fields( + dict( + is_source_tag_synced=is_source_tag_synced, + source_tag_sync_timestamp=source_tag_sync_timestamp, + source_tag_sync_error=source_tag_sync_error, + ) + ), + ) + + @classmethod + def of( + cls, + source_tag_name: Optional[str] = None, + source_tag_qualified_name: Optional[str] = None, + source_tag_guid: Optional[str] = None, + source_tag_connector_name: Optional[str] = None, + source_tag_values: Optional[List[SourceTagAttachmentValue]] = None, + is_source_tag_synced: Optional[bool] = None, + source_tag_sync_timestamp: Optional[datetime] = None, + source_tag_sync_error: Optional[str] = None, + ): + """ + Quickly create a new SourceTagAttachment. + + :param source_tag_name: simple name of the source tag + :param source_tag_qualified_name: unique name of the source tag in Atlan + :param source_tag_guid: unique identifier (GUID) of the source tag in Atlan + :param source_tag_connector_name: connector that is the source of the tag + :param source_tag_values: value of the tag attachment from the source + :param is_source_tag_synced: whether the tag attachment has been synced at the source (True) or not (False) + :param source_tag_sync_timestamp: time (epoch) when the tag attachment was synced at the source, in milliseconds + :param source_tag_sync_error: error message if the tag attachment sync at the source failed + :returns: a SourceTagAttachment with the provided information + """ + return SourceTagAttachment( + **select_optional_set_fields( + dict( + source_tag_name=source_tag_name, + source_tag_qualified_name=source_tag_qualified_name, + source_tag_guid=source_tag_guid, + source_tag_connector_name=source_tag_connector_name, + source_tag_value=source_tag_values, + is_source_tag_synced=is_source_tag_synced, + source_tag_sync_timestamp=source_tag_sync_timestamp, + source_tag_sync_error=source_tag_sync_error, + ) + ), + ) + class StarredDetails(AtlanObject): """Description""" diff --git a/pyatlan/utils.py b/pyatlan/utils.py index 5c27c8ef..c0dabb62 100644 --- a/pyatlan/utils.py +++ b/pyatlan/utils.py @@ -86,6 +86,14 @@ def attributes_to_params( return query_params +def select_optional_set_fields(params: Dict[str, Any]) -> Dict: + """ + Filter the provided parameters to include + only those fields that are not set to `None`. + """ + return {key: value for key, value in params.items() if value is not None} + + def non_null(obj: Optional[object], def_value: object): return obj if obj is not None else def_value diff --git a/tests/integration/test_index_search.py b/tests/integration/test_index_search.py index b3df7224..8f8715c4 100644 --- a/tests/integration/test_index_search.py +++ b/tests/integration/test_index_search.py @@ -2,15 +2,17 @@ # Copyright 2022 Atlan Pte. Ltd. from dataclasses import dataclass, field from datetime import datetime -from time import time +from time import sleep, time from typing import Generator, Set from unittest.mock import patch import pytest +from pyatlan.cache.source_tag_cache import SourceTagName from pyatlan.client.asset import LOGGER, IndexSearchResults from pyatlan.client.atlan import AtlanClient from pyatlan.model.assets import Asset, AtlasGlossaryTerm, Column, Table +from pyatlan.model.core import AtlanTag, AtlanTagName from pyatlan.model.enums import AtlanConnectorType, CertificateStatus, SortOrder from pyatlan.model.fields.atlan_fields import SearchableField from pyatlan.model.fluent_search import CompoundQuery, FluentSearch @@ -25,11 +27,13 @@ Term, Wildcard, ) +from pyatlan.model.structs import SourceTagAttachment, SourceTagAttachmentValue QUALIFIED_NAME = "qualifiedName" ASSET_GUID = Asset.GUID.keyword_field_name NOW_AS_TIMESTAMP = int(time() * 1000) NOW_AS_YYYY_MM_DD = datetime.today().strftime("%Y-%m-%d") +EXISTING_TAG = "Issue" EXISTING_SOURCE_SYNCED_TAG = "Confidential" VALUES_FOR_TERM_QUERIES = { @@ -134,13 +138,31 @@ def test_search(client: AtlanClient, asset_tracker, cls): asset_tracker.missing_types.add(name) +def _assert_source_tag(tables, source_tag, source_tag_value): + assert tables and len(tables) > 0 + for table in tables: + tags = table.atlan_tags + assert tags and len(tags) > 0 + synced_tags = [tag for tag in tags if str(tag.type_name) == source_tag] + assert synced_tags and len(synced_tags) > 0 + for st in synced_tags: + attachments = st.source_tag_attachements + assert attachments and len(attachments) > 0 + for sta in attachments: + values = sta.source_tag_value + assert values and len(values) > 0 + for value in values: + attached_value = value.tag_attachment_value + assert attached_value and attached_value == source_tag_value + + def test_search_source_synced_assets(client: AtlanClient): tables = [ table for table in ( FluentSearch() .select() - .where(Asset.TYPE_NAME.eq("Table")) + .where(CompoundQuery.asset_type(Table)) .where( CompoundQuery.tagged_with_value( EXISTING_SOURCE_SYNCED_TAG, "Highly Restricted" @@ -148,25 +170,75 @@ def test_search_source_synced_assets(client: AtlanClient): ) .execute(client=client) ) + if isinstance(table, Table) ] - assert tables and len(tables) > 0 - for table in tables: - assert isinstance(table, Table) - tags = table.atlan_tags - assert tags and len(tags) > 0 - synced_tags = [ - tag for tag in tags if str(tag.type_name) == EXISTING_SOURCE_SYNCED_TAG - ] - assert synced_tags and len(synced_tags) > 0 - for st in synced_tags: - attachments = st.source_tag_attachements - assert attachments and len(attachments) > 0 - for sta in attachments: - values = sta.source_tag_value - assert values and len(values) > 0 - for value in values: - attached_value = value.tag_attachment_value - assert attached_value and attached_value == "Highly Restricted" + _assert_source_tag(tables, EXISTING_SOURCE_SYNCED_TAG, "Highly Restricted") + + +def test_source_tag_assign_with_value(client: AtlanClient, table: Table): + # Make sure no tags are assigned initially + assert table.guid + table = client.asset.get_by_guid(guid=table.guid, asset_type=Table) + assert not table.atlan_tags + assert table.name and table.qualified_name + + source_tag_name = SourceTagName( + "snowflake/development@@ANALYTICS/WIDE_WORLD_IMPORTERS/CONFIDENTIAL" + ) + to_update = table.updater(table.qualified_name, table.name) + to_update.atlan_tags = [ + AtlanTag.of(atlan_tag_name=AtlanTagName(EXISTING_TAG)), + AtlanTag.of( + atlan_tag_name=AtlanTagName(EXISTING_SOURCE_SYNCED_TAG), + source_tag_attachment=SourceTagAttachment.by_name( + name=source_tag_name, + source_tag_values=[ + SourceTagAttachmentValue(tag_attachment_value="Not Restricted") + ], + ), + ), + ] + response = client.asset.save(to_update, replace_atlan_tags=True) + + assert (tables := response.assets_updated(asset_type=Table)) and len(tables) == 1 + assert ( + tables + and len(tables) == 1 + and tables[0].atlan_tags + and len(tables[0].atlan_tags) == 2 + ) + for tag in tables[0].atlan_tags: + assert str(tag.type_name) in (EXISTING_TAG, EXISTING_SOURCE_SYNCED_TAG) + + # Make sure source tag is now attached + # to the table with the provided value + sleep(5) + tables = [ + table + for table in ( + FluentSearch() + .select() + .where(CompoundQuery.asset_type(Table)) + .where(Table.QUALIFIED_NAME.eq(table.qualified_name)) + .where( + CompoundQuery.tagged_with_value( + EXISTING_SOURCE_SYNCED_TAG, "Not Restricted" + ) + ) + .execute(client=client) + ) + if isinstance(table, Table) + ] + + assert ( + tables + and len(tables) == 1 + and tables[0].atlan_tags + and len(tables[0].atlan_tags) == 2 + ) + for tag in tables[0].atlan_tags: + assert str(tag.type_name) in (EXISTING_TAG, EXISTING_SOURCE_SYNCED_TAG) + _assert_source_tag(tables, EXISTING_SOURCE_SYNCED_TAG, "Not Restricted") def test_search_next_page(client: AtlanClient):