From a40194d7dc0c8fa27bee35d85cb2ad792334206c Mon Sep 17 00:00:00 2001 From: tmaeno Date: Mon, 9 Oct 2023 08:44:07 +0200 Subject: [PATCH] * added n_try in a few functions in Client * removal of symlink creation in build step * added json serializable --- ChangeLog.txt | 3 ++ packages/hatch_build.py | 13 ------ pandaclient/Client.py | 87 +++++++++++++++++++++++++++++----------- pandaclient/FileSpec.py | 10 +++-- pandaclient/JobSpec.py | 23 +++++++++++ pandaclient/MiscUtils.py | 37 ++++++++++++++++- setup.py | 13 ------ 7 files changed, 131 insertions(+), 55 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index 6591063f..46900af0 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -2,6 +2,9 @@ 1.5.65 * added --workflowName to pchain + * added n_try in a few functions in Client + * removal of symlink creation in build step + * added json serializable 1.5.64 * fixed pyproject.toml diff --git a/packages/hatch_build.py b/packages/hatch_build.py index a047c7a1..90bfb043 100644 --- a/packages/hatch_build.py +++ b/packages/hatch_build.py @@ -64,19 +64,6 @@ def initialize(self, version, build_data): # post install only for client installation if not os.path.exists(os.path.join(self.params['install_purelib'], 'pandacommon')): - target = 'taskbuffer' - if not os.path.exists(os.path.join(self.params['install_purelib'], target)): - os.symlink('pandaclient', target) target = 'pandatools' if not os.path.exists(os.path.join(self.params['install_purelib'], target)): os.symlink('pandaclient', target) - # dummy pandaserver package - target = 'pandaserver' - if not os.path.exists(os.path.join(self.params['install_purelib'], target)): - os.makedirs(target) - target_init = os.path.join(target, '__init__.py') - with open(target_init, 'w'): - pass - target_tb = os.path.join(target, 'taskbuffer') - if not os.path.exists(target_tb): - os.symlink('../pandaclient', target_tb) diff --git a/pandaclient/Client.py b/pandaclient/Client.py index 3c31f122..75dd66f6 100755 --- a/pandaclient/Client.py +++ b/pandaclient/Client.py @@ -10,6 +10,7 @@ import stat import json import gzip +import time import string import traceback try: @@ -242,7 +243,7 @@ def randomize_ip(self, url): # GET method - def get(self, url, data, rucioAccount=False, via_file=False): + def get(self, url, data, rucioAccount=False, via_file=False, n_try=1): use_https = is_https(url) # make command com = '%s --silent --get' % self.path @@ -295,7 +296,11 @@ def get(self, url, data, rucioAccount=False, via_file=False): if self.verbose: print(hide_sensitive_info(com)) print(strData[:-1]) - s,o = commands_get_status_output(com) + for i_try in range(n_try): + s,o = commands_get_status_output(com) + if s == 0 or i_try+1 == n_try: + break + time.sleep(1) if o != '\x00': try: tmpout = unquote_plus(o) @@ -317,7 +322,7 @@ def get(self, url, data, rucioAccount=False, via_file=False): # POST method - def post(self,url,data,rucioAccount=False, is_json=False, via_file=False, compress_body=False): + def post(self,url,data,rucioAccount=False, is_json=False, via_file=False, compress_body=False, n_try=1): use_https = is_https(url) # make command com = '%s --silent' % self.path @@ -387,7 +392,11 @@ def post(self,url,data,rucioAccount=False, is_json=False, via_file=False, compre print(hide_sensitive_info(com)) for key in data: print('{}={}'.format(key, data[key])) - s,o = commands_get_status_output(com) + for i_try in range(n_try): + s,o = commands_get_status_output(com) + if s == 0 or i_try+1 == n_try: + break + time.sleep(1) if o != '\x00': try: if is_json: @@ -411,7 +420,7 @@ def post(self,url,data,rucioAccount=False, is_json=False, via_file=False, compre return ret # PUT method - def put(self, url, data): + def put(self, url, data, n_try=1): use_https = is_https(url) # make command com = '%s --silent' % self.path @@ -442,7 +451,11 @@ def put(self, url, data): if self.verbose: print(hide_sensitive_info(com)) # execute - ret = commands_get_status_output(com) + for i_try in range(n_try): + ret = commands_get_status_output(com) + if ret[0] == 0 or i_try+1 == n_try: + break + time.sleep(1) ret = self.convRet(ret) if self.verbose: print(ret) @@ -518,10 +531,14 @@ def http_method(self, url, data, header, rdata=None, compress_body=False, is_jso return 1, errMsg # GET method - def get(self, url, data, rucioAccount=False, via_file=False, output_name=None): + def get(self, url, data, rucioAccount=False, via_file=False, output_name=None, n_try=1): if data: url = '{}?{}'.format(url, urlencode(data)) - code, text = self.http_method(url, {}, {}) + for i_try in range(n_try): + code, text = self.http_method(url, {}, {}) + if code in [0, 403, 404] or i_try+1 == n_try: + break + time.sleep(1) if code == 0 and output_name: with open(output_name, 'wb') as f: f.write(text) @@ -529,14 +546,18 @@ def get(self, url, data, rucioAccount=False, via_file=False, output_name=None): return code, text # POST method - def post(self,url,data,rucioAccount=False, is_json=False, via_file=False, compress_body=False): - code, text = self.http_method(url, data, {}, compress_body=True, is_json=is_json) + def post(self,url,data,rucioAccount=False, is_json=False, via_file=False, compress_body=False, n_try=1): + for i_try in range(n_try): + code, text = self.http_method(url, data, {}, compress_body=True, is_json=is_json) + if code in [0, 403, 404] or i_try+1 == n_try: + break + time.sleep(1) if is_json and code == 0: text = json.loads(text) return code, text # PUT method - def put(self, url, data): + def put(self, url, data, n_try=1): boundary = ''.join(random.choice(string.ascii_letters) for ii in range(30 + 1)) body = b'' for k in data: @@ -552,7 +573,12 @@ def put(self, url, data): body += '\r\n'.join(lines).encode() headers = {'content-type': 'multipart/form-data; boundary=' + boundary, 'content-length': str(len(body))} - return self.http_method(url, None, headers, body) + for i_try in range(n_try): + code, text = self.http_method(url, None, headers, body) + if code in [0, 403, 404] or i_try+1 == n_try: + break + time.sleep(1) + return code, text if 'PANDA_USE_NATIVE_HTTPLIB' in os.environ: @@ -602,12 +628,13 @@ def submitJobs(jobs,verbose=False): # get job statuses -def getJobStatus(ids, verbose=False): +def getJobStatus(ids, verbose=False, no_pickle=False): """Get status of jobs args: ids: a list of PanDA IDs verbose: True to see verbose messages + no_pickle: True to use json instead of pickle returns: status code 0: communication succeeded to the panda server @@ -615,16 +642,24 @@ def getJobStatus(ids, verbose=False): a list of job specs, or None if failed """ # serialize - strIDs = pickle.dumps(ids, protocol=0) + if not no_pickle: + strIDs = pickle.dumps(ids, protocol=0) + else: + strIDs = ids # instantiate curl curl = _Curl() curl.verbose = verbose # execute url = baseURL + '/getJobStatus' - data = {'ids':strIDs} + data = {'ids': strIDs} + if no_pickle: + data["no_pickle"] = True status,output = curl.post(url, data, via_file=True) try: - return status, pickle_loads(output) + if not no_pickle: + return status, pickle_loads(output) + else: + return status, MiscUtils.load_jobs_json(output) except Exception as e: dump_log("getJobStatus", e, output) return EC_Failed,None @@ -778,13 +813,14 @@ def retryTask(jediTaskID,verbose=False,properErrorCode=False,newParams=None): # put file -def putFile(file,verbose=False,useCacheSrv=False,reuseSandbox=False): +def putFile(file,verbose=False,useCacheSrv=False,reuseSandbox=False, n_try=1): """Upload a file with the size limit on 10 MB args: file: filename to be uploaded verbose: True to see debug messages useCacheSrv: True to use a dedicated cache server separated from the PanDA server reuseSandbox: True to avoid uploading the same sandbox files + n_try: number of tries returns: status code 0: communication succeeded to the panda server @@ -836,17 +872,18 @@ def putFile(file,verbose=False,useCacheSrv=False,reuseSandbox=False): baseURLCSRVSSL = baseURLSSL url = baseURLCSRVSSL + '/putFile' data = {'file':file} - s,o = curl.put(url,data) + s,o = curl.put(url, data, n_try=n_try) return s, str_decode(o) # get file -def getFile(filename, output_path=None, verbose=False): +def getFile(filename, output_path=None, verbose=False, n_try=1): """Get a file args: filename: filename to be downloaded output_path: output path. set to filename if unspecified verbose: True to see debug messages + n_try: number of tries returns: status code 0: communication succeeded to the panda server @@ -864,7 +901,7 @@ def getFile(filename, output_path=None, verbose=False): if netloc.port: url += ':%s' % netloc.port url = url + '/cache/' + filename - s, o = curl.get(url, {}, output_name=output_path) + s, o = curl.get(url, {}, output_name=output_path, n_try=n_try) return s, o @@ -1600,7 +1637,7 @@ def get_new_token(): # call idds command def call_idds_command(command_name, args=None, kwargs=None, dumper=None, verbose=False, compress=False, - manager=False, loader=None, json_outputs=False): + manager=False, loader=None, json_outputs=False, n_try=1): """Call an iDDS command through PanDA args: command_name: command name @@ -1612,6 +1649,7 @@ def call_idds_command(command_name, args=None, kwargs=None, dumper=None, verbose manager: True to use ClientManager loader: function object for json.loads json_outputs: True to use json outputs + n_try: number of tries returns: status code 0: communication succeeded to the panda server @@ -1643,7 +1681,7 @@ def call_idds_command(command_name, args=None, kwargs=None, dumper=None, verbose data['manager'] = True if json_outputs: data['json_outputs'] = True - status, output = curl.post(url, data, compress_body=compress) + status, output = curl.post(url, data, compress_body=compress, n_try=n_try) if status != 0: return EC_Failed, output else: @@ -1661,13 +1699,14 @@ def call_idds_command(command_name, args=None, kwargs=None, dumper=None, verbose # call idds user workflow command -def call_idds_user_workflow_command(command_name, kwargs=None, verbose=False, json_outputs=False): +def call_idds_user_workflow_command(command_name, kwargs=None, verbose=False, json_outputs=False, n_try=1): """Call an iDDS workflow user command args: command_name: command name kwargs: a dictionary of keyword arguments verbose: True to see verbose message json_outputs: True to use json outputs + n_try: number of tries returns: status code 0: communication succeeded to the panda server @@ -1688,7 +1727,7 @@ def call_idds_user_workflow_command(command_name, kwargs=None, verbose=False, js data['kwargs'] = json.dumps(kwargs) if json_outputs: data['json_outputs'] = True - status, output = curl.post(url, data) + status, output = curl.post(url, data, n_try=n_try) if status != 0: return EC_Failed, output else: diff --git a/pandaclient/FileSpec.py b/pandaclient/FileSpec.py index 6e4bd60d..1ac5e9c1 100755 --- a/pandaclient/FileSpec.py +++ b/pandaclient/FileSpec.py @@ -111,7 +111,9 @@ def updateExpression(cls): return ret updateExpression = classmethod(updateExpression) - - - - + # dump to be json-serializable + def dump_to_json_serializable(self): + stat = self.__getstate__()[:-1] + # set None as _owner + stat.append(None) + return stat diff --git a/pandaclient/JobSpec.py b/pandaclient/JobSpec.py index 6d57c7a0..697e7417 100755 --- a/pandaclient/JobSpec.py +++ b/pandaclient/JobSpec.py @@ -5,6 +5,7 @@ import re import datetime +from FileSpec import FileSpec reserveChangedState = False @@ -833,3 +834,25 @@ def to_dict(self): v = None ret[a] = v return ret + + # dump to json-serializable + def dump_to_json_serializable(self): + job_state = self.__getstate__() + file_state_list = [] + for file_spec in job_state[-1]: + file_stat = file_spec.dump_to_json_serializable() + file_state_list.append(file_stat) + job_state = job_state[:-1] + # append files + job_state.append(file_state_list) + return job_state + + # load from json-serializable + def load_from_json_serializable(self, job_state): + # initialize with empty file list + self.__setstate__(job_state[:-1] + [[]]) + # add files + for file_stat in job_state[-1]: + file_spec = FileSpec() + file_spec.__setstate__(file_stat) + self.addFile(file_spec) diff --git a/pandaclient/MiscUtils.py b/pandaclient/MiscUtils.py index 04ee7609..79a976bf 100644 --- a/pandaclient/MiscUtils.py +++ b/pandaclient/MiscUtils.py @@ -3,6 +3,7 @@ import os import json import uuid +import datetime import traceback import subprocess try: @@ -297,4 +298,38 @@ def parse_secondary_datasets_opt(secondaryDSs): secondaryDSs = tmpMap else: secondaryDSs = {} - return True, secondaryDSs \ No newline at end of file + return True, secondaryDSs + + +# convert datetime to string +class NonJsonObjectEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, datetime.datetime): + return {"_datetime_object": obj.strftime("%Y-%m-%d %H:%M:%S.%f")} + return json.JSONEncoder.default(self, obj) + + +# hook for json decoder +def as_python_object(dct): + if "_datetime_object" in dct: + return datetime.datetime.strptime(str(dct["_datetime_object"]), "%Y-%m-%d %H:%M:%S.%f") + return dct + + +# dump jobs to serialized json +def dump_jobs_json(jobs): + state_objects = [] + for job_spec in jobs: + state_objects.append(job_spec.dump_to_json_serializable()) + return json.dumps(state_objects, cls=NonJsonObjectEncoder) + + +# load serialized json to jobs +def load_jobs_json(state): + state_objects = json.loads(state, object_hook=as_python_object) + jobs = [] + for job_state in state_objects: + job_spec = JobSpec.JobSpec() + job_spec.load_from_json_serializable(job_state) + jobs.append(job_spec) + return jobs diff --git a/setup.py b/setup.py index 4108b881..189c3f59 100644 --- a/setup.py +++ b/setup.py @@ -122,22 +122,9 @@ def run (self): install_data_org.run(self) # post install only for client installation if not os.path.exists(os.path.join(self.install_purelib, 'pandacommon')): - target = os.path.join(self.install_purelib, 'taskbuffer') - if not os.path.exists(target): - os.symlink('pandaclient', - os.path.join(self.install_purelib, 'taskbuffer')) target = os.path.join(self.install_purelib, 'pandatools') if not os.path.exists(target): os.symlink('pandaclient', target) - target = os.path.join(self.install_purelib, 'pandaserver') - if not os.path.exists(target): - os.makedirs(target) - target_init = os.path.join(target, '__init__.py') - with open(target_init, 'w'): - pass - target = os.path.join(target, 'taskbuffer') - if not os.path.exists(target): - os.symlink('../pandaclient', target) # delete for autoGenFile in autoGenFiles: try: