1. MQTT 协议基础
MQTT(Message Queuing Telemetry Transport)是基于发布/订阅模式的轻量级消息协议,采用客户端-服务器架构,常用于物联网(IoT)等需要低带宽和低功耗的场景 (MQTT协议基本流程、原理_mqtt协议工作原理-CSDN博客)。在 MQTT 中,**发布者(Publisher)和订阅者(Subscriber)无需直接通信,而是通过代理服务器(Broker)**中转消息。Broker 接收发布者发送的消息,并根据消息的主题(Topic)将其分发给所有对此主题感兴趣(已订阅该主题)的订阅者 (MQTT 协议快速入门 2025:基础知识和实用教程 | EMQ)。MQTT 使用持久的 TCP 连接(通常端口1883,TLS加密端口8883)来传输数据,可选用 TLS/SSL 加密保证安全 (MQTT 协议快速入门 2025:基础知识和实用教程 | EMQ)。
1.1 发布/订阅模型
MQTT 使用发布/订阅(Pub/Sub)的通信模型。发布者将消息发送到某个主题(Topic)上,Broker 会将收到的消息路由给所有订阅了该主题的客户端。每条消息附带一个主题名称,类似于标签。订阅者通过向 Broker 发出订阅请求(SUBSCRIBE 报文),告诉 Broker 感兴趣的主题。Broker 维护所有客户端的订阅列表,当有发布消息到达时,Broker 根据主题匹配将消息副本发送给所有匹配的订阅者。由于发布者和订阅者解耦,发布者无需了解任何订阅者的存在,也不知道是否有人接收 (MQTT 协议快速入门 2025:基础知识和实用教程 | EMQ)。这种模式非常适合一对多消息分发和松耦合系统,例如传感器网络中的数据采集和分发。
1.2 MQTT 消息类型与格式
MQTT 定义了一系列**控制报文(Control Packet)**用于客户端与 Broker 之间的通信。主要的报文类型包括:
- CONNECT:客户端请求建立连接时发送的连接报文,包含协议版本、客户端ID、用户名/密码(可选)、遗嘱消息(Last Will,可选)、KeepAlive 保活时间等信息。
- CONNACK:Broker 对 CONNECT 请求的响应报文,指示连接是否成功(返回码为0表示成功)以及会话是否已存在等。
- PUBLISH:发布消息报文,由发布者发送。包含主题名和消息内容,可能还附带消息ID(用于 QoS1/QoS2 下确认)等。
- SUBSCRIBE:订阅请求报文,由订阅者发送以订阅一个或多个主题。携带每个订阅主题的过滤表达式和请求的 QoS 等级。
- SUBACK:Broker 对 SUBSCRIBE 请求的确认报文,包含各主题订阅的确认结果(如授权的QoS等级或失败)。
- PUBACK:针对 QoS1 的 PUBLISH 的确认报文,发布端收到 PUBACK 表示消息已成功到达 Broker。
- PUBREC、PUBREL、PUBCOMP:针对 QoS2 发布流程的一组握手报文,确保消息“恰好一次”传递(见下文QoS部分)。
- PINGREQ / PINGRESP:心跳请求与响应,用于保持连接活跃(KeepAlive机制)。
- DISCONNECT:客户端主动发送的断开连接报文,通知 Broker 将关闭连接。
MQTT 报文采用紧凑的二进制格式。每个报文都有一个固定报头(2字节),包含报文类型和标志,以及剩余长度字段。根据报文类型不同,后续还有可变报头和有效载荷。例如,CONNECT 报文的可变报头包含协议名“MQTT”、版本号、连接标志(包括用户名/密码标志、遗嘱标志、清洁会话标志等)、KeepAlive时间等,随后的有效载荷包含Client ID、用户名、密码等字段;而 PUBLISH 报文的可变报头包含主题名(Topic)和报文标识符(QoS>0时),有效载荷则是消息内容。本教程不深入每种报文的二进制结构,但了解这些概念有助于理解 MQTT 通信流程。
1.3 连接流程与保持活跃
一个典型的 MQTT 连接流程如下:
- 客户端发起连接:客户端使用 TCP 连接到 Broker,发送 CONNECT 报文。该报文中客户端会提供 Client ID(标识符)、可选的用户名和密码用于认证,还可以指定 Clean Session 标志和 Will(遗嘱)消息等。Clean Session 标志指示是否建立新的会话(true 表示请求清除之前会话状态),Will 消息是在客户端意外断开时由Broker代为发布的消息。
- Broker 响应:Broker 收到 CONNECT 后进行验证,返回一个 CONNACK 报文。如果连接成功,CONNACK 返回码为0,并指示是否会话存在(Session Present)。此时连接建立完成,客户端可以开始发布和订阅。
- KeepAlive 心跳:CONNECT 报文中包含一个 KeepAlive 时间(秒)。客户端和 Broker 通过发送 PINGREQ/PINGRESP 保持心跳。如果超过1.5倍 KeepAlive 时间未收到任何数据,Broker将认为客户端掉线并断开连接(同时若设置了 Will 遗嘱消息,则发布该遗嘱)。
- 数据传输:连接建立后,客户端可以发布 PUBLISH 消息或发送 SUBSCRIBE 订阅主题。Broker 按请求提供服务,转发消息或返回 ACK 确认等。
- 断开连接:客户端可随时发送 DISCONNECT 报文正常断开。Broker也可在检测到超时或错误时断开连接。
KeepAlive(保活)机制确保了客户端与服务器的连接存活状态。设置适当的KeepAlive间隔有助于及时检测到网络中断或客户端故障,同时避免过于频繁地发送心跳造成不必要流量。一般物联网设备可能使用几十秒到几分钟不等的 KeepAlive。
1.4 主题与通配符
主题(Topic)是 MQTT 中对消息进行分类和路由的依据。本质上,主题是一个分层的字符串,层级之间用斜杠/
分隔,例如:sensors/temperature/office
。客户端订阅主题时可以使用通配符(wildcard):
- 单层通配符
+
:匹配一个层级的任意主题名。例如订阅主题sensors/+/office
,将收到主题如sensors/temperature/office
、sensors/humidity/office
等消息。 - 多层通配符
#
:只能出现在主题过滤器的结尾,匹配包含它的位置及后续所有层级。例如订阅sensors/office/#
,将收到sensors/office
下所有子层级主题的消息,如sensors/office/temperature
,sensors/office/door/status
等。
主题区分大小写,订阅时必须精确匹配主题名或使用通配符过滤。Broker 在收到发布消息时,会将消息的主题与每个订阅者的主题过滤器进行匹配(包括处理通配符),如果匹配则发送消息给该订阅者。需要注意 MQTT 的主题是无固定预定义的,每个应用可以自由定义主题命名规范。注意:MQTT 规范中主题过滤器通配符 #
必须独占层级且位于末尾,+
只能匹配单层,不能跨越斜杠。
1.5 QoS 服务质量等级
MQTT 提供了三种消息传输的服务质量(Quality of Service, QoS)等级,以权衡传输可靠性和开销 (MQTT简介之三(4) MQTT协议QOS 服务质量等级原创 - CSDN博客):
- QoS 0 – 至多一次:消息最多传输一次,不保证送达(类似UDP)。发送者发送后不要求确认,Broker不会重试。可能丢失消息,但效率最高。适用于对消息丢失不敏感的场景(“尽最大努力”传输)。
- QoS 1 – 至少一次:消息确保至少到达一次。发送者会等待确认(PUBACK),若未收到将在一定时间后重传,直到收到确认为止。因此可能出现重复消息(接受方需根据报文ID或其他手段去重)。适用于需要保证送达但允许偶尔重复的场景。
- QoS 2 – 恰好一次:消息确保 严格一次 送达。采用两阶段确认流程:发送方发送消息后,接收方回复 PUBREC,发送方再回复 PUBREL,接收方最后回复 PUBCOMP 完成。这一系列握手确保双方记录消息ID,从而避免重复和丢失。QoS 2 开销最大,但提供最高可靠性,适用于关键指令等不允许重复且必须送达的场景。
在 MQTT 通信中,客户端在发布消息时可以选择 QoS 等级,Broker 会按该等级确保消息投递到 Broker。本质上QoS定义的是发送方和接收方之间单跳传输的可靠性。当 Broker 将消息转发给订阅者时,会使用订阅者所请求的 QoS 等级(通常为发布时QoS和订阅QoS的较低值)进行投递。例如,发布者以 QoS 2 发出消息,而订阅者只请求 QoS 1,那么 Broker 用 QoS 1 向订阅者发送。QoS 等级越高,协议交互的报文越多,因此除非必要应尽量使用较低的 QoS 满足业务需求。
1.6 小结
以上,我们介绍了 MQTT 协议的基本概念和工作机制,包括发布/订阅模型、消息类型和连接流程、主题及通配符、以及 QoS 服务质量等级。总结来说,MQTT 以轻量高效的方式实现了客户端和Broker间的可靠消息传递,开发者可以根据应用需求选择合适的 QoS 等级和平衡可靠性与性能 (MQTT简介之三(4) MQTT协议QOS 服务质量等级原创 - CSDN博客)。理解这些协议基础,将有助于我们在接下来的章节中使用 C 语言和 Mongoose 网络库来构建 MQTT 客户端和服务器。
接下来,我们将从实践角度出发,首先搭建一个运行在 Linux 平台上的 MQTT 客户端示例。
2. 使用 Mongoose 构建 MQTT 客户端
本章我们将使用 C 语言和 Mongoose 网络库实现一个简单的 MQTT 客户端程序,运行于 Linux 平台。Mongoose 7.17 是 Cesanta 公司提供的开源嵌入式网络库,支持包括 TCP、HTTP、WebSocket、MQTT 在内的多种协议 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客)。Mongoose 提供事件驱动的非阻塞 API,使用单线程事件循环即可同时管理多个网络连接,非常适合在嵌入式或系统编程中使用。我们将利用 Mongoose 内置的 MQTT 支持,快速实现客户端连接到 Broker、订阅主题以及发布消息的功能。
2.1 环境准备与工程设置
首先,请确保在 Linux 环境下已安装基本的构建工具(如 GCC、Make 等)。下载 Mongoose 7.17 库源码,可以通过 Git 克隆官方仓库:
git clone https://github.com/cesanta/mongoose
cd mongoose
Mongoose 是以单个源文件(mongoose.c
搭配 mongoose.h
)提供的。使用时,可以直接将这两个文件加入您的工程,或编译为静态/动态库后链接。在本教程示例中,我们直接包含头文件并链接 Mongoose 源,方便起见。
假设我们创建一个 mqtt_client.c
文件用于编写客户端代码,并确保 mongoose.c
在同一目录或者编译时正确链接。
2.2 MQTT 客户端示例概述
我们计划实现的 MQTT 客户端功能如下 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客):
- 连接:连接到一个公共 MQTT Broker(这里选用 HiveMQ 提供的公共 Broker
broker.hivemq.com
,监听在 TCP 1883 端口)。 - 订阅:连接建立后,客户端订阅主题
mg/+/test
(这表示订阅mg
开头、任意一级子主题、test
结尾的所有主题)。 - 发布:订阅完成后,客户端向主题
mg/clnt/test
发布一条消息"hello"
。 - 接收:由于我们订阅了
mg/+/test
,发布的消息主题mg/clnt/test
正好匹配订阅过滤器,因此客户端应当收到自己发布的这条消息(Broker 会回送给所有订阅者,包括发布者自己如果也订阅了该主题)。 - 断开:成功接收到消息后,客户端关闭连接。本示例还会实现一个简单的自动重连逻辑:使用定时器周期性检查连接状态,若断开则重新建立连接,使上述过程重复。
下面我们给出完整的代码,并逐步讲解其中关键点。
2.3 示例代码:基础 MQTT 客户端
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include "mongoose.h"
// 目标Broker的URL和主题设置
static const char *s_url = "mqtt://broker.hivemq.com:1883"; // 公共 HiveMQ MQTT 服务器
static const char *s_sub_topic = "mg/+/test"; // 订阅主题过滤器
static const char *s_pub_topic = "mg/clnt/test"; // 发布主题
static int s_qos = 1; // QoS 等级(这里使用 QoS1 保证至少一次送达)
static struct mg_connection *s_conn = NULL; // 全局保存MQTT连接
// 信号标志,用于优雅退出
static int s_signo = 0;
static void signal_handler(int signo) {
s_signo = signo;
}
// 事件回调函数,处理各类事件
static void mqtt_event_handler(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (ev == MG_EV_OPEN) {
// 连接创建成功(尚未完成TCP握手)
MG_INFO(("Connection created"));
// c->is_hexdumping = 1; // 可选:启用数据十六进制调试输出
} else if (ev == MG_EV_ERROR) {
// 发生错误,例如DNS解析失败或TCP无法连接等
MG_ERROR(("%p Error: %s", c->fd, (char *) ev_data));
} else if (ev == MG_EV_CONNECT) {
// TCP 连接已建立
if (mg_url_is_ssl(s_url)) {
// 如果是 SSL/TLS 连接,初始化 TLS 会话
struct mg_tls_opts opts = {
.ca = "ca.pem" // 指定CA证书文件验证服务器(这里假设已准备好CA证书)
};
mg_tls_init(c, &opts);
}
} else if (ev == MG_EV_MQTT_OPEN) {
// MQTT 协议连接建立(收到CONNACK且成功)
MG_INFO(("MQTT connected to %s", s_url));
// 订阅指定主题
struct mg_str topic_sub = mg_str(s_sub_topic);
mg_mqtt_sub(c, topic_sub, s_qos);
MG_INFO(("Subscribed to topic %.*s", (int) topic_sub.len, topic_sub.ptr));
// 发布消息到指定主题
struct mg_str topic_pub = mg_str(s_pub_topic);
struct mg_str message = mg_str("hello");
mg_mqtt_pub(c, topic_pub, message, s_qos, false);
MG_INFO(("Published message '%.*s' to topic %.*s",
(int) message.len, message.ptr,
(int) topic_pub.len, topic_pub.ptr));
} else if (ev == MG_EV_MQTT_MSG) {
// 收到订阅的消息
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;
MG_INFO(("Received message '%.*s' from topic %.*s",
(int) mm->data.len, mm->data.ptr,
(int) mm->topic.len, mm->topic.ptr));
// 一旦收到我们预期的消息,关闭连接
c->is_closing = 1;
} else if (ev == MG_EV_CLOSE) {
// 连接已关闭(无论正常断开还是异常断开都会触发)
MG_INFO(("Connection closed"));
s_conn = NULL; // 标记当前无活动连接
}
(void) fn_data;
}
int main() {
struct mg_mgr mgr;
// 设置信号处理,用于 Ctrl+C 等中断时安全退出事件循环
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
mg_mgr_init(&mgr); // 初始化事件管理器
MG_INFO(("Connecting to MQTT broker..."));
// 首次建立连接,使用 clean session,并可指定遗嘱消息(本例暂不设置遗嘱)
struct mg_mqtt_opts opts;
memset(&opts, 0, sizeof(opts));
opts.clean = true; // 要求清除会话(不复用旧session)
// 建立MQTT连接
s_conn = mg_mqtt_connect(&mgr, s_url, &opts, mqtt_event_handler, NULL);
if (s_conn == NULL) {
MG_ERROR(("Failed to create MQTT connection"));
return 1;
}
// 进入事件循环,周期性检查连接状态,每隔1秒运行一次
while (s_signo == 0) {
mg_mgr_poll(&mgr, 1000);
// 如果连接断开(s_conn 被置NULL),尝试重连
if (s_conn == NULL) {
MG_INFO(("Reconnecting..."));
s_conn = mg_mqtt_connect(&mgr, s_url, &opts, mqtt_event_handler, NULL);
}
}
// 收到退出信号,清理资源
mg_mgr_free(&mgr);
MG_INFO(("Exited cleanly"));
return 0;
}
上述代码实现了一个基本的 MQTT 客户端,并包含详细的注释。下面对关键部分逐步解释。
2.4 代码讲解
连接参数与全局变量:首先,我们定义了目标 Broker 的 URL s_url
和发布/订阅使用的主题字符串,以及 QoS 等级。这里选择 HiveMQ 公共 Broker,无需用户名密码,使用明文1883端口。s_conn
是一个全局的mg_connection
指针,用于指向活动的MQTT连接(以便在其他地方判断连接状态)。
信号处理:为了能优雅地退出程序,我们设置了一个全局整数 s_signo
,并注册了 signal_handler
来捕获 SIGINT/SIGTERM。当用户按下 Ctrl+C 时,会发送 SIGINT 信号,使 s_signo
非零,这将跳出事件循环。
事件回调函数 mqtt_event_handler
:Mongoose 的事件模型会在各种网络事件发生时调用这个回调。我们在其中处理了几种主要事件:
MG_EV_OPEN
:表示新连接对象创建。这在调用mg_mqtt_connect
后立即发生,表示连接结构已建立但还未真正连接完成。这里我们仅打印日志,可选地可以打开c->is_hexdumping
来调试底层数据包。MG_EV_ERROR
:表示连接过程中出现错误,例如解析主机名失败或无法建立 TCP 连接等。ev_data
通常包含错误描述字符串,我们打印出来并记录错误 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客)。MG_EV_CONNECT
:TCP 三次握手成功建立。这时如果目标是 TLS 加密连接,我们需要初始化 TLS。代码中通过mg_url_is_ssl(s_url)
检测 URL 协议是否为mqtts://
。如果是,我们设置 TLS 选项并调用mg_tls_init()
来升级连接为 SSL/TLS。这里示例指定了.ca = "ca.pem"
,假定当前目录有 Broker 的 CA 根证书用于验证服务器证书。对于纯 TCP 连接,此步骤不做任何操作 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客)。MG_EV_MQTT_OPEN
:表示 MQTT 协议层连接建立成功,即客户端收到了 Broker 发来的 CONNACK 且返回码为成功。此时可以认为 MQTT 会话已经建立 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客)。我们在这里进行订阅和发布:- 调用
mg_mqtt_sub(c, topic_sub, s_qos)
订阅主题。其中我们使用mg_str
将 C 字符串包装成mg_str
类型,mg_mqtt_sub
会发送 SUBSCRIBE 报文。 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客) - 然后准备好待发布的消息内容
"hello"
,调用mg_mqtt_pub(c, topic_pub, message, s_qos, false)
发布该消息,参数分别为发布主题、消息内容、QoS=1、retain标志=false(不保留消息)。这样客户端将发送 PUBLISH 报文。 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客) - 打印日志提示已订阅和已发布。
- 调用
MG_EV_MQTT_MSG
:当收到订阅的主题消息时触发。通过ev_data
可以取得struct mg_mqtt_message *mm
,其中包含了消息的主题和内容等信息 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客)。我们将消息内容和主题打印出来。按照我们的逻辑,我们期望收到自己发布的 "hello" 消息。一旦收到,我们就将c->is_closing
设为1,表示告诉 Mongoose 在处理完事件后关闭该连接。MG_EV_CLOSE
:连接关闭事件。在关闭后(无论是我们主动关闭还是远端断开),都会触发此事件。我们在此将全局s_conn
置为 NULL,表示当前无活动连接 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客)。主循环会检测到s_conn == NULL
进而尝试重连。- 其他事件如
MG_EV_POLL
(定时轮询),MG_EV_MQTT_CMD
(MQTT命令报文,如PUBACK等)未在此示例中专门处理,会在需要时提到。
主函数 main:首先初始化 mg_mgr
事件管理器,然后调用 mg_mqtt_connect
尝试建立 MQTT 连接 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客)。mg_mqtt_connect(&mgr, s_url, &opts, mqtt_event_handler, NULL)
会返回一个 mg_connection*
(失败返回NULL)。我们传入了前面准备的 opts
结构,其中设置了 opts.clean = true
来要求服务器创建一个新的清洁会话(不复用之前离线会话)。这里暂未设置 Last Will 遗嘱等其他选项,opts
其他字段默认为0即可。
进入事件循环 mg_mgr_poll(&mgr, 1000)
后,Mongoose 将不断调用事件回调处理网络事件。我们设定每次最多等待1秒,然后检查 s_conn
状态和 s_signo
信号。
主循环内包含一个简单的重连逻辑:如果检测到 s_conn == NULL
(说明连接已关闭),则调用 mg_mqtt_connect
再次建立连接 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客)。同时打印日志提示正在重连。这样,当第一次收到消息后我们关闭连接,循环会立即尝试重新连接 Broker,并在成功后再次订阅发布,如此每隔3秒(由我们SUB->PUB->收到->关闭的速度)会重复发送 "hello" 并接收自己的回显消息。
当用户按 Ctrl+C 使 s_signo
非零时,退出循环,调用 mg_mgr_free
清理所有连接资源,程序结束。
2.5 运行测试
编译上述代码(确保链接 mongoose.c),然后运行可执行文件。由于我们内置了日志输出(MG_INFO/MG_ERROR等,Mongoose 默认INFO日志级别),可以看到类似输出:
[I] Connecting to MQTT broker...
[I] Connection created
[I] MQTT connected to mqtt://broker.hivemq.com:1883
[I] Subscribed to topic mg/+/test
[I] Published message 'hello' to topic mg/clnt/test
[I] Received message 'hello' from topic mg/clnt/test
[I] Connection closed
[I] Reconnecting...
[I] Connection created
[I] MQTT connected to mqtt://broker.hivemq.com:1883
...
日志显示客户端成功连接并完成了订阅和发布,又收到了自己发布的消息。此循环会持续运行,间隔几秒重新发送消息,直到我们按 Ctrl+C 退出。您也可以使用其他 MQTT 客户端订阅 mg/+/test
或发布消息到 mg/clnt/test
来与本程序交互,验证其功能。
注意:使用公共 Broker 进行测试时,尽量选择独特的主题名称以免和他人冲突。同时,由于公共 Broker 可能有连接和消息频率限制,上述循环发送频率不宜过高,以免被服务拒绝。
2.6 小结
通过本章节,我们实现了一个功能完整的 MQTT 客户端示例,并熟悉了 Mongoose 库的基本用法和事件驱动模型。在代码中,我们展示了如何连接到 Broker、订阅主题、发布消息,并处理接收到的消息和各种连接事件。我们还加入了简单的自动重连机制以提高客户端的健壮性。
至此,读者应该对利用 Mongoose 开发 MQTT 客户端有了初步认识。在下一章中,我们将在此基础上增加更多特性,例如通过 TLS 建立安全连接、使用用户名/密码进行认证,以及调整 KeepAlive 等连接参数,从而构建一个更完善的 MQTT 客户端。
3. 增强 MQTT 客户端:TLS 安全、认证与保活
在实际应用中,MQTT 客户端往往需要考虑安全和可靠性配置。本章我们将扩展上一章的示例,介绍如何:
- 使用 TLS 加密与 Broker 建立安全连接,
- 使用用户名和密码进行身份认证,
- 配置 KeepAlive 保持连接活跃,
- 设置遗嘱消息(Last Will)等高级选项。
3.1 TLS 安全连接
默认的 MQTT 使用明文 TCP,在开放网络中传输敏感数据可能有风险。因此通常MQTT Broker都会开放加密端口(如8883)供客户端使用 SSL/TLS 连接。使用 Mongoose 建立 TLS 连接非常简单:将 URL 的协议由 mqtt://
改为 mqtts://
,并确保在事件处理中初始化 TLS。
在上一章代码中,我们已经演示了在 MG_EV_CONNECT
事件中调用 mg_tls_init()
。实际使用时,还需要提供服务器证书的CA证书用于验证,除非使用不验证模式(不推荐)。可以通过 struct mg_tls_opts
提供 .ca
字段指定 CA 文件路径。如果 Broker 使用公共可信CA签署证书且系统已内置可信根证书,则也可不指定 CA 文件,默认进行验证。
示例:如果我们改用 EMQX 提供的公共 MQTT 服务 broker.emqx.io
,其 TLS 端口为8883,并使用 Let’s Encrypt 证书(受信任的CA签发),我们可以这样修改连接:
// 修改URL为SSL协议和端口
s_url = "mqtts://broker.emqx.io:8883";
...
if (mg_url_is_ssl(s_url)) {
struct mg_tls_opts opts;
memset(&opts, 0, sizeof(opts));
// 对于公认CA签名的服务器证书,可以不指定opts.ca以使用系统证书
mg_tls_init(c, &opts);
}
这样,客户端就会通过 TLS 与 Broker 通信,防止数据在传输中被窃听或篡改。需要注意编译时要启用 Mongoose 的 TLS 支持,例如在编译命令中定义 -DMG_TLS=MG_TLS_BUILTIN
使用内置的mbedTLS实现,或者链接 OpenSSL 等库并定义相应宏 (Mongoose :: Tutorials :: MQTT Client)。具体启用方式可参考 Mongoose 文档和编译说明。
3.2 用户名/密码认证
MQTT 支持在 CONNECT 报文中携带用户名和密码字段,用于身份认证。很多 Broker(尤其公有服务)会要求客户端提供有效的用户名和密码才允许连接。使用 Mongoose,设置用户名密码非常直接:在 struct mg_mqtt_opts
中填充 user
和 pass
字段即可。
例如,假设我们的 Broker 需要用户名testuser
和密码secret
,可以在构造 opts
时加入:
struct mg_mqtt_opts opts;
memset(&opts, 0, sizeof(opts));
opts.clean = true;
opts.user = mg_str("testuser");
opts.pass = mg_str("secret");
然后将此 opts
传递给 mg_mqtt_connect
。Mongoose 会在生成 CONNECT 报文时将用户名和密码字段编码进去 (Mongoose: src/mqtt.c | Fossies) (Mongoose: src/mqtt.c | Fossies)。服务端收到后会校验凭证,通过则返回CONNACK,否则通常会以非0返回码拒绝连接。
需要注意,用户名和密码都会明文包含在 CONNECT 报文中,因此务必在 TLS 加密连接下使用,否则凭证可能被截获。在 TLS 连接下,这些信息会在加密通道中传输,较为安全。
3.3 KeepAlive 保持连接
KeepAlive 时间在 CONNECT 时由客户端指定,Broker 据此判断心跳。如果客户端在一个 KeepAlive 间隔内没有发送任何消息,Broker 应该期望收到一个 PINGREQ,否则将在1.5倍间隔后断开连接。Mongoose 默认会根据我们传入的 opts.keepalive
来设置该值。若不设置,库中默认KeepAlive可能为60秒(MQTT规范允许0表示不保持活跃但通常不建议)。
我们可以通过设置 opts.keepalive
来调整。例如:
opts.keepalive = 30; // 请求30秒的KeepAlive心跳间隔
Broker 收到后会将此作为双方约定的保活时间。Mongoose 当前并不会自动发送心跳包,需要我们自行在应用层定期发送。最简单的方法是在事件循环中定期调用 mg_mqtt_ping(c)
(发送PINGREQ)。或者如上一章,我们其实在不断重新发送消息,也可起到保活作用。
如果需要一个专门的心跳,可以使用 Mongoose 的定时器功能,定期执行:
mg_mqtt_ping(c);
从而在无消息时也保持连接。这通常在应用层实现即可,因为只要我们的客户端定期发布或订阅消息,已有的数据传输也能满足保活。重要的是合理设置KeepAlive:过短会造成频繁心跳增加流量,过长则掉线检测不及时。视具体网络状况和应用要求来配置。
3.4 遗嘱消息(Last Will)
MQTT 遗嘱消息是在客户端异常掉线时,由Broker代为发布的消息。我们可以在 CONNECT 时指定遗嘱主题、遗嘱消息内容、遗嘱QoS和retain标志。Mongoose 支持设置这些,通过 mg_mqtt_opts
里的相应字段。
例如,要在断线时通知其它订阅者“我掉线了”,可以设置:
opts.will_topic = mg_str("clients/status");
opts.will_message = mg_str("client1 offline");
opts.will_qos = 1;
opts.will_retain = false;
这样,如果客户端未正常DISCONNECT就断开(例如程序崩溃或网络断开超过KeepAlive),Broker 会替客户端发布消息 "client1 offline"
到主题 clients/status
,QoS1,非保留。其他订阅了该主题的客户端就能收到此通知。
在上一章的代码中,我们演示了 opts.clean = true
开新会话。如果想利用**持久会话(Persistent Session)**存储QoS>0离线消息,可以将 clean 置 false。这样,断线重连后Broker会将离线期间积存的消息(QoS1/2)发送给客户端。这需要Broker支持并启用持久会话。
3.5 综合示例:安全连接和认证
现在,我们将结合以上内容,对第2章的代码进行扩展,展示如何修改以加入 TLS 和用户名密码。假设我们有一个Broker地址 mqtts://mqtt.example.com:8883
,要求用户名密码登录,我们将在代码中体现这些更改:
static const char *s_url = "mqtts://mqtt.example.com:8883";
...
int main(void) {
...
struct mg_mqtt_opts opts;
memset(&opts, 0, sizeof(opts));
opts.clean = true;
opts.keepalive = 45; // 45秒保活
opts.user = mg_str("testuser");
opts.pass = mg_str("secret");
opts.will_topic = mg_str("clients/status");
opts.will_message = mg_str("client offline");
opts.will_qos = 1;
opts.will_retain = false;
s_conn = mg_mqtt_connect(&mgr, s_url, &opts, mqtt_event_handler, NULL);
...
}
事件处理部分,在 MG_EV_CONNECT
中我们之前已处理 TLS 初始化,这里需确保提供正确的 CA 证书。如果 mqtt.example.com
使用受信任证书且系统有相应CA,则可不设置 opts.ca;否则应将 Broker 提供的 CA 链文件路径赋给 opts.ca。例如:
if (mg_url_is_ssl(s_url)) {
struct mg_tls_opts tls_opts = {
.ca = "example_ca.pem"
};
mg_tls_init(c, &tls_opts);
}
这样,我们的客户端将在建立TLS连接后使用用户名密码登录,KeepAlive设置为45秒,并在异常断线时让Broker发送遗嘱消息。
3.6 小结
通过本章,我们学会了如何加强 MQTT 客户端的安全和可靠性配置,包括使用 TLS 加密连接来防窃听、利用用户名/密码进行身份认证、设置合理的 KeepAlive 心跳维持连接,以及配置遗嘱消息通知离线状态。这些特性往往是实际项目中必须考虑的。
完成这些增强后,我们的 MQTT 客户端已经相当健壮。接下来,我们将讨论 MQTT 消息在不同 QoS 等级下交互时,客户端需要如何处理各种确认事件,确保消息按预期的服务质量传递。
4. 处理 QoS 等级的消息交互
在 MQTT 协议中,不同的 QoS 等级意味着不同的消息确认机制。客户端在发送或接收 QoS1、QoS2 消息时,需要处理相应的确认报文(ACK)以完成完整的消息交互流程。虽然 Mongoose 帮助我们封装了大部分 MQTT 协议,但作为开发者,了解并正确处理这些交互仍然很重要。
本章将讲解如何在应用层处理 QoS0、QoS1、QoS2 级别消息的发送和接收确认。
4.1 QoS 0 —— 至多一次,无确认
发送方:对于 QoS0 消息,调用 mg_mqtt_pub
发送后即完毕。MQTT 不要求任何确认,Mongoose 会在调用后将消息加入发送缓冲,一旦通过TCP发送出去即认为完成。应用层无需特别处理 ACK。
接收方:Broker 或订阅客户端收到 QoS0 的 PUBLISH 后,直接交付应用处理(MG_EV_MQTT_MSG
事件)。因为没有后续确认,应用处理完即可,不需要回复任何 ACK 报文。对开发者来说,在事件回调中拿到 QoS0 消息和 QoS1、QoS2 并无差别,只是 QoS 字段会标记为0。
示例:在我们的客户端代码中,我们发布和订阅使用的是QoS1。但如果改为 QoS0,依然可以用 mg_mqtt_pub(c, topic, data, 0, false)
,发布后不会有 MG_EV_MQTT_CMD(PUBACK) 事件,也不会自动重发,发送一次即结束。在 MG_EV_MQTT_MSG 中 mm->qos 会是0。
4.2 QoS 1 —— 至少一次,处理 PUBACK
发送方(发布者):QoS1 消息需要确保至少送达一次。Mongoose 的 mg_mqtt_pub
在发送 QoS1 消息时,会自动为消息分配一个报文标识符 ID,并在内部等待 Broker 的 PUBACK 报文。当 Broker 返回 PUBACK,Mongoose 触发 MG_EV_MQTT_CMD
事件,其 ev_data
中的 struct mg_mqtt_message *
会有 mm->cmd == MQTT_CMD_PUBACK
,表示我们发布的某个消息得到了ACK。一般情况下,Mongoose 已经完成了QoS1所需的基本工作:如果没有收到PUBACK,它不会自动重发(因为TCP本身已保证尽力送达,大多数情况消息能到Broker)。但 如果需要在应用层实现重传,可以利用事件检测ACK超时。
例如,我们可以这样做:在发布QoS1消息后,启动一个定时器计时,如果在预期时间内未收到 PUBACK(即应用未捕获 MG_EV_MQTT_CMD/PUBACK),则重新发送消息。每次捕获PUBACK则停止重传定时器。下面是一个示意逻辑(伪代码) (Mongoose :: Tutorials :: MQTT Client) (Mongoose :: Tutorials :: MQTT Client):
bool acked = false;
int retries = 3;
uint16_t pub_id = mg_mqtt_pub(c, topic_str, data_str, 1, false);
start_timer(timeout=5s);
while (!acked && retries > 0) {
// 等待事件...
// 在事件回调中:
if (ev == MG_EV_MQTT_CMD) {
struct mg_mqtt_message *mm = ev_data;
if (mm->cmd == MQTT_CMD_PUBACK && mm->id == pub_id) {
acked = true;
// 停止定时器
}
}
// 如果定时器触发且 acked 仍为 false:
if (!acked) {
pub_id = mg_mqtt_pub(c, topic_str, data_str, 1, false); // 重发
retries--;
reset_timer();
}
}
上面逻辑意思是:发布消息后等待PUBACK,如果超时或丢失则重发,最多重试3次。当然,在大多数情况下第一次就会成功。这个过程在QoS1语义下可以保证Broker最终收到至少一份消息,但Broker可能因为重发而收到多份,需要通过消息ID去重。不过 MQTT 规范并不要求发布者去处理重复ACK,只要发布者收到一次PUBACK即可认为完成。
接收方(订阅者):当客户端作为订阅者收到QoS1的消息时,按照协议必须回复PUBACK给发送方(这里发送方通常是Broker)。Mongoose 已帮我们处理了这一点:当 Mongoose 收到QoS1的PUBLISH,会自动发送PUBACK,无需我们手动干预。因此在 MG_EV_MQTT_MSG
中我们直接处理消息内容即可,不必显式发送ACK。
4.3 QoS 2 —— 恰好一次,处理四步握手
QoS2 的完整流程较复杂,需要双方进行两个往返(PUBREC/PUBREL和PUBCOMP)。Mongoose 尽可能自动化了其中步骤,但仍需要应用层配合确保严格不重复。
发送方:客户端发布QoS2消息时,mg_mqtt_pub
会发送 PUBLISH(QoS=2,带报文ID)。Broker 收到后按协议应回复 PUBREC。Mongoose 收到PUBREC后,会自动发送 PUBREL,然后等待 Broker 的 PUBCOMP。这个过程中,Mongoose 会触发事件:
- 收到PUBREC时,会触发
MG_EV_MQTT_CMD
,其中mm->cmd == MQTT_CMD_PUBREC
。应用可监听该事件以确认Broker已收到消息的一份拷贝 (Mongoose :: Tutorials :: MQTT Client)。在 Mongoose 内部,它已发送了PUBREL。 - 收到PUBCOMP时,触发
MG_EV_MQTT_CMD
,其中mm->cmd == MQTT_CMD_PUBCOMP
,表示QoS2流程彻底完成 (Mongoose :: Tutorials :: MQTT Client)。应用层看到PUBCOMP则可以确认消息“恰好一次”送达了。
类似QoS1,我们也可以做超时重传:如果发布QoS2消息后迟迟未收到PUBREC,可以重发PUBLISH;收到了PUBREC但未收到PUBCOMP,可以重发PUBREL。但这些细节往往由MQTT库管理。Mongoose 当前自动发送PUBREL,但不会自动重试,需要用户监控是否收到PUBCOMP来决定是否重发 PUBREL (Mongoose :: Tutorials :: MQTT Client) (Mongoose :: Tutorials :: MQTT Client)。
接收方:当客户端订阅者收到QoS2的PUBLISH时,协议要求它回复PUBREC,等待发送方的PUBREL,然后再回复PUBCOMP确认完成交互。Mongoose 在接收QoS2 PUBLISH 时已经自动发送了PUBREC给对方,并将收到的消息通过 MG_EV_MQTT_MSG
交给应用处理 (Mongoose :: Tutorials :: MQTT Client)。这时应用需要注意避免重复处理:因为发送方可能没有收到PUBREC而重发PUBLISH,Broker也可能在网络抖动时重复前一步。因此应用层应检查 PUBLISH 报文的重复标志(MQTT Fixed header里的 DUP 标志)或者报文ID,判断该消息是否已处理过。如果是重复消息,应丢弃(因为内容相同),但仍让协议走完。
当发送方后续发送PUBREL过来时,Mongoose 自动回复PUBCOMP。因此,对于订阅者而言,QoS2处理关键在于防止重复消费消息。在 Mongoose 里,可以在 MG_EV_MQTT_MSG
事件中,通过 mm->dup
或维护一个已处理ID列表来判断。如果短时间内收到相同ID的QoS2消息且 DUP=true,则忽略其内容。
小结来说,QoS2保证严格一次,但需要应用在收到消息时做幂等处理,MQTT库帮助我们完成了协议回复:
- 发布者需关注 PUBREC 和 PUBCOMP 事件,必要时重试确保最终收到 PUBCOMP。
- 订阅者需处理好重复的 PUBLISH(MQTT规范要求即使重复收到也只交付一次)。
4.4 在 Mongoose 中的实现
Mongoose 将 QoS确认报文的到来统一以事件 MG_EV_MQTT_CMD
通知。我们可以在事件回调中增加相应分支,例如:
else if (ev == MG_EV_MQTT_CMD) {
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;
switch (mm->cmd) {
case MQTT_CMD_PUBACK:
printf("QoS1 PUBACK received for message ID %u\n", mm->id);
break;
case MQTT_CMD_PUBREC:
printf("QoS2 PUBREC received for message ID %u\n", mm->id);
// Mongoose已自动发PUBREL
break;
case MQTT_CMD_PUBCOMP:
printf("QoS2 PUBCOMP received for message ID %u (complete)\n", mm->id);
break;
// 也可以处理 SUBACK, PINGRESP 等其他命令
}
}
这样,我们就能捕获这些ACK并用于日志或重传逻辑。如果希望实现更可靠的发送,可在发送时记录返回的 message ID(mg_mqtt_pub
返回值为 QoS>0 的消息ID (Mongoose: src/mqtt.c | Fossies)),然后在 MG_EV_MQTT_CMD 中匹配 mm->id 确认对应消息的ACK。
对于订阅者侧,假如我们实现一个MQTT Broker(下一章会做),也需要在服务器端处理 QoS1/2。这在Broker逻辑中更复杂,因为Broker既是接收方又要成为发送方转发,所以必须对每个客户端维持状态,确保每个消息对每个目标客户端按其订阅QoS完成确认。这超出了一个简单教程的范围,我们在下一章实现的简易Broker不会全面支持QoS2持久会话,仅做基本转发示范。
4.5 小结
在本章,我们深入讨论了 QoS 各级别的消息收发确认流程。对于QoS0,开发者几乎无需关心确认;QoS1 需要处理 PUBACK,可考虑重传机制;QoS2 则涉及PUBREC/PUBREL/PUBCOMP的完整握手,需要注意避免重复处理消息。Mongoose 简化了许多底层细节,我们更多的是在需要时监控事件来增强可靠性。
对于大多数应用,QoS1 已足够可靠且实现简单。而 QoS2 因实现复杂、开销大,一般仅在必要时才使用。如果使用QoS2,也建议充分测试各种异常网络情况下的行为,确保应用逻辑正确。
理解了客户端侧 QoS 处理,我们已经掌握了 MQTT 协议的大部分要点。最后,我们将挑战构建一个简易的 MQTT Broker 服务端,以更深入理解 MQTT 报文在服务器端的流程。
5. 构建简单 MQTT Broker(服务器端)示例
通常,生产环境中我们会使用现成的 MQTT 服务器(如 Mosquitto、EMQX、HiveMQ 等)来充当Broker。但是为了加深理解,我们可以使用 Mongoose 库自行实现一个精简的 MQTT Broker。这个 Broker 将能够:
- 接受客户端 CONNECT 连接请求(不校验用户名密码,仅简单接受所有连接),
- 处理客户端的 SUBSCRIBE 请求,记录订阅关系,
- 处理 PUBLISH 请求,将消息转发给匹配订阅的客户端,
- 维护基本的客户端会话状态,在客户端断开时清除其订阅。
我们实现的将是 MQTT 3.1.1 协议的子集,不支持持久会话、遗嘱、甚至不处理UNSUBSCRIBE等复杂操作,但足以演示Broker的核心机制 (Mongoose :: Tutorials :: MQTT Server)。
5.1 服务器实现概述
使用 Mongoose 实现服务器的关键在于监听端口并处理 MQTT 命令事件。Mongoose 提供了 mg_mqtt_listen
函数来方便地创建一个 MQTT 监听服务器。我们需要提供一个事件处理函数,类似客户端,但这次事件处理的主体是服务器监听到的各个客户端连接。
我们的Broker需要维护一个全局订阅列表,记录哪些客户端订阅了哪些主题以及QoS。为了简单,我们定义一个结构体:
struct sub {
struct sub *next;
struct mg_connection *c; // 订阅客户端的连接
struct mg_str topic; // 订阅的主题过滤器(支持通配符)
uint8_t qos; // 订阅使用的QoS
};
static struct sub *s_subs = NULL; // 全局链表头
我们将每个订阅记录为链表节点 sub
并链接到全局 s_subs
。当有新的订阅时,添加节点;当客户端断开时,删除该客户端相关的节点。
Broker 的事件处理函数需要处理的 MQTT命令主要有:
- MQTT_CMD_CONNECT:客户端请求连接。我们应检查协议版本,并发送 CONNACK。我们的实现对用户名密码不校验、clientID不做特殊处理,只要协议版本是3.1.1就接受。
- MQTT_CMD_SUBSCRIBE:客户端发来订阅请求。一个 SUBSCRIBE 报文可能包含多个主题订阅,我们需要逐个解析。为每个订阅创建
sub
记录加入列表,并回复 SUBACK 确认订阅成功(返回每个订阅的QoS,简单起见我们就回送客户端要求的QoS)。 - MQTT_CMD_PUBLISH:客户端发布消息。Broker收到后,需要将消息转发给所有匹配其主题的订阅。我们要遍历
s_subs
列表,找出topic匹配的订阅者,然后调用 Mongoose 函数将消息发送给这些客户端。匹配时要考虑通配符。Mongoose 提供了mg_globmatch
帮助我们进行类似shell通配符的匹配 (Mongoose :: Tutorials :: MQTT Server)。在使用前,我们可以将订阅记录的主题过滤器中的 '+' 通配符转换为 '*' 以利用 mg_globmatch(因为*
在 mg_globmatch 中表示任意字符串,包括跨层级)。 - MQTT_CMD_PINGREQ:客户端发送心跳请求,我们需要回复 PINGRESP。
- MG_EV_CLOSE(连接关闭事件,不是 MQTT命令):当客户端断开时,移除它的所有订阅记录,避免内存泄漏和无效转发。
通过处理上述几种情况,我们就能实现一个基本可用的Broker。
5.2 服务器代码实现
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include "mongoose.h"
static const char *s_listen_addr = "mqtt://0.0.0.0:1883"; // 监听地址和端口
// 订阅记录的链表节点结构
struct sub {
struct sub *next;
struct mg_connection *c; // 订阅客户端连接
struct mg_str topic; // 订阅的主题过滤器(可能含通配符)
uint8_t qos;
};
static struct sub *s_subs = NULL; // 订阅链表头
// 信号处理,用于优雅退出服务器
static int s_signo = 0;
static void signal_handler(int signo) {
s_signo = signo;
}
// 辅助函数:解析 SUBSCRIBE 报文中的下一个主题过滤器和QoS
// MQTT SUBSCRIBE 报文格式: [Packet ID(2 bytes)] + 若干个 [Topic Length(2 bytes) + Topic Filter + Requested QoS(1 byte)]
// 参数msg为Mongoose解析出的MQTT消息结构,pos为当前解析偏移,返回下一个偏移(0表示解析完毕或出错)
static size_t parse_next_sub_topic(struct mg_mqtt_message *msg, struct mg_str *topic, uint8_t *qos, size_t pos) {
// msg->dgram 包含原始报文数据,长度在 msg->dgram.len
if (pos >= msg->dgram.len) return 0;
const uint8_t *buf = (uint8_t *) msg->dgram.ptr + pos;
// 读取主题长度
if (pos + 2 > msg->dgram.len) return 0;
size_t topic_len = ((size_t)buf[0] << 8) | buf[1];
if (pos + 2 + topic_len + 1 > msg->dgram.len) return 0; // 检查数据是否完整
topic->len = topic_len;
topic->ptr = (char *) buf + 2;
// 读取QoS字节
if (qos) *qos = buf[2 + topic_len];
// 返回下一个主题项的起始偏移
return pos + 2 + topic_len + 1;
}
// 事件处理函数(Broker端)
static void mqtt_server_fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
if (ev == MG_EV_MQTT_CMD) {
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;
// 根据 MQTT 控制报文类型进行处理
switch (mm->cmd) {
case MQTT_CMD_CONNECT: {
// 客户端请求连接
if (mm->dgram.len < 9 || (uint8_t) mm->dgram.ptr[8] != 4) {
// 报文长度太短 或 协议级别不是4(MQTT3.1.1=4)
mg_error(c, "Malformed CONNECT or unsupported protocol");
} else {
// 发送 CONNACK 确认连接 (固定2字节: 会话Present标志=0, 返回码=0)
uint8_t response[2] = {0x00, 0x00};
mg_mqtt_send_header(c, MQTT_CMD_CONNACK, 0, sizeof(response));
mg_send(c, response, sizeof(response));
}
break;
}
case MQTT_CMD_SUBSCRIBE: {
// 客户端订阅主题
// SUBSCRIBE 报文固定报头后前两个字节是报文标识符(Message ID)
size_t pos = 2; // 跳过报文ID开始解析主题列表
uint8_t req_qos;
struct mg_str topic;
uint8_t suback_qos[10]; // 最多支持10个主题的回执(仅示例,实际应动态处理)
int num_topics = 0;
// 逐个解析所有订阅主题
while ((pos = parse_next_sub_topic(mm, &topic, &req_qos, pos)) > 0 && num_topics < 10) {
// 为每个主题分配订阅记录
struct sub *sub = (struct sub *) calloc(1, sizeof(*sub));
sub->c = c;
sub->topic = mg_strdup(topic); // 拷贝主题字符串存储
sub->qos = req_qos & 0x03; // 仅保留QoS低两位(有效值0,1,2)
// 将订阅加入链表头
sub->next = s_subs;
s_subs = sub;
// 将通配符'+'替换为'*',方便后续用mg_globmatch匹配单层
for (size_t i = 0; i < sub->topic.len; i++) {
if (sub->topic.ptr[i] == '+') ((char *) sub->topic.ptr)[i] = '*';
}
suback_qos[num_topics++] = sub->qos; // 回应的QoS(这里直接同意请求的QoS)
MG_INFO(("Client %p subscribed to %.*s (QoS %d)", c->fd,
(int) sub->topic.len, sub->topic.ptr, sub->qos));
}
// 发送 SUBACK 报文: 包含原订阅报文的Message ID以及每个订阅主题的返回QoS
if (num_topics > 0) {
mg_mqtt_send_header(c, MQTT_CMD_SUBACK, 0, 2 + num_topics);
// 原报文Message ID可以通过 mm->id 获得(Mongoose 已解析出)
uint16_t id_n = mg_htons(mm->id);
mg_send(c, &id_n, 2);
mg_send(c, suback_qos, num_topics);
}
break;
}
case MQTT_CMD_PUBLISH: {
// 客户端发布消息
// 从 mm 中获取主题和消息
struct mg_str topic = mm->topic;
struct mg_str data = mm->data;
MG_INFO(("Received PUBLISH on %.*s: %.*s",
(int) topic.len, topic.ptr,
(int) data.len, data.ptr));
// 将消息转发给所有匹配订阅的客户端
for (struct sub *sub = s_subs; sub != NULL; sub = sub->next) {
// 使用 mg_globmatch 进行通配符匹配;mg_globmatch(pattern, pat_len, str, str_len)
if (mg_globmatch(sub->topic.ptr, sub->topic.len, topic.ptr, topic.len)) {
// 将消息发送给订阅者sub->c
mg_mqtt_pub(sub->c, topic, data, sub->qos < mm->qos ? sub->qos : mm->qos, false);
// 这里使用发布者QoS和订阅者QoS的较小值作为实际发送QoS
MG_INFO(("Forwarded message to %p (subscribed %.*s)",
sub->c->fd, (int) sub->topic.len, sub->topic.ptr));
}
}
// 如果原消息QoS是1或2,按协议需回复ACK:
if (mm->qos == 1) {
// QoS1,回复PUBACK
mg_mqtt_send_header(c, MQTT_CMD_PUBACK, 0, 2);
uint16_t id_n = mg_htons(mm->id);
mg_send(c, &id_n, 2);
} else if (mm->qos == 2) {
// QoS2,回复PUBREC,等待客户端后续PUBREL,在Mongoose中,会有MG_EV_MQTT_CMD触发PUBREL事件,再处理
// 但为简单,此处直接回复PUBREC并立刻当做完成(完全实现QoS2较复杂)
mg_mqtt_send_header(c, MQTT_CMD_PUBREC, 0, 2);
uint16_t id_n = mg_htons(mm->id);
mg_send(c, &id_n, 2);
// 注意:完整实现还需等待PUBREL再PUBCOMP,这里简化处理
}
break;
}
case MQTT_CMD_PINGREQ: {
// 心跳请求,回复 PINGRESP
MG_INFO(("PINGREQ from %p, send PINGRESP", c->fd));
mg_mqtt_send_header(c, MQTT_CMD_PINGRESP, 0, 0);
break;
}
default:
// 其他命令未实现,例如 UNSUBSCRIBE,直接忽略或响应不支持
MG_WARN(("Unhandled MQTT command %d from %p", mm->cmd, c->fd));
} // switch(mm->cmd)
} else if (ev == MG_EV_CLOSE) {
// 某个客户端断开连接,清除其所有订阅
struct sub **pp = &s_subs;
while (*pp) {
if ((*pp)->c == c) {
struct sub *sub_to_remove = *pp;
MG_INFO(("Client %p disconnected, remove subscription %.*s", c->fd,
(int) sub_to_remove->topic.len, sub_to_remove->topic.ptr));
// 从链表移除该节点
*pp = sub_to_remove->next;
mg_strfree(&sub_to_remove->topic);
free(sub_to_remove);
// 不更新pp,继续检查下一个(因为删除后 *pp 已指向下一个节点)
continue;
}
pp = &(*pp)->next;
}
}
(void) fn_data;
}
int main(void) {
struct mg_mgr mgr;
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
mg_mgr_init(&mgr);
// 启动MQTT监听服务器
if (mg_mqtt_listen(&mgr, s_listen_addr, mqtt_server_fn, NULL) == NULL) {
fprintf(stderr, "Error starting MQTT broker on %s\n", s_listen_addr);
return 1;
}
MG_INFO(("MQTT broker started on %s", s_listen_addr));
// 进入事件循环
while (s_signo == 0) {
mg_mgr_poll(&mgr, 1000);
}
// 清理
mg_mgr_free(&mgr);
return 0;
}
让我们解释一下服务器代码的重要部分:
- 监听初始化:
mg_mqtt_listen(&mgr, "mqtt://0.0.0.0:1883", mqtt_server_fn, NULL)
在本地0.0.0.0地址的1883端口启动监听 (〖Mongoose笔记〗MQTT 服务器_mongoose mqtt-CSDN博客)。成功时返回监听连接对象,我们不太需要保留它;失败时返回NULL,我们做了错误检查。 - CONNECT 处理:在
MQTT_CMD_CONNECT
分支,我们简单检查了协议版本是否为4(表示MQTT3.1.1)。如果不是,则调用mg_error(c, ...)
关闭连接 (〖Mongoose笔记〗MQTT 服务器_mongoose mqtt-CSDN博客)。如果正确,发送一个固定的CONNACK应答(字节0x00,0x00表示Session Present=0,返回码0,接受连接) (〖Mongoose笔记〗MQTT 服务器_mongoose mqtt-CSDN博客)。 - SUBSCRIBE 处理:使用
parse_next_sub_topic
函数解析订阅载荷。这个函数根据MQTT协议解析每个主题过滤器字符串和后面的QoS值,返回下一个起点索引。我们循环调用它,将解析出的主题和QoS保存在新分配的sub
节点中,并加入全局链表 (〖Mongoose笔记〗MQTT 服务器_mongoose mqtt-CSDN博客)。为匹配方便,将主题字符串中'+'
替换为'*'
(〖Mongoose笔记〗MQTT 服务器_mongoose mqtt-CSDN博客)。对于#
通配符我们简化处理:mg_globmatch
用*
也能匹配多层(因为*
会匹配包括/
的任意字符序列),所以这里不特地处理#
,不过严格来说#
应该也转换或在匹配时特殊对待。简化起见,#
我们当作普通字符对待,mg_globmatch
的*
实际可以涵盖类似的效果。 解析完所有主题后,我们构造 SUBACK 报文:调用mg_mqtt_send_header
发送固定报头,然后发送原报文ID和每个主题的确认QoS (〖Mongoose笔记〗MQTT 服务器_mongoose mqtt-CSDN博客)。 - PUBLISH 处理:Broker收到了客户端发布的消息。我们打印日志 (〖Mongoose笔记〗MQTT 服务器_mongoose mqtt-CSDN博客)后,遍历订阅链表,对每个订阅检查其 topic filter 是否匹配发布的 topic (〖Mongoose笔记〗MQTT 服务器_mongoose mqtt-CSDN博客)。利用
mg_globmatch(sub->topic.ptr, sub->topic.len, topic.ptr, topic.len)
进行匹配,如果匹配成功,就调用mg_mqtt_pub(sub->c, topic, data, qos, false)
转发消息给该订阅者客户端 (〖Mongoose笔记〗MQTT 服务器_mongoose mqtt-CSDN博客)。这里QoS选用了sub->qos < mm->qos ? sub->qos : mm->qos
(发布QoS和订阅QoS的较小值),符合MQTT规范:发送时不得提升原消息的QoS,可以降低。因此,如果发布者发的是QoS2,但订阅者只要求QoS1,我们用QoS1推送。 最后,根据原消息QoS,Broker要给发布客户端回复相应ACK:- 如果 mm->qos == 1,发送 PUBACK。 (〖Mongoose笔记〗MQTT 服务器_mongoose mqtt-CSDN博客)
- 如果 mm->qos == 2,发送 PUBREC。 (〖Mongoose笔记〗MQTT 服务器_mongoose mqtt-CSDN博客)由于我们没有完整实现QoS2流程,暂未处理后续 PUBREL->PUBCOMP,但至少PUBREC告诉发送方“我收到了”。完善的实现需在收到PUBREL时(会以MG_EV_MQTT_CMD事件出现 mm->cmd==PUBREL)发送PUBCOMP,这里出于简化就不深入了。
- PINGREQ 处理:收到心跳请求即刻回复 PINGRESP。 (〖Mongoose笔记〗MQTT 服务器_mongoose mqtt-CSDN博客)
- 连接关闭:遍历链表删除该连接的所有订阅。 (〖Mongoose笔记〗MQTT 服务器_mongoose mqtt-CSDN博客)这里采用双指针技巧安全地在单链表中删除多个节点。我们也释放了
mg_strdup
分配的字符串(用mg_strfree
)。这样保证客户端断线后不会遗留无效订阅。
5.3 运行与测试 Broker
编译运行上述 Broker 程序,会在本地启动一个MQTT服务监听1883端口。可以用 Mosquitto 客户端或 MQTTX 等工具来测试:
- 打开一个终端,运行
mosquitto_sub -v -h 127.0.0.1 -p 1883 -t 'test/topic' -t 'foo/#'
来订阅主题(例如订阅具体的test/topic
以及通配符订阅foo/#
)。 - 再打开另一个终端,尝试发布消息:
mosquitto_pub -h 127.0.0.1 -p 1883 -t 'test/topic' -m 'hello'
订阅端应收到该消息(因为订阅了test/topic
)。mosquitto_pub -h 127.0.0.1 -p 1883 -t 'foo/bar/baz' -m 'hey'
订阅端应收到(匹配foo/#
订阅)。- 尝试发布一个无人订阅的主题,订阅端不会显示,Broker日志可看到收到但未转发。
- 也可以尝试多个订阅客户端验证Broker会将消息转发给所有订阅者。
我们的简易Broker不实现认证,因此任何人都可连接订阅。同时它也不支持保留消息、遗嘱、持久会话等高级特性。但通过这个过程,我们已经演练了Broker需要完成的核心操作:CONNECT/CONNACK、SUBSCRIBE/SUBACK、PUBLISH路由、以及客户端断开处理。
5.4 小结
本章节我们使用 Mongoose 快速构建了一个精简的 MQTT Broker,理解了服务器端处理 MQTT 报文的基本流程。在实现过程中,我们管理了客户端订阅列表,并利用 Mongoose 提供的工具函数实现了主题匹配和消息转发。这个Broker能够让多个MQTT客户端连接并交换消息,证明了我们对MQTT协议机制的掌握。
需要指出,这只是一个教学示例,实际可用的Broker需要更多功能和健壮性,例如:
- 完整处理 QoS2 的四步握手和消息重传,
- 支持客户端 Clean Session=false 时保存离线消息,
- 验证用户名/密码,访问控制,
- 更高效的订阅管理结构(如基于Trie的主题树),
- 处理 UNSUBSCRIBE,以及遗嘱消息发布等。
即便如此,通过亲手实现这些基本功能,读者应已对 MQTT 工作原理有了深刻理解,也体会到 Mongoose 库在网络编程上的强大之处。
6. 总结
在本教程中,我们从 MQTT 协议基础开始,逐步实践了 MQTT 客户端和服务器端的实现过程。首先,我们介绍了 MQTT 的发布/订阅模型、主题和 QoS 等关键概念 (MQTT简介之三(4) MQTT协议QOS 服务质量等级原创 - CSDN博客)。然后,我们使用 C 语言结合 Mongoose 7.17 库构建了一个基本 MQTT 客户端,演示了如何连接 Broker、订阅/发布消息,并通过事件回调处理 MQTT 通信。我们进一步增强了客户端,加入 TLS 安全连接、用户名密码认证、KeepAlive 和遗嘱消息等特性,提高了客户端的安全性和可靠性。
随后,我们讨论了 QoS0、QoS1、QoS2 消息交互中需要处理的确认报文机制,说明了如何借助 Mongoose 的事件处理对这些 QoS 做支持,保证消息按期望的服务质量传递。最后,我们实现了一个简单的 MQTT Broker 示例,从服务器角度完整走了一遍 CONNECT、SUBSCRIBE、PUBLISH、PING 等流程,加深了对 MQTT 协议运作的理解。
通过这些连续的章节内容,相信读者已经全面了解了 MQTT 协议的基础知识和使用 Mongoose 进行 MQTT 开发的实践技巧。读者现在应该能够:
- 理解 MQTT 协议的工作流程、消息格式以及 QoS 机制 (MQTT 协议快速入门 2025:基础知识和实用教程 | EMQ) (MQTT简介之三(4) MQTT协议QOS 服务质量等级原创 - CSDN博客),
- 使用 Mongoose C 库编写 MQTT 客户端程序,连接到 Broker 进行消息收发,
- 配置客户端的 TLS、认证和 KeepAlive 等参数来满足实际应用需求,
- 处理不同 QoS 等级下的消息确认,确保通信可靠,
- 明白一个MQTT Broker在处理客户端请求和消息转发时所需执行的操作,并用代码实现了简化版本的Broker。
如果进一步学习,建议读者参考 MQTT 正式规范以及 Mongoose 官方文档,尝试实现更多高级功能。例如,实现持久会话,处理断线重连时未完成的QoS2传输,或者给Broker添加权限控制等。同时,可以尝试将此基础客户端集成到实际项目,比如制作一个传感器数据上传的客户端,或基于此Broker扩展出一个专用的消息服务器。
通过本教程的学习,您已经从底层协议到实际编码全方位掌握了 MQTT,在未来的物联网开发中,相信这些知识将大有裨益。祝您在 MQTT 与 C 编程之旅中不断收获新的成果!
(MQTT 协议快速入门 2025:基础知识和实用教程 | EMQ) (MQTT简介之三(4) MQTT协议QOS 服务质量等级原创 - CSDN博客)
评论区