博客
关于我
SpringBoot中集成eclipse.paho.client.mqttv3实现mqtt客户端并支持断线重连、线程池高并发改造、存储入库mqsql和redis示例业务流程,附资源下载
阅读量:797 次
发布时间:2023-02-26

本文共 4270 字,大约阅读时间需要 14 分钟。

Spring Boot 整合 MQTT 服务器实现消息发送与订阅

场景描述

本文将介绍如何在 Spring Boot 项目中整合 MQTT 服务器,实现消息的发送与订阅功能。以下是系统的主要场景:

  • 消息发送:当硬件设备(如摄像头报警系统)触发报警时,通过 MQTT 协议发送 JSON 格式的报警数据到指定主题。
  • 消息订阅:系统需要连接到第三方 MQTT Broker,订阅指定主题,接收推送的消息,并对消息进行解析和存储至数据库(如 MySQL 和 Redis)。
  • 实现步骤

    一、项目依赖管理

  • 引入 MQTT 依赖

    • 使用 org.eclipse.paho.client.mqttv3 作为 MQTT 客户端库,版本建议为 1.2.4。
    • 在项目的 pom.xml 中添加以下依赖:
      org.eclipse.paho
      org.eclipse.paho.client.mqttv3
      1.2.4
  • 引入 Guava 工具库

    • 若需要线程池和并发处理功能,可引入 com.google.guava,版本建议为 32.1.2-jre。
    • 在 pom.xml 中添加:
      com.google.guava
      guava
      32.1.2-jre
      provided
  • 引入 JSON 解析工具

    • 使用 fastjson 进行 JSON 格式数据解析,版本建议为 1.2.75。
    • 添加以下依赖:
      com.alibaba
      fastjson
      1.2.75
  • 引入 Lombok 工具

    • 使用 lombok 进行代码简化和格式化,添加以下依赖:
      org.projectlombok
      lombok
      true
  • 二、配置 MQTT 连接信息

  • 创建 application.yml 文件

    • 在项目根目录下创建 application.yml,添加 MQTT Broker 配置:
      badao-mqtt:
      host: 127.0.0.1
      port: 1883
      user: admin
      password: 123456
  • 创建配置类

    • 创建 com.badao.demo.config.MqttConfig 类,获取上述配置:
      @Component("MqttConfig")
      @ConfigurationProperties(prefix = "badao-mqtt")
      public class MqttConfig {
      private static String host;
      private static String port;
      private static String user;
      private static String password;
      public String getHost() { return host; }
      public void setHost(String host) { MqttConfig.host = host; }
      public String getPort() { return port; }
      public void setPort(String port) { MqttConfig.port = port; }
      public String getUser() { return user; }
      public void setUser(String user) { MqttConfig.user = user; }
      public String getPassword() { return password; }
      public void setPassword(String password) { MqttConfig.password = password; }
      }
  • 三、消息发送与订阅实现

  • 消息发送

    • 使用 MqttClient 发送消息至指定主题。
    • 示例代码:
      @Service
      public class MqttSender {
      @Autowired
      private MqttConfig mqttConfig;
      public void sendWarningMessage(String topic, String message) {
      final String host = mqttConfig.getHost();
      final String port = mqttConfig.getPort();
      final String username = mqttConfig.getUser();
      final String password = mqttConfig.getPassword();
      try {
      MqttClient client = new MqttClient(host, port, username, password);
      client.connect();
      MqttMessage messageObj = new MqttMessage();
      messageObj.setTopic(topic);
      messageObj.setPayload(message);
      new Thread(() -> {
      try {
      client.publish(messageObj);
      } catch (MqttException e) {
      log.error(" MQTT 发送失败:" + e.getMessage());
      }
      }).start();
      client.disconnect();
      } catch (MqttException e) {
      log.error(" MQTT 连接错误:" + e.getMessage());
      }
      }
      }
  • 消息订阅

    • 创建订阅器类,监听指定主题的消息。
    • 示例代码:
      @Service
      public class MqttSubscriber {
      @Autowired
      private MqttConfig mqttConfig;
      @Autowired
      private RedisTemplate redisTemplate;
      @Autowired
      private MySQLDAO mysqlDAO;
      public void subscribe(String topic) {
      final String host = mqttConfig.getHost();
      final String port = mqttConfig.getPort();
      final String username = mqttConfig.getUser();
      final String password = mqttConfig.getPassword();
      try {
      MqttClient client = new MqttClient(host, port, username, password);
      client.connect();
      client.subscribe(topic, new MqttMessageCallback() {
      @Override
      public void onSuccess(MqttMessage message) {
      log.info("接收到消息:" + message.toString());
      // 存储消息至数据库或缓存系统
      this.redisTemplate.convertAndStore(message);
      this.mysqlDAO.saveMessage(message);
      }
      @Override
      public void onFailure(int code, String reason) {
      log.error("订阅失败:" + reason);
      }
      @Override
      public void onDisconnected() {
      log.info("与 MQTT Broker 连接丢失");
      }
      });
      } catch (MqttException e) {
      log.error(" MQTT 订阅错误:" + e.getMessage());
      } finally {
      client.disconnect();
      }
      }
      }
  • 数据库与缓存集成

    • MySQL 数据库:用于存储接收的消息和报警数据。
    • Redis 缓存系统:用于快速存取和检索消息数据,提高系统性能。

    总结

    通过以上步骤,可以在 Spring Boot 项目中成功整合 MQTT 服务器,实现消息的发送与订阅功能。系统能够接收来自硬件设备的报警数据,存储至数据库和缓存系统,满足实际业务需求。

    转载地址:http://ecvfk.baihongyu.com/

    你可能感兴趣的文章
    oracle下的OVER(PARTITION BY)函数介绍
    查看>>
    Oracle中DATE数据相减问题
    查看>>
    Oracle中merge into的使用
    查看>>
    oracle中sql查询上月、本月、上周、本周、昨天、今天的数据!
    查看>>
    oracle中sql的case语句运用--根据不同条件去排序!
    查看>>
    Oracle中Transate函数的使用
    查看>>
    oracle中关于日期问题的汇总!
    查看>>
    Oracle中常用的语句
    查看>>
    Oracle中序列的操作以及使用前对序列的初始化
    查看>>
    oracle中新建用户和赋予权限
    查看>>
    Oracle中的NVL,NVL2,NULLIF以及COALESCE函数使用
    查看>>
    Oracle中的rownum 和rowid的用法和区别
    查看>>
    oracle中的大小写、字符、dual、数字、处理、日期、函数、显/隐式、时间、条件表达式case、decode、to_date、to_char、sysdate
    查看>>
    oracle中表和视图的区别,oracle中常用表和视图
    查看>>
    oracle之表空间(tablespace)、方案(schema)、段(segment)、区(extent)、块(block)
    查看>>
    Oracle从11g导出后导入10g
    查看>>
    oracle从备份归档日志的方法集中回收
    查看>>
    oracle优化器analyzed,Oracle 学习之 性能优化(十三) 索引
    查看>>
    Oracle修改字段类型
    查看>>
    Oracle修改表或者字段的注释
    查看>>