From bc4c19b36dfeb26e27569d439ce594aa948f40c9 Mon Sep 17 00:00:00 2001 From: Michael Kofi Armah Date: Mon, 4 Nov 2024 19:46:55 +0000 Subject: [PATCH] [Integration] [AWS] | Added support to choose specific regions to query resources from (#1099) --- integrations/aws/.port/spec.yaml | 5 + integrations/aws/CHANGELOG.md | 15 +++ integrations/aws/main.py | 106 +++++++++++++----- integrations/aws/pyproject.toml | 2 +- integrations/aws/tests/utils/test_misc.py | 87 +++++++++++++- .../aws/tests/utils/test_overrides.py | 59 ++++++++++ integrations/aws/utils/misc.py | 52 ++++++--- integrations/aws/utils/overrides.py | 44 +++++++- integrations/aws/utils/resources.py | 43 +++++-- 9 files changed, 360 insertions(+), 53 deletions(-) create mode 100644 integrations/aws/tests/utils/test_overrides.py diff --git a/integrations/aws/.port/spec.yaml b/integrations/aws/.port/spec.yaml index 33b2b30eb8..74c2238e7b 100644 --- a/integrations/aws/.port/spec.yaml +++ b/integrations/aws/.port/spec.yaml @@ -37,6 +37,11 @@ configurations: type: string sensitive: true description: AWS API Key for custom events, used to validate the event source for real-time event updates. + - name: maximumConcurrentAccounts + type: integer + require: false + description: The number of concurrent accounts to scan. By default, it is set to 50. + default: 50 deploymentMethodRequirements: - type: default configurations: ['awsAccessKeyId', 'awsSecretAccessKey'] diff --git a/integrations/aws/CHANGELOG.md b/integrations/aws/CHANGELOG.md index acb7545b7c..5dd73418ff 100644 --- a/integrations/aws/CHANGELOG.md +++ b/integrations/aws/CHANGELOG.md @@ -7,12 +7,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +## 0.2.53 (2024-10-31) + + +### Improvements + +- Added the option to query resources from specific regions, configurable via the regionPolicy in the selector field of the mapping. +- Introduced `maximumConcurrentAccount` parameter to control the maximum number of accounts synced concurrently. + +### Bug Fixes + +- Skip missing resources in a region without interrupting sync across other regions. + + ## 0.2.52 (2024-10-30) + ### Bug Fixes - Updated `joined_timestamp` mapping in AWS Organizations to comply with RFC3339 timestamp format by replacing the space delimiter with 'T' in the `JoinedTimestamp` field. + ## 0.2.51 (2024-10-23) diff --git a/integrations/aws/main.py b/integrations/aws/main.py index 065dd902af..fc7cac400e 100644 --- a/integrations/aws/main.py +++ b/integrations/aws/main.py @@ -28,13 +28,15 @@ from port_ocean.context.ocean import ocean from loguru import logger from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE +from port_ocean.context.event import event +from utils.overrides import AWSPortAppConfig, AWSResourceConfig from utils.misc import ( get_matching_kinds_and_blueprints_from_config, CustomProperties, ResourceKindsWithSpecialHandling, is_access_denied_exception, is_server_error, - semaphore, + get_semaphore, ) from port_ocean.utils.async_iterators import ( stream_async_iterators_tasks, @@ -42,16 +44,21 @@ ) import functools +semaphore = get_semaphore() + async def _handle_global_resource_resync( kind: str, credentials: AwsCredentials, + aws_resource_config: AWSResourceConfig, ) -> ASYNC_GENERATOR_RESYNC_TYPE: denied_access_to_default_region = False default_region = get_default_region_from_credentials(credentials) default_session = await credentials.create_session(default_region) try: - async for batch in resync_cloudcontrol(kind, default_session): + async for batch in resync_cloudcontrol( + kind, default_session, aws_resource_config + ): yield batch except Exception as e: if is_access_denied_exception(e): @@ -63,7 +70,9 @@ async def _handle_global_resource_resync( logger.info(f"Trying to resync {kind} in all regions until success") async for session in credentials.create_session_for_each_region(): try: - async for batch in resync_cloudcontrol(kind, session): + async for batch in resync_cloudcontrol( + kind, session, aws_resource_config + ): yield batch break except Exception as e: @@ -77,13 +86,19 @@ async def resync_resources_for_account( """Function to handle fetching resources for a single account.""" errors, regions = [], [] + aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config) + if is_global_resource(kind): - async for batch in _handle_global_resource_resync(kind, credentials): + async for batch in _handle_global_resource_resync( + kind, credentials, aws_resource_config + ): yield batch else: async for session in credentials.create_session_for_each_region(): try: - async for batch in resync_cloudcontrol(kind, session): + async for batch in resync_cloudcontrol( + kind, session, aws_resource_config + ): yield batch except Exception as exc: regions.append(session.region_name) @@ -123,6 +138,7 @@ async def resync_account(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(kind=ResourceKindsWithSpecialHandling.ELASTICACHE_CLUSTER) async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() + aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config) tasks = [ semaphore_async_iterator( @@ -135,6 +151,7 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "describe_cache_clusters", "CacheClusters", "Marker", + aws_resource_config, ), ) async for session in get_sessions() @@ -149,6 +166,7 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() + aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config) tasks = [ semaphore_async_iterator( semaphore, @@ -160,6 +178,7 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "describe_load_balancers", "LoadBalancers", "Marker", + aws_resource_config, ), ) async for session in get_sessions() @@ -175,6 +194,7 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() + aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config) tasks = [ semaphore_async_iterator( semaphore, @@ -186,6 +206,7 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "list_certificates", "CertificateSummaryList", "NextToken", + aws_resource_config, ), ) async for session in get_sessions() @@ -200,6 +221,8 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(kind=ResourceKindsWithSpecialHandling.AMI_IMAGE) async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() + + aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config) tasks = [ semaphore_async_iterator( semaphore, @@ -211,6 +234,7 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "describe_images", "Images", "NextToken", + aws_resource_config, {"Owners": ["self"]}, ), ) @@ -225,6 +249,8 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(kind=ResourceKindsWithSpecialHandling.CLOUDFORMATION_STACK) async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() + + aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config) tasks = [ semaphore_async_iterator( semaphore, @@ -236,6 +262,7 @@ async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "describe_stacks", "Stacks", "NextToken", + aws_resource_config, ), ) async for session in get_sessions() @@ -293,10 +320,35 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons with logger.contextualize( account_id=account_id, resource_type=resource_type, identifier=identifier ): - matching_resource_configs = get_matching_kinds_and_blueprints_from_config( - resource_type + aws_port_app_config = typing.cast(AWSPortAppConfig, event.port_app_config) + if not isinstance(aws_port_app_config, AWSPortAppConfig): + logger.info("No resources configured in the port app config") + return fastapi.Response(status_code=status.HTTP_200_OK) + + allowed_configs, disallowed_configs = ( + get_matching_kinds_and_blueprints_from_config( + resource_type, region, aws_port_app_config.resources + ) ) + if disallowed_configs: + logger.info( + f"Unregistering resource {identifier} of type {resource_type} in region {region} and account {account_id} for blueprint {disallowed_configs.values()} because it is not allowed" + ) + await ocean.unregister( + [ + Entity(blueprint=blueprint, identifier=identifier) + for blueprints in disallowed_configs.values() + for blueprint in blueprints + ] + ) + + if not allowed_configs: + logger.info( + f"{resource_type} not found or disabled for region {region} in account {account_id}" + ) + return fastapi.Response(status_code=status.HTTP_200_OK) + logger.debug( "Querying full resource on AWS before registering change in port" ) @@ -310,33 +362,29 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons logger.error( f"Cannot sync {resource_type} in region {region} in account {account_id} due to missing access permissions {e}" ) - return fastapi.Response( - status_code=status.HTTP_200_OK, - ) + return fastapi.Response(status_code=status.HTTP_200_OK) if is_server_error(e): logger.error( f"Cannot sync {resource_type} in region {region} in account {account_id} due to server error {e}" ) - return fastapi.Response( - status_code=status.HTTP_200_OK, - ) + return fastapi.Response(status_code=status.HTTP_200_OK) + + logger.error( + f"Failed to retrieve '{resource_type}' resource with ID '{identifier}' in region '{region}' for account '{account_id}'. " + f"Verify that the resource exists and that the necessary permissions are granted." + ) + resource = None - for kind in matching_resource_configs: - blueprints = matching_resource_configs[kind] + for kind, blueprints in allowed_configs.items(): if not resource: # Resource probably deleted - for blueprint in blueprints: - logger.info( - "Resource not found in AWS, un-registering from port" - ) - await ocean.unregister( - [ - Entity( - blueprint=blueprint, - identifier=identifier, - ) - ] - ) + logger.info("Resource not found in AWS, un-registering from port") + await ocean.unregister( + [ + Entity(blueprint=blueprint, identifier=identifier) + for blueprint in blueprints + ] + ) else: # Resource found in AWS, update port logger.info("Resource found in AWS, registering change in port") resource.update( @@ -347,14 +395,14 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons } ) await ocean.register_raw( - kind, - [fix_unserializable_date_properties(resource)], + kind, [fix_unserializable_date_properties(resource)] ) logger.info("Webhook processed successfully") return fastapi.Response( status_code=status.HTTP_200_OK, content=json.dumps({"ok": True}) ) + except Exception as e: logger.exception("Failed to process event from aws") return fastapi.Response( diff --git a/integrations/aws/pyproject.toml b/integrations/aws/pyproject.toml index ce8f9aef1c..372d60064a 100644 --- a/integrations/aws/pyproject.toml +++ b/integrations/aws/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aws" -version = "0.2.52" +version = "0.2.53" description = "This integration will map all your resources in all the available accounts to your Port entities" authors = ["Shalev Avhar ", "Erik Zaadi "] diff --git a/integrations/aws/tests/utils/test_misc.py b/integrations/aws/tests/utils/test_misc.py index 45104a90ad..0c07712f31 100644 --- a/integrations/aws/tests/utils/test_misc.py +++ b/integrations/aws/tests/utils/test_misc.py @@ -1,5 +1,16 @@ -from utils.misc import is_access_denied_exception +from utils.misc import ( + is_access_denied_exception, + is_resource_not_found_exception, + get_matching_kinds_and_blueprints_from_config, +) from typing import Optional, Dict, Any +import unittest +from utils.overrides import AWSResourceConfig, AWSDescribeResourcesSelector +from port_ocean.core.handlers.port_app_config.models import ( + PortResourceConfig, + EntityMapping, + MappingsConfig, +) class MockException(Exception): @@ -25,3 +36,77 @@ def test_access_denied_exception_with_other_error() -> None: def test_access_denied_exception_no_response_attribute() -> None: e = Exception("Test exception") assert not is_access_denied_exception(e) + + +def test_resource_not_found_exception_with_response() -> None: + e = MockException(response={"Error": {"Code": "ResourceNotFoundException"}}) + assert is_resource_not_found_exception(e) + + +def test_resource_not_found_exception_without_response() -> None: + e = MockException(response=None) + assert not is_resource_not_found_exception(e) + + +def test_resource_not_found_exception_with_other_error() -> None: + e = MockException(response={"Error": {"Code": "SomeOtherError"}}) + assert not is_resource_not_found_exception(e) + + +def test_resource_not_found_exception_no_response_attribute() -> None: + e = Exception("Test exception") + assert not is_resource_not_found_exception(e) + + +class TestGetMatchingKindsAndBlueprintsFromConfig(unittest.TestCase): + + def test_get_matching_kinds_and_blueprints(self) -> None: + # Set up actual object instances + selector = AWSDescribeResourcesSelector(query="true") + entity = EntityMapping( + identifier="lambda_function", + blueprint="LambdaBlueprint", + ) + mapping = MappingsConfig(mappings=entity) + port_resource_config = PortResourceConfig(entity=mapping) + + resource_config = AWSResourceConfig( + kind="AWS::Lambda::Function", selector=selector, port=port_resource_config + ) + + kind = "AWS::Lambda::Function" + region = "us-west-1" + resource_config_list = [resource_config] + + allowed_kinds, disallowed_kinds = get_matching_kinds_and_blueprints_from_config( + kind, region, resource_config_list + ) + + self.assertEqual(allowed_kinds, {kind: ["LambdaBlueprint"]}) + self.assertEqual(disallowed_kinds, {}) + + def test_no_matching_kind(self) -> None: + selector = AWSDescribeResourcesSelector(query="true") + entity = EntityMapping( + identifier="AnotherIdentifier", + blueprint="DifferentBlueprint", + ) + mapping = MappingsConfig(mappings=entity) + port_resource_config = PortResourceConfig(entity=mapping) + + resource_config = AWSResourceConfig( + kind="AWS::SomeOther::Resource", + selector=selector, + port=port_resource_config, + ) + + kind = "AWS::Lambda::Function" + region = "us-west-1" + resource_config_list = [resource_config] + + allowed_kinds, disallowed_kinds = get_matching_kinds_and_blueprints_from_config( + kind, region, resource_config_list + ) + + self.assertEqual(allowed_kinds, {}) + self.assertEqual(disallowed_kinds, {}) diff --git a/integrations/aws/tests/utils/test_overrides.py b/integrations/aws/tests/utils/test_overrides.py new file mode 100644 index 0000000000..ce3f7e0aac --- /dev/null +++ b/integrations/aws/tests/utils/test_overrides.py @@ -0,0 +1,59 @@ +import unittest +from utils.overrides import AWSDescribeResourcesSelector, RegionPolicy + + +class TestAWSDescribeResourcesSelector(unittest.TestCase): + + def test_is_region_allowed_no_policy(self) -> None: + selector = AWSDescribeResourcesSelector(query="test") + self.assertTrue(selector.is_region_allowed("us-east-1")) + + def test_is_region_allowed_deny_policy(self) -> None: + region_policy = RegionPolicy(deny=["us-east-1"]) + selector = AWSDescribeResourcesSelector( + query="test", regionPolicy=region_policy + ) + self.assertFalse(selector.is_region_allowed("us-east-1")) + self.assertTrue(selector.is_region_allowed("us-west-2")) + + def test_is_region_allowed_allow_policy(self) -> None: + region_policy = RegionPolicy(allow=["us-west-2"]) + selector = AWSDescribeResourcesSelector( + query="test", regionPolicy=region_policy + ) + self.assertTrue(selector.is_region_allowed("us-west-2")) + self.assertFalse(selector.is_region_allowed("us-east-1")) + + def test_is_region_allowed_both_policies(self) -> None: + region_policy = RegionPolicy(allow=["us-west-2"], deny=["us-east-1"]) + selector = AWSDescribeResourcesSelector( + query="test", regionPolicy=region_policy + ) + self.assertFalse(selector.is_region_allowed("us-east-1")) + self.assertTrue(selector.is_region_allowed("us-west-2")) + self.assertFalse(selector.is_region_allowed("eu-central-1")) + + def test_is_region_allowed_conflicting_policies(self) -> None: + region_policy = RegionPolicy(allow=["us-east-1"], deny=["us-east-1"]) + selector = AWSDescribeResourcesSelector( + query="test", regionPolicy=region_policy + ) + self.assertFalse(selector.is_region_allowed("us-east-1")) + + def test_is_region_allowed_deny_only(self) -> None: + region_policy = RegionPolicy(deny=["us-east-1", "us-west-2"]) + selector = AWSDescribeResourcesSelector( + query="test", regionPolicy=region_policy + ) + self.assertFalse(selector.is_region_allowed("us-east-1")) + self.assertFalse(selector.is_region_allowed("us-west-2")) + self.assertTrue(selector.is_region_allowed("eu-central-1")) + + def test_is_region_allowed_allow_only(self) -> None: + region_policy = RegionPolicy(allow=["us-east-1", "us-west-2"]) + selector = AWSDescribeResourcesSelector( + query="test", regionPolicy=region_policy + ) + self.assertTrue(selector.is_region_allowed("us-east-1")) + self.assertTrue(selector.is_region_allowed("us-west-2")) + self.assertFalse(selector.is_region_allowed("eu-central-1")) diff --git a/integrations/aws/utils/misc.py b/integrations/aws/utils/misc.py index ca416373f5..c101602930 100644 --- a/integrations/aws/utils/misc.py +++ b/integrations/aws/utils/misc.py @@ -1,11 +1,17 @@ import enum -from port_ocean.context.event import event +from port_ocean.context.ocean import ocean +from utils.overrides import AWSResourceConfig +from typing import List import asyncio -MAX_CONCURRENT_TASKS = 50 -semaphore = asyncio.BoundedSemaphore(MAX_CONCURRENT_TASKS) +def get_semaphore() -> asyncio.BoundedSemaphore: + max_concurrent_accounts: int = int( + ocean.integration_config["maximum_concurrent_accounts"] + ) + semaphore = asyncio.BoundedSemaphore(max_concurrent_accounts) + return semaphore class CustomProperties(enum.StrEnum): @@ -45,17 +51,37 @@ def is_server_error(e: Exception) -> bool: return False -def get_matching_kinds_and_blueprints_from_config( - kind: str, -) -> dict[str, list[str]]: - kinds: dict[str, list[str]] = {} - resources = event.port_app_config.resources +def is_resource_not_found_exception(e: Exception) -> bool: + resource_not_found_error_codes = [ + "ResourceNotFoundException", + "ResourceNotFound", + "ResourceNotFoundFault", + ] + + if hasattr(e, "response") and e.response is not None: + error_code = e.response.get("Error", {}).get("Code") + return error_code in resource_not_found_error_codes - for resource in resources: + return False + + +def get_matching_kinds_and_blueprints_from_config( + kind: str, region: str, resource_configs: List[AWSResourceConfig] +) -> tuple[dict[str, list[str]], dict[str, list[str]]]: + allowed_kinds: dict[str, list[str]] = {} + disallowed_kinds: dict[str, list[str]] = {} + for resource in resource_configs: blueprint = resource.port.entity.mappings.blueprint.strip('"') - if resource.kind in kinds: - kinds[resource.kind].append(blueprint) + resource_selector = resource.selector + if not resource_selector.is_region_allowed(region) and kind == resource.kind: + if kind in disallowed_kinds: + disallowed_kinds[kind].append(blueprint) + else: + disallowed_kinds[kind] = [blueprint] elif kind == resource.kind: - kinds[resource.kind] = [blueprint] + if kind in allowed_kinds: + allowed_kinds[kind].append(blueprint) + else: + allowed_kinds[kind] = [blueprint] - return kinds + return allowed_kinds, disallowed_kinds diff --git a/integrations/aws/utils/overrides.py b/integrations/aws/utils/overrides.py index c6d5448dc2..7562b32e33 100644 --- a/integrations/aws/utils/overrides.py +++ b/integrations/aws/utils/overrides.py @@ -3,11 +3,53 @@ PortAppConfig, Selector, ) -from pydantic import Field +from pydantic import Field, BaseModel +from typing import List + + +class RegionPolicy(BaseModel): + allow: List[str] = Field(default_factory=list) + deny: List[str] = Field(default_factory=list) class AWSDescribeResourcesSelector(Selector): use_get_resource_api: bool = Field(alias="useGetResourceAPI", default=False) + region_policy: RegionPolicy = Field( + alias="regionPolicy", default_factory=RegionPolicy + ) + + def is_region_allowed(self, region: str) -> bool: + """ + Determines if a given region is allowed based on the query regions policy. + This method checks the `region_policy` attribute to decide if the specified + region should be allowed or denied. The policy can contain "allow" and "deny" lists + which dictate the behavior. + + Scenarios: + - If `region_policy` is not set or empty, the method returns True, allowing all regions. + - If the region is listed in the "deny" list of `region_policy`, the method returns False. + - If the region is listed in the "allow" list of `region_policy`, the method returns True. + - If the region is not listed in either "allow" or "deny" lists, the method returns False. + - If the region is listed in both "allow" and "deny" lists, the method returns False. + - If the policy denies regions but does not explicitly allow any, and the specific region is not in the deny list, then the region is considered allowed. + - If the policy allows regions but does not explicitly deny any, and the specific region is not in the allow list, then the region is considered denied. + Args: + region (str): The region to be checked. + + Returns: + bool: True if the region is allowed, False otherwise. + """ + if not self.region_policy.allow and not self.region_policy.deny: + return True + if region in self.region_policy.deny: + return False + if region in self.region_policy.allow: + return True + if self.region_policy.deny and not self.region_policy.allow: + return True + if self.region_policy.allow and not self.region_policy.deny: + return False + return False class AWSResourceConfig(ResourceConfig): diff --git a/integrations/aws/utils/resources.py b/integrations/aws/utils/resources.py index d8f408f314..8a28810aa4 100644 --- a/integrations/aws/utils/resources.py +++ b/integrations/aws/utils/resources.py @@ -5,11 +5,11 @@ import aioboto3 from loguru import logger -from port_ocean.context.event import event from utils.misc import ( CustomProperties, ResourceKindsWithSpecialHandling, is_access_denied_exception, + is_resource_not_found_exception, ) from utils.aws import get_sessions @@ -17,6 +17,7 @@ from utils.aws import _session_manager from utils.overrides import AWSResourceConfig from botocore.config import Config as Boto3Config +from botocore.exceptions import ClientError def is_global_resource(kind: str) -> bool: @@ -109,6 +110,7 @@ async def resync_custom_kind( describe_method: str, list_param: str, marker_param: Literal["NextToken", "Marker"], + resource_config: AWSResourceConfig, describe_method_params: dict[str, Any] | None = None, ) -> ASYNC_GENERATOR_RESYNC_TYPE: """ @@ -125,6 +127,15 @@ async def resync_custom_kind( region = session.region_name account_id = await _session_manager.find_account_id_by_session(session) next_token = None + + resource_config_selector = resource_config.selector + + if not resource_config_selector.is_region_allowed(region): + logger.info( + f"Skipping resyncing {kind} in region {region} in account {account_id} because it's not allowed" + ) + return + if not describe_method_params: describe_method_params = {} while True: @@ -163,13 +174,18 @@ async def resync_custom_kind( async def resync_cloudcontrol( - kind: str, session: aioboto3.Session + kind: str, session: aioboto3.Session, resource_config: AWSResourceConfig ) -> ASYNC_GENERATOR_RESYNC_TYPE: - use_get_resource_api = typing.cast( - AWSResourceConfig, event.resource_config - ).selector.use_get_resource_api + resource_config_selector = resource_config.selector + use_get_resource_api = resource_config_selector.use_get_resource_api + region = session.region_name account_id = await _session_manager.find_account_id_by_session(session) + if not resource_config_selector.is_region_allowed(region): + logger.info( + f"Skipping resyncing {kind} in region {region} in account {account_id} because it's not allowed" + ) + return logger.info(f"Resyncing {kind} in account {account_id} in region {region}") next_token = None while True: @@ -197,7 +213,8 @@ async def resync_cloudcontrol( region=region, ) for instance in resources - ) + ), + return_exceptions=True, ) else: resources = [ @@ -209,6 +226,16 @@ async def resync_cloudcontrol( ] for instance in resources: + if isinstance(instance, Exception): + if is_resource_not_found_exception(instance): + error = typing.cast(ClientError, instance) + logger.info( + f"Skipping resyncing {kind} resource in region {region} in account {account_id}; {error.response['Error']['Message']}" + ) + continue + + raise instance + serialized = instance.copy() serialized.update( { @@ -233,5 +260,5 @@ async def resync_cloudcontrol( f"Skipping resyncing {kind} in region {region} in account {account_id} due to missing access permissions" ) else: - logger.warning(f"Error resyncing {kind} in region {region}, {e}") - raise e + logger.error(f"Error resyncing {kind} in region {region}, {e}") + raise e