教程说明
本教程主要内容为使用EMQX开源免费版在Centos7服务器上搭建MQTT代理服务器并完成配置,然后通过Python代码来实现利用搭建的MQTT代理服务器完成通信。如有不懂之处可在评论区询问或加群询问。
前置需求和知识
一台Centos7的云服务器(可在优刻云购买服务器60元一年)、Python、了解基本的Linux命令
大致步骤
步骤1:EMQX官网找到系统适配的包 大致步骤
步骤2:下载安装启动EMQX
步骤3:知晓安装位置
步骤4:开放端口,访问emqx面板
步骤5:配置认证
步骤6:测试消息的发布和接收
进入 EMQX官网 ,点击右上角的Download进入下载页,如图
往下翻,可以看到EMQX Open Source,表示这是一个开源免费的版本,点击Download,如图
然后切换到CentOS/RHEL,可以看到下方列出了几条命令,这便是下载EMQX到Centos并启动的步骤,如图
上面已经看到了需要执行的命令,只需要在任意目录按照顺序依次执行命令就行了,如图
最后一条命令是启动EMQX的,启动后可以通过执行如下命令查看emqx的状态,如图
sudo systemctl status emqx
经过上面一番下载安装启动的操作,但是我们还得知道这东西安装在哪里的,不然配置都不知道去哪里改
配置文件默认在/etc/emqx目录下,如图
日志文件默认在/var/log/emqx目录下,如图
知道了emqx端口是1883,emqx面板端口是18083,为了后续能使用emqx及其面板,我们需要开放端口
先使用如下命令列出已开启的端口,如图可以看到我这里确实没有开放1883和18083端口
sudo firewall-cmd --list-all
因此分别执行如下3条命令开放1883和18083端口并重新加载防火墙
sudo firewall-cmd --permanent --add-port=18083/tcp
sudo firewall-cmd --permanent --add-port=1883/tcp
sudo firewall-cmd --reload
然后再查看下端口是否开放,可以看到已经开放了,如图
由于我这里使用的是优刻云的服务器,还需要在平台上开放下端口(大厂的服务器安全做的高,端口有两道防护),如图直接配置就行了
直接通过IP+18083端口的形式访问即可,账号密码默认初始为admin和public,登录后会提示修改密码,如图
点击左边的盾牌图表->客户端认证,如图
然后点击右上角的创建按钮,为了方便使用Password方式,如图
直接使用内置数据库即可,然后账号类型配置为username,加密加盐方式随意,然后创建,如图
创建后,点击用户管理,然后点击右边的创建按钮,添加用户即可,如图
发布消息测试.py
import paho.mqtt.client as mqtt
import threading
import time
# MQTT 代理配置
BROKER = "" # IP
PORT = 1883 # MQTT 默认端口(非加密)
TOPIC = "test/topic" # 要发布/订阅的主题
USERNAME = "幽络源站长土拨鼠" #MQTT代理服务器配置的账号
PASSWORD = "" # 对应的密码
# 连接成功回调(新版 API)
def on_connect(client, userdata, flags, reasonCode, properties=None):
if reasonCode == 0:
print("已连接到MQTT代理")
client.subscribe(TOPIC) # 订阅主题
print(f"已订阅主题: {TOPIC}")
else:
print(f"连接MQTT代理失败, 响应码为: {reasonCode}")
# 定时发布消息的函数
def publish_message(client):
while True:
client.publish(TOPIC, "你好,我定时发布了一条消息", qos=1)
print("定时消息已发布")
time.sleep(5) # 每隔 5 秒发布一次消息
# 创建 MQTT 客户端
client = mqtt.Client()
client.username_pw_set(USERNAME, PASSWORD)
client.on_connect = on_connect
try:
# 连接到 MQTT 代理
client.connect(BROKER, PORT, 60)
# 启动一个线程定时发布消息
threading.Thread(target=publish_message, args=(client,), daemon=True).start()
# 启动网络循环,保持连接
client.loop_forever()
except Exception as e:
print(f"Error: {e}")
订阅消息测试.py
import paho.mqtt.client as mqtt
# MQTT 代理配置
BROKER = "" # IP
PORT = 1883 # MQTT 默认端口(非加密)
TOPIC = "test/topic" # 要发布/订阅的主题
USERNAME = "幽络源站长土拨鼠" #账号
PASSWORD = "" # 对应的密码
# 连接成功回调(新版 API)
def on_connect(client, userdata, flags, reasonCode, properties=None):
if reasonCode == 0:
print("已成功连接到 MQTT 代理")
client.subscribe(TOPIC) # 订阅主题
print(f"已订阅主题: {TOPIC}")
else:
print(f"连接失败, 响应码为: {reasonCode}")
# 收到消息回调(新版 API)
def on_message(client, userdata, msg, properties=None):
print(f"接收到消息: {msg.payload.decode()} 来自主题: {msg.topic}")
# 创建 MQTT 客户端
client = mqtt.Client()
# 设置用户名和密码
client.username_pw_set(USERNAME, PASSWORD)
# 绑定回调函数
client.on_connect = on_connect
client.on_message = on_message
try:
# 连接到 MQTT 代理
client.connect(BROKER, PORT, 60)
# 启动网络循环,保持连接并监听消息
client.loop_forever()
except Exception as e:
print(f"Error: {e}")
上面填写好你自己的IP和账号密码后,运行这两个文件,如图可以看到,发送消息的代码每5秒定时发送一条消息,并且接收消息的代码也在实时的接收
以上是幽络源站长原创MQTT通信搭建并测试的教程,希望对初学者(特别是从事软件者)有帮助,理解后可运用到软件+硬件的架构上。如有不懂之处可加Q群307531422询问,一起交流。
暂无评论内容