NOSUB error in PubSub class while it should be a RuntimeError #115

Open
amirreza8002 opened this issue Oct 27, 2024 · 4 comments

Comments

@amirreza8002
Contributor

amirreza8002 commented Oct 27, 2024

Hi,
we are trying to write a Valkey backend for Celery (the PR).
The integration test (and some normal usage) fails with the following traceback:

self = <t.integration.test_canvas.test_chain object at 0x105e79310>, manager = <celery.contrib.testing.manager.Manager object at 0x119174260>

    def test_chain_child_replaced_with_chain_middle(self, manager):
        orig_sig = chain(
            identity.s(42), replace_with_chain.s(), identity.s()
        )
        res_obj = orig_sig.delay()
>       assert res_obj.get(timeout=TIMEOUT) == 42

t/integration/test_canvas.py:803:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
celery/result.py:251: in get
    return self.backend.wait_for_pending(
celery/backends/asynchronous.py:221: in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
celery/backends/asynchronous.py:287: in _wait_for_pending
    for _ in self.drain_events_until(
celery/backends/asynchronous.py:54: in drain_events_until
    yield self.wait_for(p, wait, timeout=interval)
celery/backends/asynchronous.py:63: in wait_for
    wait(timeout=timeout)
celery/backends/redis.py:161: in drain_events
    message = self._pubsub.get_message(timeout=timeout)
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/client.py:1072: in get_message
    response = self.parse_response(block=(timeout is None), timeout=timeout)
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/client.py:883: in parse_response
    response = self._execute(conn, try_read)
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/client.py:859: in _execute
    return conn.retry.call_with_retry(
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/retry.py:62: in call_with_retry
    return do()
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/client.py:860: in <lambda>
    lambda: command(*args, **kwargs),
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/client.py:881: in try_read
    return conn.read_response(disconnect_on_error=False, push_request=True)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <valkey.connection.Connection(host=localhost,port=6380,db=0)>, disable_decoding = False

    def read_response(
        self,
        disable_decoding=False,
        *,
        disconnect_on_error=True,
        push_request=False,
    ):
        """Read the response from a previously sent command"""

        host_error = self._host_error()

        try:
            if self.protocol in ["3", 3] and not LIBVALKEY_AVAILABLE:
                response = self._parser.read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
            else:
                response = self._parser.read_response(disable_decoding=disable_decoding)
        except socket.timeout:
            if disconnect_on_error:
                self.disconnect()
            raise TimeoutError(f"Timeout reading from {host_error}")
        except OSError as e:
            if disconnect_on_error:
                self.disconnect()
            raise ConnectionError(
                f"Error while reading from {host_error}" f" : {e.args}"
            )
        except BaseException:
            # Also by default close in case of BaseException.  A lot of code
            # relies on this behaviour when doing Command/Response pairs.
            # See #1128.
            if disconnect_on_error:
                self.disconnect()
            raise

        if self.health_check_interval:
            self.next_health_check = time() + self.health_check_interval

        if isinstance(response, ResponseError):
            try:
>               raise response
E               valkey.exceptions.ResponseError: NOSUB 'unsubscribe' command executed not in subscribed mode

There are a couple of problems here:

  1. The error shouldn't happen, at least not in this form.
    As you can see in the traceback, PubSub.get_message() is called, which contains this check,
    and even if that check doesn't work, this method calls PubSub.parse_response(), which should raise an error, as this shows.

  2. The error is intermittent, which leads me to believe some sort of race condition is happening. I'm not sure yet whether that's true and, if it is, what is causing it.

But the main issue is the first one: could you help me understand why Valkey itself is raising NOSUB, when valkey-py should have raised an error before Valkey even gets involved?

Important note: the same problem occurs when using redis-py.

@aiven-sal
Member

Hi!
Do you have a way to reproduce the issue?

BTW, there isn't anything in valkey-py that tries to hide that kind of error coming from Valkey. Getting ResponseError: NOSUB 'unsubscribe' command executed not in subscribed mode doesn't sound like a bug to me.

@amirreza8002
Contributor Author

amirreza8002 commented Oct 28, 2024

Hi,
the bug is in Celery, no doubt about that.

My question is about the two links I shared.
From what I understand, the first one should return None and not pass the command to Valkey.
Even if that doesn't work, the second one should raise a RuntimeError and not pass the command to Valkey.

Or am I misunderstanding this?
I'm judging based on the comments in the code; perhaps there's more to it.

As for reproducing it, I don't know of a way other than running Celery. I'll share more information about that shortly.
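To make the question about the two checks concrete, here is a heavily simplified model of them. It is a hypothetical sketch, not valkey-py's code: the class `ToyPubSub` and its attributes are illustrative names, and only the behavior described in this thread is modeled (`get_message()` returns None when the client is not subscribed; `parse_response()` raises a RuntimeError when no connection has been set).

```python
class ToyPubSub:
    """Hypothetical, heavily simplified stand-in for a PubSub object.

    Only the two client-side checks discussed in this thread are modelled;
    this is NOT valkey-py's actual implementation.
    """

    def __init__(self):
        self.connection = None   # assumed to be acquired lazily on the first subscribe()
        self.subscribed = False  # the client's belief, not the server's state

    def subscribe(self, channel):
        # `channel` is ignored in this model; only the client-side state matters.
        if self.connection is None:
            self.connection = object()  # placeholder for a real socket connection
        self.subscribed = True

    def parse_response(self):
        # Check 2: refuse to read when no connection was ever set up.
        if self.connection is None:
            raise RuntimeError("pubsub connection not set: did you forget to call subscribe()?")
        # A real client would read the next reply here, error replies included.
        return "next reply on the connection"

    def get_message(self):
        # Check 1: nothing to read if the client believes it is not subscribed.
        if not self.subscribed:
            return None
        return self.parse_response()


ps = ToyPubSub()
print(ps.get_message())  # None: check 1 short-circuits before any read
try:
    ps.parse_response()
except RuntimeError as exc:
    print("check 2:", exc)
ps.subscribe("results")
print(ps.get_message())  # both checks pass, so the next reply is read
```

In this model both checks look only at client-side state (a flag, and whether a connection object exists); neither asks the server whether the connection is actually in subscribed mode. Once a connection exists and the flag is set, whatever reply is next on that connection is read, including an error reply, which fits the maintainer's point that nothing in valkey-py hides such errors.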

@amirreza8002
Contributor Author

This seems to raise the error.

This, from the issue I opened in the Valkey repository, explains how to run the Celery integration test.

@amirreza8002
Contributor Author

Hi,
just an FYI: valkey-py does stop unsubscribe commands when there is no subscription.

You can test this by sending an UNSUBSCRIBE command with valkey-py, then doing the same with valkey-cli.

Though Valkey has since reverted this behavior and sending UNSUBSCRIBE no longer errors, it's still an open question how this happened in the first place.
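One hypothesis that would fit the symptoms (intermittent, and surfacing inside `get_message()`) is that pub/sub commands are written to the socket without waiting for their replies, so an error reply to a redundant UNSUBSCRIBE can sit on the connection until a later read. The toy model that follows illustrates that idea only. The names `ToyServer`, `ToyPubSub`, `ToyResponseError` and `run_scenario` are hypothetical, and the model assumes the client sets its `subscribed` flag as soon as it sends SUBSCRIBE and clears it only when it reads an unsubscribe confirmation with a count of zero. It is not valkey-py's or Celery's actual code, and it does not show that this is what happened in the Celery test.

```python
from collections import deque

NOSUB = "NOSUB 'unsubscribe' command executed not in subscribed mode"


class ToyResponseError(Exception):
    """Stands in for valkey.exceptions.ResponseError in this model."""


class ToyServer:
    """Hypothetical server: replies are queued in the order commands arrive."""

    def __init__(self):
        self.channels = set()
        self.replies = deque()  # what the client will later read off the socket

    def subscribe(self, channel):
        self.channels.add(channel)
        self.replies.append(("subscribe", channel, len(self.channels)))

    def unsubscribe_all(self):
        if not self.channels:
            # The server-side check the thread attributes to Valkey (since reverted).
            self.replies.append(ToyResponseError(NOSUB))
            return
        for channel in sorted(self.channels):  # sorted() copies, so discarding is safe
            self.channels.discard(channel)
            self.replies.append(("unsubscribe", channel, len(self.channels)))


class ToyPubSub:
    """Hypothetical client: commands are sent without waiting for replies."""

    def __init__(self, server):
        self.server = server
        self.subscribed = False  # client-side belief, updated optimistically

    def subscribe(self, channel):
        self.server.subscribe(channel)
        self.subscribed = True  # set before the confirmation is read

    def unsubscribe(self):
        self.server.unsubscribe_all()  # the reply is read later by get_message()

    def get_message(self):
        if not self.subscribed or not self.server.replies:
            return None  # the client-side guard: no read happens
        reply = self.server.replies.popleft()
        if isinstance(reply, ToyResponseError):
            raise reply  # mirrors `raise response` in read_response()
        kind, _channel, count = reply
        if kind == "unsubscribe" and count == 0:
            self.subscribed = False  # confirmation says no subscriptions remain
        return reply


def run_scenario():
    server = ToyServer()
    ps = ToyPubSub(server)
    seen = []
    ps.subscribe("a")
    ps.unsubscribe()
    ps.unsubscribe()  # redundant UNSUBSCRIBE: the server queues a NOSUB error
    seen.append(ps.get_message())  # the "subscribe" confirmation for "a"
    seen.append(ps.get_message())  # the "unsubscribe" confirmation; subscribed becomes False
    seen.append(ps.get_message())  # None: the guard hides the queued error for now
    ps.subscribe("b")  # subscribed is True again, but the stale error is first in line
    try:
        seen.append(ps.get_message())
    except ToyResponseError as exc:
        seen.append(f"ResponseError: {exc}")
    return seen


for item in run_scenario():
    print(item)
```

In this model the client-side guard works exactly as written, yet the stale error still reaches the caller, because the guard only decides whether to read, not what is already waiting on the connection. Whether the Celery backend actually issues a redundant UNSUBSCRIBE is not established in this thread.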
