MQTTClient API

The MQTTClient class implements the client side of the MQTT protocol. It can be used to publish MQTT messages to, and/or subscribe to topics on, a broker reachable over the network through TCP or WebSocket, either secured or unsecured.

Usage examples

Subscriber

The example below shows how to write a simple MQTT client which subscribes to topics and prints every message received from the broker:

import logging
import anyio

from moat.mqtt.client import open_mqttclient, ClientException
from moat.mqtt.mqtt.constants import QOS_1, QOS_2

logger = logging.getLogger(__name__)

async def uptime_coro():
    async with open_mqttclient(uri='mqtt://test.mosquitto.org/') as C:
        # Subscribe to '$SYS/broker/uptime' with QOS=1
        # Subscribe to '$SYS/broker/load/#' with QOS=2
        await C.subscribe([
                ('$SYS/broker/uptime', QOS_1),
                ('$SYS/broker/load/#', QOS_2),
             ])
        for i in range(1, 100):
            message = await C.deliver_message()
            packet = message.publish_packet
            print("%d:  %s => %s" % (i, packet.variable_header.topic_name, str(packet.payload.data)))
        await C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#'])

if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=formatter)
    anyio.run(uptime_coro)

This code has a drawback: a single central loop receives every message, so it needs to know how to handle all subscribed topics. Fortunately, moat.mqtt has a built-in dispatcher which delivers the messages of each subscription separately:

async def show(C, topic, qos):
    async with C.subscription(topic, qos) as sub:
        count = 0
        async for message in sub:
            count += 1
            packet = message.publish_packet
            print("%d:  %s => %s" % (count, packet.variable_header.topic_name, str(packet.payload.data)))
            # Stop after 100 messages on this subscription
            if count >= 100:
                break

async def uptime_coro():
    async with open_mqttclient(uri='mqtt://test.mosquitto.org/') as C:
        # Subscribe to '$SYS/broker/uptime' with QOS=1
        # Subscribe to '$SYS/broker/load/#' with QOS=2
        async with anyio.create_task_group() as tg:
            tg.start_soon(show, C, '$SYS/broker/uptime', QOS_1)
            tg.start_soon(show, C, '$SYS/broker/load/#', QOS_2)

if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=formatter)
    anyio.run(uptime_coro)

Publisher

The example below uses the MQTTClient class to implement a publisher. This test publishes three messages to the broker on a test topic. For the purposes of the test, each message is published with a different Quality of Service.

import logging
from functools import partial
import anyio

from moat.mqtt.client import open_mqttclient, ConnectException
from moat.mqtt.mqtt.constants import QOS_0, QOS_1, QOS_2

logger = logging.getLogger(__name__)

async def test_coro():
    """Publish in parallel"""
    async with open_mqttclient(uri='mqtt://test.mosquitto.org/') as C:
        async with anyio.create_task_group() as tg:
            # start_soon() forwards positional arguments only, so qos is bound with partial()
            tg.start_soon(C.publish, 'a/b', b'TEST MESSAGE WITH QOS_0')
            tg.start_soon(partial(C.publish, 'a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1))
            tg.start_soon(partial(C.publish, 'a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2))
        logger.info("messages published")


async def test_coro2():
    """Publish sequentially"""
    try:
        async with open_mqttclient(uri='mqtt://test.mosquitto.org/') as C:
            await C.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)
            await C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)
            await C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)
            logger.info("messages published")
    except ConnectException as ce:
        logger.error("Connection failed: %s", ce)


if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=formatter)
    anyio.run(test_coro)
    anyio.run(test_coro2)

Both coroutines have the same results except that test_coro() sends its messages in parallel, and thus is probably a bit faster.

Reference

MQTTClient API

MQTTClient configuration

Typically, you create an MQTTClient instance with an async context manager, i.e. by way of async with open_mqttclient(). This context manager creates a task group in which the client's housekeeping tasks run.

open_mqttclient() accepts a config parameter which sets up some behaviour and default settings. This argument must be a Python dictionary which may contain the following entries:

  • keep_alive: keep alive interval (in seconds) to send when connecting to the broker (defaults to 10 seconds). MQTTClient will auto-ping the broker if no message is sent within the keep-alive interval. This avoids disconnection from the broker.
  • ping_delay: how long before the keep-alive interval times out the auto-ping is sent, in seconds (defaults to 1 second). This should be larger than twice the worst-case roundtrip between your client and the broker.
  • default_qos: Default QoS (0) used by publish() if qos argument is not given.
  • default_retain: Default retain (False) used by publish() if retain argument is not given.
  • auto_reconnect: enable or disable auto-reconnect feature (defaults to True).
  • reconnect_max_interval: maximum interval (in seconds) to wait between two connection retries (defaults to 10).
  • reconnect_retries: maximum number of connection retries (defaults to 2). A negative value causes the client to retry indefinitely.
  • codec: the codec to use by default. May be overridden.
  • codec_params: Config values to use with a particular codec. Indexed by codec name.
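The relationship between keep_alive and ping_delay can be illustrated with a little arithmetic. The sketch below is not part of moat.mqtt; the helper names ping_timeout and ping_delay_is_safe are hypothetical, and it assumes the auto-ping is sent once the connection has been idle for keep_alive minus ping_delay seconds, as the descriptions above suggest.

```python
def ping_timeout(keep_alive: float = 10, ping_delay: float = 1) -> float:
    """Idle time (seconds) after which an auto-ping would be sent.

    Assumption: the ping goes out ping_delay seconds before the
    keep-alive interval expires.
    """
    if ping_delay >= keep_alive:
        raise ValueError("ping_delay must be smaller than keep_alive")
    return keep_alive - ping_delay


def ping_delay_is_safe(ping_delay: float, worst_roundtrip: float) -> bool:
    """Rule of thumb from the text: ping_delay > 2 * worst-case roundtrip."""
    return ping_delay > 2 * worst_roundtrip


if __name__ == "__main__":
    # Documented defaults: keep_alive=10, ping_delay=1
    print("idle seconds before auto-ping:", ping_timeout())
    print("safe with 0.2 s roundtrip:", ping_delay_is_safe(1, 0.2))
    print("safe with 0.6 s roundtrip:", ping_delay_is_safe(1, 0.6))
```

On a slow link, this suggests raising ping_delay (and possibly keep_alive) rather than relying on the defaults.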

Default QoS and default retain can also be overridden by adding a topics entry, which may contain QoS and retain values for specific topics. See the following example:

config = {
    'keep_alive': 10,
    'ping_delay': 1,
    'default_qos': 0,
    'default_retain': False,
    'auto_reconnect': True,
    'reconnect_max_interval': 5,
    'reconnect_retries': 10,
    'codec': 'utf8',
    'codec_params': {
        'bool': {'on': 'on', 'off': 'off'},  # default, actually
        'BOOL': {'on': 'ON', 'off': 'OFF', 'name': 'bool'},
        'yesno': {'on': 'yes', 'off': 'no', 'name': 'bool'},
    },
    'topics': {
        '/test': { 'qos': 1 },
        '/some_topic': { 'qos': 2, 'retain': True }
    }
}

With this configuration, every message is published with QOS_0 and the retain flag unset, except that:

  • messages sent to /test topic will be sent with QOS_1
  • messages sent to /some_topic topic will be sent with QOS_2 and retained

Also, with codec="yesno" the client accepts only a bool as the message and translates it to a "yes" or "no" payload.
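To make the bool translation concrete, here is a minimal stand-alone sketch of such a codec. The class BoolCodecSketch is hypothetical and is not moat.mqtt's implementation; it only mirrors the on and off parameters from the configuration above and assumes UTF-8 payloads. The name entry appears to select the underlying codec, so the sketch simply drops it.

```python
class BoolCodecSketch:
    """Hypothetical bool codec: maps True/False to configurable strings."""

    def __init__(self, on: str = "on", off: str = "off"):
        self.on = on
        self.off = off

    def encode(self, value: bool) -> bytes:
        # Only real booleans are accepted, mirroring "will only accept a bool"
        if not isinstance(value, bool):
            raise TypeError(f"expected bool, got {type(value).__name__}")
        return (self.on if value else self.off).encode("utf-8")

    def decode(self, data: bytes) -> bool:
        text = data.decode("utf-8")
        if text == self.on:
            return True
        if text == self.off:
            return False
        raise ValueError(f"unexpected payload: {text!r}")


# Parameters as in the 'yesno' entry; 'name' is assumed to pick the base codec
params = {"on": "yes", "off": "no", "name": "bool"}
yesno = BoolCodecSketch(**{k: v for k, v in params.items() if k != "name"})

if __name__ == "__main__":
    print(yesno.encode(True), yesno.encode(False))
    print(yesno.decode(b"yes"), yesno.decode(b"no"))
```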

In any case, any qos and retain arguments passed to method publish() will override these settings.
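The precedence described above (explicit argument first, then the per-topic entry, then the global default) can be sketched as a small lookup. The function resolve_publish_flags is a hypothetical helper written for illustration, not the library's code, and it assumes that topics entries are matched by exact topic string.

```python
def resolve_publish_flags(config, topic, qos=None, retain=None):
    """Return the (qos, retain) pair a publish call would end up using.

    Assumed precedence: explicit argument > config['topics'][topic] > default.
    """
    topic_cfg = config.get("topics", {}).get(topic, {})
    if qos is None:
        qos = topic_cfg.get("qos", config.get("default_qos", 0))
    if retain is None:
        retain = topic_cfg.get("retain", config.get("default_retain", False))
    return qos, retain


# Relevant subset of the example configuration
config = {
    "default_qos": 0,
    "default_retain": False,
    "topics": {
        "/test": {"qos": 1},
        "/some_topic": {"qos": 2, "retain": True},
    },
}

if __name__ == "__main__":
    for topic in ("/test", "/some_topic", "/other"):
        print(topic, resolve_publish_flags(config, topic))
    # An explicit argument wins over the per-topic setting
    print("/some_topic", resolve_publish_flags(config, "/some_topic", qos=0))
```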