Commit a40194d

* added n_try in a few functions in Client
* removal of symlink creation in build step
* added json serializable
tmaeno committed Oct 9, 2023
1 parent 4368780 commit a40194d
Showing 7 changed files with 131 additions and 55 deletions.
3 changes: 3 additions & 0 deletions ChangeLog.txt
@@ -2,6 +2,9 @@

1.5.65
* added --workflowName to pchain
* added n_try in a few functions in Client
* removal of symlink creation in build step
* added json serializable

1.5.64
* fixed pyproject.toml
13 changes: 0 additions & 13 deletions packages/hatch_build.py
@@ -64,19 +64,6 @@ def initialize(self, version, build_data):

# post install only for client installation
if not os.path.exists(os.path.join(self.params['install_purelib'], 'pandacommon')):
target = 'taskbuffer'
if not os.path.exists(os.path.join(self.params['install_purelib'], target)):
os.symlink('pandaclient', target)
target = 'pandatools'
if not os.path.exists(os.path.join(self.params['install_purelib'], target)):
os.symlink('pandaclient', target)
# dummy pandaserver package
target = 'pandaserver'
if not os.path.exists(os.path.join(self.params['install_purelib'], target)):
os.makedirs(target)
target_init = os.path.join(target, '__init__.py')
with open(target_init, 'w'):
pass
target_tb = os.path.join(target, 'taskbuffer')
if not os.path.exists(target_tb):
os.symlink('../pandaclient', target_tb)
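
With these symlinks no longer created at build time, fresh client installations do not get the taskbuffer/pandatools aliases or the dummy pandaserver package. A minimal sketch of the direct imports this assumes (the alias form is shown only for comparison):

    # import client modules directly from the pandaclient package
    from pandaclient import Client
    from pandaclient import MiscUtils
    # instead of going through the removed compatibility aliases, e.g.
    # from pandatools import Client
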
87 changes: 63 additions & 24 deletions pandaclient/Client.py
@@ -10,6 +10,7 @@
import stat
import json
import gzip
import time
import string
import traceback
try:
@@ -242,7 +243,7 @@ def randomize_ip(self, url):


# GET method
def get(self, url, data, rucioAccount=False, via_file=False):
def get(self, url, data, rucioAccount=False, via_file=False, n_try=1):
use_https = is_https(url)
# make command
com = '%s --silent --get' % self.path
@@ -295,7 +296,11 @@ def get(self, url, data, rucioAccount=False, via_file=False):
if self.verbose:
print(hide_sensitive_info(com))
print(strData[:-1])
s,o = commands_get_status_output(com)
for i_try in range(n_try):
s,o = commands_get_status_output(com)
if s == 0 or i_try+1 == n_try:
break
time.sleep(1)
if o != '\x00':
try:
tmpout = unquote_plus(o)
@@ -317,7 +322,7 @@


# POST method
def post(self,url,data,rucioAccount=False, is_json=False, via_file=False, compress_body=False):
def post(self,url,data,rucioAccount=False, is_json=False, via_file=False, compress_body=False, n_try=1):
use_https = is_https(url)
# make command
com = '%s --silent' % self.path
@@ -387,7 +392,11 @@ def post(self,url,data,rucioAccount=False, is_json=False, via_file=False, compre
print(hide_sensitive_info(com))
for key in data:
print('{}={}'.format(key, data[key]))
s,o = commands_get_status_output(com)
for i_try in range(n_try):
s,o = commands_get_status_output(com)
if s == 0 or i_try+1 == n_try:
break
time.sleep(1)
if o != '\x00':
try:
if is_json:
@@ -411,7 +420,7 @@
return ret

# PUT method
def put(self, url, data):
def put(self, url, data, n_try=1):
use_https = is_https(url)
# make command
com = '%s --silent' % self.path
@@ -442,7 +451,11 @@ def put(self, url, data):
if self.verbose:
print(hide_sensitive_info(com))
# execute
ret = commands_get_status_output(com)
for i_try in range(n_try):
ret = commands_get_status_output(com)
if ret[0] == 0 or i_try+1 == n_try:
break
time.sleep(1)
ret = self.convRet(ret)
if self.verbose:
print(ret)
@@ -518,25 +531,33 @@ def http_method(self, url, data, header, rdata=None, compress_body=False, is_jso
return 1, errMsg

# GET method
def get(self, url, data, rucioAccount=False, via_file=False, output_name=None):
def get(self, url, data, rucioAccount=False, via_file=False, output_name=None, n_try=1):
if data:
url = '{}?{}'.format(url, urlencode(data))
code, text = self.http_method(url, {}, {})
for i_try in range(n_try):
code, text = self.http_method(url, {}, {})
if code in [0, 403, 404] or i_try+1 == n_try:
break
time.sleep(1)
if code == 0 and output_name:
with open(output_name, 'wb') as f:
f.write(text)
text = True
return code, text

# POST method
def post(self,url,data,rucioAccount=False, is_json=False, via_file=False, compress_body=False):
code, text = self.http_method(url, data, {}, compress_body=True, is_json=is_json)
def post(self,url,data,rucioAccount=False, is_json=False, via_file=False, compress_body=False, n_try=1):
for i_try in range(n_try):
code, text = self.http_method(url, data, {}, compress_body=True, is_json=is_json)
if code in [0, 403, 404] or i_try+1 == n_try:
break
time.sleep(1)
if is_json and code == 0:
text = json.loads(text)
return code, text

# PUT method
def put(self, url, data):
def put(self, url, data, n_try=1):
boundary = ''.join(random.choice(string.ascii_letters) for ii in range(30 + 1))
body = b''
for k in data:
@@ -552,7 +573,12 @@ def put(self, url, data):
body += '\r\n'.join(lines).encode()
headers = {'content-type': 'multipart/form-data; boundary=' + boundary,
'content-length': str(len(body))}
return self.http_method(url, None, headers, body)
for i_try in range(n_try):
code, text = self.http_method(url, None, headers, body)
if code in [0, 403, 404] or i_try+1 == n_try:
break
time.sleep(1)
return code, text


if 'PANDA_USE_NATIVE_HTTPLIB' in os.environ:
@@ -602,29 +628,38 @@ def submitJobs(jobs,verbose=False):


# get job statuses
def getJobStatus(ids, verbose=False):
def getJobStatus(ids, verbose=False, no_pickle=False):
"""Get status of jobs
args:
ids: a list of PanDA IDs
verbose: True to see verbose messages
no_pickle: True to use json instead of pickle
returns:
status code
0: communication succeeded to the panda server
255: communication failure
a list of job specs, or None if failed
"""
# serialize
strIDs = pickle.dumps(ids, protocol=0)
if not no_pickle:
strIDs = pickle.dumps(ids, protocol=0)
else:
strIDs = ids
# instantiate curl
curl = _Curl()
curl.verbose = verbose
# execute
url = baseURL + '/getJobStatus'
data = {'ids':strIDs}
data = {'ids': strIDs}
if no_pickle:
data["no_pickle"] = True
status,output = curl.post(url, data, via_file=True)
try:
return status, pickle_loads(output)
if not no_pickle:
return status, pickle_loads(output)
else:
return status, MiscUtils.load_jobs_json(output)
except Exception as e:
dump_log("getJobStatus", e, output)
return EC_Failed,None
@@ -778,13 +813,14 @@ def retryTask(jediTaskID,verbose=False,properErrorCode=False,newParams=None):


# put file
def putFile(file,verbose=False,useCacheSrv=False,reuseSandbox=False):
def putFile(file,verbose=False,useCacheSrv=False,reuseSandbox=False, n_try=1):
"""Upload a file with the size limit on 10 MB
args:
file: filename to be uploaded
verbose: True to see debug messages
useCacheSrv: True to use a dedicated cache server separated from the PanDA server
reuseSandbox: True to avoid uploading the same sandbox files
n_try: number of tries
returns:
status code
0: communication succeeded to the panda server
@@ -836,17 +872,18 @@ def putFile(file,verbose=False,useCacheSrv=False,reuseSandbox=False):
baseURLCSRVSSL = baseURLSSL
url = baseURLCSRVSSL + '/putFile'
data = {'file':file}
s,o = curl.put(url,data)
s,o = curl.put(url, data, n_try=n_try)
return s, str_decode(o)


# get file
def getFile(filename, output_path=None, verbose=False):
def getFile(filename, output_path=None, verbose=False, n_try=1):
"""Get a file
args:
filename: filename to be downloaded
output_path: output path. set to filename if unspecified
verbose: True to see debug messages
n_try: number of tries
returns:
status code
0: communication succeeded to the panda server
@@ -864,7 +901,7 @@ def getFile(filename, output_path=None, verbose=False):
if netloc.port:
url += ':%s' % netloc.port
url = url + '/cache/' + filename
s, o = curl.get(url, {}, output_name=output_path)
s, o = curl.get(url, {}, output_name=output_path, n_try=n_try)
return s, o


@@ -1600,7 +1637,7 @@ def get_new_token():

# call idds command
def call_idds_command(command_name, args=None, kwargs=None, dumper=None, verbose=False, compress=False,
manager=False, loader=None, json_outputs=False):
manager=False, loader=None, json_outputs=False, n_try=1):
"""Call an iDDS command through PanDA
args:
command_name: command name
@@ -1612,6 +1649,7 @@ def call_idds_command(command_name, args=None, kwargs=None, dumper=None, verbose
manager: True to use ClientManager
loader: function object for json.loads
json_outputs: True to use json outputs
n_try: number of tries
returns:
status code
0: communication succeeded to the panda server
@@ -1643,7 +1681,7 @@ def call_idds_command(command_name, args=None, kwargs=None, dumper=None, verbose
data['manager'] = True
if json_outputs:
data['json_outputs'] = True
status, output = curl.post(url, data, compress_body=compress)
status, output = curl.post(url, data, compress_body=compress, n_try=n_try)
if status != 0:
return EC_Failed, output
else:
@@ -1661,13 +1699,14 @@


# call idds user workflow command
def call_idds_user_workflow_command(command_name, kwargs=None, verbose=False, json_outputs=False):
def call_idds_user_workflow_command(command_name, kwargs=None, verbose=False, json_outputs=False, n_try=1):
"""Call an iDDS workflow user command
args:
command_name: command name
kwargs: a dictionary of keyword arguments
verbose: True to see verbose message
json_outputs: True to use json outputs
n_try: number of tries
returns:
status code
0: communication succeeded to the panda server
@@ -1688,7 +1727,7 @@ def call_idds_user_workflow_command(command_name, kwargs=None, verbose=False, js
data['kwargs'] = json.dumps(kwargs)
if json_outputs:
data['json_outputs'] = True
status, output = curl.post(url, data)
status, output = curl.post(url, data, n_try=n_try)
if status != 0:
return EC_Failed, output
else:
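
The new keyword arguments are opt-in: n_try retries the underlying HTTP call (sleeping one second between attempts) and no_pickle switches getJobStatus to JSON transport. A minimal usage sketch; the file names, PanDA IDs, and retry counts below are placeholders:

    from pandaclient import Client

    # upload a sandbox file, retrying the HTTP call up to 3 times (n_try defaults to 1)
    status, message = Client.putFile('sandbox.tar.gz', verbose=True, n_try=3)

    # download a cached file with the same retry behaviour
    status, out = Client.getFile('sandbox.tar.gz', output_path='/tmp/sandbox.tar.gz', n_try=3)

    # fetch job statuses serialized as JSON instead of pickle
    status, job_specs = Client.getJobStatus([1234567890], verbose=False, no_pickle=True)
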
10 changes: 6 additions & 4 deletions pandaclient/FileSpec.py
@@ -111,7 +111,9 @@ def updateExpression(cls):
return ret
updateExpression = classmethod(updateExpression)





# dump to be json-serializable
def dump_to_json_serializable(self):
stat = self.__getstate__()[:-1]
# set None as _owner
stat.append(None)
return stat
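
dump_to_json_serializable keeps the list returned by __getstate__ but replaces the trailing _owner back-reference with None, so the result is JSON-safe. A minimal sketch (the attribute and its value are only illustrative):

    from pandaclient.FileSpec import FileSpec

    file_spec = FileSpec()
    file_spec.lfn = 'EVNT.12345._000001.pool.root'    # illustrative logical file name
    state = file_spec.dump_to_json_serializable()     # attribute list with None in place of _owner
    # state round-trips through json.dumps/json.loads and FileSpec().__setstate__(state)
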
23 changes: 23 additions & 0 deletions pandaclient/JobSpec.py
@@ -5,6 +5,7 @@

import re
import datetime
from pandaclient.FileSpec import FileSpec

reserveChangedState = False

@@ -833,3 +834,25 @@ def to_dict(self):
v = None
ret[a] = v
return ret

# dump to json-serializable
def dump_to_json_serializable(self):
job_state = self.__getstate__()
file_state_list = []
for file_spec in job_state[-1]:
file_stat = file_spec.dump_to_json_serializable()
file_state_list.append(file_stat)
job_state = job_state[:-1]
# append files
job_state.append(file_state_list)
return job_state

# load from json-serializable
def load_from_json_serializable(self, job_state):
# initialize with empty file list
self.__setstate__(job_state[:-1] + [[]])
# add files
for file_stat in job_state[-1]:
file_spec = FileSpec()
file_spec.__setstate__(file_stat)
self.addFile(file_spec)
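
Together with the FileSpec helper above, this gives a JSON-friendly round trip for a job and its files. A minimal sketch, assuming an existing JobSpec instance and the datetime encoder/hook added to MiscUtils below:

    import json
    from pandaclient.JobSpec import JobSpec
    from pandaclient.MiscUtils import NonJsonObjectEncoder, as_python_object

    # job_spec: an existing JobSpec, e.g. one returned by Client.getJobStatus
    state = job_spec.dump_to_json_serializable()         # job state with the file states appended last
    text = json.dumps(state, cls=NonJsonObjectEncoder)   # datetime attributes become tagged dicts

    # rebuild a JobSpec, re-attaching its files via addFile
    restored = JobSpec()
    restored.load_from_json_serializable(json.loads(text, object_hook=as_python_object))
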
37 changes: 36 additions & 1 deletion pandaclient/MiscUtils.py
@@ -3,6 +3,7 @@
import os
import json
import uuid
import datetime
import traceback
import subprocess
try:
@@ -297,4 +298,38 @@ def parse_secondary_datasets_opt(secondaryDSs):
secondaryDSs = tmpMap
else:
secondaryDSs = {}
return True, secondaryDSs
return True, secondaryDSs


# convert datetime to string
class NonJsonObjectEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime.datetime):
return {"_datetime_object": obj.strftime("%Y-%m-%d %H:%M:%S.%f")}
return json.JSONEncoder.default(self, obj)


# hook for json decoder
def as_python_object(dct):
if "_datetime_object" in dct:
return datetime.datetime.strptime(str(dct["_datetime_object"]), "%Y-%m-%d %H:%M:%S.%f")
return dct


# dump jobs to serialized json
def dump_jobs_json(jobs):
state_objects = []
for job_spec in jobs:
state_objects.append(job_spec.dump_to_json_serializable())
return json.dumps(state_objects, cls=NonJsonObjectEncoder)


# load serialized json to jobs
def load_jobs_json(state):
state_objects = json.loads(state, object_hook=as_python_object)
jobs = []
for job_state in state_objects:
job_spec = JobSpec.JobSpec()
job_spec.load_from_json_serializable(job_state)
jobs.append(job_spec)
return jobs
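
The encoder/hook pair tags datetime values so they survive the JSON round trip, and the two helpers apply that to whole job lists. A minimal sketch (jobs is assumed to be a list of JobSpec objects, e.g. from Client.getJobStatus):

    import datetime
    import json
    from pandaclient import MiscUtils

    # datetime values are tagged on encode and rebuilt on decode
    text = json.dumps({'when': datetime.datetime(2023, 10, 9, 12, 0, 0)},
                      cls=MiscUtils.NonJsonObjectEncoder)
    # -> {"when": {"_datetime_object": "2023-10-09 12:00:00.000000"}}
    back = json.loads(text, object_hook=MiscUtils.as_python_object)   # back['when'] is a datetime again

    # serialize and restore a whole list of JobSpec objects, including their FileSpecs
    payload = MiscUtils.dump_jobs_json(jobs)
    restored_jobs = MiscUtils.load_jobs_json(payload)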