diff --git a/PILOTVERSION b/PILOTVERSION index 1f2bdbd7..2d892f10 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.6.6.22 \ No newline at end of file +3.6.7.10 \ No newline at end of file diff --git a/pilot/common/exception.py b/pilot/common/exception.py index 3322f078..bac86be3 100644 --- a/pilot/common/exception.py +++ b/pilot/common/exception.py @@ -378,6 +378,16 @@ def __init__(self, *args, **kwargs): self._message = errors.get_error_message(self._errorCode) +class MiddlewareImportFailure(PilotException): + """ + A required middleware module could not be imported. + """ + def __init__(self, *args, **kwargs): + super(MiddlewareImportFailure, self).__init__(args, kwargs) + self._errorCode = errors.MIDDLEWAREIMPORTFAILURE + self._message = errors.get_error_message(self._errorCode) + + class JobAlreadyRunning(PilotException): """ Job is already running elsewhere. diff --git a/pilot/control/job.py b/pilot/control/job.py index 139859e8..46d4d480 100644 --- a/pilot/control/job.py +++ b/pilot/control/job.py @@ -508,6 +508,9 @@ def handle_backchannel_command(res, job, args, test_tobekilled=False): if 'command' in res and res.get('command') != 'NULL': # warning: server might return comma-separated string, 'debug,tobekilled' cmd = res.get('command') + + # for testing debug command: cmd = 'tail pilotlog.txt' + # is it a 'command options'-type? debug_command=tail .., ls .., gdb .., ps .., du .. 
if ' ' in cmd and 'tobekilled' not in cmd: job.debug, job.debug_command = get_debug_command(cmd) diff --git a/pilot/control/payload.py b/pilot/control/payload.py index ba706493..1590d550 100644 --- a/pilot/control/payload.py +++ b/pilot/control/payload.py @@ -463,9 +463,12 @@ def find_log_to_tail(debug_command, workdir, args, is_analysis): counter += 10 # fallback to known log file if no other file could be found - if not path: - logger.warning(f'file {filename} was not found for {maxwait} s, using default') logf = path if path else config.Payload.payloadstdout + if not path: + if filename: + logger.warning(f'file {filename} was not found for {maxwait} s, using default') + else: + logger.info(f'using {logf} for real-time logging') return logf diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 972e815f..687562c9 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -13,8 +13,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '6' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '6' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '22' # build number should be reset to '1' for every new development cycle +REVISION = '7' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '10' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/https.py b/pilot/util/https.py index 4d042ae6..b230bfbe 100644 --- a/pilot/util/https.py +++ b/pilot/util/https.py @@ -29,6 +29,7 @@ from .constants import get_pilot_version from .container import execute from pilot.common.errorcodes import ErrorCodes +from pilot.common.exception import FileHandlingFailure import logging logger = logging.getLogger(__name__) @@ -201,7 +202,11 @@ def request(url, data=None, plain=False, secure=True, 
ipv='IPv6'): # get the filename and strdata for the curl config file filename, strdata = get_vars(url, data) # write the strdata to file - writestatus = write_file(filename, strdata) + try: + writestatus = write_file(filename, strdata) + except FileHandlingFailure: + writestatus = None + # get the config option for the curl command dat = get_curl_config_option(writestatus, url, data, filename) diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index 86dbdc1c..b7a098bb 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -18,19 +18,40 @@ from signal import SIGKILL from pilot.common.errorcodes import ErrorCodes -from pilot.common.exception import PilotException +from pilot.common.exception import PilotException, MiddlewareImportFailure from pilot.util.auxiliary import set_pilot_state #, show_memory_usage from pilot.util.config import config from pilot.util.constants import PILOT_PRE_PAYLOAD from pilot.util.container import execute -from pilot.util.filehandling import get_disk_usage, remove_files, get_local_file_size, read_file, zip_files +from pilot.util.filehandling import ( + get_disk_usage, + remove_files, + get_local_file_size, + read_file, + zip_files +) from pilot.util.loopingjob import looping_job -from pilot.util.math import convert_mb_to_b, human2bytes -from pilot.util.parameters import convert_to_int, get_maximum_input_sizes -from pilot.util.processes import get_current_cpu_consumption_time, kill_processes, get_number_of_child_processes,\ - get_subprocesses, reap_zombies +from pilot.util.math import ( + convert_mb_to_b, + human2bytes +) +from pilot.util.parameters import ( + convert_to_int, + get_maximum_input_sizes +) +from pilot.util.processes import ( + get_current_cpu_consumption_time, + kill_processes, + get_number_of_child_processes, + get_subprocesses, + reap_zombies +) +from pilot.util.psutils import is_process_running from pilot.util.timing import get_time_since -from pilot.util.workernode import 
get_local_disk_space, check_hz +from pilot.util.workernode import ( + get_local_disk_space, + check_hz +) from pilot.info import infosys import logging @@ -39,7 +60,7 @@ errors = ErrorCodes() -def job_monitor_tasks(job, mt, args): +def job_monitor_tasks(job, mt, args): # noqa: C901 """ Perform the tasks for the job monitoring. The function is called once a minute. Individual checks will be performed at any desired time interval (>= 1 @@ -54,6 +75,10 @@ def job_monitor_tasks(job, mt, args): exit_code = 0 diagnostics = "" + # verify that the process is still alive + if not still_running(job.pid): + return 0, "" + current_time = int(time.time()) # update timing info for running jobs (to avoid an update after the job has finished) @@ -65,6 +90,11 @@ def job_monitor_tasks(job, mt, args): # confirm that the worker node has a proper SC_CLK_TCK (problems seen on MPPMU) check_hz() + + # verify that the process is still alive (again) + if not still_running(job.pid): + return 0, "" + try: cpuconsumptiontime = get_current_cpu_consumption_time(job.pid) except Exception as error: @@ -73,9 +103,14 @@ def job_monitor_tasks(job, mt, args): exit_code = get_exception_error_code(diagnostics) return exit_code, diagnostics else: - job.cpuconsumptiontime = int(round(cpuconsumptiontime)) - job.cpuconversionfactor = 1.0 - logger.info(f'CPU consumption time for pid={job.pid}: {cpuconsumptiontime} (rounded to {job.cpuconsumptiontime})') + _cpuconsumptiontime = int(round(cpuconsumptiontime)) + if _cpuconsumptiontime > 0: + job.cpuconsumptiontime = int(round(cpuconsumptiontime)) + job.cpuconversionfactor = 1.0 + logger.info(f'(instant) CPU consumption time for pid={job.pid}: {cpuconsumptiontime} (rounded to {job.cpuconsumptiontime})') + else: + logger.warning(f'process {job.pid} is no longer using CPU - aborting') + return 0, "" # keep track of the subprocesses running (store payload subprocess PIDs) store_subprocess_pids(job) @@ -130,9 +165,28 @@ def job_monitor_tasks(job, mt, args): if 
exit_code != 0: return exit_code, diagnostics + logger.debug(f'job monitor tasks loop took {int(time.time()) - current_time} s to complete') + return exit_code, diagnostics +def still_running(pid): + # verify that the process is still alive + + running = False + try: + if pid: + if not is_process_running(pid): + logger.warning(f'aborting job monitor tasks since payload process {pid} is not running') + else: + running = True + logger.debug(f'payload process {pid} is running') + except MiddlewareImportFailure as exc: + logger.warning(f'exception caught: {exc}') + + return running + + def display_oom_info(payload_pid): """ Display OOM process info. diff --git a/pilot/util/psutils.py b/pilot/util/psutils.py new file mode 100644 index 00000000..94d4bc7b --- /dev/null +++ b/pilot/util/psutils.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Authors: +# - Paul Nilsson, paul.nilsson@cern.ch, 2023 + +import os +try: + import psutil +except ImportError: + print('FAILED; psutil module could not be imported') + is_psutil_available = False +else: + is_psutil_available = True + +# from pilot.common.exception import MiddlewareImportFailure + +import logging +logger = logging.getLogger(__name__) + + +def is_process_running_by_pid(pid): + return os.path.exists(f"/proc/{pid}") + + +def is_process_running(pid): + """ + Is the given process still running? + + Note: if psutil module is not available, this function falls back to checking /proc/<pid>. + + :param pid: process id (int) + :return: True (process still running), False (process not running) + :raises: (currently nothing; falls back to /proc/<pid> when psutil is unavailable). 
+ """ + + if not is_psutil_available: + is_running = is_process_running_by_pid(pid) + logger.warning(f'using /proc/{pid} instead of psutil (is_running={is_running})') + return is_running + # raise MiddlewareImportFailure("required dependency could not be imported: psutil") + else: + return psutil.pid_exists(pid) diff --git a/pilot/util/realtimelogger.py b/pilot/util/realtimelogger.py index 1ecded27..65f7369e 100644 --- a/pilot/util/realtimelogger.py +++ b/pilot/util/realtimelogger.py @@ -18,6 +18,12 @@ from logging import Logger, INFO import logging +try: + import google.cloud.logging + from google.cloud.logging_v2.handlers import CloudLoggingHandler +except ImportError: + pass + logger = logging.getLogger(__name__) @@ -109,8 +115,6 @@ def __init__(self, args, info_dic, workdir, secrets, level=INFO): try: if logtype == "google-cloud-logging": - import google.cloud.logging - from google.cloud.logging_v2.handlers import CloudLoggingHandler client = google.cloud.logging.Client() _handler = CloudLoggingHandler(client, name=name) api_logger = logging.getLogger('google.cloud.logging_v2') @@ -258,10 +262,13 @@ def sending_logs(self, args, job): self.set_jobinfo(job) self.add_logfiles(job) i = 0 + t_start = time.time() + cutoff = 10 * 60 # 10 minutes while not args.graceful_stop.is_set(): i += 1 if i % 10 == 1: - logger.debug(f'RealTimeLogger iteration #{i} (job state={job.state})') + logger.debug(f'RealTimeLogger iteration #{i} (job state={job.state}, logfiles={self.logfiles})') + # there might be special cases when RT logs should be sent, e.g. 
for pilot logs if job.state == '' or job.state == 'starting' or job.state == 'running': if len(self.logfiles) > len(self.openfiles): for logfile in self.logfiles: @@ -278,14 +285,24 @@ def sending_logs(self, args, job): logger.debug('no real-time logging during stage-in/out') pass else: - # logger.debug(f'real-time logging: sending logs for state={job.state} [2]') + # run longer for pilotlog + # wait for job.completed=True, for a maximum of N minutes + if ['pilotlog.txt' in logfile for logfile in self.logfiles] == [True]: + if not job.completed and (time.time() - t_start < cutoff): + time.sleep(5) + continue + logger.info(f'aborting real-time logging of pilot log after {time.time() - t_start} s (cut off: {cutoff} s)') + + logger.info(f'sending last real-time logs for job {job.jobid} (state={job.state})') self.send_loginfiles() # send the remaining logs after the job completion self.close_files() break time.sleep(5) else: + logger.debug('sending last real-time logs') self.send_loginfiles() # send the remaining logs after the job completion self.close_files() + logger.info('finished sending real-time logs') def get_rtlogging_ssl(self): """ diff --git a/pilot/workflow/generic.py b/pilot/workflow/generic.py index f4914f66..94e22cbb 100644 --- a/pilot/workflow/generic.py +++ b/pilot/workflow/generic.py @@ -52,9 +52,9 @@ def interrupt(args, signum, frame): sig = [v for v, k in list(signal.__dict__.items()) if k == signum][0] # ignore SIGUSR1 since that will be aimed at a child process - if str(sig) == 'SIGUSR1': - logger.info('ignore intercepted SIGUSR1 aimed at child process') - return + #if str(sig) == 'SIGUSR1': + # logger.info('ignore intercepted SIGUSR1 aimed at child process') + # return args.signal_counter += 1