diff --git a/PILOTVERSION b/PILOTVERSION index f92be120..ca88450d 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.7.1.50 \ No newline at end of file +3.7.2.4 \ No newline at end of file diff --git a/pilot/test/test_escommunicator.py b/pilot/test/test_escommunicator.py index b483914e..efd32a0f 100644 --- a/pilot/test/test_escommunicator.py +++ b/pilot/test/test_escommunicator.py @@ -82,6 +82,13 @@ class TestESCommunicationManagerPanda(unittest.TestCase): def test_communicator_manager(self): """Make sure that es communicator manager thread works as expected.""" communicator_manager = None + # set a timeout of 10 seconds to prevent potential hanging due to problems with DNS resolution, or if the DNS + # server is slow to respond + socket.setdefaulttimeout(10) + try: + fqdn = socket.getfqdn() + except socket.herror: + fqdn = 'localhost' try: args = {'workflow': 'eventservice_hpc', 'queue': 'BNL_CLOUD_MCORE', @@ -90,7 +97,7 @@ def test_communicator_manager(self): 'url': 'https://aipanda007.cern.ch', 'job_label': 'ptest', 'pilot_user': 'ATLAS', - 'node': socket.getfqdn(), + 'node': fqdn, 'mem': 16000, 'disk_space': 160000, 'working_group': '', diff --git a/pilot/test/test_esworkexecutor.py b/pilot/test/test_esworkexecutor.py index c88d6a0a..1b332745 100644 --- a/pilot/test/test_esworkexecutor.py +++ b/pilot/test/test_esworkexecutor.py @@ -64,6 +64,13 @@ def setUpClass(cls): :raises Exception: in case of failure. """ + # set a timeout of 10 seconds to prevent potential hanging due to problems with DNS resolution, or if the DNS + # server is slow to respond + socket.setdefaulttimeout(10) + try: + fqdn = socket.getfqdn() + except socket.herror: + fqdn = 'localhost' try: args = {'workflow': 'eventservice_hpc', 'queue': 'BNL_CLOUD_MCORE', @@ -72,7 +79,7 @@ def setUpClass(cls): 'url': 'https://aipanda007.cern.ch', 'job_label': 'ptest', 'pilot_user': 'ATLAS', - 'node': socket.getfqdn(), + 'node': fqdn, 'mem': 16000, 'disk_space': 160000, 'working_group': '', diff --git a/pilot/user/atlas/common.py b/pilot/user/atlas/common.py index 35fb77bd..38d6691e 100644 --- a/pilot/user/atlas/common.py +++ b/pilot/user/atlas/common.py @@ -575,9 +575,11 @@ def prepend_env_vars(environ: str, cmd: str) -> str: exports_to_add = '' for _cmd in exports: exports_to_add += _cmd - if exports_to_add: - cmd = exports_to_add + cmd - logger.debug(f'prepended exports to payload command: {exports_to_add}') + + # add the UTC time zone + exports_to_add += "export TZ=\'UTC\'; " + cmd = exports_to_add + cmd + logger.debug(f'prepended exports to payload command: {exports_to_add}') return cmd diff --git a/pilot/user/atlas/container.py b/pilot/user/atlas/container.py index e9758055..145a3aac 100644 --- a/pilot/user/atlas/container.py +++ b/pilot/user/atlas/container.py @@ -35,6 +35,7 @@ from pilot.user.atlas.proxy import get_and_verify_proxy, get_voms_role from pilot.info import InfoService, infosys from pilot.util.config import config +from pilot.util.constants import get_rucio_client_version from pilot.util.container import obscure_token from pilot.util.filehandling import ( grep, @@ -370,7 +371,8 @@ def get_container_options(container_options): pass # opts += 'export ALRB_CONT_CMDOPTS=\"$ALRB_CONT_CMDOPTS -c -i -p\";' else: - opts += '-e \"-C\"' + #opts += '-e \"-C\"' + opts += '-e \"-c -i\"' return opts @@ -892,7 +894,7 @@ def get_root_container_script(cmd: str) -> str: :param cmd: root command (str) :return: script content (str). """ - content = f'date\nlsetup \'root pilot-default\'\ndate\nstdbuf -oL bash -c \"python3 {cmd}\"\nexit $?' + content = f'date\nexport XRD_LOGLEVEL=Debug\nlsetup \'root pilot-default\'\ndate\nstdbuf -oL bash -c \"python3 {cmd}\"\nexit $?' logger.debug(f'root setup script content:\n\n{content}\n\n') return content @@ -915,10 +917,13 @@ def get_middleware_container_script(middleware_container: str, cmd: str, asetup: content = cmd[cmd.find('source $AtlasSetup'):] elif 'rucio' in middleware_container: content = sitename - content += f'export ATLAS_LOCAL_ROOT_BASE={get_file_system_root_path()}/atlas.cern.ch/repo/ATLASLocalRootBase; ' - content += "alias setupATLAS=\'source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh\'; " - content += "setupATLAS -3; " - content = f'lsetup "python pilot-default";python3 {cmd} ' + #content += f'export ATLAS_LOCAL_ROOT_BASE={get_file_system_root_path()}/atlas.cern.ch/repo/ATLASLocalRootBase; ' + #content += "alias setupATLAS=\'source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh\'; " + #content += "setupATLAS -3; " + rucio_version = get_rucio_client_version() + if rucio_version: + content += f'export ATLAS_LOCAL_RUCIOCLIENTS_VERSION={rucio_version}; ' + content += f'lsetup "python pilot-default";python3 {cmd} ' else: content = 'export ALRB_LOCAL_PY3=YES; ' if asetup: # export ATLAS_LOCAL_ROOT_BASE=/cvmfs/..;source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet; diff --git a/pilot/util/activemq.py b/pilot/util/activemq.py index 1b4d4485..e5db475b 100644 --- a/pilot/util/activemq.py +++ b/pilot/util/activemq.py @@ -155,8 +155,15 @@ def __init__(self, **kwargs: dict) -> None: # prevent stomp from exposing credentials in stdout (in case pilot is running in debug mode) logging.getLogger('stomp').setLevel(logging.INFO) - # get the list of brokers to use - _addrinfos = socket.getaddrinfo(self.broker, 0, socket.AF_INET, 0, socket.IPPROTO_TCP) + # set a timeout of 10 seconds to prevent potential hanging due to problems with DNS resolution, or if the DNS + # server is slow to respond + socket.setdefaulttimeout(10) + try: + # get the list of brokers to use + _addrinfos = socket.getaddrinfo(self.broker, 0, socket.AF_INET, 0, socket.IPPROTO_TCP) + except socket.herror as exc: + logger.warning(f'failed get address from socket: {exc}') + return self.brokers_resolved = [_ai[4][0] for _ai in _addrinfos] receive_topic = self.receive_topics[0] diff --git a/pilot/util/auxiliary.py b/pilot/util/auxiliary.py index a8b0fa8e..6fdf6790 100644 --- a/pilot/util/auxiliary.py +++ b/pilot/util/auxiliary.py @@ -21,10 +21,11 @@ """Auxiliary functions.""" +import logging import os import re +import socket import sys -import logging from collections.abc import Set, Mapping from collections import deque, OrderedDict @@ -40,7 +41,6 @@ SERVER_UPDATE_TROUBLE, get_pilot_version, ) - from pilot.common.errorcodes import ErrorCodes from pilot.util.container import execute from pilot.util.filehandling import dump @@ -705,8 +705,11 @@ def get_host_name(): elif hasattr(os, 'uname'): host = os.uname()[1] else: - import socket - host = socket.gethostname() + try: + host = socket.gethostname() + except socket.herror as exc: + logger.warning(f'failed to get host name: {exc}') + host = 'localhost' return host.split('.')[0] globaljobid = get_globaljobid() diff --git a/pilot/util/constants.py b/pilot/util/constants.py index b620d71e..90d45790 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -27,8 +27,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '7' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '50' # build number should be reset to '1' for every new development cycle +REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '4' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/container.py b/pilot/util/container.py index ede58cc5..7725f115 100644 --- a/pilot/util/container.py +++ b/pilot/util/container.py @@ -89,6 +89,7 @@ def execute(executable: Any, **kwargs: dict) -> Any: stderr = '' # Acquire the lock before creating the subprocess + process = None with execute_lock: process = subprocess.Popen(exe, bufsize=-1, @@ -122,11 +123,13 @@ def execute(executable: Any, **kwargs: dict) -> Any: # (not strictly necessary when process.communicate() is used) try: # wait for the process to complete with a timeout of 60 seconds - process.wait(timeout=60) + if process: + process.wait(timeout=60) except subprocess.TimeoutExpired: # Handle the case where the process did not complete within the timeout - print("process did not complete within the timeout of 60s - terminating") - process.terminate() + if process: + logger.warning("process did not complete within the timeout of 60s - terminating") + process.terminate() # remove any added \n if stdout and stdout.endswith('\n'): @@ -152,8 +155,9 @@ def _timeout_handler(): nonlocal exit_code # Use nonlocal to modify the outer variable logger.warning("subprocess execution timed out") exit_code = -2 - process.terminate() # Terminate the subprocess if it's still running - logger.info(f'process terminated after {timeout_seconds}s') + if process: + process.terminate() # Terminate the subprocess if it's still running + logger.info(f'process terminated after {timeout_seconds}s') obscure = kwargs.get('obscure', '') # if this string is set, hide it in the log message if not kwargs.get('mute', False): @@ -162,43 +166,49 @@ def _timeout_handler(): exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable] # Create the subprocess with stdout and stderr redirection to files - process = subprocess.Popen(exe, - stdout=stdout_file, - stderr=stderr_file, - cwd=kwargs.get('cwd', os.getcwd()), - preexec_fn=os.setsid, - encoding='utf-8', - errors='replace') - - # Set up a timer for the timeout - timeout_timer = threading.Timer(timeout_seconds, _timeout_handler) + # Acquire the lock before creating the subprocess + process = None + with execute_lock: + process = subprocess.Popen(exe, + stdout=stdout_file, + stderr=stderr_file, + cwd=kwargs.get('cwd', os.getcwd()), + preexec_fn=os.setsid, + encoding='utf-8', + errors='replace') - try: - # Start the timer - timeout_timer.start() + # Set up a timer for the timeout + timeout_timer = threading.Timer(timeout_seconds, _timeout_handler) - # wait for the process to finish try: - # wait for the process to complete with a timeout (this will likely never happen since a timer is used) - process.wait(timeout=timeout_seconds + 10) - except subprocess.TimeoutExpired: - # Handle the case where the process did not complete within the timeout - timeout_seconds = timeout_seconds + 10 - logger.warning(f"process wait did not complete within the timeout of {timeout_seconds}s - terminating") - exit_code = -2 - process.terminate() - except Exception as exc: - logger.warning(f'execution caught: {exc}') - finally: - # Cancel the timer to avoid it firing after the subprocess has completed - timeout_timer.cancel() + # Start the timer + timeout_timer.start() + + # wait for the process to finish + try: + # wait for the process to complete with a timeout (this will likely never happen since a timer is used) + process.wait(timeout=timeout_seconds + 10) + except subprocess.TimeoutExpired: + # Handle the case where the process did not complete within the timeout + timeout_seconds = timeout_seconds + 10 + logger.warning(f"process wait did not complete within the timeout of {timeout_seconds}s - terminating") + exit_code = -2 + process.terminate() + except Exception as exc: + logger.warning(f'execution caught: {exc}') + finally: + # Cancel the timer to avoid it firing after the subprocess has completed + timeout_timer.cancel() if exit_code == -2: # the process was terminated due to a time-out exit_code = errors.COMMANDTIMEDOUT else: # get the exit code after a normal finish - exit_code = process.returncode + if process: + exit_code = process.returncode + else: + exit_code = -1 return exit_code diff --git a/pilot/util/harvester.py b/pilot/util/harvester.py index b76faf1b..8a9c396c 100644 --- a/pilot/util/harvester.py +++ b/pilot/util/harvester.py @@ -119,11 +119,20 @@ def get_initial_work_report() -> dict: :return: work report dictionary (dict). """ + # set a timeout of 10 seconds to prevent potential hanging due to problems with DNS resolution, or if the DNS + # server is slow to respond + socket.setdefaulttimeout(10) + try: + hostname = socket.gethostname() + except socket.herror as exc: + logger.warning(f'failed to get hostname: {exc}') + hostname = 'localhost' + return {'jobStatus': 'starting', 'messageLevel': logging.getLevelName(logger.getEffectiveLevel()), 'cpuConversionFactor': 1.0, 'cpuConsumptionTime': '', - 'node': os.environ.get('PANDA_HOSTNAME', socket.gethostname()), + 'node': os.environ.get('PANDA_HOSTNAME', hostname), 'workdir': '', 'timestamp': time_stamp(), 'endTime': '', diff --git a/pilot/util/https.py b/pilot/util/https.py index b33245e3..65cf9335 100644 --- a/pilot/util/https.py +++ b/pilot/util/https.py @@ -19,16 +19,24 @@ # Authors: # - Daniel Drizhuk, d.drizhuk@gmail.com, 2017 # - Mario Lassnig, mario.lassnig@cern.ch, 2017 -# - Paul Nilsson, paul.nilsson@cern.ch, 2017-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2017-24 """Functions for https interactions.""" +try: + import certifi +except ImportError: + certifi = None import json import logging import os import pipes import platform import random +try: + import requests +except ImportError: + requests = None import socket import ssl import sys @@ -420,6 +428,7 @@ def get_urlopen_output(req: Any, context: Any) -> (int, str): """ exitcode = -1 output = "" + logger.debug('ok about to open url') try: output = urllib.request.urlopen(req, context=context) except urllib.error.HTTPError as exc: @@ -428,7 +437,7 @@ def get_urlopen_output(req: Any, context: Any) -> (int, str): logger.warning(f'connection error: {exc.reason}') else: exitcode = 0 - + logger.debug(f'ok url opened: exitcode={exitcode}') return exitcode, output @@ -545,12 +554,20 @@ def get_panda_server(url: str, port: str, update_server: bool = True) -> str: if not update_server: return pandaserver + # set a timeout of 10 seconds to prevent potential hanging due to problems with DNS resolution, or if the DNS + # server is slow to respond + socket.setdefaulttimeout(10) + # add randomization for PanDA server default = 'pandaserver.cern.ch' if default in pandaserver: - rnd = random.choice([socket.getfqdn(vv) for vv in set([v[-1][0] for v in socket.getaddrinfo(default, 25443, socket.AF_INET)])]) - pandaserver = pandaserver.replace(default, rnd) - logger.debug(f'updated {default} to {pandaserver}') + try: + rnd = random.choice([socket.getfqdn(vv) for vv in set([v[-1][0] for v in socket.getaddrinfo(default, 25443, socket.AF_INET)])]) + except socket.herror as exc: + logger.warning(f'failed to get address from socket: {exc} - will use default server ({pandaserver})') + else: + pandaserver = pandaserver.replace(default, rnd) + logger.debug(f'updated {default} to {pandaserver}') return pandaserver @@ -609,3 +626,139 @@ def get_server_command(url: str, port: str, cmd: str = 'getJob') -> str: # randomize server name url = get_panda_server(url, port) return f'{url}/server/panda/{cmd}' + + +def request2_bad(url: str, data: dict = {}) -> str: + """ + Send a request using HTTPS. + + :param url: the URL of the resource (str) + :param data: data to send (dict) + :return: server response (str). + """ + + # convert the dictionary to a JSON string + data_json = json.dumps(data).encode('utf-8') + + # Create an SSLContext object + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + logger.debug(f'capath={_ctx.capath}') + logger.debug(f'cacert={_ctx.cacert}') + ssl_context.load_verify_locations(_ctx.capath) + ssl_context.load_cert_chain(_ctx.cacert) + # define additional headers + headers = { + "Content-Type": "application/json", + "User-Agent": _ctx.user_agent, + } + + # create a request object with the SSL context + request = urllib.request.Request(url, data=data_json, headers=headers, method='POST') + + # perform the HTTP request with the SSL context + try: + response = urllib.request.urlopen(request, context=ssl_context) + ret = response.read().decode('utf-8') + except (urllib.error.URLError, urllib.error.HTTPError) as exc: + logger.warning(f'failed to send request: {exc}') + ret = "" + + return ret + + +def request2(url: str, data: dict = {}) -> str: + """ + Send a request using HTTPS (using urllib module). + + :param url: the URL of the resource (str) + :param data: data to send (dict) + :return: server response (str). + """ + # https might not have been set up if running in a [middleware] container + if not _ctx.cacert: + logger.debug('setting up unset https') + https_setup(None, get_pilot_version()) + + # define additional headers + headers = { + "Content-Type": "application/json", + "User-Agent": _ctx.user_agent, + } + + logger.debug(f'headers={headers}') + + # Encode data as JSON + data_json = json.dumps(data).encode('utf-8') + #data_json = urllib.parse.quote(json.dumps(data)) + #data_json = data_json.encode('utf-8') + + logger.debug(f'data_json={data_json}') + + # Set up the request + req = urllib.request.Request(url, data_json, headers=headers) + + # Create a context with certificate verification + logger.debug(f'cacert={_ctx.cacert}') # /alrb/x509up_u25606_prod + logger.debug(f'capath={_ctx.capath}') # /cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase/etc/grid-security-emi/certificates + #context = ssl.create_default_context(cafile=_ctx.cacert, capath=_ctx.capath) + #logger.debug(f'context={context}') + + ssl_context = ssl.create_default_context(capath=_ctx.capath, cafile=_ctx.cacert) + # Send the request securely + try: + with urllib.request.urlopen(req, context=ssl_context) as response: + # Handle the response here + logger.debug(response.status, response.reason) + logger.debug(response.read().decode('utf-8')) + ret = response.read().decode('utf-8') + except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError) as exc: + logger.warning(f'failed to send request: {exc}') + ret = "" + + return ret + + +def request3(url: str, data: dict = {}) -> str: + """ + Send a request using HTTPS (using requests module). + + :param url: the URL of the resource (str) + :param data: data to send (dict) + :return: server response (str). + """ + if not requests: + logger.warning('cannot use requests module (not available)') + return "" + if not certifi: + logger.warning('cannot use certifi module (not available)') + return "" + + # https might not have been set up if running in a [middleware] container + if not _ctx.cacert: + logger.debug('setting up unset https') + https_setup(None, get_pilot_version()) + + # define additional headers + headers = { + "Content-Type": "application/json", + "User-Agent": _ctx.user_agent, + } + + # Convert the dictionary to a JSON string + data_json = json.dumps(data) + + # Use the requests module to make the HTTP request + try: + # certifi.where() = /cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase/x86_64/python/3.11.7-x86_64-el9/ + # lib/python3.11/site-packages/certifi/cacert.pem + # _ctx.cacert = /alrb/x509up_u25606_prod + response = requests.post(url, data=data_json, headers=headers, verify=_ctx.cacert, cert=certifi.where(), timeout=120) + response.raise_for_status() # Raise an error for bad responses (4xx and 5xx) + + # Handle the response as needed + ret = response.text + except (requests.exceptions.RequestException, requests.exceptions.Timeout) as exc: + logger.warning(f'failed to send request: {exc}') + ret = "" + + return ret diff --git a/pilot/util/tracereport.py b/pilot/util/tracereport.py index d4e17850..46cb3e11 100644 --- a/pilot/util/tracereport.py +++ b/pilot/util/tracereport.py @@ -28,11 +28,12 @@ from json import dumps from os import environ, getuid +from pilot.common.exception import FileHandlingFailure from pilot.util.config import config from pilot.util.constants import get_pilot_version, get_rucio_client_version from pilot.util.container import execute, execute2 -from pilot.common.exception import FileHandlingFailure from pilot.util.filehandling import append_to_file, write_file +# from pilot.util.https import request3 import logging logger = logging.getLogger(__name__) @@ -102,16 +103,27 @@ def init(self, job): self.update(data) self['timeStart'] = time.time() - hostname = os.environ.get('PANDA_HOSTNAME', socket.gethostname()) + # set a timeout of 10 seconds to prevent potential hanging due to problems with DNS resolution, or if the DNS + # server is slow to respond + socket.setdefaulttimeout(10) + + try: + hostname = os.environ.get('PANDA_HOSTNAME', socket.gethostname()) + except socket.herror as exc: + logger.warning(f'unable to detect hostname for trace report: {exc}') + hostname = os.environ.get('PANDA_HOSTNAME', 'unknown') + try: self['hostname'] = socket.gethostbyaddr(hostname)[0] - except Exception: - logger.debug("unable to detect hostname for trace report") + except socket.herror as exc: + logger.warning(f'unable to detect hostname by address for trace report: {exc}') + self['hostname'] = 'unknown' try: self['ip'] = socket.gethostbyname(hostname) - except Exception: - logger.debug("unable to detect host IP for trace report") + except socket.herror as exc: + logger.debug(f"unable to detect host IP for trace report: {exc}") + self['ip'] = '0.0.0.0' if job.jobdefinitionid: s = 'ppilot_%s' % job.jobdefinitionid @@ -119,7 +131,7 @@ def init(self, job): else: #self['uuid'] = commands.getoutput('uuidgen -t 2> /dev/null').replace('-', '') # all LFNs of one request have the same uuid cmd = 'uuidgen -t 2> /dev/null' - exit_code, stdout, stderr = execute(cmd) + exit_code, stdout, stderr = execute(cmd, timeout=10) self['uuid'] = stdout.replace('-', '') def get_value(self, key): @@ -187,6 +199,13 @@ def send(self): ssl_certificate = self.get_ssl_certificate() + #ret = request3(url, data) + #if ret: + # logger.info("tracing report sent") + # return True + #else: + # logger.warning("failed to send tracing report - using old curl command") + # create the command command = 'curl' if self.ipv == 'IPv4':