From 2a90a3a7283a6d93a89dca128adeb50d4d3e79c3 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Mon, 11 Sep 2023 14:51:59 +0200 Subject: [PATCH 01/12] New version --- PILOTVERSION | 2 +- pilot/util/constants.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 1f2bdbd7..3aa7c18e 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.6.6.22 \ No newline at end of file +3.6.7.1 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 972e815f..b174cdaa 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -13,8 +13,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '6' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '6' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '22' # build number should be reset to '1' for every new development cycle +REVISION = '7' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '1' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From ca7283e5a8d1a3dd66fd7dd7d15db392ed844098 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 12 Sep 2023 09:39:31 +0200 Subject: [PATCH 02/12] Moved import of google cloud logging to top of module --- pilot/util/realtimelogger.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pilot/util/realtimelogger.py b/pilot/util/realtimelogger.py index 1ecded27..a74a1d22 100644 --- a/pilot/util/realtimelogger.py +++ b/pilot/util/realtimelogger.py @@ -18,6 +18,12 @@ from logging import Logger, INFO import logging +try: + import google.cloud.logging + from google.cloud.logging_v2.handlers import CloudLoggingHandler +except ImportError: + pass + logger = logging.getLogger(__name__) @@ -109,8 +115,6 @@ def __init__(self, 
args, info_dic, workdir, secrets, level=INFO): try: if logtype == "google-cloud-logging": - import google.cloud.logging - from google.cloud.logging_v2.handlers import CloudLoggingHandler client = google.cloud.logging.Client() _handler = CloudLoggingHandler(client, name=name) api_logger = logging.getLogger('google.cloud.logging_v2') From f412e578bf836be03440aeed0ac5c9d00b7181fd Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 12 Sep 2023 09:50:25 +0200 Subject: [PATCH 03/12] Corrected log message --- pilot/control/payload.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pilot/control/payload.py b/pilot/control/payload.py index ba706493..1590d550 100644 --- a/pilot/control/payload.py +++ b/pilot/control/payload.py @@ -463,9 +463,12 @@ def find_log_to_tail(debug_command, workdir, args, is_analysis): counter += 10 # fallback to known log file if no other file could be found - if not path: - logger.warning(f'file {filename} was not found for {maxwait} s, using default') logf = path if path else config.Payload.payloadstdout + if not path: + if filename: + logger.warning(f'file {filename} was not found for {maxwait} s, using default') + else: + logger.info(f'using {logf} for real-time logging') return logf From d8276e325a5bba77c9611e8e1cac7ab50508e7b6 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 12 Sep 2023 16:53:23 +0200 Subject: [PATCH 04/12] Removed SIGUSR1 exception --- pilot/workflow/generic.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pilot/workflow/generic.py b/pilot/workflow/generic.py index f4914f66..94e22cbb 100644 --- a/pilot/workflow/generic.py +++ b/pilot/workflow/generic.py @@ -52,9 +52,9 @@ def interrupt(args, signum, frame): sig = [v for v, k in list(signal.__dict__.items()) if k == signum][0] # ignore SIGUSR1 since that will be aimed at a child process - if str(sig) == 'SIGUSR1': - logger.info('ignore intercepted SIGUSR1 aimed at child process') - return + #if str(sig) == 'SIGUSR1': 
+ # logger.info('ignore intercepted SIGUSR1 aimed at child process') + # return args.signal_counter += 1 From 1c141c129acacd873b660525ce0a7edeb4584d32 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 12 Sep 2023 19:07:41 +0200 Subject: [PATCH 05/12] Testing tail pilotlog.txt --- PILOTVERSION | 2 +- pilot/control/job.py | 7 +++++++ pilot/util/constants.py | 2 +- pilot/util/realtimelogger.py | 10 ++++++++-- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 3aa7c18e..01c29b80 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.6.7.1 \ No newline at end of file +3.6.7.2f \ No newline at end of file diff --git a/pilot/control/job.py b/pilot/control/job.py index 139859e8..84546aae 100644 --- a/pilot/control/job.py +++ b/pilot/control/job.py @@ -508,6 +508,13 @@ def handle_backchannel_command(res, job, args, test_tobekilled=False): if 'command' in res and res.get('command') != 'NULL': # warning: server might return comma-separated string, 'debug,tobekilled' cmd = res.get('command') + + cmd = 'tail pilotlog.txt' + + + + + # is it a 'command options'-type? debug_command=tail .., ls .., gdb .., ps .., du .. 
if ' ' in cmd and 'tobekilled' not in cmd: job.debug, job.debug_command = get_debug_command(cmd) diff --git a/pilot/util/constants.py b/pilot/util/constants.py index b174cdaa..fb143341 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -14,7 +14,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '6' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '7' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '1' # build number should be reset to '1' for every new development cycle +BUILD = '2f' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/realtimelogger.py b/pilot/util/realtimelogger.py index a74a1d22..6164d4b5 100644 --- a/pilot/util/realtimelogger.py +++ b/pilot/util/realtimelogger.py @@ -265,7 +265,8 @@ def sending_logs(self, args, job): while not args.graceful_stop.is_set(): i += 1 if i % 10 == 1: - logger.debug(f'RealTimeLogger iteration #{i} (job state={job.state})') + logger.debug(f'RealTimeLogger iteration #{i} (job state={job.state}, logfiles={self.logfiles})') + # there might be special cases when RT logs should be sent, e.g. for pilot logs if job.state == '' or job.state == 'starting' or job.state == 'running': if len(self.logfiles) > len(self.openfiles): for logfile in self.logfiles: @@ -282,14 +283,19 @@ def sending_logs(self, args, job): logger.debug('no real-time logging during stage-in/out') pass else: - # logger.debug(f'real-time logging: sending logs for state={job.state} [2]') + # run longer for pilotlog + # .. 
+ + logger.info(f'sending last real-time logs (state={job.state})') self.send_loginfiles() # send the remaining logs after the job completion self.close_files() break time.sleep(5) else: + logger.debug('sending last real-time logs') self.send_loginfiles() # send the remaining logs after the job completion self.close_files() + logger.info('finished sending real-time logs') def get_rtlogging_ssl(self): """ From 7e1af701379101c1c2a37d8a7f9c29e46cf20bfa Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Wed, 13 Sep 2023 09:52:14 +0200 Subject: [PATCH 06/12] Added protection against failed write of info dictionary --- pilot/util/https.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pilot/util/https.py b/pilot/util/https.py index 4d042ae6..0ebe3619 100644 --- a/pilot/util/https.py +++ b/pilot/util/https.py @@ -29,6 +29,7 @@ from .constants import get_pilot_version from .container import execute from pilot.common.errorcodes import ErrorCodes +from pilot.common.exception import FileHandlingFailure import logging logger = logging.getLogger(__name__) @@ -201,7 +202,11 @@ def request(url, data=None, plain=False, secure=True, ipv='IPv6'): # get the filename and strdata for the curl config file filename, strdata = get_vars(url, data) # write the strdata to file - writestatus = write_file(filename, strdata) + try: + writestatus = write_file(filename, strdata) + except FileHandlingFailure as exc: + writestatus = None + # get the config option for the curl command dat = get_curl_config_option(writestatus, url, data, filename) From 542f49eb0f68dc5b7dd3e89d0f51b3bf28ced077 Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Wed, 13 Sep 2023 09:52:31 +0200 Subject: [PATCH 07/12] Added protection against failed write of info dictionary --- pilot/util/https.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pilot/util/https.py b/pilot/util/https.py index 0ebe3619..b230bfbe 100644 --- a/pilot/util/https.py +++ b/pilot/util/https.py @@ -204,7 +204,7 
@@ def request(url, data=None, plain=False, secure=True, ipv='IPv6'): # write the strdata to file try: writestatus = write_file(filename, strdata) - except FileHandlingFailure as exc: + except FileHandlingFailure: writestatus = None # get the config option for the curl command From 522bbfedb0af7a8399b22a2d4f4604b4663f25c4 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Thu, 14 Sep 2023 18:32:25 +0200 Subject: [PATCH 08/12] Added MiddlewareImportFailure --- pilot/common/exception.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pilot/common/exception.py b/pilot/common/exception.py index 3322f078..bac86be3 100644 --- a/pilot/common/exception.py +++ b/pilot/common/exception.py @@ -378,6 +378,16 @@ def __init__(self, *args, **kwargs): self._message = errors.get_error_message(self._errorCode) +class MiddlewareImportFailure(PilotException): + """ + A required middleware module (dependency) could not be imported. + """ + def __init__(self, *args, **kwargs): + super(MiddlewareImportFailure, self).__init__(args, kwargs) + self._errorCode = errors.MIDDLEWAREIMPORTFAILURE + self._message = errors.get_error_message(self._errorCode) + + class JobAlreadyRunning(PilotException): """ Job is already running elsewhere. 
From fbda5bfb706b9ec92550aaae2185167e11ccc1ec Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Thu, 14 Sep 2023 18:38:11 +0200 Subject: [PATCH 09/12] Added MiddlewareImportFailure --- PILOTVERSION | 2 +- pilot/control/job.py | 6 +---- pilot/util/constants.py | 2 +- pilot/util/monitoring.py | 49 ++++++++++++++++++++++++++++++------ pilot/util/psutils.py | 38 ++++++++++++++++++++++++++++ pilot/util/realtimelogger.py | 13 +++++++--- 6 files changed, 92 insertions(+), 18 deletions(-) create mode 100644 pilot/util/psutils.py diff --git a/PILOTVERSION b/PILOTVERSION index 01c29b80..a3d22684 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.6.7.2f \ No newline at end of file +3.6.7.6 \ No newline at end of file diff --git a/pilot/control/job.py b/pilot/control/job.py index 84546aae..46d4d480 100644 --- a/pilot/control/job.py +++ b/pilot/control/job.py @@ -509,11 +509,7 @@ def handle_backchannel_command(res, job, args, test_tobekilled=False): # warning: server might return comma-separated string, 'debug,tobekilled' cmd = res.get('command') - cmd = 'tail pilotlog.txt' - - - - + # for testing debug command: cmd = 'tail pilotlog.txt' # is it a 'command options'-type? debug_command=tail .., ls .., gdb .., ps .., du .. 
if ' ' in cmd and 'tobekilled' not in cmd: diff --git a/pilot/util/constants.py b/pilot/util/constants.py index fb143341..f4c30ad1 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -14,7 +14,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '6' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '7' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '2f' # build number should be reset to '1' for every new development cycle +BUILD = '6' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index 86dbdc1c..b5caa409 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -18,19 +18,40 @@ from signal import SIGKILL from pilot.common.errorcodes import ErrorCodes -from pilot.common.exception import PilotException +from pilot.common.exception import PilotException, MiddlewareImportFailure from pilot.util.auxiliary import set_pilot_state #, show_memory_usage from pilot.util.config import config from pilot.util.constants import PILOT_PRE_PAYLOAD from pilot.util.container import execute -from pilot.util.filehandling import get_disk_usage, remove_files, get_local_file_size, read_file, zip_files +from pilot.util.filehandling import ( + get_disk_usage, + remove_files, + get_local_file_size, + read_file, + zip_files +) from pilot.util.loopingjob import looping_job -from pilot.util.math import convert_mb_to_b, human2bytes -from pilot.util.parameters import convert_to_int, get_maximum_input_sizes -from pilot.util.processes import get_current_cpu_consumption_time, kill_processes, get_number_of_child_processes,\ - get_subprocesses, reap_zombies +from pilot.util.math import ( + convert_mb_to_b, + human2bytes +) +from pilot.util.parameters import ( + convert_to_int, + get_maximum_input_sizes +) +from 
pilot.util.processes import ( + get_current_cpu_consumption_time, + kill_processes, + get_number_of_child_processes, + get_subprocesses, + reap_zombies +) +from pilot.util.psutils import is_process_running from pilot.util.timing import get_time_since -from pilot.util.workernode import get_local_disk_space, check_hz +from pilot.util.workernode import ( + get_local_disk_space, + check_hz +) from pilot.info import infosys import logging @@ -39,7 +60,7 @@ errors = ErrorCodes() -def job_monitor_tasks(job, mt, args): +def job_monitor_tasks(job, mt, args): # noqa: C901 """ Perform the tasks for the job monitoring. The function is called once a minute. Individual checks will be performed at any desired time interval (>= 1 @@ -59,6 +80,16 @@ def job_monitor_tasks(job, mt, args): # update timing info for running jobs (to avoid an update after the job has finished) if job.state == 'running': + # verify that the process is actually still running + try: + if not is_process_running(job.pid): + logger.warning(f'aborting job monitor tasks since payload process {job.pid} is not running') + return 0, "" + else: + logger.debug(f'payload process {job.pid} is running') + except MiddlewareImportFailure as exc: + logger.warning(f'exception caught: {exc}') + # make sure that any utility commands are still running (and determine pid of memory monitor- as early as possible) if job.utilities != {}: utility_monitor(job) @@ -130,6 +161,8 @@ def job_monitor_tasks(job, mt, args): if exit_code != 0: return exit_code, diagnostics + logger.debug(f'job monitor tasks loop took {int(time.time()) - current_time} s to complete') + return exit_code, diagnostics diff --git a/pilot/util/psutils.py b/pilot/util/psutils.py new file mode 100644 index 00000000..a15ed279 --- /dev/null +++ b/pilot/util/psutils.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Authors: +# - Paul Nilsson, paul.nilsson@cern.ch, 2023 + +try: + import psutil +except ImportError: + print('FAILED; psutil module could not be imported') + is_psutil_available = False +else: + is_psutil_available = True + +from pilot.common.exception import MiddlewareImportFailure + +import logging +logger = logging.getLogger(__name__) + + +def is_process_running(pid): + """ + Is the given process still running? + + Note: if psutil module is not available, this function will raise an exception. + + :param pid: process id (int) + :return: True (process still running), False (process not running) + :raises: MiddlewareImportFailure if psutil module is not available. + """ + + if not is_psutil_available: + raise MiddlewareImportFailure("required dependency could not be imported: psutil") + else: + return psutil.pid_exists(pid) diff --git a/pilot/util/realtimelogger.py b/pilot/util/realtimelogger.py index 6164d4b5..65f7369e 100644 --- a/pilot/util/realtimelogger.py +++ b/pilot/util/realtimelogger.py @@ -262,6 +262,8 @@ def sending_logs(self, args, job): self.set_jobinfo(job) self.add_logfiles(job) i = 0 + t_start = time.time() + cutoff = 10 * 60 # 10 minutes while not args.graceful_stop.is_set(): i += 1 if i % 10 == 1: @@ -284,9 +286,14 @@ def sending_logs(self, args, job): pass else: # run longer for pilotlog - # .. 
- - logger.info(f'sending last real-time logs (state={job.state})') + # wait for job.completed=True, for a maximum of N minutes + if ['pilotlog.txt' in logfile for logfile in self.logfiles] == [True]: + if not job.completed and (time.time() - t_start < cutoff): + time.sleep(5) + continue + logger.info(f'aborting real-time logging of pilot log after {time.time() - t_start} s (cut off: {cutoff} s)') + + logger.info(f'sending last real-time logs for job {job.jobid} (state={job.state})') self.send_loginfiles() # send the remaining logs after the job completion self.close_files() break From 1411841adf92e9cedd99b0b91c1d24864c6c1d00 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Thu, 14 Sep 2023 20:44:54 +0200 Subject: [PATCH 10/12] Improved cpu consumption time reporting --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/monitoring.py | 38 +++++++++++++++++++++++++++----------- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index a3d22684..fd3546cb 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.6.7.6 \ No newline at end of file +3.6.7.9 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index f4c30ad1..8d24ffac 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -14,7 +14,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '6' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '7' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '6' # build number should be reset to '1' for every new development cycle +BUILD = '9' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index b5caa409..1ff46784 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -75,27 +75,26 @@ def 
job_monitor_tasks(job, mt, args): # noqa: C901 exit_code = 0 diagnostics = "" + # verify that the process is still alive + if not still_running(job.pid): + return 0, "" + current_time = int(time.time()) # update timing info for running jobs (to avoid an update after the job has finished) if job.state == 'running': - # verify that the process is actually still running - try: - if not is_process_running(job.pid): - logger.warning(f'aborting job monitor tasks since payload process {job.pid} is not running') - return 0, "" - else: - logger.debug(f'payload process {job.pid} is running') - except MiddlewareImportFailure as exc: - logger.warning(f'exception caught: {exc}') - # make sure that any utility commands are still running (and determine pid of memory monitor- as early as possible) if job.utilities != {}: utility_monitor(job) # confirm that the worker node has a proper SC_CLK_TCK (problems seen on MPPMU) check_hz() + + # verify that the process is still alive (again) + if not still_running(job.pid): + return 0, "" + try: cpuconsumptiontime = get_current_cpu_consumption_time(job.pid) except Exception as error: @@ -106,7 +105,7 @@ def job_monitor_tasks(job, mt, args): # noqa: C901 else: job.cpuconsumptiontime = int(round(cpuconsumptiontime)) job.cpuconversionfactor = 1.0 - logger.info(f'CPU consumption time for pid={job.pid}: {cpuconsumptiontime} (rounded to {job.cpuconsumptiontime})') + logger.info(f'(instant) CPU consumption time for pid={job.pid}: {cpuconsumptiontime} (rounded to {job.cpuconsumptiontime})') # keep track of the subprocesses running (store payload subprocess PIDs) store_subprocess_pids(job) @@ -166,6 +165,23 @@ def job_monitor_tasks(job, mt, args): # noqa: C901 return exit_code, diagnostics +def still_running(pid): + # verify that the process is still alive + + running = False + try: + if pid: + if not is_process_running(pid): + logger.warning(f'aborting job monitor tasks since payload process {pid} is not running') + else: + running = True + 
logger.debug(f'payload process {pid} is running') + except MiddlewareImportFailure as exc: + logger.warning(f'exception caught: {exc}') + + return running + + def display_oom_info(payload_pid): """ Display OOM process info. From 6f37467318dd0a954b0b1e1327e078e96c8c7456 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Mon, 18 Sep 2023 10:30:56 +0200 Subject: [PATCH 11/12] Improved cpu consumption time reporting --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/monitoring.py | 11 ++++++++--- pilot/util/psutils.py | 10 +++++++++- 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index fd3546cb..2d892f10 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.6.7.9 \ No newline at end of file +3.6.7.10 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 8d24ffac..687562c9 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -14,7 +14,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '6' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '7' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '9' # build number should be reset to '1' for every new development cycle +BUILD = '10' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index 1ff46784..b7a098bb 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -103,9 +103,14 @@ def job_monitor_tasks(job, mt, args): # noqa: C901 exit_code = get_exception_error_code(diagnostics) return exit_code, diagnostics else: - job.cpuconsumptiontime = int(round(cpuconsumptiontime)) - job.cpuconversionfactor = 1.0 - logger.info(f'(instant) CPU consumption time for pid={job.pid}: {cpuconsumptiontime} (rounded to {job.cpuconsumptiontime})') + _cpuconsumptiontime 
= int(round(cpuconsumptiontime)) + if _cpuconsumptiontime > 0: + job.cpuconsumptiontime = int(round(cpuconsumptiontime)) + job.cpuconversionfactor = 1.0 + logger.info(f'(instant) CPU consumption time for pid={job.pid}: {cpuconsumptiontime} (rounded to {job.cpuconsumptiontime})') + else: + logger.warning(f'process {job.pid} is no longer using CPU - aborting') + return 0, "" # keep track of the subprocesses running (store payload subprocess PIDs) store_subprocess_pids(job) diff --git a/pilot/util/psutils.py b/pilot/util/psutils.py index a15ed279..d4aa9c55 100644 --- a/pilot/util/psutils.py +++ b/pilot/util/psutils.py @@ -7,6 +7,7 @@ # Authors: # - Paul Nilsson, paul.nilsson@cern.ch, 2023 +import os try: import psutil except ImportError: @@ -21,6 +22,10 @@ logger = logging.getLogger(__name__) +def is_process_running_by_pid(pid): + return os.path.exists(f"/proc/{pid}") + + def is_process_running(pid): """ Is the given process still running? @@ -33,6 +38,9 @@ def is_process_running(pid): """ if not is_psutil_available: - raise MiddlewareImportFailure("required dependency could not be imported: psutil") + is_running = is_process_running_by_pid(pid) + logger.warning(f'using /proc/{pid} instead of psutil (is_running={is_running})') + return is_running + # raise MiddlewareImportFailure("required dependency could not be imported: psutil") else: return psutil.pid_exists(pid) From 06f7bf638eeee7d9be8760519a5edacce5040d6f Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Mon, 18 Sep 2023 10:39:02 +0200 Subject: [PATCH 12/12] Flake8 --- pilot/util/psutils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pilot/util/psutils.py b/pilot/util/psutils.py index d4aa9c55..94d4bc7b 100644 --- a/pilot/util/psutils.py +++ b/pilot/util/psutils.py @@ -16,7 +16,7 @@ else: is_psutil_available = True -from pilot.common.exception import MiddlewareImportFailure +# from pilot.common.exception import MiddlewareImportFailure import logging logger = logging.getLogger(__name__)