Skip to content

Commit

Permalink
1.4.61
Browse files Browse the repository at this point in the history
  • Loading branch information
tmaeno committed Mar 29, 2021
1 parent e0fda97 commit 4e24e67
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 9 deletions.
3 changes: 3 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
** Release Notes

1.4.61
* added PANDA_USE_NATIVE_HTTPLIB

1.4.57
* added GUI example

Expand Down
83 changes: 81 additions & 2 deletions pandatools/Client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@
import os
import re
import sys
import ssl
import stat
import json
import string
import traceback
try:
from urllib import urlencode, unquote_plus
from urlparse import urlparse
from urllib2 import urlopen, Request, HTTPError
except ImportError:
from urllib.parse import urlencode, unquote_plus, urlparse
from urllib.request import urlopen, Request
from urllib.error import HTTPError
import struct
try:
import cPickle as pickle
Expand Down Expand Up @@ -361,8 +367,78 @@ def convRet(self,ret):
return ret


class _NativeCurl(_Curl):
    """Drop-in replacement for _Curl that talks to the PanDA server using
    the native urllib HTTP stack instead of spawning the curl command.
    Selected at import time when PANDA_USE_NATIVE_HTTPLIB is set in the
    environment (see the module-level alias below the class).
    """

    def http_method(self, url, data, header, rdata=None):
        """Issue a single HTTP request.

        url    : target URL (passed through self.randomize_ip first)
        data   : dict; form-encoded into the request body when rdata is None
        header : dict of extra HTTP headers; None is treated as empty
        rdata  : optional pre-built raw body (e.g. a multipart payload)

        Returns (code, text): code is 0 on HTTP 200, the raw HTTP status
        code otherwise, or 1 together with the stringified exception on
        any failure. text is the raw response bytes.
        """
        try:
            url = self.randomize_ip(url)
            if header is None:
                header = {}
            if self.authMode == 'oidc':
                # OIDC auth: attach the bearer token and the VO as Origin
                self.get_id_token()
                header['Authorization'] = 'Bearer {0}'.format(self.idToken)
                header['Origin'] = self.authVO
            if rdata is None:
                rdata = urlencode(data).encode()
            req = Request(url, rdata, headers=header)
            # NOTE(review): server certificate verification is deliberately
            # disabled here; only the client certificate is presented.
            # Presumably this matches the curl-based client's behavior —
            # confirm before tightening.
            context = ssl._create_unverified_context()
            if self.sslCert and self.sslKey:
                context.load_cert_chain(certfile=self.sslCert, keyfile=self.sslKey)
            if self.verbose:
                print('url = {}'.format(url))
                print('header = {}'.format(str(header)))
                print('data = {}'.format(str(data)))
            conn = urlopen(req, context=context)
            try:
                code = conn.getcode()
                text = conn.read()
            finally:
                # close the response explicitly so the socket is not leaked
                conn.close()
            if code == 200:
                code = 0
            if self.verbose:
                print(code, text)
            return code, text
        except Exception as e:
            print(traceback.format_exc())
            return 1, str(e)

    # GET method
    def get(self, url, data, rucioAccount=False, via_file=False):
        """HTTP GET: data is encoded into the query string.
        rucioAccount/via_file are accepted only for interface
        compatibility with _Curl and are ignored here."""
        url = '{}?{}'.format(url, urlencode(data))
        return self.http_method(url, {}, {})

    # POST method
    def post(self, url, data, rucioAccount=False, is_json=False, via_file=False):
        """HTTP POST with form-encoded data.
        When is_json is True and the request succeeded, the response body
        is JSON-decoded before being returned."""
        code, text = self.http_method(url, data, {})
        if is_json and code == 0:
            text = json.loads(text)
        return code, text

    # PUT method
    def put(self, url, data):
        """Upload files via a multipart/form-data request.
        data maps field names to local file paths; each file's content is
        embedded into the request body."""
        boundary = ''.join(random.choice(string.ascii_letters) for ii in range(30 + 1))
        body = b''
        for k in data:
            lines = ['--' + boundary,
                     'Content-Disposition: form-data; name="%s"; filename="%s"' % (k, data[k]),
                     'Content-Type: application/octet-stream',
                     '']
            body += '\r\n'.join(lines).encode()
            body += b'\r\n'
            # read the file inside a with-block so the handle is closed
            with open(data[k], 'rb') as file_obj:
                body += file_obj.read()
            body += b'\r\n'
        lines = ['--%s--' % boundary, '']
        body += '\r\n'.join(lines).encode()
        headers = {'content-type': 'multipart/form-data; boundary=' + boundary,
                   'content-length': str(len(body))}
        return self.http_method(url, None, headers, body)


# Opt-in switch: when PANDA_USE_NATIVE_HTTPLIB is set in the environment,
# rebind _Curl so that all client code transparently uses the urllib-based
# implementation instead of the external curl command.
if 'PANDA_USE_NATIVE_HTTPLIB' in os.environ:
    _Curl = _NativeCurl


# dump log
def dump_log(func_name, exception_obj, output):
print(traceback.format_exc())
print(output)
err_str = "{} failed : {}".format(func_name, str(exception_obj))
tmp_log = PLogger.getPandaLogger()
Expand Down Expand Up @@ -621,6 +697,7 @@ def putFile(file,verbose=False,useCacheSrv=False,reuseSandbox=False):
url = baseURLSSL + '/checkSandboxFile'
data = {'fileSize':fileSize,'checkSum':checkSum}
status,output = curl.post(url,data)
output = output.decode()
if status != 0:
return EC_Failed,'ERROR: Could not check Sandbox duplication with %s' % status
elif output.startswith('FOUND:'):
Expand All @@ -636,7 +713,8 @@ def putFile(file,verbose=False,useCacheSrv=False,reuseSandbox=False):
else:
url = baseURLSSL + '/putFile'
data = {'file':file}
return curl.put(url,data)
s,o = curl.put(url,data)
return s, o.decode()


# get grid source file
Expand Down Expand Up @@ -863,6 +941,7 @@ def getPandaClientVer(verbose):
# execute
url = baseURL + '/getPandaClientVer'
status,output = curl.get(url,{})
output = output.decode()
# failed
if status != 0:
return status,output
Expand Down Expand Up @@ -1251,7 +1330,7 @@ def get_user_name_from_token():
curl = _Curl()
token_info = curl.get_token_info()
try:
return token_info['name'], token_info['groups']
return token_info['name'], token_info['groups'], token_info['preferred_username']
except Exception:
return None, None

Expand Down
2 changes: 1 addition & 1 deletion pandatools/PandaToolsPkgInfo.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = "1.4.60"
release_version = "1.4.61"
6 changes: 5 additions & 1 deletion pandatools/PathenaScript.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,9 @@
group_input.add_argument('--avoidVP', action='store_const', const=True, dest='avoidVP', default=False,
help='Not to use sites where virtual placement is enabled')
group_submit.add_argument('--maxCpuCount', action='store', dest='maxCpuCount', default=0, type=int,
help='Required CPU count in seconds. Mainly to extend time limit for looping job detection')
help=argparse.SUPPRESS)
group_expert.add_argument('--noLoopingCheck', action='store_const', const=True, dest='noLoopingCheck', default=False,
help="Disable looping job check")
group_output.add_argument('--official', action='store_const', const=True, dest='official', default=False,
help='Produce official dataset')
action = group_job.add_argument('--unlimitNumOutputs', action='store_const', const=True, dest='unlimitNumOutputs', default=False,
Expand Down Expand Up @@ -1483,6 +1485,8 @@ def _onExit(dir, files, del_command):
taskParamMap['fixedSandbox'] = archiveName
if options.maxCpuCount > 0:
taskParamMap['walltime'] = -options.maxCpuCount
if options.noLoopingCheck:
taskParamMap['noLoopingCheck'] = True
if options.maxWalltime > 0:
taskParamMap['maxWalltime'] = options.maxWalltime
if options.cpuTimePerEvent > 0:
Expand Down
8 changes: 6 additions & 2 deletions pandatools/PrunScript.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,10 @@
help='Required memory size in MB. e.g., for 1GB --memory 1024')
group_submit.add_argument('--nCore', action='store', dest='nCore', default=-1, type=int,
help='The number of CPU cores. Note that the system distinguishes only nCore=1 and nCore>1. This means that even if you set nCore=2 jobs can go to sites with nCore=8 and your application must use the 8 cores there. The number of available cores is defined in an environment variable, $ATHENA_PROC_NUMBER, on WNs. Your application must check the env variable when starting up to dynamically change the number of cores')
group_submit.add_argument('--maxCpuCount',action='store',dest='maxCpuCount',default=-1,type=int,
help='Required CPU count in seconds. Mainly to extend time limit for looping job detection')
group_submit.add_argument('--maxCpuCount', action='store', dest='maxCpuCount', default=0, type=int,
help=argparse.SUPPRESS)
group_expert.add_argument('--noLoopingCheck', action='store_const', const=True, dest='noLoopingCheck', default=False,
help="Disable looping job check")
group_submit.add_argument('--useDirectIOSites', action='store_const', const=True, dest='useDirectIOSites', default=False,
help="Use only sites which use directIO to read input files")

Expand Down Expand Up @@ -1386,6 +1388,8 @@ def _onExit(dir, files, del_command):
taskParamMap['fixedSandbox'] = archiveName
if options.maxCpuCount > 0:
taskParamMap['walltime'] = -options.maxCpuCount
if options.noLoopingCheck:
taskParamMap['noLoopingCheck'] = True
if options.maxWalltime > 0:
taskParamMap['maxWalltime'] = options.maxWalltime
if options.cpuTimePerEvent > 0:
Expand Down
6 changes: 3 additions & 3 deletions pandatools/PsubUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ def get_proxy_info(force, verbose):
cacheVomsInfo = status,out
else:
# OIDC
uid, groups = Client.get_user_name_from_token()
uid, groups, nickname = Client.get_user_name_from_token()
if uid is None:
status = 1
else:
status = 0
cacheVomsInfo = (status, (uid, groups))
cacheVomsInfo = (status, (uid, groups, nickname))
return cacheVomsInfo


Expand Down Expand Up @@ -172,7 +172,7 @@ def getNickname(verbose=False):
status, output = get_proxy_info(False, verbose)
# OIDC
if Client.use_oidc():
return output[0]
return output[2]
# X509
for line in output.split('\n'):
if line.startswith('attribute'):
Expand Down

0 comments on commit 4e24e67

Please sign in to comment.