1. MQTT 协议基础

MQTT(Message Queuing Telemetry Transport)是基于发布/订阅模式的轻量级消息协议,采用客户端-服务器架构,常用于物联网(IoT)等需要低带宽和低功耗的场景 (MQTT协议基本流程、原理_mqtt协议工作原理-CSDN博客)。在 MQTT 中,**发布者(Publisher)订阅者(Subscriber)无需直接通信,而是通过代理服务器(Broker)**中转消息。Broker 接收发布者发送的消息,并根据消息的主题(Topic)将其分发给所有对此主题感兴趣(已订阅该主题)的订阅者 (MQTT 协议快速入门 2025:基础知识和实用教程 | EMQ)。MQTT 使用持久的 TCP 连接(通常端口1883,TLS加密端口8883)来传输数据,可选用 TLS/SSL 加密保证安全 (MQTT 协议快速入门 2025:基础知识和实用教程 | EMQ)。

1.1 发布/订阅模型

MQTT 使用发布/订阅(Pub/Sub)的通信模型。发布者将消息发送到某个主题(Topic)上,Broker 会将收到的消息路由给所有订阅了该主题的客户端。每条消息附带一个主题名称,类似于标签。订阅者通过向 Broker 发出订阅请求(SUBSCRIBE 报文),告诉 Broker 感兴趣的主题。Broker 维护所有客户端的订阅列表,当有发布消息到达时,Broker 根据主题匹配将消息副本发送给所有匹配的订阅者。由于发布者和订阅者解耦,发布者无需了解任何订阅者的存在,也不知道是否有人接收 (MQTT 协议快速入门 2025:基础知识和实用教程 | EMQ)。这种模式非常适合一对多消息分发和松耦合系统,例如传感器网络中的数据采集和分发。

1.2 MQTT 消息类型与格式

MQTT 定义了一系列**控制报文(Control Packet)**用于客户端与 Broker 之间的通信。主要的报文类型包括:

  • CONNECT:客户端请求建立连接时发送的连接报文,包含协议版本、客户端ID、用户名/密码(可选)、遗嘱消息(Last Will,可选)、KeepAlive 保活时间等信息。
  • CONNACK:Broker 对 CONNECT 请求的响应报文,指示连接是否成功(返回码为0表示成功)以及会话是否已存在等。
  • PUBLISH:发布消息报文,由发布者发送。包含主题名和消息内容,可能还附带消息ID(用于 QoS1/QoS2 下确认)等。
  • SUBSCRIBE:订阅请求报文,由订阅者发送以订阅一个或多个主题。携带每个订阅主题的过滤表达式和请求的 QoS 等级。
  • SUBACK:Broker 对 SUBSCRIBE 请求的确认报文,包含各主题订阅的确认结果(如授权的QoS等级或失败)。
  • PUBACK:针对 QoS1 的 PUBLISH 的确认报文,发布端收到 PUBACK 表示消息已成功到达 Broker。
  • PUBRECPUBRELPUBCOMP:针对 QoS2 发布流程的一组握手报文,确保消息“恰好一次”传递(见下文QoS部分)。
  • PINGREQ / PINGRESP:心跳请求与响应,用于保持连接活跃(KeepAlive机制)。
  • DISCONNECT:客户端主动发送的断开连接报文,通知 Broker 将关闭连接。

MQTT 报文采用紧凑的二进制格式。每个报文都有一个固定报头(2字节),包含报文类型和标志,以及剩余长度字段。根据报文类型不同,后续还有可变报头和有效载荷。例如,CONNECT 报文的可变报头包含协议名“MQTT”、版本号、连接标志(包括用户名/密码标志、遗嘱标志、清洁会话标志等)、KeepAlive时间等,随后的有效载荷包含Client ID、用户名、密码等字段;而 PUBLISH 报文的可变报头包含主题名(Topic)和报文标识符(QoS>0时),有效载荷则是消息内容。本教程不深入每种报文的二进制结构,但了解这些概念有助于理解 MQTT 通信流程。

1.3 连接流程与保持活跃

一个典型的 MQTT 连接流程如下:

  1. 客户端发起连接:客户端使用 TCP 连接到 Broker,发送 CONNECT 报文。该报文中客户端会提供 Client ID(标识符)、可选的用户名和密码用于认证,还可以指定 Clean Session 标志和 Will(遗嘱)消息等。Clean Session 标志指示是否建立新的会话(true 表示请求清除之前会话状态),Will 消息是在客户端意外断开时由Broker代为发布的消息。
  2. Broker 响应:Broker 收到 CONNECT 后进行验证,返回一个 CONNACK 报文。如果连接成功,CONNACK 返回码为0,并指示是否会话存在(Session Present)。此时连接建立完成,客户端可以开始发布和订阅。
  3. KeepAlive 心跳:CONNECT 报文中包含一个 KeepAlive 时间(秒)。客户端和 Broker 通过发送 PINGREQ/PINGRESP 保持心跳。如果超过1.5倍 KeepAlive 时间未收到任何数据,Broker将认为客户端掉线并断开连接(同时若设置了 Will 遗嘱消息,则发布该遗嘱)。
  4. 数据传输:连接建立后,客户端可以发布 PUBLISH 消息或发送 SUBSCRIBE 订阅主题。Broker 按请求提供服务,转发消息或返回 ACK 确认等。
  5. 断开连接:客户端可随时发送 DISCONNECT 报文正常断开。Broker也可在检测到超时或错误时断开连接。

KeepAlive(保活)机制确保了客户端与服务器的连接存活状态。设置适当的KeepAlive间隔有助于及时检测到网络中断或客户端故障,同时避免过于频繁地发送心跳造成不必要流量。一般物联网设备可能使用几十秒到几分钟不等的 KeepAlive。

1.4 主题与通配符

主题(Topic)是 MQTT 中对消息进行分类和路由的依据。本质上,主题是一个分层的字符串,层级之间用斜杠/分隔,例如:sensors/temperature/office。客户端订阅主题时可以使用通配符(wildcard)

  • 单层通配符 +:匹配一个层级的任意主题名。例如订阅主题 sensors/+/office,将收到主题如sensors/temperature/officesensors/humidity/office等消息。
  • 多层通配符 #:只能出现在主题过滤器的结尾,匹配包含它的位置及后续所有层级。例如订阅 sensors/office/#,将收到 sensors/office 下所有子层级主题的消息,如sensors/office/temperature, sensors/office/door/status等。

主题区分大小写,订阅时必须精确匹配主题名或使用通配符过滤。Broker 在收到发布消息时,会将消息的主题与每个订阅者的主题过滤器进行匹配(包括处理通配符),如果匹配则发送消息给该订阅者。需要注意 MQTT 的主题是无固定预定义的,每个应用可以自由定义主题命名规范。注意:MQTT 规范中主题过滤器通配符 # 必须独占层级且位于末尾,+ 只能匹配单层,不能跨越斜杠。

1.5 QoS 服务质量等级

MQTT 提供了三种消息传输的服务质量(Quality of Service, QoS)等级,以权衡传输可靠性和开销 (MQTT简介之三(4) MQTT协议QOS 服务质量等级原创 - CSDN博客):

  • QoS 0 – 至多一次:消息最多传输一次,不保证送达(类似UDP)。发送者发送后不要求确认,Broker不会重试。可能丢失消息,但效率最高。适用于对消息丢失不敏感的场景(“尽最大努力”传输)。
  • QoS 1 – 至少一次:消息确保至少到达一次。发送者会等待确认(PUBACK),若未收到将在一定时间后重传,直到收到确认为止。因此可能出现重复消息(接受方需根据报文ID或其他手段去重)。适用于需要保证送达但允许偶尔重复的场景。
  • QoS 2 – 恰好一次:消息确保 严格一次 送达。采用两阶段确认流程:发送方发送消息后,接收方回复 PUBREC,发送方再回复 PUBREL,接收方最后回复 PUBCOMP 完成。这一系列握手确保双方记录消息ID,从而避免重复和丢失。QoS 2 开销最大,但提供最高可靠性,适用于关键指令等不允许重复且必须送达的场景。

在 MQTT 通信中,客户端在发布消息时可以选择 QoS 等级,Broker 会按该等级确保消息投递到 Broker。本质上QoS定义的是发送方和接收方之间单跳传输的可靠性。当 Broker 将消息转发给订阅者时,会使用订阅者所请求的 QoS 等级(通常为发布时QoS和订阅QoS的较低值)进行投递。例如,发布者以 QoS 2 发出消息,而订阅者只请求 QoS 1,那么 Broker 用 QoS 1 向订阅者发送。QoS 等级越高,协议交互的报文越多,因此除非必要应尽量使用较低的 QoS 满足业务需求。

1.6 小结

以上,我们介绍了 MQTT 协议的基本概念和工作机制,包括发布/订阅模型、消息类型和连接流程、主题及通配符、以及 QoS 服务质量等级。总结来说,MQTT 以轻量高效的方式实现了客户端和Broker间的可靠消息传递,开发者可以根据应用需求选择合适的 QoS 等级和平衡可靠性与性能 (MQTT简介之三(4) MQTT协议QOS 服务质量等级原创 - CSDN博客)。理解这些协议基础,将有助于我们在接下来的章节中使用 C 语言和 Mongoose 网络库来构建 MQTT 客户端和服务器。

接下来,我们将从实践角度出发,首先搭建一个运行在 Linux 平台上的 MQTT 客户端示例。

2. 使用 Mongoose 构建 MQTT 客户端

本章我们将使用 C 语言和 Mongoose 网络库实现一个简单的 MQTT 客户端程序,运行于 Linux 平台。Mongoose 7.17 是 Cesanta 公司提供的开源嵌入式网络库,支持包括 TCP、HTTP、WebSocket、MQTT 在内的多种协议 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客)。Mongoose 提供事件驱动的非阻塞 API,使用单线程事件循环即可同时管理多个网络连接,非常适合在嵌入式或系统编程中使用。我们将利用 Mongoose 内置的 MQTT 支持,快速实现客户端连接到 Broker、订阅主题以及发布消息的功能。

2.1 环境准备与工程设置

首先,请确保在 Linux 环境下已安装基本的构建工具(如 GCC、Make 等)。下载 Mongoose 7.17 库源码,可以通过 Git 克隆官方仓库:

git clone https://github.com/cesanta/mongoose
cd mongoose

Mongoose 是以单个源文件(mongoose.c 搭配 mongoose.h)提供的。使用时,可以直接将这两个文件加入您的工程,或编译为静态/动态库后链接。在本教程示例中,我们直接包含头文件并链接 Mongoose 源,方便起见。

假设我们创建一个 mqtt_client.c 文件用于编写客户端代码,并确保 mongoose.c 在同一目录或者编译时正确链接。

2.2 MQTT 客户端示例概述

我们计划实现的 MQTT 客户端功能如下 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客):

  1. 连接:连接到一个公共 MQTT Broker(这里选用 HiveMQ 提供的公共 Broker broker.hivemq.com,监听在 TCP 1883 端口)。
  2. 订阅:连接建立后,客户端订阅主题 mg/+/test(这表示订阅 mg 开头、任意一级子主题、test 结尾的所有主题)。
  3. 发布:订阅完成后,客户端向主题 mg/clnt/test 发布一条消息 "hello"
  4. 接收:由于我们订阅了 mg/+/test,发布的消息主题 mg/clnt/test 正好匹配订阅过滤器,因此客户端应当收到自己发布的这条消息(Broker 会回送给所有订阅者,包括发布者自己如果也订阅了该主题)。
  5. 断开:成功接收到消息后,客户端关闭连接。本示例还会实现一个简单的自动重连逻辑:使用定时器周期性检查连接状态,若断开则重新建立连接,使上述过程重复。

下面我们给出完整的代码,并逐步讲解其中关键点。

2.3 示例代码:基础 MQTT 客户端

#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include "mongoose.h"

// 目标Broker的URL和主题设置
static const char *s_url = "mqtt://broker.hivemq.com:1883";    // 公共 HiveMQ MQTT 服务器
static const char *s_sub_topic = "mg/+/test";                 // 订阅主题过滤器
static const char *s_pub_topic = "mg/clnt/test";              // 发布主题
static int s_qos = 1;                                         // QoS 等级(这里使用 QoS1 保证至少一次送达)

static struct mg_connection *s_conn = NULL;   // 全局保存MQTT连接

// 信号标志,用于优雅退出
static int s_signo = 0;
static void signal_handler(int signo) {
    s_signo = signo;
}

// 事件回调函数,处理各类事件
static void mqtt_event_handler(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
    if (ev == MG_EV_OPEN) {
        // 连接创建成功(尚未完成TCP握手)
        MG_INFO(("Connection created"));
        // c->is_hexdumping = 1;  // 可选:启用数据十六进制调试输出
    } else if (ev == MG_EV_ERROR) {
        // 发生错误,例如DNS解析失败或TCP无法连接等
        MG_ERROR(("%p Error: %s", c->fd, (char *) ev_data));
    } else if (ev == MG_EV_CONNECT) {
        // TCP 连接已建立
        if (mg_url_is_ssl(s_url)) {
            // 如果是 SSL/TLS 连接,初始化 TLS 会话
            struct mg_tls_opts opts = {
                .ca = "ca.pem"   // 指定CA证书文件验证服务器(这里假设已准备好CA证书)
            };
            mg_tls_init(c, &opts);
        }
    } else if (ev == MG_EV_MQTT_OPEN) {
        // MQTT 协议连接建立(收到CONNACK且成功)
        MG_INFO(("MQTT connected to %s", s_url));
        // 订阅指定主题
        struct mg_str topic_sub = mg_str(s_sub_topic);
        mg_mqtt_sub(c, topic_sub, s_qos);
        MG_INFO(("Subscribed to topic %.*s", (int) topic_sub.len, topic_sub.ptr));
        // 发布消息到指定主题
        struct mg_str topic_pub = mg_str(s_pub_topic);
        struct mg_str message = mg_str("hello");
        mg_mqtt_pub(c, topic_pub, message, s_qos, false);
        MG_INFO(("Published message '%.*s' to topic %.*s",
                 (int) message.len, message.ptr,
                 (int) topic_pub.len, topic_pub.ptr));
    } else if (ev == MG_EV_MQTT_MSG) {
        // 收到订阅的消息
        struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;
        MG_INFO(("Received message '%.*s' from topic %.*s",
                 (int) mm->data.len, mm->data.ptr,
                 (int) mm->topic.len, mm->topic.ptr));
        // 一旦收到我们预期的消息,关闭连接
        c->is_closing = 1;
    } else if (ev == MG_EV_CLOSE) {
        // 连接已关闭(无论正常断开还是异常断开都会触发)
        MG_INFO(("Connection closed"));
        s_conn = NULL;  // 标记当前无活动连接
    }
    (void) fn_data;
}

int main() {
    struct mg_mgr mgr;
    // 设置信号处理,用于 Ctrl+C 等中断时安全退出事件循环
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);

    mg_mgr_init(&mgr);  // 初始化事件管理器
    MG_INFO(("Connecting to MQTT broker..."));
    // 首次建立连接,使用 clean session,并可指定遗嘱消息(本例暂不设置遗嘱)
    struct mg_mqtt_opts opts;
    memset(&opts, 0, sizeof(opts));
    opts.clean = true;  // 要求清除会话(不复用旧session)
    // 建立MQTT连接
    s_conn = mg_mqtt_connect(&mgr, s_url, &opts, mqtt_event_handler, NULL);
    if (s_conn == NULL) {
        MG_ERROR(("Failed to create MQTT connection"));
        return 1;
    }

    // 进入事件循环,周期性检查连接状态,每隔1秒运行一次
    while (s_signo == 0) {
        mg_mgr_poll(&mgr, 1000);
        // 如果连接断开(s_conn 被置NULL),尝试重连
        if (s_conn == NULL) {
            MG_INFO(("Reconnecting..."));
            s_conn = mg_mqtt_connect(&mgr, s_url, &opts, mqtt_event_handler, NULL);
        }
    }

    // 收到退出信号,清理资源
    mg_mgr_free(&mgr);
    MG_INFO(("Exited cleanly"));
    return 0;
}

上述代码实现了一个基本的 MQTT 客户端,并包含详细的注释。下面对关键部分逐步解释。

2.4 代码讲解

连接参数与全局变量:首先,我们定义了目标 Broker 的 URL s_url 和发布/订阅使用的主题字符串,以及 QoS 等级。这里选择 HiveMQ 公共 Broker,无需用户名密码,使用明文1883端口。s_conn是一个全局的mg_connection指针,用于指向活动的MQTT连接(以便在其他地方判断连接状态)。

信号处理:为了能优雅地退出程序,我们设置了一个全局整数 s_signo,并注册了 signal_handler 来捕获 SIGINT/SIGTERM。当用户按下 Ctrl+C 时,会发送 SIGINT 信号,使 s_signo 非零,这将跳出事件循环。

事件回调函数 mqtt_event_handler:Mongoose 的事件模型会在各种网络事件发生时调用这个回调。我们在其中处理了几种主要事件:

  • MG_EV_OPEN:表示新连接对象创建。这在调用 mg_mqtt_connect 后立即发生,表示连接结构已建立但还未真正连接完成。这里我们仅打印日志,可选地可以打开 c->is_hexdumping 来调试底层数据包。
  • MG_EV_ERROR:表示连接过程中出现错误,例如解析主机名失败或无法建立 TCP 连接等。ev_data 通常包含错误描述字符串,我们打印出来并记录错误 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客)。
  • MG_EV_CONNECT:TCP 三次握手成功建立。这时如果目标是 TLS 加密连接,我们需要初始化 TLS。代码中通过 mg_url_is_ssl(s_url) 检测 URL 协议是否为 mqtts://。如果是,我们设置 TLS 选项并调用 mg_tls_init() 来升级连接为 SSL/TLS。这里示例指定了 .ca = "ca.pem",假定当前目录有 Broker 的 CA 根证书用于验证服务器证书。对于纯 TCP 连接,此步骤不做任何操作 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客)。
  • MG_EV_MQTT_OPEN:表示 MQTT 协议层连接建立成功,即客户端收到了 Broker 发来的 CONNACK 且返回码为成功。此时可以认为 MQTT 会话已经建立 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客)。我们在这里进行订阅和发布:
  • MG_EV_MQTT_MSG:当收到订阅的主题消息时触发。通过 ev_data 可以取得 struct mg_mqtt_message *mm,其中包含了消息的主题和内容等信息 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客)。我们将消息内容和主题打印出来。按照我们的逻辑,我们期望收到自己发布的 "hello" 消息。一旦收到,我们就将 c->is_closing 设为1,表示告诉 Mongoose 在处理完事件后关闭该连接。
  • MG_EV_CLOSE:连接关闭事件。在关闭后(无论是我们主动关闭还是远端断开),都会触发此事件。我们在此将全局 s_conn 置为 NULL,表示当前无活动连接 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客)。主循环会检测到 s_conn == NULL 进而尝试重连。
  • 其他事件如 MG_EV_POLL(定时轮询),MG_EV_MQTT_CMD(MQTT命令报文,如PUBACK等)未在此示例中专门处理,会在需要时提到。

主函数 main:首先初始化 mg_mgr 事件管理器,然后调用 mg_mqtt_connect 尝试建立 MQTT 连接 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客)。mg_mqtt_connect(&mgr, s_url, &opts, mqtt_event_handler, NULL) 会返回一个 mg_connection*(失败返回NULL)。我们传入了前面准备的 opts 结构,其中设置了 opts.clean = true 来要求服务器创建一个新的清洁会话(不复用之前离线会话)。这里暂未设置 Last Will 遗嘱等其他选项,opts 其他字段默认为0即可。

进入事件循环 mg_mgr_poll(&mgr, 1000) 后,Mongoose 将不断调用事件回调处理网络事件。我们设定每次最多等待1秒,然后检查 s_conn 状态和 s_signo 信号。

主循环内包含一个简单的重连逻辑:如果检测到 s_conn == NULL(说明连接已关闭),则调用 mg_mqtt_connect 再次建立连接 (〖Mongoose笔记〗MQTT 客户端_moongoose mqtt-CSDN博客)。同时打印日志提示正在重连。这样,当第一次收到消息后我们关闭连接,循环会立即尝试重新连接 Broker,并在成功后再次订阅发布,如此每隔3秒(由我们SUB->PUB->收到->关闭的速度)会重复发送 "hello" 并接收自己的回显消息。

当用户按 Ctrl+C 使 s_signo 非零时,退出循环,调用 mg_mgr_free 清理所有连接资源,程序结束。

2.5 运行测试

编译上述代码(确保链接 mongoose.c),然后运行可执行文件。由于我们内置了日志输出(MG_INFO/MG_ERROR等,Mongoose 默认INFO日志级别),可以看到类似输出:

[I] Connecting to MQTT broker...
[I] Connection created
[I] MQTT connected to mqtt://broker.hivemq.com:1883
[I] Subscribed to topic mg/+/test
[I] Published message 'hello' to topic mg/clnt/test
[I] Received message 'hello' from topic mg/clnt/test
[I] Connection closed
[I] Reconnecting...
[I] Connection created
[I] MQTT connected to mqtt://broker.hivemq.com:1883
...

日志显示客户端成功连接并完成了订阅和发布,又收到了自己发布的消息。此循环会持续运行,间隔几秒重新发送消息,直到我们按 Ctrl+C 退出。您也可以使用其他 MQTT 客户端订阅 mg/+/test 或发布消息到 mg/clnt/test 来与本程序交互,验证其功能。

注意:使用公共 Broker 进行测试时,尽量选择独特的主题名称以免和他人冲突。同时,由于公共 Broker 可能有连接和消息频率限制,上述循环发送频率不宜过高,以免被服务拒绝。

2.6 小结

通过本章节,我们实现了一个功能完整的 MQTT 客户端示例,并熟悉了 Mongoose 库的基本用法和事件驱动模型。在代码中,我们展示了如何连接到 Broker、订阅主题、发布消息,并处理接收到的消息和各种连接事件。我们还加入了简单的自动重连机制以提高客户端的健壮性。

至此,读者应该对利用 Mongoose 开发 MQTT 客户端有了初步认识。在下一章中,我们将在此基础上增加更多特性,例如通过 TLS 建立安全连接、使用用户名/密码进行认证,以及调整 KeepAlive 等连接参数,从而构建一个更完善的 MQTT 客户端。

3. 增强 MQTT 客户端:TLS 安全、认证与保活

在实际应用中,MQTT 客户端往往需要考虑安全和可靠性配置。本章我们将扩展上一章的示例,介绍如何:

  • 使用 TLS 加密与 Broker 建立安全连接,
  • 使用用户名和密码进行身份认证,
  • 配置 KeepAlive 保持连接活跃,
  • 设置遗嘱消息(Last Will)等高级选项。

3.1 TLS 安全连接

默认的 MQTT 使用明文 TCP,在开放网络中传输敏感数据可能有风险。因此通常MQTT Broker都会开放加密端口(如8883)供客户端使用 SSL/TLS 连接。使用 Mongoose 建立 TLS 连接非常简单:将 URL 的协议由 mqtt:// 改为 mqtts://,并确保在事件处理中初始化 TLS。

在上一章代码中,我们已经演示了在 MG_EV_CONNECT 事件中调用 mg_tls_init()。实际使用时,还需要提供服务器证书的CA证书用于验证,除非使用不验证模式(不推荐)。可以通过 struct mg_tls_opts 提供 .ca 字段指定 CA 文件路径。如果 Broker 使用公共可信CA签署证书且系统已内置可信根证书,则也可不指定 CA 文件,默认进行验证。

示例:如果我们改用 EMQX 提供的公共 MQTT 服务 broker.emqx.io,其 TLS 端口为8883,并使用 Let’s Encrypt 证书(受信任的CA签发),我们可以这样修改连接:

// 修改URL为SSL协议和端口
s_url = "mqtts://broker.emqx.io:8883";
...
if (mg_url_is_ssl(s_url)) {
    struct mg_tls_opts opts;
    memset(&opts, 0, sizeof(opts));
    // 对于公认CA签名的服务器证书,可以不指定opts.ca以使用系统证书
    mg_tls_init(c, &opts);
}

这样,客户端就会通过 TLS 与 Broker 通信,防止数据在传输中被窃听或篡改。需要注意编译时要启用 Mongoose 的 TLS 支持,例如在编译命令中定义 -DMG_TLS=MG_TLS_BUILTIN 使用内置的mbedTLS实现,或者链接 OpenSSL 等库并定义相应宏 (Mongoose :: Tutorials :: MQTT Client)。具体启用方式可参考 Mongoose 文档和编译说明。

3.2 用户名/密码认证

MQTT 支持在 CONNECT 报文中携带用户名和密码字段,用于身份认证。很多 Broker(尤其公有服务)会要求客户端提供有效的用户名和密码才允许连接。使用 Mongoose,设置用户名密码非常直接:在 struct mg_mqtt_opts 中填充 userpass 字段即可。

例如,假设我们的 Broker 需要用户名testuser和密码secret,可以在构造 opts 时加入:

struct mg_mqtt_opts opts;
memset(&opts, 0, sizeof(opts));
opts.clean = true;
opts.user = mg_str("testuser");
opts.pass = mg_str("secret");

然后将此 opts 传递给 mg_mqtt_connect。Mongoose 会在生成 CONNECT 报文时将用户名和密码字段编码进去 (Mongoose: src/mqtt.c | Fossies) (Mongoose: src/mqtt.c | Fossies)。服务端收到后会校验凭证,通过则返回CONNACK,否则通常会以非0返回码拒绝连接。

需要注意,用户名和密码都会明文包含在 CONNECT 报文中,因此务必在 TLS 加密连接下使用,否则凭证可能被截获。在 TLS 连接下,这些信息会在加密通道中传输,较为安全。

3.3 KeepAlive 保持连接

KeepAlive 时间在 CONNECT 时由客户端指定,Broker 据此判断心跳。如果客户端在一个 KeepAlive 间隔内没有发送任何消息,Broker 应该期望收到一个 PINGREQ,否则将在1.5倍间隔后断开连接。Mongoose 默认会根据我们传入的 opts.keepalive 来设置该值。若不设置,库中默认KeepAlive可能为60秒(MQTT规范允许0表示不保持活跃但通常不建议)。

我们可以通过设置 opts.keepalive 来调整。例如:

opts.keepalive = 30;  // 请求30秒的KeepAlive心跳间隔

Broker 收到后会将此作为双方约定的保活时间。Mongoose 当前并不会自动发送心跳包,需要我们自行在应用层定期发送。最简单的方法是在事件循环中定期调用 mg_mqtt_ping(c)(发送PINGREQ)。或者如上一章,我们其实在不断重新发送消息,也可起到保活作用。

如果需要一个专门的心跳,可以使用 Mongoose 的定时器功能,定期执行:

mg_mqtt_ping(c);

从而在无消息时也保持连接。这通常在应用层实现即可,因为只要我们的客户端定期发布或订阅消息,已有的数据传输也能满足保活。重要的是合理设置KeepAlive:过短会造成频繁心跳增加流量,过长则掉线检测不及时。视具体网络状况和应用要求来配置。

3.4 遗嘱消息(Last Will)

MQTT 遗嘱消息是在客户端异常掉线时,由Broker代为发布的消息。我们可以在 CONNECT 时指定遗嘱主题、遗嘱消息内容、遗嘱QoS和retain标志。Mongoose 支持设置这些,通过 mg_mqtt_opts 里的相应字段。

例如,要在断线时通知其它订阅者“我掉线了”,可以设置:

opts.will_topic = mg_str("clients/status");
opts.will_message = mg_str("client1 offline");
opts.will_qos = 1;
opts.will_retain = false;

这样,如果客户端未正常DISCONNECT就断开(例如程序崩溃或网络断开超过KeepAlive),Broker 会替客户端发布消息 "client1 offline" 到主题 clients/status,QoS1,非保留。其他订阅了该主题的客户端就能收到此通知。

在上一章的代码中,我们演示了 opts.clean = true 开新会话。如果想利用**持久会话(Persistent Session)**存储QoS>0离线消息,可以将 clean 置 false。这样,断线重连后Broker会将离线期间积存的消息(QoS1/2)发送给客户端。这需要Broker支持并启用持久会话。

3.5 综合示例:安全连接和认证

现在,我们将结合以上内容,对第2章的代码进行扩展,展示如何修改以加入 TLS 和用户名密码。假设我们有一个Broker地址 mqtts://mqtt.example.com:8883,要求用户名密码登录,我们将在代码中体现这些更改:

static const char *s_url = "mqtts://mqtt.example.com:8883";
...
int main(void) {
    ...
    struct mg_mqtt_opts opts;
    memset(&opts, 0, sizeof(opts));
    opts.clean = true;
    opts.keepalive = 45;  // 45秒保活
    opts.user = mg_str("testuser");
    opts.pass = mg_str("secret");
    opts.will_topic = mg_str("clients/status");
    opts.will_message = mg_str("client offline");
    opts.will_qos = 1;
    opts.will_retain = false;
    s_conn = mg_mqtt_connect(&mgr, s_url, &opts, mqtt_event_handler, NULL);
    ...
}

事件处理部分,在 MG_EV_CONNECT 中我们之前已处理 TLS 初始化,这里需确保提供正确的 CA 证书。如果 mqtt.example.com 使用受信任证书且系统有相应CA,则可不设置 opts.ca;否则应将 Broker 提供的 CA 链文件路径赋给 opts.ca。例如:

if (mg_url_is_ssl(s_url)) {
    struct mg_tls_opts tls_opts = {
        .ca = "example_ca.pem"
    };
    mg_tls_init(c, &tls_opts);
}

这样,我们的客户端将在建立TLS连接后使用用户名密码登录,KeepAlive设置为45秒,并在异常断线时让Broker发送遗嘱消息。

3.6 小结

通过本章,我们学会了如何加强 MQTT 客户端的安全和可靠性配置,包括使用 TLS 加密连接来防窃听、利用用户名/密码进行身份认证、设置合理的 KeepAlive 心跳维持连接,以及配置遗嘱消息通知离线状态。这些特性往往是实际项目中必须考虑的。

完成这些增强后,我们的 MQTT 客户端已经相当健壮。接下来,我们将讨论 MQTT 消息在不同 QoS 等级下交互时,客户端需要如何处理各种确认事件,确保消息按预期的服务质量传递。

4. 处理 QoS 等级的消息交互

在 MQTT 协议中,不同的 QoS 等级意味着不同的消息确认机制。客户端在发送或接收 QoS1、QoS2 消息时,需要处理相应的确认报文(ACK)以完成完整的消息交互流程。虽然 Mongoose 帮助我们封装了大部分 MQTT 协议,但作为开发者,了解并正确处理这些交互仍然很重要。

本章将讲解如何在应用层处理 QoS0、QoS1、QoS2 级别消息的发送和接收确认。

4.1 QoS 0 —— 至多一次,无确认

发送方:对于 QoS0 消息,调用 mg_mqtt_pub 发送后即完毕。MQTT 不要求任何确认,Mongoose 会在调用后将消息加入发送缓冲,一旦通过TCP发送出去即认为完成。应用层无需特别处理 ACK。

接收方:Broker 或订阅客户端收到 QoS0 的 PUBLISH 后,直接交付应用处理(MG_EV_MQTT_MSG 事件)。因为没有后续确认,应用处理完即可,不需要回复任何 ACK 报文。对开发者来说,在事件回调中拿到 QoS0 消息和 QoS1、QoS2 并无差别,只是 QoS 字段会标记为0。

示例:在我们的客户端代码中,我们发布和订阅使用的是QoS1。但如果改为 QoS0,依然可以用 mg_mqtt_pub(c, topic, data, 0, false),发布后不会有 MG_EV_MQTT_CMD(PUBACK) 事件,也不会自动重发,发送一次即结束。在 MG_EV_MQTT_MSG 中 mm->qos 会是0。

4.2 QoS 1 —— 至少一次,处理 PUBACK

发送方(发布者):QoS1 消息需要确保至少送达一次。Mongoose 的 mg_mqtt_pub 在发送 QoS1 消息时,会自动为消息分配一个报文标识符 ID,并在内部等待 Broker 的 PUBACK 报文。当 Broker 返回 PUBACK,Mongoose 触发 MG_EV_MQTT_CMD 事件,其 ev_data 中的 struct mg_mqtt_message * 会有 mm->cmd == MQTT_CMD_PUBACK,表示我们发布的某个消息得到了ACK。一般情况下,Mongoose 已经完成了QoS1所需的基本工作:如果没有收到PUBACK,它不会自动重发(因为TCP本身已保证尽力送达,大多数情况消息能到Broker)。但 如果需要在应用层实现重传,可以利用事件检测ACK超时。

例如,我们可以这样做:在发布QoS1消息后,启动一个定时器计时,如果在预期时间内未收到 PUBACK(即应用未捕获 MG_EV_MQTT_CMD/PUBACK),则重新发送消息。每次捕获PUBACK则停止重传定时器。下面是一个示意逻辑(伪代码) (Mongoose :: Tutorials :: MQTT Client) (Mongoose :: Tutorials :: MQTT Client):

bool acked = false;
int retries = 3;
uint16_t pub_id = mg_mqtt_pub(c, topic_str, data_str, 1, false);
start_timer(timeout=5s);
while (!acked && retries > 0) {
    // 等待事件...
    // 在事件回调中:
    if (ev == MG_EV_MQTT_CMD) {
        struct mg_mqtt_message *mm = ev_data;
        if (mm->cmd == MQTT_CMD_PUBACK && mm->id == pub_id) {
            acked = true;
            // 停止定时器
        }
    }
    // 如果定时器触发且 acked 仍为 false:
    if (!acked) {
        pub_id = mg_mqtt_pub(c, topic_str, data_str, 1, false);  // 重发
        retries--;
        reset_timer();
    }
}

上面逻辑意思是:发布消息后等待PUBACK,如果超时或丢失则重发,最多重试3次。当然,在大多数情况下第一次就会成功。这个过程在QoS1语义下可以保证Broker最终收到至少一份消息,但Broker可能因为重发而收到多份,需要通过消息ID去重。不过 MQTT 规范并不要求发布者去处理重复ACK,只要发布者收到一次PUBACK即可认为完成。

接收方(订阅者):当客户端作为订阅者收到QoS1的消息时,按照协议必须回复PUBACK给发送方(这里发送方通常是Broker)。Mongoose 已帮我们处理了这一点:当 Mongoose 收到QoS1的PUBLISH,会自动发送PUBACK,无需我们手动干预。因此在 MG_EV_MQTT_MSG 中我们直接处理消息内容即可,不必显式发送ACK。

4.3 QoS 2 —— 恰好一次,处理四步握手

QoS2 的完整流程较复杂,需要双方进行两个往返(PUBREC/PUBREL和PUBCOMP)。Mongoose 尽可能自动化了其中步骤,但仍需要应用层配合确保严格不重复。

发送方:客户端发布QoS2消息时,mg_mqtt_pub 会发送 PUBLISH(QoS=2,带报文ID)。Broker 收到后按协议应回复 PUBREC。Mongoose 收到PUBREC后,会自动发送 PUBREL,然后等待 Broker 的 PUBCOMP。这个过程中,Mongoose 会触发事件:

  • 收到PUBREC时,会触发 MG_EV_MQTT_CMD,其中 mm->cmd == MQTT_CMD_PUBREC。应用可监听该事件以确认Broker已收到消息的一份拷贝 (Mongoose :: Tutorials :: MQTT Client)。在 Mongoose 内部,它已发送了PUBREL。
  • 收到PUBCOMP时,触发 MG_EV_MQTT_CMD,其中 mm->cmd == MQTT_CMD_PUBCOMP,表示QoS2流程彻底完成 (Mongoose :: Tutorials :: MQTT Client)。应用层看到PUBCOMP则可以确认消息“恰好一次”送达了。

类似QoS1,我们也可以做超时重传:如果发布QoS2消息后迟迟未收到PUBREC,可以重发PUBLISH;收到了PUBREC但未收到PUBCOMP,可以重发PUBREL。但这些细节往往由MQTT库管理。Mongoose 当前自动发送PUBREL,但不会自动重试,需要用户监控是否收到PUBCOMP来决定是否重发 PUBREL (Mongoose :: Tutorials :: MQTT Client) (Mongoose :: Tutorials :: MQTT Client)。

接收方:当客户端订阅者收到QoS2的PUBLISH时,协议要求它回复PUBREC,等待发送方的PUBREL,然后再回复PUBCOMP确认完成交互。Mongoose 在接收QoS2 PUBLISH 时已经自动发送了PUBREC给对方,并将收到的消息通过 MG_EV_MQTT_MSG 交给应用处理 (Mongoose :: Tutorials :: MQTT Client)。这时应用需要注意避免重复处理:因为发送方可能没有收到PUBREC而重发PUBLISH,Broker也可能在网络抖动时重复前一步。因此应用层应检查 PUBLISH 报文的重复标志(MQTT Fixed header里的 DUP 标志)或者报文ID,判断该消息是否已处理过。如果是重复消息,应丢弃(因为内容相同),但仍让协议走完。

当发送方后续发送PUBREL过来时,Mongoose 自动回复PUBCOMP。因此,对于订阅者而言,QoS2处理关键在于防止重复消费消息。在 Mongoose 里,可以在 MG_EV_MQTT_MSG 事件中,通过 mm->dup 或维护一个已处理ID列表来判断。如果短时间内收到相同ID的QoS2消息且 DUP=true,则忽略其内容。

小结来说,QoS2保证严格一次,但需要应用在收到消息时做幂等处理,MQTT库帮助我们完成了协议回复:

  • 发布者需关注 PUBREC 和 PUBCOMP 事件,必要时重试确保最终收到 PUBCOMP。
  • 订阅者需处理好重复的 PUBLISH(MQTT规范要求即使重复收到也只交付一次)。

4.4 在 Mongoose 中的实现

Mongoose 将 QoS确认报文的到来统一以事件 MG_EV_MQTT_CMD 通知。我们可以在事件回调中增加相应分支,例如:

else if (ev == MG_EV_MQTT_CMD) {
    struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;
    switch (mm->cmd) {
        case MQTT_CMD_PUBACK:
            printf("QoS1 PUBACK received for message ID %u\n", mm->id);
            break;
        case MQTT_CMD_PUBREC:
            printf("QoS2 PUBREC received for message ID %u\n", mm->id);
            // Mongoose已自动发PUBREL
            break;
        case MQTT_CMD_PUBCOMP:
            printf("QoS2 PUBCOMP received for message ID %u (complete)\n", mm->id);
            break;
        // 也可以处理 SUBACK, PINGRESP 等其他命令
    }
}

这样,我们就能捕获这些ACK并用于日志或重传逻辑。如果希望实现更可靠的发送,可在发送时记录返回的 message ID(mg_mqtt_pub 返回值为 QoS>0 的消息ID (Mongoose: src/mqtt.c | Fossies)),然后在 MG_EV_MQTT_CMD 中匹配 mm->id 确认对应消息的ACK。

对于订阅者侧,假如我们实现一个MQTT Broker(下一章会做),也需要在服务器端处理 QoS1/2。这在Broker逻辑中更复杂,因为Broker既是接收方又要成为发送方转发,所以必须对每个客户端维持状态,确保每个消息对每个目标客户端按其订阅QoS完成确认。这超出了一个简单教程的范围,我们在下一章实现的简易Broker不会全面支持QoS2持久会话,仅做基本转发示范。

4.5 小结

在本章,我们深入讨论了 QoS 各级别的消息收发确认流程。对于QoS0,开发者几乎无需关心确认;QoS1 需要处理 PUBACK,可考虑重传机制;QoS2 则涉及PUBREC/PUBREL/PUBCOMP的完整握手,需要注意避免重复处理消息。Mongoose 简化了许多底层细节,我们更多的是在需要时监控事件来增强可靠性。

对于大多数应用,QoS1 已足够可靠且实现简单。而 QoS2 因实现复杂、开销大,一般仅在必要时才使用。如果使用QoS2,也建议充分测试各种异常网络情况下的行为,确保应用逻辑正确。

理解了客户端侧 QoS 处理,我们已经掌握了 MQTT 协议的大部分要点。最后,我们将挑战构建一个简易的 MQTT Broker 服务端,以更深入理解 MQTT 报文在服务器端的流程。

5. 构建简单 MQTT Broker(服务器端)示例

通常,生产环境中我们会使用现成的 MQTT 服务器(如 Mosquitto、EMQX、HiveMQ 等)来充当Broker。但是为了加深理解,我们可以使用 Mongoose 库自行实现一个精简的 MQTT Broker。这个 Broker 将能够:

  • 接受客户端 CONNECT 连接请求(不校验用户名密码,仅简单接受所有连接),
  • 处理客户端的 SUBSCRIBE 请求,记录订阅关系,
  • 处理 PUBLISH 请求,将消息转发给匹配订阅的客户端,
  • 维护基本的客户端会话状态,在客户端断开时清除其订阅。

我们实现的将是 MQTT 3.1.1 协议的子集,不支持持久会话、遗嘱、甚至不处理UNSUBSCRIBE等复杂操作,但足以演示Broker的核心机制 (Mongoose :: Tutorials :: MQTT Server)。

5.1 服务器实现概述

使用 Mongoose 实现服务器的关键在于监听端口并处理 MQTT 命令事件。Mongoose 提供了 mg_mqtt_listen 函数来方便地创建一个 MQTT 监听服务器。我们需要提供一个事件处理函数,类似客户端,但这次事件处理的主体是服务器监听到的各个客户端连接。

我们的Broker需要维护一个全局订阅列表,记录哪些客户端订阅了哪些主题以及QoS。为了简单,我们定义一个结构体:

struct sub {
    struct sub *next;
    struct mg_connection *c;  // 订阅客户端的连接
    struct mg_str topic;      // 订阅的主题过滤器(支持通配符)
    uint8_t qos;              // 订阅使用的QoS
};
static struct sub *s_subs = NULL;  // 全局链表头

我们将每个订阅记录为链表节点 sub 并链接到全局 s_subs。当有新的订阅时,添加节点;当客户端断开时,删除该客户端相关的节点。

Broker 的事件处理函数需要处理的 MQTT命令主要有:

  • MQTT_CMD_CONNECT:客户端请求连接。我们应检查协议版本,并发送 CONNACK。我们的实现对用户名密码不校验、clientID不做特殊处理,只要协议版本是3.1.1就接受。
  • MQTT_CMD_SUBSCRIBE:客户端发来订阅请求。一个 SUBSCRIBE 报文可能包含多个主题订阅,我们需要逐个解析。为每个订阅创建 sub 记录加入列表,并回复 SUBACK 确认订阅成功(返回每个订阅的QoS,简单起见我们就回送客户端要求的QoS)。
  • MQTT_CMD_PUBLISH:客户端发布消息。Broker收到后,需要将消息转发给所有匹配其主题的订阅。我们要遍历 s_subs 列表,找出topic匹配的订阅者,然后调用 Mongoose 函数将消息发送给这些客户端。匹配时要考虑通配符。Mongoose 提供了 mg_globmatch 帮助我们进行类似shell通配符的匹配 (Mongoose :: Tutorials :: MQTT Server)。在使用前,我们可以将订阅记录的主题过滤器中的 '+' 通配符转换为 '*' 以利用 mg_globmatch(因为 * 在 mg_globmatch 中表示任意字符串,包括跨层级)。
  • MQTT_CMD_PINGREQ:客户端发送心跳请求,我们需要回复 PINGRESP。
  • MG_EV_CLOSE(连接关闭事件,不是 MQTT命令):当客户端断开时,移除它的所有订阅记录,避免内存泄漏和无效转发。

通过处理上述几种情况,我们就能实现一个基本可用的Broker。

5.2 服务器代码实现

#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include "mongoose.h"

static const char *s_listen_addr = "mqtt://0.0.0.0:1883";  // 监听地址和端口

// 订阅记录的链表节点结构
struct sub {
    struct sub *next;
    struct mg_connection *c;   // 订阅客户端连接
    struct mg_str topic;       // 订阅的主题过滤器(可能含通配符)
    uint8_t qos;
};
static struct sub *s_subs = NULL;  // 订阅链表头

// 信号处理,用于优雅退出服务器
static int s_signo = 0;
static void signal_handler(int signo) {
    s_signo = signo;
}

// 辅助函数:解析 SUBSCRIBE 报文中的下一个主题过滤器和QoS
// MQTT SUBSCRIBE 报文格式: [Packet ID(2 bytes)] + 若干个 [Topic Length(2 bytes) + Topic Filter + Requested QoS(1 byte)]
// 参数msg为Mongoose解析出的MQTT消息结构,pos为当前解析偏移,返回下一个偏移(0表示解析完毕或出错)
static size_t parse_next_sub_topic(struct mg_mqtt_message *msg, struct mg_str *topic, uint8_t *qos, size_t pos) {
    // msg->dgram 包含原始报文数据,长度在 msg->dgram.len
    if (pos >= msg->dgram.len) return 0;
    const uint8_t *buf = (uint8_t *) msg->dgram.ptr + pos;
    // 读取主题长度
    if (pos + 2 > msg->dgram.len) return 0;
    size_t topic_len = ((size_t)buf[0] << 8) | buf[1];
    if (pos + 2 + topic_len + 1 > msg->dgram.len) return 0;  // 检查数据是否完整
    topic->len = topic_len;
    topic->ptr = (char *) buf + 2;
    // 读取QoS字节
    if (qos) *qos = buf[2 + topic_len];
    // 返回下一个主题项的起始偏移
    return pos + 2 + topic_len + 1;
}

// 事件处理函数(Broker端)
static void mqtt_server_fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
    if (ev == MG_EV_MQTT_CMD) {
        struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;
        // 根据 MQTT 控制报文类型进行处理
        switch (mm->cmd) {
        case MQTT_CMD_CONNECT: {
            // 客户端请求连接
            if (mm->dgram.len < 9 || (uint8_t) mm->dgram.ptr[8] != 4) {
                // 报文长度太短 或 协议级别不是4(MQTT3.1.1=4)
                mg_error(c, "Malformed CONNECT or unsupported protocol");
            } else {
                // 发送 CONNACK 确认连接 (固定2字节: 会话Present标志=0, 返回码=0)
                uint8_t response[2] = {0x00, 0x00};
                mg_mqtt_send_header(c, MQTT_CMD_CONNACK, 0, sizeof(response));
                mg_send(c, response, sizeof(response));
            }
            break;
        }
        case MQTT_CMD_SUBSCRIBE: {
            // 客户端订阅主题
            // SUBSCRIBE 报文固定报头后前两个字节是报文标识符(Message ID)
            size_t pos = 2;  // 跳过报文ID开始解析主题列表
            uint8_t req_qos;
            struct mg_str topic;
            uint8_t suback_qos[10];  // 最多支持10个主题的回执(仅示例,实际应动态处理)
            int num_topics = 0;
            // 逐个解析所有订阅主题
            while ((pos = parse_next_sub_topic(mm, &topic, &req_qos, pos)) > 0 && num_topics < 10) {
                // 为每个主题分配订阅记录
                struct sub *sub = (struct sub *) calloc(1, sizeof(*sub));
                sub->c = c;
                sub->topic = mg_strdup(topic);  // 拷贝主题字符串存储
                sub->qos = req_qos & 0x03;      // 仅保留QoS低两位(有效值0,1,2)
                // 将订阅加入链表头
                sub->next = s_subs;
                s_subs = sub;
                // 将通配符'+'替换为'*',方便后续用mg_globmatch匹配单层
                for (size_t i = 0; i < sub->topic.len; i++) {
                    if (sub->topic.ptr[i] == '+') ((char *) sub->topic.ptr)[i] = '*';
                }
                suback_qos[num_topics++] = sub->qos;  // 回应的QoS(这里直接同意请求的QoS)
                MG_INFO(("Client %p subscribed to %.*s (QoS %d)", c->fd,
                         (int) sub->topic.len, sub->topic.ptr, sub->qos));
            }
            // 发送 SUBACK 报文: 包含原订阅报文的Message ID以及每个订阅主题的返回QoS
            if (num_topics > 0) {
                mg_mqtt_send_header(c, MQTT_CMD_SUBACK, 0, 2 + num_topics);
                // 原报文Message ID可以通过 mm->id 获得(Mongoose 已解析出)
                uint16_t id_n = mg_htons(mm->id);
                mg_send(c, &id_n, 2);
                mg_send(c, suback_qos, num_topics);
            }
            break;
        }
        case MQTT_CMD_PUBLISH: {
            // 客户端发布消息
            // 从 mm 中获取主题和消息
            struct mg_str topic = mm->topic;
            struct mg_str data = mm->data;
            MG_INFO(("Received PUBLISH on %.*s: %.*s",
                     (int) topic.len, topic.ptr,
                     (int) data.len, data.ptr));
            // 将消息转发给所有匹配订阅的客户端
            for (struct sub *sub = s_subs; sub != NULL; sub = sub->next) {
                // 使用 mg_globmatch 进行通配符匹配;mg_globmatch(pattern, pat_len, str, str_len)
                if (mg_globmatch(sub->topic.ptr, sub->topic.len, topic.ptr, topic.len)) {
                    // 将消息发送给订阅者sub->c
                    mg_mqtt_pub(sub->c, topic, data, sub->qos < mm->qos ? sub->qos : mm->qos, false);
                    // 这里使用发布者QoS和订阅者QoS的较小值作为实际发送QoS
                    MG_INFO(("Forwarded message to %p (subscribed %.*s)",
                             sub->c->fd, (int) sub->topic.len, sub->topic.ptr));
                }
            }
            // 如果原消息QoS是1或2,按协议需回复ACK:
            if (mm->qos == 1) {
                // QoS1,回复PUBACK
                mg_mqtt_send_header(c, MQTT_CMD_PUBACK, 0, 2);
                uint16_t id_n = mg_htons(mm->id);
                mg_send(c, &id_n, 2);
            } else if (mm->qos == 2) {
                // QoS2,回复PUBREC,等待客户端后续PUBREL,在Mongoose中,会有MG_EV_MQTT_CMD触发PUBREL事件,再处理
                // 但为简单,此处直接回复PUBREC并立刻当做完成(完全实现QoS2较复杂)
                mg_mqtt_send_header(c, MQTT_CMD_PUBREC, 0, 2);
                uint16_t id_n = mg_htons(mm->id);
                mg_send(c, &id_n, 2);
                // 注意:完整实现还需等待PUBREL再PUBCOMP,这里简化处理
            }
            break;
        }
        case MQTT_CMD_PINGREQ: {
            // 心跳请求,回复 PINGRESP
            MG_INFO(("PINGREQ from %p, send PINGRESP", c->fd));
            mg_mqtt_send_header(c, MQTT_CMD_PINGRESP, 0, 0);
            break;
        }
        default:
            // 其他命令未实现,例如 UNSUBSCRIBE,直接忽略或响应不支持
            MG_WARN(("Unhandled MQTT command %d from %p", mm->cmd, c->fd));
        } // switch(mm->cmd)
    } else if (ev == MG_EV_CLOSE) {
        // 某个客户端断开连接,清除其所有订阅
        struct sub **pp = &s_subs;
        while (*pp) {
            if ((*pp)->c == c) {
                struct sub *sub_to_remove = *pp;
                MG_INFO(("Client %p disconnected, remove subscription %.*s", c->fd,
                         (int) sub_to_remove->topic.len, sub_to_remove->topic.ptr));
                // 从链表移除该节点
                *pp = sub_to_remove->next;
                mg_strfree(&sub_to_remove->topic);
                free(sub_to_remove);
                // 不更新pp,继续检查下一个(因为删除后 *pp 已指向下一个节点)
                continue;
            }
            pp = &(*pp)->next;
        }
    }
    (void) fn_data;
}

int main(void) {
    struct mg_mgr mgr;
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);
    mg_mgr_init(&mgr);
    // 启动MQTT监听服务器
    if (mg_mqtt_listen(&mgr, s_listen_addr, mqtt_server_fn, NULL) == NULL) {
        fprintf(stderr, "Error starting MQTT broker on %s\n", s_listen_addr);
        return 1;
    }
    MG_INFO(("MQTT broker started on %s", s_listen_addr));
    // 进入事件循环
    while (s_signo == 0) {
        mg_mgr_poll(&mgr, 1000);
    }
    // 清理
    mg_mgr_free(&mgr);
    return 0;
}

让我们解释一下服务器代码的重要部分:

5.3 运行与测试 Broker

编译运行上述 Broker 程序,会在本地启动一个MQTT服务监听1883端口。可以用 Mosquitto 客户端或 MQTTX 等工具来测试:

  • 打开一个终端,运行 mosquitto_sub -v -h 127.0.0.1 -p 1883 -t 'test/topic' -t 'foo/#' 来订阅主题(例如订阅具体的test/topic以及通配符订阅foo/#)。
  • 再打开另一个终端,尝试发布消息:
    • mosquitto_pub -h 127.0.0.1 -p 1883 -t 'test/topic' -m 'hello'
      订阅端应收到该消息(因为订阅了test/topic)。
    • mosquitto_pub -h 127.0.0.1 -p 1883 -t 'foo/bar/baz' -m 'hey'
      订阅端应收到(匹配foo/#订阅)。
    • 尝试发布一个无人订阅的主题,订阅端不会显示,Broker日志可看到收到但未转发。
  • 也可以尝试多个订阅客户端验证Broker会将消息转发给所有订阅者。

我们的简易Broker不实现认证,因此任何人都可连接订阅。同时它也不支持保留消息、遗嘱、持久会话等高级特性。但通过这个过程,我们已经演练了Broker需要完成的核心操作:CONNECT/CONNACK、SUBSCRIBE/SUBACK、PUBLISH路由、以及客户端断开处理。

5.4 小结

本章节我们使用 Mongoose 快速构建了一个精简的 MQTT Broker,理解了服务器端处理 MQTT 报文的基本流程。在实现过程中,我们管理了客户端订阅列表,并利用 Mongoose 提供的工具函数实现了主题匹配和消息转发。这个Broker能够让多个MQTT客户端连接并交换消息,证明了我们对MQTT协议机制的掌握。

需要指出,这只是一个教学示例,实际可用的Broker需要更多功能和健壮性,例如:

  • 完整处理 QoS2 的四步握手和消息重传,
  • 支持客户端 Clean Session=false 时保存离线消息,
  • 验证用户名/密码,访问控制,
  • 更高效的订阅管理结构(如基于Trie的主题树),
  • 处理 UNSUBSCRIBE,以及遗嘱消息发布等。

即便如此,通过亲手实现这些基本功能,读者应已对 MQTT 工作原理有了深刻理解,也体会到 Mongoose 库在网络编程上的强大之处。

6. 总结

在本教程中,我们从 MQTT 协议基础开始,逐步实践了 MQTT 客户端和服务器端的实现过程。首先,我们介绍了 MQTT 的发布/订阅模型、主题和 QoS 等关键概念 (MQTT简介之三(4) MQTT协议QOS 服务质量等级原创 - CSDN博客)。然后,我们使用 C 语言结合 Mongoose 7.17 库构建了一个基本 MQTT 客户端,演示了如何连接 Broker、订阅/发布消息,并通过事件回调处理 MQTT 通信。我们进一步增强了客户端,加入 TLS 安全连接、用户名密码认证、KeepAlive 和遗嘱消息等特性,提高了客户端的安全性和可靠性。

随后,我们讨论了 QoS0、QoS1、QoS2 消息交互中需要处理的确认报文机制,说明了如何借助 Mongoose 的事件处理对这些 QoS 做支持,保证消息按期望的服务质量传递。最后,我们实现了一个简单的 MQTT Broker 示例,从服务器角度完整走了一遍 CONNECT、SUBSCRIBE、PUBLISH、PING 等流程,加深了对 MQTT 协议运作的理解。

通过这些连续的章节内容,相信读者已经全面了解了 MQTT 协议的基础知识和使用 Mongoose 进行 MQTT 开发的实践技巧。读者现在应该能够:

如果进一步学习,建议读者参考 MQTT 正式规范以及 Mongoose 官方文档,尝试实现更多高级功能。例如,实现持久会话,处理断线重连时未完成的QoS2传输,或者给Broker添加权限控制等。同时,可以尝试将此基础客户端集成到实际项目,比如制作一个传感器数据上传的客户端,或基于此Broker扩展出一个专用的消息服务器。

通过本教程的学习,您已经从底层协议到实际编码全方位掌握了 MQTT,在未来的物联网开发中,相信这些知识将大有裨益。祝您在 MQTT 与 C 编程之旅中不断收获新的成果!

(MQTT 协议快速入门 2025:基础知识和实用教程 | EMQ) (MQTT简介之三(4) MQTT协议QOS 服务质量等级原创 - CSDN博客)