zouyu
昨天 69310a7de3d963c2bf46250b0965a2c7e8532f1e
mqtt调整:实现订阅多个mqtt服务
已修改7个文件
已添加1个文件
262 ■■■■■ 文件已修改
cnas-device/src/main/java/com/ruoyi/device/mqtt/DeviceMqttConfig.java 72 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cnas-device/src/main/java/com/ruoyi/device/mqtt/MQBean.java 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cnas-device/src/main/java/com/ruoyi/device/mqtt/MQCallback.java 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cnas-device/src/main/java/com/ruoyi/device/mqtt/MQClient.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cnas-device/src/main/java/com/ruoyi/device/mqtt/MQConfig.java 57 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-admin-ztns/src/main/java/com/ruoyi/web/MqttApplicationRunner.java 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-admin-ztns/src/main/resources/application-druid.yml 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-admin-ztns/src/main/resources/application-ztns.yml 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cnas-device/src/main/java/com/ruoyi/device/mqtt/DeviceMqttConfig.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,72 @@
package com.ruoyi.device.mqtt;
import lombok.Data;
import org.springframework.stereotype.Component;
/**
 * mqtt连接参数实体对象
 */
@Data
@Component
public class DeviceMqttConfig {
    /**
     * MQTT-服务端-IP
     */
    //    @Value("${mqtt.url}")
    private String url;
    /**
     * MQTT-服务端-用户名
     */
    //    @Value("${mqtt.username}")
    private String username;
    /**
     * MQTT-服务端-密码
     */
    //    @Value("${mqtt.password}")
    private String password;
    /**
     * è¶…æ—¶æ—¶é—´
     */
    //    @Value("${mqtt.timeout}")
    private int timeout;
    /**
     * å¿ƒè·³æ£€æµ‹æ—¶é—´
     */
    //    @Value("${mqtt.keepalive}")
    private int keepalive;
    /**
     * å¿ƒè·³åŒ…级别
     */
    //    @Value("${mqtt.qos}")
    private int qos;
    /**
     * æœåŠ¡ç«¯è¿žæŽ¥è¶…æ—¶æ—¶é—´
     */
    //    @Value("${mqtt.completion-timeout}")
    private int completionTimeout;
    /**
     * clientId
     */
    //    @Value("${mqtt.clientId}")
    private String clientId;
    /**
     * è®¢é˜…主题
     */
    //    @Value("${mqtt.subscribe}")
    private String subscribe;
    /**
     * mqtt连接开关
     */
    private Boolean client;
}
cnas-device/src/main/java/com/ruoyi/device/mqtt/MQBean.java
@@ -6,9 +6,9 @@
@Component
public class MQBean {
    @Bean("mqClient") // å¯åЍWEB服务器的时候调用此方法初始化
    public MQClient myMQTTClient(){
        MQClient mqClient = new MQClient();
        return mqClient;
    }
//    @Bean("mqClient") // å¯åЍWEB服务器的时候调用此方法初始化
//    public MQClient myMQTTClient(){
//        MQClient mqClient = new MQClient();
//        return mqClient;
//    }
}
cnas-device/src/main/java/com/ruoyi/device/mqtt/MQCallback.java
@@ -1,15 +1,8 @@
package com.ruoyi.device.mqtt;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.ruoyi.device.constant.DCResistanceMqttConstants;
import com.ruoyi.device.service.CollectBridgeService;
import com.ruoyi.device.vo.DCResistanceMqttVO;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@@ -17,17 +10,14 @@
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@Component
@Slf4j
public class MQCallback<component> implements MqttCallback {
public class MQCallback implements MqttCallback {
    private MQClient mqClient; // MQTT连接数据
    private final MQClient mqClient; // MQTT连接数据
    private MQConfig mqConfig; // yml配置数据
    private final DeviceMqttConfig mqConfig; // yml配置数据
    private static MQCallback mqCallback;
@@ -41,7 +31,7 @@
        mqCallback.collectBridgeService = this.collectBridgeService;
    }
    public MQCallback(MQClient mqClient, MQConfig mqConfig) {
    public MQCallback(MQClient mqClient, DeviceMqttConfig mqConfig) {
        this.mqClient = mqClient;
        this.mqConfig = mqConfig;
    }
@@ -74,17 +64,24 @@
     * MQTT服务器向WEB服务器发送的数据会执行到这里面,官方话称为:订阅后的消息
     * @param topic ä¸»é¢˜ï¼šä¹Ÿç§°ä¸ºåº•层网关唯一标识
     * @param message ä¿¡æ¯
     * @throws Exception æŠ¥é”™
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
    public void messageArrived(String topic, MqttMessage message) {
        try {
            String parse = new String(message.getPayload());
//            JSONObject jsonObject = JSONObject.parseObject(parse);
            switch (topic){
                case "/ztt/v3/2455220/publish":
                    //耐丝:直流电阻数据解析
                    mqCallback.collectBridgeService.addBridgeValueByNS(parse);
                    break;
                case "/ztt/v3/2455221/publish":
                    //耐丝:伸长率数据解析
                    log.info("伸长率消息体:{}",parse);
                    break;
            }
            // å¡«å……采集数据
//            JSONObject jsonObject = JSONObject.parseObject(parse);
//            mqCallback.collectBridgeService.addBridgeValue(jsonObject);
            //耐丝:直流电阻数据解析
            mqCallback.collectBridgeService.addBridgeValueByNS(parse);
        } catch (Exception e) {
            e.printStackTrace();
cnas-device/src/main/java/com/ruoyi/device/mqtt/MQClient.java
@@ -52,13 +52,13 @@
     * WEB服务器连接MQTT服务器函数
     * @param mqttConfig yml中MQTT的配置
     */
    public void connect(MQConfig mqttConfig) throws MqttException {
    public void connect(DeviceMqttConfig mqttConfig) throws MqttException {
        client = new MqttClient(mqttConfig.getUrl(), mqttConfig.getClientId(), new MemoryPersistence());
        MqttConnectOptions options = getOption(mqttConfig.getUsername(), mqttConfig.getPassword(),
                mqttConfig.getTimeout(), mqttConfig.getKeepAlive());
                mqttConfig.getTimeout(), mqttConfig.getKeepalive());
        MQClient.setClient(client);
        //连接失败调用回调函数,重新连接
        client.setCallback(new MQCallback<Object>(this, mqttConfig));
        client.setCallback(new MQCallback(this, mqttConfig));
        if (!client.isConnected()) {
            client.connect(options);
            // è®¢é˜…主题
cnas-device/src/main/java/com/ruoyi/device/mqtt/MQConfig.java
@@ -1,64 +1,19 @@
package com.ruoyi.device.mqtt;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Data
@ConfigurationProperties(prefix = "")
public class MQConfig {
    /**
     * MQTT-服务端-IP
     * è¯»å–yml的mqtt配置信息
     */
    @Value("${mqtt.url}")
    private String url;
    private List<DeviceMqttConfig> mqtt;
    /**
     * MQTT-服务端-用户名
     */
    @Value("${mqtt.username}")
    private String username;
    /**
     * MQTT-服务端-密码
     */
    @Value("${mqtt.password}")
    private String password;
    /**
     * è¶…æ—¶æ—¶é—´
     */
    @Value("${mqtt.timeout}")
    private int timeout;
    /**
     * å¿ƒè·³æ£€æµ‹æ—¶é—´
     */
    @Value("${mqtt.keepalive}")
    private int keepAlive;
    /**
     * å¿ƒè·³åŒ…级别
     */
    @Value("${mqtt.qos}")
    private int qos;
    /**
     * æœåŠ¡ç«¯è¿žæŽ¥è¶…æ—¶æ—¶é—´
     */
    @Value("${mqtt.completion-timeout}")
    private int completionTimeout;
    /**
     * clientId
     */
    @Value("${mqtt.clientId}")
    private String clientId;
    /**
     * è®¢é˜…主题
     */
    @Value("${mqtt.subscribe}")
    private String subscribe;
}
ruoyi-admin-ztns/src/main/java/com/ruoyi/web/MqttApplicationRunner.java
@@ -1,13 +1,16 @@
package com.ruoyi.web;
import com.ruoyi.device.mqtt.DeviceMqttConfig;
import com.ruoyi.device.mqtt.MQClient;
import com.ruoyi.device.mqtt.MQConfig;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.Objects;
@Component
@Slf4j
@@ -16,14 +19,18 @@
    @Autowired
    private MQConfig mqConfig;
    @Value("${mqtt.client}")
    private Boolean client;
//    @Value("${mqtt.client}")
//    private Boolean client;
    @Override
    public void run(ApplicationArguments args) throws Exception {
        if (client) {
            MQClient mqttPushClient = new MQClient();
            mqttPushClient.connect(mqConfig);
    public void run(ApplicationArguments args) throws MqttException {
        if(Objects.nonNull(mqConfig)){
            for (DeviceMqttConfig deviceMqttConfig : mqConfig.getMqtt()) {
                if (deviceMqttConfig.getClient()) {
                    MQClient mqttPushClient = new MQClient();
                    mqttPushClient.connect(deviceMqttConfig);
                }
            }
        }
    }
}
ruoyi-admin-ztns/src/main/resources/application-druid.yml
@@ -162,13 +162,24 @@
# å¤‡æ³¨ 18083密码:zttZTT123!@
mqtt:
  url: tcp://mqtt-ztt.zttiot.com:1883 # æœåС噍ip
  username: 2455220 # MQTT-服务端-用户名,后期会修改为用户登录名
  password: 108300 # MQTT-服务端-密码,,后期会修改为用户登录密码
  timeout: 100 # è¶…æ—¶æ—¶é—´ ï¼ˆå•位:秒)
  keepalive: 60 # å¿ƒè·³ ï¼ˆå•位:秒)
  qos: 1 # å¿ƒè·³åŒ…级别
  completion-timeout: 3000 # è¿žæŽ¥è¶…时时间(单位:秒)
  clientId: ztns # clientId
  subscribe: /ztt/v3/2455220/publish # è®¢é˜…主题
  client: true # å¦‚果开发需要启动测试,需要改为false不然会一直报错
  - url: tcp://mqtt-ztt.zttiot.com:1883 # æœåС噍ip
    username: 2455220 # MQTT-服务端-用户名,后期会修改为用户登录名
    password: 108300 # MQTT-服务端-密码,,后期会修改为用户登录密码
    timeout: 100 # è¶…æ—¶æ—¶é—´ ï¼ˆå•位:秒)
    keepalive: 60 # å¿ƒè·³ ï¼ˆå•位:秒)
    qos: 1 # å¿ƒè·³åŒ…级别
    completion-timeout: 3000 # è¿žæŽ¥è¶…时时间(单位:秒)
    clientId: ztns01 # clientId
    subscribe: /ztt/v3/2455220/publish # è®¢é˜…主题
    client: true # å¦‚果开发需要启动测试,需要改为false不然会一直报错
  - url: tcp://mqtt-ztt.zttiot.com:1883 # æœåС噍ip
    username: 2455221 # MQTT-服务端-用户名,后期会修改为用户登录名
    password: 108295 # MQTT-服务端-密码,,后期会修改为用户登录密码
    timeout: 100 # è¶…æ—¶æ—¶é—´ ï¼ˆå•位:秒)
    keepalive: 60 # å¿ƒè·³ ï¼ˆå•位:秒)
    qos: 1 # å¿ƒè·³åŒ…级别
    completion-timeout: 3000 # è¿žæŽ¥è¶…时时间(单位:秒)
    clientId: ztns02 # clientId
    subscribe: /ztt/v3/2455221/publish # è®¢é˜…主题
    client: true # å¦‚果开发需要启动测试,需要改为false不然会一直报错
ruoyi-admin-ztns/src/main/resources/application-ztns.yml
@@ -163,13 +163,23 @@
# å¤‡æ³¨ 18083密码:zttZTT123!@
mqtt:
  url: tcp://mqtt-ztt.zttiot.com:1883 # æœåС噍ip
  username: 2455220 # MQTT-服务端-用户名,后期会修改为用户登录名
  password: 108300 # MQTT-服务端-密码,,后期会修改为用户登录密码
  timeout: 100 # è¶…æ—¶æ—¶é—´ ï¼ˆå•位:秒)
  keepalive: 60 # å¿ƒè·³ ï¼ˆå•位:秒)
  qos: 1 # å¿ƒè·³åŒ…级别
  completion-timeout: 3000 # è¿žæŽ¥è¶…时时间(单位:秒)
  clientId: ztns # clientId
  subscribe: /ztt/v3/2455220/publish # è®¢é˜…主题
  client: true # å¦‚果开发需要启动测试,需要改为false不然会一直报错
  - url: tcp://mqtt-ztt.zttiot.com:1883 # æœåС噍ip
    username: 2455220 # MQTT-服务端-用户名,后期会修改为用户登录名
    password: 108300 # MQTT-服务端-密码,,后期会修改为用户登录密码
    timeout: 100 # è¶…æ—¶æ—¶é—´ ï¼ˆå•位:秒)
    keepalive: 60 # å¿ƒè·³ ï¼ˆå•位:秒)
    qos: 1 # å¿ƒè·³åŒ…级别
    completion-timeout: 3000 # è¿žæŽ¥è¶…时时间(单位:秒)
    clientId: ztns # clientId
    subscribe: /ztt/v3/2455220/publish # è®¢é˜…主题
    client: true # å¦‚果开发需要启动测试,需要改为false不然会一直报错
  - url: tcp://mqtt-ztt.zttiot.com:1883 # æœåС噍ip
    username: 2455221 # MQTT-服务端-用户名,后期会修改为用户登录名
    password: 108295 # MQTT-服务端-密码,,后期会修改为用户登录密码
    timeout: 100 # è¶…æ—¶æ—¶é—´ ï¼ˆå•位:秒)
    keepalive: 60 # å¿ƒè·³ ï¼ˆå•位:秒)
    qos: 1 # å¿ƒè·³åŒ…级别
    completion-timeout: 3000 # è¿žæŽ¥è¶…时时间(单位:秒)
    clientId: ztns # clientId
    subscribe: /ztt/v3/2455221/publish # è®¢é˜…主题
    client: true # å¦‚果开发需要启动测试,需要改为false不然会一直报错