Skip to content

Commit

Permalink
Add async equivalents for API interfacing (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
caspervdw authored Oct 4, 2023
1 parent 944e5e2 commit dc421ef
Show file tree
Hide file tree
Showing 15 changed files with 735 additions and 32 deletions.
4 changes: 3 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
0.6.5 (unreleased)
------------------

- Nothing changed yet.
- Added async `ApiProvider` and `ApiGateway`.

- Added `request_raw` to `ApiProvider` for handling arbitrary responses.


0.6.4 (2023-10-03)
Expand Down
1 change: 1 addition & 0 deletions clean_python/api_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
from .api_provider import * # NOQA
from .exceptions import * # NOQA
from .files import * # NOQA
from .sync_api_provider import * # NOQA
75 changes: 72 additions & 3 deletions clean_python/api_client/api_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,84 @@
import inject

from clean_python import DoesNotExist
from clean_python import Gateway
from clean_python import Id
from clean_python import Json
from clean_python import Mapper
from clean_python import SyncGateway

from .. import SyncGateway
from .api_provider import SyncApiProvider
from .api_provider import ApiProvider
from .exceptions import ApiException
from .sync_api_provider import SyncApiProvider

__all__ = ["SyncApiGateway"]
__all__ = ["ApiGateway", "SyncApiGateway"]


class ApiGateway(Gateway):
path: str
mapper = Mapper()

def __init__(self, provider_override: Optional[ApiProvider] = None):
self.provider_override = provider_override

def __init_subclass__(cls, path: str) -> None:
assert not path.startswith("/")
assert "{id}" in path
cls.path = path
super().__init_subclass__()

@property
def provider(self) -> ApiProvider:
return self.provider_override or inject.instance(ApiProvider)

async def get(self, id: Id) -> Optional[Json]:
try:
result = await self.provider.request("GET", self.path.format(id=id))
assert result is not None
return self.mapper.to_internal(result)
except ApiException as e:
if e.status is HTTPStatus.NOT_FOUND:
return None
raise e

async def add(self, item: Json) -> Json:
item = self.mapper.to_external(item)
result = await self.provider.request("POST", self.path.format(id=""), json=item)
assert result is not None
return self.mapper.to_internal(result)

async def remove(self, id: Id) -> bool:
try:
await self.provider.request("DELETE", self.path.format(id=id)) is not None
except ApiException as e:
if e.status is HTTPStatus.NOT_FOUND:
return False
raise e
else:
return True

async def update(
self, item: Json, if_unmodified_since: Optional[datetime] = None
) -> Json:
if if_unmodified_since is not None:
raise NotImplementedError("if_unmodified_since not implemented")
item = self.mapper.to_external(item)
id_ = item.pop("id", None)
if id_ is None:
raise DoesNotExist("resource", id_)
try:
result = await self.provider.request(
"PATCH", self.path.format(id=id_), json=item
)
assert result is not None
return self.mapper.to_internal(result)
except ApiException as e:
if e.status is HTTPStatus.NOT_FOUND:
raise DoesNotExist("resource", id_)
raise e


# This is a copy-paste of ApiGateway:


class SyncApiGateway(SyncGateway):
Expand Down
98 changes: 74 additions & 24 deletions clean_python/api_client/api_provider.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import json as json_lib
import asyncio
import re
from http import HTTPStatus
from typing import Callable
Expand All @@ -7,16 +7,21 @@
from urllib.parse import urlencode
from urllib.parse import urljoin

import aiohttp
from aiohttp import ClientResponse
from aiohttp import ClientSession
from pydantic import AnyHttpUrl
from urllib3 import PoolManager
from urllib3 import Retry

from clean_python import ctx
from clean_python import Json

from .exceptions import ApiException
from .response import Response

__all__ = ["SyncApiProvider"]
__all__ = ["ApiProvider"]


RETRY_STATUSES = frozenset({413, 429, 503}) # like in urllib3


def is_success(status: HTTPStatus) -> bool:
Expand Down Expand Up @@ -49,7 +54,7 @@ def add_query_params(url: str, params: Optional[Json]) -> str:
return url + "?" + urlencode(params, doseq=True)


class SyncApiProvider:
class ApiProvider:
"""Basic JSON API provider with retry policy and bearer tokens.
The default retry policy has 3 retries with 1, 2, 4 second intervals.
Expand All @@ -64,43 +69,70 @@ class SyncApiProvider:
def __init__(
self,
url: AnyHttpUrl,
fetch_token: Callable[[PoolManager, int], Optional[str]],
fetch_token: Callable[[ClientSession, int], Optional[str]],
retries: int = 3,
backoff_factor: float = 1.0,
):
self._url = str(url)
assert self._url.endswith("/")
self._fetch_token = fetch_token
self._pool = PoolManager(retries=Retry(retries, backoff_factor=backoff_factor))
assert retries > 0
self._retries = retries
self._backoff_factor = backoff_factor
self._session = ClientSession()

def request(
async def _request_with_retry(
self,
method: str,
path: str,
params: Optional[Json] = None,
json: Optional[Json] = None,
fields: Optional[Json] = None,
timeout: float = 5.0,
) -> Optional[Json]:
params: Optional[Json],
json: Optional[Json],
fields: Optional[Json],
timeout: float,
) -> ClientResponse:
assert ctx.tenant is not None
headers = {}
request_kwargs = {
"method": method,
"url": add_query_params(join(self._url, quote(path)), params),
"timeout": timeout,
"json": json,
"data": fields,
}
# for urllib3<2, we dump json ourselves
if json is not None and fields is not None:
raise ValueError("Cannot both specify 'json' and 'fields'")
elif json is not None:
request_kwargs["body"] = json_lib.dumps(json).encode()
headers["Content-Type"] = "application/json"
elif fields is not None:
request_kwargs["fields"] = fields
token = self._fetch_token(self._pool, ctx.tenant.id)
token = self._fetch_token(self._session, ctx.tenant.id)
if token is not None:
headers["Authorization"] = f"Bearer {token}"
response = self._pool.request(headers=headers, **request_kwargs)
for attempt in range(self._retries):
if attempt > 0:
backoff = self._backoff_factor * 2 ** (attempt - 1)
await asyncio.sleep(backoff)

try:
response = await self._session.request(
headers=headers, **request_kwargs
)
await response.read()
except (aiohttp.ClientError, asyncio.exceptions.TimeoutError):
if attempt == self._retries - 1:
raise # propagate ClientError in case no retries left
else:
if response.status not in RETRY_STATUSES:
return response # on all non-retry statuses: return response

return response # retries exceeded; return the (possibly error) response

async def request(
self,
method: str,
path: str,
params: Optional[Json] = None,
json: Optional[Json] = None,
fields: Optional[Json] = None,
timeout: float = 5.0,
) -> Optional[Json]:
response = await self._request_with_retry(
method, path, params, json, fields, timeout
)
status = HTTPStatus(response.status)
content_type = response.headers.get("Content-Type")
if status is HTTPStatus.NO_CONTENT:
Expand All @@ -109,8 +141,26 @@ def request(
raise ApiException(
f"Unexpected content type '{content_type}'", status=status
)
body = json_lib.loads(response.data.decode())
body = await response.json()
if is_success(status):
return body
else:
raise ApiException(body, status=status)

async def request_raw(
self,
method: str,
path: str,
params: Optional[Json] = None,
json: Optional[Json] = None,
fields: Optional[Json] = None,
timeout: float = 5.0,
) -> Response:
response = await self._request_with_retry(
method, path, params, json, fields, timeout
)
return Response(
status=response.status,
data=await response.read(),
content_type=response.headers.get("Content-Type"),
)
12 changes: 12 additions & 0 deletions clean_python/api_client/response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from http import HTTPStatus
from typing import Optional

from clean_python import ValueObject

__all__ = ["Response"]


class Response(ValueObject):
status: HTTPStatus
data: bytes
content_type: Optional[str]
115 changes: 115 additions & 0 deletions clean_python/api_client/sync_api_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import json as json_lib
from http import HTTPStatus
from typing import Callable
from typing import Optional
from urllib.parse import quote

from pydantic import AnyHttpUrl
from urllib3 import PoolManager
from urllib3 import Retry

from clean_python import ctx
from clean_python import Json

from .api_provider import add_query_params
from .api_provider import is_json_content_type
from .api_provider import is_success
from .api_provider import join
from .exceptions import ApiException
from .response import Response

__all__ = ["SyncApiProvider"]


class SyncApiProvider:
"""Basic JSON API provider with retry policy and bearer tokens.
The default retry policy has 3 retries with 1, 2, 4 second intervals.
Args:
url: The url of the API (with trailing slash)
fetch_token: Callable that returns a token for a tenant id
retries: Total number of retries per request
backoff_factor: Multiplier for retry delay times (1, 2, 4, ...)
"""

def __init__(
self,
url: AnyHttpUrl,
fetch_token: Callable[[PoolManager, int], Optional[str]],
retries: int = 3,
backoff_factor: float = 1.0,
):
self._url = str(url)
assert self._url.endswith("/")
self._fetch_token = fetch_token
self._pool = PoolManager(retries=Retry(retries, backoff_factor=backoff_factor))

def _request(
self,
method: str,
path: str,
params: Optional[Json],
json: Optional[Json],
fields: Optional[Json],
timeout: float,
):
assert ctx.tenant is not None
headers = {}
request_kwargs = {
"method": method,
"url": add_query_params(join(self._url, quote(path)), params),
"timeout": timeout,
}
# for urllib3<2, we dump json ourselves
if json is not None and fields is not None:
raise ValueError("Cannot both specify 'json' and 'fields'")
elif json is not None:
request_kwargs["body"] = json_lib.dumps(json).encode()
headers["Content-Type"] = "application/json"
elif fields is not None:
request_kwargs["fields"] = fields
token = self._fetch_token(self._pool, ctx.tenant.id)
if token is not None:
headers["Authorization"] = f"Bearer {token}"
return self._pool.request(headers=headers, **request_kwargs)

def request(
self,
method: str,
path: str,
params: Optional[Json] = None,
json: Optional[Json] = None,
fields: Optional[Json] = None,
timeout: float = 5.0,
) -> Optional[Json]:
response = self._request(method, path, params, json, fields, timeout)
status = HTTPStatus(response.status)
content_type = response.headers.get("Content-Type")
if status is HTTPStatus.NO_CONTENT:
return None
if not is_json_content_type(content_type):
raise ApiException(
f"Unexpected content type '{content_type}'", status=status
)
body = json_lib.loads(response.data.decode())
if is_success(status):
return body
else:
raise ApiException(body, status=status)

def request_raw(
self,
method: str,
path: str,
params: Optional[Json] = None,
json: Optional[Json] = None,
fields: Optional[Json] = None,
timeout: float = 5.0,
) -> Response:
response = self._request(method, path, params, json, fields, timeout)
return Response(
status=response.status,
data=response.data,
content_type=response.headers.get("Content-Type"),
)
Loading

0 comments on commit dc421ef

Please sign in to comment.