From 732a60c2467aad6b1c98c90c7983487aacd6805a Mon Sep 17 00:00:00 2001 From: Evgeniy Kirov Date: Fri, 27 Oct 2023 03:34:56 +0200 Subject: [PATCH] refactor #263, expose get_token method --- HISTORY.md | 6 +- docs/core_api.rst | 4 + pyuploadcare/client.py | 12 +++ pyuploadcare/secure_url.py | 145 +++++++++++++--------------- tests/functional/test_secure_url.py | 48 +++++++++ 5 files changed, 138 insertions(+), 77 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 25f63bb7..0bcac23d 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -12,12 +12,16 @@ This update introduces the ability to generate secure URLs with the same signatu Also, `AkamaiSecureUrlBuilderWithUrlToken` class has been implemented (#263). +A new method called `generate_secure_url_token` is exposed for `Uploadcare`. Instead of full URL it will return just the token (unlike the `generate_secure_url` method). + ### Added -- For `Uploadcare` +- For `Uploadcare`: + - added `generate_secure_url_token` method. - added optional `wildcard` parameter to `generate_secure_url` method. - For `AkamaiSecureUrlBuilderWithAclToken`: + - added `get_token` method. - added optional `wildcard` parameter to `build` method. - `AkamaiSecureUrlBuilderWithUrlToken` class. diff --git a/docs/core_api.rst b/docs/core_api.rst index afedd6cc..ca837374 100644 --- a/docs/core_api.rst +++ b/docs/core_api.rst @@ -405,6 +405,10 @@ Generate secure url for file:: secure_url = uploadcare.generate_secure_url('52da3bfc-7cd8-4861-8b05-126fef7a6994') +Generate just the token:: + + token = uploadcare.get_secure_url_token('52da3bfc-7cd8-4861-8b05-126fef7a6994') + Generate secure url for file with transformations:: secure_url = uploadcare.generate_secure_url( diff --git a/pyuploadcare/client.py b/pyuploadcare/client.py index 18be9b2e..c41f12a1 100644 --- a/pyuploadcare/client.py +++ b/pyuploadcare/client.py @@ -825,3 +825,15 @@ def generate_secure_url( raise ValueError("secure_url_builder must be set") return self.secure_url_builder.build(uuid, wildcard=wildcard) + + def generate_secure_url_token( + self, uuid: Union[str, UUID], wildcard: bool = False + ) -> str: + """Generate token for authenticated URL.""" + if isinstance(uuid, UUID): + uuid = str(uuid) + + if not self.secure_url_builder: + raise ValueError("secure_url_builder must be set") + + return self.secure_url_builder.get_token(uuid, wildcard=wildcard) diff --git a/pyuploadcare/secure_url.py b/pyuploadcare/secure_url.py index 1780625c..270f21af 100644 --- a/pyuploadcare/secure_url.py +++ b/pyuploadcare/secure_url.py @@ -12,6 +12,11 @@ class BaseSecureUrlBuilder(ABC): def build(self, uuid: str, wildcard: bool = False) -> str: raise NotImplementedError + def get_token(self, uuid: str, wildcard: bool = False) -> str: + raise NotImplementedError( + f"{self.__class__} doesn't provide get_token()" + ) + class BaseAkamaiSecureUrlBuilder(BaseSecureUrlBuilder): """Akamai secure url builder. @@ -20,6 +25,7 @@ class BaseAkamaiSecureUrlBuilder(BaseSecureUrlBuilder): for more details. """ + template = "{base}?token={token}" field_delimeter = "~" def __init__( @@ -34,15 +40,31 @@ def __init__( self.window = window self.hash_algo = hash_algo + def build(self, uuid: str, wildcard: bool = False) -> str: + uuid_or_url = self._format_uuid_or_url(uuid) + token = self.get_token(uuid_or_url, wildcard=wildcard) + secure_url = self._build_url(uuid_or_url, token) + return secure_url + + def get_token(self, uuid: str, wildcard: bool = False) -> str: + uuid_or_url = self._format_uuid_or_url(uuid) + expire = self._build_expire_time() + acl = self._format_acl(uuid_or_url, wildcard=wildcard) + signature = self._build_signature(uuid_or_url, expire, acl) + token = self._build_token(expire, acl, signature) + return token + def _build_expire_time(self) -> int: return int(time.time()) + self.window def _build_signature( - self, expire: int, acl: Optional[str] = None, url: Optional[str] = None + self, uuid_or_url: str, expire: int, acl: Optional[str] ) -> str: - assert bool(acl) != bool(url) - hash_source = [f"exp={expire}", f"acl={acl}" if acl else f"url={url}"] + hash_source = [ + f"exp={expire}", + f"acl={acl}" if acl else f"url={uuid_or_url}", + ] signature = hmac.new( binascii.a2b_hex(self.secret_key.encode()), @@ -52,101 +74,72 @@ def _build_signature( return signature + def _build_token(self, expire: int, acl: Optional[str], signature: str): -class AkamaiSecureUrlBuilderWithAclToken(BaseAkamaiSecureUrlBuilder): - template = "https://{cdn}/{uuid}/?token={token}" - - def build(self, uuid: str, wildcard: bool = False) -> str: - uuid = uuid.lstrip("/").rstrip("/") - - expire = self._build_expire_time() - - acl = self._format_acl(uuid, wildcard=wildcard) + token_parts = [ + f"exp={expire}", + f"acl={acl}" if acl else None, + f"hmac={signature}", + ] - signature = self._build_signature(expire, acl=acl) + return self.field_delimeter.join( + part for part in token_parts if part is not None + ) - secure_url = self._build_url(uuid, expire, acl, signature) - return secure_url + @abstractmethod + def _build_base_url(self, uuid_or_url: str): + raise NotImplementedError def _build_url( self, - uuid: str, - expire: int, - acl: str, - signature: str, + uuid_or_url: str, + token: str, ) -> str: - req_parameters = [ - f"exp={expire}", - f"acl={acl}", - f"hmac={signature}", - ] - - token = self.field_delimeter.join(req_parameters) - + base_url = self._build_base_url(uuid_or_url) return self.template.format( - cdn=self.cdn_url, - uuid=uuid, + base=base_url, token=token, ) - def _build_token(self, expire: int, acl: Optional[str], signature: str): - token_parts = [ - f"exp={expire}", - f"acl={acl}", - f"hmac={signature}", - ] - return self.field_delimeter.join(token_parts) + @abstractmethod + def _format_acl(self, uuid_or_url: str, wildcard: bool) -> Optional[str]: + raise NotImplementedError + + @abstractmethod + def _format_uuid_or_url(self, uuid_or_url: str) -> str: + raise NotImplementedError + + +class AkamaiSecureUrlBuilderWithAclToken(BaseAkamaiSecureUrlBuilder): + base_template = "https://{cdn}/{uuid}/" + + def _build_base_url(self, uuid_or_url: str): + return self.base_template.format(cdn=self.cdn_url, uuid=uuid_or_url) - def _format_acl(self, uuid: str, wildcard: bool) -> str: + def _format_acl(self, uuid_or_url: str, wildcard: bool) -> str: if wildcard: - return f"/{uuid}/*" - return f"/{uuid}/" + return f"/{uuid_or_url}/*" + return f"/{uuid_or_url}/" + + def _format_uuid_or_url(self, uuid_or_url: str) -> str: + return uuid_or_url.lstrip("/").rstrip("/") class AkamaiSecureUrlBuilderWithUrlToken(BaseAkamaiSecureUrlBuilder): - template = "{url}?token={token}" + def _build_base_url(self, uuid_or_url: str): + return uuid_or_url - def build(self, uuid: str, wildcard: bool = False) -> str: + def _format_acl(self, uuid_or_url: str, wildcard: bool) -> None: if wildcard: raise ValueError( "Wildcards are not supported in AkamaiSecureUrlBuilderWithUrlToken." ) + return None - url = uuid - - expire = self._build_expire_time() - - signature = self._build_signature(expire, url=url) - - secure_url = self._build_url(url, expire, signature) - - return secure_url - - def _build_url( - self, - url: str, - expire: int, - signature: str, - ) -> str: - req_parameters = [ - f"exp={expire}", - f"hmac={signature}", - ] - - token = self.field_delimeter.join(req_parameters) - - return self.template.format( - url=url, - token=token, - ) - - def _build_token(self, expire: int, url: str, signature: str): - token_parts = [ - f"exp={expire}", - f"url={url}", - f"hmac={signature}", - ] - return self.field_delimeter.join(token_parts) + def _format_uuid_or_url(self, uuid_or_url: str) -> str: + if "://" not in uuid_or_url: + raise ValueError(f"{uuid_or_url} doesn't look like a URL") + return uuid_or_url class AkamaiSecureUrlBuilder(AkamaiSecureUrlBuilderWithAclToken): diff --git a/tests/functional/test_secure_url.py b/tests/functional/test_secure_url.py index 0acbc34e..a7d9106c 100644 --- a/tests/functional/test_secure_url.py +++ b/tests/functional/test_secure_url.py @@ -12,6 +12,19 @@ ) +@pytest.mark.freeze_time("2021-10-12") +def test_get_acl_token(): + secure_url_bulder = AkamaiSecureUrlBuilderWithAclToken( + "cdn.yourdomain.com", known_secret + ) + token = secure_url_bulder.get_token("52da3bfc-7cd8-4861-8b05-126fef7a6994") + assert token == ( + "exp=1633997100~" + "acl=/52da3bfc-7cd8-4861-8b05-126fef7a6994/~" + "hmac=81852547d9dbd9eefd24bee2cada6eab02244b9013533bc8511511923098df72" + ) + + @pytest.mark.freeze_time("2021-10-12") def test_generate_secure_url_acl_token(): secure_url_bulder = AkamaiSecureUrlBuilderWithAclToken( @@ -83,6 +96,41 @@ def test_client_generate_secure_url_acl_token(): ) +@pytest.mark.freeze_time("2021-10-12") +def test_client_generate_secure_url_token_acl_token(): + secure_url_bulder = AkamaiSecureUrlBuilderWithAclToken( + "cdn.yourdomain.com", known_secret + ) + + uploadcare = Uploadcare( + public_key="public", + secret_key="secret", + secure_url_builder=secure_url_bulder, + ) + secure_url = uploadcare.generate_secure_url_token( + "52da3bfc-7cd8-4861-8b05-126fef7a6994" + ) + assert secure_url == ( + "exp=1633997100~" + "acl=/52da3bfc-7cd8-4861-8b05-126fef7a6994/~" + "hmac=81852547d9dbd9eefd24bee2cada6eab02244b9013533bc8511511923098df72" + ) + + +@pytest.mark.freeze_time("2021-10-12") +def test_get_url_token(): + secure_url_bulder = AkamaiSecureUrlBuilderWithUrlToken( + "cdn.yourdomain.com", known_secret + ) + token = secure_url_bulder.get_token( + "https://cdn.yourdomain.com/52da3bfc-7cd8-4861-8b05-126fef7a6994/" + ) + assert token == ( + "exp=1633997100~" + "hmac=32b696b855ddc911b366f11dcecb75789adf6211a72c1dbdf234b83f22aaa368" + ) + + @pytest.mark.freeze_time("2021-10-12") def test_generate_secure_url_url_token(): secure_url_bulder = AkamaiSecureUrlBuilderWithUrlToken(