本文共 4270 字,大约阅读时间需要 14 分钟。
本文将介绍如何在 Spring Boot 项目中整合 MQTT 服务器,实现消息的发送与订阅功能。以下是系统的主要场景:
引入 MQTT 依赖
org.eclipse.paho.client.mqttv3 作为 MQTT 客户端库,版本建议为 1.2.4。org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.4
引入 Guava 工具库
com.google.guava,版本建议为 32.1.2-jre。com.google.guava guava 32.1.2-jre provided
引入 JSON 解析工具
fastjson 进行 JSON 格式数据解析,版本建议为 1.2.75。com.alibaba fastjson 1.2.75
引入 Lombok 工具
lombok 进行代码简化和格式化,添加以下依赖: org.projectlombok lombok true
创建 application.yml 文件
application.yml,添加 MQTT Broker 配置: badao-mqtt: host: 127.0.0.1 port: 1883 user: admin password: 123456
创建配置类
com.badao.demo.config.MqttConfig 类,获取上述配置: @Component("MqttConfig")@ConfigurationProperties(prefix = "badao-mqtt")public class MqttConfig { private static String host; private static String port; private static String user; private static String password; public String getHost() { return host; } public void setHost(String host) { MqttConfig.host = host; } public String getPort() { return port; } public void setPort(String port) { MqttConfig.port = port; } public String getUser() { return user; } public void setUser(String user) { MqttConfig.user = user; } public String getPassword() { return password; } public void setPassword(String password) { MqttConfig.password = password; }} 消息发送
MqttClient 发送消息至指定主题。@Servicepublic class MqttSender { @Autowired private MqttConfig mqttConfig; public void sendWarningMessage(String topic, String message) { final String host = mqttConfig.getHost(); final String port = mqttConfig.getPort(); final String username = mqttConfig.getUser(); final String password = mqttConfig.getPassword(); try { MqttClient client = new MqttClient(host, port, username, password); client.connect(); MqttMessage messageObj = new MqttMessage(); messageObj.setTopic(topic); messageObj.setPayload(message); new Thread(() -> { try { client.publish(messageObj); } catch (MqttException e) { log.error(" MQTT 发送失败:" + e.getMessage()); } }).start(); client.disconnect(); } catch (MqttException e) { log.error(" MQTT 连接错误:" + e.getMessage()); } }} 消息订阅
@Servicepublic class MqttSubscriber { @Autowired private MqttConfig mqttConfig; @Autowired private RedisTemplate redisTemplate; @Autowired private MySQLDAO mysqlDAO; public void subscribe(String topic) { final String host = mqttConfig.getHost(); final String port = mqttConfig.getPort(); final String username = mqttConfig.getUser(); final String password = mqttConfig.getPassword(); try { MqttClient client = new MqttClient(host, port, username, password); client.connect(); client.subscribe(topic, new MqttMessageCallback() { @Override public void onSuccess(MqttMessage message) { log.info("接收到消息:" + message.toString()); // 存储消息至数据库或缓存系统 this.redisTemplate.convertAndStore(message); this.mysqlDAO.saveMessage(message); } @Override public void onFailure(int code, String reason) { log.error("订阅失败:" + reason); } @Override public void onDisconnected() { log.info("与 MQTT Broker 连接丢失"); } }); } catch (MqttException e) { log.error(" MQTT 订阅错误:" + e.getMessage()); } finally { client.disconnect(); } }} 通过以上步骤,可以在 Spring Boot 项目中成功整合 MQTT 服务器,实现消息的发送与订阅功能。系统能够接收来自硬件设备的报警数据,存储至数据库和缓存系统,满足实际业务需求。
转载地址:http://ecvfk.baihongyu.com/