Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reinitialize API Missed callback_api_version Argument #850

Open
lizaibeim opened this issue Jun 13, 2024 · 2 comments
Open

Reinitialize API Missed callback_api_version Argument #850

lizaibeim opened this issue Jun 13, 2024 · 2 comments
Labels
Status: Available No one has claimed responsibility for resolving this issue. Type: Bug

Comments

@lizaibeim
Copy link

Hi, I recently met a bug when I tried to reinitialize the mqtt client to reuse the client.

import configparser
import os.path

import paho.mqtt.client as mqtt


class MQTTClientWrapper:
    """Wrapper class for the MQTT client connected to predefined host and port"""

    def __init__(self, user_data=None, on_message=None, topic=None):
        """Initialize the MQTT client from the configuration file"""
        config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '../conf', 'audio_base.ini')
        self.subscribed_topic = None
        self.config = configparser.ConfigParser()
        self.config.read(config_path)
        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
        self.initialize(user_data, on_message, topic)

    def reinit(self, user_data=None, on_message=None, topic=None):
        """Reinitialize the client with new user data, on_message callback, and topic"""
        self.subscribed_topic = None
        self.client.reinitialise()
        self.initialize(user_data, on_message, topic)

    def initialize(self, user_data=None, on_message=None, topic=None):
        self.client.connect(host=self.config['MQTT']['mqtt_host'], port=int(self.config['MQTT']['mqtt_port']),
                            keepalive=60)
        if user_data:
            self.user_data_set(user_data)
        if on_message:
            self.on_message(on_message)
        if topic:
            self.subscribe(topic)
            self.subscribed_topic = topic

    def user_data_set(self, user_data):
        """Set the user data for the client"""
        self.client.user_data_set(user_data)

    def subscribe(self, topic):
        """Unsubscribe from the current topic and subscribe to a new topic"""
        if self.subscribed_topic:
            self.client.unsubscribe(self.subscribed_topic)
        self.client.subscribe(topic)
        self.subscribed_topic = topic

    def publish(self, topic, message, retain=False, qos=0):
        """Publish a message to a topic"""
        self.client.publish(topic, message, retain=retain, qos=qos)

    def on_message(self, on_message):
        """Set the on_message callback function"""
        self.client.on_message = on_message

    def start(self):
        """Start the network loop in a separate thread"""
        self.client.loop_start()

    def stop(self):
        """Stop the network loop"""
        self.client.loop_stop()

The errors shown

self.mqtt_client.reinit()
  File "/Users/ericli/mbox-audio/utils/mqtt_client.py", line 22, in reinit
    self.client.reinitialise()
  File "/Users/ericli/miniforge3/envs/audio-base/lib/python3.10/site-packages/paho/mqtt/client.py", line 1150, in reinitialise
    self.__init__(client_id, clean_session, userdata)  # type: ignore[misc]
  File "/Users/ericli/miniforge3/envs/audio-base/lib/python3.10/site-packages/paho/mqtt/client.py", line 766, in __init__
    raise ValueError(
ValueError: Unsupported callback API version: version 2.0 added a callback_api_version, see migrations.md for details

I check the code of mqtt/client.py

def reinitialise(
        self,
        client_id: str = "",
        clean_session: bool = True,
        userdata: Any = None,
    ) -> None:
        self._reset_sockets()

        self.__init__(client_id, clean_session, userdata)  # type: ignore[misc]

Here, the function call of reinitialise missed the callback_api_version parameter needed in the init function

def __init__(
        self,
        callback_api_version: CallbackAPIVersion,
        client_id: str = "",
        clean_session: bool | None = None,
        userdata: Any = None,
        protocol: int = MQTTv311,
        transport: Literal["tcp", "websockets"] = "tcp",
        reconnect_on_failure: bool = True,
        manual_ack: bool = False,
    ) -> None:
@github-actions github-actions bot added the Status: Available No one has claimed responsibility for resolving this issue. label Jun 13, 2024
@loremus
Copy link

loremus commented Jul 12, 2024

+1, would love to see it fixed

@lizaibeim
Copy link
Author

+1, would love to see it fixed

Hi, I found a tricky way to reinitialize it without causing the error by overriding the reinitialize function for your reference.

import configparser

import paho.mqtt.client as mqtt


class MQTTClientWrapper(mqtt.Client):
    """Extended MQTT client that loads configuration from a file and adds custom functionalities."""

    def __init__(self, config_path, user_data=None, on_message=None, topics=None):
        """Initialize MQTT client with configurations and optional user data, message callback, and topic
        subscription."""
        super().__init__(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)

        self.config_path = config_path
        self.current_topics = []

        config = configparser.ConfigParser()
        config.read(config_path)
        self.connect(config['MQTT']['mqtt_host'], int(config['MQTT']['mqtt_port']), 60)

        if user_data:
            self.user_data_set(user_data)
        if on_message:
            self.on_message = on_message
        if topics:
            self.subscribe(topics)

    def subscribe(self, topics, qos=0, options=None, properties=None):
        """Override: subscribe to new topics, ensuring previous subscriptions are removed."""
        if isinstance(topics, str):
            topics = [(topics, qos)]
        elif isinstance(topics, tuple):
            topics = [topics]

        if self.current_topics:
            self.unsubscribe(self.current_topics)
        for topic, qos in topics:
            super().subscribe(topic, qos)
        self.current_topics = topics

    def unsubscribe_current(self):
        """Unsubscribe from all currently subscribed topics."""
        if self.current_topics:
            for topic, _ in self.current_topics:
                self.unsubscribe(topic)
            self.current_topics = []

    def reinitialise(self, user_data=None, on_message=None, topics=None):
        """Override: reinitialize the client with new user data, on_message callback, and optionally change the topic
        subscription."""
        self._reset_sockets()
        self.__init__(config_path=self.config_path, user_data=user_data, on_message=on_message, topics=topics)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Status: Available No one has claimed responsibility for resolving this issue. Type: Bug
Projects
None yet
Development

No branches or pull requests

3 participants