Migration guide: v2.0.0¶
Version 2.0.0 introduces some breaking changes. This page aims to help you migrate to this new major version. The relevant changes are:
The deprecated
connectanddisconnectmethods have been removedThe deprecated
filtered_messagesandunfiltered_messagesmethods have been removedUser-managed queues for incoming messages have been replaced with a single client-wide queue
Some arguments to the
Clienthave been renamed or removed
Changes to the client lifecycle¶
The deprecated connect and disconnect methods have been removed. The best way to connect and disconnect from the broker is through the client’s context manager:
import asyncio
import aiomqtt
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.publish("temperature/outside", payload=28.4)
asyncio.run(main())
If your use case does not allow you to use a context manager, you can use the client’s __aenter__ and __aexit__ methods almost interchangeably in place of the removed connect and disconnect methods.
The __aenter__ and __aexit__ methods are designed to be called by the async with statement when the execution enters and exits the context manager. However, we can also execute them manually:
import asyncio
import aiomqtt
async def main():
client = aiomqtt.Client("test.mosquitto.org")
await client.__aenter__()
try:
await client.publish("temperature/outside", payload=28.4)
finally:
await client.__aexit__(None, None, None)
asyncio.run(main())
__aenter__ is equivalent to connect. __aexit__ is equivalent to disconnect except that it forces disconnection instead of throwing an exception in case the client cannot disconnect cleanly.
Note
__aexit__ expects three arguments: exc_type, exc, and tb. These arguments describe the exception that caused the context manager to exit, if any. You can pass None to all of these arguments in a manual call to __aexit__.
Changes to the message queue¶
The filtered_messages, unfiltered_messages, and messages methods have been removed and replaced with a single client-wide message queue.
For previous versions, a minimal example of printing all messages (unfiltered) looked like this:
import asyncio
import aiomqtt
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#")
async with client.messages() as messages:
async for message in messages:
print(message.payload)
asyncio.run(main())
We now no longer need the line async with client.messages() as messages:, but instead access the message generator directly with client.messages:
import asyncio
import aiomqtt
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#")
async for message in client.messages:
print(message.payload)
asyncio.run(main())
To handle messages from different topics differently, we can use Topic.matches():
import asyncio
import aiomqtt
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#")
await client.subscribe("humidity/#")
async for message in client.messages:
if message.topic.matches("humidity/inside"):
print(f"[humidity/inside] {message.payload}")
if message.topic.matches("+/outside"):
print(f"[+/outside] {message.payload}")
if message.topic.matches("temperature/#"):
print(f"[temperature/#] {message.payload}")
asyncio.run(main())
Note
In our example, messages to temperature/outside are handled twice!
The filtered_messages, unfiltered_messages, and messages methods created isolated message queues underneath, such that you could invoke them multiple times. From Version 2.0.0 on, the client maintains a single queue that holds all incoming messages, accessible via Client.messages.
If you continue to need multiple queues (e.g. because you have special concurrency requirements), you can build a “distributor” on top:
import asyncio
import aiomqtt
async def temperature_consumer():
while True:
message = await temperature_queue.get()
print(f"[temperature/#] {message.payload}")
async def humidity_consumer():
while True:
message = await humidity_queue.get()
print(f"[humidity/#] {message.payload}")
temperature_queue = asyncio.Queue()
humidity_queue = asyncio.Queue()
async def distributor(client):
# Sort messages into the appropriate queues
async for message in client.messages:
if message.topic.matches("temperature/#"):
temperature_queue.put_nowait(message)
elif message.topic.matches("humidity/#"):
humidity_queue.put_nowait(message)
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#")
await client.subscribe("humidity/#")
# Use a task group to manage and await all tasks
async with asyncio.TaskGroup() as tg:
tg.create_task(distributor(client))
tg.create_task(temperature_consumer())
tg.create_task(humidity_consumer())
asyncio.run(main())
Changes to client arguments¶
The
queue_classandqueue_maxsizearguments tofiltered_messages,unfiltered_messages, andmessageshave been moved to theClientand have been renamed toqueue_typeandmax_queued_incoming_messagesThe
max_queued_messagesclient argument has been renamed tomax_queued_outgoing_messagesThe deprecated
message_retry_setclient argument has been removed