From 76a053b85e5067e60356ceffc1cf0889d2753adb Mon Sep 17 00:00:00 2001 From: Tim Purschke Date: Wed, 19 Jun 2024 14:22:33 +0200 Subject: [PATCH] import improvements --- .../modelling/getOwnersFromMultipleSources.py | 29 +- .../modelling/getOwnersFromTufinRlm.py | 272 ------------------ 2 files changed, 21 insertions(+), 280 deletions(-) delete mode 100644 scripts/customizing/modelling/getOwnersFromTufinRlm.py diff --git a/scripts/customizing/modelling/getOwnersFromMultipleSources.py b/scripts/customizing/modelling/getOwnersFromMultipleSources.py index 46e8a4de4..071c35912 100644 --- a/scripts/customizing/modelling/getOwnersFromMultipleSources.py +++ b/scripts/customizing/modelling/getOwnersFromMultipleSources.py @@ -1,5 +1,21 @@ #!/usr/bin/python3 -# dependencies: this script needs the package python3-git +# reads the main app data from a git repo +# and renriches the data with csv files containing users and server ip addresses + +# dependencies: +# a) package python3-git must be installed +# b) requires the following config items in /usr/local/orch/etc/secrets/customizingConfig.json +# Tufin RLM +# username +# password +# apiBaseUri # Tufin API, e.g. "https://tufin.domain.com/" +# git +# gitRepoUrl +# gitusername +# gitpassword +# csvFiles # array of file basenames containing the app data +# ldapPath # full ldap user path (used for building DN from user basename) + from asyncio.log import logger import traceback @@ -19,17 +35,13 @@ import csv + baseDir = "/usr/local/fworch/" baseDirEtc = baseDir + "etc/" repoTargetDir = baseDirEtc + "cmdb-repo" defaultConfigFileName = baseDirEtc + "secrets/customizingConfig.json" defaultRlmImportFileName = baseDir + "scripts/customizing/modelling/getOwnersFromTufinRlm.json" importSourceString = "tufinRlm" -custPrefix = '/xxx' -csvFiles = [ - repoTargetDir + custPrefix + "COM_3.csv", - repoTargetDir + custPrefix + "APP_4.csv" -] # TUFIN settings: api_url_path_rlm_login = 'apps/public/rlm/oauth/token' @@ -70,7 +82,7 @@ def readConfig(configFilename): customConfig = json.loads(customConfigFH.read()) return (customConfig['username'], customConfig['password'], customConfig['apiBaseUri'], customConfig['ldapPath'], - customConfig['gitRepoUrl'], customConfig['gitusername'], customConfig['gitpassword']) + customConfig['gitRepoUrl'], customConfig['gitusername'], customConfig['gitpassword'], customConfig['csvFiles']) except: logger.error("could not read config file " + configFilename + ", Exception: " + str(traceback.format_exc())) sys.exit(1) @@ -250,7 +262,7 @@ def rlmGetOwners(token, api_url): logger = getLogger(debug_level_in=2) # read config - rlmUsername, rlmPassword, rlmApiUrl, ldapPath, gitRepoUrl, gitUsername, gitPassword = readConfig(args.config) + rlmUsername, rlmPassword, rlmApiUrl, ldapPath, gitRepoUrl, gitUsername, gitPassword, csvFiles = readConfig(args.config) ###################################################### # 1. get all owners @@ -266,6 +278,7 @@ def rlmGetOwners(token, api_url): dfAllApps = [] for csvFile in csvFiles: + csvFile = repoTargetDir + '/' + csvFile # add directory to csv files with open(csvFile, newline='') as csvFile: reader = csv.reader(csvFile) dfAllApps += list(reader)[1:]# Skip headers in first line diff --git a/scripts/customizing/modelling/getOwnersFromTufinRlm.py b/scripts/customizing/modelling/getOwnersFromTufinRlm.py deleted file mode 100644 index b605895d2..000000000 --- a/scripts/customizing/modelling/getOwnersFromTufinRlm.py +++ /dev/null @@ -1,272 +0,0 @@ -#!/usr/bin/python3 - -# library for Tufin STRACK API calls -from asyncio.log import logger -import traceback -from textwrap import indent -import requests.packages -import requests -import json -import sys -import argparse -import logging -from sys import stdout -import ipaddress -import os -import socket -from pathlib import Path - - -api_url_path_rlm_login = 'apps/public/rlm/oauth/token' -api_url_path_rlm_apps = 'apps/public/rlm/api/owners' -defaultConfigFileName = "/usr/local/fworch/etc/secrets/customizingConfig.json" - -class ApiLoginFailed(Exception): - """Raised when login to API failed""" - - def __init__(self, message="Login to API failed"): - self.message = message - super().__init__(self.message) - -class ApiFailure(Exception): - """Raised for any other Api call exceptions""" - - def __init__(self, message="There was an unclassified error while executing an API call"): - self.message = message - super().__init__(self.message) - -class ApiTimeout(Exception): - """Raised for 502 http error with proxy due to timeout""" - - def __init__(self, message="reverse proxy timeout error during API call - try increasing the reverse proxy timeout"): - self.message = message - super().__init__(self.message) - -class ApiServiceUnavailable(Exception): - """Raised for 503 http error Service unavailable""" - - def __init__(self, message="API unavailable"): - self.message = message - super().__init__(self.message) - - -def readConfig(configFilename): - try: - - with open(configFilename, "r") as customConfigFH: - customConfig = json.loads(customConfigFH.read()) - return (customConfig['username'], customConfig['password'], customConfig['ldapPath'], customConfig['apiBaseUri']) - - except: - logger.error("could not read config file " + configFilename + ", Exception: " + str(traceback.format_exc())) - sys.exit(1) - - -def buildDN(userId, ldapPath): - if '{USERID}' in ldapPath: - return ldapPath.replace('{USERID}', userId) - else: - logger.error("could not find {USERID} parameter in ldapPath " + ldapPath) - sys.exit(1) - - -def getNetworkBorders(ip): - if '/' in ip: - network = ipaddress.IPv4Network(ip, strict=False) - return str(network.network_address), str(network.broadcast_address), 'network' - else: - return str(ip), str(ip), 'host' - - -def reverse_dns_lookup(ip_address): - """ - Perform a reverse DNS lookup to find the domain name associated with an IP address. - - Args: - ip_address (str): The IP address to perform the reverse DNS lookup on. - - Returns: - str: The domain name associated with the IP address or an error message if the lookup fails. - """ - try: - # Perform the reverse DNS lookup using the gethostbyaddr method of the socket module. - # This method returns a tuple containing the primary domain name, an alias list, and an IP address list. - hostname, _, _ = socket.gethostbyaddr(ip_address) - - # Return the primary domain name. - return hostname - except socket.herror as e: - # Handle the exception if the host could not be found (herror). - # Return an error message with the exception details. - return f"ERROR: Reverse DNS lookup failed: {e}" - except socket.gaierror as e: - # Handle the exception if the address-related error occurs (gaierror). - # Return an error message with the exception details. - return f"ERROR: Address-related error during reverse DNS lookup: {e}" - except Exception as e: - # Handle any other exceptions that may occur. - # Return a generic error message with the exception details. - return f"ERROR: during reverse DNS lookup: {e}" - - -def extractSocketInfo(asset, services): - # ignoring services for the moment - sockets =[] - - if 'assets' in asset and 'values' in asset['assets']: - for ip in asset['assets']['values']: - ip1, ip2, nwtype = getNetworkBorders(ip) - if nwtype=='host': - hname = reverse_dns_lookup(ip1) - if hname=='' or hname.startswith('ERROR:'): - hname = "NONAME" - sockets.append({ "ip": ip1, "ip_end": ip2, "type": nwtype, "name": hname }) - else: - sockets.append({ "ip": ip1, "ip_end": ip2, "type": nwtype }) - - if 'objects' in asset: - for obj in asset['objects']: - if 'values' in obj: - for cidr in obj['values']: - ip1, ip2, nwtype = getNetworkBorders(cidr) - sockets.append({ "name": obj['name'], "ip": ip1, "ip_end": ip2, "type": nwtype }) - return sockets - - -def getLogger(debug_level_in=0): - debug_level=int(debug_level_in) - if debug_level>=1: - llevel = logging.DEBUG - else: - llevel = logging.INFO - - logger = logging.getLogger() # use root logger - # logHandler = logging.StreamHandler(stream=stdout) - logformat = "%(asctime)s [%(levelname)-5.5s] [%(filename)-10.10s:%(funcName)-10.10s:%(lineno)4d] %(message)s" - # logHandler.setLevel(llevel) - # handlers = [logHandler] - # logging.basicConfig(format=logformat, datefmt="%Y-%m-%dT%H:%M:%S%z", handlers=handlers, level=llevel) - logging.basicConfig(format=logformat, datefmt="%Y-%m-%dT%H:%M:%S%z", level=llevel) - logger.setLevel(llevel) - - #set log level for noisy requests/connectionpool module to WARNING: - connection_log = logging.getLogger("urllib3.connectionpool") - connection_log.setLevel(logging.WARNING) - connection_log.propagate = True - - if debug_level>8: - logger.debug ("debug_level=" + str(debug_level) ) - return logger - - -def rlmLogin(user, password, api_url): - payload = { "username": user, "password": password, "client_id": "securechange", "client_secret": "123", "grant_type": "password" } - - with requests.Session() as session: - session.verify = False - try: - response = session.post(api_url, payload) - except requests.exceptions.RequestException: - raise ApiFailure ("api: error during login to url: " + str(api_url) + " with user " + user) from None - - if response.text is not None and response.status_code==200: - return json.loads(response.text)['access_token'] - else: - raise ApiLoginFailed("RLM api: ERROR: did not receive an OAUTH token during login" + \ - ", api_url: " + str(api_url) + \ - ", status code: " + str(response)) - - -def rlmGetOwners(token, api_url): - - headers = {} - rlmVersion = 2.5 - - if rlmVersion < 2.6: - headers = {'Authorization': 'Bearer ' + token, 'Content-Type': 'application/json'} - else: - api_url += "?access_token=" + token - - with requests.Session() as session: - session.verify = False - try: - response = session.get(api_url, headers=headers) - - except requests.exceptions.RequestException: - raise ApiServiceUnavailable ("api: error while getting owners from url: " + str(api_url) + " with token " + token) from None - - if response.text is not None and response.status_code==200: - # logger.info(str(response.text)) - return json.loads(response.text) - else: - raise ApiFailure("api: ERROR: could not get owners" + \ - ", api_url: " + str(api_url) + \ - ", status code: " + str(response)) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description='Read configuration from FW management via API calls') - parser.add_argument('-c', '--config', default=defaultConfigFileName, - help='Filename of custom config file for modelling imports') - parser.add_argument('-s', "--suppress_certificate_warnings", action='store_true', default = True, - help = "suppress certificate warnings") - parser.add_argument('-l', '--limit', metavar='api_limit', default='150', - help='The maximal number of returned results per HTTPS Connection; default=50') - - args = parser.parse_args() - owners = {} - - if args.suppress_certificate_warnings: - requests.packages.urllib3.disable_warnings() - - logger = getLogger(debug_level_in=2) - - # read config - rlmUsername, rlmPassword, ldapPath, rlmApiUrl = readConfig(args.config) - - if not rlmApiUrl.startswith("http"): - # assuming config file instead of direct API access - try: - with open(rlmApiUrl, "r") as ownerDumpFH: - ownerData = json.loads(ownerDumpFH.read()) - except: - logger.error("error while trying to read owners from config file '" + rlmApiUrl + "', exception: " + str(traceback.format_exc())) - sys.exit(1) - else: - # get App List directly from RLM via API - try: - oauthToken = rlmLogin(rlmUsername, rlmPassword, rlmApiUrl + api_url_path_rlm_login) - # logger.debug("token for RLM: " + oauthToken) - ownerData = rlmGetOwners(oauthToken, rlmApiUrl + api_url_path_rlm_apps) - - except: - logger.error("error while getting owner data from RLM API: " + str(traceback.format_exc())) - sys.exit(1) - - # normalizing owners config from Tufin RLM - normOwners = { "owners": [] } - for owner in ownerData['owners']: - # logger.info("dealing with owner " + str(owner)) - - users = [] - for uid in owner['owner']['members']: - users.append(buildDN(uid, ldapPath)) - - ownNorm = { - "app_id_external": owner['owner']['name'], - "name": owner['description'], - "main_user": None, - "modellers": users, - "import_source": "tufinRlm", - "app_servers": extractSocketInfo(owner['asset'], owner['services']), - } - normOwners['owners'].append(ownNorm) - - # logger.info("normOwners = " + json.dumps(normOwners, indent=3)) - path = os.path.dirname(__file__) - fileOut = path + '/' + Path(os.path.basename(__file__)).stem + ".json" - logger.info("dumping into file " + fileOut) - with open(fileOut, "w") as outFH: - json.dump(normOwners, outFH, indent=3) - sys.exit(0)