diff --git a/release/huaweicloud-obs-sdk-python_3.20.1.tar.gz b/release/huaweicloud-obs-sdk-python_3.20.1.tar.gz new file mode 100644 index 0000000..1f08589 Binary files /dev/null and b/release/huaweicloud-obs-sdk-python_3.20.1.tar.gz differ diff --git a/release/huaweicloud-obs-sdk-python_3.20.1.tar.gz.sha256 b/release/huaweicloud-obs-sdk-python_3.20.1.tar.gz.sha256 new file mode 100644 index 0000000..bc6e616 --- /dev/null +++ b/release/huaweicloud-obs-sdk-python_3.20.1.tar.gz.sha256 @@ -0,0 +1 @@ +3d7bd6df798270963df45493eebc6c7eea6b1972287a7663e78e3b4b720ccbb3 *huaweicloud-obs-sdk-python_3.20.1.tar.gz diff --git a/src/obs/__init__.py b/src/obs/__init__.py index be6e0d0..6780d3b 100644 --- a/src/obs/__init__.py +++ b/src/obs/__init__.py @@ -72,6 +72,8 @@ 'DeleteObjectsRequest', 'ListMultipartUploadsRequest', 'GetObjectRequest', - 'UploadFileHeader' + 'UploadFileHeader', + 'Payer', + 'ExtensionHeader' ] diff --git a/src/obs/auth.py b/src/obs/auth.py index 463fcf9..722fde0 100644 --- a/src/obs/auth.py +++ b/src/obs/auth.py @@ -127,7 +127,7 @@ def __make_canonicalstring(self, method, bucket_name, key, path_args, headers, e return ''.join(str_list) class V4Authentication(object): - CONTENT_SHA256 = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' + CONTENT_SHA256 = 'UNSIGNED-PAYLOAD' def __init__(self, ak, sk, region, shortDate, longDate, path_style, ha): self.ak = ak self.sk = sk diff --git a/src/obs/bucket.py b/src/obs/bucket.py index db335a5..027163e 100644 --- a/src/obs/bucket.py +++ b/src/obs/bucket.py @@ -75,7 +75,10 @@ class BucketClient(object): 'optionsObject', 'setBucketEncryption', 'getBucketEncryption', - 'deleteBucketEncryption' + 'deleteBucketEncryption', + 'headObject', + 'setBucketRequestPayment', + 'getBucketRequestPayment' ] def __init__(self, obsClient, bucketName): diff --git a/src/obs/bulktasks.py b/src/obs/bulktasks.py index bbee2a9..52cef52 100644 --- a/src/obs/bulktasks.py +++ b/src/obs/bulktasks.py @@ -1,213 +1,213 @@ -#!/usr/bin/env python -# -*- coding:utf-8 -*- -# Copyright 2019 Huawei Technologies Co.,Ltd. -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use -# this file except in compliance with the License. You may obtain a copy of the -# License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. - -import threading -from obs import const -from obs import util -if const.IS_PYTHON2: - import Queue as queue -else: - import queue - - - -class ThreadPool(object): - - def __init__(self, thread_size=const.DEFAULT_TASK_NUM, queue_size=const.DEFAULT_TASK_QUEUE_SIZE): - self.thread_size = thread_size - self._alive_threads = 0 - self._task_queue = queue.Queue(queue_size) - self._threads = [] - self._init_threads() - self._shutdown_lock = threading.Lock() - - def _init_threads(self): - for i in range(self.thread_size): - self._alive_threads += 1 - work_thread = threading.Thread(target = self._run) - self._threads.append(work_thread) - work_thread.start() - - def _run(self): - task = self._task_queue.get() - while task is not None: - (func, args, kwargs, future) = task - - if future is None: - result = func(*args, **kwargs) - else: - try: - result = func(*args, **kwargs) - except Exception as e: - future.set_exception(e) - else: - future.set_result(result) - - del task - - task = self._task_queue.get() - - def execute(self, func, *args, **kwargs): - task = (func, args, kwargs, None) - self._task_queue.put(task) - - def submit(self, func, *args, **kwargs): - future = Future() - task = (func, args, kwargs, future) - self._task_queue.put(task) - return future - - def shutdown(self, wait=True): - with self._shutdown_lock: - while self._alive_threads: - self._task_queue.put(None) - self._alive_threads -= 1 - if wait: - for t in self._threads: - t.join() - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.shutdown(wait=True) - return False - - -class TimeoutError(Exception): - pass - -PENDING = 'PENDING' -COMPLETED = 'COMPLETED' - -class Future(object): - def __init__(self): - self._condition = threading.Condition() - self._state = PENDING - self._result = None - self._exception = None - self._callback = None - - def set_result(self, result): - with self._condition: - self._result = result - self._state = COMPLETED - self._condition.notify_all() - - if self._callback: - self._callback(self) - - def set_exception(self, exception): - with self._condition: - self._exception = exception - self._state = COMPLETED - self._condition.notify_all() - - if self._callback: - self._callback(self) - - - def set_callback(self, callback): - with self._condition: - if self._state is PENDING: - self._callback = callback - return - callback(self) - - def _get_result(self): - if self._exception: - raise self._exception - else: - return self._result - - def get_result(self, timeout=None): - with self._condition: - if self._state == COMPLETED: - return self._get_result() - - self._condition.wait(timeout) - - if self._state == COMPLETED: - return self._get_result() - else: - raise TimeoutError() - - def get_exception(self, timeout=None): - with self._condition: - if self._state == COMPLETED: - return self._exception - - self._condition.wait(timeout) - - if self._state == COMPLETED: - return self._exception - else: - raise TimeoutError() - -class ExecuteProgress(object): - def __init__(self): - self.successful_tasks = 0 - self._successful_lock = threading.Lock() - self.failed_tasks = 0 - self._failed_lock = threading.Lock() - self.finished_tasks = 0 - self._finished_lock = threading.Lock() - self.total_tasks = 0 - - def _successful_increment(self): - with self._successful_lock: - self.successful_tasks += 1 - return self.successful_tasks - - def _failed_increment(self): - with self._failed_lock: - self.failed_tasks += 1 - return self.failed_tasks - - def _finished_increment(self): - with self._finished_lock: - self.finished_tasks += 1 - return self.finished_tasks - - def get_successful_tasks(self): - with self._successful_lock: - return self.successful_tasks - - def get_failed_tasks(self): - with self._failed_lock: - return self.failed_tasks - - def get_finished_tasks(self): - with self._finished_lock: - return self.finished_tasks - - def get_total_tasks(self): - return self.total_tasks - -def _reportProgress(progress, interval, progressCallback): - finishedTasks = progress._finished_increment() - if finishedTasks % interval == 0 or finishedTasks == progress.get_total_tasks(): - successfulTasks = progress.get_successful_tasks() - failedTasks = progress.get_failed_tasks() - progressCallback(successfulTasks, failedTasks, progress.get_total_tasks()) - -def _checkBulkTasksPara(task_num, task_queue_size, task_interval, threshold): - origine = [task_num, task_queue_size, task_interval, threshold] - default = (const.DEFAULT_TASK_NUM, const.DEFAULT_TASK_QUEUE_SIZE, const.DEFAULT_BYTE_INTTERVAL, const.DEFAULT_MAXIMUM_SIZE) - size = len(origine) - for i in range(size): - origine[i] = util.to_int(origine[i]) - if origine[i] is None or origine[i] <= 0: - origine[i] = default[i] +#!/usr/bin/env python +# -*- coding:utf-8 -*- +# Copyright 2019 Huawei Technologies Co.,Ltd. +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. + +import threading +from obs import const +from obs import util +if const.IS_PYTHON2: + import Queue as queue +else: + import queue + + + +class ThreadPool(object): + + def __init__(self, thread_size=const.DEFAULT_TASK_NUM, queue_size=const.DEFAULT_TASK_QUEUE_SIZE): + self.thread_size = thread_size + self._alive_threads = 0 + self._task_queue = queue.Queue(queue_size) + self._threads = [] + self._init_threads() + self._shutdown_lock = threading.Lock() + + def _init_threads(self): + for i in range(self.thread_size): + self._alive_threads += 1 + work_thread = threading.Thread(target = self._run) + self._threads.append(work_thread) + work_thread.start() + + def _run(self): + task = self._task_queue.get() + while task is not None: + (func, args, kwargs, future) = task + + if future is None: + result = func(*args, **kwargs) + else: + try: + result = func(*args, **kwargs) + except Exception as e: + future.set_exception(e) + else: + future.set_result(result) + + del task + + task = self._task_queue.get() + + def execute(self, func, *args, **kwargs): + task = (func, args, kwargs, None) + self._task_queue.put(task) + + def submit(self, func, *args, **kwargs): + future = Future() + task = (func, args, kwargs, future) + self._task_queue.put(task) + return future + + def shutdown(self, wait=True): + with self._shutdown_lock: + while self._alive_threads: + self._task_queue.put(None) + self._alive_threads -= 1 + if wait: + for t in self._threads: + t.join() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown(wait=True) + return False + + +class TimeoutError(Exception): + pass + +PENDING = 'PENDING' +COMPLETED = 'COMPLETED' + +class Future(object): + def __init__(self): + self._condition = threading.Condition() + self._state = PENDING + self._result = None + self._exception = None + self._callback = None + + def set_result(self, result): + with self._condition: + self._result = result + self._state = COMPLETED + self._condition.notify_all() + + if self._callback: + self._callback(self) + + def set_exception(self, exception): + with self._condition: + self._exception = exception + self._state = COMPLETED + self._condition.notify_all() + + if self._callback: + self._callback(self) + + + def set_callback(self, callback): + with self._condition: + if self._state is PENDING: + self._callback = callback + return + callback(self) + + def _get_result(self): + if self._exception: + raise self._exception + else: + return self._result + + def get_result(self, timeout=None): + with self._condition: + if self._state == COMPLETED: + return self._get_result() + + self._condition.wait(timeout) + + if self._state == COMPLETED: + return self._get_result() + else: + raise TimeoutError() + + def get_exception(self, timeout=None): + with self._condition: + if self._state == COMPLETED: + return self._exception + + self._condition.wait(timeout) + + if self._state == COMPLETED: + return self._exception + else: + raise TimeoutError() + +class ExecuteProgress(object): + def __init__(self): + self.successful_tasks = 0 + self._successful_lock = threading.Lock() + self.failed_tasks = 0 + self._failed_lock = threading.Lock() + self.finished_tasks = 0 + self._finished_lock = threading.Lock() + self.total_tasks = 0 + + def _successful_increment(self): + with self._successful_lock: + self.successful_tasks += 1 + return self.successful_tasks + + def _failed_increment(self): + with self._failed_lock: + self.failed_tasks += 1 + return self.failed_tasks + + def _finished_increment(self): + with self._finished_lock: + self.finished_tasks += 1 + return self.finished_tasks + + def get_successful_tasks(self): + with self._successful_lock: + return self.successful_tasks + + def get_failed_tasks(self): + with self._failed_lock: + return self.failed_tasks + + def get_finished_tasks(self): + with self._finished_lock: + return self.finished_tasks + + def get_total_tasks(self): + return self.total_tasks + +def _reportProgress(progress, interval, progressCallback): + finishedTasks = progress._finished_increment() + if finishedTasks % interval == 0 or finishedTasks == progress.get_total_tasks(): + successfulTasks = progress.get_successful_tasks() + failedTasks = progress.get_failed_tasks() + progressCallback(successfulTasks, failedTasks, progress.get_total_tasks()) + +def _checkBulkTasksPara(task_num, task_queue_size, task_interval, threshold): + origine = [task_num, task_queue_size, task_interval, threshold] + default = (const.DEFAULT_TASK_NUM, const.DEFAULT_TASK_QUEUE_SIZE, const.DEFAULT_BYTE_INTTERVAL, const.DEFAULT_MAXIMUM_SIZE) + size = len(origine) + for i in range(size): + origine[i] = util.to_int(origine[i]) + if origine[i] is None or origine[i] <= 0: + origine[i] = default[i] return tuple(origine) \ No newline at end of file diff --git a/src/obs/cache.py b/src/obs/cache.py index ce368e9..6150b4f 100644 --- a/src/obs/cache.py +++ b/src/obs/cache.py @@ -1,43 +1,43 @@ -#!/usr/bin/env python -# -*- coding:utf-8 -*- -# Copyright 2019 Huawei Technologies Co.,Ltd. -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use -# this file except in compliance with the License. You may obtain a copy of the -# License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. - -import weakref, collections -import time - - -class _LocalCacheThread(object): - class Dict(dict): - def __del__(self): - pass - - def __init__(self, maxlen=10): - self.weak = weakref.WeakValueDictionary() - self.strong = collections.deque(maxlen=maxlen) - - @staticmethod - def nowTime(): - return int(time.time()) - - def get(self, key): - value = self.weak.get(key) - if value is not None and hasattr(value, 'expire') and self.nowTime() > value['expire']: - value = None - return value - - def set(self, key, value): - self.weak[key] = strongRef = self.Dict(value) - self.strong.append(strongRef) - - +#!/usr/bin/env python +# -*- coding:utf-8 -*- +# Copyright 2019 Huawei Technologies Co.,Ltd. +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. + +import weakref, collections +import time + + +class _LocalCacheThread(object): + class Dict(dict): + def __del__(self): + pass + + def __init__(self, maxlen=10): + self.weak = weakref.WeakValueDictionary() + self.strong = collections.deque(maxlen=maxlen) + + @staticmethod + def nowTime(): + return int(time.time()) + + def get(self, key): + value = self.weak.get(key) + if value is not None and hasattr(value, 'expire') and self.nowTime() > value['expire']: + value = None + return value + + def set(self, key, value): + self.weak[key] = strongRef = self.Dict(value) + self.strong.append(strongRef) + + LocalCache = _LocalCacheThread \ No newline at end of file diff --git a/src/obs/client.py b/src/obs/client.py index 88483f7..d51b88c 100644 --- a/src/obs/client.py +++ b/src/obs/client.py @@ -58,7 +58,7 @@ def __init__(self, msg, location, result=None): self.msg = msg self.location = location self.result = result - + def __str__(self): return self.msg @@ -349,23 +349,23 @@ def _generate_object_url(self, ret, bucketName, objectKey): if ret and ret.status < 300 and ret.body: ret.body.objectUrl = self.calling_format.get_full_url(self.is_secure, self.server, self.port, bucketName, objectKey, {}) - def _make_options_request(self, bucketName, objectKey=None, pathArgs=None, headers=None, methodName=None): - return self._make_request_with_retry(const.HTTP_METHOD_OPTIONS, bucketName, objectKey, pathArgs, headers, methodName=methodName) + def _make_options_request(self, bucketName, objectKey=None, pathArgs=None, headers=None, methodName=None, extensionHeaders=None): + return self._make_request_with_retry(const.HTTP_METHOD_OPTIONS, bucketName, objectKey, pathArgs, headers, methodName=methodName, extensionHeaders=extensionHeaders) - def _make_head_request(self, bucketName, objectKey=None, pathArgs=None, headers=None, methodName=None, skipAuthentication=False): - return self._make_request_with_retry(const.HTTP_METHOD_HEAD, bucketName, objectKey, pathArgs, headers, methodName=methodName, skipAuthentication=skipAuthentication) + def _make_head_request(self, bucketName, objectKey=None, pathArgs=None, headers=None, methodName=None, skipAuthentication=False, extensionHeaders=None): + return self._make_request_with_retry(const.HTTP_METHOD_HEAD, bucketName, objectKey, pathArgs, headers, methodName=methodName, skipAuthentication=skipAuthentication, extensionHeaders=extensionHeaders) - def _make_get_request(self, bucketName='', objectKey=None, pathArgs=None, headers=None, methodName=None, parseMethod=None, readable=False): - return self._make_request_with_retry(const.HTTP_METHOD_GET, bucketName, objectKey, pathArgs, headers, methodName=methodName, parseMethod=parseMethod, readable=readable) + def _make_get_request(self, bucketName='', objectKey=None, pathArgs=None, headers=None, methodName=None, parseMethod=None, readable=False, extensionHeaders=None): + return self._make_request_with_retry(const.HTTP_METHOD_GET, bucketName, objectKey, pathArgs, headers, methodName=methodName, parseMethod=parseMethod, readable=readable, extensionHeaders=extensionHeaders) - def _make_delete_request(self, bucketName, objectKey=None, pathArgs=None, headers=None, entity=None, methodName=None): - return self._make_request_with_retry(const.HTTP_METHOD_DELETE, bucketName, objectKey, pathArgs, headers, entity, methodName=methodName) + def _make_delete_request(self, bucketName, objectKey=None, pathArgs=None, headers=None, entity=None, methodName=None, extensionHeaders=None): + return self._make_request_with_retry(const.HTTP_METHOD_DELETE, bucketName, objectKey, pathArgs, headers, entity, methodName=methodName, extensionHeaders=extensionHeaders) - def _make_post_request(self, bucketName, objectKey=None, pathArgs=None, headers=None, entity=None, chunkedMode=False, methodName=None, readable=False): - return self._make_request_with_retry(const.HTTP_METHOD_POST, bucketName, objectKey, pathArgs, headers, entity, chunkedMode, methodName=methodName, readable=readable) + def _make_post_request(self, bucketName, objectKey=None, pathArgs=None, headers=None, entity=None, chunkedMode=False, methodName=None, readable=False, extensionHeaders=None): + return self._make_request_with_retry(const.HTTP_METHOD_POST, bucketName, objectKey, pathArgs, headers, entity, chunkedMode, methodName=methodName, readable=readable, extensionHeaders=extensionHeaders) - def _make_put_request(self, bucketName, objectKey=None, pathArgs=None, headers=None, entity=None, chunkedMode=False, methodName=None, readable=False): - return self._make_request_with_retry(const.HTTP_METHOD_PUT, bucketName, objectKey, pathArgs, headers, entity, chunkedMode, methodName=methodName, readable=readable) + def _make_put_request(self, bucketName, objectKey=None, pathArgs=None, headers=None, entity=None, chunkedMode=False, methodName=None, readable=False, extensionHeaders=None): + return self._make_request_with_retry(const.HTTP_METHOD_PUT, bucketName, objectKey, pathArgs, headers, entity, chunkedMode, methodName=methodName, readable=readable, extensionHeaders=extensionHeaders) def _make_error_result(self, e, ret): self.log_client.log(ERROR, 'request error, %s' % e) @@ -375,7 +375,7 @@ def _make_error_result(self, e, ret): raise e def _make_request_with_retry(self, methodType, bucketName, objectKey=None, pathArgs=None, headers=None, - entity=None, chunkedMode=False, methodName=None, readable=False, parseMethod=None, redirectLocation=None, skipAuthentication=False): + entity=None, chunkedMode=False, methodName=None, readable=False, parseMethod=None, redirectLocation=None, skipAuthentication=False, extensionHeaders=None): flag = 0 redirect_count = 0 conn = None @@ -383,7 +383,7 @@ def _make_request_with_retry(self, methodType, bucketName, objectKey=None, pathA redirectFlag = False while True: try: - conn = self._make_request_internal(methodType, bucketName, objectKey, pathArgs, headers, entity, chunkedMode, _redirectLocation, skipAuthentication=skipAuthentication, redirectFlag=redirectFlag) + conn = self._make_request_internal(methodType, bucketName, objectKey, pathArgs, headers, entity, chunkedMode, _redirectLocation, skipAuthentication=skipAuthentication, redirectFlag=redirectFlag, extensionHeaders=extensionHeaders) return self._parse_xml(conn, methodName, readable) if not parseMethod else parseMethod(conn) except Exception as e: ret = None @@ -399,8 +399,6 @@ def _make_request_with_retry(self, methodType, bucketName, objectKey=None, pathA ret = e.result if methodType == const.HTTP_METHOD_GET and e.result.status == 302: redirectFlag = True - else: - redirectFlag = False if redirect_count >= self.max_redirect_count: self.log_client.log(ERROR, 'request redirect count [%d] greater than max redirect count [%d]' % ( redirect_count, self.max_redirect_count)) @@ -414,7 +412,7 @@ def _make_request_with_retry(self, methodType, bucketName, objectKey=None, pathA break def _make_request_internal(self, method, bucketName='', objectKey=None, pathArgs=None, headers=None, entity=None, - chunkedMode=False, redirectLocation=None, skipAuthentication=False, redirectFlag=False): + chunkedMode=False, redirectLocation=None, skipAuthentication=False, redirectFlag=False, extensionHeaders=None): objectKey = util.safe_encode(objectKey) if objectKey is None: objectKey = '' @@ -431,7 +429,7 @@ def _make_request_internal(self, method, bucketName='', objectKey=None, pathArgs query = redirectLocation.query path = _path + '?' + query if query else _path skipAuthentication = True - if not redirectFlag and not path: + if not redirectFlag: skipAuthentication = False else: @@ -445,6 +443,15 @@ def _make_request_internal(self, method, bucketName='', objectKey=None, pathArgs if not path: path = self.calling_format.get_url(bucketName, objectKey, pathArgs) + extension_headers = self.convertor.trans_get_extension_headers(extensionHeaders) + if len(extension_headers) > 0: + if headers is None or not isinstance(headers, dict): + headers = {} + else: + headers = headers.copy() + for key, value in extension_headers.items(): + headers[key] = value + headers = self._rename_request_headers(headers, method) if entity is not None and not callable(entity): @@ -1109,178 +1116,185 @@ def _getApiVersion(self, bucketName=''): return const.V2_SIGNATURE, res @funcCache - def listBuckets(self, isQueryLocation=True): + def listBuckets(self, isQueryLocation=True, extensionHeaders=None): if self.is_cname: raise Exception('listBuckets is not allowed in customdomain mode') - return self._make_get_request(methodName='listBuckets', **self.convertor.trans_list_buckets(isQueryLocation=isQueryLocation)) + return self._make_get_request(methodName='listBuckets', extensionHeaders=extensionHeaders, **self.convertor.trans_list_buckets(isQueryLocation=isQueryLocation)) @funcCache - def createBucket(self, bucketName, header=CreateBucketHeader(), location=None): + def createBucket(self, bucketName, header=CreateBucketHeader(), location=None, extensionHeaders=None): if self.is_cname: raise Exception('createBucket is not allowed in customdomain mode') - res = self._make_put_request(bucketName, **self.convertor.trans_create_bucket(header=header, location=location)) + res = self._make_put_request(bucketName, extensionHeaders=extensionHeaders, **self.convertor.trans_create_bucket(header=header, location=location)) try: if self.is_signature_negotiation and res.status == 400 and res.errorMessage == 'Unsupported Authorization Type' and self.thread_local.signature == const.OBS_SIGNATURE: self.thread_local.signature = const.V2_SIGNATURE - res = self._make_put_request(bucketName, **self.convertor.trans_create_bucket(header=header, location=location)) + res = self._make_put_request(bucketName, extensionHeaders=extensionHeaders, **self.convertor.trans_create_bucket(header=header, location=location)) finally: return res @funcCache - def listObjects(self, bucketName, prefix=None, marker=None, max_keys=None, delimiter=None): - return self._make_get_request(bucketName, methodName='listObjects', + def listObjects(self, bucketName, prefix=None, marker=None, max_keys=None, delimiter=None, extensionHeaders=None): + return self._make_get_request(bucketName, methodName='listObjects', extensionHeaders=extensionHeaders, **self.convertor.trans_list_objects(prefix=prefix, marker=marker, max_keys=max_keys, delimiter=delimiter)) @funcCache - def headBucket(self, bucketName): - return self._make_head_request(bucketName) + def headBucket(self, bucketName, extensionHeaders=None): + return self._make_head_request(bucketName, extensionHeaders=extensionHeaders) @funcCache - def getBucketMetadata(self, bucketName, origin=None, requestHeaders=None): - return self._make_head_request(bucketName, methodName='getBucketMetadata', **self.convertor.trans_get_bucket_metadata(origin=origin, requestHeaders=requestHeaders)) + def headObject(self, bucketName, objectKey, versionId=None, extensionHeaders=None): + pathArgs = {} + if versionId: + pathArgs[const.VERSION_ID_PARAM] = util.to_string(versionId) + return self._make_head_request(bucketName, objectKey, pathArgs=pathArgs, extensionHeaders=extensionHeaders) @funcCache - def getBucketLocation(self, bucketName): - return self._make_get_request(bucketName, pathArgs={'location':None}, methodName='getBucketLocation') + def getBucketMetadata(self, bucketName, origin=None, requestHeaders=None, extensionHeaders=None): + return self._make_head_request(bucketName, methodName='getBucketMetadata', extensionHeaders=extensionHeaders, **self.convertor.trans_get_bucket_metadata(origin=origin, requestHeaders=requestHeaders)) @funcCache - def deleteBucket(self, bucketName): - return self._make_delete_request(bucketName) + def getBucketLocation(self, bucketName, extensionHeaders=None): + return self._make_get_request(bucketName, pathArgs={'location':None}, methodName='getBucketLocation', extensionHeaders=extensionHeaders) @funcCache - def setBucketQuota(self, bucketName, quota): + def deleteBucket(self, bucketName, extensionHeaders=None): + return self._make_delete_request(bucketName, extensionHeaders=extensionHeaders) + + @funcCache + def setBucketQuota(self, bucketName, quota, extensionHeaders=None): self._assert_not_null(quota, 'quota is empty') - return self._make_put_request(bucketName, pathArgs={'quota': None}, entity=self.convertor.trans_quota(quota)) + return self._make_put_request(bucketName, pathArgs={'quota': None}, entity=self.convertor.trans_quota(quota), extensionHeaders=extensionHeaders) @funcCache - def getBucketQuota(self, bucketName): - return self._make_get_request(bucketName, pathArgs={'quota': None}, methodName='getBucketQuota') + def getBucketQuota(self, bucketName, extensionHeaders=None): + return self._make_get_request(bucketName, pathArgs={'quota': None}, methodName='getBucketQuota', extensionHeaders=extensionHeaders) @funcCache - def getBucketStorageInfo(self, bucketName): - return self._make_get_request(bucketName, pathArgs={'storageinfo': None}, methodName='getBucketStorageInfo') + def getBucketStorageInfo(self, bucketName, extensionHeaders=None): + return self._make_get_request(bucketName, pathArgs={'storageinfo': None}, methodName='getBucketStorageInfo', extensionHeaders=extensionHeaders) @funcCache - def setBucketAcl(self, bucketName, acl=ACL(), aclControl=None): + def setBucketAcl(self, bucketName, acl=ACL(), aclControl=None, extensionHeaders=None): if acl is not None and len(acl) > 0 and aclControl is not None: raise Exception('Both acl and aclControl are set') if not acl and not aclControl: raise Exception('Both acl and aclControl are not set') - return self._make_put_request(bucketName, **self.convertor.trans_set_bucket_acl(acl=acl, aclControl=aclControl)) + return self._make_put_request(bucketName, extensionHeaders=extensionHeaders, **self.convertor.trans_set_bucket_acl(acl=acl, aclControl=aclControl)) @funcCache - def getBucketAcl(self, bucketName): - return self._make_get_request(bucketName, pathArgs={'acl': None}, methodName='getBucketAcl') + def getBucketAcl(self, bucketName, extensionHeaders=None): + return self._make_get_request(bucketName, pathArgs={'acl': None}, methodName='getBucketAcl', extensionHeaders=extensionHeaders) @funcCache - def setBucketPolicy(self, bucketName, policyJSON): + def setBucketPolicy(self, bucketName, policyJSON, extensionHeaders=None): self._assert_not_null(policyJSON, 'policyJSON is empty') - return self._make_put_request(bucketName, pathArgs={'policy' : None}, entity=policyJSON) + return self._make_put_request(bucketName, pathArgs={'policy' : None}, entity=policyJSON, extensionHeaders=extensionHeaders) @funcCache - def getBucketPolicy(self, bucketName): - return self._make_get_request(bucketName, pathArgs={'policy' : None}, methodName='getBucketPolicy') + def getBucketPolicy(self, bucketName, extensionHeaders=None): + return self._make_get_request(bucketName, pathArgs={'policy' : None}, methodName='getBucketPolicy', extensionHeaders=extensionHeaders) @funcCache - def deleteBucketPolicy(self, bucketName): - return self._make_delete_request(bucketName, pathArgs={'policy' : None}) + def deleteBucketPolicy(self, bucketName, extensionHeaders=None): + return self._make_delete_request(bucketName, pathArgs={'policy' : None}, extensionHeaders=extensionHeaders) @funcCache - def setBucketVersioning(self, bucketName, status): + def setBucketVersioning(self, bucketName, status, extensionHeaders=None): self._assert_not_null(status, 'status is empty') - return self._make_put_request(bucketName, pathArgs={'versioning' : None}, entity=self.convertor.trans_version_status(status)) + return self._make_put_request(bucketName, pathArgs={'versioning' : None}, entity=self.convertor.trans_version_status(status), extensionHeaders=extensionHeaders) @funcCache - def getBucketVersioning(self, bucketName): - return self._make_get_request(bucketName, pathArgs={'versioning' : None}, methodName='getBucketVersioning') + def getBucketVersioning(self, bucketName, extensionHeaders=None): + return self._make_get_request(bucketName, pathArgs={'versioning' : None}, methodName='getBucketVersioning', extensionHeaders=extensionHeaders) @funcCache - def listVersions(self, bucketName, version=Versions()): - return self._make_get_request(bucketName, methodName='listVersions', **self.convertor.trans_list_versions(version=version)) + def listVersions(self, bucketName, version=Versions(), extensionHeaders=None): + return self._make_get_request(bucketName, methodName='listVersions', extensionHeaders=extensionHeaders, **self.convertor.trans_list_versions(version=version)) @funcCache - def listMultipartUploads(self, bucketName, multipart=ListMultipartUploadsRequest()): - return self._make_get_request(bucketName, methodName='listMultipartUploads', **self.convertor.trans_list_multipart_uploads(multipart=multipart)) + def listMultipartUploads(self, bucketName, multipart=ListMultipartUploadsRequest(), extensionHeaders=None): + return self._make_get_request(bucketName, methodName='listMultipartUploads', extensionHeaders=extensionHeaders, **self.convertor.trans_list_multipart_uploads(multipart=multipart)) @funcCache - def deleteBucketLifecycle(self, bucketName): - return self._make_delete_request(bucketName, pathArgs={'lifecycle':None}) + def deleteBucketLifecycle(self, bucketName, extensionHeaders=None): + return self._make_delete_request(bucketName, pathArgs={'lifecycle':None}, extensionHeaders=extensionHeaders) @funcCache - def setBucketLifecycle(self, bucketName, lifecycle): + def setBucketLifecycle(self, bucketName, lifecycle, extensionHeaders=None): self._assert_not_null(lifecycle, 'lifecycle is empty') - return self._make_put_request(bucketName, **self.convertor.trans_set_bucket_lifecycle(lifecycle=lifecycle)) + return self._make_put_request(bucketName, extensionHeaders=extensionHeaders, **self.convertor.trans_set_bucket_lifecycle(lifecycle=lifecycle)) @funcCache - def getBucketLifecycle(self, bucketName): - return self._make_get_request(bucketName, pathArgs={'lifecycle':None}, methodName='getBucketLifecycle') + def getBucketLifecycle(self, bucketName, extensionHeaders=None): + return self._make_get_request(bucketName, pathArgs={'lifecycle':None}, methodName='getBucketLifecycle', extensionHeaders=extensionHeaders) @funcCache - def deleteBucketWebsite(self, bucketName): - return self._make_delete_request(bucketName, pathArgs={'website':None}) + def deleteBucketWebsite(self, bucketName, extensionHeaders=None): + return self._make_delete_request(bucketName, pathArgs={'website':None}, extensionHeaders=extensionHeaders) @funcCache - def setBucketWebsite(self, bucketName, website): + def setBucketWebsite(self, bucketName, website, extensionHeaders=None): self._assert_not_null(website, 'website is empty') - return self._make_put_request(bucketName, pathArgs={'website':None}, entity=self.convertor.trans_website(website)) + return self._make_put_request(bucketName, pathArgs={'website':None}, entity=self.convertor.trans_website(website), extensionHeaders=extensionHeaders) @funcCache - def getBucketWebsite(self, bucketName): - return self._make_get_request(bucketName, pathArgs={'website':None}, methodName='getBucketWebsite') + def getBucketWebsite(self, bucketName, extensionHeaders=None): + return self._make_get_request(bucketName, pathArgs={'website':None}, methodName='getBucketWebsite', extensionHeaders=extensionHeaders) @funcCache - def setBucketLogging(self, bucketName, logstatus=Logging()): + def setBucketLogging(self, bucketName, logstatus=Logging(), extensionHeaders=None): if logstatus is None: logstatus = Logging() - return self._make_put_request(bucketName, pathArgs={'logging':None}, entity=self.convertor.trans_logging(logstatus)) + return self._make_put_request(bucketName, pathArgs={'logging':None}, entity=self.convertor.trans_logging(logstatus), extensionHeaders=extensionHeaders) @funcCache - def getBucketLogging(self, bucketName): - return self._make_get_request(bucketName, pathArgs={'logging':None}, methodName='getBucketLogging') + def getBucketLogging(self, bucketName, extensionHeaders=None): + return self._make_get_request(bucketName, pathArgs={'logging':None}, methodName='getBucketLogging', extensionHeaders=extensionHeaders) @funcCache - def getBucketTagging(self, bucketName): - return self._make_get_request(bucketName, pathArgs={'tagging' : None}, methodName='getBucketTagging') + def getBucketTagging(self, bucketName, extensionHeaders=None): + return self._make_get_request(bucketName, pathArgs={'tagging' : None}, methodName='getBucketTagging', extensionHeaders=extensionHeaders) @funcCache - def setBucketTagging(self, bucketName, tagInfo): + def setBucketTagging(self, bucketName, tagInfo, extensionHeaders=None): self._assert_not_null(tagInfo, 'tagInfo is empty') - return self._make_put_request(bucketName, **self.convertor.trans_set_bucket_tagging(tagInfo=tagInfo)) + return self._make_put_request(bucketName, extensionHeaders=extensionHeaders, **self.convertor.trans_set_bucket_tagging(tagInfo=tagInfo)) @funcCache - def deleteBucketTagging(self, bucketName): - return self._make_delete_request(bucketName, pathArgs={'tagging' : None}) + def deleteBucketTagging(self, bucketName, extensionHeaders=None): + return self._make_delete_request(bucketName, pathArgs={'tagging' : None}, extensionHeaders=extensionHeaders) @funcCache - def setBucketCors(self, bucketName, corsRuleList): + def setBucketCors(self, bucketName, corsRuleList, extensionHeaders=None): self._assert_not_null(corsRuleList, 'corsRuleList is empty') - return self._make_put_request(bucketName, **self.convertor.trans_set_bucket_cors(corsRuleList=corsRuleList)) + return self._make_put_request(bucketName, extensionHeaders=extensionHeaders, **self.convertor.trans_set_bucket_cors(corsRuleList=corsRuleList)) @funcCache - def deleteBucketCors(self, bucketName): - return self._make_delete_request(bucketName, pathArgs={'cors' : None}) + def deleteBucketCors(self, bucketName, extensionHeaders=None): + return self._make_delete_request(bucketName, pathArgs={'cors' : None}, extensionHeaders=extensionHeaders) @funcCache - def getBucketCors(self, bucketName): - return self._make_get_request(bucketName, pathArgs={'cors': None}, methodName='getBucketCors') + def getBucketCors(self, bucketName, extensionHeaders=None): + return self._make_get_request(bucketName, pathArgs={'cors': None}, methodName='getBucketCors', extensionHeaders=extensionHeaders) @funcCache - def optionsBucket(self, bucketName, option): - return self.optionsObject(bucketName, None, option=option) + def optionsBucket(self, bucketName, option, extensionHeaders=None): + return self.optionsObject(bucketName, None, option=option, extensionHeaders=extensionHeaders) @funcCache - def setBucketNotification(self, bucketName, notification=Notification()): + def setBucketNotification(self, bucketName, notification=Notification(), extensionHeaders=None): if notification is None: notification = Notification() - return self._make_put_request(bucketName, pathArgs={'notification': None}, entity=self.convertor.trans_notification(notification)) + return self._make_put_request(bucketName, pathArgs={'notification': None}, entity=self.convertor.trans_notification(notification), extensionHeaders=extensionHeaders) @funcCache - def getBucketNotification(self, bucketName): - return self._make_get_request(bucketName, pathArgs={'notification': None}, methodName='getBucketNotification') + def getBucketNotification(self, bucketName, extensionHeaders=None): + return self._make_get_request(bucketName, pathArgs={'notification': None}, methodName='getBucketNotification', extensionHeaders=extensionHeaders) @funcCache - def optionsObject(self, bucketName, objectKey, option): + def optionsObject(self, bucketName, objectKey, option, extensionHeaders=None): headers = {} if option is not None: if option.get('origin') is not None: @@ -1289,10 +1303,10 @@ def optionsObject(self, bucketName, objectKey, option): headers[const.ACCESS_CONTROL_REQUEST_METHOD_HEADER] = option['accessControlRequestMethods'] if option.get('accessControlRequestHeaders') is not None: headers[const.ACCESS_CONTROL_REQUEST_HEADERS_HEADER] = option['accessControlRequestHeaders'] - return self._make_options_request(bucketName, objectKey, headers=headers, methodName='optionsBucket') + return self._make_options_request(bucketName, objectKey, headers=headers, methodName='optionsBucket', extensionHeaders=extensionHeaders) @funcCache - def getObjectMetadata(self, bucketName, objectKey, versionId=None, sseHeader=None, origin=None, requestHeaders=None): + def getObjectMetadata(self, bucketName, objectKey, versionId=None, sseHeader=None, origin=None, requestHeaders=None, extensionHeaders=None): pathArgs = {} if versionId: pathArgs[const.VERSION_ID_PARAM] = util.to_string(versionId) @@ -1303,39 +1317,39 @@ def getObjectMetadata(self, bucketName, objectKey, versionId=None, sseHeader=Non if _requestHeaders: headers[const.ACCESS_CONTROL_REQUEST_HEADERS_HEADER] = util.to_string(_requestHeaders) return self._make_head_request(bucketName, objectKey, pathArgs=pathArgs, - headers=self.convertor._set_sse_header(sseHeader, headers=headers, onlySseCHeader=True), methodName='getObjectMetadata') + headers=self.convertor._set_sse_header(sseHeader, headers=headers, onlySseCHeader=True), methodName='getObjectMetadata', extensionHeaders=extensionHeaders) @funcCache - def setObjectMetadata(self, bucketName, objectKey, metadata=None, headers=None, versionId=None): + def setObjectMetadata(self, bucketName, objectKey, metadata=None, headers=None, versionId=None, extensionHeaders=None): if headers is None: headers = SetObjectMetadataHeader() - return self._make_put_request(bucketName, objectKey, methodName='setObjectMetadata', **self.convertor.trans_set_object_metadata(metadata=metadata, headers=headers, versionId=versionId)) + return self._make_put_request(bucketName, objectKey, methodName='setObjectMetadata', extensionHeaders=extensionHeaders, **self.convertor.trans_set_object_metadata(metadata=metadata, headers=headers, versionId=versionId)) @funcCache def getObject(self, bucketName, objectKey, downloadPath=None, getObjectRequest=GetObjectRequest(), - headers=GetObjectHeader(), loadStreamInMemory=False, progressCallback=None): + headers=GetObjectHeader(), loadStreamInMemory=False, progressCallback=None, extensionHeaders=None): _parse_content = self._parse_content CHUNKSIZE = self.chunk_size readable = False if progressCallback is None else True def parseMethod(conn): return _parse_content(conn, objectKey, downloadPath, CHUNKSIZE, loadStreamInMemory, progressCallback) - return self._make_get_request(bucketName, objectKey, parseMethod=parseMethod, readable=readable, **self.convertor.trans_get_object(getObjectRequest=getObjectRequest, headers=headers)) + return self._make_get_request(bucketName, objectKey, parseMethod=parseMethod, readable=readable, extensionHeaders=extensionHeaders, **self.convertor.trans_get_object(getObjectRequest=getObjectRequest, headers=headers)) @funcCache def _getObjectWithNotifier(self, bucketName, objectKey, getObjectRequest=GetObjectRequest(), - headers=GetObjectHeader(), downloadPath=None, notifier=None): + headers=GetObjectHeader(), downloadPath=None, notifier=None, extensionHeaders=None): _parse_content_with_notifier = self._parse_content_with_notifier CHUNKSIZE = self.chunk_size readable = False if notifier is None else True def parseMethod(conn): return _parse_content_with_notifier(conn, objectKey, CHUNKSIZE, downloadPath, notifier) - return self._make_get_request(bucketName, objectKey, parseMethod=parseMethod, readable=readable, **self.convertor.trans_get_object(getObjectRequest=getObjectRequest, headers=headers)) + return self._make_get_request(bucketName, objectKey, parseMethod=parseMethod, readable=readable, extensionHeaders=extensionHeaders, **self.convertor.trans_get_object(getObjectRequest=getObjectRequest, headers=headers)) @funcCache - def appendObject(self, bucketName, objectKey, content=None, metadata=None, headers=None, progressCallback=None, autoClose=True): + def appendObject(self, bucketName, objectKey, content=None, metadata=None, headers=None, progressCallback=None, autoClose=True, extensionHeaders=None): objectKey = util.safe_encode(objectKey) if objectKey is None: objectKey = '' @@ -1406,7 +1420,7 @@ def appendObject(self, bucketName, objectKey, content=None, metadata=None, heade if notifier is not None: notifier.start() ret = self._make_post_request(bucketName, objectKey, pathArgs={'append': None, 'position': util.to_string(content['position']) if content.get('position') is not None else 0}, - headers=headers, entity=entity, chunkedMode=chunkedMode, methodName='appendObject', readable=readable) + headers=headers, entity=entity, chunkedMode=chunkedMode, methodName='appendObject', readable=readable, extensionHeaders=extensionHeaders) finally: if notifier is not None: notifier.end() @@ -1414,7 +1428,7 @@ def appendObject(self, bucketName, objectKey, content=None, metadata=None, heade return ret @funcCache - def putContent(self, bucketName, objectKey, content=None, metadata=None, headers=None, progressCallback=None, autoClose=True): + def putContent(self, bucketName, objectKey, content=None, metadata=None, headers=None, progressCallback=None, autoClose=True, extensionHeaders=None): objectKey = util.safe_encode(objectKey) if objectKey is None: objectKey = '' @@ -1444,18 +1458,18 @@ def putContent(self, bucketName, objectKey, content=None, metadata=None, headers entity = util.get_readable_entity_by_totalcount(entity, totalCount, self.chunk_size, notifier, autoClose) notifier.start() - ret = self._make_put_request(bucketName, objectKey, headers=_headers, entity=entity, chunkedMode=chunkedMode, methodName='putContent', readable=readable) + ret = self._make_put_request(bucketName, objectKey, headers=_headers, entity=entity, chunkedMode=chunkedMode, methodName='putContent', readable=readable, extensionHeaders=extensionHeaders) finally: if notifier is not None: notifier.end() self._generate_object_url(ret, bucketName, objectKey) return ret - def putObject(self, bucketName, objectKey, content, metadata=None, headers=None, progressCallback=None, autoClose=True): - return self.putContent(bucketName, objectKey, content, metadata, headers, progressCallback, autoClose) + def putObject(self, bucketName, objectKey, content, metadata=None, headers=None, progressCallback=None, autoClose=True, extensionHeaders=None): + return self.putContent(bucketName, objectKey, content, metadata, headers, progressCallback, autoClose, extensionHeaders=extensionHeaders) @funcCache - def putFile(self, bucketName, objectKey, file_path, metadata=None, headers=None, progressCallback=None): + def putFile(self, bucketName, objectKey, file_path, metadata=None, headers=None, progressCallback=None, extensionHeaders=None): file_path = util.safe_encode(file_path) if not os.path.exists(file_path): file_path = util.safe_trans_to_gb2312(file_path) @@ -1480,7 +1494,7 @@ def putFile(self, bucketName, objectKey, file_path, metadata=None, headers=None, key = util.safe_trans_to_gb2312('{0}/'.format(os.path.split(file_path)[1]) + f) else: key = '{0}/'.format(objectKey) + util.safe_trans_to_gb2312(f) - result = self.putFile(bucketName, key, __file_path, metadata, headers) + result = self.putFile(bucketName, key, __file_path, metadata, headers, extensionHeaders=extensionHeaders) results.append((key, result)) return results @@ -1514,15 +1528,15 @@ def putFile(self, bucketName, objectKey, file_path, metadata=None, headers=None, entity = util.get_file_entity_by_totalcount(file_path, totalCount, self.chunk_size, notifier) try: notifier.start() - ret = self._make_put_request(bucketName, objectKey, headers=_headers, entity=entity, methodName='putContent', readable=readable) + ret = self._make_put_request(bucketName, objectKey, headers=_headers, entity=entity, methodName='putContent', readable=readable, extensionHeaders=extensionHeaders) finally: notifier.end() self._generate_object_url(ret, bucketName, objectKey) return ret - + @funcCache def uploadPart(self, bucketName, objectKey, partNumber, uploadId, object=None, isFile=False, partSize=None, - offset=0, sseHeader=None, isAttachMd5=False, md5=None, content=None, progressCallback=None, autoClose=True): + offset=0, sseHeader=None, isAttachMd5=False, md5=None, content=None, progressCallback=None, autoClose=True, extensionHeaders=None): self._assert_not_null(partNumber, 'partNumber is empty') self._assert_not_null(uploadId, 'uploadId is empty') @@ -1592,7 +1606,7 @@ def uploadPart(self, bucketName, objectKey, partNumber, uploadId, object=None, i if notifier is not None: notifier.start() ret = self._make_put_request(bucketName, objectKey, pathArgs={'partNumber': partNumber, 'uploadId': uploadId}, - headers=headers, entity=entity, chunkedMode=chunkedMode, methodName='uploadPart', readable=readable) + headers=headers, entity=entity, chunkedMode=chunkedMode, methodName='uploadPart', readable=readable, extensionHeaders=extensionHeaders) finally: if notifier is not None: notifier.end() @@ -1600,7 +1614,7 @@ def uploadPart(self, bucketName, objectKey, partNumber, uploadId, object=None, i @funcCache def _uploadPartWithNotifier(self, bucketName, objectKey, partNumber, uploadId, content=None, isFile=False, partSize=None, - offset=0, sseHeader=None, isAttachMd5=False, md5=None, notifier=None): + offset=0, sseHeader=None, isAttachMd5=False, md5=None, notifier=None, extensionHeaders=None): self._assert_not_null(partNumber, 'partNumber is empty') self._assert_not_null(uploadId, 'uploadId is empty') @@ -1656,11 +1670,11 @@ def _uploadPartWithNotifier(self, bucketName, objectKey, partNumber, uploadId, c self.convertor._set_sse_header(sseHeader, headers, True) ret = self._make_put_request(bucketName, objectKey, pathArgs={'partNumber': partNumber, 'uploadId': uploadId}, - headers=headers, entity=entity, chunkedMode=chunkedMode, methodName='uploadPart', readable=readable) + headers=headers, entity=entity, chunkedMode=chunkedMode, methodName='uploadPart', readable=readable, extensionHeaders=extensionHeaders) return ret @funcCache - def copyObject(self, sourceBucketName, sourceObjectKey, destBucketName, destObjectKey, metadata=None, headers=None, versionId=None): + def copyObject(self, sourceBucketName, sourceObjectKey, destBucketName, destObjectKey, metadata=None, headers=None, versionId=None, extensionHeaders=None): self._assert_not_null(sourceBucketName, 'sourceBucketName is empty') sourceObjectKey = util.safe_encode(sourceObjectKey) if sourceObjectKey is None: @@ -1673,46 +1687,46 @@ def copyObject(self, sourceBucketName, sourceObjectKey, destBucketName, destObje headers = CopyObjectHeader() return self._make_put_request(destBucketName, destObjectKey, - methodName='copyObject', **self.convertor.trans_copy_object(metadata=metadata, headers=headers, versionId=versionId, + methodName='copyObject', extensionHeaders=extensionHeaders, **self.convertor.trans_copy_object(metadata=metadata, headers=headers, versionId=versionId, sourceBucketName=sourceBucketName, sourceObjectKey=sourceObjectKey)) @funcCache - def setObjectAcl(self, bucketName, objectKey, acl=ACL(), versionId=None, aclControl=None): + def setObjectAcl(self, bucketName, objectKey, acl=ACL(), versionId=None, aclControl=None, extensionHeaders=None): if acl is not None and len(acl) > 0 and aclControl is not None: raise Exception('Both acl and aclControl are set') if not acl and not aclControl: raise Exception('Both acl and aclControl are not set') - return self._make_put_request(bucketName, objectKey, **self.convertor.trans_set_object_acl(acl=acl, versionId=versionId, aclControl=aclControl)) + return self._make_put_request(bucketName, objectKey, extensionHeaders=extensionHeaders, **self.convertor.trans_set_object_acl(acl=acl, versionId=versionId, aclControl=aclControl)) @funcCache - def getObjectAcl(self, bucketName, objectKey, versionId=None): + def getObjectAcl(self, bucketName, objectKey, versionId=None, extensionHeaders=None): pathArgs = {'acl': None} if versionId: pathArgs[const.VERSION_ID_PARAM] = util.to_string(versionId) - return self._make_get_request(bucketName, objectKey, pathArgs=pathArgs, methodName='getObjectAcl') + return self._make_get_request(bucketName, objectKey, pathArgs=pathArgs, methodName='getObjectAcl', extensionHeaders=extensionHeaders) @funcCache - def deleteObject(self, bucketName, objectKey, versionId=None): + def deleteObject(self, bucketName, objectKey, versionId=None, extensionHeaders=None): path_args = {} if versionId: path_args[const.VERSION_ID_PARAM] = util.to_string(versionId) - return self._make_delete_request(bucketName, objectKey, pathArgs=path_args, methodName='deleteObject') + return self._make_delete_request(bucketName, objectKey, pathArgs=path_args, methodName='deleteObject', extensionHeaders=extensionHeaders) @funcCache - def deleteObjects(self, bucketName, deleteObjectsRequest): + def deleteObjects(self, bucketName, deleteObjectsRequest, extensionHeaders=None): self._assert_not_null(deleteObjectsRequest, 'deleteObjectsRequest is empty') - return self._make_post_request(bucketName, methodName='deleteObjects', **self.convertor.trans_delete_objects(deleteObjectsRequest=deleteObjectsRequest)) + return self._make_post_request(bucketName, methodName='deleteObjects', extensionHeaders=extensionHeaders, **self.convertor.trans_delete_objects(deleteObjectsRequest=deleteObjectsRequest)) @funcCache - def restoreObject(self, bucketName, objectKey, days, tier=None, versionId=None): + def restoreObject(self, bucketName, objectKey, days, tier=None, versionId=None, extensionHeaders=None): self._assert_not_null(days, 'days is empty') - return self._make_post_request(bucketName, objectKey, **self.convertor.trans_restore_object(days=days, tier=tier, versionId=versionId)) + return self._make_post_request(bucketName, objectKey, extensionHeaders=extensionHeaders, **self.convertor.trans_restore_object(days=days, tier=tier, versionId=versionId)) @funcCache def initiateMultipartUpload(self, bucketName, objectKey, acl=None, storageClass=None, - metadata=None, websiteRedirectLocation=None, contentType=None, sseHeader=None, expires=None, extensionGrants=None): + metadata=None, websiteRedirectLocation=None, contentType=None, sseHeader=None, expires=None, extensionGrants=None, extensionHeaders=None): objectKey = util.safe_encode(objectKey) if objectKey is None: objectKey = '' @@ -1720,85 +1734,93 @@ def initiateMultipartUpload(self, bucketName, objectKey, acl=None, storageClass= if contentType is None: contentType = const.MIME_TYPES.get(objectKey[objectKey.rfind('.') + 1:].lower()) - return self._make_post_request(bucketName, objectKey, methodName='initiateMultipartUpload', + return self._make_post_request(bucketName, objectKey, methodName='initiateMultipartUpload', extensionHeaders=extensionHeaders, **self.convertor.trans_initiate_multipart_upload(acl=acl, storageClass=storageClass, metadata=metadata, websiteRedirectLocation=websiteRedirectLocation, contentType=contentType, sseHeader=sseHeader, expires=expires, extensionGrants=extensionGrants)) @funcCache - def copyPart(self, bucketName, objectKey, partNumber, uploadId, copySource, copySourceRange=None, destSseHeader=None, sourceSseHeader=None): + def copyPart(self, bucketName, objectKey, partNumber, uploadId, copySource, copySourceRange=None, destSseHeader=None, sourceSseHeader=None, extensionHeaders=None): self._assert_not_null(partNumber, 'partNumber is empty') self._assert_not_null(uploadId, 'uploadId is empty') self._assert_not_null(copySource, 'copySource is empty') - return self._make_put_request(bucketName, objectKey, methodName='copyPart', **self.convertor.trans_copy_part(partNumber=partNumber, uploadId=uploadId, copySource=copySource, + return self._make_put_request(bucketName, objectKey, methodName='copyPart', extensionHeaders=extensionHeaders, **self.convertor.trans_copy_part(partNumber=partNumber, uploadId=uploadId, copySource=copySource, copySourceRange=copySourceRange, destSseHeader=destSseHeader, sourceSseHeader=sourceSseHeader)) @funcCache - def completeMultipartUpload(self, bucketName, objectKey, uploadId, completeMultipartUploadRequest): + def completeMultipartUpload(self, bucketName, objectKey, uploadId, completeMultipartUploadRequest, extensionHeaders=None): self._assert_not_null(uploadId, 'uploadId is empty') self._assert_not_null(completeMultipartUploadRequest, 'completeMultipartUploadRequest is empty') ret = self._make_post_request(bucketName, objectKey, pathArgs={'uploadId':uploadId}, - entity=self.convertor.trans_complete_multipart_upload_request(completeMultipartUploadRequest), methodName='completeMultipartUpload') + entity=self.convertor.trans_complete_multipart_upload_request(completeMultipartUploadRequest), methodName='completeMultipartUpload', extensionHeaders=extensionHeaders) self._generate_object_url(ret, bucketName, objectKey) return ret @funcCache - def abortMultipartUpload(self, bucketName, objectKey, uploadId): + def abortMultipartUpload(self, bucketName, objectKey, uploadId, extensionHeaders=None): self._assert_not_null(uploadId, 'uploadId is empty') - return self._make_delete_request(bucketName, objectKey, pathArgs={'uploadId' : uploadId}) + return self._make_delete_request(bucketName, objectKey, pathArgs={'uploadId' : uploadId}, extensionHeaders=extensionHeaders) @funcCache - def listParts(self, bucketName, objectKey, uploadId, maxParts=None, partNumberMarker=None): + def listParts(self, bucketName, objectKey, uploadId, maxParts=None, partNumberMarker=None, extensionHeaders=None): self._assert_not_null(uploadId, 'uploadId is empty') pathArgs = {'uploadId': uploadId} if maxParts is not None: pathArgs['max-parts'] = maxParts if partNumberMarker is not None: pathArgs['part-number-marker'] = partNumberMarker - return self._make_get_request(bucketName, objectKey, pathArgs=pathArgs, methodName='listParts') + return self._make_get_request(bucketName, objectKey, pathArgs=pathArgs, methodName='listParts', extensionHeaders=extensionHeaders) @funcCache - def getBucketStoragePolicy(self, bucketName): - return self._make_get_request(bucketName, methodName='getBucketStoragePolicy', **self.convertor.trans_get_bucket_storage_policy()) + def getBucketStoragePolicy(self, bucketName, extensionHeaders=None): + return self._make_get_request(bucketName, methodName='getBucketStoragePolicy', extensionHeaders=extensionHeaders, **self.convertor.trans_get_bucket_storage_policy()) @funcCache - def setBucketStoragePolicy(self, bucketName, storageClass): + def setBucketStoragePolicy(self, bucketName, storageClass, extensionHeaders=None): self._assert_not_null(storageClass, 'storageClass is empty') - return self._make_put_request(bucketName, **self.convertor.trans_set_bucket_storage_policy(storageClass=storageClass)) + return self._make_put_request(bucketName, extensionHeaders=extensionHeaders, **self.convertor.trans_set_bucket_storage_policy(storageClass=storageClass)) @funcCache - def setBucketEncryption(self, bucketName, encryption, key=None): + def setBucketEncryption(self, bucketName, encryption, key=None, extensionHeaders=None): self._assert_not_null(encryption, 'encryption is empty') - return self._make_put_request(bucketName, pathArgs={'encryption': None}, entity=self.convertor.trans_encryption(encryption=encryption, key=key)) + return self._make_put_request(bucketName, pathArgs={'encryption': None}, entity=self.convertor.trans_encryption(encryption=encryption, key=key), extensionHeaders=extensionHeaders) @funcCache - def getBucketEncryption(self, bucketName): - return self._make_get_request(bucketName, methodName='getBucketEncryption', pathArgs={'encryption':None}) + def getBucketEncryption(self, bucketName, extensionHeaders=None): + return self._make_get_request(bucketName, methodName='getBucketEncryption', pathArgs={'encryption':None}, extensionHeaders=extensionHeaders) @funcCache - def deleteBucketEncryption(self, bucketName): - return self._make_delete_request(bucketName, pathArgs={'encryption':None}) + def deleteBucketEncryption(self, bucketName, extensionHeaders=None): + return self._make_delete_request(bucketName, pathArgs={'encryption':None}, extensionHeaders=extensionHeaders) @funcCache - def setBucketReplication(self, bucketName, replication): + def setBucketReplication(self, bucketName, replication, extensionHeaders=None): self._assert_not_null(replication, 'replication is empty') - return self._make_put_request(bucketName, **self.convertor.trans_set_bucket_replication(replication=replication)) + return self._make_put_request(bucketName, extensionHeaders=extensionHeaders, **self.convertor.trans_set_bucket_replication(replication=replication)) @funcCache - def getBucketReplication(self, bucketName): - return self._make_get_request(bucketName, pathArgs={'replication':None}, methodName='getBucketReplication') + def getBucketReplication(self, bucketName, extensionHeaders=None): + return self._make_get_request(bucketName, pathArgs={'replication':None}, methodName='getBucketReplication', extensionHeaders=extensionHeaders) @funcCache - def deleteBucketReplication(self, bucketName): - return self._make_delete_request(bucketName, pathArgs={'replication':None}) - + def deleteBucketReplication(self, bucketName, extensionHeaders=None): + return self._make_delete_request(bucketName, pathArgs={'replication':None}, extensionHeaders=extensionHeaders) + + @funcCache + def setBucketRequestPayment(self, bucketName, payer, extensionHeaders=None): + self._assert_not_null(payer, 'payer is empty') + return self._make_put_request(bucketName, pathArgs={'requestPayment': None}, entity=self.convertor.trans_bucket_request_payment(payer=payer), extensionHeaders=extensionHeaders) + + @funcCache + def getBucketRequestPayment(self, bucketName, extensionHeaders=None): + return self._make_get_request(bucketName, pathArgs={'requestPayment': None}, methodName='getBucketRequestPayment', extensionHeaders=extensionHeaders) @funcCache def uploadFile(self, bucketName, objectKey, uploadFile, partSize=9 * 1024 * 1024, taskNum=1, enableCheckpoint=False, checkpointFile=None, - checkSum=False, metadata=None, progressCallback=None, headers=None): + checkSum=False, metadata=None, progressCallback=None, headers=None, extensionHeaders=None): self.log_client.log(INFO, 'enter resume upload file...') self._assert_not_null(bucketName, 'bucketName is empty') self._assert_not_null(objectKey, 'objectKey is empty') @@ -1816,11 +1838,11 @@ def uploadFile(self, bucketName, objectKey, uploadFile, partSize=9 * 1024 * 1024 taskNum = 1 else: taskNum = int(math.ceil(taskNum)) - return _resumer_upload(bucketName, objectKey, uploadFile, partSize, taskNum, enableCheckpoint, checkpointFile, checkSum, metadata, progressCallback, self, headers) + return _resumer_upload(bucketName, objectKey, uploadFile, partSize, taskNum, enableCheckpoint, checkpointFile, checkSum, metadata, progressCallback, self, headers, extensionHeaders=extensionHeaders) @funcCache def _downloadFileWithNotifier(self, bucketName, objectKey, downloadFile=None, partSize=5 * 1024 * 1024, taskNum=1, enableCheckpoint=False, - checkpointFile=None, header=None, versionId=None, progressCallback=None, imageProcess=None, notifier=progress.NONE_NOTIFIER): + checkpointFile=None, header=None, versionId=None, progressCallback=None, imageProcess=None, notifier=progress.NONE_NOTIFIER, extensionHeaders=None): self.log_client.log(INFO, 'enter resume download...') self._assert_not_null(bucketName, 'bucketName is empty') self._assert_not_null(objectKey, 'objectKey is empty') @@ -1842,18 +1864,18 @@ def _downloadFileWithNotifier(self, bucketName, objectKey, downloadFile=None, pa else: taskNum = int(math.ceil(taskNum)) return _resumer_download(bucketName, objectKey, downloadFile, partSize, taskNum, enableCheckpoint, checkpointFile, header, versionId, progressCallback, self, - imageProcess, notifier) + imageProcess, notifier, extensionHeaders=extensionHeaders) def downloadFile(self, bucketName, objectKey, downloadFile=None, partSize=5 * 1024 * 1024, taskNum=1, enableCheckpoint=False, - checkpointFile=None, header=None, versionId=None, progressCallback=None, imageProcess=None): - return self._downloadFileWithNotifier(bucketName, objectKey, downloadFile, partSize, taskNum, enableCheckpoint, checkpointFile, header, versionId, progressCallback, imageProcess) + checkpointFile=None, header=None, versionId=None, progressCallback=None, imageProcess=None, extensionHeaders=None): + return self._downloadFileWithNotifier(bucketName, objectKey, downloadFile, partSize, taskNum, enableCheckpoint, checkpointFile, header, versionId, progressCallback, imageProcess, extensionHeaders=extensionHeaders) def downloadFiles(self, bucketName, prefix, downloadFolder=None, taskNum=const.DEFAULT_TASK_NUM, taskQueueSize=const.DEFAULT_TASK_QUEUE_SIZE, headers=GetObjectHeader(), imageProcess=None, interval=const.DEFAULT_BYTE_INTTERVAL, taskCallback=None, progressCallback=None, - threshold=const.DEFAULT_MAXIMUM_SIZE, partSize=5*1024*1024, subTaskNum=1, enableCheckpoint=False, checkpointFile=None): + threshold=const.DEFAULT_MAXIMUM_SIZE, partSize=5*1024*1024, subTaskNum=1, enableCheckpoint=False, checkpointFile=None, extensionHeaders=None): return _download_files(self, bucketName, prefix, downloadFolder, taskNum, taskQueueSize, headers, imageProcess, - interval, taskCallback, progressCallback, threshold, partSize, subTaskNum, enableCheckpoint, checkpointFile) + interval, taskCallback, progressCallback, threshold, partSize, subTaskNum, enableCheckpoint, checkpointFile, extensionHeaders=extensionHeaders) diff --git a/src/obs/const.py b/src/obs/const.py index 15904ac..8e2bc0b 100644 --- a/src/obs/const.py +++ b/src/obs/const.py @@ -87,7 +87,7 @@ DEFAULT_TASK_QUEUE_SIZE = 20000 -OBS_SDK_VERSION = '3.19.11' +OBS_SDK_VERSION = '3.20.1' V2_META_HEADER_PREFIX = 'x-amz-meta-' V2_HEADER_PREFIX = 'x-amz-' diff --git a/src/obs/convertor.py b/src/obs/convertor.py index afc6fd7..65aa2ec 100644 --- a/src/obs/convertor.py +++ b/src/obs/convertor.py @@ -164,6 +164,9 @@ def next_position_header(self): def object_type_header(self): return 'x-obs-object-type' + + def request_payer_header(self): + return self._get_header_prefix() + 'request-payer' def adapt_group(self, group): if self.is_obs: @@ -908,7 +911,18 @@ def trans_replication(self, replication): if replicationRule.get('storageClass') is not None: ET.SubElement(destinationEle, 'Bucket').text = self.ha.adapt_storage_class(replicationRule['storageClass']) return ET.tostring(root, 'UTF-8') - + + def trans_bucket_request_payment(self, payer): + root = ET.Element('RequestPaymentConfiguration') + ET.SubElement(root, 'Payer').text = util.to_string(payer) + return ET.tostring(root, 'UTF-8') + + def trans_get_extension_headers(self, headers): + _headers = {} + if headers is not None and len(headers) > 0: + self._put_key_value(_headers, self.ha.request_payer_header(), (headers.get('requesterPayer'))) + return _headers + def _find_item(self, root, itemname): result = root.find(itemname) if result is None: @@ -1628,3 +1642,8 @@ def parseGetBucketReplication(self, xml, headers=None): replication = Replication(agency=agency, replicationRules=_rules) return replication + def parseGetBucketRequestPayment(self, xml, headers=None): + root = ET.fromstring(xml) + payer =self._find_item(root, 'Payer') + payment = GetBucketRequestPaymentResponse(payer=payer) + return payment \ No newline at end of file diff --git a/src/obs/extension.py b/src/obs/extension.py index 6d41b84..2e321f4 100644 --- a/src/obs/extension.py +++ b/src/obs/extension.py @@ -1,119 +1,119 @@ -#!/usr/bin/python -# -*- coding:utf-8 -*- -# Copyright 2019 Huawei Technologies Co.,Ltd. -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use -# this file except in compliance with the License. You may obtain a copy of the -# License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. - -import os -import traceback -from obs import const, util, progress, bulktasks -from obs.model import GetObjectRequest -from obs.model import GetObjectHeader -from obs.ilog import ERROR - -def _download_files(obsClient, bucketName, prefix, downloadFolder=None, taskNum=const.DEFAULT_TASK_NUM, taskQueueSize=const.DEFAULT_TASK_QUEUE_SIZE, - headers=GetObjectHeader(), imageProcess=None, interval=const.DEFAULT_BYTE_INTTERVAL, taskCallback=None, progressCallback=None, - threshold=const.DEFAULT_MAXIMUM_SIZE, partSize=5*1024*1024, subTaskNum=1, enableCheckpoint=False, checkpointFile=None): - try: - executor = None - notifier = None - if downloadFolder is None or not os.path.isdir(downloadFolder): - raise Exception('%s is not a Folder' % downloadFolder) - - if taskCallback is not None and not callable(taskCallback): - raise Exception('Invalid taskCallback') - - (taskNum, taskQueueSize, interval, threshold) = bulktasks._checkBulkTasksPara(taskNum, taskQueueSize, interval, threshold) - - taskCallback = taskCallback if taskCallback is not None else util.lazyCallback - executor = bulktasks.ThreadPool(taskNum, taskQueueSize) - state = bulktasks.ExecuteProgress() - totalTasks = const.LONG(0) - totalAmount = const.LONG(0) - notifier = progress.ProgressNotifier(progressCallback, totalAmount, interval) if progressCallback is not None else progress.NONE_NOTIFIER - notifier.start() - - query = GetObjectRequest(imageProcess=imageProcess) - - prefix = prefix if prefix is not None else '' - prefixDir = prefix[:prefix.rfind('/')+1] - - for content in _list_objects(obsClient, bucketName, prefix=prefix): - objectKey = content.key - totalTasks += 1 - totalAmount += content.size - objectPath = objectKey.replace(prefixDir, '', 1) - if objectPath.startswith('/') or objectPath.find('//') != -1 or objectPath.find('\\') != -1: - state._failed_increment() - taskCallback(objectKey, Exception('illegal path: %s' % objectKey)) - obsClient.log_client.log(ERROR, 'illegal path: %s' % objectKey) - continue - - downloadPath = os.path.join(downloadFolder, objectPath) - downloadPath = util.safe_encode(downloadPath) - if const.IS_WINDOWS: - downloadPath = util.safe_trans_to_gb2312(downloadPath) - - dirName = os.path.dirname(downloadPath) - if not os.path.exists(dirName): - try: - os.makedirs(dirName, 0o755) - except Exception as e: - state._failed_increment() - taskCallback(objectKey, e) - obsClient.log_client.log(ERROR, traceback.format_exc()) - continue - - if objectKey.endswith(('/')): - state._successful_increment() - elif content.size < threshold: - executor.execute(_task_wrap, obsClient, obsClient._getObjectWithNotifier, key=objectKey, taskCallback=taskCallback, state=state, bucketName=bucketName, - objectKey=objectKey, getObjectRequest=query, headers=headers, downloadPath=downloadPath, notifier=notifier) - else: - executor.execute(_task_wrap, obsClient, obsClient._downloadFileWithNotifier, key=objectKey, taskCallback=taskCallback, state=state, bucketName=bucketName, - objectKey=objectKey, downloadFile=downloadPath, partSize=partSize, taskNum=subTaskNum, enableCheckpoint=enableCheckpoint, - checkpointFile=checkpointFile, header=headers, imageProcess=imageProcess, notifier=notifier) - - state.total_tasks = totalTasks - notifier.totalAmount = totalAmount - finally: - if executor is not None: - executor.shutdown() - if notifier is not None: - notifier.end() - - return state - -def _task_wrap(obsClient, func, key, taskCallback=None, state=None, **kwargs): - try: - res = func(**kwargs) - if res.status < 300: - state._successful_increment() - else: - state._failed_increment() - taskCallback(key, res) - except Exception as e: - state._failed_increment() - taskCallback(key, e) - obsClient.log_client.log(ERROR, traceback.format_exc()) - -def _list_objects(obsClient, bucketName, prefix=None, marker=None, max_keys=None, delimiter=None): - while True: - resp = obsClient.listObjects(bucketName, max_keys=max_keys, marker=marker, prefix=prefix, delimiter=delimiter) - if resp.status < 300: - for content in resp.body.contents: - yield content - if not resp.body.is_truncated: - break - marker = resp.body.next_marker - else: - obsClient.log_client.log(ERROR, 'listObjects Error: errorCode:%s, errorMessage:%s' % (resp.errorCode, resp.errorMessage)) +#!/usr/bin/python +# -*- coding:utf-8 -*- +# Copyright 2019 Huawei Technologies Co.,Ltd. +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. + +import os +import traceback +from obs import const, util, progress, bulktasks +from obs.model import GetObjectRequest +from obs.model import GetObjectHeader +from obs.ilog import ERROR + +def _download_files(obsClient, bucketName, prefix, downloadFolder=None, taskNum=const.DEFAULT_TASK_NUM, taskQueueSize=const.DEFAULT_TASK_QUEUE_SIZE, + headers=GetObjectHeader(), imageProcess=None, interval=const.DEFAULT_BYTE_INTTERVAL, taskCallback=None, progressCallback=None, + threshold=const.DEFAULT_MAXIMUM_SIZE, partSize=5*1024*1024, subTaskNum=1, enableCheckpoint=False, checkpointFile=None, extensionHeaders=None): + try: + executor = None + notifier = None + if downloadFolder is None or not os.path.isdir(downloadFolder): + raise Exception('%s is not a Folder' % downloadFolder) + + if taskCallback is not None and not callable(taskCallback): + raise Exception('Invalid taskCallback') + + (taskNum, taskQueueSize, interval, threshold) = bulktasks._checkBulkTasksPara(taskNum, taskQueueSize, interval, threshold) + + taskCallback = taskCallback if taskCallback is not None else util.lazyCallback + executor = bulktasks.ThreadPool(taskNum, taskQueueSize) + state = bulktasks.ExecuteProgress() + totalTasks = const.LONG(0) + totalAmount = const.LONG(0) + notifier = progress.ProgressNotifier(progressCallback, totalAmount, interval) if progressCallback is not None else progress.NONE_NOTIFIER + notifier.start() + + query = GetObjectRequest(imageProcess=imageProcess) + + prefix = prefix if prefix is not None else '' + prefixDir = prefix[:prefix.rfind('/')+1] + + for content in _list_objects(obsClient, bucketName, prefix=prefix, extensionHeaders=extensionHeaders): + objectKey = content.key + totalTasks += 1 + totalAmount += content.size + objectPath = objectKey.replace(prefixDir, '', 1) + if objectPath.startswith('/') or objectPath.find('//') != -1 or objectPath.find('\\') != -1: + state._failed_increment() + taskCallback(objectKey, Exception('illegal path: %s' % objectKey)) + obsClient.log_client.log(ERROR, 'illegal path: %s' % objectKey) + continue + + downloadPath = os.path.join(downloadFolder, objectPath) + downloadPath = util.safe_encode(downloadPath) + if const.IS_WINDOWS: + downloadPath = util.safe_trans_to_gb2312(downloadPath) + + dirName = os.path.dirname(downloadPath) + if not os.path.exists(dirName): + try: + os.makedirs(dirName, 0o755) + except Exception as e: + state._failed_increment() + taskCallback(objectKey, e) + obsClient.log_client.log(ERROR, traceback.format_exc()) + continue + + if objectKey.endswith(('/')): + state._successful_increment() + elif content.size < threshold: + executor.execute(_task_wrap, obsClient, obsClient._getObjectWithNotifier, key=objectKey, taskCallback=taskCallback, state=state, bucketName=bucketName, + objectKey=objectKey, getObjectRequest=query, headers=headers, downloadPath=downloadPath, notifier=notifier, extensionHeaders=extensionHeaders) + else: + executor.execute(_task_wrap, obsClient, obsClient._downloadFileWithNotifier, key=objectKey, taskCallback=taskCallback, state=state, bucketName=bucketName, + objectKey=objectKey, downloadFile=downloadPath, partSize=partSize, taskNum=subTaskNum, enableCheckpoint=enableCheckpoint, + checkpointFile=checkpointFile, header=headers, imageProcess=imageProcess, notifier=notifier, extensionHeaders=extensionHeaders) + + state.total_tasks = totalTasks + notifier.totalAmount = totalAmount + finally: + if executor is not None: + executor.shutdown() + if notifier is not None: + notifier.end() + + return state + +def _task_wrap(obsClient, func, key, taskCallback=None, state=None, **kwargs): + try: + res = func(**kwargs) + if res.status < 300: + state._successful_increment() + else: + state._failed_increment() + taskCallback(key, res) + except Exception as e: + state._failed_increment() + taskCallback(key, e) + obsClient.log_client.log(ERROR, traceback.format_exc()) + +def _list_objects(obsClient, bucketName, prefix=None, marker=None, max_keys=None, delimiter=None, extensionHeaders=None): + while True: + resp = obsClient.listObjects(bucketName, max_keys=max_keys, marker=marker, prefix=prefix, delimiter=delimiter, extensionHeaders=extensionHeaders) + if resp.status < 300: + for content in resp.body.contents: + yield content + if not resp.body.is_truncated: + break + marker = resp.body.next_marker + else: + obsClient.log_client.log(ERROR, 'listObjects Error: errorCode:%s, errorMessage:%s' % (resp.errorCode, resp.errorMessage)) break \ No newline at end of file diff --git a/src/obs/model.py b/src/obs/model.py index 557ad9d..7c5f383 100644 --- a/src/obs/model.py +++ b/src/obs/model.py @@ -111,7 +111,10 @@ 'ResponseWrapper', 'ObjectStream', 'GetBucketEncryptionResponse', - 'UploadFileHeader' + 'UploadFileHeader', + 'GetBucketRequestPaymentResponse', + 'Payer', + 'ExtensionHeader' ] @@ -1146,7 +1149,17 @@ def __init__(self, etag=None, sseKms=None, sseKmsKey=None, sseC=None, sseCKeyMd5 self.sseKmsKey = sseKmsKey self.sseC = sseC self.sseCKeyMd5 = sseCKeyMd5 - + +class GetBucketRequestPaymentResponse(BaseModel): + allowedAttr = {'payer': BASESTRING} + def __init__(self, payer=None): + self.payer = payer + +class Payer(object): + BUCKET_OWNER_PAYER = "BucketOwner" + REQUESTER_PAYER = "Requester" + REQUESTER = "requester" + class ResponseWrapper(object): def __init__(self, conn, result, connHolder, contentLength=None, notifier=None): self.conn = conn @@ -1228,3 +1241,8 @@ def __init__(self, response=None, buffer=None, size=None, url=None, deleteMarker self.sseC = sseC self.sseCKeyMd5 = sseCKeyMd5 +class ExtensionHeader(BaseModel): + allowedAttr = {'requesterPayer': BASESTRING} + + def __init__(self, requesterPayer=None): + self.requesterPayer = requesterPayer \ No newline at end of file diff --git a/src/obs/transfer.py b/src/obs/transfer.py index d77cc24..16eeb93 100644 --- a/src/obs/transfer.py +++ b/src/obs/transfer.py @@ -36,16 +36,16 @@ import queue -def _resumer_upload(bucketName, objectKey, uploadFile, partSize, taskNum, enableCheckPoint, checkPointFile, checkSum, metadata, progressCallback, obsClient, headers): +def _resumer_upload(bucketName, objectKey, uploadFile, partSize, taskNum, enableCheckPoint, checkPointFile, checkSum, metadata, progressCallback, obsClient, headers, extensionHeaders=None): upload_operation = uploadOperation(util.to_string(bucketName), util.to_string(objectKey), util.to_string(uploadFile), partSize, taskNum, enableCheckPoint, - util.to_string(checkPointFile), checkSum, metadata, progressCallback, obsClient, headers) + util.to_string(checkPointFile), checkSum, metadata, progressCallback, obsClient, headers, extensionHeaders=extensionHeaders) return upload_operation._upload() def _resumer_download(bucketName, objectKey, downloadFile, partSize, taskNum, enableCheckPoint, checkPointFile, - header, versionId, progressCallback, obsClient, imageProcess=None, notifier=progress.NONE_NOTIFIER): + header, versionId, progressCallback, obsClient, imageProcess=None, notifier=progress.NONE_NOTIFIER, extensionHeaders=None): down_operation = downloadOperation(util.to_string(bucketName), util.to_string(objectKey), util.to_string(downloadFile), partSize, taskNum, enableCheckPoint, util.to_string(checkPointFile), - header, versionId, progressCallback, obsClient, imageProcess, notifier) + header, versionId, progressCallback, obsClient, imageProcess, notifier, extensionHeaders=extensionHeaders) if down_operation.size == 0: down_operation._delete_record() down_operation._delete_tmp_file() @@ -109,12 +109,13 @@ def _write_record(self, record): class uploadOperation(Operation): def __init__(self, bucketName, objectKey, uploadFile, partSize, taskNum, enableCheckPoint, checkPointFile, - checkSum, metadata, progressCallback, obsClient, headers): + checkSum, metadata, progressCallback, obsClient, headers, extensionHeaders=None): super(uploadOperation, self).__init__(bucketName, objectKey, uploadFile, partSize, taskNum, enableCheckPoint, checkPointFile, progressCallback, obsClient) self.checkSum = checkSum self.metadata = metadata self.headers = headers + self.extensionHeaders = extensionHeaders try: self.size = os.path.getsize(self.fileName) @@ -123,7 +124,7 @@ def __init__(self, bucketName, objectKey, uploadFile, partSize, taskNum, enableC self._delete_record() self.obsClient.log_client.log(ERROR, 'something is happened when obtain uploadFile information. Please check') raise e - resp = self.obsClient.headBucket(self.bucketName) + resp = self.obsClient.headBucket(self.bucketName, extensionHeaders=extensionHeaders) if resp.status > 300: raise Exception('head bucket {0} failed. Please check. Status:{1}.'.format(self.bucketName, resp.status)) @@ -157,7 +158,7 @@ def _upload(self): thread_pools.run() if self._abort: - self.obsClient.abortMultipartUpload(self.bucketName, self.objectKey, self._record['uploadId']) + self.obsClient.abortMultipartUpload(self.bucketName, self.objectKey, self._record['uploadId'], extensionHeaders=self.extensionHeaders) self.obsClient.log_client.log(ERROR, 'the code from server is 4**, please check space态persimission and so on.') self._delete_record() if self._exception is not None: @@ -166,7 +167,7 @@ def _upload(self): for p in self._record['uploadParts']: if not p['isCompleted']: if not self.enableCheckPoint: - self.obsClient.abortMultipartUpload(self.bucketName, self.objectKey, self._record['uploadId']) + self.obsClient.abortMultipartUpload(self.bucketName, self.objectKey, self._record['uploadId'], extensionHeaders=self.extensionHeaders) raise Exception('some parts are failed when upload. Please try again') part_Etags = [] @@ -174,18 +175,18 @@ def _upload(self): part_Etags.append(CompletePart(partNum=part['partNum'], etag=part['etag'])) self.obsClient.log_client.log(INFO, 'Completing to upload multiparts') resp = self.obsClient.completeMultipartUpload(self.bucketName, self.objectKey, self._record['uploadId'], - CompleteMultipartUploadRequest(part_Etags)) + CompleteMultipartUploadRequest(part_Etags), extensionHeaders=self.extensionHeaders) if resp.status < 300: if self.enableCheckPoint: self._delete_record() else: if not self.enableCheckPoint: - self.obsClient.abortMultipartUpload(self.bucketName, self.objectKey, self._record['uploadId']) + self.obsClient.abortMultipartUpload(self.bucketName, self.objectKey, self._record['uploadId'], extensionHeaders=self.extensionHeaders) self.obsClient.log_client.log(ERROR, 'something is wrong when complete multipart.ErrorCode:{0}. ErrorMessage:{1}'.format( resp.errorCode, resp.errorMessage)) else: if resp.status > 300 and resp.status < 500: - self.obsClient.abortMultipartUpload(self.bucketName, self.objectKey, self._record['uploadId']) + self.obsClient.abortMultipartUpload(self.bucketName, self.objectKey, self._record['uploadId'], extensionHeaders=self.extensionHeaders) self.obsClient.log_client.log(ERROR, 'something is wrong when complete multipart.ErrorCode:{0}. ErrorMessage:{1}'.format( resp.errorCode, resp.errorMessage)) self._delete_record() @@ -197,7 +198,7 @@ def _load(self): self._record = self._get_record() if self._record and not (self._type_check(self._record) and self._check_upload_record(self._record)): if self._record['bucketName'] and self._record['objectKey'] and self._record['uploadId'] is not None: - self.obsClient.abortMultipartUpload(self._record['bucketName'], self._record['objectKey'], self._record['uploadId']) + self.obsClient.abortMultipartUpload(self._record['bucketName'], self._record['objectKey'], self._record['uploadId'], extensionHeaders=self.extensionHeaders) self.obsClient.log_client.log(ERROR, 'checkpointFile is invalid') self._delete_record() self._record = None @@ -276,7 +277,7 @@ def _prepare(self): resp = self.obsClient.initiateMultipartUpload(self.bucketName, self.objectKey, metadata=self.metadata, acl=self.headers.acl, storageClass=self.headers.storageClass, websiteRedirectLocation=self.headers.websiteRedirectLocation, contentType=self.headers.contentType, sseHeader=self.headers.sseHeader, expires=self.headers.expires, - extensionGrants=self.headers.extensionGrants) + extensionGrants=self.headers.extensionGrants, extensionHeaders=self.extensionHeaders) if resp.status > 300: raise Exception('initiateMultipartUpload failed. ErrorCode:{0}. ErrorMessage:{1}'.format(resp.errorCode, resp.errorMessage)) uploadId = resp.body.uploadId @@ -301,7 +302,7 @@ def _upload_part(self, part): if not self._is_abort(): try: resp = self.obsClient._uploadPartWithNotifier(self.bucketName, self.objectKey, part['partNumber'], self._record['uploadId'], self.fileName, - isFile=True, partSize=part['length'], offset=part['offset'], notifier=self.notifier) + isFile=True, partSize=part['length'], offset=part['offset'], notifier=self.notifier, extensionHeaders=self.extensionHeaders) if resp.status < 300: self._record['uploadParts'][part['partNumber']-1]['isCompleted'] = True self._record['partEtags'].append(CompletePart(util.to_int(part['partNumber']), resp.body.etag)) @@ -319,19 +320,20 @@ def _upload_part(self, part): class downloadOperation(Operation): def __init__(self, bucketName, objectKey, downloadFile, partSize, taskNum, enableCheckPoint, checkPointFile, - header, versionId, progressCallback, obsClient, imageProcess=None, notifier=progress.NONE_NOTIFIER): + header, versionId, progressCallback, obsClient, imageProcess=None, notifier=progress.NONE_NOTIFIER, extensionHeaders=None): super(downloadOperation, self).__init__(bucketName, objectKey, downloadFile, partSize, taskNum, enableCheckPoint, checkPointFile, progressCallback, obsClient, notifier) self.header = header self.versionId = versionId self.imageProcess = imageProcess + self.extensionHeaders = extensionHeaders parent_dir = os.path.dirname(self.fileName) if not os.path.exists(parent_dir): os.makedirs(parent_dir, exist_ok=True) self._tmp_file = self.fileName + '.tmp' - metedata_resp = self.obsClient.getObjectMetadata(self.bucketName, self.objectKey, self.versionId) + metedata_resp = self.obsClient.getObjectMetadata(self.bucketName, self.objectKey, self.versionId, extensionHeaders=self.extensionHeaders) if metedata_resp.status < 300: self.lastModified = metedata_resp.body.lastModified self.size = metedata_resp.body.contentLength if metedata_resp.body.contentLength is not None and metedata_resp.body.contentLength >= 0 else 0 @@ -517,7 +519,7 @@ def _download_part(self, part): response = None try: resp = self.obsClient._getObjectWithNotifier(bucketName=self.bucketName, objectKey=self.objectKey, - getObjectRequest=get_object_request, headers=get_object_header, notifier=self.notifier) + getObjectRequest=get_object_request, headers=get_object_header, notifier=self.notifier, extensionHeaders=self.extensionHeaders) if resp.status < 300: respone = resp.body.response chunk_size = 65536 diff --git a/src/setup.py b/src/setup.py index 8fcf672..c709ebb 100644 --- a/src/setup.py +++ b/src/setup.py @@ -23,7 +23,7 @@ setup( name='esdk-obs-python', - version='3.19.11', + version='3.20.1', packages=find_packages(), zip_safe=False, description='OBS Python SDK',