From 3d65c767ebc0044c966a6180add304e4fc7afc13 Mon Sep 17 00:00:00 2001 From: David Lev Date: Sun, 2 Jun 2024 15:21:13 +0300 Subject: [PATCH] [server] improve continue/stop handling --- docs/source/content/examples/sign_up_flow.rst | 2 +- docs/source/content/handlers/overview.rst | 80 ++++++++++++++++++- docs/source/content/types/others.rst | 6 ++ pywa/handlers.py | 13 +-- pywa/server.py | 28 ++++--- 5 files changed, 107 insertions(+), 22 deletions(-) diff --git a/docs/source/content/examples/sign_up_flow.rst b/docs/source/content/examples/sign_up_flow.rst index a2a6c0c..1fc437b 100644 --- a/docs/source/content/examples/sign_up_flow.rst +++ b/docs/source/content/examples/sign_up_flow.rst @@ -1102,7 +1102,7 @@ Sending the Flow To send the flow we need to initialize the :class:`~pywa.client.WhatsApp` client with some specific parameters: .. code-block:: python - :title: wa.py + :caption: main.py :linenos: import fastapi diff --git a/docs/source/content/handlers/overview.rst b/docs/source/content/handlers/overview.rst index 7d63022..5f24532 100644 --- a/docs/source/content/handlers/overview.rst +++ b/docs/source/content/handlers/overview.rst @@ -50,7 +50,8 @@ See `Here `_. +Stop or continue handling updates +_________________________________ + +When a handler is called, when it finishes, in default, the next handler will be called. + +.. code-block:: python + :caption: main.py + :linenos: + + from pywa import WhatsApp + from pywa.types import Message + + wa = WhatsApp(...) + + @wa.on_message() + def handle_message(client: WhatsApp, message: Message): + print(message) + # The next handler will be called + + @wa.on_message() + def handle_message2(client: WhatsApp, message: Message): + print(message) + # The next handler will be called + + ... + + +You can change this behavior by setting the ``continue_handling`` to ``False`` when initializing :class:`~pywa.client.WhatsApp`. + +.. code-block:: python + :caption: main.py + :linenos: + :emphasize-lines: 1 + + wa = WhatsApp(..., continue_handling=False) + + @wa.on_message() + def handle_message(client: WhatsApp, message: Message): + print(message) + # The next handler will NOT be called + ... + +You can also change this behavior inside the callback function by calling the :meth:`~pywa.types.base_update.BaseUpdate.stop_handling` +or :meth:`~pywa.types.base_update.BaseUpdate.continue_handling` methods on the update object. + +.. code-block:: python + :caption: main.py + :linenos: + :emphasize-lines: 10, 12 + + from pywa import WhatsApp, filters + from pywa.types import Message + + wa = WhatsApp(...) + + @wa.on_message(filters.text) + def handle_message(client: WhatsApp, message: Message): + print(message) + if message.text == 'stop': + message.stop_handling() # The next handler will NOT be called + else: + message.continue_handling() # The next handler will be called + + ... + + Available handlers __________________ diff --git a/docs/source/content/types/others.rst b/docs/source/content/types/others.rst index e23e3de..298e5fe 100644 --- a/docs/source/content/types/others.rst +++ b/docs/source/content/types/others.rst @@ -22,3 +22,9 @@ Others .. currentmodule:: pywa.utils .. autoclass:: Version() + +.. currentmodule:: pywa.types.base_update + +.. autoclass:: StopHandling() + +.. autoclass:: ContinueHandling() diff --git a/pywa/handlers.py b/pywa/handlers.py index f33c281..0cee63e 100644 --- a/pywa/handlers.py +++ b/pywa/handlers.py @@ -242,13 +242,13 @@ def __init__( self.callback = callback self.filters = filters - async def handle(self, wa: WhatsApp, data: Any): + async def handle(self, wa: WhatsApp, data: Any) -> bool: for f in self.filters: if inspect.iscoroutinefunction(f): if not await f(wa, data): - return - elif not f(wa, data): - return + return False + elif not await wa._loop.run_in_executor(wa._executor, f, wa, data): + return False if inspect.iscoroutinefunction(self.callback): await self.callback(wa, data) @@ -259,6 +259,7 @@ async def handle(self, wa: WhatsApp, data: Any): wa, data, ) + return True @staticmethod @functools.cache @@ -339,7 +340,7 @@ def __init__( self.factory_before_filters = factory_before_filters super().__init__(callback, *filters) - async def handle(self, wa: WhatsApp, data: Any): + async def handle(self, wa: WhatsApp, data: Any) -> bool: update = await _get_factored_update(self, wa, data, self._data_field) if update is not None: if inspect.iscoroutinefunction(self.callback): @@ -351,6 +352,8 @@ async def handle(self, wa: WhatsApp, data: Any): wa, update, ) + return True + return False def __str__(self) -> str: return f"{self.__class__.__name__}(callback={self.callback!r}, filters={self.filters!r}, factory={self.factory!r})" diff --git a/pywa/server.py b/pywa/server.py index b7eb862..895a433 100644 --- a/pywa/server.py +++ b/pywa/server.py @@ -242,9 +242,11 @@ async def my_webhook_handler(req: web.Request) -> web.Response: return "ok", 200 self._updates_ids_in_process.add(update_id) await self._call_handlers(update) - if self._skip_duplicate_updates and update_id is not None: - if update_id is not None: + if self._skip_duplicate_updates: + try: self._updates_ids_in_process.remove(update_id) + except KeyError: + pass return "ok", 200 def _register_routes(self: "WhatsApp") -> None: @@ -355,22 +357,21 @@ async def _call_callbacks( constructed_update: BaseUpdate | dict, ) -> None: """Call the handler type callbacks for the given update.""" + handled = False for handler in self._handlers[handler_type]: try: - await handler.handle(self, constructed_update) - if not self._continue_handling: - break + handled = await handler.handle(self, constructed_update) except StopHandling: break except ContinueHandling: continue - except Exception as e: - if isinstance(e, StopHandling): - break + except Exception: _logger.exception( "An error occurred while %s was handling an update", handler.callback.__name__, ) + if handled and not self._continue_handling: + break def _get_handler(self: "WhatsApp", update: dict) -> type[Handler] | None: """Get the handler for the given update.""" @@ -607,11 +608,12 @@ async def flow_request_handler(payload: dict) -> tuple[str, int]: return "Failed to construct FlowRequest", 500 try: - response = ( - await callback(self, request) - if asyncio.iscoroutinefunction(callback) - else callback(self, request) - ) + if asyncio.iscoroutinefunction(callback): + response = await callback(self, request) + else: + response = await self._loop.run_in_executor( + self._executor, callback, self, request + ) if isinstance(response, FlowResponseError): raise response except FlowTokenNoLongerValid as e: