From 69310a7de3d963c2bf46250b0965a2c7e8532f1e Mon Sep 17 00:00:00 2001
From: zouyu <2723363702@qq.com>
Date: 星期二, 29 七月 2025 13:40:56 +0800
Subject: [PATCH] mqtt调整:实现订阅多个mqtt服务

---
 cnas-device/src/main/java/com/ruoyi/device/mqtt/MQConfig.java           |   57 +----------
 ruoyi-admin-ztns/src/main/resources/application-ztns.yml                |   30 ++++--
 cnas-device/src/main/java/com/ruoyi/device/mqtt/MQBean.java             |   10 +-
 ruoyi-admin-ztns/src/main/java/com/ruoyi/web/MqttApplicationRunner.java |   21 ++-
 cnas-device/src/main/java/com/ruoyi/device/mqtt/DeviceMqttConfig.java   |   72 ++++++++++++++
 cnas-device/src/main/java/com/ruoyi/device/mqtt/MQClient.java           |    6 
 cnas-device/src/main/java/com/ruoyi/device/mqtt/MQCallback.java         |   35 +++---
 ruoyi-admin-ztns/src/main/resources/application-druid.yml               |   31 ++++--
 8 files changed, 157 insertions(+), 105 deletions(-)

diff --git a/cnas-device/src/main/java/com/ruoyi/device/mqtt/DeviceMqttConfig.java b/cnas-device/src/main/java/com/ruoyi/device/mqtt/DeviceMqttConfig.java
new file mode 100644
index 0000000..4b654ed
--- /dev/null
+++ b/cnas-device/src/main/java/com/ruoyi/device/mqtt/DeviceMqttConfig.java
@@ -0,0 +1,72 @@
+package com.ruoyi.device.mqtt;
+
+import lombok.Data;
+import org.springframework.stereotype.Component;
+
+/**
+ * mqtt杩炴帴鍙傛暟瀹炰綋瀵硅薄
+ */
+@Data
+@Component
+public class DeviceMqttConfig {
+
+    /**
+     * MQTT-鏈嶅姟绔�-IP
+     */
+    //	@Value("${mqtt.url}")
+    private String url;
+
+    /**
+     * MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�
+     */
+    //	@Value("${mqtt.username}")
+    private String username;
+
+    /**
+     * MQTT-鏈嶅姟绔�-瀵嗙爜
+     */
+    //	@Value("${mqtt.password}")
+    private String password;
+
+    /**
+     * 瓒呮椂鏃堕棿
+     */
+    //	@Value("${mqtt.timeout}")
+    private int timeout;
+
+    /**
+     * 蹇冭烦妫�娴嬫椂闂�
+     */
+    //	@Value("${mqtt.keepalive}")
+    private int keepalive;
+
+    /**
+     * 蹇冭烦鍖呯骇鍒�
+     */
+    //	@Value("${mqtt.qos}")
+    private int qos;
+
+    /**
+     * 鏈嶅姟绔繛鎺ヨ秴鏃舵椂闂�
+     */
+    //	@Value("${mqtt.completion-timeout}")
+    private int completionTimeout;
+
+    /**
+     * clientId
+     */
+    //	@Value("${mqtt.clientId}")
+    private String clientId;
+
+    /**
+     * 璁㈤槄涓婚
+     */
+    //	@Value("${mqtt.subscribe}")
+    private String subscribe;
+
+    /**
+     * mqtt杩炴帴寮�鍏�
+     */
+    private Boolean client;
+
+}
diff --git a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQBean.java b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQBean.java
index d429673..c9157f9 100644
--- a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQBean.java
+++ b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQBean.java
@@ -6,9 +6,9 @@
 @Component
 public class MQBean {
 
-	@Bean("mqClient") // 鍚姩WEB鏈嶅姟鍣ㄧ殑鏃跺�欒皟鐢ㄦ鏂规硶鍒濆鍖�
-	public MQClient myMQTTClient(){
-		MQClient mqClient = new MQClient();
-		return mqClient;
-	}
+//	@Bean("mqClient") // 鍚姩WEB鏈嶅姟鍣ㄧ殑鏃跺�欒皟鐢ㄦ鏂规硶鍒濆鍖�
+//	public MQClient myMQTTClient(){
+//		MQClient mqClient = new MQClient();
+//		return mqClient;
+//	}
 }
diff --git a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQCallback.java b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQCallback.java
index 9cc678f..f85eab0 100644
--- a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQCallback.java
+++ b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQCallback.java
@@ -1,15 +1,8 @@
 package com.ruoyi.device.mqtt;
 
-import cn.hutool.core.collection.CollectionUtil;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.TypeReference;
-import com.ruoyi.device.constant.DCResistanceMqttConstants;
 import com.ruoyi.device.service.CollectBridgeService;
-import com.ruoyi.device.vo.DCResistanceMqttVO;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
@@ -17,17 +10,14 @@
 
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
 
 @Component
 @Slf4j
-public class MQCallback<component> implements MqttCallback {
+public class MQCallback implements MqttCallback {
 
-    private MQClient mqClient; // MQTT杩炴帴鏁版嵁
+    private final MQClient mqClient; // MQTT杩炴帴鏁版嵁
 
-    private MQConfig mqConfig; // yml閰嶇疆鏁版嵁
+    private final DeviceMqttConfig mqConfig; // yml閰嶇疆鏁版嵁
 
     private static MQCallback mqCallback;
 
@@ -41,7 +31,7 @@
 		mqCallback.collectBridgeService = this.collectBridgeService;
     }
 
-    public MQCallback(MQClient mqClient, MQConfig mqConfig) {
+    public MQCallback(MQClient mqClient, DeviceMqttConfig mqConfig) {
         this.mqClient = mqClient;
         this.mqConfig = mqConfig;
     }
@@ -74,17 +64,24 @@
      * MQTT鏈嶅姟鍣ㄥ悜WEB鏈嶅姟鍣ㄥ彂閫佺殑鏁版嵁浼氭墽琛屽埌杩欓噷闈紝瀹樻柟璇濈О涓猴細璁㈤槄鍚庣殑娑堟伅
      * @param topic 涓婚锛氫篃绉颁负搴曞眰缃戝叧鍞竴鏍囪瘑
      * @param message 淇℃伅
-     * @throws Exception 鎶ラ敊
      */
     @Override
-    public void messageArrived(String topic, MqttMessage message) throws Exception {
+    public void messageArrived(String topic, MqttMessage message) {
         try {
             String parse = new String(message.getPayload());
-//            JSONObject jsonObject = JSONObject.parseObject(parse);
+            switch (topic){
+                case "/ztt/v3/2455220/publish":
+                    //鑰愪笣锛氱洿娴佺數闃绘暟鎹В鏋�
+                    mqCallback.collectBridgeService.addBridgeValueByNS(parse);
+                    break;
+                case "/ztt/v3/2455221/publish":
+                    //鑰愪笣锛氫几闀跨巼鏁版嵁瑙f瀽
+                    log.info("浼搁暱鐜囨秷鎭綋锛歿}",parse);
+                    break;
+            }
             // 濉厖閲囬泦鏁版嵁
+//            JSONObject jsonObject = JSONObject.parseObject(parse);
 //            mqCallback.collectBridgeService.addBridgeValue(jsonObject);
-            //鑰愪笣锛氱洿娴佺數闃绘暟鎹В鏋�
-            mqCallback.collectBridgeService.addBridgeValueByNS(parse);
 
         } catch (Exception e) {
             e.printStackTrace();
diff --git a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQClient.java b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQClient.java
index 1c3c548..9f002c5 100644
--- a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQClient.java
+++ b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQClient.java
@@ -52,13 +52,13 @@
 	 * WEB鏈嶅姟鍣ㄨ繛鎺QTT鏈嶅姟鍣ㄥ嚱鏁�
 	 * @param mqttConfig yml涓璏QTT鐨勯厤缃�
 	 */
-	public void connect(MQConfig mqttConfig) throws MqttException {
+	public void connect(DeviceMqttConfig mqttConfig) throws MqttException {
 		client = new MqttClient(mqttConfig.getUrl(), mqttConfig.getClientId(), new MemoryPersistence());
 		MqttConnectOptions options = getOption(mqttConfig.getUsername(), mqttConfig.getPassword(),
-				mqttConfig.getTimeout(), mqttConfig.getKeepAlive());
+				mqttConfig.getTimeout(), mqttConfig.getKeepalive());
 		MQClient.setClient(client);
 		//杩炴帴澶辫触璋冪敤鍥炶皟鍑芥暟锛岄噸鏂拌繛鎺�
-		client.setCallback(new MQCallback<Object>(this, mqttConfig));
+		client.setCallback(new MQCallback(this, mqttConfig));
 		if (!client.isConnected()) {
 			client.connect(options);
 			// 璁㈤槄涓婚
diff --git a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQConfig.java b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQConfig.java
index a88b29c..c00e36c 100644
--- a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQConfig.java
+++ b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQConfig.java
@@ -1,64 +1,19 @@
 package com.ruoyi.device.mqtt;
 
 import lombok.Data;
-import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
+
+import java.util.List;
 
 @Component
 @Data
+@ConfigurationProperties(prefix = "")
 public class MQConfig {
 
 	/**
-	 * MQTT-鏈嶅姟绔�-IP
+	 * 璇诲彇yml鐨刴qtt閰嶇疆淇℃伅
 	 */
-	@Value("${mqtt.url}")
-	private String url;
+	private List<DeviceMqttConfig> mqtt;
 
-	/**
-	 * MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�
-	 */
-	@Value("${mqtt.username}")
-	private String username;
-
-	/**
-	 * MQTT-鏈嶅姟绔�-瀵嗙爜
-	 */
-	@Value("${mqtt.password}")
-	private String password;
-
-	/**
-	 * 瓒呮椂鏃堕棿
-	 */
-	@Value("${mqtt.timeout}")
-	private int timeout;
-
-	/**
-	 * 蹇冭烦妫�娴嬫椂闂�
-	 */
-	@Value("${mqtt.keepalive}")
-	private int keepAlive;
-
-	/**
-	 * 蹇冭烦鍖呯骇鍒�
-	 */
-	@Value("${mqtt.qos}")
-	private int qos;
-
-	/**
-	 * 鏈嶅姟绔繛鎺ヨ秴鏃舵椂闂�
-	 */
-	@Value("${mqtt.completion-timeout}")
-	private int completionTimeout;
-
-	/**
-	 * clientId
-	 */
-	@Value("${mqtt.clientId}")
-	private String clientId;
-
-	/**
-	 * 璁㈤槄涓婚
-	 */
-	@Value("${mqtt.subscribe}")
-	private String subscribe;
 }
diff --git a/ruoyi-admin-ztns/src/main/java/com/ruoyi/web/MqttApplicationRunner.java b/ruoyi-admin-ztns/src/main/java/com/ruoyi/web/MqttApplicationRunner.java
index 854d84a..7474bee 100644
--- a/ruoyi-admin-ztns/src/main/java/com/ruoyi/web/MqttApplicationRunner.java
+++ b/ruoyi-admin-ztns/src/main/java/com/ruoyi/web/MqttApplicationRunner.java
@@ -1,13 +1,16 @@
 package com.ruoyi.web;
 
+import com.ruoyi.device.mqtt.DeviceMqttConfig;
 import com.ruoyi.device.mqtt.MQClient;
 import com.ruoyi.device.mqtt.MQConfig;
 import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.stereotype.Component;
+
+import java.util.Objects;
 
 @Component
 @Slf4j
@@ -16,14 +19,18 @@
 	@Autowired
 	private MQConfig mqConfig;
 
-	@Value("${mqtt.client}")
-	private Boolean client;
+//	@Value("${mqtt.client}")
+//	private Boolean client;
 
 	@Override
-	public void run(ApplicationArguments args) throws Exception {
-		if (client) {
-			MQClient mqttPushClient = new MQClient();
-			mqttPushClient.connect(mqConfig);
+	public void run(ApplicationArguments args) throws MqttException {
+		if(Objects.nonNull(mqConfig)){
+			for (DeviceMqttConfig deviceMqttConfig : mqConfig.getMqtt()) {
+				if (deviceMqttConfig.getClient()) {
+					MQClient mqttPushClient = new MQClient();
+					mqttPushClient.connect(deviceMqttConfig);
+				}
+			}
 		}
 	}
 }
diff --git a/ruoyi-admin-ztns/src/main/resources/application-druid.yml b/ruoyi-admin-ztns/src/main/resources/application-druid.yml
index 0cee7d2..fef6444 100644
--- a/ruoyi-admin-ztns/src/main/resources/application-druid.yml
+++ b/ruoyi-admin-ztns/src/main/resources/application-druid.yml
@@ -162,13 +162,24 @@
 
 # 澶囨敞 18083瀵嗙爜锛歾ttZTT123!@
 mqtt:
-  url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p
-  username: 2455220 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚�
-  password: 108300 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜
-  timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級
-  keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級
-  qos: 1 # 蹇冭烦鍖呯骇鍒�
-  completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級
-  clientId: ztns # clientId
-  subscribe: /ztt/v3/2455220/publish # 璁㈤槄涓婚
-  client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿�
+  - url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p
+    username: 2455220 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚�
+    password: 108300 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜
+    timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級
+    keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級
+    qos: 1 # 蹇冭烦鍖呯骇鍒�
+    completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級
+    clientId: ztns01 # clientId
+    subscribe: /ztt/v3/2455220/publish # 璁㈤槄涓婚
+    client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿�
+  - url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p
+    username: 2455221 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚�
+    password: 108295 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜
+    timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級
+    keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級
+    qos: 1 # 蹇冭烦鍖呯骇鍒�
+    completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級
+    clientId: ztns02 # clientId
+    subscribe: /ztt/v3/2455221/publish # 璁㈤槄涓婚
+    client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿�
+
diff --git a/ruoyi-admin-ztns/src/main/resources/application-ztns.yml b/ruoyi-admin-ztns/src/main/resources/application-ztns.yml
index 1b88ba5..7c90b2b 100644
--- a/ruoyi-admin-ztns/src/main/resources/application-ztns.yml
+++ b/ruoyi-admin-ztns/src/main/resources/application-ztns.yml
@@ -163,13 +163,23 @@
 
 # 澶囨敞 18083瀵嗙爜锛歾ttZTT123!@
 mqtt:
-  url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p
-  username: 2455220 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚�
-  password: 108300 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜
-  timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級
-  keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級
-  qos: 1 # 蹇冭烦鍖呯骇鍒�
-  completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級
-  clientId: ztns # clientId
-  subscribe: /ztt/v3/2455220/publish # 璁㈤槄涓婚
-  client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿�
+  - url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p
+    username: 2455220 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚�
+    password: 108300 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜
+    timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級
+    keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級
+    qos: 1 # 蹇冭烦鍖呯骇鍒�
+    completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級
+    clientId: ztns # clientId
+    subscribe: /ztt/v3/2455220/publish # 璁㈤槄涓婚
+    client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿�
+  - url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p
+    username: 2455221 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚�
+    password: 108295 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜
+    timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級
+    keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級
+    qos: 1 # 蹇冭烦鍖呯骇鍒�
+    completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級
+    clientId: ztns # clientId
+    subscribe: /ztt/v3/2455221/publish # 璁㈤槄涓婚
+    client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿�

--
Gitblit v1.9.3