Skip to content

Commit

Permalink
Added the IBM VPC tagging (#861)
Browse files Browse the repository at this point in the history
  • Loading branch information
athiruma authored Nov 4, 2024
1 parent 76d3c8f commit b27c35b
Show file tree
Hide file tree
Showing 28 changed files with 818 additions and 13 deletions.
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,27 @@ This tool support the following policies:

* [tag_baremetal](cloud_governance/policy/ibm/tag_baremetal.py): Tag IBM baremetal machines
* [tag_vm](cloud_governance/policy/ibm/tag_vm.py): Tga IBM Virtual Machines machines
* [tag_resources](./cloud_governance/policy/ibm/tag_resources.py): Tag IBM resources
list of supported IBM Resources

- virtual_servers
- volumes
- floating_ips
- vpcs
- virtual_network_interfaces
- security_groups
- public_gateways
- vpc_endpoint_gateways
- load_balancers
- schematics_workspaces

Environment Variables required:

| KeyName | Value | Description |
|----------------------------|--------|----------------------------------------------------------------------------|
| IBM_CUSTOM_TAGS_LIST | string | pass string with separated with comma. i.e: "cost-center: test, env: test" |
| RESOURCE_TO_TAG (optional) | string | pass the resource name to tag. ex: virtual_servers |
| IBM_CLOUD_API_KEY | string | IBM Cloud API Key |

** You can write your own policy using [Cloud-Custodian](https://cloudcustodian.io/docs/quickstart/index.html)
and run it (see 'custom cloud custodian policy' in [Policy workflows](#policy-workloads)).
Expand Down
18 changes: 18 additions & 0 deletions cloud_governance/common/clouds/ibm/account/ibm_authenticator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import logging

from ibm_cloud_sdk_core.authenticators import IAMAuthenticator

from cloud_governance.main.environment_variables import environment_variables


class IBMAuthenticator:
"""
Refer: https://cloud.ibm.com/apidocs/vpc/latest
Refer: https://github.com/IBM/ibm-cloud-sdk-common?tab=readme-ov-file
"""

def __init__(self):
logging.disable(logging.DEBUG)
self.env_config = environment_variables
self.__api_key = self.env_config.IBM_CLOUD_API_KEY
self.iam_authenticator = IAMAuthenticator(self.__api_key)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from ibm_schematics.schematics_v1 import SchematicsV1

from cloud_governance.common.clouds.ibm.account.ibm_authenticator import IBMAuthenticator


class SchematicOperations(IBMAuthenticator):
"""
This class performs schematic operations.
"""
REGION_SCHEMATICS_URL = "https://%s.schematics.cloud.ibm.com/"

def __init__(self):
super().__init__()
self.__client = SchematicsV1(self.iam_authenticator)
self.__client.set_service_url('https://us.schematics.cloud.ibm.com')

def set_service_url(self, region: str):
"""
This method sets the service URL.
:param region:
:return:
"""
service_url = self.REGION_SCHEMATICS_URL % region
self.__client.set_service_url(service_url)

def get_workspaces(self):
"""
This method lists all available schematics workspaces
:return:
"""
response = self.__client.list_workspaces().get_result()
return response['workspaces']

def get_supported_locations(self):
"""
This method lists supported locations
:return:
"""
response = self.__client.list_locations().get_result()
return response['locations']

def get_all_workspaces(self):
"""
This method lists all available schematics workspaces
:return:
"""
locations = self.get_supported_locations()
resources_list = {}
for location in locations:
region = location['region']
geography_code = location['geography_code']
self.set_service_url(region)
if geography_code not in resources_list:
resources_list[geography_code] = self.get_workspaces()
return resources_list
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from ibm_platform_services.global_tagging_v1 import GlobalTaggingV1, Resource

from cloud_governance.common.clouds.ibm.account.ibm_authenticator import IBMAuthenticator
from cloud_governance.common.logger.init_logger import logger
from cloud_governance.common.logger.logger_time_stamp import logger_time_stamp


class GlobalTaggingOperations(IBMAuthenticator):
"""
This class performs tagging operations on cloud resources.
"""
BATCH_SIZE = 100

def __init__(self):
super().__init__()
self.__tag_service = GlobalTaggingV1(authenticator=self.iam_authenticator)

@logger_time_stamp
def update_tags(self, resources_crn: list, tags: list):
"""
This method updates the tags associated with an instance.
:param resources_crn:
:param tags:
:return:
"""
resources_list = [Resource(resource_crn) for resource_crn in resources_crn]
resources_batch_list = [resources_list[i:i + self.BATCH_SIZE]
for i in range(0, len(resources_list), self.BATCH_SIZE)]
success = 0
errors = []
for resource_batch in resources_batch_list:
responses = self.__tag_service.attach_tag(resources=resource_batch, tag_names=tags) \
.get_result()['results']
for resource in responses:
if resource['is_error']:
errors.append(resource.get('resource_id'))
logger.error(f'Unable to attach resource tags to: {resource["resource_id"]}')
else:
success += 1
return success == len(resources_crn), errors
Empty file.
222 changes: 222 additions & 0 deletions cloud_governance/common/clouds/ibm/vpc/vpc_infra_operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
from functools import wraps

import ibm_vpc
from typing import Callable

from cloud_governance.common.clouds.ibm.account.ibm_authenticator import IBMAuthenticator


def region_wrapper(func):
@wraps(func)
def wrapper(*args, **kwargs):
vpc_obj = VpcInfraOperations()
regions = vpc_obj.get_regions()
resources_list = {}
for region in regions:
region_name = region.get('name')
if region['status'] == 'available':
vpc_obj.set_service_url(region_name)
resources_list[region_name] = func(*args, **kwargs)
return resources_list

return wrapper


class VpcInfraOperations(IBMAuthenticator):
"""
This class contains methods to perform operations on VPC Infra Operations.
"""

REGION_SERVICE_URL = "https://%s.iaas.cloud.ibm.com/v1"

def __init__(self):
super().__init__()
self.__client = ibm_vpc.vpc_v1.VpcV1(authenticator=self.iam_authenticator)

def get_regions(self):
"""
This method lists all available regions.
:return:
"""
regions = self.__client.list_regions().get_result()['regions']
return regions

def set_service_url(self, region_name: str):
"""
This method sets the service URL.
:param region_name:
:return:
"""
service_url = self.REGION_SERVICE_URL % region_name
self.__client.set_service_url(service_url)

def iter_next_resources(self, exec_func: Callable, resource_name: str, region_name: str = None):
"""
This method .
:param region_name:
:param exec_func:
:param resource_name:
:return:
"""
if region_name:
self.set_service_url(region_name)
response = exec_func().get_result()
resources = response[resource_name]
while response.get('next'):
href = response['next']['href']
start = href.split('&')[-1].split('=')[-1]
response = exec_func(start=start).get_result()
resources.extend(response[resource_name])
return resources

def get_instances(self, region_name: str = None):
"""
This method lists available instances in one region, default 'us-south'
:param region_name:
:return:
"""
return self.iter_next_resources(exec_func=self.__client.list_instances,
resource_name='instances', region_name=region_name)

def get_volumes(self, region_name: str = None):
"""
This method lists available volumes.
:return:
"""
return self.iter_next_resources(exec_func=self.__client.list_volumes,
resource_name='volumes', region_name=region_name)

def get_floating_ips(self, region_name: str = None):
"""
This method lists available floating ips.
:param region_name:
:return:
"""
return self.iter_next_resources(exec_func=self.__client.list_floating_ips,
resource_name='floating_ips', region_name=region_name)

def get_vpcs(self, region_name: str = None):
"""
This method lists available vpcs.
:param region_name:
:return:
"""
return self.iter_next_resources(exec_func=self.__client.list_vpcs,
resource_name='vpcs', region_name=region_name)

def get_virtual_network_interfaces(self, region_name: str = None):
"""
This method lists available virtual network interfaces.
:param region_name:
:return:
"""
return self.iter_next_resources(exec_func=self.__client.list_virtual_network_interfaces,
resource_name='virtual_network_interfaces', region_name=region_name)

def get_security_groups(self, region_name: str = None):
"""
This method lists available security_groups
:param region_name:
:return:
"""
return self.iter_next_resources(exec_func=self.__client.list_security_groups,
resource_name='security_groups', region_name=region_name)

def get_public_gateways(self, region_name: str = None):
"""
This method lists available public_gateways
:param region_name:
:return:
"""
return self.iter_next_resources(exec_func=self.__client.list_public_gateways,
resource_name='public_gateways', region_name=region_name)

def get_vpc_endpoint_gateways(self, region_name: str = None):
"""
This method lists available vpc endpoint gateways
:param region_name:
:return:
"""
return self.iter_next_resources(exec_func=self.__client.list_endpoint_gateways,
resource_name='endpoint_gateways', region_name=region_name)

def get_load_balancers(self, region_name: str = None):
"""
This method lists available load balancers
:param region_name:
:return:
"""
return self.iter_next_resources(exec_func=self.__client.list_load_balancers,
resource_name='load_balancers', region_name=region_name)

@region_wrapper
def get_all_instances(self):
"""
This method lists all available instances.
:return:
"""
return self.get_instances()

@region_wrapper
def get_all_volumes(self):
"""
This method lists all available volumes.
:return:
"""
return self.get_volumes()

@region_wrapper
def get_all_vpcs(self):
"""
This method lists all available vpc's.
:return:
"""
return self.get_vpcs()

@region_wrapper
def get_all_floating_ips(self):
"""
This method lists all floating ips.
:return:
"""
return self.get_floating_ips()

@region_wrapper
def get_all_virtual_network_interfaces(self):
"""
This method lists all available virtual network interfaces.
:return:
"""
return self.get_virtual_network_interfaces()

@region_wrapper
def get_all_security_groups(self):
"""
This method lists all available security_groups
:return:
"""
return self.get_security_groups()

@region_wrapper
def get_all_public_gateways(self):
"""
This method lists all available public_gateways
:return:
"""
return self.get_public_gateways()

@region_wrapper
def get_all_vpc_endpoint_gateways(self):
"""
This method lists all available vpc endpoint gateways
:return:
"""
return self.get_vpc_endpoint_gateways()

@region_wrapper
def get_all_load_balancers(self):
"""
This method lists all available load balancers.
:return:
"""
return self.get_load_balancers()
7 changes: 5 additions & 2 deletions cloud_governance/common/logger/logger_time_stamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def logger_time_stamp(method):
@param method:
@return: method wrapper
"""

@wraps(method) # solve method help doc
def method_wrapper(*args, **kwargs):
"""
Expand All @@ -31,13 +32,15 @@ def method_wrapper(*args, **kwargs):
date_time_end = datetime.datetime.now().strftime(datetime_format)
total_time = time_end - time_start
total_time_str = f'Total time: {round(total_time, 2)} sec'
logger.warn(f'Method name: {method.__name__} , End time: {date_time_end} , {total_time_str}')
logger.info(f'Method name: {method.__name__} , End time: {date_time_end} , {total_time_str}')
except Exception as err:
time_end = time.time()
total_time = time_end - time_start
date_time_end = datetime.datetime.now().strftime(datetime_format)
logger.error(f'Method name: {method.__name__} , End time with errors: {date_time_end} , Total time: {round(total_time, 2)} sec')
logger.error(
f'Method name: {method.__name__} , End time with errors: {date_time_end} , Total time: {round(total_time, 2)} sec')
raise err # Exception(method.__name__, err)

return result

return method_wrapper
Loading

0 comments on commit b27c35b

Please sign in to comment.