Skip to content

Commit

Permalink
Add properties in Message
Browse files Browse the repository at this point in the history
  • Loading branch information
JonathanPlasse committed Dec 5, 2022
1 parent 0403a17 commit 770f5e7
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
8 changes: 4 additions & 4 deletions asyncio_mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,7 @@ def __init__(
# See the overloads of `socket.setsockopt` for details.
SocketOption: TypeAlias = "tuple[int, int, int | bytes] | tuple[int, int, None, int]"

SubscribeTopic: TypeAlias = (
"str | tuple[str, mqtt.SubscribeOptions] | list[tuple[str, mqtt.SubscribeOptions]]"
" | list[tuple[str, int]]"
)
SubscribeTopic: TypeAlias = "str | tuple[str, mqtt.SubscribeOptions] | list[tuple[str, mqtt.SubscribeOptions]] | list[tuple[str, int]]"

P = ParamSpec("P")

Expand Down Expand Up @@ -207,12 +204,14 @@ def __init__(
qos: int,
retain: bool,
mid: int,
properties: Properties | None,
):
self.topic = Topic(topic) if not isinstance(topic, Topic) else topic
self.payload = payload
self.qos = qos
self.retain = retain
self.mid = mid
self.properties = properties

@classmethod
def _from_paho_message(cls, message: mqtt.MQTTMessage) -> Message:
Expand All @@ -222,6 +221,7 @@ def _from_paho_message(cls, message: mqtt.MQTTMessage) -> Message:
qos=message.qos,
retain=message.retain,
mid=message.mid,
properties=message.properties if hasattr(message, "properties") else None,
)


Expand Down
8 changes: 8 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ async def handler(tg: anyio.abc.TaskGroup) -> None:
await client.subscribe(topic)
tg.start_soon(handler, tg)
tg.start_soon(handler, tg)
await anyio.wait_all_tasks_blocked()
await client.publish(topic)


Expand All @@ -115,6 +116,7 @@ async def handle_messages(tg: anyio.abc.TaskGroup) -> None:
async with anyio.create_task_group() as tg:
await client.subscribe(topic_header + "#")
tg.start_soon(handle_messages, tg)
await anyio.wait_all_tasks_blocked()
await client.publish(bad_topic, 2)
await client.publish(good_topic, 2)

Expand All @@ -140,6 +142,7 @@ async def handle_filtered_messages() -> None:
await client.subscribe(topic_header + "#")
tg.start_soon(handle_filtered_messages)
tg.start_soon(handle_unfiltered_messages, tg)
await anyio.wait_all_tasks_blocked()
await client.publish(topic_filtered, 2)
await client.publish(topic_unfiltered, 2)

Expand All @@ -165,6 +168,7 @@ async def handle_messages(tg: anyio.abc.TaskGroup) -> None:
await client.subscribe(topic1)
await client.subscribe(topic2)
tg.start_soon(handle_messages, tg)
await anyio.wait_all_tasks_blocked()
await client.publish(topic1, 2)
await client.unsubscribe(topic1)
await client.publish(topic1, 2)
Expand Down Expand Up @@ -218,6 +222,7 @@ async def handle_messages(tg: anyio.abc.TaskGroup) -> None:
async with anyio.create_task_group() as tg:
await client.subscribe(topic)
tg.start_soon(handle_messages, tg)
await anyio.wait_all_tasks_blocked()
await client.publish(topic)


Expand All @@ -240,6 +245,7 @@ async def handle_messages(tg: anyio.abc.TaskGroup) -> None:
async with anyio.create_task_group() as tg:
await client.subscribe(topic)
tg.start_soon(handle_messages, tg)
await anyio.wait_all_tasks_blocked()
await client.publish(topic)


Expand All @@ -258,6 +264,7 @@ async def handle_messages(tg: anyio.abc.TaskGroup) -> None:
async with anyio.create_task_group() as tg:
await client.subscribe(topic)
tg.start_soon(handle_messages, tg)
await anyio.wait_all_tasks_blocked()
await client.publish(topic)


Expand Down Expand Up @@ -333,6 +340,7 @@ async def handle_messages(tg: anyio.abc.TaskGroup) -> None:
async with anyio.create_task_group() as tg:
await client.subscribe(topic)
tg.start_soon(handle_messages, tg)
await anyio.wait_all_tasks_blocked()
await client.publish(topic)


Expand Down

0 comments on commit 770f5e7

Please sign in to comment.