Skip to content

Commit

Permalink
docs: Add link to OASIS specification and some wording tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
empicano committed Aug 3, 2023
1 parent 08ac275 commit 0381d11
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 57 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

Write code like this:

**Publisher**
**Publish**

```python
async with Client("test.mosquitto.org") as client:
await client.publish("humidity/outside", payload=0.38)
```

**Subscriber**
**Subscribe**

```python
async with Client("test.mosquitto.org") as client:
Expand Down Expand Up @@ -63,7 +63,7 @@ If you can't wait for the latest version and want to install directly from GitHu

### Note for Windows users

Since Python 3.8, the default asyncio event loop is the `ProactorEventLoop`. Said loop [doesn't support the `add_reader` method](https://docs.python.org/3/library/asyncio-platforms.html#windows) that is required by aiomqtt. Please switch to an event loop that supports the `add_reader` method such as the built-in `SelectorEventLoop`:
Since Python `3.8`, the default asyncio event loop is the `ProactorEventLoop`. Said loop [doesn't support the `add_reader` method](https://docs.python.org/3/library/asyncio-platforms.html#windows) that is required by aiomqtt. Please switch to an event loop that supports the `add_reader` method such as the built-in `SelectorEventLoop`:

```python
# Change to the "Selector" event loop if platform is Windows
Expand Down
4 changes: 1 addition & 3 deletions docs/alongside-fastapi-and-co.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
# Alongside FastAPI & Co.

MQTT is often used in the context of web servers.

Many web frameworks take control over the main function, which can make it tricky to figure out where to create the `Client` and how to share this connection.

With [FastAPI](https://github.com/tiangolo/fastapi) (`0.93+`) and [Starlette](https://github.com/encode/starlette) you can use lifespan context managers to safely set up a global client instance. Here's a minimal working example of FastAPI side by side with an aiomqtt listener task and message publication on `GET /`:
With [FastAPI](https://github.com/tiangolo/fastapi) (`0.93+`) and [Starlette](https://github.com/encode/starlette) you can use lifespan context managers to safely set up a global client instance. This is a minimal working example of FastAPI side by side with an aiomqtt listener task and message publication on `GET /`:

```python
import asyncio
Expand Down
20 changes: 10 additions & 10 deletions docs/connecting-to-the-broker.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Connecting to the broker

To publish messages and subscribe to topics, we need to connect to a broker. A minimal working example that connects to a broker and then publishes a message looks like this:
To publish messages and subscribe to topics, we first need to connect to a broker. This is a minimal working example that connects to a broker and then publishes a message:

```python
import asyncio
Expand All @@ -15,30 +15,30 @@ async def main():
asyncio.run(main())
```

The connection to the broker is managed by the `Client` context manager.
The connection to the broker is managed by the `Client` context manager. This context manager connects to the broker when we enter the `with` statement and disconnects when we exit it again.

This context manager connects to the broker when you enter the `with` statement and disconnects when you exit it again. Similar to e.g. the `with open(file)` context manager, this ensures that the teardown logic is always executed at least, and at most, once -- even in case of an exception.
Context managers make it easier to manage resources like network connections or files by ensuring that their teardown logic is always executed -- even in case of an exception.

```{tip}
If you're used to calling something like `connect` and `disconnect` explicitly, working with the client via a context manager might feel a bit strange at first. We will see a few examples which will hopefully clear this up.
If your use case does not allow you to use a context manager, you can use the client's `__aenter__` and `__aexit__` methods directly as a workaround, similar to how you would use manual `connect` and `disconnect` methods.
In case your use case does not allow you to use a context manager, you can use the context manager's `__aenter__` and `__aexit__` methods directly, similar to something like `connect` and `disconnect`. Note that you loose the benefits of context managers in this case. We do not recommend this approach; It's a workaround and a bit tricky to get right.
With this approach you need to make sure that `___aexit___` is also called in case of an exception. Avoid this workaround if you can, it's a bit tricky to get right.
```

```{note}
Examples use the public [mosquitto test broker](https://test.mosquitto.org/). You can connect to this broker without any credentials. Please use this broker in issues or discussions (if possible) to make it easier for others to test your code.
Examples use the public [mosquitto test broker](https://test.mosquitto.org/). You can connect to this broker without any credentials. All examples in this documentation are self-contained and runnable as-is.
```

For a list of all available arguments to the client, see the [API reference](#developer-interface).

## Sharing the connection

In many cases, you'll want to send and receive messages in different locations in your code. You could create a new client each time, but:
We often want to send and receive messages in multiple different locations in our code. We could create a new client each time, but:

1. This is not very performant, and
2. You'll use more network bandwidth.
2. We'll use more network bandwidth.

You can share the connection by passing the `Client` instance to all functions that need it:
You can share the connection to the broker by passing the `Client` instance to all functions that need it:

```python
import asyncio
Expand All @@ -64,7 +64,7 @@ asyncio.run(main())

## Persistent sessions

Connections to the MQTT broker can be persistent or non-persistent. Persistent sessions are kept alive when the client is offline. This means that the broker stores the client's subscriptions and queues any messages of [QoS 1 and 2](publishing-a-message.md#quality-of-service-qos) that the client misses or has not yet acknowledged. The broker will then retransmit the messages when the client reconnects.
Connections to the MQTT broker can be persistent or non-persistent. Persistent sessions are kept alive when the client goes offline. This means that the broker stores the client's subscriptions and queues any messages of [QoS 1 and 2](publishing-a-message.md#quality-of-service-qos) that the client misses or has not yet acknowledged. The broker will then retransmit the messages when the client reconnects.

To create a persistent session, set the `clean_session` parameter to `False` when initializing the client. For a non-persistent session, set `clean_session` to `True`.

Expand Down
2 changes: 1 addition & 1 deletion docs/introduction.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

This documentation aims to cover everything you need to know to use aiomqtt in your projects. We expect some knowledge of MQTT and asyncio, but we'll try to explain things as clearly as possible.

If you're new to MQTT, we recommend reading the [HiveMQ MQTT essentials](https://www.hivemq.com/mqtt-essentials/) guide. For asyncio, we recommend the [RealPython asyncio walkthrough](https://realpython.com/async-io-python/) as an introduction and the [official asyncio docs](https://docs.python.org/3/library/asyncio.html) as a reference.
If you're new to MQTT, we recommend reading the [HiveMQ MQTT essentials](https://www.hivemq.com/mqtt-essentials/) guide as an introduction. Afterward, the [OASIS specification](https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html) is the best reference. For asyncio, we recommend the [RealPython asyncio walkthrough](https://realpython.com/async-io-python/) as an introduction and the [asyncio docs](https://docs.python.org/3/library/asyncio.html) as a reference.

Especially first-time readers have a very valuable view of the documentation. You can spot ambiguities and unclear explanations that simply stay hidden from us maintainers! If you find an error somewhere or if you feel there's anything that can improve these docs, you're a big help if you open an issue or a pull request on GitHub.

Expand Down
5 changes: 1 addition & 4 deletions docs/reconnection.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@

Network connections are inherently unstable and can fail at any time. Especially for long-running applications, this can be a challenge.

To make an application resilient against connection failures, we have to be able to:

1. detect failures in the first place, and
1. recover from them.
To make an application resilient against connection failures, we have to be able to detect failures and recover from them.

The `Client` context is designed to be [reusable (but not reentrant)](https://docs.python.org/3/library/contextlib.html#reusable-context-managers). This means that we can wrap our code in a `try`/`except`-block, listen for `MqttError`s, and reconnect like so:

Expand Down
76 changes: 40 additions & 36 deletions docs/subscribing-to-a-topic.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Subscribing to a topic

To receive messages for a topic, we need to subscribe to it and listen for messages. A minimal working example that listens for messages to the `temperature/#` wildcard looks like this:
To receive messages for a topic, we need to subscribe to it and listen for messages. This is a minimal working example that listens for messages to the `temperature/#` wildcard:

```python
import asyncio
Expand All @@ -20,43 +20,13 @@ asyncio.run(main())

Now you can use the [minimal publisher example](publishing-a-message.md) to publish a message to `temperature/outside` and see it appear in the console.

````{important}
Messages are handled _one after another_. If a message takes a long time to handle, other [messages are queued](#the-message-queue) and handled only after the first one is done.
You can handle messages in parallel by using an `asyncio.TaskGroup` like so:
```python
import asyncio
import aiomqtt
async def handle(message):
await asyncio.sleep(5) # Simulate some I/O-bound work
print(message.payload)
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
async with client.messages() as messages:
await client.subscribe("temperature/#")
async with asyncio.TaskGroup() as tg:
async for message in messages:
tg.create_task(process(message))
asyncio.run(main())
```
Note that this only makes sense if your message handling is I/O-bound. If it's CPU-bound, you should spawn multiple processes instead.
````

```{tip}
You can set the [Quality of Service](publishing-a-message.md#quality-of-service-qos) of the subscription by passing the `qos` parameter to `subscribe()`.
```

## Filtering messages

Imagine we measure temperature and humidity on the outside and inside, and our topics look like this: `temperature/outside`. We want to receive all types of measurements but handle them differently.
Imagine that we measure temperature and humidity on the outside and inside, and our topics have the structure `temperature/outside`. We want to receive all types of measurements but handle them differently.

aiomqtt provides `Topic.matches()` to make this easy:

Expand Down Expand Up @@ -86,9 +56,15 @@ asyncio.run(main())
In our example, messages to `temperature/outside` are handled twice!
```

```{tip}
For details on the `+` and `#` wildcards and what topics they match, see the [OASIS specification](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901241).
```

## The message queue

Messages are buffered in a queue internally. The default queue is `asyncio.Queue` which returns messages on a FIFO ("first in first out") basis. You can pass [other asyncio queues](https://docs.python.org/3/library/asyncio-queue.html) as `queue_class` to `messages()` to modify the order in which messages are returned, e.g. `asyncio.LifoQueue`.
Messages are buffered in a queue internally and handled one after another.

The default queue is `asyncio.Queue` which returns messages on a FIFO ("first in first out") basis. You can pass [other asyncio queues](https://docs.python.org/3/library/asyncio-queue.html) as `queue_class` to `messages()` to modify the order in which messages are returned, e.g. `asyncio.LifoQueue`.

If you want to queue based on priority, you can subclass `asyncio.PriorityQueue`. This queue returns messages in priority order (lowest priority first). In case of ties, messages with lower message identifiers are returned first.

Expand Down Expand Up @@ -122,15 +98,43 @@ async def main():
asyncio.run(main())
```

```{note}
```{tip}
By default, the size of the queue is unlimited. You can limit it by passing the `queue_maxsize` parameter to `messages()`.
```

````{important}
If a message takes a long time to handle, it blocks the handling of other messages. You can handle messages in parallel by using an `asyncio.TaskGroup` like so:
```python
import asyncio
import aiomqtt
async def handle(message):
await asyncio.sleep(5) # Simulate some I/O-bound work
print(message.payload)
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
async with client.messages() as messages:
await client.subscribe("temperature/#")
async with asyncio.TaskGroup() as tg:
async for message in messages:
tg.create_task(handle(message))
asyncio.run(main())
```
Note that this only makes sense if your message handling is I/O-bound. If it's CPU-bound, you should spawn multiple processes instead.
````

## Listening without blocking

When you run the minimal example for subscribing and listening for messages, you'll notice that the program doesn't finish. Waiting for messages through the `messages()` generator blocks the execution of everything that comes afterward.

In case you want to run other code after starting your listener, this is not practical.
In case you want to run other code after starting your listener, this is not very practical.

You can use `asyncio.TaskGroup` (or `asyncio.gather` for Python `<3.11`) to safely run other tasks alongside the MQTT listener:

Expand Down Expand Up @@ -163,7 +167,7 @@ async def main():
asyncio.run(main())
```

In case task groups are not an option (e.g. because you run aiomqtt [alongside a web framework](alongside-fastapi-and-co.md)) you can start the listener in a fire-and-forget way. The idea is to use asyncio's `create_task` but not `await` the created task:
In case task groups are not an option (e.g. because you run aiomqtt [alongside a web framework](alongside-fastapi-and-co.md)) you can start the listener in a fire-and-forget way. The idea is to use asyncio's `create_task` without awaiting the created task:

```{caution}
You need to be a bit careful with this approach. Exceptions raised in asyncio tasks are propagated only when we `await` the task. In this case, we explicitly don't.
Expand Down

0 comments on commit 0381d11

Please sign in to comment.