Skip to content

Commit

Permalink
a bit of PEP-8 and other beautifications
Browse files Browse the repository at this point in the history
  • Loading branch information
fbarreir committed Oct 30, 2024
1 parent 3299e31 commit 6912c21
Showing 1 changed file with 74 additions and 77 deletions.
151 changes: 74 additions & 77 deletions pandaserver/userinterface/UserIF.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ def submitJobs(self, jobsStr, user, host, userFQANs, prodRole=False, toPending=F
good_labels_message = f"submitJobs {user} wrong job_label={tmpJob.job_label}"
break
except Exception:
errType, errValue = sys.exc_info()[:2]
_logger.error(f"submitJobs : checking good_labels {errType} {errValue}")
err_type, err_value = sys.exc_info()[:2]
_logger.error(f"submitJobs : checking good_labels {err_type} {err_value}")
good_labels = False

# reject injection for error with the labels
Expand Down Expand Up @@ -256,8 +256,8 @@ def killJobs(self, idsStr, user, host, code, prodManager, useMailAsID, fqans, ki
break
time.sleep(1)
except Exception:
errType, errValue = sys.exc_info()[:2]
_logger.error(f"killJob : failed to convert email address {user} : {errType} {errValue}")
err_type, err_value = sys.exc_info()[:2]
_logger.error(f"killJob : failed to convert email address {user} : {err_type} {err_value}")
# get working groups with prod role
wgProdRole = []
for fqan in fqans:
Expand Down Expand Up @@ -432,8 +432,8 @@ def retryTask(
allowActiveTask=True,
)
except Exception:
errType, errValue = sys.exc_info()[:2]
ret = 1, f"server error with {errType}:{errValue}"
err_type, err_value = sys.exc_info()[:2]
ret = 1, f"server error with {err_type}:{err_value}"
else:
com_qualifier = ""
for com_key, com_param in [
Expand Down Expand Up @@ -461,12 +461,11 @@ def retryTask(
for jobID in jobdefList:
self.taskBuffer.finalizePendingJobs(cUID, jobID)
self.taskBuffer.increaseAttemptNrPanda(jediTaskID, 5)
retStr = "retry has been triggered for failed jobs "
retStr += f"while the task is still {ret[1]}"
return_str = f"retry has been triggered for failed jobs while the task is still {ret[1]}"
if newParams is None:
ret = 0, retStr
ret = 0, return_str
else:
ret = 3, retStr
ret = 3, return_str
return ret

# reassign task
Expand Down Expand Up @@ -545,37 +544,37 @@ def getTaskParamsMap(self, jediTaskID):
def updateWorkers(self, user, host, harvesterID, data):
ret = self.taskBuffer.updateWorkers(harvesterID, data)
if ret is None:
retVal = (False, MESSAGE_DATABASE)
return_value = (False, MESSAGE_DATABASE)
else:
retVal = (True, ret)
return json.dumps(retVal)
return_value = (True, ret)
return json.dumps(return_value)

# update workers
def updateServiceMetrics(self, user, host, harvesterID, data):
ret = self.taskBuffer.updateServiceMetrics(harvesterID, data)
if ret is None:
retVal = (False, MESSAGE_DATABASE)
return_value = (False, MESSAGE_DATABASE)
else:
retVal = (True, ret)
return json.dumps(retVal)
return_value = (True, ret)
return json.dumps(return_value)

# add harvester dialog messages
def addHarvesterDialogs(self, user, harvesterID, dialogs):
ret = self.taskBuffer.addHarvesterDialogs(harvesterID, dialogs)
if not ret:
retVal = (False, MESSAGE_DATABASE)
return_value = (False, MESSAGE_DATABASE)
else:
retVal = (True, "")
return json.dumps(retVal)
return_value = (True, "")
return json.dumps(return_value)

# heartbeat for harvester
def harvesterIsAlive(self, user, host, harvesterID, data):
ret = self.taskBuffer.harvesterIsAlive(user, host, harvesterID, data)
if ret is None:
retVal = (False, MESSAGE_DATABASE)
return_value = (False, MESSAGE_DATABASE)
else:
retVal = (True, ret)
return json.dumps(retVal)
return_value = (True, ret)
return json.dumps(return_value)

# get stats of workers
def getWorkerStats(self):
Expand All @@ -591,26 +590,26 @@ def reportWorkerStats_jobtype(self, harvesterID, siteName, paramsList):

# set num slots for workload provisioning
def setNumSlotsForWP(self, pandaQueueName, numSlots, gshare, resourceType, validPeriod):
retVal = self.taskBuffer.setNumSlotsForWP(pandaQueueName, numSlots, gshare, resourceType, validPeriod)
return json.dumps(retVal)
return_value = self.taskBuffer.setNumSlotsForWP(pandaQueueName, numSlots, gshare, resourceType, validPeriod)
return json.dumps(return_value)

# enable jumbo jobs
def enableJumboJobs(self, jediTaskID, totalJumboJobs, nJumboPerSite):
retVal = self.taskBuffer.enableJumboJobs(jediTaskID, totalJumboJobs, nJumboPerSite)
if totalJumboJobs > 0 and retVal[0] == 0:
return_value = self.taskBuffer.enableJumboJobs(jediTaskID, totalJumboJobs, nJumboPerSite)
if totalJumboJobs > 0 and return_value[0] == 0:
self.avalancheTask(jediTaskID, "panda", True)
return json.dumps(retVal)
return json.dumps(return_value)

# get user job metadata
def getUserJobMetadata(self, jediTaskID):
retVal = self.taskBuffer.getUserJobMetadata(jediTaskID)
return json.dumps(retVal)
return_value = self.taskBuffer.getUserJobMetadata(jediTaskID)
return json.dumps(return_value)

# get jumbo job datasets
def getJumboJobDatasets(self, n_days, grace_period):
retVal = self.taskBuffer.getJumboJobDatasets(n_days, grace_period)
return_value = self.taskBuffer.getJumboJobDatasets(n_days, grace_period)
# serialize
return json.dumps(retVal)
return json.dumps(return_value)

# sweep panda queue
def sweepPQ(self, panda_queue, status_list, ce_list, submission_host_list):
Expand Down Expand Up @@ -673,11 +672,11 @@ def _getFQAN(req):

# get DN
def _getDN(req):
realDN = ""
real_dn = ""
if "SSL_CLIENT_S_DN" in req.subprocess_env:
# remove redundant CN
realDN = CoreUtils.get_bare_dn(req.subprocess_env["SSL_CLIENT_S_DN"], keep_proxy=True)
return realDN
real_dn = CoreUtils.get_bare_dn(req.subprocess_env["SSL_CLIENT_S_DN"], keep_proxy=True)
return real_dn


# check role
Expand Down Expand Up @@ -1013,9 +1012,8 @@ def insertTaskParams(req, taskParams=None, properErrorCode=None, parent_tid=None

# check security
if not isSecure(req):
tmpMsg = MESSAGE_SSL
tmp_log.debug(tmpMsg)
return WrappedPickle.dumps((False, tmpMsg))
tmp_log.debug(MESSAGE_SSL)
return WrappedPickle.dumps((False, MESSAGE_SSL))
# get DN
user = None
if "SSL_CLIENT_S_DN" in req.subprocess_env:
Expand All @@ -1025,9 +1023,8 @@ def insertTaskParams(req, taskParams=None, properErrorCode=None, parent_tid=None
try:
json.loads(taskParams)
except Exception:
tmpMsg = "failed to decode json"
tmp_log.debug(tmpMsg)
return WrappedPickle.dumps((False, tmpMsg))
tmp_log.debug(MESSAGE_JSON)
return WrappedPickle.dumps((False, MESSAGE_JSON))
# check role
is_production_role = _has_production_role(req)
# get FQANs
Expand Down Expand Up @@ -1557,20 +1554,20 @@ def updateWorkers(req, harvesterID, workers):
user = _getDN(req)
# hostname
host = req.get_remote_host()
retVal = None
return_value = None
tStart = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
# convert
try:
data = json.loads(workers)
except Exception:
retVal = json.dumps((False, MESSAGE_JSON))
return_value = json.dumps((False, MESSAGE_JSON))
# update
if retVal is None:
retVal = userIF.updateWorkers(user, host, harvesterID, data)
if return_value is None:
return_value = userIF.updateWorkers(user, host, harvesterID, data)
tDelta = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - tStart
_logger.debug(f"updateWorkers {harvesterID} took {tDelta.seconds}.{tDelta.microseconds // 1000:03d} sec")

return retVal
return return_value


# update workers
Expand All @@ -1582,23 +1579,23 @@ def updateServiceMetrics(req, harvesterID, metrics):
user = _getDN(req)

host = req.get_remote_host()
retVal = None
return_value = None
tStart = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

# convert
try:
data = json.loads(metrics)
except Exception:
retVal = json.dumps((False, MESSAGE_JSON))
return_value = json.dumps((False, MESSAGE_JSON))

# update
if retVal is None:
retVal = userIF.updateServiceMetrics(user, host, harvesterID, data)
if return_value is None:
return_value = userIF.updateServiceMetrics(user, host, harvesterID, data)

tDelta = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - tStart
_logger.debug(f"updateServiceMetrics {harvesterID} took {tDelta.seconds}.{tDelta.microseconds // 1000:03d} sec")

return retVal
return return_value


# add harvester dialog messages
Expand Down Expand Up @@ -1773,9 +1770,9 @@ def relay_idds_command(req, command_name, args=None, kwargs=None, manager=None,
else:
c = iDDS_Client(idds_host)
if not hasattr(c, command_name):
tmpStr = f"{command_name} is not a command of iDDS {c.__class__.__name__}"
tmp_log.error(tmpStr)
return json.dumps((False, tmpStr))
tmp_str = f"{command_name} is not a command of iDDS {c.__class__.__name__}"
tmp_log.error(tmp_str)
return json.dumps((False, tmp_str))
if args:
try:
args = idds.common.utils.json_loads(args)
Expand Down Expand Up @@ -1807,9 +1804,9 @@ def relay_idds_command(req, command_name, args=None, kwargs=None, manager=None,
except Exception:
return idds.common.utils.json_dumps((True, ret))
except Exception as e:
tmpStr = f"failed to execute command with {str(e)}"
tmp_log.error(f"{tmpStr} {traceback.format_exc()}")
return json.dumps((False, tmpStr))
tmp_str = f"failed to execute command with {str(e)}"
tmp_log.error(f"{tmp_str} {traceback.format_exc()}")
return json.dumps((False, tmp_str))


# relay iDDS workflow command with ownership check
Expand All @@ -1836,9 +1833,9 @@ def execute_idds_workflow_command(req, command_name, kwargs=None, json_outputs=N
elif command_name in ["abort", "suspend", "resume", "retry", "finish"]:
check_owner = True
else:
tmpMsg = f"{command_name} is unsupported"
tmp_log.error(tmpMsg)
return json.dumps((False, tmpMsg))
tmp_message = f"{command_name} is unsupported"
tmp_log.error(tmp_message)
return json.dumps((False, tmp_message))
# check owner
c = iDDS_ClientManager(idds_host)
if json_outputs:
Expand All @@ -1847,27 +1844,27 @@ def execute_idds_workflow_command(req, command_name, kwargs=None, json_outputs=N
if check_owner:
# requester
if not dn:
tmpMsg = "SSL_CLIENT_S_DN is missing in HTTP request"
tmp_log.error(tmpMsg)
return json.dumps((False, tmpMsg))
tmp_message = "SSL_CLIENT_S_DN is missing in HTTP request"
tmp_log.error(tmp_message)
return json.dumps((False, tmp_message))
requester = clean_user_id(dn)
# get request_id
request_id = kwargs.get("request_id")
if request_id is None:
tmpMsg = "request_id is missing"
tmp_log.error(tmpMsg)
return json.dumps((False, tmpMsg))
tmp_message = "request_id is missing"
tmp_log.error(tmp_message)
return json.dumps((False, tmp_message))
# get request
req = c.get_requests(request_id=request_id)
if not req:
tmpMsg = f"request {request_id} is not found"
tmp_log.error(tmpMsg)
return json.dumps((False, tmpMsg))
tmp_message = f"request {request_id} is not found"
tmp_log.error(tmp_message)
return json.dumps((False, tmp_message))
user_name = req[0].get("username")
if user_name and user_name != requester:
tmpMsg = f"request {request_id} is not owned by {requester}"
tmp_log.error(tmpMsg)
return json.dumps((False, tmpMsg))
tmp_message = f"request {request_id} is not owned by {requester}"
tmp_log.error(tmp_message)
return json.dumps((False, tmp_message))
# set original username
if dn:
c.set_original_user(user_name=clean_user_id(dn))
Expand Down Expand Up @@ -1901,9 +1898,9 @@ def set_user_secret(req, key=None, value=None):
# get owner
dn = req.subprocess_env.get("SSL_CLIENT_S_DN")
if not dn:
tmpMsg = "SSL_CLIENT_S_DN is missing in HTTP request"
tmp_log.error(tmpMsg)
return json.dumps((False, tmpMsg))
tmp_message = "SSL_CLIENT_S_DN is missing in HTTP request"
tmp_log.error(tmp_message)
return json.dumps((False, tmp_message))
owner = clean_user_id(dn)
return json.dumps(userIF.set_user_secret(owner, key, value))

Expand All @@ -1916,9 +1913,9 @@ def get_user_secrets(req, keys=None, get_json=None):
get_json = resolve_true(get_json)

if not dn:
tmpMsg = "SSL_CLIENT_S_DN is missing in HTTP request"
tmp_log.error(tmpMsg)
return json.dumps((False, tmpMsg))
tmp_message = "SSL_CLIENT_S_DN is missing in HTTP request"
tmp_log.error(tmp_message)
return json.dumps((False, tmp_message))
owner = clean_user_id(dn)
return json.dumps(userIF.get_user_secrets(owner, keys, get_json))

Expand Down

0 comments on commit 6912c21

Please sign in to comment.