diff --git a/README.md b/README.md
index 2c5e5ef..2af6734 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,12 @@
+Version 3.22.2
+
+New Features:
+
+1. Added interfaces related to virtual buckets
+2. Made compatibility changes for the use of the Python 3 HTTPS parameters
+
+-------------------------------------------------------------------------------------------------
+
 Version 3.21.8
 
 New Features:
diff --git a/README_CN.md b/README_CN.md
index 8d37d32..dfd5b9c 100644
--- a/README_CN.md
+++ b/README_CN.md
@@ -1,4 +1,10 @@
-Version 3.21.8
+Version 3.22.2
+新特性:
+1. 增加虚拟桶相关接口
+2. 针对Python3 HTTPS参数的使用做了兼容修改
+
+-------------------------------------------------------------------------------------------------
+Version 3.21.8
 
 新特性:
diff --git a/release/huaweicloud-obs-sdk-python_3.22.2.tar.gz.sha256 b/release/huaweicloud-obs-sdk-python_3.22.2.tar.gz.sha256
new file mode 100644
index 0000000..76b9aa3
--- /dev/null
+++ b/release/huaweicloud-obs-sdk-python_3.22.2.tar.gz.sha256
@@ -0,0 +1 @@
+c1be9812a702d1301be1bbeb07da91fd18d8dd89895169c5a229b18b59d08f9d *huaweicloud-obs-sdk-python_.tar.gz
diff --git a/release/huaweicloud-obs-sdk-python_3.22.2.zip b/release/huaweicloud-obs-sdk-python_3.22.2.zip
new file mode 100644
index 0000000..05be0e3
Binary files /dev/null and b/release/huaweicloud-obs-sdk-python_3.22.2.zip differ
diff --git a/src/obs/__init__.py b/src/obs/__init__.py
index 0e7237c..7229c1c 100644
--- a/src/obs/__init__.py
+++ b/src/obs/__init__.py
@@ -23,7 +23,7 @@
 from obs.model import Redirect, RoutingRule, Tag, TagInfo, Transition, NoncurrentVersionTransition, Rule, Versions
 from obs.model import Object, WebsiteConfiguration, Logging, CompleteMultipartUploadRequest, DeleteObjectsRequest
 from obs.model import ListMultipartUploadsRequest, GetObjectRequest, UploadFileHeader, Payer
-from obs.model import ExtensionHeader, FetchStatus
+from obs.model import ExtensionHeader, FetchStatus, BucketAliasModel, ListBucketAliasModel
 from obs.workflow import WorkflowClient
 from obs.crypto_client import CryptoObsClient
 from obs.obs_cipher_suite import CTRCipherGenerator
@@ -91,5 +91,7 @@
     'WorkflowClient',
     'CryptoObsClient',
     'CTRCipherGenerator',
-    'CtrRSACipherGenerator'
+    'CtrRSACipherGenerator',
+    'BucketAliasModel',
+    'ListBucketAliasModel'
 ]
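Note: the widened export list above makes the two new alias models importable from the package root. A minimal sketch (endpoint and credentials below are placeholders, for illustration only):

```python
from obs import ObsClient, BucketAliasModel

# Placeholder endpoint and credentials.
client = ObsClient(access_key_id='ak', secret_access_key='sk',
                   server='https://obs.example.com')

# Plain data holder for the two real buckets behind one alias.
alias_info = BucketAliasModel(bucket1='bucket-az1', bucket2='bucket-az2')
```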
diff --git a/src/obs/client.py b/src/obs/client.py
index 0b74156..81492f7 100644
--- a/src/obs/client.py
+++ b/src/obs/client.py
@@ -22,8 +22,8 @@
 import threading
 import time
 import traceback
+import json
 from inspect import isfunction
-
 from obs import auth, const, convertor, loadtoken, locks, progress, util
 from obs.bucket import BucketClient
 from obs.cache import LocalCache
@@ -31,7 +31,8 @@
 from obs.ilog import DEBUG, ERROR, INFO, LogClient, NoneLogClient, WARNING
 from obs.model import ACL, AppendObjectContent, AppendObjectHeader, BaseModel, CopyObjectHeader, CreateBucketHeader, \
     FetchPolicy, GetObjectHeader, GetObjectRequest, GetResult, ListMultipartUploadsRequest, Logging, Notification, \
-    ObjectStream, PutObjectHeader, ResponseWrapper, SetObjectMetadataHeader, Versions, _FetchJob
+    ObjectStream, PutObjectHeader, ResponseWrapper, SetObjectMetadataHeader, Versions, _FetchJob, ExtensionHeader, \
+    BucketAliasModel, Replication, ReplicationRule
 from obs.transfer import _resume_download, _resume_upload
 
 if const.IS_PYTHON2:
@@ -68,8 +69,9 @@ def __init__(self, access_key_id, secret_access_key, security_token=None):
         self.security_token = security_token
 
 
-def _isCopyOrList(name, obsClient, *args, **kwargs):
+def _getCacheKey(name, obsClient, *args, **kwargs):
     key = ''
+    list_operation = ['listBuckets', 'listBucketAlias']
    if name == 'copyObject':
        if kwargs.get('destBucketName'):
            key = kwargs['destBucketName']
@@ -79,7 +81,7 @@
         if not obsClient.is_cname:
             obsClient._assert_not_null(key, 'destBucketName is empty')
-    elif name != 'listBuckets':
+    elif name not in list_operation:
         if len(args) > 1:
             key = args[1]
         elif kwargs.get('bucketName'):
@@ -139,12 +141,17 @@ def wrapper(*args, **kwargs):
         try:
             if obsClient:
                 obsClient.log_client.log(INFO, 'enter %s ...' % func.__name__)
-                key = _isCopyOrList(func.__name__, obsClient, *args, **kwargs)
+                key = _getCacheKey(func.__name__, obsClient, *args, **kwargs)
                 if obsClient.is_signature_negotiation:
-                    authType, resp = _is_signature_negotiation(obsClient, func.__name__, key)
-                    if not authType:
-                        return resp
+                    black_list = ['listAvailableZoneInfo', 'createVirtualBucket', 'createBucketAlias',
+                                  'listBucketAlias', 'deleteBucketAlias']
+                    if func.__name__ in black_list:
+                        obsClient.thread_local.signature = const.OBS_SIGNATURE
+                    else:
+                        authType, resp = _is_signature_negotiation(obsClient, func.__name__, key)
+                        if not authType:
+                            return resp
             ret = func(*args, **kwargs)
         except Exception as e:
             if obsClient and obsClient.log_client:
@@ -261,6 +268,7 @@ def __init__(self, access_key_id='', secret_access_key='', is_secure=True, serve
         else:
             self.ha = convertor.Adapter(self.signature)
         self.convertor = convertor.Convertor(self.signature, self.ha)
+        self.json_response_method_name = 'GetJsonResponse'
 
     @staticmethod
     def _parse_server_hostname(server):
@@ -667,7 +675,7 @@ def _get_server_connection_use_http1x(self, is_secure, server, port, proxy_host,
                 conn = httplib.HTTPSConnection(server, port=port, timeout=self.timeout)
             else:
                 conn = httplib.HTTPSConnection(server, port=port, timeout=self.timeout, context=self.context,
-                                               check_hostname=False)
+                                               check_hostname=None)
         else:
             conn = httplib.HTTPConnection(server, port=port, timeout=self.timeout)
 
@@ -1272,11 +1280,22 @@ def _getApiVersion(self, bucketName=''):
         return const.V2_SIGNATURE, res
 
     @funcCache
-    def listBuckets(self, isQueryLocation=True, extensionHeaders=None):
+    def listBuckets(self, isQueryLocation=True, extensionHeaders=None, bucketType=None):
+        """
+        Obtain a bucket list.
+        :param isQueryLocation: Whether to query the bucket location.
+        :param extensionHeaders: Other headers.
+        :param bucketType: Type of the buckets you want to list. The value can be OBJECT or POSIX, or be left blank.
+            "OBJECT": To list common buckets.
+            "POSIX": To list parallel file systems.
+            If this parameter is left blank, both buckets and parallel file systems will be listed.
+        :return: A bucket list.
+        """
         if self.is_cname:
             raise Exception('listBuckets is not allowed in custom domain mode')
         return self._make_get_request(methodName='listBuckets', extensionHeaders=extensionHeaders,
-                                      **self.convertor.trans_list_buckets(isQueryLocation=isQueryLocation))
+                                      **self.convertor.trans_list_buckets(isQueryLocation=isQueryLocation,
+                                                                          bucketType=bucketType))
 
     @funcCache
     def createBucket(self, bucketName, header=None, location=None, extensionHeaders=None):
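A quick usage sketch for the new `bucketType` parameter, reusing the placeholder `client` from the note above; the dict-style response access mirrors the tests added at the bottom of this change:

```python
# 'POSIX' lists parallel file systems only, 'OBJECT' lists common buckets
# only; omitting bucketType returns both kinds.
resp = client.listBuckets(bucketType='POSIX')
if resp.status == 200:
    for bucket in resp.body['buckets']:
        print(bucket['name'], bucket['bucket_type'])
```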
@@ -2174,6 +2193,270 @@ def getBucketRequestPayment(self, bucketName, extensionHeaders=None):
         return self._make_get_request(bucketName, pathArgs={'requestPayment': None},
                                       methodName='getBucketRequestPayment', extensionHeaders=extensionHeaders)
 
+    # begin virtual bucket related
+    # begin virtual bucket related
+    # begin virtual bucket related
+
+    @funcCache
+    def listAvailableZoneInfo(self, regionId, token, extensionHeaders=None):
+        self._assert_not_null(regionId, 'regionId should not be empty')
+        self._assert_not_null(token, 'token should not be empty')
+
+        pathArgs = {'regionId': regionId}
+        header = {
+            const.X_AUTH_TOKEN_HEADER: token,
+            const.CONTENT_TYPE_HEADER: const.MIME_TYPES.get('json')
+        }
+
+        return self._make_get_request(
+            objectKey='v1/services/clusters',
+            pathArgs=pathArgs,
+            headers=header,
+            methodName=self.json_response_method_name,
+            extensionHeaders=extensionHeaders
+        )
+
+    @funcCache
+    def createBucketAlias(self, bucketName, aliasInfo=None, extensionHeaders=None):
+        if aliasInfo is None:
+            raise Exception('aliasInfo is None')
+        self._assert_not_null(aliasInfo.get('bucket1'), 'bucket1 should not be empty')
+        self._assert_not_null(aliasInfo.get('bucket2'), 'bucket2 should not be empty')
+        return self._make_put_request(bucketName, extensionHeaders=extensionHeaders,
+                                      **self.convertor.trans_set_bucket_alias(aliasInfo=aliasInfo))
+
+    @funcCache
+    def bindBucketAlias(self, bucketName, aliasInfo=None, extensionHeaders=None):
+        if aliasInfo is None:
+            raise Exception('aliasInfo is None')
+        self._assert_not_null(aliasInfo.get('alias'), 'bucket alias should not be empty')
+        return self._make_put_request(bucketName, extensionHeaders=extensionHeaders,
+                                      **self.convertor.trans_bind_bucket_alias(aliasInfo=aliasInfo))
+
+    @funcCache
+    def deleteBucketAlias(self, bucketAlias, extensionHeaders=None):
+        self._assert_not_null(bucketAlias, 'bucket alias should not be empty')
+        pathArgs = {const.OBSBUCKETALIAS_PARAM: None}
+        return self._make_delete_request(bucketName=bucketAlias, pathArgs=pathArgs, extensionHeaders=extensionHeaders)
+
+    @funcCache
+    def unbindBucketAlias(self, bucketName, extensionHeaders=None):
+        pathArgs = {const.OBSALIAS_PARAM: None}
+        return self._make_delete_request(bucketName, pathArgs=pathArgs, extensionHeaders=extensionHeaders)
+
+    @funcCache
+    def getBucketAlias(self, bucketName, extensionHeaders=None):
+        pathArgs = {const.OBSALIAS_PARAM: None}
+        return self._make_get_request(bucketName, pathArgs=pathArgs, methodName='getBucketAlias',
+                                      extensionHeaders=extensionHeaders)
+
+    @funcCache
+    def listBucketAlias(self, extensionHeaders=None):
+        pathArgs = {const.OBSBUCKETALIAS_PARAM: None}
+        return self._make_get_request(pathArgs=pathArgs, methodName='ListBucketAlias',
+                                      extensionHeaders=extensionHeaders)
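Taken together, the methods above form the alias lifecycle. A hedged walkthrough, with placeholder bucket and alias names (`createBucketAlias`/`bindBucketAlias` return 200 on success, the unbind/delete calls 204, matching the checks in `createVirtualBucket` below):

```python
alias_info = BucketAliasModel(bucket1='bucket-az1', bucket2='bucket-az2')
client.createBucketAlias('my-alias', alias_info)   # PUT ?obsbucketalias, expects 200

alias_info.alias = 'my-alias'
client.bindBucketAlias('bucket-az1', alias_info)   # PUT ?obsalias on each real bucket
client.bindBucketAlias('bucket-az2', alias_info)

print(client.getBucketAlias('bucket-az1').body)    # alias currently bound to the bucket
print(client.listBucketAlias().body)               # all aliases under the account

client.unbindBucketAlias('bucket-az1')             # DELETE ?obsalias, expects 204
client.unbindBucketAlias('bucket-az2')
client.deleteBucketAlias('my-alias')               # DELETE ?obsbucketalias, expects 204
```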
+    @funcCache
+    def createVirtualBucket(self, regionId, token, bucketName1, bucketName2, bucketAlias, agencyName, header=None):
+        # step 1: list the cluster information in the specified region
+        azResp = self.listAvailableZoneInfo(regionId, token)
+        if azResp.status != 200:
+            raise Exception('list AZ infos failed, resp: %s' % azResp)
+
+        firstAZCgId, secondAZCgId = self._get_cluster_group_id(azResp)
+
+        # step 2: create the buckets with the given cluster group ids
+        self._create_bucket_with_cluster_id(firstAZCgId, secondAZCgId, bucketName1, bucketName2, bucketAlias, header)
+
+        # step 3: create the bucket alias once
+        aliasInfo = BucketAliasModel()
+        aliasInfo.bucket1 = bucketName1
+        aliasInfo.bucket2 = bucketName2
+        cbaResp = self.createBucketAlias(bucketAlias, aliasInfo)
+        if cbaResp.status != 200:
+            self._clear_virtual_bucket(const.VIRTUAL_BUCKET_CREATEBUCKET_STAGED, bucketName1, bucketName2, bucketAlias)
+            raise Exception('create bucket alias failed, resp: %s' % cbaResp)
+
+        # step 4: bind the alias twice, once for each real bucket
+        aliasInfo.alias = bucketAlias
+        bindResp = self.bindBucketAlias(bucketName1, aliasInfo)
+        if bindResp.status != 200:
+            self._clear_virtual_bucket(const.VIRTUAL_BUCKET_CREATEALIAS_STAGED, bucketName1, bucketName2, bucketAlias)
+            raise Exception('binding bucket alias failed, resp: %s' % bindResp)
+
+        bindResp = self.bindBucketAlias(bucketName2, aliasInfo)
+        if bindResp.status != 200:
+            self._clear_virtual_bucket(const.VIRTUAL_BUCKET_BINDALIAS_STAGED, bucketName1, bucketName2, bucketAlias)
+            raise Exception('binding bucket alias failed, resp: %s' % bindResp)
+
+        # step 5: configure replication in both directions and enable historical object replication
+        replicationResp = self._set_virtual_replication(agencyName, bucketName1, bucketName2)
+        if replicationResp.status != 200:
+            self._clear_virtual_bucket(const.VIRTUAL_BUCKET_BINDALIAS_STAGED, bucketName1, bucketName2, bucketAlias)
+            raise Exception('set replication failed, resp: %s' % replicationResp)
+
+        replicationResp = self._set_virtual_replication(agencyName, bucketName2, bucketName1)
+        if replicationResp.status != 200:
+            self._clear_virtual_bucket(const.VIRTUAL_BUCKET_BINDALIAS_STAGED, bucketName1, bucketName2, bucketAlias)
+            raise Exception('set replication failed, resp: %s' % replicationResp)
+
+        return {'code': 'OK', 'message': 'create virtual bucket success', 'virtualBucketName': bucketAlias,
+                'bucketName1': bucketName1, 'bucketName2': bucketName2}
+
+    def _get_cluster_group_id(self, azResp):
+        azJsonResp = util.jsonLoadsForPy2(azResp.body) if const.IS_PYTHON2 else json.loads(azResp.body)
+        azInfos = azJsonResp.get('infos')
+
+        if len(azInfos) != const.VIRTUAL_BUCKET_NEED_AZ_COUNT:
+            raise Exception('the number of AZs does not meet the requirements')
+
+        firstAZKey = next(iter(azInfos))
+        firstAZValue = azInfos.get(firstAZKey)
+        if len(firstAZValue) == 0:
+            raise Exception('no cluster exists in the AZ, AZ key: %s' % firstAZKey)
+
+        firstAZCgId = firstAZValue[0].get(const.KEY_CLUSTER_GROUP_ID)
+        if not firstAZCgId:
+            raise Exception("this AZ's first cluster group id is None, AZ key: %s" % firstAZKey)
+
+        secondAZKey = list(azInfos.keys())[-1]
+        secondAZValue = azInfos.get(secondAZKey)
+        if len(secondAZValue) == 0:
+            raise Exception('no cluster exists in the AZ, AZ key: %s' % secondAZKey)
+
+        secondAZCgId = secondAZValue[0].get(const.KEY_CLUSTER_GROUP_ID)
+        if not secondAZCgId:
+            raise Exception("this AZ's first cluster group id is None, AZ key: %s" % secondAZKey)
+
+        return firstAZCgId, secondAZCgId
+
+    def _create_bucket_with_cluster_id(self, firstAZCgId, secondAZCgId, bucketName1, bucketName2, bucketAlias, header):
+        # check whether the buckets already exist
+        bucket1CgId, bucket2CgId = self._head_virtual_bucket(bucketName1, bucketName2)
+
+        # both buckets exist and share the same cluster group id
+        if bucket1CgId and bucket2CgId and bucket1CgId == bucket2CgId:
+            raise Exception('create bucket failed, both buckets exist and cluster group id is the same')
+
+        extensionHeaders = ExtensionHeader()
+
+        # only bucket 1 exists
+        if bucket1CgId:
+            extensionHeaders.locationClusterGroupId = secondAZCgId if bucket1CgId == firstAZCgId else firstAZCgId
+            cbResp = self.createBucket(bucketName2, header=header, extensionHeaders=extensionHeaders)
+            if cbResp.status != 200:
+                self._clear_virtual_bucket(const.VIRTUAL_BUCKET_CREATEBUCKET_STAGED, bucketName1, bucketName2,
+                                           bucketAlias)
+                raise Exception('create bucket failed, cluster group id: %s, resp: %s' % (
+                    extensionHeaders.locationClusterGroupId, cbResp))
+
+        # only bucket 2 exists
+        if bucket2CgId:
+            extensionHeaders.locationClusterGroupId = secondAZCgId if bucket2CgId == firstAZCgId else firstAZCgId
+            cbResp = self.createBucket(bucketName1, header=header, extensionHeaders=extensionHeaders)
+            if cbResp.status != 200:
+                self._clear_virtual_bucket(const.VIRTUAL_BUCKET_CREATEBUCKET_STAGED, bucketName1, bucketName2,
+                                           bucketAlias)
+                raise Exception('create bucket failed, cluster group id: %s, resp: %s' % (
+                    extensionHeaders.locationClusterGroupId, cbResp))
+
+        # neither bucket exists
+        if not bucket1CgId and not bucket2CgId:
+            extensionHeaders.locationClusterGroupId = firstAZCgId
+            cbResp = self.createBucket(bucketName1, header=header, extensionHeaders=extensionHeaders)
+            if cbResp.status != 200:
+                self._clear_virtual_bucket(const.VIRTUAL_BUCKET_CREATEBUCKET_STAGED, bucketName1, bucketName2,
+                                           bucketAlias)
+                raise Exception('create bucket failed, cluster group id: %s, resp: %s' % (firstAZCgId, cbResp))
+
+            extensionHeaders.locationClusterGroupId = secondAZCgId
+            cbResp = self.createBucket(bucketName2, header=header, extensionHeaders=extensionHeaders)
+            if cbResp.status != 200:
+                self._clear_virtual_bucket(const.VIRTUAL_BUCKET_CREATEBUCKET_STAGED, bucketName1, bucketName2,
+                                           bucketAlias)
+                raise Exception('create bucket failed, cluster group id: %s, resp: %s' % (secondAZCgId, cbResp))
+
+    def _head_virtual_bucket(self, bucketName1, bucketName2):
+        bucket1CgId = None
+        bucket2CgId = None
+
+        head1Resp = self.headBucket(bucketName1)
+        if head1Resp.status == 200:
+            for h in head1Resp.header:
+                if const.LOCATION_CLUSTERGROUP_ID in h:
+                    bucket1CgId = h[1]
+                    break
+
+        head2Resp = self.headBucket(bucketName2)
+        if head2Resp.status == 200:
+            for h in head2Resp.header:
+                if const.LOCATION_CLUSTERGROUP_ID in h:
+                    bucket2CgId = h[1]
+                    break
+
+        return bucket1CgId, bucket2CgId
+
+    def _set_virtual_replication(self, agencyName, sourceBucketName, destBucketName):
+        replication = Replication()
+        replication.agency = agencyName
+
+        # the default rule is as follows:
+        # id: {sourceBucketName}_to_{destBucketName}
+        # prefix: empty
+        # status: Enabled
+        # storageClass: STANDARD
+        # deleteData: Enabled
+        # historicalObjectReplication: Enabled
+        _rules = []
+        replicationRule = ReplicationRule()
+        replicationRule.id = sourceBucketName + '_to_' + destBucketName
+        replicationRule.prefix = ''
+        replicationRule.status = 'Enabled'
+        replicationRule.bucket = destBucketName
+        replicationRule.storageClass = 'STANDARD'
+        replicationRule.deleteData = 'Enabled'
+        replicationRule.historicalObjectReplication = 'Enabled'
+
+        _rules.append(replicationRule)
+        replication.replicationRules = _rules
+        resp = self.setBucketReplication(sourceBucketName, replication)
+        return resp
+
+    def _clear_virtual_bucket(self, staged, bucketName1, bucketName2, bucketAlias):
+        # staged values:
+        # 1: the real buckets have been created
+        # 2: the bucket alias has been created
+        # 3: the bucket alias has been bound
+        if staged >= const.VIRTUAL_BUCKET_BINDALIAS_STAGED:
+            # unbind the bucket alias
+            unbindResp = self.unbindBucketAlias(bucketName1)
+            if unbindResp.status != 204:
+                raise Exception('unbind bucket alias failed, resp: %s' % unbindResp)
+
+            unbindResp = self.unbindBucketAlias(bucketName2)
+            if unbindResp.status != 204:
+                raise Exception('unbind bucket alias failed, resp: %s' % unbindResp)
+
+        if staged >= const.VIRTUAL_BUCKET_CREATEALIAS_STAGED:
+            # delete the bucket alias
+            deleteAliasResp = self.deleteBucketAlias(bucketAlias)
+            if deleteAliasResp.status != 204:
+                raise Exception('delete bucket alias failed, resp: %s' % deleteAliasResp)
+
+        if staged >= const.VIRTUAL_BUCKET_CREATEBUCKET_STAGED:
+            # delete the real buckets
+            deleteBucketResp = self.deleteBucket(bucketName1)
+            if deleteBucketResp.status != 204:
+                raise Exception('delete bucket failed, resp: %s' % deleteBucketResp)
+
+            deleteBucketResp = self.deleteBucket(bucketName2)
+            if deleteBucketResp.status != 204:
+                raise Exception('delete bucket failed, resp: %s' % deleteBucketResp)
+
+    # end virtual bucket related
+    # end virtual bucket related
+    # end virtual bucket related
+
     @funcCache
     def uploadFile(self, bucketName, objectKey, uploadFile, partSize=9 * 1024 * 1024,
                    taskNum=1, enableCheckpoint=False, checkpointFile=None,
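`createVirtualBucket` chains steps 1-5 and, via `_clear_virtual_bucket`, rolls back whatever stages had completed when a later step fails. A call sketch; every argument value here is a placeholder:

```python
result = client.createVirtualBucket(
    regionId='region-1',                  # placeholder region id
    token='iam-token',                    # IAM token forwarded to listAvailableZoneInfo
    bucketName1='bucket-az1',
    bucketName2='bucket-az2',
    bucketAlias='my-virtual-bucket',
    agencyName='obs-replication-agency')  # agency used for the replication rules
print(result['virtualBucketName'])        # 'my-virtual-bucket' on success
```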
diff --git a/src/obs/const.py b/src/obs/const.py
index 1e193f4..5b9943c 100644
--- a/src/obs/const.py
+++ b/src/obs/const.py
@@ -26,7 +26,6 @@
 CONTENT_ENCODING_HEADER = 'Content-Encoding'
 CONTENT_LANGUAGE_HEADER = 'Content-Language'
 EXPIRES_HEADER = 'Expires'
-
 DATE_HEADER = 'Date'
 
 CONTENT_LIST = [CONTENT_TYPE_HEADER.lower(), CONTENT_MD5_HEADER.lower(), DATE_HEADER.lower()]
@@ -54,6 +53,14 @@
 ETAG_HEADER = 'ETag'
 LAST_MODIFIED_HEADER = 'Last-Modified'
 
+LOCATION_CLUSTERGROUP_ID = 'location-clustergroup-id'
+X_AUTH_TOKEN_HEADER = 'X-Auth-Token'
+KEY_CLUSTER_GROUP_ID = 'cgId'
+VIRTUAL_BUCKET_NEED_AZ_COUNT = 2
+VIRTUAL_BUCKET_CREATEBUCKET_STAGED = 1
+VIRTUAL_BUCKET_CREATEALIAS_STAGED = 2
+VIRTUAL_BUCKET_BINDALIAS_STAGED = 3
+
 VERSION_ID_PARAM = 'versionId'
 RESPONSE_CACHE_CONTROL_PARAM = 'response-cache-control'
 RESPONSE_CONTENT_DISPOSITION_PARAM = 'response-content-disposition'
@@ -62,6 +69,8 @@
 RESPONSE_CONTENT_TYPE_PARAM = 'response-content-type'
 RESPONSE_EXPIRES_PARAM = 'response-expires'
 X_IMAGE_PROCESS_PARAM = 'x-image-process'
+OBSALIAS_PARAM = 'obsalias'
+OBSBUCKETALIAS_PARAM = 'obsbucketalias'
 
 HTTP_METHOD_PUT = 'PUT'
 HTTP_METHOD_POST = 'POST'
@@ -87,7 +96,7 @@
 DEFAULT_TASK_NUM = 8
 DEFAULT_TASK_QUEUE_SIZE = 20000
 
-OBS_SDK_VERSION = '3.21.8'
+OBS_SDK_VERSION = '3.21.12'
 
 V2_META_HEADER_PREFIX = 'x-amz-meta-'
 V2_HEADER_PREFIX = 'x-amz-'
@@ -176,7 +185,11 @@
     'x-workflow-execution-state',
     'x-workflow-execution-type',
     'x-workflow-next-marker',
-    'obsworkflowtriggerpolicy'
+    'obsworkflowtriggerpolicy',
+
+    # virtual bucket api
+    'obsbucketalias',
+    'obsalias'
 )
 
 ALLOWED_REQUEST_HTTP_HEADER_METADATA_NAMES = (
@@ -203,7 +216,8 @@
     'if-match',
     'if-none-match',
     'last-modified',
-    'content-range'
+    'content-range',
+    'x-auth-token'
 )
 
 ALLOWED_RESPONSE_HTTP_HEADER_METADATA_NAMES = (
diff --git a/src/obs/convertor.py b/src/obs/convertor.py
index a8eb4aa..9cf1276 100644
--- a/src/obs/convertor.py
+++ b/src/obs/convertor.py
@@ -16,6 +16,7 @@
     import xml.etree.cElementTree as ET
 except Exception:
     import xml.etree.ElementTree as ET
+import json
 
 from obs import util
 from obs import const
@@ -36,7 +37,7 @@
 from obs.model import DateTime, ListObjectsResponse, Content, CorsRule, ObjectVersionHead, ObjectVersion, \
     ObjectDeleteMarker, DeleteObjectResult, NoncurrentVersionExpiration, NoncurrentVersionTransition, Rule, Condition, \
     Redirect, FilterRule, FunctionGraphConfiguration, Upload, CompleteMultipartUploadResponse, ListPartsResponse, \
-    Grant, ReplicationRule, Transition, Grantee
+    Grant, ReplicationRule, Transition, Grantee, BucketAliasModel, ListBucketAliasModel
 
 if const.IS_PYTHON2:
     from urllib import unquote_plus, quote_plus
@@ -87,6 +88,10 @@ def acl_header(self):
     def epid_header(self):
         return self._get_header_prefix() + 'epid'
 
+    @staticmethod
+    def pfs_header():
+        return 'x-obs-fs-file-interface'
+
     def date_header(self):
         return self._get_header_prefix() + 'date'
 
@@ -116,6 +121,10 @@ def indicator_header():
     def location_header(self):
         return self._get_header_prefix() + 'location'
 
+    @staticmethod
+    def queryPFS_header():
+        return 'x-obs-bucket-type'
+
     def bucket_region_header(self):
         return self._get_header_prefix() + 'bucket-location' if self.is_obs \
             else self._get_header_prefix() + 'bucket-region'
@@ -207,6 +216,9 @@ def object_type_header():
     def request_payer_header(self):
         return self._get_header_prefix() + 'request-payer'
 
+    def location_clustergroup_id_header(self):
+        return self._get_header_prefix() + const.LOCATION_CLUSTERGROUP_ID
+
     def oef_marker_header(self):
         return self._get_header_prefix() + 'oef-marker'
 
@@ -305,6 +317,8 @@ def trans_create_bucket(self, **kwargs):
                                 self.ha.adapt_storage_class(header.get('storageClass')))
         self._put_key_value(headers, self.ha.az_redundancy_header(), header.get('availableZone'))
         self._put_key_value(headers, self.ha.epid_header(), header.get('epid'))
+        if header.get('isPFS'):
+            self._put_key_value(headers, self.ha.pfs_header(), "Enabled")
         extensionGrants = header.get('extensionGrants')
         if extensionGrants is not None and len(extensionGrants) > 0:
             grantDict = {}
@@ -331,6 +345,8 @@ def trans_list_buckets(self, **kwargs):
         headers = {}
         if kwargs.get('isQueryLocation'):
             self._put_key_value(headers, self.ha.location_header(), 'true')
+        if kwargs.get('bucketType'):
+            self._put_key_value(headers, self.ha.queryPFS_header(), kwargs.get('bucketType'))
         return {'headers': headers}
 
     def trans_list_objects(self, **kwargs):
@@ -1049,16 +1065,23 @@ def trans_replication(self, replication):
                 ET.SubElement(ruleEle, 'Prefix').text = util.safe_decode(replicationRule['prefix'])
             if replicationRule.get('status') is not None:
                 ET.SubElement(ruleEle, 'Status').text = util.to_string(replicationRule['status'])
+            if replicationRule.get('historicalObjectReplication') is not None:
+                ET.SubElement(ruleEle, 'HistoricalObjectReplication').text = util.to_string(
+                    replicationRule['historicalObjectReplication'])
 
-            if replication.get('bucket') is not None:
+            if replicationRule.get('bucket') is not None:
                 destinationEle = ET.SubElement(ruleEle, 'Destination')
                 bucket_name = util.to_string(replicationRule['bucket'])
                 bucket_name = bucket_name if self.is_obs else bucket_name if bucket_name.startswith(
                     'arn:aws:s3:::') else 'arn:aws:s3:::' + bucket_name
                 ET.SubElement(destinationEle, 'Bucket').text = bucket_name
+
                 if replicationRule.get('storageClass') is not None:
-                    ET.SubElement(destinationEle, 'Bucket').text = self.ha.adapt_storage_class(
+                    ET.SubElement(destinationEle, 'StorageClass').text = self.ha.adapt_storage_class(
                         replicationRule['storageClass'])
+
+                if replicationRule.get('deleteData') is not None:
+                    ET.SubElement(destinationEle, 'DeleteData').text = util.to_string(replicationRule['deleteData'])
         return ET.tostring(root, 'UTF-8')
 
     @staticmethod
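The new rule fields serialize to `HistoricalObjectReplication` (under `Rule`) and `DeleteData` (under `Destination`). A sketch mirroring the defaults `_set_virtual_replication` uses, with placeholder names:

```python
from obs.model import Replication, ReplicationRule

# Mirrors the defaults used by _set_virtual_replication above.
rule = ReplicationRule(id='bucket-az1_to_bucket-az2', prefix='', status='Enabled',
                       bucket='bucket-az2', storageClass='STANDARD',
                       deleteData='Enabled', historicalObjectReplication='Enabled')
replication = Replication(agency='obs-replication-agency', replicationRules=[rule])
resp = client.setBucketReplication('bucket-az1', replication)
print(resp.status)  # 200 on success
```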
@@ -1070,7 +1093,9 @@ def trans_bucket_request_payment(payer):
 
     def trans_get_extension_headers(self, headers):
         _headers = {}
         if headers is not None and len(headers) > 0:
-            self._put_key_value(_headers, self.ha.request_payer_header(), (headers.get('requesterPayer')))
+            self._put_key_value(_headers, self.ha.request_payer_header(), headers.get('requesterPayer'))
+            self._put_key_value(_headers, self.ha.location_clustergroup_id_header(),
+                                headers.get('locationClusterGroupId'))
         return _headers
 
     # OEF trans func
@@ -1113,6 +1138,16 @@ def _find_item(root, item_name, encoding_type=None):
             return util.to_string(unquote_plus(result))
         return util.to_string(result)
 
+    @staticmethod
+    def _find_text(result, encoding_type=None):
+        if result is None:
+            return None
+        if const.IS_PYTHON2:
+            result = util.safe_encode(result)
+        if encoding_type == "url":
+            return util.to_string(unquote_plus(result))
+        return util.to_string(result)
+
     def parseListBuckets(self, xml, headers=None):
         root = ET.fromstring(xml)
         owner = root.find('Owner')
@@ -1129,8 +1164,9 @@ def parseListBuckets(self, xml, headers=None):
             name = self._find_item(bucket, 'Name')
             d = self._find_item(bucket, 'CreationDate')
             location = self._find_item(bucket, 'Location')
+            bucket_type = self._find_item(bucket, 'BucketType')
             create_date = DateTime.UTCToLocal(d)
-            curr_bucket = Bucket(name=name, create_date=create_date, location=location)
+            curr_bucket = Bucket(name=name, create_date=create_date, location=location, bucket_type=bucket_type)
             entries.append(curr_bucket)
         return ListBucketsResponse(buckets=entries, owner=Owners)
 
@@ -1873,8 +1909,11 @@ def parseGetBucketReplication(self, xml, headers=None):
             status = self._find_item(rule, 'Status')
             bucket = self._find_item(rule, 'Destination/Bucket')
             storageClass = self._find_item(rule, 'Destination/StorageClass')
+            deleteData = self._find_item(rule, 'Destination/DeleteData')
+            historicalObjectReplication = self._find_item(rule, 'HistoricalObjectReplication')
             _rules.append(
-                ReplicationRule(id=_id, prefix=prefix, status=status, bucket=bucket, storageClass=storageClass))
+                ReplicationRule(id=_id, prefix=prefix, status=status, bucket=bucket, storageClass=storageClass,
+                                deleteData=deleteData, historicalObjectReplication=historicalObjectReplication))
         replication = Replication(agency=agency, replicationRules=_rules)
         return replication
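(Note: `HistoricalObjectReplication` is read from `Rule` rather than `Destination` above, matching where `trans_replication` writes it.) On the read side the new fields now round-trip; a small sketch, assuming model attributes are accessed directly as elsewhere in the SDK:

```python
resp = client.getBucketReplication('bucket-az1')
if resp.status == 200:
    for rule in resp.body.replicationRules:
        print(rule.id, rule.deleteData, rule.historicalObjectReplication)
```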
@@ -2066,3 +2105,79 @@ def parseGetTriggerPolicyResponse(jsons, header=None):
 
     # end workflow related
     # end workflow related
     # end workflow related
+
+    # begin virtual bucket related
+    # begin virtual bucket related
+    # begin virtual bucket related
+
+    def trans_set_bucket_alias(self, **kwargs):
+        aliasInfo = kwargs.get('aliasInfo')
+        entity = None if aliasInfo is None or len(aliasInfo) == 0 else self.trans_set_aliasInfo(aliasInfo)
+        return {'pathArgs': {const.OBSBUCKETALIAS_PARAM: None}, 'entity': entity}
+
+    def trans_set_aliasInfo(self, aliasInfo):
+        root = ET.Element('CreateBucketAlias')
+        bucketListEle = ET.SubElement(root, 'BucketList')
+        ET.SubElement(bucketListEle, 'Bucket').text = util.to_string(aliasInfo.get('bucket1'))
+        ET.SubElement(bucketListEle, 'Bucket').text = util.to_string(aliasInfo.get('bucket2'))
+        return ET.tostring(root, 'UTF-8')
+
+    def trans_bind_bucket_alias(self, **kwargs):
+        aliasInfo = kwargs.get('aliasInfo')
+        entity = None if aliasInfo is None or len(aliasInfo) == 0 else self.trans_bind_aliasInfo(aliasInfo)
+        return {'pathArgs': {const.OBSALIAS_PARAM: None}, 'entity': entity}
+
+    def trans_bind_aliasInfo(self, aliasInfo):
+        root = ET.Element('AliasList')
+        ET.SubElement(root, 'Alias').text = util.to_string(aliasInfo.get('alias'))
+        return ET.tostring(root, 'UTF-8')
+
+    def parseGetBucketAlias(self, xml, header=None):
+        root = ET.fromstring(xml)
+        bucketAliasXml = root.find('BucketAlias')
+        alias = self._find_item(bucketAliasXml, 'Alias')
+        bucketAlias = BucketAliasModel(alias=alias)
+
+        bucketListXml = bucketAliasXml.find('BucketList').findall('Bucket')
+        bucketNameList = []
+        for bucketXml in bucketListXml:
+            bucketNameList.append(self._find_text(bucketXml.text))
+
+        if len(bucketNameList) > 0:
+            bucketAlias.bucket1 = bucketNameList[0]
+        if len(bucketNameList) > 1:
+            bucketAlias.bucket2 = bucketNameList[1]
+
+        return bucketAlias
+
+    def parseListBucketAlias(self, xml, header=None):
+        root = ET.fromstring(xml)
+        ownerXml = root.find('Owner')
+        ownerID = self._find_item(ownerXml, 'ID')
+        listBucketAlias = ListBucketAliasModel(owner=ownerID)
+
+        bucketAliasListXml = root.find('BucketAliasList').findall('BucketAlias')
+        bucketAliasList = []
+        for bucketAliasXml in bucketAliasListXml:
+            alias = self._find_item(bucketAliasXml, 'Alias')
+            creationDate = self._find_item(bucketAliasXml, 'CreationDate')
+            bucketAlias = BucketAliasModel(alias=alias, creationDate=creationDate)
+
+            bucketListXml = bucketAliasXml.find('BucketList').findall('Bucket')
+            bucketNameList = []
+            for bucketXml in bucketListXml:
+                bucketNameList.append(self._find_text(bucketXml.text))
+
+            if len(bucketNameList) > 0:
+                bucketAlias.bucket1 = bucketNameList[0]
+            if len(bucketNameList) > 1:
+                bucketAlias.bucket2 = bucketNameList[1]
+
+            bucketAliasList.append(bucketAlias)
+
+        listBucketAlias.bucketAlias = bucketAliasList
+        return listBucketAlias
+
+    # end virtual bucket related
+    # end virtual bucket related
+    # end virtual bucket related
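For reference, a stand-alone reproduction of the request body `trans_set_aliasInfo` builds (the bind body is the analogous one-element `AliasList`/`Alias` document); bucket names are placeholders:

```python
import xml.etree.ElementTree as ET

root = ET.Element('CreateBucketAlias')
bucket_list = ET.SubElement(root, 'BucketList')
ET.SubElement(bucket_list, 'Bucket').text = 'bucket-az1'
ET.SubElement(bucket_list, 'Bucket').text = 'bucket-az2'
print(ET.tostring(root, 'UTF-8'))
# b"<?xml version='1.0' encoding='UTF-8'?><CreateBucketAlias><BucketList>
#   <Bucket>bucket-az1</Bucket><Bucket>bucket-az2</Bucket></BucketList></CreateBucketAlias>"
```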
diff --git a/src/obs/model.py b/src/obs/model.py
index 687e23f..89f42e4 100644
--- a/src/obs/model.py
+++ b/src/obs/model.py
@@ -132,7 +132,9 @@
     'ListWorkflowExecutionResponse',
     'GetWorkflowExecutionResponse',
     'RestoreFailedWorkflowExecutionResponse',
-    'GetTriggerPolicyResponse'
+    'GetTriggerPolicyResponse',
+    'BucketAliasModel',
+    'ListBucketAliasModel'
 ]
 
 
@@ -291,12 +293,13 @@ def add_grant(self, grant):
 
 class Bucket(BaseModel):
-    allowedAttr = {'name': BASESTRING, 'create_date': BASESTRING, 'location': BASESTRING}
+    allowedAttr = {'name': BASESTRING, 'create_date': BASESTRING, 'location': BASESTRING, "bucket_type": BASESTRING}
 
-    def __init__(self, name=None, create_date=None, location=None):
+    def __init__(self, name=None, create_date=None, location=None, bucket_type=None):
         self.name = name
         self.create_date = create_date
         self.location = location
+        self.bucket_type = bucket_type
 
 
 class CommonPrefix(BaseModel):
@@ -476,14 +479,28 @@ def __init__(self, id=None, allowedMethod=None, allowedOrigin=None, allowedHeade
 
 class CreateBucketHeader(BaseModel):
     allowedAttr = {'aclControl': BASESTRING, 'storageClass': BASESTRING, 'extensionGrants': list,
-                   'availableZone': BASESTRING, 'epid': BASESTRING}
-
-    def __init__(self, aclControl=None, storageClass=None, extensionGrants=None, availableZone=None, epid=None):
+                   'availableZone': BASESTRING, 'epid': BASESTRING, "isPFS": bool}
+
+    def __init__(self, aclControl=None, storageClass=None, extensionGrants=None,
+                 availableZone=None, epid=None, isPFS=False):
+        """
+        Headers that can be carried during bucket creation.
+        :param aclControl: ACL policy of a bucket.
+        :param storageClass: Specifies the default storage class of a bucket. The value can be "STANDARD" (Standard),
+            "WARM" (Infrequent Access), or "COLD" (Archive). The Standard storage class is used by default.
+        :param extensionGrants: Extended permission list that can be specified during bucket creation.
+        :param availableZone: AZ type that can be specified during bucket creation. A single AZ is used by default.
+            You can also use "3az".
+        :param epid: Enterprise project ID. Users who have enabled the enterprise project function can obtain this ID
+            from the enterprise project management service.
+        :param isPFS: Specifies whether to create a parallel file system.
+        """
         self.aclControl = aclControl
         self.storageClass = storageClass
         self.extensionGrants = extensionGrants
         self.availableZone = availableZone
         self.epid = epid
+        self.isPFS = isPFS
 
 
 class ExtensionGrant(BaseModel):
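With the model change above, creating a parallel file system is one flag away; `isPFS=True` makes the convertor send `x-obs-fs-file-interface: Enabled`. Sketch (bucket name and location are placeholders; `test_create_PFS_bucket` below does the same end to end):

```python
from obs import CreateBucketHeader

header = CreateBucketHeader(isPFS=True)
resp = client.createBucket('my-pfs-bucket', header=header, location='region-1')
print(resp.status)  # 200 on success
```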
@@ -558,14 +575,17 @@ def __init__(self, replicationRules=None, agency=None):
 
 class ReplicationRule(BaseModel):
     allowedAttr = {'id': BASESTRING, 'prefix': BASESTRING, 'status': BASESTRING, 'bucket': BASESTRING,
-                   'storageClass': BASESTRING}
+                   'storageClass': BASESTRING, 'deleteData': BASESTRING, 'historicalObjectReplication': BASESTRING}
 
-    def __init__(self, id=None, prefix=None, status=None, bucket=None, storageClass=None):
+    def __init__(self, id=None, prefix=None, status=None, bucket=None, storageClass=None, deleteData=None,
+                 historicalObjectReplication=None):
         self.id = id
         self.prefix = prefix
         self.status = status
         self.bucket = bucket
         self.storageClass = storageClass
+        self.deleteData = deleteData
+        self.historicalObjectReplication = historicalObjectReplication
 
 
 class Notification(BaseModel):
@@ -1393,10 +1413,11 @@ def __init__(self, response=None, buffer=None, size=None, url=None, deleteMarker
 
 class ExtensionHeader(BaseModel):
-    allowedAttr = {'requesterPayer': BASESTRING}
+    allowedAttr = {'requesterPayer': BASESTRING, 'locationClusterGroupId': BASESTRING}
 
-    def __init__(self, requesterPayer=None):
+    def __init__(self, requesterPayer=None, locationClusterGroupId=None):
         self.requesterPayer = requesterPayer
+        self.locationClusterGroupId = locationClusterGroupId
 
 
 # OEF Model
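`locationClusterGroupId` is what `_create_bucket_with_cluster_id` sets internally; it can also be passed explicitly to pin a bucket to a cluster group. Sketch (the id value is a placeholder obtained from `listAvailableZoneInfo`):

```python
from obs import ExtensionHeader

ext = ExtensionHeader(locationClusterGroupId='example-cg-id')
resp = client.createBucket('bucket-az1', extensionHeaders=ext)
```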
@@ -1594,6 +1615,32 @@ class GetTriggerPolicyResponse(BaseModel):
     def __init__(self, rules=None):
         self.rules = rules
 
+
 # end workflow related
 # end workflow related
 # end workflow related
+
+
+# begin virtual bucket related
+# begin virtual bucket related
+# begin virtual bucket related
+
+class BucketAliasModel(BaseModel):
+    allowedAttr = {'alias': BASESTRING, 'bucket1': BASESTRING, 'bucket2': BASESTRING, 'creationDate': BASESTRING}
+
+    def __init__(self, alias=None, bucket1=None, bucket2=None, creationDate=None):
+        self.alias = alias
+        self.bucket1 = bucket1
+        self.bucket2 = bucket2
+        self.creationDate = creationDate
+
+
+class ListBucketAliasModel(BaseModel):
+    allowedAttr = {'owner': BASESTRING, 'bucketAlias': list}
+
+    def __init__(self, owner=None, bucketAlias=None):
+        self.owner = owner
+        self.bucketAlias = bucketAlias
+
+# end virtual bucket related
+# end virtual bucket related
+# end virtual bucket related
diff --git a/src/setup.py b/src/setup.py
index 65da0d8..7866e71 100644
--- a/src/setup.py
+++ b/src/setup.py
@@ -21,13 +21,16 @@
 setup(
     name='esdk-obs-python',
-    version='3.21.4',
-    packages=find_packages(),
+    version='3.21.12',
+    packages=find_packages(exclude=['tests']),
     zip_safe=False,
     description='OBS Python SDK',
     long_description='OBS Python SDK',
     license='Apache-2.0',
     keywords=('obs', 'python'),
+    install_requires=[
+        'pycryptodome==3.10.1'
+    ],
     platforms='Independant',
     url='',
 )
diff --git a/src/tests/conftest.py b/src/tests/conftest.py
index 7977e87..affecdf 100644
--- a/src/tests/conftest.py
+++ b/src/tests/conftest.py
@@ -68,6 +68,16 @@ def gen_test_file(request):
     os.remove(test_config["path_prefix"] + file_name)
 
 
+@pytest.fixture()
+def delete_bucket_after_test():
+    results = {"need_delete_buckets": []}
+    yield results
+    if "client" in results:
+        for bucket_name in results["need_delete_buckets"]:
+            delete_result = results["client"].deleteBucket(bucket_name)
+            assert delete_result.status == 204
+
+
 def gen_random_file(file_name, file_size):
     tmp_1024 = "".join(chr(random.randint(10000, 40000)) for _ in range(341)).encode("UTF-8")
     tmp_1024 += b"m"
diff --git a/src/tests/test_config.json b/src/tests/test_config.json
index 17e8000..506d30b 100644
--- a/src/tests/test_config.json
+++ b/src/tests/test_config.json
@@ -2,6 +2,8 @@
   "ak": "",
   "sk": "",
   "endpoint": "",
+  "location": "",
+  "bucket_prefix": "",
   "bucketName": "",
   "path_prefix": "",
   "public_key": "",
diff --git a/src/tests/test_obs_client.py b/src/tests/test_obs_client.py
index 55062d9..b3b6554 100644
--- a/src/tests/test_obs_client.py
+++ b/src/tests/test_obs_client.py
@@ -8,7 +8,7 @@
 import pytest
 
 import conftest
-from obs import GetObjectHeader, ObsClient, UploadFileHeader
+from obs import CreateBucketHeader, GetObjectHeader, ObsClient, UploadFileHeader
 from conftest import test_config
 from obs.const import IS_PYTHON2
 
@@ -29,6 +29,75 @@ def get_client(self):
                                       is_signature_negotiation=False, path_style=path_style)
         return client_type, uploadClient, downloadClient
 
+    def test_create_PFS_bucket(self, delete_bucket_after_test):
+        _, uploadClient, _ = self.get_client()
+        bucket_name = test_config["bucket_prefix"] + "create-pfs-001"
+        delete_bucket_after_test["client"] = uploadClient
+        delete_bucket_after_test["need_delete_buckets"].append(bucket_name)
+        create_bucket_header = CreateBucketHeader(isPFS=True)
+        create_result = uploadClient.createBucket(bucket_name, header=create_bucket_header,
+                                                  location=test_config["location"])
+        assert create_result.status == 200
+
+        bucket_metadata = uploadClient.getBucketMetadata(bucket_name)
+        assert bucket_metadata.status == 200
+        assert ("fs-file-interface", 'Enabled') in bucket_metadata.header
+
+    def test_create_object_bucket(self, delete_bucket_after_test):
+        _, uploadClient, _ = self.get_client()
+        bucket_name = test_config["bucket_prefix"] + "create-pfs-002"
+        delete_bucket_after_test["client"] = uploadClient
+        delete_bucket_after_test["need_delete_buckets"].append(bucket_name)
+        create_bucket_header = CreateBucketHeader(isPFS=False)
+        create_result = uploadClient.createBucket(bucket_name, header=create_bucket_header,
+                                                  location=test_config["location"])
+        assert create_result.status == 200
+
+        bucket_metadata = uploadClient.getBucketMetadata(bucket_name)
+        assert bucket_metadata.status == 200
+        assert ("fs-file-interface", 'Enabled') not in bucket_metadata.header
+
+        list_bucket_result = uploadClient.listBuckets(bucketType="OBJECT")
+        assert list_bucket_result.status == 200
+        all_object_buckets = [i["name"] for i in list_bucket_result.body["buckets"]]
+        assert bucket_name in all_object_buckets
+
+    def test_list_buckets(self, delete_bucket_after_test):
+        _, uploadClient, _ = self.get_client()
+        bucket_name = test_config["bucket_prefix"] + "list-pfs-001"
+        delete_bucket_after_test["client"] = uploadClient
+        delete_bucket_after_test["need_delete_buckets"].append(bucket_name)
+        create_bucket_header = CreateBucketHeader(isPFS=True)
+        create_result = uploadClient.createBucket(bucket_name, header=create_bucket_header,
+                                                  location=test_config["location"])
+        assert create_result.status == 200
+        bucket_name2 = test_config["bucket_prefix"] + "list-pfs-002"
+        delete_bucket_after_test["need_delete_buckets"].append(bucket_name2)
+        create_result2 = uploadClient.createBucket(bucket_name2, location=test_config["location"])
+        assert create_result2.status == 200
+
+        list_bucket_result = uploadClient.listBuckets(bucketType="POSIX")
+        assert list_bucket_result.status == 200
+        all_pfs_buckets = [i["name"] for i in list_bucket_result.body["buckets"]]
+        assert bucket_name in all_pfs_buckets
+        assert bucket_name2 not in all_pfs_buckets
+
+        list_bucket_result2 = uploadClient.listBuckets()
+        all_buckets = [i["name"] for i in list_bucket_result2.body["buckets"]]
+        assert bucket_name in all_buckets
+        assert bucket_name2 in all_buckets
+
+        list_bucket_result3 = uploadClient.listBuckets(bucketType="OBJECT")
+        assert list_bucket_result3.status == 200
+        all_object_buckets = [i["name"] for i in list_bucket_result3.body["buckets"]]
+        assert bucket_name not in all_object_buckets
+        assert bucket_name2 in all_object_buckets
+
+        list_bucket_result4 = uploadClient.listBuckets(bucketType="Wrong_Value")
+        all_buckets2 = [i["name"] for i in list_bucket_result4.body["buckets"]]
+        assert bucket_name in all_buckets2
+        assert bucket_name2 in all_buckets2
+
     def test_uploadFile_and_getObject_to_file(self, gen_test_file):
         client_type, uploadClient, downloadClient = self.get_client()
         object_name = client_type + "test_uploadFile_and_getObject_to_file_" + gen_test_file
@@ -206,7 +275,10 @@ def test_uploadFile_with_storage_type(self):
                                                checkSum=True, taskNum=10)
             assert upload_result.status == 200
             object_metadata = uploadClient.getObjectMetadata(test_config["bucketName"], object_name)
-            assert dict(object_metadata.header)["storage-class"] == i
+            if i == "WARM":
+                assert dict(object_metadata.header)["storage-class"] in (i, "STANDARD_IA")
+            else:
+                assert dict(object_metadata.header)["storage-class"] in (i, "GLACIER")
             uploadClient.deleteObject(test_config["bucketName"], object_name)
             os.remove(test_config["path_prefix"] + object_name)