Skip to content

Commit

Permalink
Merge pull request #98 from PalNilsson/next
Browse files Browse the repository at this point in the history
3.6.6.22
  • Loading branch information
PalNilsson authored Sep 8, 2023
2 parents 70dff3a + 002e3e0 commit b908fbd
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 30 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.6.5.32
3.6.6.22
115 changes: 98 additions & 17 deletions pilot/control/payloads/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,32 @@ def utility_after_payload_started(self, job):
else:
# store process handle in job object, and keep track on how many times the command has been launched
# also store the full command in case it needs to be restarted later (by the job_monitor() thread)
logger.debug(f'storing process for {utilitycommand}')
job.utilities[cmd_dictionary.get('command')] = [proc1, 1, utilitycommand]

# wait for command to start
#time.sleep(1)
#cmd = utilitycommand.split(';')[-1]
#prmon = f'prmon --pid {job.pid}'
#pid = None
#if prmon in cmd:
# import subprocess
# import re
# ps = subprocess.run(['ps', 'aux', str(os.getpid())], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
# encoding='utf-8')
# pattern = r'\b\d+\b'
# for line in ps.stdout.split('\n'):
# if prmon in line and f';{prmon}' not in line: # ignore the line that includes the setup
# matches = re.findall(pattern, line)
# if matches:
# pid = matches[0]
# break
#if pid:
# logger.info(f'{prmon} command has pid={pid} (appending to cmd dictionary)')
# job.utilities[cmd_dictionary.get('command')].append(pid)
#else:
# logger.info(f'could not extract any pid from ps for cmd={cmd}')

def utility_after_payload_started_new(self, job):
"""
Functions to run after payload started
Expand Down Expand Up @@ -831,11 +855,16 @@ def stop_utilities(self):
for utcmd in list(self.__job.utilities.keys()):
utproc = self.__job.utilities[utcmd][0]
if utproc:
status = self.kill_and_wait_for_process(utproc.pid, user, utcmd)
if status == 0:
logger.info(f'cleaned up after prmon process {utproc.pid}')
# get the pid for prmon
_list = self.__job.utilities.get('MemoryMonitor')
if _list:
pid = int(_list[-1]) if len(_list) == 4 else utproc.pid
logger.info(f'using pid={pid} to kill prmon')
else:
logger.warning(f'failed to cleanup prmon process {utproc.pid} (abnormal exit status: {status})')
logger.warning(f'did not find the pid for the memory monitor in the utilities list: {self.__job.utilities}')
pid = utproc.pid
status = self.kill_and_wait_for_process(pid, user, utcmd)
logger.info(f'utility process {utproc.pid} cleanup finished with status={status}')
user.post_utility_command_action(utcmd, self.__job)

def kill_and_wait_for_process(self, pid, user, utcmd):
Expand All @@ -849,26 +878,78 @@ def kill_and_wait_for_process(self, pid, user, utcmd):
"""

sig = user.get_utility_command_kill_signal(utcmd)
logger.info("stopping process \'%s\' with signal %d", utcmd, sig)
logger.info("stopping utility process \'%s\' with signal %d", utcmd, sig)

try:
# Send SIGUSR1 signal to the process group
os.killpg(pid, sig)

# Wait for the process to finish
_, status = os.waitpid(pid, 0)

# Check the exit status of the process
if os.WIFEXITED(status):
return os.WEXITSTATUS(status)
# Send SIGUSR1 signal to the process
os.kill(pid, sig)

# Check if the process exists
if os.kill(pid, 0):
# Wait for the process to finish
_, status = os.waitpid(pid, 0)

# Check the exit status of the process
if os.WIFEXITED(status):
logger.debug('normal exit')
return os.WEXITSTATUS(status)
else:
# Handle abnormal termination if needed
logger.warning('abnormal termination')
return None
else:
# Handle abnormal termination if needed
return None
# Process doesn't exist - ignore
logger.info(f'process {pid} no longer exists')
return True

except OSError as exc:
# Handle errors, such as process not found
logger.warning(f"exception caught: {exc}")
logger.warning(f"Error sending signal to/waiting for process {pid}: {exc}")
return None

# try:
# # Send SIGUSR1 signal to the process
# os.kill(pid, sig)
#
# try:
# # Wait for the process to finish
# _, status = os.waitpid(pid, 0)
#
# # Check the exit status of the process
# if os.WIFEXITED(status):
# return os.WEXITSTATUS(status)
# else:
# # Handle abnormal termination if needed
# logger.warning('abnormal termination')
# return None
# except OSError as exc:
# # Handle errors related to waiting for the process
# logger.warning(f"error waiting for process {pid}: {exc}")
# return None
#
# except OSError as exc:
# # Handle errors, such as process not found
# logger.warning(f"error sending signal to process {pid}: {exc}")
# return None

# try:
# # Send SIGUSR1 signal to the process
# os.kill(pid, sig)
#
# # Wait for the process to finish
# _, status = os.waitpid(pid, 0)
#
# # Check the exit status of the process
# if os.WIFEXITED(status):
# return os.WEXITSTATUS(status)
# else:
# # Handle abnormal termination if needed
# return None
# except OSError as exc:
# # Handle errors, such as process not found
# logger.warning(f"exception caught: {exc}")
# return None

def rename_log_files(self, iteration):
"""
Expand Down
4 changes: 2 additions & 2 deletions pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
# Pilot version
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '6' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '5' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '32' # build number should be reset to '1' for every new development cycle
REVISION = '6' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '22' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
2 changes: 1 addition & 1 deletion pilot/util/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ def execute(executable: Any, **kwargs: dict) -> Any:

# always use a timeout to prevent stdout buffer problem in nodes with lots of cores
timeout = get_timeout(kwargs.get('timeout', None))
logger.debug(f'subprocess.communicate() will use timeout={timeout} s')

exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable]

Expand All @@ -86,6 +85,7 @@ def execute(executable: Any, **kwargs: dict) -> Any:
return process

try:
logger.debug(f'subprocess.communicate() will use timeout={timeout} s')
stdout, stderr = process.communicate(timeout=timeout)
except subprocess.TimeoutExpired as exc:
# make sure that stdout buffer gets flushed - in case of time-out exceptions
Expand Down
44 changes: 36 additions & 8 deletions pilot/util/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

import os
import time
from subprocess import PIPE
import re
import subprocess
from glob import glob
from typing import Any
from signal import SIGKILL
Expand Down Expand Up @@ -57,6 +58,11 @@ def job_monitor_tasks(job, mt, args):

# update timing info for running jobs (to avoid an update after the job has finished)
if job.state == 'running':

# make sure that any utility commands are still running (and determine pid of memory monitor- as early as possible)
if job.utilities != {}:
utility_monitor(job)

# confirm that the worker node has a proper SC_CLK_TCK (problems seen on MPPMU)
check_hz()
try:
Expand Down Expand Up @@ -124,10 +130,6 @@ def job_monitor_tasks(job, mt, args):
if exit_code != 0:
return exit_code, diagnostics

# make sure that any utility commands are still running
if job.utilities != {}:
utility_monitor(job)

return exit_code, diagnostics


Expand Down Expand Up @@ -459,7 +461,7 @@ def verify_running_processes(current_time, mt, pid):
return 0, ""


def utility_monitor(job):
def utility_monitor(job): # noqa: C901
"""
Make sure that any utility commands are still running.
In case a utility tool has crashed, this function may restart the process.
Expand All @@ -475,8 +477,34 @@ def utility_monitor(job):
# loop over all utilities
for utcmd in list(job.utilities.keys()): # E.g. utcmd = MemoryMonitor

# make sure the subprocess is still running
utproc = job.utilities[utcmd][0]

if utcmd == 'MemoryMonitor':
if len(job.utilities[utcmd]) < 4: # only proceed if the pid has not been appended to the list already
try:
_ps = subprocess.run(['ps', 'aux', str(os.getpid())], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True, check=True, encoding='utf-8')
prmon = f'prmon --pid {job.pid}'
pid = None
pattern = r'\b\d+\b'
for line in _ps.stdout.split('\n'):
# line=atlprd55 16451 0.0 0.0 2944 1148 ? SN 17:42 0:00 prmon --pid 13096 ..
if prmon in line and f';{prmon}' not in line: # ignore the line that includes the setup
matches = re.findall(pattern, line)
if matches:
pid = matches[0]
logger.info(f'extracting prmon pid from line: {line}')
break
if pid:
logger.info(f'{prmon} command has pid={pid} (appending to cmd dictionary)')
job.utilities[utcmd].append(pid)
else:
logger.info(f'could not extract any pid from ps for cmd={prmon}')

except subprocess.CalledProcessError as exc:
logger.warning(f"error: {exc}")

# make sure the subprocess is still running
if not utproc.poll() is None:

# clean up the process
Expand All @@ -495,7 +523,7 @@ def utility_monitor(job):

try:
proc1 = execute(utility_command, workdir=job.workdir, returnproc=True, usecontainer=False,
stdout=PIPE, stderr=PIPE, cwd=job.workdir, queuedata=job.infosys.queuedata)
stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=job.workdir, queuedata=job.infosys.queuedata)
except Exception as error:
logger.error(f'could not execute: {error}')
else:
Expand Down
6 changes: 5 additions & 1 deletion pilot/workflow/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,18 @@ def interrupt(args, signum, frame):
:param args: pilot arguments.
:param signum: signal.
:param frame: stack/execution frame pointing to the frame that was interrupted by the signal.
:return:
"""

try:
sig = [v for v, k in signal.__dict__.iteritems() if k == signum][0]
except Exception:
sig = [v for v, k in list(signal.__dict__.items()) if k == signum][0]

# ignore SIGUSR1 since that will be aimed at a child process
if str(sig) == 'SIGUSR1':
logger.info('ignore intercepted SIGUSR1 aimed at child process')
return

args.signal_counter += 1

# keep track of when first kill signal arrived, any stuck loops should abort at a defined cut off time
Expand Down

0 comments on commit b908fbd

Please sign in to comment.