Error when reusing asyncio connection pool - multiple event loops #3351

Open
ohnoah opened this issue Aug 9, 2024 · 2 comments

ohnoah commented Aug 9, 2024

I'm using Django and the asyncio Redis client. I want to share a connection pool across requests, so I initialize the pool centrally in my Django settings. I'm using Redis as a distributed semaphore for one of my routes. However, when two concurrent requests are running, I get the error "got Future <Future pending> attached to a different loop". This does not happen if I initialize a new Redis client for each call to the semaphore, so it seems to be an issue with reusing the connection pool across event loops. How should I go about this? I don't want to create one connection per call, since this distributed semaphore gets called a lot.

Version: What redis-py and what redis version is the issue happening on?
5.0.6

Platform: What platform / version? (For example Python 3.5.1 on Windows 7 / Ubuntu 15.10 / Azure)
Mac OS. Python 3.11
Description: Description of your issue, stack traces from errors and code that reproduces the issue

Traceback (most recent call last):
...
  File "/Users//answer-grid//endpoints_app/answers/redis_semaphore.py", line 123, in __aexit__
    await self.semaphore.release(self.identifier)
  File "/Users//answer-grid//endpoints_app/answers/redis_semaphore.py", line 102, in release
    result = await pipe.execute()
             ^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 1528, in execute
    return await conn.retry.call_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/retry.py", line 59, in call_with_retry
    return await do()
           ^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 1371, in _execute_transaction
    await self.parse_response(connection, "_")
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 1464, in parse_response
    result = await super().parse_response(connection, command_name, **options)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 633, in parse_response
    response = await connection.read_response()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/connection.py", line 541, in read_response
    response = await self._parser.read_response(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 82, in read_response
    response = await self._read_response(disable_decoding=disable_decoding)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 90, in _read_response
    raw = await self._readline()
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/_parsers/base.py", line 219, in _readline
    data = await self._stream.readline()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/streams.py", line 660, in readuntil
    await self._wait_for_data('readuntil')
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/streams.py", line 545, in _wait_for_data
    await self._waiter
RuntimeError: Task <Task pending name='Task-115' coro=<GeneralWebPageFetcher.async_get_pages.<locals>.get_page_with_semaphore() running at /Users/////answers/myfile.py:313> cb=[gather.<locals>._done_callback() at /opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/tasks.py:767]> got Future <Future pending> attached to a different loop
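
The final RuntimeError in the traceback comes from asyncio rather than from redis-py: a task running on one event loop awaited a future that belongs to a different loop. The following is a minimal sketch that reproduces the same message using only the standard library. The function names are hypothetical, and no Redis connection is involved.

```python
import asyncio


async def await_foreign(fut: asyncio.Future) -> None:
    # The task running this coroutine belongs to the loop started by asyncio.run().
    await fut


def reproduce() -> str:
    # Create a future bound to one event loop...
    other_loop = asyncio.new_event_loop()
    fut = other_loop.create_future()
    try:
        # ...and await it from a task that runs on a different loop.
        asyncio.run(await_foreign(fut))
    except RuntimeError as exc:
        return str(exc)
    finally:
        other_loop.close()
    return ""


if __name__ == "__main__":
    print(reproduce())
```

A pooled connection whose stream was opened on one loop and is then read from a task on another loop would plausibly hit the same check, which matches the `_wait_for_data` frame at the bottom of the traceback.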

ohnoah commented Aug 9, 2024

import os
import uuid
import asyncio
import time
from typing import Any
import random
from django.conf import settings
from redis import asyncio as aioredis

STARTING_BACKOFF_S = 4
MAX_BACKOFF_S = 16


class SemaphoreTimeoutError(Exception):
    """Exception raised when a semaphore acquisition times out."""

    def __init__(self, message: str) -> None:
        super().__init__(message)


class RedisSemaphore:
    def __init__(
        self,
        key: str,
        max_locks: int,
        timeout: int = 30,
        wait_timeout: int = 30,
    ) -> None:
        """
        Initialize the RedisSemaphore.

        The Redis URL is read from the REDIS_URL environment variable, and the
        client uses the shared connection pool in settings.REDIS_POOL.

        :param key: Redis key for the semaphore.
        :param max_locks: Maximum number of concurrent locks.
        :param timeout: How long until the lock should automatically be timed out in seconds.
        :param wait_timeout: How long to wait before aborting attempting to acquire a lock.
        """
        self.redis_url = os.environ["REDIS_URL"]
        self.key = key
        self.max_locks = max_locks
        self.timeout = timeout
        self.wait_timeout = wait_timeout
        self.redis = aioredis.Redis(connection_pool=settings.REDIS_POOL)
        self.identifier = "Only identifier"

    async def acquire(self) -> str:
        """
        Acquire a lock from the semaphore.

        :raises SemaphoreTimeoutError: If the semaphore acquisition times out.
        :return: The identifier for the acquired semaphore.
        """
        czset = f"{self.key}:owner"
        ctr = f"{self.key}:counter"
        identifier = str(uuid.uuid4())
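        start_time = time.time()
        backoff = STARTING_BACKOFF_S

        while True:
            # Refresh the timestamp on every attempt so that stale-lock expiry
            # and the new lock's score use the current time, not the first attempt's.
            now = time.time()
            # TODO: Redundant?
            if now - start_time > self.wait_timeout:
                raise SemaphoreTimeoutError("Waited too long to acquire the semaphore.")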

            async with self.redis.pipeline(transaction=True) as pipe:
                pipe.zremrangebyscore(self.key, "-inf", now - self.timeout)
                pipe.zinterstore(czset, {czset: 1, self.key: 0})
                pipe.incr(ctr)
                counter = (await pipe.execute())[-1]

                pipe.zadd(self.key, {identifier: now})
                pipe.zadd(czset, {identifier: counter})
                pipe.zrank(czset, identifier)
                rank = (await pipe.execute())[-1]

                print(rank)
                if rank < self.max_locks:
                    return identifier

                pipe.zrem(self.key, identifier)
                pipe.zrem(czset, identifier)
                await pipe.execute()

            # Exponential backoff with randomness
            sleep_time = backoff * (1 + random.random() * 0.3)
            if (sleep_time + time.time() - start_time) > self.wait_timeout:
                raise SemaphoreTimeoutError("Waited too long to acquire the semaphore.")
            await asyncio.sleep(sleep_time)
            backoff = min(backoff * 2, MAX_BACKOFF_S)

    async def release(self, identifier: str) -> bool:
        """
        Release a lock from the semaphore.

        :param identifier: The identifier for the lock to be released.
        :return: True if the semaphore was properly released, False if it had timed out.
        """
        czset = f"{self.key}:owner"
        async with self.redis.pipeline(transaction=True) as pipe:
            pipe.zrem(self.key, identifier)
            pipe.zrem(czset, identifier)
            result = await pipe.execute()
        return result[0] > 0


class RedisSemaphoreContext:
    def __init__(self, semaphore: RedisSemaphore) -> None:
        """
        Initialize the RedisSemaphoreContext.

        :param semaphore: An instance of RedisSemaphore.
        """
        self.semaphore = semaphore
        self.identifier = None

    async def __aenter__(self) -> "RedisSemaphoreContext":
        """Enter the async context manager."""
        self.identifier = await self.semaphore.acquire()
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit the async context manager."""
        await self.semaphore.release(self.identifier)

This is my Redis semaphore class.

@ohnoah ohnoah changed the title Using the same connection pool across multiple event loops Error when reusing asyncio connection pool - multiple event loops Aug 9, 2024

ohnoah commented Aug 10, 2024

This looks like it may be related to Django and to running locally. Let me investigate further.
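
If the cause is that requests end up on different event loops (for example, when async views are served by a synchronous server), one possible workaround is to keep one pool per event loop instead of a single global pool. The following is a minimal sketch using only the standard library. `PerLoopCache` is a hypothetical helper, not part of redis-py or Django, and the factory mentioned in the comment assumes `redis.asyncio.ConnectionPool.from_url` together with a `REDIS_URL` setting.

```python
import asyncio
import weakref
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class PerLoopCache(Generic[T]):
    """Hypothetical helper: lazily build one resource per running event loop."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        # Weak keys, so an entry disappears once its loop is garbage collected.
        self._by_loop = weakref.WeakKeyDictionary()

    def get(self) -> T:
        # Must be called from inside a coroutine; raises RuntimeError otherwise.
        loop = asyncio.get_running_loop()
        try:
            return self._by_loop[loop]
        except KeyError:
            resource = self._factory()
            self._by_loop[loop] = resource
            return resource


# In real code the factory might look like this (an assumption, not tested here):
#   lambda: aioredis.ConnectionPool.from_url(settings.REDIS_URL)
pools = PerLoopCache(object)


async def demo() -> tuple:
    # Two lookups on the same loop return the same resource.
    return pools.get(), pools.get()
```

With this in place, `RedisSemaphore.__init__` could build its client from `pools.get()` instead of `settings.REDIS_POOL`. Note that the sketch only drops a pool when its loop goes away and never disconnects it, and if every request really gets a fresh loop, a per-loop pool no longer shares connections across requests. In that case, serving the app from a single long-lived loop is probably the more direct fix.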
