SpringBoot整合MQTT利用EMQX完成消息的发布与接收+Python模拟硬件测试通信

SpringBoot整合MQTT利用EMQX完成消息的发布与接收+Python模拟硬件测试通信

教程说明

本教程主要内容为使用SpringBoot整合MQTT利用EMQX代理服务完成MQTT的消息发送与接收,然后用Python模拟硬件与SpringBoot应用进行了MQTT消息的通信,教程详细,并在最后讲解了开发中的注意事项,本教程适用于物联网领域、JavaWeb领域的开发人员。

前置所需

已经搭建好了EMQX代理服务,不懂的可以参考上一篇文章 CentOS7安装EMQX并搭建个人MQTT代理服务器 | Python实现MQTT通信测试教程

内容架构

如图,教程内容主要是用到了这三个端,对于Python的测试,可有可无,这里只是模拟硬件测试与SpringBoot的通信,并且便于理解MQTT的通信

QQ_1732244919285

大致步骤

1.确定SpringBoot版本并导入所需依赖
2.yaml配置相关参数
3.确定包结构并编写读取yaml参数的组件
4.编写客户端回调组件
5.编写MQTT的发送客户端和接收客户端
6.编写MQTT监听接收消息的配置类
7.编写控制器测试SpringBoot应用自身消息与发送
8.利用Python与SpringBoot进行MQTT通信交互
9.重要的注意事项分析

步骤1.确定SpringBoot版本并导入所需依赖

我这里所使用的SpringBoot版本为2.3.4RELEASE,JDK使用1.8,Maven随意,如图

QQ_1732246422790

然后所使用的依赖为如下,我这里列出我所有使用到的依赖,最上面三个是MQTT相关的依赖

<!-- mqtt -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

步骤2.yaml配置相关参数

在application.yaml中配置如下参数,注意将hostUrl的ip替换为你自己的EMQX所在ip,如图

mqtt:
  hostUrl: tcp://111.111.11.111:1883
  username: 123
  password: 321
  client-id: testId #虽然这里 幽络源配置了client-id,但是在获取客户端连接时,我是用的uuid,避免重复
  cleanSession: false

  #开发时这里的自动重连接一定要设置false,不然会由于热部署导致连接两个相同的clientId,出现bug
  #真正部署时这里的自动重连接 再设置为true,这样就可以保证 由于网络原因即使连接断开也会自动重连接
  reconnect: true
  timeout: 100
  keepAlive: 100
  topic: test/topic
  isOpen: true
  qos: 0

QQ_1732246924976

步骤3.确定包结构并编写读取yaml参数的组件

包结构比较简单,只需要建立config和controller包即可,如图,config包用于放置我们的MQTT相关配置文件,controller用于放置接口

QQ_1732249646196

由于我们的MQTT相关配置文件甚至接口中都会用到yaml中配置的参数,方便起见,建立一个专门读取yaml中mqtt参数的组件并利用lombok提供get/set方法,在config包下建立MqttProperties.java文件,代码与图如下

package com.youluoyuan.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;


@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {

    //mqtt认证的用户名
    private String username;
    //mqtt认证的用户对应的密码
    private String password;
    //ip地址
    private String hostUrl;
    //客户端id
    private String clientId;
    //主题
    private String topic;
    //超时时间
    private int timeout;
    //心跳
    private int keepAlive;
    //是否清理session
    private Boolean cleanSession;
    //是否自动重连
    private Boolean reconnect;
    //启动项目的时候是否启动mqtt
    private Boolean isOpen;
    //连接方式
    private Integer qos;

}

QQ_1732252001748

步骤4.编写客户端回调组件

在config包下建立MqttCallBack.java文件,用于消息发送和接收客户端进行触发,加入如下代码

package com.youluoyuan.config;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * MQTT客户端回调
 */
@Component
public class MqttCallBack implements MqttCallbackExtended {

    private static final Logger logger = LoggerFactory.getLogger(MqttCallBack.class);

    /**
     * 客户端断开后的回调
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        logger.info("客户端连接已断开");
    }

    /**
     * 客户端接收消息的回调
     * @param topic  主题
     * @param mqttMessage 消息
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage){
        logger.info("收到消息:主题 => "+ topic+",Qos => "+ mqttMessage.getQos() +",内容 => "  + new String(mqttMessage.getPayload()));
    }


    /**
     * 客户端发送消息的回调
     * @param token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        try {
            MqttMessage message = token.getMessage();
            byte[] payload = message.getPayload();
            String s = new String(payload, "UTF-8");
            for (String topic : topics) {
                logger.info("消息发送成功,主题为 => "+topic+",内容为 => " + s);
            }
        } catch (Exception e) {
            logger.error("消息发送失败,错误信息 => {}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * MQTT连接回调
     * @param reconnect 是否为重连
     * @param serverURI 服务URI
     */
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        logger.info("===客户端" + MqttAcceptClient.client.getClientId() + "连接成功!===");
    }
}

步骤5.编写MQTT的发送客户端和接收客户端

在config包下建立MqttSendClient.java,作为消息发送客户端,代码如下

package com.youluoyuan.config;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * MQTT消息发送客户端
 */
@Component
public class MqttSendClient {

    private static final Logger logger = LoggerFactory.getLogger(MqttSendClient.class);

    @Autowired
    private MqttCallBack mqttSendCallBack;

    @Autowired
    private MqttProperties mqttProperties;

    //连接MQTT代理服务器并设置回调
    public MqttClient connect() {
        MqttClient client = null;
        try {
            String clientId = UUID.randomUUID().toString().replaceAll("-", "");
            //获取MQTT客户端
            client = new MqttClient(mqttProperties.getHostUrl(), clientId, new MemoryPersistence());
            //设置连接参数
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setCleanSession(mqttProperties.getCleanSession());
            options.setAutomaticReconnect(mqttProperties.getReconnect());
            // 设置回调
            client.setCallback(mqttSendCallBack);
            client.connect(options);
        } catch (Exception e) {
            logger.error("MQTT消息发送客户端连接失败,错误信息 => {}", e.getMessage());
            e.printStackTrace();
        }
        return client;
    }

    /**
     * 发送消息
     * @param retained 是否保留
     * @param topic 主题
     * @param content 内容
     */
    public void publish(boolean retained, String topic, String content) {
        //创建消息
        MqttMessage message = new MqttMessage();
        //设置消息参数
        message.setQos(mqttProperties.getQos());
        message.setRetained(retained);
        message.setPayload(content.getBytes());
        MqttDeliveryToken token;
        //获取连接
        MqttClient mqttClient = connect();
        try {
            mqttClient.publish(mqttProperties.getTopic(), message);
        } catch (MqttException e) {
            logger.error("MQTT消息发送客户端发送消息失败,错误信息 => {}", e.getMessage());
            e.printStackTrace();
        } finally {
            disconnect(mqttClient);
            close(mqttClient);
        }
    }

    /**
     * 取消连接
     * @param mqttClient 客户端
     */
    public static void disconnect(MqttClient mqttClient) {
        try {
            if (mqttClient != null)
                mqttClient.disconnect();
        } catch (MqttException e) {
            logger.error("MQTT消息发送客户端取消连接失败,错误信息 => {}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 释放资源
     * @param mqttClient 客户端
     */
    public static void close(MqttClient mqttClient) {
        try {
            if (mqttClient != null)
                mqttClient.close();
        } catch (MqttException e) {
            logger.error("MQTT消息发送客户端释放资源失败,错误信息 => {}", e.getMessage());
            e.printStackTrace();
        }
    }
}

再在config包下建立MqttAcceptClient.java,作为消息接收客户端,代码如下

package com.youluoyuan.config;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;


/**
 * MQTT消息接收客户端
 */
@Component
public class MqttAcceptClient {

    private static final Logger logger = LoggerFactory.getLogger(MqttAcceptClient.class);

    @Autowired
    private MqttCallBack mqttCallBack;

    @Autowired
    private MqttProperties mqttProperties;

    public static MqttClient client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttAcceptClient.client = client;
    }

    //连接MQTT代理服务器并设置回调
    public void connect() {
        MqttClient client;
        try {
            String clientId = UUID.randomUUID().toString().replaceAll("-", "");
            //获取MQTT客户端
            client = new MqttClient(mqttProperties.getHostUrl(), clientId,new MemoryPersistence());
            //设置连接参数
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setCleanSession(mqttProperties.getCleanSession());
            options.setAutomaticReconnect(mqttProperties.getReconnect());
            MqttAcceptClient.setClient(client);
            // 设置回调
            client.setCallback(mqttCallBack);
            client.connect(options);
        } catch (Exception e) {
            logger.error("MQTT消息接收客户端连接失败,错误信息 => {}", e.getMessage());
            e.printStackTrace();
        }
    }

    //重新连接
    public void reconnection() {
        try {
            client.connect();
        } catch (MqttException e) {
            logger.error("MQTT消息接收客户端重新连接失败,错误信息 => {}", e.getMessage());
            e.printStackTrace();
        }
    }
    /**
     * 订阅主题
     * @param topic 主题
     * @param qos 连接方式
     */
    public void subscribe(String topic, int qos) {
        logger.info("===MQTT消息接收客户端订阅主题:" + topic + "===");
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            logger.error("MQTT消息接收客户端订阅主题失败,错误信息 => {}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 取消订阅主题
     * @param topic
     */
    public void unsubscribe(String topic) {
        logger.info("===MQTT消息接收客户端取消订阅主题:" + topic + "====");
        try {
            client.unsubscribe(topic);
        } catch (MqttException e) {
            logger.error("MQTT消息接收客户端取消订阅主题失败,错误信息 => {}", e.getMessage());
            e.printStackTrace();
        }
    }
}

有了如上两个客户端,我们的SpringBoot就能既作消息发送者,也能作为消息接收者,也就是可以既能获取到硬件传来的信息,也能通过发送消息去控制硬件

步骤6.编写MQTT监听接收消息的配置类

在config包下建立MqttConfig.java,此配置文件用于启动SpringBoot使,就订阅我们配置的主题进行消息的监听接收,代码与图如下

package com.youluoyuan.config;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 *启动服务的时候开启监听客户端
 */
@Configuration
public class MqttConfig {

    @Autowired
    private MqttAcceptClient mqttAcceptClient;

    @Autowired
    private MqttProperties mqttProperties;

    //订阅主题
    @Bean
    public MqttAcceptClient getMqttPushClient() {
        System.out.println("开启监听客户端");
        mqttAcceptClient.connect();
        mqttAcceptClient.subscribe(mqttProperties.getTopic(),mqttProperties.getQos());
        return mqttAcceptClient;
    }
}

QQ_1732252966566步骤7.编写控制器测试SpringBoot应用自身消息与发送

在controller包中建立MqttController.java,提供一个GET,用于触发消息的发送,代码与图如下

package com.youluoyuan.controller;

import com.youluoyuan.config.MqttSendClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 测试接口
 */
@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttSendClient mqttSendClient;

    @GetMapping(value = "/public")
    public String publishTopic(String topic, String msg) {
        mqttSendClient.publish(false, topic, msg);
        return "topic:" + topic + ",msg:" + msg;
    }

}

QQ_1732253048957

然后启动SpringBoot应用,如图可以看到控制台中,我们配置的消息接收监听已经启动了,并且客户端的clientId为uuid,默认启动的端口为8080

QQ_1732253233499

由于我们的接口是GET请求的,直接在浏览器输入如下地址,然后访问,如图

http://localhost:8080/mqtt/public?topic=test/topic&msg=222

QQ_1732253660654

然后可以看到控制台中,日志输出了消息的发送和接收,到这一步,可以说SpringBoot已经做到了既可作为消息发送客户端又可作为消息接收客户端了(如果你请求一次接口,发现输出了多次收到消息,这是因为热部署导致的,步骤9会讲解原因)

QQ_1732253560225

步骤8.利用Python与SpringBoot进行MQTT通信交互

如果你只是需要明白SpingBoot如何整合MQTT,并且对MQTT的通信已经有足够的理解,这一步骤可以跳过,但是步骤9一定要看

本步骤主要是用于使用Python模拟硬件测试与SpringBoot程序MQTT通信

创建定时发布消息的代码如下,注意将BROKER填上自己的EMQX服务所在IP

import paho.mqtt.client as mqtt
import threading
import time

# MQTT 代理配置
BROKER = ""  # 例如 "127.0.0.1" 或 公网IP
PORT = 1883  # MQTT 默认端口(非加密)
TOPIC = "test/topic"  # 要发布/订阅的主题
USERNAME = "123"
PASSWORD = "321"  # 对应的密码
CLIENTID = "testId"

# 连接成功回调(新版 API)
def on_connect(client, userdata, flags, reasonCode, properties=None):
    if reasonCode == 0:
        print("已连接到MQTT代理")
        client.subscribe(TOPIC)  # 订阅主题
        print(f"已订阅主题: {TOPIC}")
    else:
        print(f"连接MQTT代理失败, 响应码为: {reasonCode}")

# 定时发布消息的函数
def publish_message(client):
    while True:
        client.publish(TOPIC, "你好,我定时发布了一条消息", qos=1)
        print("定时消息已发布")
        time.sleep(5)  # 每隔 5 秒发布一次消息

# 创建 MQTT 客户端
client = mqtt.Client()
client.username_pw_set(USERNAME, PASSWORD)
client.on_connect = on_connect

try:
    # 连接到 MQTT 代理
    client.connect(BROKER, PORT, 60)

    # 启动一个线程定时发布消息
    threading.Thread(target=publish_message, args=(client,), daemon=True).start()

    # 启动网络循环,保持连接
    client.loop_forever()
except Exception as e:
    print(f"Error: {e}")

然后启动Python程序,如图可以看到Python程序在定时的发送消息,并且SpringBoot应用也在实时的接收消息

QQ_1732254079552

然后来测试利用SpringBoot中的接口向Python模拟的硬件发送消息,看看是否能收到消息

消息接收代码如下

import paho.mqtt.client as mqtt

# MQTT 代理配置
BROKER = ""  # 例如 "127.0.0.1" 或 公网IP
PORT = 1883  # MQTT 默认端口(非加密)
TOPIC = "test/topic"  # 要发布/订阅的主题
USERNAME = "123"
PASSWORD = "321"  # 对应的密码

# 连接成功回调(新版 API)
def on_connect(client, userdata, flags, reasonCode, properties=None):
    if reasonCode == 0:
        print("已成功连接到 MQTT 代理")
        client.subscribe(TOPIC)  # 订阅主题
        print(f"已订阅主题: {TOPIC}")
    else:
        print(f"连接失败, 响应码为: {reasonCode}")

# 收到消息回调(新版 API)
def on_message(client, userdata, msg, properties=None):
    print(f"接收到消息: {msg.payload.decode()} 来自主题: {msg.topic}")

# 创建 MQTT 客户端
client = mqtt.Client()

# 设置用户名和密码
client.username_pw_set(USERNAME, PASSWORD)

# 绑定回调函数
client.on_connect = on_connect
client.on_message = on_message

try:
    # 连接到 MQTT 代理
    client.connect(BROKER, PORT, 60)

    # 启动网络循环,保持连接并监听消息
    client.loop_forever()
except Exception as e:
    print(f"Error: {e}")

然后还是在浏览器访问我们刚才那个地址,可以看到调用接口后,SpringBoot和Python控制台中都显示出收到消息了,说明我们的模拟没问题

QQ_1732254231555

步骤9.重要的注意事项分析

这里的重要的注意事项分析主要指的是yaml中的参数reconnect,这个参数本身不存在BUG,但是用idea在开发时,由于SpringBoot自身的热部署原因会出现BUG,具体BUG和原因为我们来复现一下

首先将yaml中的reconnect参数设置为true,如图

QQ_1732255749390

然后将MqttSendClient.java文件和MqttAcceptClient.java文件中的connect函数在获取客户端时,将clientId修改为从yaml中读取,不再使用uuid,如图

QQ_1732254595047

然后我们启动SpringBoot项目,刚启动时没什么问题,然后我们在任意java文件中的任意位置随便打上几个空格,然后ctrl+s保存来触发SpringBoot的热部署功能,然后就会发现,控制台中客户端在不断的重连和断开。

这是因为我们的客户端使用的是yaml中同一个clientId,热部署后又会重新启动一个客户端,而先前启动的客户端还存在,因此两个客户端会不断的重启将对方挤下去,所以一定要注意开发时,reconnect设置为false,最后不要忘记了,将客户端获取连接时传入的clientId恢复为uuid。

QQ_1732254949418

结语

以上是幽络源原创的SpringBoot整合MQTT,并通过EMQX代理,完成与Python模拟硬件进行MQTT通信的教程。如有不懂之处,可加Q群307531422询问交流。

© 版权声明
THE END
喜欢就支持一下吧
分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称

    暂无评论内容