Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.6.7.10 #101

Merged
merged 16 commits into from
Sep 18, 2023
Merged
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.6.6.22
3.6.7.10
10 changes: 10 additions & 0 deletions pilot/common/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,16 @@ def __init__(self, *args, **kwargs):
self._message = errors.get_error_message(self._errorCode)


class MiddlewareImportFailure(PilotException):
"""
No matching replicas were found in list_replicas() output.
"""
def __init__(self, *args, **kwargs):
super(MiddlewareImportFailure, self).__init__(args, kwargs)
self._errorCode = errors.MIDDLEWAREIMPORTFAILURE
self._message = errors.get_error_message(self._errorCode)


class JobAlreadyRunning(PilotException):
"""
Job is already running elsewhere.
Expand Down
3 changes: 3 additions & 0 deletions pilot/control/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,9 @@ def handle_backchannel_command(res, job, args, test_tobekilled=False):
if 'command' in res and res.get('command') != 'NULL':
# warning: server might return comma-separated string, 'debug,tobekilled'
cmd = res.get('command')

# for testing debug command: cmd = 'tail pilotlog.txt'

# is it a 'command options'-type? debug_command=tail .., ls .., gdb .., ps .., du ..
if ' ' in cmd and 'tobekilled' not in cmd:
job.debug, job.debug_command = get_debug_command(cmd)
Expand Down
7 changes: 5 additions & 2 deletions pilot/control/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,9 +463,12 @@ def find_log_to_tail(debug_command, workdir, args, is_analysis):
counter += 10

# fallback to known log file if no other file could be found
if not path:
logger.warning(f'file {filename} was not found for {maxwait} s, using default')
logf = path if path else config.Payload.payloadstdout
if not path:
if filename:
logger.warning(f'file {filename} was not found for {maxwait} s, using default')
else:
logger.info(f'using {logf} for real-time logging')

return logf

Expand Down
4 changes: 2 additions & 2 deletions pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
# Pilot version
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '6' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '6' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '22' # build number should be reset to '1' for every new development cycle
REVISION = '7' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '10' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
7 changes: 6 additions & 1 deletion pilot/util/https.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from .constants import get_pilot_version
from .container import execute
from pilot.common.errorcodes import ErrorCodes
from pilot.common.exception import FileHandlingFailure

import logging
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -201,7 +202,11 @@ def request(url, data=None, plain=False, secure=True, ipv='IPv6'):
# get the filename and strdata for the curl config file
filename, strdata = get_vars(url, data)
# write the strdata to file
writestatus = write_file(filename, strdata)
try:
writestatus = write_file(filename, strdata)
except FileHandlingFailure:
writestatus = None

# get the config option for the curl command
dat = get_curl_config_option(writestatus, url, data, filename)

Expand Down
76 changes: 65 additions & 11 deletions pilot/util/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,40 @@
from signal import SIGKILL

from pilot.common.errorcodes import ErrorCodes
from pilot.common.exception import PilotException
from pilot.common.exception import PilotException, MiddlewareImportFailure
from pilot.util.auxiliary import set_pilot_state #, show_memory_usage
from pilot.util.config import config
from pilot.util.constants import PILOT_PRE_PAYLOAD
from pilot.util.container import execute
from pilot.util.filehandling import get_disk_usage, remove_files, get_local_file_size, read_file, zip_files
from pilot.util.filehandling import (
get_disk_usage,
remove_files,
get_local_file_size,
read_file,
zip_files
)
from pilot.util.loopingjob import looping_job
from pilot.util.math import convert_mb_to_b, human2bytes
from pilot.util.parameters import convert_to_int, get_maximum_input_sizes
from pilot.util.processes import get_current_cpu_consumption_time, kill_processes, get_number_of_child_processes,\
get_subprocesses, reap_zombies
from pilot.util.math import (
convert_mb_to_b,
human2bytes
)
from pilot.util.parameters import (
convert_to_int,
get_maximum_input_sizes
)
from pilot.util.processes import (
get_current_cpu_consumption_time,
kill_processes,
get_number_of_child_processes,
get_subprocesses,
reap_zombies
)
from pilot.util.psutils import is_process_running
from pilot.util.timing import get_time_since
from pilot.util.workernode import get_local_disk_space, check_hz
from pilot.util.workernode import (
get_local_disk_space,
check_hz
)
from pilot.info import infosys

import logging
Expand All @@ -39,7 +60,7 @@
errors = ErrorCodes()


def job_monitor_tasks(job, mt, args):
def job_monitor_tasks(job, mt, args): # noqa: C901
"""
Perform the tasks for the job monitoring.
The function is called once a minute. Individual checks will be performed at any desired time interval (>= 1
Expand All @@ -54,6 +75,10 @@ def job_monitor_tasks(job, mt, args):
exit_code = 0
diagnostics = ""

# verify that the process is still alive
if not still_running(job.pid):
return 0, ""

current_time = int(time.time())

# update timing info for running jobs (to avoid an update after the job has finished)
Expand All @@ -65,6 +90,11 @@ def job_monitor_tasks(job, mt, args):

# confirm that the worker node has a proper SC_CLK_TCK (problems seen on MPPMU)
check_hz()

# verify that the process is still alive (again)
if not still_running(job.pid):
return 0, ""

try:
cpuconsumptiontime = get_current_cpu_consumption_time(job.pid)
except Exception as error:
Expand All @@ -73,9 +103,14 @@ def job_monitor_tasks(job, mt, args):
exit_code = get_exception_error_code(diagnostics)
return exit_code, diagnostics
else:
job.cpuconsumptiontime = int(round(cpuconsumptiontime))
job.cpuconversionfactor = 1.0
logger.info(f'CPU consumption time for pid={job.pid}: {cpuconsumptiontime} (rounded to {job.cpuconsumptiontime})')
_cpuconsumptiontime = int(round(cpuconsumptiontime))
if _cpuconsumptiontime > 0:
job.cpuconsumptiontime = int(round(cpuconsumptiontime))
job.cpuconversionfactor = 1.0
logger.info(f'(instant) CPU consumption time for pid={job.pid}: {cpuconsumptiontime} (rounded to {job.cpuconsumptiontime})')
else:
logger.warning(f'process {job.pid} is no longer using CPU - aborting')
return 0, ""

# keep track of the subprocesses running (store payload subprocess PIDs)
store_subprocess_pids(job)
Expand Down Expand Up @@ -130,9 +165,28 @@ def job_monitor_tasks(job, mt, args):
if exit_code != 0:
return exit_code, diagnostics

logger.debug(f'job monitor tasks loop took {int(time.time()) - current_time} s to complete')

return exit_code, diagnostics


def still_running(pid):
# verify that the process is still alive

running = False
try:
if pid:
if not is_process_running(pid):
logger.warning(f'aborting job monitor tasks since payload process {pid} is not running')
else:
running = True
logger.debug(f'payload process {pid} is running')
except MiddlewareImportFailure as exc:
logger.warning(f'exception caught: {exc}')

return running


def display_oom_info(payload_pid):
"""
Display OOM process info.
Expand Down
46 changes: 46 additions & 0 deletions pilot/util/psutils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Paul Nilsson, [email protected], 2023

import os
try:
import psutil
except ImportError:
print('FAILED; psutil module could not be imported')
is_psutil_available = False
else:
is_psutil_available = True

# from pilot.common.exception import MiddlewareImportFailure

import logging
logger = logging.getLogger(__name__)


def is_process_running_by_pid(pid):
return os.path.exists(f"/proc/{pid}")


def is_process_running(pid):
"""
Is the given process still running?

Note: if psutil module is not available, this function will raise an exception.

:param pid: process id (int)
:return: True (process still running), False (process not running)
:raises: MiddlewareImportFailure if psutil module is not available.
"""

if not is_psutil_available:
is_running = is_process_running_by_pid(pid)
logger.warning(f'using /proc/{pid} instead of psutil (is_running={is_running})')
return is_running
# raise MiddlewareImportFailure("required dependency could not be imported: psutil")
else:
return psutil.pid_exists(pid)
25 changes: 21 additions & 4 deletions pilot/util/realtimelogger.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
from logging import Logger, INFO
import logging

try:
import google.cloud.logging
from google.cloud.logging_v2.handlers import CloudLoggingHandler
except ImportError:
pass

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -109,8 +115,6 @@ def __init__(self, args, info_dic, workdir, secrets, level=INFO):

try:
if logtype == "google-cloud-logging":
import google.cloud.logging
from google.cloud.logging_v2.handlers import CloudLoggingHandler
client = google.cloud.logging.Client()
_handler = CloudLoggingHandler(client, name=name)
api_logger = logging.getLogger('google.cloud.logging_v2')
Expand Down Expand Up @@ -258,10 +262,13 @@ def sending_logs(self, args, job):
self.set_jobinfo(job)
self.add_logfiles(job)
i = 0
t_start = time.time()
cutoff = 10 * 60 # 10 minutes
while not args.graceful_stop.is_set():
i += 1
if i % 10 == 1:
logger.debug(f'RealTimeLogger iteration #{i} (job state={job.state})')
logger.debug(f'RealTimeLogger iteration #{i} (job state={job.state}, logfiles={self.logfiles})')
# there might be special cases when RT logs should be sent, e.g. for pilot logs
if job.state == '' or job.state == 'starting' or job.state == 'running':
if len(self.logfiles) > len(self.openfiles):
for logfile in self.logfiles:
Expand All @@ -278,14 +285,24 @@ def sending_logs(self, args, job):
logger.debug('no real-time logging during stage-in/out')
pass
else:
# logger.debug(f'real-time logging: sending logs for state={job.state} [2]')
# run longer for pilotlog
# wait for job.completed=True, for a maximum of N minutes
if ['pilotlog.txt' in logfile for logfile in self.logfiles] == [True]:
if not job.completed and (time.time() - t_start < cutoff):
time.sleep(5)
continue
logger.info(f'aborting real-time logging of pilot log after {time.time() - t_start} s (cut off: {cutoff} s)')

logger.info(f'sending last real-time logs for job {job.jobid} (state={job.state})')
self.send_loginfiles() # send the remaining logs after the job completion
self.close_files()
break
time.sleep(5)
else:
logger.debug('sending last real-time logs')
self.send_loginfiles() # send the remaining logs after the job completion
self.close_files()
logger.info('finished sending real-time logs')

def get_rtlogging_ssl(self):
"""
Expand Down
6 changes: 3 additions & 3 deletions pilot/workflow/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ def interrupt(args, signum, frame):
sig = [v for v, k in list(signal.__dict__.items()) if k == signum][0]

# ignore SIGUSR1 since that will be aimed at a child process
if str(sig) == 'SIGUSR1':
logger.info('ignore intercepted SIGUSR1 aimed at child process')
return
#if str(sig) == 'SIGUSR1':
# logger.info('ignore intercepted SIGUSR1 aimed at child process')
# return

args.signal_counter += 1

Expand Down