使用 Python 开发 MQTT 客户端

    Python 是一种简单易学而又功能强大的解释性语言,它语法简洁,拥有丰富的标准库和第三方库,使用户能够专注于解决问题而非语言本身,非常适合快速开发。

    本章节以简单的例子讲解如何建立一个初步的 Python MQTT 客户端。在这里,我们会用到 paho-mqtt 库。Paho 是 Eclipse 的一个开源 MQTT 项目,包含多种语言实现,Python 是其中之一。

    如果在您的系统中已经有了 Python 环境(大多数 linux 发布和 MacOS 中已经包含 Python 环境,在 Windows 下需要单独安装),使用以下命令即可安装 paho-mqtt:

    或者使用 python 虚拟环境 建立一个和其他项目隔离的 mqtt 客户端项目环境,然后在此环境中安装 paho-mqtt:

    1. virtualenv mqtt-client
    2. source mqtt-client/bin/active
    3. pip install paho-mqtt

    实现一个简单的客户端

    paho-mqtt 提供了 3 个类,Client、Publish 和 Subscribe。后两者仅提供了简单的方法用于一次性的发布和订阅消息,但并不保持连接。Client 包含了连接、订阅、发布和回调函数。编写一个 MQTT 客户端,一般会使用 Client 类,用 Client 的实例来建立并维持和 Broker 的连接、订阅和发送消息,并在需要时断开连接:

    • 建立一个 Client 实例
    • 使用 Client 实例的 connect*() 函数进行连接
    • 用 loop*() 函数保持和处理客户端和 broker 之间连接和数据流
    • 使用 subscribe() 函数订阅主题
    • 使用 publish() 函数发布消息
    • 使用 disconnect() 函数连断开和 broker 的连接在应用程序需要处理事件的时候,回调函数会被调用。

    这里我们只对 Paho 客户端的 Python 实现做简单的介绍,关于该客户端的更详尽的说明请参阅 官方文档

    使用 Client 类的构造函数如下:

    1. Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
    • client_id:MQTT 协议的 client ID;
    • clean_session:MQTT 协议的 clean_session 属性;
    • userdata:用户定义的任何类型的数据。会在回调函数中以 userdata 参数传递给该回调函数;
    • protocol:使用的 MQTT 协议版本;
    • transport:使用的传输协议。除了以上这些基本属性,paho 客户端的 Client 类还提供了其他函数来配置客户端,如设置 Inflight 窗口大小,配置 TLS 连接,设置 Will 消息,配置 Logger 等。
    1. # 导入 paho-mqtt 的 Client:
    2. import paho.mqtt.client as mqtt
    3. # 用于响应服务器端 CONNACK 的 callback,如果连接正常建立,rc 值为 0
    4. def on_connect(client, userdata, flags, rc):
    5. print("Connection returned with result code:" + str(rc))
    6. # 用于响应服务器端 PUBLISH 消息的 callback,打印消息主题和内容
    7. def on_message(client, userdata, msg):
    8. print("Received message, topic:" + msg.topic + "payload:" + str(msg.payload))
    9. # 在连接断开时的 callback,打印 result code
    10. def on_disconnect(client, userdata, rc):
    11. # 构造一个 Client 实例
    12. client = mqtt.Client()
    13. client.on_connect = on_connect
    14. client.on_disconnect= on_disconnect
    15. client.on_message = on_message
    16. # 连接 broker
    17. # connect() 函数是阻塞的,在连接成功或失败后返回。如果想使用异步非阻塞方式,可以使用 connect_async() 函数。
    18. client.connect("192.168.1.165", 1883, 60)

    网络循环

    在连接到 broker 之后,需要网络循环函数来处理消息收发和保持和 broker 的连接。paho 提供 loop(), loop_forever(),loop_start() 和 loop_stop() 四个函数处理网络循环。最基本的方式是使用 loop() 函数:

    参数中的 timeout 单位为秒,设置不应超过 keepalive 值,否则可能会导致经常性的断线。max_packets 参数现已废弃,不应再使用。loop() 函数会阻塞到 select() 调用返回套接字可读 / 写或者发生 timeout。

    loop_forever() 以阻塞的方式处理网络循环,直到客户端调用 disconnect() 之前都不会返回。它自动处理重连。

    loop_start()/loop_stop() 以实现线程接口的方式异步非阻塞的处理网络循环。loop_start() 会在后台起一个线程自动调用 loop() 函数,这样主线程可以处理其他工作。loop_start() 函数自动处理重连。调用 loop_stop() 会中止这个后台线程。

    例程:

    1. client.loop_start()

    函数发送一条消息至 broker。

    1. publish(topic, payload=None, qos=0, retain=False)

    参数 topic 为发送消息的主题,不可为空。payload 长度和 qos 参数的设定需要符合 MQTT 协议标准,retain 默认是 False。

    当消息被发送到 broker 之后,on_publish() 回调函数会被调用。

    例程:

    1. client.publish("hello", payload = "Hello world!")

    订阅消息

    subscribe() 函数为客户端订阅一个或多个主题。

    使用 subscribe() 函数有 3 种形式:

    • subscribe("my/topic", 2)topic 为字符串,指定需要订阅的主题。qos 为订阅的 QoS 级别,默认为 0
    • subscribe(("my/topic", 1))topic 为二元组,二元组的第一个元素为字符串,指定需要订阅的主题,第二个元素为订阅的 QoS 级别。这两个元素都必须出现在元组内。qos 未使用。
    • subscribe([("my/topic", 0), ("another/topic", 2)])topic 为一个二元组列表,列表中的元素同第二种方法。qos 未使用。这种方法可以以一次函数调用完成多个订阅。当 broker 确认订阅以后,on_subscribe() 回调函数会被调用。

    例程:

    1. client.subscribe([("temperature", 0), ("humidity", 0)])

    退订一个主题:

    1. unsubscribe(topic)

    当 broker 确认退订以后,on_unsubscribe() 回调函数会被调用。

    完整例程

    1. #-*-coding:utf-8-*-
    2. # 导入 paho-mqtt 的 Client:
    3. import paho.mqtt.client as mqtt
    4. import time
    5. unacked_sub = [] #未获得服务器响应的订阅消息 id 列表
    6. # 用于响应服务器端 CONNACK 的 callback,如果连接正常建立,rc 值为 0
    7. def on_connect(client, userdata, flags, rc):
    8. print("Connection returned with result code:" + str(rc))
    9. # 用于响应服务器端 PUBLISH 消息的 callback,打印消息主题和内容
    10. def on_message(client, userdata, msg):
    11. print("Received message, topic:" + msg.topic + "payload:" + str(msg.payload))
    12. # 在连接断开时的 callback,打印 result code
    13. def on_disconnect(client, userdata, rc):
    14. # 在订阅获得服务器响应后,从为响应列表中删除该消息 id
    15. def on_subscribe(client, userdata, mid, granted_qos):
    16. # 构造一个 Client 实例
    17. client = mqtt.Client()
    18. client.on_connect = on_connect
    19. client.on_disconnect= on_disconnect
    20. client.on_message = on_message
    21. client.on_subscribe = on_subscribe
    22. # 连接 broker
    23. # connect() 函数是阻塞的,在连接成功或失败后返回。如果想使用异步非阻塞方式,可以使用 connect_async() 函数。
    24. client.connect("192.168.1.165", 1883, 60)
    25. client.loop_start()
    26. # 订阅单个主题
    27. result, mid = client.subscribe("hello", 0)
    28. unacked_sub.append(mid)
    29. # 订阅多个主题
    30. result, mid = client.subscribe([("temperature", 0), ("humidity", 0)])
    31. unacked_sub.append(mid)
    32. while len(unacked_sub) != 0:
    33. time.sleep(1)
    34. client.publish("hello", payload = "Hello world!")
    35. client.publish("temperature", payload = "24.0")
    36. client.publish("humidity", payload = "65%")
    37. # 断开连接
    38. time.sleep(5) #等待消息处理结束
    39. client.loop_stop()