Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add topic management #77

Merged
merged 11 commits into from
Mar 20, 2024
81 changes: 64 additions & 17 deletions async_firebase/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@
from google.oauth2 import service_account # type: ignore

from async_firebase._config import DEFAULT_REQUEST_LIMITS, DEFAULT_REQUEST_TIMEOUT, RequestLimits, RequestTimeout
from async_firebase.messages import FCMBatchResponse, FCMResponse
from async_firebase.utils import FCMBatchResponseHandler, FCMResponseHandler, join_url, serialize_mime_message
from async_firebase.messages import FCMBatchResponse, FCMResponse, TopicManagementResponse
from async_firebase.utils import (
FCMBatchResponseHandler,
FCMResponseHandler,
TopicManagementResponseHandler,
join_url,
serialize_mime_message,
)


class AsyncClientBase:
Expand All @@ -32,6 +38,10 @@ class AsyncClientBase:
SCOPES: t.List[str] = [
"https://www.googleapis.com/auth/cloud-platform",
]
IID_URL = "https://iid.googleapis.com"
IID_HEADERS = {"access_token_auth": "true"}
TOPIC_ADD_ACTION = "iid/v1:batchAdd"
TOPIC_REMOVE_ACTION = "iid/v1:batchRemove"

def __init__(
self,
Expand Down Expand Up @@ -162,25 +172,14 @@ async def prepare_headers(self) -> t.Dict[str, str]:
"X-FIREBASE-CLIENT": "async-firebase/{0}".format(version("async-firebase")),
}

async def send_request(
async def _send_request(
self,
uri: str,
response_handler: t.Union[FCMResponseHandler, FCMBatchResponseHandler],
url: str,
response_handler: t.Union[FCMResponseHandler, FCMBatchResponseHandler, TopicManagementResponseHandler],
json_payload: t.Optional[t.Dict[str, t.Any]] = None,
headers: t.Optional[t.Dict[str, str]] = None,
content: t.Union[str, bytes, t.Iterable[bytes], t.AsyncIterable[bytes], None] = None,
) -> t.Union[FCMResponse, FCMBatchResponse]:
"""
Sends an HTTP call using the ``httpx`` library.

:param uri: URI to be requested.
:param response_handler: the model to handle response.
:param json_payload: request JSON payload
:param headers: request headers.
:param content: request content
:return: HTTP response
"""
url = join_url(self.BASE_URL, uri)
) -> t.Union[FCMResponse, FCMBatchResponse, TopicManagementResponse]:
logging.debug(
"Requesting POST %s, payload: %s, content: %s, headers: %s",
url,
Expand All @@ -207,3 +206,51 @@ async def send_request(
response = response_handler.handle_response(raw_fcm_response)

return response

async def send_request(
self,
uri: str,
response_handler: t.Union[FCMResponseHandler, FCMBatchResponseHandler],
json_payload: t.Optional[t.Dict[str, t.Any]] = None,
headers: t.Optional[t.Dict[str, str]] = None,
content: t.Union[str, bytes, t.Iterable[bytes], t.AsyncIterable[bytes], None] = None,
) -> t.Union[FCMResponse, FCMBatchResponse]:
"""
Sends an HTTP call using the ``httpx`` library to FCM.

:param uri: URI to be requested.
:param response_handler: the model to handle response.
:param json_payload: request JSON payload
:param headers: request headers.
:param content: request content
:return: HTTP response
"""
url = join_url(self.BASE_URL, uri)
return await self._send_request( # type: ignore
url=url, response_handler=response_handler, json_payload=json_payload, headers=headers, content=content
)

async def send_iid_request(
self,
uri: str,
response_handler: TopicManagementResponseHandler,
json_payload: t.Optional[t.Dict[str, t.Any]] = None,
headers: t.Optional[t.Dict[str, str]] = None,
content: t.Union[str, bytes, t.Iterable[bytes], t.AsyncIterable[bytes], None] = None,
) -> TopicManagementResponse:
"""
Sends an HTTP call using the ``httpx`` library to the IID service for topic management functionality.

:param uri: URI to be requested.
:param response_handler: the model to handle response.
:param json_payload: request JSON payload
:param headers: request headers.
:param content: request content
:return: HTTP response
"""
url = join_url(self.IID_URL, uri)
headers = headers or await self.prepare_headers()
headers.update(self.IID_HEADERS)
return await self._send_request( # type: ignore
url=url, response_handler=response_handler, json_payload=json_payload, headers=headers, content=content
)
42 changes: 42 additions & 0 deletions async_firebase/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
Message,
MulticastMessage,
PushNotification,
TopicManagementResponse,
WebpushConfig,
WebpushFCMOptions,
WebpushNotification,
Expand All @@ -40,6 +41,7 @@
from async_firebase.utils import (
FCMBatchResponseHandler,
FCMResponseHandler,
TopicManagementResponseHandler,
cleanup_firebase_message,
serialize_mime_message,
)
Expand Down Expand Up @@ -547,3 +549,43 @@ async def send_each_for_multicast(
]

return await self.send_each(messages, dry_run=dry_run)

async def _make_topic_management_request(
self, device_tokens: t.List[str], topic_name: str, action: str
) -> TopicManagementResponse:
payload = {
"to": f"/topics/{topic_name}",
"registration_tokens": device_tokens,
}
response = await self.send_iid_request(
uri=action,
json_payload=payload,
response_handler=TopicManagementResponseHandler(),
)
return response

async def subscribe_devices_to_topic(self, device_tokens: t.List[str], topic_name: str) -> TopicManagementResponse:
"""
Subscribes devices to the topic.

:param device_tokens: devices ids to be subscribed.
:param topic_name: name of the topic.
:returns: Instance of messages.TopicManagementResponse.
"""
return await self._make_topic_management_request(
device_tokens=device_tokens, topic_name=topic_name, action=self.TOPIC_ADD_ACTION
)

async def unsubscribe_devices_from_topic(
self, device_tokens: t.List[str], topic_name: str
) -> TopicManagementResponse:
"""
Unsubscribes devices from the topic.

:param device_tokens: devices ids to be unsubscribed.
:param topic_name: name of the topic.
:returns: Instance of messages.TopicManagementResponse.
"""
return await self._make_topic_management_request(
device_tokens=device_tokens, topic_name=topic_name, action=self.TOPIC_REMOVE_ACTION
)
74 changes: 74 additions & 0 deletions async_firebase/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import typing as t
from dataclasses import dataclass, field

import httpx

from async_firebase.errors import AsyncFirebaseError


Expand Down Expand Up @@ -280,6 +282,17 @@ class WebpushConfig:
fcm_options: t.Optional[WebpushFCMOptions] = field(default=None)


@dataclass
class FcmOptions:
"""
Platform independent options for features provided by the FCM SDKs
Arguments:
analytics_label: Label associated with the message's analytics data.
"""

analytics_label: str


@dataclass
class Message:
"""
Expand All @@ -297,6 +310,7 @@ class Message:
token: the registration token of the device to which the message should be sent.
topic: name of the Firebase topic to which the message should be sent (optional).
condition: the Firebase condition to which the message should be sent (optional).
fcm_options: platform independent options for features provided by the FCM SDKs.
"""

token: t.Optional[str] = None
Expand All @@ -307,6 +321,7 @@ class Message:
apns: t.Optional[APNSConfig] = field(default=None)
topic: t.Optional[str] = None
condition: t.Optional[str] = None
fcm_options: t.Optional[FcmOptions] = None


@dataclass
Expand Down Expand Up @@ -395,3 +410,62 @@ def success_count(self):
@property
def failure_count(self):
return len(self.responses) - self.success_count


class TopicManagementErrorInfo:
"""An error encountered when performing a topic management operation."""

def __init__(self, index, reason):
self._index = index
self._reason = reason

@property
def index(self):
"""Index of the registration token to which this error is related to."""
return self._index

@property
def reason(self):
"""String describing the nature of the error."""
return self._reason


class TopicManagementResponse:
"""The response received from a topic management operation."""

def __init__(self, resp: t.Optional[httpx.Response] = None, exception: t.Optional[AsyncFirebaseError] = None):
self.exception = exception
self._success_count = 0
self._failure_count = 0
self._errors: t.List[TopicManagementErrorInfo] = []

if resp:
self._handle_response(resp)

def _handle_response(self, resp: httpx.Response):
response = resp.json()
results = response.get("results")
if not results:
raise ValueError("Unexpected topic management response: {0}.".format(resp))

for index, result in enumerate(results):
if "error" in result:
self._failure_count += 1
self._errors.append(TopicManagementErrorInfo(index, result["error"]))
else:
self._success_count += 1

@property
def success_count(self):
"""Number of tokens that were successfully subscribed or unsubscribed."""
return self._success_count

@property
def failure_count(self):
"""Number of tokens that could not be subscribed or unsubscribed due to errors."""
return self._failure_count

@property
def errors(self):
"""A list of ``messaging.ErrorInfo`` objects (possibly empty)."""
return self._errors
21 changes: 19 additions & 2 deletions async_firebase/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
UnknownError,
UnregisteredError,
)
from async_firebase.messages import FCMBatchResponse, FCMResponse
from async_firebase.messages import FCMBatchResponse, FCMResponse, TopicManagementResponse


def join_url(
Expand Down Expand Up @@ -158,7 +158,7 @@ def serialize_mime_message(
return fp.getvalue()


FCMResponseType = t.TypeVar("FCMResponseType", FCMResponse, FCMBatchResponse)
FCMResponseType = t.TypeVar("FCMResponseType", FCMResponse, FCMBatchResponse, TopicManagementResponse)


class FCMResponseHandlerBase(ABC, t.Generic[FCMResponseType]):
Expand Down Expand Up @@ -355,3 +355,20 @@ def _deserialize_batch_response(response: httpx.Response) -> t.List[httpx.Respon
responses.append(resp)

return responses


class TopicManagementResponseHandler(FCMResponseHandlerBase[TopicManagementResponse]):
def handle_error(self, error: httpx.HTTPError) -> TopicManagementResponse:
exc = (
(isinstance(error, httpx.HTTPStatusError) and self._handle_fcm_error(error))
or (isinstance(error, httpx.HTTPError) and self._handle_request_error(error))
or AsyncFirebaseError(
code=FcmErrorCode.UNKNOWN.value,
message="Unexpected error has happened when hitting the FCM API",
cause=error,
)
)
return TopicManagementResponse(exception=exc)

def handle_response(self, response: httpx.Response) -> TopicManagementResponse:
return TopicManagementResponse(response)
Loading
Loading