Merge pull request #101 from PanDAWMS/next
3.6.7.10
PalNilsson authored Sep 18, 2023
2 parents b908fbd + 768b76d commit f903d9e
Showing 10 changed files with 162 additions and 24 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
@@ -1 +1 @@
3.6.6.22
3.6.7.10
10 changes: 10 additions & 0 deletions pilot/common/exception.py
@@ -378,6 +378,16 @@ def __init__(self, *args, **kwargs):
self._message = errors.get_error_message(self._errorCode)


class MiddlewareImportFailure(PilotException):
"""
A required middleware module could not be imported.
"""
def __init__(self, *args, **kwargs):
super(MiddlewareImportFailure, self).__init__(args, kwargs)
self._errorCode = errors.MIDDLEWAREIMPORTFAILURE
self._message = errors.get_error_message(self._errorCode)


class JobAlreadyRunning(PilotException):
"""
Job is already running elsewhere.
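For illustration, a minimal stand-alone sketch of how the new exception class is meant to be used. PilotException is stubbed out here, get_processes() is a hypothetical caller (the real class takes its message from errors.get_error_message()), and the raise message mirrors the commented-out one in pilot/util/psutils.py further down:

    class PilotException(Exception):
        """Stand-in for pilot.common.exception.PilotException."""


    class MiddlewareImportFailure(PilotException):
        """A required middleware module could not be imported."""


    def get_processes(psutil_available):
        # hypothetical caller: middleware (psutil) is required here
        if not psutil_available:
            raise MiddlewareImportFailure("required dependency could not be imported: psutil")


    try:
        get_processes(False)
    except MiddlewareImportFailure as exc:
        print(f'exception caught: {exc}')  # same handling as still_running() in monitoring.py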
3 changes: 3 additions & 0 deletions pilot/control/job.py
@@ -508,6 +508,9 @@ def handle_backchannel_command(res, job, args, test_tobekilled=False):
if 'command' in res and res.get('command') != 'NULL':
# warning: server might return comma-separated string, 'debug,tobekilled'
cmd = res.get('command')

# for testing debug command: cmd = 'tail pilotlog.txt'

# is it a 'command options'-type? debug_command=tail .., ls .., gdb .., ps .., du ..
if ' ' in cmd and 'tobekilled' not in cmd:
job.debug, job.debug_command = get_debug_command(cmd)
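For context, a simplified stand-in for the check above; classify_backchannel_command() is hypothetical (the real parsing is done by get_debug_command() in the same module) and only illustrates how a space separates 'command options'-type debug commands from plain state commands:

    def classify_backchannel_command(cmd):
        """Hypothetical helper mirroring the check in handle_backchannel_command()."""
        # a space means a 'command options'-type debug command (tail .., ls .., gdb ..),
        # while keywords such as 'tobekilled' (possibly comma-separated) are state commands
        if ' ' in cmd and 'tobekilled' not in cmd:
            return 'debug command with options'
        return 'state command'


    print(classify_backchannel_command('tail pilotlog.txt'))  # debug command with options
    print(classify_backchannel_command('debug,tobekilled'))   # state command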
7 changes: 5 additions & 2 deletions pilot/control/payload.py
@@ -463,9 +463,12 @@ def find_log_to_tail(debug_command, workdir, args, is_analysis):
counter += 10

# fallback to known log file if no other file could be found
if not path:
logger.warning(f'file {filename} was not found for {maxwait} s, using default')
logf = path if path else config.Payload.payloadstdout
if not path:
if filename:
logger.warning(f'file {filename} was not found for {maxwait} s, using default')
else:
logger.info(f'using {logf} for real-time logging')

return logf

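The rewritten fallback computes the default first and only warns when a concrete filename was actually awaited. A condensed, runnable sketch of the new control flow; pick_log() and default_log are hypothetical stand-ins (default_log plays the role of config.Payload.payloadstdout):

    def pick_log(path, filename, maxwait, default_log='payload.stdout'):
        """Hypothetical condensation of the updated fallback in find_log_to_tail()."""
        logf = path if path else default_log
        if not path:
            if filename:
                print(f'file {filename} was not found for {maxwait} s, using default')
        else:
            print(f'using {logf} for real-time logging')
        return logf


    print(pick_log('', '', 60))                     # silent fallback (no filename was requested)
    print(pick_log('', 'log.txt', 60))              # warning, then fallback
    print(pick_log('/tmp/log.txt', 'log.txt', 60))  # tail the file that was found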
4 changes: 2 additions & 2 deletions pilot/util/constants.py
@@ -13,8 +13,8 @@
# Pilot version
RELEASE = '3' # release number should be fixed at 3 for Pilot 3
VERSION = '6' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '6' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '22' # build number should be reset to '1' for every new development cycle
REVISION = '7' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '10' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
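Assuming the version string is composed by joining the four constants with dots (which is what get_pilot_version(), imported in https.py below, presumably returns), the bump reproduces the new PILOTVERSION content:

    RELEASE = '3'
    VERSION = '6'
    REVISION = '7'
    BUILD = '10'

    # assumed composition, matching the updated PILOTVERSION file
    pilot_version = f'{RELEASE}.{VERSION}.{REVISION}.{BUILD}'
    print(pilot_version)  # 3.6.7.10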
7 changes: 6 additions & 1 deletion pilot/util/https.py
@@ -29,6 +29,7 @@
from .constants import get_pilot_version
from .container import execute
from pilot.common.errorcodes import ErrorCodes
from pilot.common.exception import FileHandlingFailure

import logging
logger = logging.getLogger(__name__)
@@ -201,7 +202,11 @@ def request(url, data=None, plain=False, secure=True, ipv='IPv6'):
# get the filename and strdata for the curl config file
filename, strdata = get_vars(url, data)
# write the strdata to file
writestatus = write_file(filename, strdata)
try:
writestatus = write_file(filename, strdata)
except FileHandlingFailure:
writestatus = None

# get the config option for the curl command
dat = get_curl_config_option(writestatus, url, data, filename)

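The change wraps the file write in a narrow try/except so that a raised FileHandlingFailure degrades into the same falsy writestatus the downstream code already understands. A stand-alone sketch under that assumption; this write_file is a hypothetical stand-in for pilot.util.filehandling.write_file:

    class FileHandlingFailure(Exception):
        """Stand-in for pilot.common.exception.FileHandlingFailure."""


    def write_file(filename, strdata):
        # hypothetical stand-in that fails the way the real write can
        raise FileHandlingFailure(f'could not write {filename}')


    try:
        writestatus = write_file('curl.config', 'url = "https://example.org"')
    except FileHandlingFailure:
        writestatus = None  # downstream, get_curl_config_option() receives this and can fall back

    print(writestatus)  # None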
76 changes: 65 additions & 11 deletions pilot/util/monitoring.py
@@ -18,19 +18,40 @@
from signal import SIGKILL

from pilot.common.errorcodes import ErrorCodes
from pilot.common.exception import PilotException
from pilot.common.exception import PilotException, MiddlewareImportFailure
from pilot.util.auxiliary import set_pilot_state #, show_memory_usage
from pilot.util.config import config
from pilot.util.constants import PILOT_PRE_PAYLOAD
from pilot.util.container import execute
from pilot.util.filehandling import get_disk_usage, remove_files, get_local_file_size, read_file, zip_files
from pilot.util.filehandling import (
get_disk_usage,
remove_files,
get_local_file_size,
read_file,
zip_files
)
from pilot.util.loopingjob import looping_job
from pilot.util.math import convert_mb_to_b, human2bytes
from pilot.util.parameters import convert_to_int, get_maximum_input_sizes
from pilot.util.processes import get_current_cpu_consumption_time, kill_processes, get_number_of_child_processes,\
get_subprocesses, reap_zombies
from pilot.util.math import (
convert_mb_to_b,
human2bytes
)
from pilot.util.parameters import (
convert_to_int,
get_maximum_input_sizes
)
from pilot.util.processes import (
get_current_cpu_consumption_time,
kill_processes,
get_number_of_child_processes,
get_subprocesses,
reap_zombies
)
from pilot.util.psutils import is_process_running
from pilot.util.timing import get_time_since
from pilot.util.workernode import get_local_disk_space, check_hz
from pilot.util.workernode import (
get_local_disk_space,
check_hz
)
from pilot.info import infosys

import logging
@@ -39,7 +60,7 @@
errors = ErrorCodes()


def job_monitor_tasks(job, mt, args):
def job_monitor_tasks(job, mt, args): # noqa: C901
"""
Perform the tasks for the job monitoring.
The function is called once a minute. Individual checks will be performed at any desired time interval (>= 1
@@ -54,6 +75,10 @@ def job_monitor_tasks(job, mt, args):
exit_code = 0
diagnostics = ""

# verify that the process is still alive
if not still_running(job.pid):
return 0, ""

current_time = int(time.time())

# update timing info for running jobs (to avoid an update after the job has finished)
@@ -65,6 +90,11 @@ def job_monitor_tasks(job, mt, args):

# confirm that the worker node has a proper SC_CLK_TCK (problems seen on MPPMU)
check_hz()

# verify that the process is still alive (again)
if not still_running(job.pid):
return 0, ""

try:
cpuconsumptiontime = get_current_cpu_consumption_time(job.pid)
except Exception as error:
@@ -73,9 +103,14 @@
exit_code = get_exception_error_code(diagnostics)
return exit_code, diagnostics
else:
job.cpuconsumptiontime = int(round(cpuconsumptiontime))
job.cpuconversionfactor = 1.0
logger.info(f'CPU consumption time for pid={job.pid}: {cpuconsumptiontime} (rounded to {job.cpuconsumptiontime})')
_cpuconsumptiontime = int(round(cpuconsumptiontime))
if _cpuconsumptiontime > 0:
job.cpuconsumptiontime = int(round(cpuconsumptiontime))
job.cpuconversionfactor = 1.0
logger.info(f'(instant) CPU consumption time for pid={job.pid}: {cpuconsumptiontime} (rounded to {job.cpuconsumptiontime})')
else:
logger.warning(f'process {job.pid} is no longer using CPU - aborting')
return 0, ""

# keep track of the subprocesses running (store payload subprocess PIDs)
store_subprocess_pids(job)
@@ -130,9 +165,28 @@ def job_monitor_tasks(job, mt, args):
if exit_code != 0:
return exit_code, diagnostics

logger.debug(f'job monitor tasks loop took {int(time.time()) - current_time} s to complete')

return exit_code, diagnostics


def still_running(pid):
    """
    Verify that the given payload process is still alive.

    :param pid: process id (int)
    :return: True (process still running), False (process not running).
    """

running = False
try:
if pid:
if not is_process_running(pid):
logger.warning(f'aborting job monitor tasks since payload process {pid} is not running')
else:
running = True
logger.debug(f'payload process {pid} is running')
except MiddlewareImportFailure as exc:
logger.warning(f'exception caught: {exc}')

return running


def display_oom_info(payload_pid):
"""
Display OOM process info.
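The liveness guard is applied twice because job_monitor_tasks() does slow work between the two checks (timing updates, check_hz()), so the payload can exit mid-call; returning (0, "") makes a vanished payload a non-event rather than an error. A compact sketch of the pattern; monitor_once() and is_alive are hypothetical stand-ins:

    def monitor_once(pid, is_alive):
        """Hypothetical condensation of the new guards in job_monitor_tasks()."""
        if not is_alive(pid):
            return 0, ""       # not an error: the payload has simply finished
        # ... timing updates, check_hz() and other slow checks happen here ...
        if not is_alive(pid):  # re-check: the payload may have exited meanwhile
            return 0, ""
        return 0, "all checks passed"


    print(monitor_once(42, lambda pid: False))  # (0, '')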
46 changes: 46 additions & 0 deletions pilot/util/psutils.py
@@ -0,0 +1,46 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Paul Nilsson, [email protected], 2023

import os
try:
import psutil
except ImportError:
print('FAILED; psutil module could not be imported')
is_psutil_available = False
else:
is_psutil_available = True

# from pilot.common.exception import MiddlewareImportFailure

import logging
logger = logging.getLogger(__name__)


def is_process_running_by_pid(pid):
    """Check whether /proc/<pid> exists, as a procfs fallback when psutil is unavailable."""
    return os.path.exists(f"/proc/{pid}")


def is_process_running(pid):
"""
Is the given process still running?
Note: if psutil module is not available, this function will raise an exception.
:param pid: process id (int)
:return: True (process still running), False (process not running)
:raises: MiddlewareImportFailure if psutil module is not available.
"""

if not is_psutil_available:
is_running = is_process_running_by_pid(pid)
logger.warning(f'using /proc/{pid} instead of psutil (is_running={is_running})')
return is_running
# raise MiddlewareImportFailure("required dependency could not be imported: psutil")
else:
return psutil.pid_exists(pid)
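A usage sketch for the new module covering both branches; process_exists() is a hypothetical stand-in that mirrors is_process_running(), and the /proc fallback assumes a Linux-style procfs:

    import os

    try:
        import psutil
        is_psutil_available = True
    except ImportError:
        is_psutil_available = False


    def process_exists(pid):
        # mirrors is_process_running(): psutil when available, /proc/<pid> otherwise
        if is_psutil_available:
            return psutil.pid_exists(pid)
        return os.path.exists(f"/proc/{pid}")


    print(process_exists(os.getpid()))  # True for the current process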
25 changes: 21 additions & 4 deletions pilot/util/realtimelogger.py
@@ -18,6 +18,12 @@
from logging import Logger, INFO
import logging

try:
import google.cloud.logging
from google.cloud.logging_v2.handlers import CloudLoggingHandler
except ImportError:
pass

logger = logging.getLogger(__name__)


@@ -109,8 +115,6 @@ def __init__(self, args, info_dic, workdir, secrets, level=INFO):

try:
if logtype == "google-cloud-logging":
import google.cloud.logging
from google.cloud.logging_v2.handlers import CloudLoggingHandler
client = google.cloud.logging.Client()
_handler = CloudLoggingHandler(client, name=name)
api_logger = logging.getLogger('google.cloud.logging_v2')
@@ -258,10 +262,13 @@ def sending_logs(self, args, job):
self.set_jobinfo(job)
self.add_logfiles(job)
i = 0
t_start = time.time()
cutoff = 10 * 60 # 10 minutes
while not args.graceful_stop.is_set():
i += 1
if i % 10 == 1:
logger.debug(f'RealTimeLogger iteration #{i} (job state={job.state})')
logger.debug(f'RealTimeLogger iteration #{i} (job state={job.state}, logfiles={self.logfiles})')
# there might be special cases when RT logs should be sent, e.g. for pilot logs
if job.state == '' or job.state == 'starting' or job.state == 'running':
if len(self.logfiles) > len(self.openfiles):
for logfile in self.logfiles:
@@ -278,14 +285,24 @@ def sending_logs(self, args, job):
logger.debug('no real-time logging during stage-in/out')
pass
else:
# logger.debug(f'real-time logging: sending logs for state={job.state} [2]')
# run longer for pilotlog
# wait for job.completed=True, for a maximum of N minutes
if ['pilotlog.txt' in logfile for logfile in self.logfiles] == [True]:
if not job.completed and (time.time() - t_start < cutoff):
time.sleep(5)
continue
logger.info(f'aborting real-time logging of pilot log after {time.time() - t_start} s (cut off: {cutoff} s)')

logger.info(f'sending last real-time logs for job {job.jobid} (state={job.state})')
self.send_loginfiles() # send the remaining logs after the job completion
self.close_files()
break
time.sleep(5)
else:
logger.debug('sending last real-time logs')
self.send_loginfiles() # send the remaining logs after the job completion
self.close_files()
logger.info('finished sending real-time logs')

def get_rtlogging_ssl(self):
"""
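The new tail-end of the loop keeps running past job completion only when the single tracked logfile is the pilot's own log, and even then gives up after a cutoff. A shortened, runnable sketch; drain_logs() is a hypothetical condensation, with the cutoff scaled down from the real 10 * 60 s so the sketch finishes quickly:

    import time


    def drain_logs(logfiles, job_completed, cutoff=0.2):
        """Hypothetical condensation of the pilotlog branch in sending_logs()."""
        t_start = time.time()
        while True:
            # only the pilot log warrants waiting for job completion
            if ['pilotlog.txt' in logfile for logfile in logfiles] == [True]:
                if not job_completed() and (time.time() - t_start < cutoff):
                    time.sleep(0.05)
                    continue
                print(f'aborting real-time logging of pilot log after {time.time() - t_start:.2f} s')
            print('sending last real-time logs')
            break


    drain_logs(['pilotlog.txt'], lambda: False)  # waits until the cutoff, then drains and stops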
6 changes: 3 additions & 3 deletions pilot/workflow/generic.py
@@ -52,9 +52,9 @@ def interrupt(args, signum, frame):
sig = [v for v, k in list(signal.__dict__.items()) if k == signum][0]

# ignore SIGUSR1 since that will be aimed at a child process
if str(sig) == 'SIGUSR1':
logger.info('ignore intercepted SIGUSR1 aimed at child process')
return
#if str(sig) == 'SIGUSR1':
# logger.info('ignore intercepted SIGUSR1 aimed at child process')
# return

args.signal_counter += 1

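With the early return commented out, SIGUSR1 now falls through to the generic handling below (incrementing args.signal_counter) instead of being ignored. For context, a small sketch of the signal-name lookup used at the top of interrupt(); the Signals-enum line is an alternative, not part of the commit:

    import signal

    signum = signal.SIGUSR1

    # the pilot's approach: reverse lookup over the signal module namespace
    sig = [v for v, k in list(signal.__dict__.items()) if k == signum][0]
    print(sig)  # SIGUSR1

    # equivalent and more direct (Python 3.5+)
    print(signal.Signals(signum).name)  # SIGUSR1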
