物联网的标准通信协议:MQTT的介绍和快速上手
最近在给公司的物联网设备(自动售卖机)的控制体系架构进行改版。起因是由于新合作的设备厂商采用了完全不同的通讯协议(MQTT),为兼容新旧设备并尽可能降低对现有业务的影响,我决定重构现有的控制体系。
之前厂家直接给的是串口通讯的协议,我这边还需要自己去写串口通讯的代码,然后在这个基础上去构造整个物联网的控制方案。新的厂家好像提供的解决方案更全面一些,只提供了 MQTT 协议的技术文档,没有涉及到串口通讯的内容,因此之前的方案可以说是完全不兼容了。
虽然可以为新的设备重新开发一套,但是既然已经有在运营的设备了,对于业务影响最小的解决方案,反而是重构现有的设备控制体系,然后去兼容业务方面的接口。所以才决定重构,否则这么大的工作量,完全没有必要做……
当然本文主要还是我在了解和学习 MQTT 之后的一篇(部分)应用费曼学习法的产出文章,如果对读者有帮助,那就再好不过了。
什么是 MQTT
MQTT 是一款由 IBM 的工作人员内部研发并使用近10年的协议,其最开始的定位是用于计算资源较少以及网络环境受限的场景。
而之所以被叫做 MQTT 是因为 MQ 是当时 IBM 一系列的产品的名称的前缀,TT 在最开始是 Telemetry Transport(遥测传输)的缩写, MQTT 最初诞生的应用场景就是人造卫星检测石油管道连接时降低电池损耗以及带宽使用(... developed the MQTT protocol to enable minimal battery loss and bandwidth usage when connecting with oil pipelines via satellite.)
2010 年 IBM 公司以免授权费的形式开放了协议(Opened with royalty free protocol);2013 年全球非营利标准化组织 OASIS 接管 MQTT 协议,并于次年发布了 MQTT 3.1.1 版本。2016 年 MQTT 协议成为 ISO 标准。
由于后续的发展与 IBM 再没关系,所以 MQTT 就从 MQ Telemetry Transport 的缩写,成为了正式的名称,即其正式名称就是由四个字母组成的 MQTT。
在其后续的发展中,特别是随着 2010 年开始消费者相关的物联网设备极速发展, MQTT 最初设计的特性得到了很好体现,以至于在一些场景和需求下,MQTT 被称作是物联网设备通讯的标准协议。
为什么选择 MQTT
根据 ISO 标准的介绍,MQTT 的特性大概包含这些:
- 标准轻量化、简单易于实现,并且可以基于其他网络协议实现
- 使用“发布-订阅”方式提供一对多的消息传递
- 数据载荷与无关的消息传递(协议不假设数据载荷类型,由订阅和发布两方自行约定)
- 多种级别的消息传递 QoS (Quality of Service) 保证(服务质量保证)
原始版本的 MQTT 是基于 TCP/IP 协议来实现的,实际上也可以基于其他协议如 WebSocket, UDP 甚至是蓝牙。
而 MQTT 采用的消息传递是基于发布/订阅的,因此可以实现类似于同一个传感器数据可以同时发送给另一台服务器,或者另一个应用程序。
当然 MQTT 最令人兴奋的还是消息传递的 QoS 保证(下简称 QoS),实际做过工程的都知道,消息传递失败是一个非常痛苦并且处理起来也非常麻烦的事情,而 MQTT 在协议层面就提供了三种 QoS: 最多一次、至少一次、精准一次 (Exactly once) 。而这三种 QoS 实际实现起来也非常简单,这部分内容我会在后面的章节中进行简述。
核心工作原理以及配置
MQTT 的核心工作模式是“发布-订阅”模式,一般采用该模式的架构都会有一个类似于消息总线、消息队列或者消息代理的组件存在,而发送消息和订阅消息的部分都被统称为客户端。 MQTT 也不例外, MQTT 的中间组件被称为 MQTT Broker ,下面这张图可以很好的阐述 MQTT 的整个工作架构。(图来自 EMQX 的文章《MQTT 协议快速入门 2025:基础知识和实用教程》)
温度传感器作为客户端连接到 MQTT Broker,并通过发布操作将温度数据发布到一个特定主题(例如Temperature
)。MQTT Broker 接收到该消息后会负责将其转发给订阅了相应主题(Temperature
)的订阅者客户端。
实际上可以看出来,只要一个设备或者程序实现了 MQTT 客户端的功能,那么它就可以被称为是一个 MQTT 客户端。这个客户端可以是一个库,也可以是一个单独的程序,甚至是一个嵌入式设备。
发布-订阅模式
MQTT 采用发布-订阅模式也就意味着客户端之间是对等的,且不同于客户端-服务端模式,双方没有直接关联。客户端之间发布消息和订阅消息都要基于一套额外的约定,就好比是 BP 机时代大家都约定某个数字代表不同意涵一样。而这个约定在 MQTT 中被称为 Topic (主题)。
主题
MQTT 是一个非常强大的工具,利用它可以实现不同颗粒度内容的订阅。
MQTT 的主题是采用 /
分割的字符串,类似于 URL 路径,每一个 /
符号分割出来的代表不同的层级。
chat/room/1
sensor/10/temperature
在订阅 MQTT 消息时,可以使用通配符实现订阅多个主题的内容, MQTT 支持两种通配符:
+
:表示单层通配符;例如订阅user/+/chat
,就可以匹配诸如user/alice/chat
或者user/bob/chat
的主题。#
:表示多层通配符;例如订阅device/report/#
,就可以匹配诸如device/report/sensor/10/temperature
或者device/report/computer/2/status
的主题。
需要注意的是,#
通配符只能用于订阅主题的最后一个层级,诸如 device/#/report
这样的订阅是无效的。
还有一类 $
开头的主题,不过这类主题是系统主题,根据实现不同而有所不同,这里不再赘述。
QoS (Quality of Service)
MQTT 提供了三种消息传递的 QoS,分别是:
- QoS 0: 最多一次
- QoS 1: 至少一次
- QoS 2: 精确一次
当 QoS 0 时,MQTT 保证消息最多只会被传递一次,这是 MQTT 中的传递效率最高的方式。当然它的问题是不保证信息交付。
QoS 1 则刚好相反,它保证消息至少有一次被成功交付,但是 QoS 1 存在一个限制,即订阅方可能会收到多次发送方的消息,这是由于 QoS 1 的实现限制导致的。但比起 QoS 2 而言, QoS 1 的开销更小一些,适用于某些允许重复传递的情况。
而 QoS 2 则是对 QoS 1 的一种改良,它保证消息仅会被成功投递一次,但 QoS 2 的实现也更加复杂。关于 QoS 1 和 2 的实现将会放到文章末尾简述一下,更详细的资料可以查看这篇文章《MQTT QoS 0、1、2 解析:快速入门指南》
如何选择 QoS
简单来说,QoS 的选择与数据的重要程度成正比。如果一个数据对于核心业务在一定时间周期内需求不那么高,那么可以选择 QoS 0 ,即使遗漏了在短周期内也没有任何关系。例如花卉植物的温湿度传感器,在一定时间内就算确实问题也不大(除非植物本身对于温湿度高度敏感)。
QoS 1 的应用更为广泛一些,因为 QoS 1 保证信息传递成功,而对于消息会重复的情况,实际上对于订阅端来说内部也可以进行去重。因此 QoS 1 比较适合绝大多数的关键操作,只要在实际应用层把去重做好即可。如果不愿意自己去实现去重的话,那么 QoS 2 也是一个更好的选择。
QoS 2 本身就实现了保证消息送达,并且确保不会重复,虽然传输成本较高,但是可靠性也更强。对于一些需要非常严谨的场合,例如金融、航空、交通等领域 QoS 2 的使用可能会更多一些。
QoS 降级
MQTT 中,订阅和发送都可以设置 QoS 等级,在大部分情况下,订阅和发送双方约定一个 QoS 等级时,不会有任何问题。但是当 QoS 订阅和发送方不一致时,Broker 会优先使用订阅者的 QoS 转发发送者的消息,特别是在订阅者的 QoS 等级低于消息的 QoS 等级时。
例如对于主题 a
,订阅者订阅时采用 QoS 1 进行订阅,而发送者发送消息时采用的是 QoS 2。此时 Broker 会将消息降级为 QoS 1 发送给订阅者,从而可能会出现多次重复的情况。
MQTT 中的 QoS 降级遵循的“木桶效应”,即确保最小的 QoS 可用。并且由于 QoS 从 0 到 2 是递增的实现,因此 MQTT 仅保证更小的那个可以实现就可以。并且 QoS 也不存在升级的可能,即不可能从发送方 QoS 0 升级到订阅方的 QoS 2,因为发送方的在发送时就通过 QoS 声明了其不能处理 QoS 1 和 2 的其他响应报文。
遗嘱消息 (Will Message)
在有了 QoS 之后,采用了 MQTT 系统的质量和稳定性将会得到巨大的保证。但是 MQTT 客户端可能运行在一些比较脆弱或者低级的设备上,他们硬件本身的可靠性可能难以保证。因此为了保证整个系统的可靠性, MQTT 有一特性叫做遗嘱消息。
MQTT 的遗嘱消息是在与 MQTT Broker 建立连接由客户端发送给 Broker 的,遗嘱信息包含主题、保留消息标识、属性、 QoS 以及数据载荷。当 MQTT 客户端在非正常情况下与 Broker 断开连接时,Broker 会向所有订阅该主题的客户端发送消息。
在技术上,非常断开连接包含以下情况:
- Broker 监测到 IO 错误或者网络故障
- 客户端在 Keep Alive 时间内没有通讯
- 客户端在没有发送 Reason Code 为 0x00 的 DISCONNECT 报文的情况下关闭网络连接
- Broker 收到了客户端发送的不符合协议要求,而关闭与该客户端的连接
利用遗嘱消息,我们可以和方便的实现客户端状态的监控。关于这部分,在文章《遗嘱消息(Will Message)介绍与示例 | MQTT 5.0 特性详解》有非常好的最佳实践,并且在 MQTT 5.0 中,新增的延迟遗嘱解决了在旧版本中遗嘱消息发布过于即时,而在重连后又立即发送消息的情况。
实战 —— 基于 MQTT 的湿度控制系统
在本章节中,将会使用 Python 作为演示用的编程语言,阐述一下基于 MQTT 应用程序的实际开发流程是怎么样的。当然,受限于篇幅,本文中的代码仅会包含重要部分的内容。
1. 运行 MQTT Broker
要进行 MQTT 的开发,首先需要运行一个 MQTT Broker,MQTT Broker 有非常多的实现,在这里我们使用 Docker 运行一个由国内团队主导开发的 EMQX。
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.8.6
2. 编写传感器和加湿器的代码
在这里我们忽略掉一些代码细节,并假设已经对湿度传感器和加湿器的串口通讯代码进行了封装。
2.1 安装 MQTT 库**
pip install paho-mqtt
这里有一个小知识,包名中的 paho 其实标记着这个 MQTT client 的实现是来自于 Eclipse Paho 计划的,这个计划最初是由 IBM 参与的帮助各个语言实现 MQTT 客户端的一个项目,时至今日许多项目已经实质上交给了开源社区进行更新维护。
2.2 连接到 MQTT Broker
import random
import json
from paho.mqtt import client as mqtt_client
from device import sensor, aircond
BROKER_URL = os.getenv('BROKER_URL')
BROKER_PORT = os.getenv('BROKER_PORT')
# 所有 MQTT 客户端都需要有一个 Client ID 标识其唯一性
client_id = f'device-{random.randint(0,100)}'
# 实现连接 MQTT Broker 的方法
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
# 连接成功发送设备上线的消息
client.publish(f'device/online/{client_id}')
else:
print('failed to connect, return code %d\n', rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(BROKER_URL, BROKER_PORT)
return client
2.3 编写读取湿度信息的定时任务
我们实现一个每 30 秒从传感器读取一次湿度,然后发送消息的方法。
需要注意的是, paho.mqtt
这个库提供了方法,可以方便地让我们的 MQTT 客户端在另一个线程中处理订阅消息以及重连等任务,所以这个任务我们可以将其放在主线程中进行处理。
# 包含之前的代码
TOPIC_HUMIDITY_REPORT = f'device/report/{client_id}/humidity'
def publish_humidity():
while True:
time.sleep(30)
humi = sensor.get_humidity()
payload = json.dumps({
'humidity': humi
})
result = client.publish(TOPIC_HUMIDITY_REPORT, payload)
status = result[0]
if status == 0:
print(f'send humidity to topic')
else:
print(f'failed to send humidity to topic')
2.4 编写订阅消息启动加湿器的任务
# 包含之前的代码
TOPIC_AC_CONTROL = f'device/control/{client_id}/ac'
def on_ac_control(payload):
datum = json.loads(payload.decode('utf-8'))
aircond.set_humidity(datum['humidity'])
def subscribe_topics(client):
def on_message(client, userdata, msg):
match msg.topic:
case TOPIC_AC_CONTROL:
on_ac_control(msg.payload)
case _:
print(f'unknown topic received. {msg.topic}')
client.subscribe(TOPIC_AC_CONTROL)
client.on_message = on_message
2.5 编写运行函数
# 包含之前的代码
def run():
client = connect_mqtt()
# 订阅主题
subscribe_topics(client)
# 启动 MQTT 客户端线程
client.loop_start()
# 在主线程读取传感器的湿度
publish_humidity(client)
# 结束 MQTT 客户端线程
# 但是这里先不处理强制关闭的情况,因为嵌入式设备保证长时间运行
client.loop_stop()
if __name__ == '__main__':
run()
至此,运行在一个“有限设备”上的湿度读取和空调控制的代码就完成了。
3. 编写管理程序的代码
接下来我们要写个控制程序,这个程序会记录所有设备的湿度信息,并在高于某个值时启动空调调整湿度。这里我们忽略掉重复的创建客户端的部分,而专注于 MQTT 的消息订阅和发布的内容。
3.1 订阅湿度传感器报告主题
TOPIC_DEVICE_HUMIDITY_REPORT = 'device/report/+/humidity'
def on_device_humidity_report(client, msg):
client_id = msg.topic().split('/')[2]
msg_data = json.loads(msg.payload.decode())
humidity = msg_data['humidity']
# 这里省略插入数据库的操作
if humidity < 50:
data = json.dumps({
'humidity': humidity
})
client.publish(f'device/control/{client_id}/ac', data)
def subscribe_topics(client):
def on_message(client, userdata, msg):
match msg.topic:
case TOPIC_DEVICE_HUMIDITY_REPORT:
on_device_humidity_report(client, msg)
case _:
print(f'unknown topic received: {msg.topic}')
client.subscribe(TOPIC_DEVICE_HUMIDITY_REPORT)
client.on_message = on_message
3.2 编写运行函数
因为我们这个程序只需要在之前确保数据库连接,然后之后不再需要创建,之后只需要监控湿度。因此我们可以使用另一种方式来启动 MQTT 客户端。
def run():
# 这里省略创建数据库连接的代码
client = connect_mqtt()
subscribe_topics(client)
client.loop_forever()
if __name__ == '__main__':
run()
使用 loop_forever()
方法, MQTT 客户端将会在主线程执行,而不同于 loop_start
方法在另一个进程启动。这个方法会占用主线程,因此一些耗时的初始化方法需要在该方法之前调用。
应用拓展 I :动态订阅
通过上面的实战可以发现, MQTT 客户端订阅消息其实是发生在连接建立之后的,这就给了开发人员很多场景可以使用,例如我们可以根据程序的需要动态去订阅某些主题,更甚一些可以通过其他客户端发送的消息去订阅新的主题。举个简单的例子:
客户端 A 向主题 target/topic/register 发送了一个载荷为 {"topic": "client/name/report", "handler": "ClientReportHandler"}
的消息,收到这条消息的客户端 B 新订阅一个主题,然后通过内部的字典去设置对应的 handler 。
应用拓展 II : 物联网以外
MQTT 算是一个非常好的协议,它足够轻量化,甚至连动手实现一个支持 QoS 1 的 Broker 也不是一件很难的事情。所以 MQTT 可以用在非物联网的场合下,利用其低消耗和弱网络的宽容度和可靠性,可以实现非常多有意思的内容。
网络游戏中部分数据可以通过 MQTT 来传输,特别是对于一些可靠要求较高但数据量不大的内容。另外一些即时通讯软件,可以用来更新和通知用户消息。
不过 MQTT 还是有一些限制,那就是单次消息的数据载荷数据大小需要根据具体 Broker 和客户端实现来决定,但通常都不会太大。对于一些大量数据传输的情况,一般可以使用切片的方式,或者直接选择专为较大数据业务的研发的消息队列更为合适。
深入阅读:QoS 1 和 2 的实现
实际上 MQTT 最吸引人的地方,除了较弱的网络宽容度外,另一点则是提供了三种不同层次的 QoS 可选,这对于复杂的业务开发来说是非常实用的,避免了手工实现 QoS 可能会出现的问题。
但仅仅使用不了解本质,对于网络传输相关的工作来说是非常不好的,于是在本文的最后一章,打算简单地谈一谈 MQTT QoS1 和 QoS 2 的实现。本章的大多数内容都来自于《MQTT QoS 0、1、2 解析:快速入门指南》这篇文章,如有看不太明白的地方也建议直接参考原文。
QoS 1 的实现
如果说 QoS 0 只需要递送,那么 QoS 1 则是发送方需要收到接收方的应答才算是完成。
从技术层面来讲,当发送方发送消息时,如果设置了 QoS = 1
,那么 PUBLISH
报文中会设置一个标记本次消息的唯一值 Packet ID。而接收方收到消息后,需要回传一个 PUBACK
的报文,并附上相同的 Packet ID。而发送方在收到回传的报文后,会释放 Packet ID 使其重新可用于新的消息传输。
通过上图就可以得知,QoS 1 会出现消息重复的原因,很多时候其实由于 PUBACK
报文没有传给发送方导致的,发送方则认为消息没有交付成功,则会重新发送进行重试,直至收到 PUBACK
为止。
为了解决这个问题,于是便有了 QoS 2。
QoS 2 的实现
QoS2 的实现复杂,但其本质是通过存储待发报文、状态共享来实现的。相比较 QoS 1 仅仅新增了一个回传报文,QoS 2 完成整个消息传递需要来回传递共计 4 个报文。
简单来说 QoS 2 的通讯工作比起 QoS 1 来说新增了许多别的操作:
- 发送方发送报文前会存储一份待发送的报文,用于之后的重传;
- 接收方在收到消息后会存储一个 Packet ID 用于同步发送方的状态;
- 收到消息后,接收方使用
PUBREC
回复发送方,告知已收到消息; - 此时发送方移除暂存的发送报文,进入 Packet ID 释放流程,暂存
PUBREL
报文用于之后的重发; - 接收方收到报文
PUBREL
报文后,释放 Packet ID 然后回复PUBCOMP
报文告知发送方释放; - 发送方收到
PUBCOMP
报文后,释放 Packet ID。
而接收方需要存储 Packet ID 实际上就是 QoS 2 不会重复的要点,实际上在真实场景中接收方回复 PUBREC
也依旧可能会失败,但是由于 QoS 2 的实现里,接收方需要暂存 Packet ID ,这样即使发送方重新发送相同 Packet ID 的报文,接收方并不会进行处理,而是直接重试发送 PUBREC
。
之后的 Packet ID 释放流程,则是为了保证双方的 Packet ID 都能被正常释放。不过在实际场景中,这个释放流程依旧有可能遭遇网络问题。例如发送方没有收到接收方回复的 PUBCOMP
报文,而重发 PUBREL
报文。因此接收方实现 QoS 2 时,需要确保释放 Packet ID 这个操作是幂等的(即连续执行多次,预期响应和服务器状态不会发生变化)。
后记
在文章最开始我提到过正在对现有的架构进行改版,实际上让我迫不及待想要更替原因,除了现有架构由于一开始的选型导致有非常严重的缺陷,实际上还有一件非常不齿的事情,那就是我用 WebSocket 实现了在功能上能够覆盖 MQTT 75% 的一个 Agent (主要是它需要根据情况拼装具体给设备的指令脚本,所以不能算是简单的 Broker)。对于不要重复造轮子这件原则而言,我可以说是彻底打破了这一点。
现在的这个架构采用消息总线的形式分发消息,并且支持设备断连之后的自动重连等待以及心跳机制,甚至还支持设备断连后向上游反馈(遗言)。但是我始终没有解决消息投递保证 (QoS) 的问题,没有解决的原因也很简单:一是我确实不知道怎么样设计安全的数据重传;二是传输成本太高了(单次设备控制的指令需要传输近 40KB 的数据,这还是简单压缩过后的)。
实际上,这次架构改版,本质上也是为了偿还当初技术选型的负债,当时由于刚进这家公司,有些意见发表不具备太高参考性,导致了在选型上出现了较大的问题。如今的工作也算是还债了吧。