Skip to content

Commit

Permalink
Merge pull request #114 from PalNilsson/next
Browse files Browse the repository at this point in the history
3.7.2.4
  • Loading branch information
PalNilsson authored Mar 6, 2024
2 parents 569c213 + d1513d7 commit aab762a
Show file tree
Hide file tree
Showing 12 changed files with 288 additions and 66 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.7.1.50
3.7.2.4
9 changes: 8 additions & 1 deletion pilot/test/test_escommunicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ class TestESCommunicationManagerPanda(unittest.TestCase):
def test_communicator_manager(self):
"""Make sure that es communicator manager thread works as expected."""
communicator_manager = None
# set a timeout of 10 seconds to prevent potential hanging due to problems with DNS resolution, or if the DNS
# server is slow to respond
socket.setdefaulttimeout(10)
try:
fqdn = socket.getfqdn()
except socket.herror:
fqdn = 'localhost'
try:
args = {'workflow': 'eventservice_hpc',
'queue': 'BNL_CLOUD_MCORE',
Expand All @@ -90,7 +97,7 @@ def test_communicator_manager(self):
'url': 'https://aipanda007.cern.ch',
'job_label': 'ptest',
'pilot_user': 'ATLAS',
'node': socket.getfqdn(),
'node': fqdn,
'mem': 16000,
'disk_space': 160000,
'working_group': '',
Expand Down
9 changes: 8 additions & 1 deletion pilot/test/test_esworkexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ def setUpClass(cls):
:raises Exception: in case of failure.
"""
# set a timeout of 10 seconds to prevent potential hanging due to problems with DNS resolution, or if the DNS
# server is slow to respond
socket.setdefaulttimeout(10)
try:
fqdn = socket.getfqdn()
except socket.herror:
fqdn = 'localhost'
try:
args = {'workflow': 'eventservice_hpc',
'queue': 'BNL_CLOUD_MCORE',
Expand All @@ -72,7 +79,7 @@ def setUpClass(cls):
'url': 'https://aipanda007.cern.ch',
'job_label': 'ptest',
'pilot_user': 'ATLAS',
'node': socket.getfqdn(),
'node': fqdn,
'mem': 16000,
'disk_space': 160000,
'working_group': '',
Expand Down
8 changes: 5 additions & 3 deletions pilot/user/atlas/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,11 @@ def prepend_env_vars(environ: str, cmd: str) -> str:
exports_to_add = ''
for _cmd in exports:
exports_to_add += _cmd
if exports_to_add:
cmd = exports_to_add + cmd
logger.debug(f'prepended exports to payload command: {exports_to_add}')

# add the UTC time zone
exports_to_add += "export TZ=\'UTC\'; "
cmd = exports_to_add + cmd
logger.debug(f'prepended exports to payload command: {exports_to_add}')

return cmd

Expand Down
17 changes: 11 additions & 6 deletions pilot/user/atlas/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from pilot.user.atlas.proxy import get_and_verify_proxy, get_voms_role
from pilot.info import InfoService, infosys
from pilot.util.config import config
from pilot.util.constants import get_rucio_client_version
from pilot.util.container import obscure_token
from pilot.util.filehandling import (
grep,
Expand Down Expand Up @@ -370,7 +371,8 @@ def get_container_options(container_options):
pass
# opts += 'export ALRB_CONT_CMDOPTS=\"$ALRB_CONT_CMDOPTS -c -i -p\";'
else:
opts += '-e \"-C\"'
#opts += '-e \"-C\"'
opts += '-e \"-c -i\"'

return opts

Expand Down Expand Up @@ -892,7 +894,7 @@ def get_root_container_script(cmd: str) -> str:
:param cmd: root command (str)
:return: script content (str).
"""
content = f'date\nlsetup \'root pilot-default\'\ndate\nstdbuf -oL bash -c \"python3 {cmd}\"\nexit $?'
content = f'date\nexport XRD_LOGLEVEL=Debug\nlsetup \'root pilot-default\'\ndate\nstdbuf -oL bash -c \"python3 {cmd}\"\nexit $?'
logger.debug(f'root setup script content:\n\n{content}\n\n')

return content
Expand All @@ -915,10 +917,13 @@ def get_middleware_container_script(middleware_container: str, cmd: str, asetup:
content = cmd[cmd.find('source $AtlasSetup'):]
elif 'rucio' in middleware_container:
content = sitename
content += f'export ATLAS_LOCAL_ROOT_BASE={get_file_system_root_path()}/atlas.cern.ch/repo/ATLASLocalRootBase; '
content += "alias setupATLAS=\'source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh\'; "
content += "setupATLAS -3; "
content = f'lsetup "python pilot-default";python3 {cmd} '
#content += f'export ATLAS_LOCAL_ROOT_BASE={get_file_system_root_path()}/atlas.cern.ch/repo/ATLASLocalRootBase; '
#content += "alias setupATLAS=\'source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh\'; "
#content += "setupATLAS -3; "
rucio_version = get_rucio_client_version()
if rucio_version:
content += f'export ATLAS_LOCAL_RUCIOCLIENTS_VERSION={rucio_version}; '
content += f'lsetup "python pilot-default";python3 {cmd} '
else:
content = 'export ALRB_LOCAL_PY3=YES; '
if asetup: # export ATLAS_LOCAL_ROOT_BASE=/cvmfs/..;source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet;
Expand Down
11 changes: 9 additions & 2 deletions pilot/util/activemq.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,15 @@ def __init__(self, **kwargs: dict) -> None:
# prevent stomp from exposing credentials in stdout (in case pilot is running in debug mode)
logging.getLogger('stomp').setLevel(logging.INFO)

# get the list of brokers to use
_addrinfos = socket.getaddrinfo(self.broker, 0, socket.AF_INET, 0, socket.IPPROTO_TCP)
# set a timeout of 10 seconds to prevent potential hanging due to problems with DNS resolution, or if the DNS
# server is slow to respond
socket.setdefaulttimeout(10)
try:
# get the list of brokers to use
_addrinfos = socket.getaddrinfo(self.broker, 0, socket.AF_INET, 0, socket.IPPROTO_TCP)
except socket.herror as exc:
logger.warning(f'failed get address from socket: {exc}')
return
self.brokers_resolved = [_ai[4][0] for _ai in _addrinfos]

receive_topic = self.receive_topics[0]
Expand Down
11 changes: 7 additions & 4 deletions pilot/util/auxiliary.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@

"""Auxiliary functions."""

import logging
import os
import re
import socket
import sys
import logging

from collections.abc import Set, Mapping
from collections import deque, OrderedDict
Expand All @@ -40,7 +41,6 @@
SERVER_UPDATE_TROUBLE,
get_pilot_version,
)

from pilot.common.errorcodes import ErrorCodes
from pilot.util.container import execute
from pilot.util.filehandling import dump
Expand Down Expand Up @@ -705,8 +705,11 @@ def get_host_name():
elif hasattr(os, 'uname'):
host = os.uname()[1]
else:
import socket
host = socket.gethostname()
try:
host = socket.gethostname()
except socket.herror as exc:
logger.warning(f'failed to get host name: {exc}')
host = 'localhost'
return host.split('.')[0]

globaljobid = get_globaljobid()
Expand Down
4 changes: 2 additions & 2 deletions pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
# Pilot version
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '7' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '50' # build number should be reset to '1' for every new development cycle
REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '4' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
76 changes: 43 additions & 33 deletions pilot/util/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def execute(executable: Any, **kwargs: dict) -> Any:
stderr = ''

# Acquire the lock before creating the subprocess
process = None
with execute_lock:
process = subprocess.Popen(exe,
bufsize=-1,
Expand Down Expand Up @@ -122,11 +123,13 @@ def execute(executable: Any, **kwargs: dict) -> Any:
# (not strictly necessary when process.communicate() is used)
try:
# wait for the process to complete with a timeout of 60 seconds
process.wait(timeout=60)
if process:
process.wait(timeout=60)
except subprocess.TimeoutExpired:
# Handle the case where the process did not complete within the timeout
print("process did not complete within the timeout of 60s - terminating")
process.terminate()
if process:
logger.warning("process did not complete within the timeout of 60s - terminating")
process.terminate()

# remove any added \n
if stdout and stdout.endswith('\n'):
Expand All @@ -152,8 +155,9 @@ def _timeout_handler():
nonlocal exit_code # Use nonlocal to modify the outer variable
logger.warning("subprocess execution timed out")
exit_code = -2
process.terminate() # Terminate the subprocess if it's still running
logger.info(f'process terminated after {timeout_seconds}s')
if process:
process.terminate() # Terminate the subprocess if it's still running
logger.info(f'process terminated after {timeout_seconds}s')

obscure = kwargs.get('obscure', '') # if this string is set, hide it in the log message
if not kwargs.get('mute', False):
Expand All @@ -162,43 +166,49 @@ def _timeout_handler():
exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable]

# Create the subprocess with stdout and stderr redirection to files
process = subprocess.Popen(exe,
stdout=stdout_file,
stderr=stderr_file,
cwd=kwargs.get('cwd', os.getcwd()),
preexec_fn=os.setsid,
encoding='utf-8',
errors='replace')

# Set up a timer for the timeout
timeout_timer = threading.Timer(timeout_seconds, _timeout_handler)
# Acquire the lock before creating the subprocess
process = None
with execute_lock:
process = subprocess.Popen(exe,
stdout=stdout_file,
stderr=stderr_file,
cwd=kwargs.get('cwd', os.getcwd()),
preexec_fn=os.setsid,
encoding='utf-8',
errors='replace')

try:
# Start the timer
timeout_timer.start()
# Set up a timer for the timeout
timeout_timer = threading.Timer(timeout_seconds, _timeout_handler)

# wait for the process to finish
try:
# wait for the process to complete with a timeout (this will likely never happen since a timer is used)
process.wait(timeout=timeout_seconds + 10)
except subprocess.TimeoutExpired:
# Handle the case where the process did not complete within the timeout
timeout_seconds = timeout_seconds + 10
logger.warning(f"process wait did not complete within the timeout of {timeout_seconds}s - terminating")
exit_code = -2
process.terminate()
except Exception as exc:
logger.warning(f'execution caught: {exc}')
finally:
# Cancel the timer to avoid it firing after the subprocess has completed
timeout_timer.cancel()
# Start the timer
timeout_timer.start()

# wait for the process to finish
try:
# wait for the process to complete with a timeout (this will likely never happen since a timer is used)
process.wait(timeout=timeout_seconds + 10)
except subprocess.TimeoutExpired:
# Handle the case where the process did not complete within the timeout
timeout_seconds = timeout_seconds + 10
logger.warning(f"process wait did not complete within the timeout of {timeout_seconds}s - terminating")
exit_code = -2
process.terminate()
except Exception as exc:
logger.warning(f'execution caught: {exc}')
finally:
# Cancel the timer to avoid it firing after the subprocess has completed
timeout_timer.cancel()

if exit_code == -2:
# the process was terminated due to a time-out
exit_code = errors.COMMANDTIMEDOUT
else:
# get the exit code after a normal finish
exit_code = process.returncode
if process:
exit_code = process.returncode
else:
exit_code = -1

return exit_code

Expand Down
11 changes: 10 additions & 1 deletion pilot/util/harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,20 @@ def get_initial_work_report() -> dict:
:return: work report dictionary (dict).
"""
# set a timeout of 10 seconds to prevent potential hanging due to problems with DNS resolution, or if the DNS
# server is slow to respond
socket.setdefaulttimeout(10)
try:
hostname = socket.gethostname()
except socket.herror as exc:
logger.warning(f'failed to get hostname: {exc}')
hostname = 'localhost'

return {'jobStatus': 'starting',
'messageLevel': logging.getLevelName(logger.getEffectiveLevel()),
'cpuConversionFactor': 1.0,
'cpuConsumptionTime': '',
'node': os.environ.get('PANDA_HOSTNAME', socket.gethostname()),
'node': os.environ.get('PANDA_HOSTNAME', hostname),
'workdir': '',
'timestamp': time_stamp(),
'endTime': '',
Expand Down
Loading

0 comments on commit aab762a

Please sign in to comment.