Skip to content

Commit

Permalink
update 3.24.6
Browse files Browse the repository at this point in the history
  • Loading branch information
noaccident committed Jul 1, 2024
1 parent c58bc2e commit 0da1595
Show file tree
Hide file tree
Showing 9 changed files with 730 additions and 105 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
Version 3.24.6

New Features:

1. Supported data integrity verification via CRC64, covering ObsClient.putFile, ObsClient.putContent, ObsClient.appendObject, ObsClient.uploadPart, and ObsClient.completeMultipartUpload.

-------------------------------------------------------------------------------------------------
Version 3.24.3

New Features:
Expand Down
9 changes: 8 additions & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
Version 3.24.3
Version 3.24.6

新特性:

1. 支持crc64校验(ObsClient.putFile/ObsClient.putContent/ObsClient.appendObject/ObsClient.uploadPart/ObsClient.completeMultipartUpload)

-------------------------------------------------------------------------------------------------
Version 3.24.3

新特性:

Expand Down
50 changes: 28 additions & 22 deletions src/obs/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1692,7 +1692,7 @@ def appendObject(self, bucketName, objectKey, content=None, metadata=None, heade
headers, readable, notifier, entity = self._prepare_file_notifier_and_entity(offset, file_size, headers,
progressCallback, file_path,
readable)
headers = self.convertor.trans_put_object(metadata=metadata, headers=headers)
headers = self.convertor.trans_put_object(metadata=metadata, headers=headers, file_path=file_path)
self.log_client.log(DEBUG, 'send Path:%s' % file_path)
else:
entity = content.get('content')
Expand All @@ -1701,7 +1701,7 @@ def appendObject(self, bucketName, objectKey, content=None, metadata=None, heade
autoClose, readable,
chunkedMode, notifier)

headers = self.convertor.trans_put_object(metadata=metadata, headers=headers)
headers = self.convertor.trans_put_object(metadata=metadata, headers=headers, content=content.get('content'))

try:
if notifier is not None:
Expand All @@ -1727,7 +1727,7 @@ def putContent(self, bucketName, objectKey, content=None, metadata=None, headers
headers = PutObjectHeader()
if headers.get('contentType') is None:
headers['contentType'] = const.MIME_TYPES.get(objectKey[objectKey.rfind('.') + 1:].lower())
_headers = self.convertor.trans_put_object(metadata=metadata, headers=headers)
_headers = self.convertor.trans_put_object(metadata=metadata, headers=headers, content=content)

readable = False
chunkedMode = False
Expand Down Expand Up @@ -1789,12 +1789,12 @@ def putFile(self, bucketName, objectKey, file_path, metadata=None, headers=None,
__file_path = os.path.join(file_path, f)
if not const.IS_PYTHON2:
if not objectKey:
key = util.safe_trans_to_gb2312('{0}/'.format(os.path.split(file_path)[1]) + f)
key = util.safe_trans_to_gb2312('{0}'.format(os.path.split(file_path)[1]) + f)
else:
key = '{0}/'.format(objectKey) + util.safe_trans_to_gb2312(f)
else:
if not objectKey:
key = util.safe_trans_to_gb2312('{0}/'.format(os.path.split(file_path)[1]) + f).decode(
key = util.safe_trans_to_gb2312('{0}'.format(os.path.split(file_path)[1]) + f).decode(
'GB2312').encode('UTF-8')
else:
key = '{0}/'.format(objectKey) + util.safe_trans_to_gb2312(f).decode('GB2312').encode('UTF-8')
Expand All @@ -1810,7 +1810,7 @@ def putFile(self, bucketName, objectKey, file_path, metadata=None, headers=None,

headers = self._putFileHandleHeader(headers, size, objectKey, file_path)

_headers = self.convertor.trans_put_object(metadata=metadata, headers=headers)
_headers = self.convertor.trans_put_object(metadata=metadata, headers=headers, file_path=file_path)
if const.CONTENT_LENGTH_HEADER not in _headers:
_headers[const.CONTENT_LENGTH_HEADER] = util.to_string(size)
self.log_client.log(DEBUG, 'send Path:%s' % file_path)
Expand Down Expand Up @@ -1857,13 +1857,16 @@ def _get_part_size(partSize, file_size, offset):
partSize = partSize if partSize is not None and 0 < partSize <= (file_size - offset) else file_size - offset
return partSize

def _prepare_headers(self, md5, isAttachMd5, file_path, partSize, offset, sseHeader, headers):
def _prepare_headers(self, md5, isAttachMd5, crc64, isAttachCrc64, file_path, partSize, offset, sseHeader, headers):
if md5:
headers[const.CONTENT_MD5_HEADER] = md5
elif isAttachMd5:
headers[const.CONTENT_MD5_HEADER] = util.base64_encode(
util.md5_file_encode_by_size_offset(file_path, partSize, offset, self.chunk_size))

if crc64:
self.convertor._put_key_value(headers, self.ha.crc64_header(), crc64)
elif isAttachCrc64:
self.convertor._put_key_value(headers, self.ha.crc64_header(), util.calculate_file_crc64(file_path, offset=offset, totalCount=partSize))
if sseHeader is not None:
self.convertor._set_sse_header(sseHeader, headers, True)

Expand All @@ -1879,12 +1882,15 @@ def _prepare_upload_part_notifier(partSize, progressCallback, readable):

return readable, notifier

def _get_headers(self, md5, sseHeader, headers):
def _get_headers(self, md5, crc64, isAttachCrc64, content, sseHeader, headers):
    """Populate request headers for an in-memory / streamed upload body.

    Sets Content-MD5 when an explicit md5 is given, applies server-side
    encryption headers when sseHeader is present, and attaches a CRC64
    checksum header: an explicit crc64 value takes precedence; otherwise,
    when isAttachCrc64 is set, the checksum is computed from content.
    Mutates and returns the same headers dict.
    """
    if md5:
        headers[const.CONTENT_MD5_HEADER] = md5
    if sseHeader is not None:
        # SSE headers are delegated to the convertor; True -> request side.
        self.convertor._set_sse_header(sseHeader, headers, True)

    if crc64:
        headers[self.ha.crc64_header()] = crc64
    elif isAttachCrc64:
        # Compute the checksum over the raw bytes of the payload.
        raw = util.covert_string_to_bytes(content)
        headers[self.ha.crc64_header()] = util.calculate_content_crc64(raw)
    return headers

@staticmethod
Expand All @@ -1911,7 +1917,7 @@ def _check_file_part_info(self, file_path, offset, partSize):
@funcCache
def uploadPart(self, bucketName, objectKey, partNumber, uploadId, object=None, isFile=False, partSize=None,
offset=0, sseHeader=None, isAttachMd5=False, md5=None, content=None, progressCallback=None,
autoClose=True, extensionHeaders=None):
autoClose=True, isAttachCrc64=False, crc64=None, extensionHeaders=None):
self._assert_not_null(partNumber, 'partNumber is empty')
self._assert_not_null(uploadId, 'uploadId is empty')

Expand All @@ -1926,7 +1932,7 @@ def uploadPart(self, bucketName, objectKey, partNumber, uploadId, object=None, i
checked_file_part_info = self._check_file_part_info(content, offset, partSize)

headers = {const.CONTENT_LENGTH_HEADER: util.to_string(checked_file_part_info["partSize"])}
headers = self._prepare_headers(md5, isAttachMd5, checked_file_part_info["file_path"],
headers = self._prepare_headers(md5, isAttachMd5, crc64, isAttachCrc64, checked_file_part_info["file_path"],
checked_file_part_info["partSize"], checked_file_part_info["offset"],
sseHeader, headers)

Expand All @@ -1938,7 +1944,7 @@ def uploadPart(self, bucketName, objectKey, partNumber, uploadId, object=None, i
headers = {}
if content is not None and hasattr(content, 'read') and callable(content.read):
readable = True
headers = self._get_headers(md5, sseHeader, headers)
headers = self._get_headers(md5, crc64, isAttachCrc64, content, sseHeader, headers)

if partSize is None:
self.log_client.log(DEBUG, 'missing partSize when uploading a readable stream')
Expand All @@ -1956,7 +1962,7 @@ def uploadPart(self, bucketName, objectKey, partNumber, uploadId, object=None, i
entity = content
if entity is None:
entity = ''
headers = self._get_headers(md5, sseHeader, headers)
headers = self._get_headers(md5, crc64, isAttachCrc64, content, sseHeader, headers)

try:
if notifier is not None:
Expand All @@ -1982,7 +1988,7 @@ def check_file_path(file_path):
@funcCache
def _uploadPartWithNotifier(self, bucketName, objectKey, partNumber, uploadId, content=None, isFile=False,
partSize=None, offset=0, sseHeader=None, isAttachMd5=False, md5=None, notifier=None,
extensionHeaders=None, headers=None):
extensionHeaders=None, headers=None, isAttachCrc64=False, crc64=None):
self._assert_not_null(partNumber, 'partNumber is empty')
self._assert_not_null(uploadId, 'uploadId is empty')

Expand All @@ -1994,7 +2000,7 @@ def _uploadPartWithNotifier(self, bucketName, objectKey, partNumber, uploadId, c
checked_file_part_info = self._check_file_part_info(content, offset, partSize)

headers[const.CONTENT_LENGTH_HEADER] = util.to_string(checked_file_part_info["partSize"])
headers = self._prepare_headers(md5, isAttachMd5, checked_file_part_info["file_path"],
headers = self._prepare_headers(md5, isAttachMd5, crc64, isAttachCrc64, checked_file_part_info["file_path"],
checked_file_part_info["partSize"], checked_file_part_info["offset"],
sseHeader, headers)

Expand All @@ -2005,7 +2011,7 @@ def _uploadPartWithNotifier(self, bucketName, objectKey, partNumber, uploadId, c
else:
if content is not None and hasattr(content, 'read') and callable(content.read):
readable = True
headers = self._get_headers(md5, sseHeader, headers)
headers = self._get_headers(md5, crc64, isAttachCrc64, content, sseHeader, headers)

if partSize is None:
chunkedMode = True
Expand All @@ -2018,7 +2024,7 @@ def _uploadPartWithNotifier(self, bucketName, objectKey, partNumber, uploadId, c
entity = content
if entity is None:
entity = ''
headers = self._get_headers(md5, sseHeader, headers)
headers = self._get_headers(md5, crc64, isAttachCrc64, content, sseHeader, headers)

ret = self._make_put_request(bucketName, objectKey, pathArgs={'partNumber': partNumber, 'uploadId': uploadId},
headers=headers, entity=entity, chunkedMode=chunkedMode, methodName='uploadPart',
Expand Down Expand Up @@ -2132,16 +2138,16 @@ def copyPart(self, bucketName, objectKey, partNumber, uploadId, copySource, copy

@funcCache
def completeMultipartUpload(self, bucketName, objectKey, uploadId, completeMultipartUploadRequest,
extensionHeaders=None, encoding_type=None):
isAttachCrc64=False, extensionHeaders=None, encoding_type=None):
self._assert_not_null(uploadId, 'uploadId is empty')
self._assert_not_null(completeMultipartUploadRequest, 'completeMultipartUploadRequest is empty')
pathArgs = {'uploadId': uploadId}
if encoding_type is not None:
pathArgs["encoding-type"] = encoding_type
entity, headers = self.convertor.trans_complete_multipart_upload_request(completeMultipartUploadRequest, isAttachCrc64)
ret = self._make_post_request(bucketName, objectKey,
pathArgs=pathArgs,
entity=self.convertor.trans_complete_multipart_upload_request(
completeMultipartUploadRequest), methodName='completeMultipartUpload',
pathArgs=pathArgs, headers=headers,
entity=entity, methodName='completeMultipartUpload',
extensionHeaders=extensionHeaders)
self._generate_object_url(ret, bucketName, objectKey)
return ret
Expand Down
3 changes: 2 additions & 1 deletion src/obs/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
DEFAULT_TASK_NUM = 8
DEFAULT_TASK_QUEUE_SIZE = 20000

OBS_SDK_VERSION = '3.24.3'
OBS_SDK_VERSION = '3.24.6'

V2_META_HEADER_PREFIX = 'x-amz-meta-'
V2_HEADER_PREFIX = 'x-amz-'
Expand Down Expand Up @@ -223,6 +223,7 @@
ALLOWED_RESPONSE_HTTP_HEADER_METADATA_NAMES = (
'content-type',
'content-md5',
'checksum-crc64ecma',
'content-length',
'content-language',
'expires',
Expand Down
34 changes: 29 additions & 5 deletions src/obs/convertor.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ def security_token_header(self):
def content_sha256_header(self):
return self._get_header_prefix() + 'content-sha256'

def crc64_header(self):
    """Return the request/response header name carrying the CRC64-ECMA checksum."""
    prefix = self._get_header_prefix()
    return '{0}{1}'.format(prefix, 'checksum-crc64ecma')

def default_storage_class_header(self):
return self._get_header_prefix() + 'storage-class' if self.is_obs else 'x-default-storage-class'

Expand Down Expand Up @@ -680,16 +683,21 @@ def _set_configuration(config_type, urn_type):

return ET.tostring(root, 'UTF-8')

@staticmethod
def trans_complete_multipart_upload_request(completeMultipartUploadRequest):
def trans_complete_multipart_upload_request(self, completeMultipartUploadRequest, isAttachCrc64):
    """Serialize a CompleteMultipartUpload request into XML plus extra headers.

    Parts are sorted by part number before serialization. When
    isAttachCrc64 is true, the whole-object CRC64 is derived from the
    per-part checksums and placed in the returned headers dict.

    Returns a (xml_bytes, headers) tuple.
    """
    headers = {}
    root = ET.Element('CompleteMultipartUpload')

    if completeMultipartUploadRequest.get('parts') is None:
        parts = []
    else:
        parts = sorted(completeMultipartUploadRequest['parts'], key=lambda p: p.partNum)

    if isAttachCrc64:
        # Combine the per-part CRC64 values into a single object-level checksum.
        object_crc = util.calc_obj_crc_from_parts(parts)
        self._put_key_value(headers, self.ha.crc64_header(), object_crc)

    for part in parts:
        part_node = ET.SubElement(root, 'Part')
        ET.SubElement(part_node, 'PartNumber').text = util.to_string(part.get('partNum'))
        ET.SubElement(part_node, 'ETag').text = util.to_string(part.get('etag'))
    return (ET.tostring(root, 'UTF-8'), headers)

def trans_restore_object(self, **kwargs):
pathArgs = {'restore': None}
Expand Down Expand Up @@ -801,6 +809,8 @@ def trans_restore(self, days, tier):

def trans_put_object(self, **kwargs):
_headers = {}
file_path = kwargs.get('file_path')
content = kwargs.get('content')
metadata = kwargs.get('metadata')
headers = kwargs.get('headers')
if metadata is not None:
Expand All @@ -819,7 +829,14 @@ def trans_put_object(self, **kwargs):
self.ha.adapt_storage_class(headers.get('storageClass')))
self._put_key_value(_headers, const.CONTENT_LENGTH_HEADER, headers.get('contentLength'))
self._put_key_value(_headers, self.ha.expires_header(), headers.get('expires'))

if headers.get('crc64') is not None:
self._put_key_value(_headers, self.ha.crc64_header(), headers.get('crc64'))
elif headers.get('isAttachCrc64'):
if file_path:
crc64 = util.calculate_file_crc64(file_path)
else:
crc64 = util.calculate_content_crc64(util.covert_string_to_bytes(content))
self._put_key_value(_headers, self.ha.crc64_header(), crc64)
if self.is_obs:
self._put_key_value(_headers, self.ha.success_action_redirect_header(),
headers.get('successActionRedirect'))
Expand Down Expand Up @@ -917,7 +934,7 @@ def trans_copy_object(self, **kwargs):
self._put_key_value(_headers, self.ha.acl_header(), self.ha.adapt_acl_control(headers.get('acl')))
self._put_key_value(_headers, self.ha.storage_class_header(),
self.ha.adapt_storage_class(headers.get('storageClass')))

self._put_key_value(_headers, self.ha.crc64_header(), headers.get('crc64'))
self._put_key_value(_headers, self.ha.metadata_directive_header(), headers.get('directive'))
self._put_key_value(_headers, self.ha.copy_source_if_match_header(), headers.get('if_match'))
self._put_key_value(_headers, self.ha.copy_source_if_none_match_header(), headers.get('if_none_match'))
Expand Down Expand Up @@ -1711,6 +1728,7 @@ def parseCompleteMultipartUpload(self, xml, headers=None):
completeMultipartUploadResponse.sseKms = headers.get(self.ha.sse_kms_header())
completeMultipartUploadResponse.sseKmsKey = headers.get(self.ha.sse_kms_key_header())
completeMultipartUploadResponse.sseC = headers.get(self.ha.sse_c_header())
completeMultipartUploadResponse.crc64 = headers.get(self.ha.crc64_header())
completeMultipartUploadResponse.sseCKeyMd5 = headers.get(self.ha.sse_c_key_md5_header().lower())

return completeMultipartUploadResponse
Expand Down Expand Up @@ -1841,6 +1859,7 @@ def parsePutContent(self, headers):
option.sseC = headers.get(self.ha.sse_c_header())
option.sseCKeyMd5 = headers.get(self.ha.sse_c_key_md5_header().lower())
option.etag = headers.get(const.ETAG_HEADER.lower())
option.crc64 = headers.get(self.ha.crc64_header())
return option

def parseAppendObject(self, headers):
Expand All @@ -1852,6 +1871,7 @@ def parseAppendObject(self, headers):
option.sseCKeyMd5 = headers.get(self.ha.sse_c_key_md5_header().lower())
option.etag = headers.get(const.ETAG_HEADER.lower())
option.nextPosition = util.to_long(headers.get(self.ha.next_position_header()))
option.crc64 = headers.get(self.ha.crc64_header())
return option

def parseInitiateMultipartUpload(self, xml, headers=None):
Expand Down Expand Up @@ -1902,6 +1922,8 @@ def _parseGetObjectCommonHeader(self, headers, option):
option.contentLength = util.to_long(headers.get(const.CONTENT_LENGTH_HEADER.lower()))
option.contentType = headers.get(const.CONTENT_TYPE_HEADER.lower())
option.lastModified = headers.get(const.LAST_MODIFIED_HEADER.lower())
option.crc64 = headers.get(self.ha.crc64_header())


def parseGetObjectMetadata(self, headers):
option = GetObjectMetadataResponse()
Expand Down Expand Up @@ -1940,6 +1962,7 @@ def parseUploadPart(self, headers):
uploadPartResponse.sseKmsKey = headers.get(self.ha.sse_kms_key_header())
uploadPartResponse.sseC = headers.get(self.ha.sse_c_header())
uploadPartResponse.sseCKeyMd5 = headers.get(self.ha.sse_c_key_md5_header().lower())
uploadPartResponse.crc64 = headers.get(self.ha.crc64_header())
return uploadPartResponse

def parseCopyPart(self, xml, headers=None):
Expand All @@ -1952,6 +1975,7 @@ def parseCopyPart(self, xml, headers=None):
copyPartResponse.sseKmsKey = headers.get(self.ha.sse_kms_key_header())
copyPartResponse.sseC = headers.get(self.ha.sse_c_header())
copyPartResponse.sseCKeyMd5 = headers.get(self.ha.sse_c_key_md5_header().lower())
copyPartResponse.crc64 = headers.get(self.ha.crc64_header())
return copyPartResponse

def parseGetBucketReplication(self, xml, headers=None):
Expand Down
Loading

0 comments on commit 0da1595

Please sign in to comment.