本教程主要内容为使用SpringBoot整合MQTT利用EMQX代理服务完成MQTT的消息发送与接收,然后用Python模拟硬件与SpringBoot应用进行了MQTT消息的通信,教程详细,并在最后讲解了开发中的注意事项,本教程适用于物联网领域、JavaWeb领域的开发人员。
内容架构
如图,教程内容主要是用到了这三个端,对于Python的测试,可有可无,这里只是模拟硬件测试与SpringBoot的通信,并且便于理解MQTT的通信
1.确定SpringBoot版本并导入所需依赖
2.yaml配置相关参数
3.确定包结构并编写读取yaml参数的组件
4.编写客户端回调组件
5.编写MQTT的发送客户端和接收客户端
6.编写MQTT监听接收消息的配置类
7.编写控制器测试SpringBoot应用自身消息与发送
8.利用Python与SpringBoot进行MQTT通信交互
9.重要的注意事项分析
步骤1.确定SpringBoot版本并导入所需依赖
我这里所使用的SpringBoot版本为2.3.4RELEASE,JDK使用1.8,Maven随意,如图
然后所使用的依赖为如下,我这里列出我所有使用到的依赖,最上面三个是MQTT相关的依赖
<!-- mqtt -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
步骤2.yaml配置相关参数
在application.yaml中配置如下参数,注意将hostUrl的ip替换为你自己的EMQX所在ip,如图
mqtt:
hostUrl: tcp://111.111.11.111:1883
username: 123
password: 321
client-id: testId #虽然这里 幽络源配置了client-id,但是在获取客户端连接时,我是用的uuid,避免重复
cleanSession: false
#开发时这里的自动重连接一定要设置false,不然会由于热部署导致连接两个相同的clientId,出现bug
#真正部署时这里的自动重连接 再设置为true,这样就可以保证 由于网络原因即使连接断开也会自动重连接
reconnect: true
timeout: 100
keepAlive: 100
topic: test/topic
isOpen: true
qos: 0
步骤3.确定包结构并编写读取yaml参数的组件
包结构比较简单,只需要建立config和controller包即可,如图,config包用于放置我们的MQTT相关配置文件,controller用于放置接口
由于我们的MQTT相关配置文件甚至接口中都会用到yaml中配置的参数,方便起见,建立一个专门读取yaml中mqtt参数的组件并利用lombok提供get/set方法,在config包下建立MqttProperties.java文件,代码与图如下
package com.youluoyuan.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {
//mqtt认证的用户名
private String username;
//mqtt认证的用户对应的密码
private String password;
//ip地址
private String hostUrl;
//客户端id
private String clientId;
//主题
private String topic;
//超时时间
private int timeout;
//心跳
private int keepAlive;
//是否清理session
private Boolean cleanSession;
//是否自动重连
private Boolean reconnect;
//启动项目的时候是否启动mqtt
private Boolean isOpen;
//连接方式
private Integer qos;
}
步骤4.编写客户端回调组件
在config包下建立MqttCallBack.java文件,用于消息发送和接收客户端进行触发,加入如下代码
package com.youluoyuan.config;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* MQTT客户端回调
*/
@Component
public class MqttCallBack implements MqttCallbackExtended {
private static final Logger logger = LoggerFactory.getLogger(MqttCallBack.class);
/**
* 客户端断开后的回调
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
logger.info("客户端连接已断开");
}
/**
* 客户端接收消息的回调
* @param topic 主题
* @param mqttMessage 消息
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage){
logger.info("收到消息:主题 => "+ topic+",Qos => "+ mqttMessage.getQos() +",内容 => " + new String(mqttMessage.getPayload()));
}
/**
* 客户端发送消息的回调
* @param token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
String[] topics = token.getTopics();
try {
MqttMessage message = token.getMessage();
byte[] payload = message.getPayload();
String s = new String(payload, "UTF-8");
for (String topic : topics) {
logger.info("消息发送成功,主题为 => "+topic+",内容为 => " + s);
}
} catch (Exception e) {
logger.error("消息发送失败,错误信息 => {}", e.getMessage());
e.printStackTrace();
}
}
/**
* MQTT连接回调
* @param reconnect 是否为重连
* @param serverURI 服务URI
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
logger.info("===客户端" + MqttAcceptClient.client.getClientId() + "连接成功!===");
}
}
步骤5.编写MQTT的发送客户端和接收客户端
在config包下建立MqttSendClient.java,作为消息发送客户端,代码如下
package com.youluoyuan.config;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* MQTT消息发送客户端
*/
@Component
public class MqttSendClient {
private static final Logger logger = LoggerFactory.getLogger(MqttSendClient.class);
@Autowired
private MqttCallBack mqttSendCallBack;
@Autowired
private MqttProperties mqttProperties;
//连接MQTT代理服务器并设置回调
public MqttClient connect() {
MqttClient client = null;
try {
String clientId = UUID.randomUUID().toString().replaceAll("-", "");
//获取MQTT客户端
client = new MqttClient(mqttProperties.getHostUrl(), clientId, new MemoryPersistence());
//设置连接参数
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setConnectionTimeout(mqttProperties.getTimeout());
options.setKeepAliveInterval(mqttProperties.getKeepAlive());
options.setCleanSession(mqttProperties.getCleanSession());
options.setAutomaticReconnect(mqttProperties.getReconnect());
// 设置回调
client.setCallback(mqttSendCallBack);
client.connect(options);
} catch (Exception e) {
logger.error("MQTT消息发送客户端连接失败,错误信息 => {}", e.getMessage());
e.printStackTrace();
}
return client;
}
/**
* 发送消息
* @param retained 是否保留
* @param topic 主题
* @param content 内容
*/
public void publish(boolean retained, String topic, String content) {
//创建消息
MqttMessage message = new MqttMessage();
//设置消息参数
message.setQos(mqttProperties.getQos());
message.setRetained(retained);
message.setPayload(content.getBytes());
MqttDeliveryToken token;
//获取连接
MqttClient mqttClient = connect();
try {
mqttClient.publish(mqttProperties.getTopic(), message);
} catch (MqttException e) {
logger.error("MQTT消息发送客户端发送消息失败,错误信息 => {}", e.getMessage());
e.printStackTrace();
} finally {
disconnect(mqttClient);
close(mqttClient);
}
}
/**
* 取消连接
* @param mqttClient 客户端
*/
public static void disconnect(MqttClient mqttClient) {
try {
if (mqttClient != null)
mqttClient.disconnect();
} catch (MqttException e) {
logger.error("MQTT消息发送客户端取消连接失败,错误信息 => {}", e.getMessage());
e.printStackTrace();
}
}
/**
* 释放资源
* @param mqttClient 客户端
*/
public static void close(MqttClient mqttClient) {
try {
if (mqttClient != null)
mqttClient.close();
} catch (MqttException e) {
logger.error("MQTT消息发送客户端释放资源失败,错误信息 => {}", e.getMessage());
e.printStackTrace();
}
}
}
再在config包下建立MqttAcceptClient.java,作为消息接收客户端,代码如下
package com.youluoyuan.config;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* MQTT消息接收客户端
*/
@Component
public class MqttAcceptClient {
private static final Logger logger = LoggerFactory.getLogger(MqttAcceptClient.class);
@Autowired
private MqttCallBack mqttCallBack;
@Autowired
private MqttProperties mqttProperties;
public static MqttClient client;
private static MqttClient getClient() {
return client;
}
private static void setClient(MqttClient client) {
MqttAcceptClient.client = client;
}
//连接MQTT代理服务器并设置回调
public void connect() {
MqttClient client;
try {
String clientId = UUID.randomUUID().toString().replaceAll("-", "");
//获取MQTT客户端
client = new MqttClient(mqttProperties.getHostUrl(), clientId,new MemoryPersistence());
//设置连接参数
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setConnectionTimeout(mqttProperties.getTimeout());
options.setKeepAliveInterval(mqttProperties.getKeepAlive());
options.setCleanSession(mqttProperties.getCleanSession());
options.setAutomaticReconnect(mqttProperties.getReconnect());
MqttAcceptClient.setClient(client);
// 设置回调
client.setCallback(mqttCallBack);
client.connect(options);
} catch (Exception e) {
logger.error("MQTT消息接收客户端连接失败,错误信息 => {}", e.getMessage());
e.printStackTrace();
}
}
//重新连接
public void reconnection() {
try {
client.connect();
} catch (MqttException e) {
logger.error("MQTT消息接收客户端重新连接失败,错误信息 => {}", e.getMessage());
e.printStackTrace();
}
}
/**
* 订阅主题
* @param topic 主题
* @param qos 连接方式
*/
public void subscribe(String topic, int qos) {
logger.info("===MQTT消息接收客户端订阅主题:" + topic + "===");
try {
client.subscribe(topic, qos);
} catch (MqttException e) {
logger.error("MQTT消息接收客户端订阅主题失败,错误信息 => {}", e.getMessage());
e.printStackTrace();
}
}
/**
* 取消订阅主题
* @param topic
*/
public void unsubscribe(String topic) {
logger.info("===MQTT消息接收客户端取消订阅主题:" + topic + "====");
try {
client.unsubscribe(topic);
} catch (MqttException e) {
logger.error("MQTT消息接收客户端取消订阅主题失败,错误信息 => {}", e.getMessage());
e.printStackTrace();
}
}
}
有了如上两个客户端,我们的SpringBoot就能既作消息发送者,也能作为消息接收者,也就是可以既能获取到硬件传来的信息,也能通过发送消息去控制硬件
步骤6.编写MQTT监听接收消息的配置类
在config包下建立MqttConfig.java,此配置文件用于启动SpringBoot使,就订阅我们配置的主题进行消息的监听接收,代码与图如下
package com.youluoyuan.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
*启动服务的时候开启监听客户端
*/
@Configuration
public class MqttConfig {
@Autowired
private MqttAcceptClient mqttAcceptClient;
@Autowired
private MqttProperties mqttProperties;
//订阅主题
@Bean
public MqttAcceptClient getMqttPushClient() {
System.out.println("开启监听客户端");
mqttAcceptClient.connect();
mqttAcceptClient.subscribe(mqttProperties.getTopic(),mqttProperties.getQos());
return mqttAcceptClient;
}
}
步骤7.编写控制器测试SpringBoot应用自身消息与发送
在controller包中建立MqttController.java,提供一个GET,用于触发消息的发送,代码与图如下
package com.youluoyuan.controller;
import com.youluoyuan.config.MqttSendClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 测试接口
*/
@RestController
@RequestMapping("/mqtt")
public class MqttController {
@Autowired
private MqttSendClient mqttSendClient;
@GetMapping(value = "/public")
public String publishTopic(String topic, String msg) {
mqttSendClient.publish(false, topic, msg);
return "topic:" + topic + ",msg:" + msg;
}
}
然后启动SpringBoot应用,如图可以看到控制台中,我们配置的消息接收监听已经启动了,并且客户端的clientId为uuid,默认启动的端口为8080
由于我们的接口是GET请求的,直接在浏览器输入如下地址,然后访问,如图
http://localhost:8080/mqtt/public?topic=test/topic&msg=222
然后可以看到控制台中,日志输出了消息的发送和接收,到这一步,可以说SpringBoot已经做到了既可作为消息发送客户端又可作为消息接收客户端了(如果你请求一次接口,发现输出了多次收到消息,这是因为热部署导致的,步骤9会讲解原因)
步骤8.利用Python与SpringBoot进行MQTT通信交互
如果你只是需要明白SpingBoot如何整合MQTT,并且对MQTT的通信已经有足够的理解,这一步骤可以跳过,但是步骤9一定要看
本步骤主要是用于使用Python模拟硬件测试与SpringBoot程序MQTT通信
创建定时发布消息的代码如下,注意将BROKER填上自己的EMQX服务所在IP
import paho.mqtt.client as mqtt
import threading
import time
# MQTT 代理配置
BROKER = "" # 例如 "127.0.0.1" 或 公网IP
PORT = 1883 # MQTT 默认端口(非加密)
TOPIC = "test/topic" # 要发布/订阅的主题
USERNAME = "123"
PASSWORD = "321" # 对应的密码
CLIENTID = "testId"
# 连接成功回调(新版 API)
def on_connect(client, userdata, flags, reasonCode, properties=None):
if reasonCode == 0:
print("已连接到MQTT代理")
client.subscribe(TOPIC) # 订阅主题
print(f"已订阅主题: {TOPIC}")
else:
print(f"连接MQTT代理失败, 响应码为: {reasonCode}")
# 定时发布消息的函数
def publish_message(client):
while True:
client.publish(TOPIC, "你好,我定时发布了一条消息", qos=1)
print("定时消息已发布")
time.sleep(5) # 每隔 5 秒发布一次消息
# 创建 MQTT 客户端
client = mqtt.Client()
client.username_pw_set(USERNAME, PASSWORD)
client.on_connect = on_connect
try:
# 连接到 MQTT 代理
client.connect(BROKER, PORT, 60)
# 启动一个线程定时发布消息
threading.Thread(target=publish_message, args=(client,), daemon=True).start()
# 启动网络循环,保持连接
client.loop_forever()
except Exception as e:
print(f"Error: {e}")
然后启动Python程序,如图可以看到Python程序在定时的发送消息,并且SpringBoot应用也在实时的接收消息
然后来测试利用SpringBoot中的接口向Python模拟的硬件发送消息,看看是否能收到消息
消息接收代码如下
import paho.mqtt.client as mqtt
# MQTT 代理配置
BROKER = "" # 例如 "127.0.0.1" 或 公网IP
PORT = 1883 # MQTT 默认端口(非加密)
TOPIC = "test/topic" # 要发布/订阅的主题
USERNAME = "123"
PASSWORD = "321" # 对应的密码
# 连接成功回调(新版 API)
def on_connect(client, userdata, flags, reasonCode, properties=None):
if reasonCode == 0:
print("已成功连接到 MQTT 代理")
client.subscribe(TOPIC) # 订阅主题
print(f"已订阅主题: {TOPIC}")
else:
print(f"连接失败, 响应码为: {reasonCode}")
# 收到消息回调(新版 API)
def on_message(client, userdata, msg, properties=None):
print(f"接收到消息: {msg.payload.decode()} 来自主题: {msg.topic}")
# 创建 MQTT 客户端
client = mqtt.Client()
# 设置用户名和密码
client.username_pw_set(USERNAME, PASSWORD)
# 绑定回调函数
client.on_connect = on_connect
client.on_message = on_message
try:
# 连接到 MQTT 代理
client.connect(BROKER, PORT, 60)
# 启动网络循环,保持连接并监听消息
client.loop_forever()
except Exception as e:
print(f"Error: {e}")
然后还是在浏览器访问我们刚才那个地址,可以看到调用接口后,SpringBoot和Python控制台中都显示出收到消息了,说明我们的模拟没问题
步骤9.重要的注意事项分析
这里的重要的注意事项分析主要指的是yaml中的参数reconnect,这个参数本身不存在BUG,但是用idea在开发时,由于SpringBoot自身的热部署原因会出现BUG,具体BUG和原因为我们来复现一下
首先将yaml中的reconnect参数设置为true,如图
然后将MqttSendClient.java文件和MqttAcceptClient.java文件中的connect函数在获取客户端时,将clientId修改为从yaml中读取,不再使用uuid,如图
然后我们启动SpringBoot项目,刚启动时没什么问题,然后我们在任意java文件中的任意位置随便打上几个空格,然后ctrl+s保存来触发SpringBoot的热部署功能,然后就会发现,控制台中客户端在不断的重连和断开。
这是因为我们的客户端使用的是yaml中同一个clientId,热部署后又会重新启动一个客户端,而先前启动的客户端还存在,因此两个客户端会不断的重启将对方挤下去,所以一定要注意开发时,reconnect设置为false,最后不要忘记了,将客户端获取连接时传入的clientId恢复为uuid。
结语
以上是幽络源原创的SpringBoot整合MQTT,并通过EMQX代理,完成与Python模拟硬件进行MQTT通信的教程。如有不懂之处,可加Q群307531422询问交流。
暂无评论内容