From 69310a7de3d963c2bf46250b0965a2c7e8532f1e Mon Sep 17 00:00:00 2001
From: zouyu <2723363702@qq.com>
Date: 星期二, 29 七月 2025 13:40:56 +0800
Subject: [PATCH] mqtt调整:实现订阅多个mqtt服务
---
cnas-device/src/main/java/com/ruoyi/device/mqtt/MQConfig.java | 57 +----------
ruoyi-admin-ztns/src/main/resources/application-ztns.yml | 30 ++++--
cnas-device/src/main/java/com/ruoyi/device/mqtt/MQBean.java | 10 +-
ruoyi-admin-ztns/src/main/java/com/ruoyi/web/MqttApplicationRunner.java | 21 ++-
cnas-device/src/main/java/com/ruoyi/device/mqtt/DeviceMqttConfig.java | 72 ++++++++++++++
cnas-device/src/main/java/com/ruoyi/device/mqtt/MQClient.java | 6
cnas-device/src/main/java/com/ruoyi/device/mqtt/MQCallback.java | 35 +++---
ruoyi-admin-ztns/src/main/resources/application-druid.yml | 31 ++++--
8 files changed, 157 insertions(+), 105 deletions(-)
diff --git a/cnas-device/src/main/java/com/ruoyi/device/mqtt/DeviceMqttConfig.java b/cnas-device/src/main/java/com/ruoyi/device/mqtt/DeviceMqttConfig.java
new file mode 100644
index 0000000..4b654ed
--- /dev/null
+++ b/cnas-device/src/main/java/com/ruoyi/device/mqtt/DeviceMqttConfig.java
@@ -0,0 +1,72 @@
+package com.ruoyi.device.mqtt;
+
+import lombok.Data;
+import org.springframework.stereotype.Component;
+
+/**
+ * mqtt杩炴帴鍙傛暟瀹炰綋瀵硅薄
+ */
+@Data
+@Component
+public class DeviceMqttConfig {
+
+ /**
+ * MQTT-鏈嶅姟绔�-IP
+ */
+ // @Value("${mqtt.url}")
+ private String url;
+
+ /**
+ * MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�
+ */
+ // @Value("${mqtt.username}")
+ private String username;
+
+ /**
+ * MQTT-鏈嶅姟绔�-瀵嗙爜
+ */
+ // @Value("${mqtt.password}")
+ private String password;
+
+ /**
+ * 瓒呮椂鏃堕棿
+ */
+ // @Value("${mqtt.timeout}")
+ private int timeout;
+
+ /**
+ * 蹇冭烦妫�娴嬫椂闂�
+ */
+ // @Value("${mqtt.keepalive}")
+ private int keepalive;
+
+ /**
+ * 蹇冭烦鍖呯骇鍒�
+ */
+ // @Value("${mqtt.qos}")
+ private int qos;
+
+ /**
+ * 鏈嶅姟绔繛鎺ヨ秴鏃舵椂闂�
+ */
+ // @Value("${mqtt.completion-timeout}")
+ private int completionTimeout;
+
+ /**
+ * clientId
+ */
+ // @Value("${mqtt.clientId}")
+ private String clientId;
+
+ /**
+ * 璁㈤槄涓婚
+ */
+ // @Value("${mqtt.subscribe}")
+ private String subscribe;
+
+ /**
+ * mqtt杩炴帴寮�鍏�
+ */
+ private Boolean client;
+
+}
diff --git a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQBean.java b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQBean.java
index d429673..c9157f9 100644
--- a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQBean.java
+++ b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQBean.java
@@ -6,9 +6,9 @@
@Component
public class MQBean {
- @Bean("mqClient") // 鍚姩WEB鏈嶅姟鍣ㄧ殑鏃跺�欒皟鐢ㄦ鏂规硶鍒濆鍖�
- public MQClient myMQTTClient(){
- MQClient mqClient = new MQClient();
- return mqClient;
- }
+// @Bean("mqClient") // 鍚姩WEB鏈嶅姟鍣ㄧ殑鏃跺�欒皟鐢ㄦ鏂规硶鍒濆鍖�
+// public MQClient myMQTTClient(){
+// MQClient mqClient = new MQClient();
+// return mqClient;
+// }
}
diff --git a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQCallback.java b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQCallback.java
index 9cc678f..f85eab0 100644
--- a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQCallback.java
+++ b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQCallback.java
@@ -1,15 +1,8 @@
package com.ruoyi.device.mqtt;
-import cn.hutool.core.collection.CollectionUtil;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.TypeReference;
-import com.ruoyi.device.constant.DCResistanceMqttConstants;
import com.ruoyi.device.service.CollectBridgeService;
-import com.ruoyi.device.vo.DCResistanceMqttVO;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@@ -17,17 +10,14 @@
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
@Component
@Slf4j
-public class MQCallback<component> implements MqttCallback {
+public class MQCallback implements MqttCallback {
- private MQClient mqClient; // MQTT杩炴帴鏁版嵁
+ private final MQClient mqClient; // MQTT杩炴帴鏁版嵁
- private MQConfig mqConfig; // yml閰嶇疆鏁版嵁
+ private final DeviceMqttConfig mqConfig; // yml閰嶇疆鏁版嵁
private static MQCallback mqCallback;
@@ -41,7 +31,7 @@
mqCallback.collectBridgeService = this.collectBridgeService;
}
- public MQCallback(MQClient mqClient, MQConfig mqConfig) {
+ public MQCallback(MQClient mqClient, DeviceMqttConfig mqConfig) {
this.mqClient = mqClient;
this.mqConfig = mqConfig;
}
@@ -74,17 +64,24 @@
* MQTT鏈嶅姟鍣ㄥ悜WEB鏈嶅姟鍣ㄥ彂閫佺殑鏁版嵁浼氭墽琛屽埌杩欓噷闈紝瀹樻柟璇濈О涓猴細璁㈤槄鍚庣殑娑堟伅
* @param topic 涓婚锛氫篃绉颁负搴曞眰缃戝叧鍞竴鏍囪瘑
* @param message 淇℃伅
- * @throws Exception 鎶ラ敊
*/
@Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
+ public void messageArrived(String topic, MqttMessage message) {
try {
String parse = new String(message.getPayload());
-// JSONObject jsonObject = JSONObject.parseObject(parse);
+ switch (topic){
+ case "/ztt/v3/2455220/publish":
+ //鑰愪笣锛氱洿娴佺數闃绘暟鎹В鏋�
+ mqCallback.collectBridgeService.addBridgeValueByNS(parse);
+ break;
+ case "/ztt/v3/2455221/publish":
+ //鑰愪笣锛氫几闀跨巼鏁版嵁瑙f瀽
+ log.info("浼搁暱鐜囨秷鎭綋锛歿}",parse);
+ break;
+ }
// 濉厖閲囬泦鏁版嵁
+// JSONObject jsonObject = JSONObject.parseObject(parse);
// mqCallback.collectBridgeService.addBridgeValue(jsonObject);
- //鑰愪笣锛氱洿娴佺數闃绘暟鎹В鏋�
- mqCallback.collectBridgeService.addBridgeValueByNS(parse);
} catch (Exception e) {
e.printStackTrace();
diff --git a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQClient.java b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQClient.java
index 1c3c548..9f002c5 100644
--- a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQClient.java
+++ b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQClient.java
@@ -52,13 +52,13 @@
* WEB鏈嶅姟鍣ㄨ繛鎺QTT鏈嶅姟鍣ㄥ嚱鏁�
* @param mqttConfig yml涓璏QTT鐨勯厤缃�
*/
- public void connect(MQConfig mqttConfig) throws MqttException {
+ public void connect(DeviceMqttConfig mqttConfig) throws MqttException {
client = new MqttClient(mqttConfig.getUrl(), mqttConfig.getClientId(), new MemoryPersistence());
MqttConnectOptions options = getOption(mqttConfig.getUsername(), mqttConfig.getPassword(),
- mqttConfig.getTimeout(), mqttConfig.getKeepAlive());
+ mqttConfig.getTimeout(), mqttConfig.getKeepalive());
MQClient.setClient(client);
//杩炴帴澶辫触璋冪敤鍥炶皟鍑芥暟锛岄噸鏂拌繛鎺�
- client.setCallback(new MQCallback<Object>(this, mqttConfig));
+ client.setCallback(new MQCallback(this, mqttConfig));
if (!client.isConnected()) {
client.connect(options);
// 璁㈤槄涓婚
diff --git a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQConfig.java b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQConfig.java
index a88b29c..c00e36c 100644
--- a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQConfig.java
+++ b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQConfig.java
@@ -1,64 +1,19 @@
package com.ruoyi.device.mqtt;
import lombok.Data;
-import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
+
+import java.util.List;
@Component
@Data
+@ConfigurationProperties(prefix = "")
public class MQConfig {
/**
- * MQTT-鏈嶅姟绔�-IP
+ * 璇诲彇yml鐨刴qtt閰嶇疆淇℃伅
*/
- @Value("${mqtt.url}")
- private String url;
+ private List<DeviceMqttConfig> mqtt;
- /**
- * MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�
- */
- @Value("${mqtt.username}")
- private String username;
-
- /**
- * MQTT-鏈嶅姟绔�-瀵嗙爜
- */
- @Value("${mqtt.password}")
- private String password;
-
- /**
- * 瓒呮椂鏃堕棿
- */
- @Value("${mqtt.timeout}")
- private int timeout;
-
- /**
- * 蹇冭烦妫�娴嬫椂闂�
- */
- @Value("${mqtt.keepalive}")
- private int keepAlive;
-
- /**
- * 蹇冭烦鍖呯骇鍒�
- */
- @Value("${mqtt.qos}")
- private int qos;
-
- /**
- * 鏈嶅姟绔繛鎺ヨ秴鏃舵椂闂�
- */
- @Value("${mqtt.completion-timeout}")
- private int completionTimeout;
-
- /**
- * clientId
- */
- @Value("${mqtt.clientId}")
- private String clientId;
-
- /**
- * 璁㈤槄涓婚
- */
- @Value("${mqtt.subscribe}")
- private String subscribe;
}
diff --git a/ruoyi-admin-ztns/src/main/java/com/ruoyi/web/MqttApplicationRunner.java b/ruoyi-admin-ztns/src/main/java/com/ruoyi/web/MqttApplicationRunner.java
index 854d84a..7474bee 100644
--- a/ruoyi-admin-ztns/src/main/java/com/ruoyi/web/MqttApplicationRunner.java
+++ b/ruoyi-admin-ztns/src/main/java/com/ruoyi/web/MqttApplicationRunner.java
@@ -1,13 +1,16 @@
package com.ruoyi.web;
+import com.ruoyi.device.mqtt.DeviceMqttConfig;
import com.ruoyi.device.mqtt.MQClient;
import com.ruoyi.device.mqtt.MQConfig;
import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
+
+import java.util.Objects;
@Component
@Slf4j
@@ -16,14 +19,18 @@
@Autowired
private MQConfig mqConfig;
- @Value("${mqtt.client}")
- private Boolean client;
+// @Value("${mqtt.client}")
+// private Boolean client;
@Override
- public void run(ApplicationArguments args) throws Exception {
- if (client) {
- MQClient mqttPushClient = new MQClient();
- mqttPushClient.connect(mqConfig);
+ public void run(ApplicationArguments args) throws MqttException {
+ if(Objects.nonNull(mqConfig)){
+ for (DeviceMqttConfig deviceMqttConfig : mqConfig.getMqtt()) {
+ if (deviceMqttConfig.getClient()) {
+ MQClient mqttPushClient = new MQClient();
+ mqttPushClient.connect(deviceMqttConfig);
+ }
+ }
}
}
}
diff --git a/ruoyi-admin-ztns/src/main/resources/application-druid.yml b/ruoyi-admin-ztns/src/main/resources/application-druid.yml
index 0cee7d2..fef6444 100644
--- a/ruoyi-admin-ztns/src/main/resources/application-druid.yml
+++ b/ruoyi-admin-ztns/src/main/resources/application-druid.yml
@@ -162,13 +162,24 @@
# 澶囨敞 18083瀵嗙爜锛歾ttZTT123!@
mqtt:
- url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p
- username: 2455220 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚�
- password: 108300 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜
- timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級
- keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級
- qos: 1 # 蹇冭烦鍖呯骇鍒�
- completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級
- clientId: ztns # clientId
- subscribe: /ztt/v3/2455220/publish # 璁㈤槄涓婚
- client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿�
+ - url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p
+ username: 2455220 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚�
+ password: 108300 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜
+ timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級
+ keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級
+ qos: 1 # 蹇冭烦鍖呯骇鍒�
+ completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級
+ clientId: ztns01 # clientId
+ subscribe: /ztt/v3/2455220/publish # 璁㈤槄涓婚
+ client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿�
+ - url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p
+ username: 2455221 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚�
+ password: 108295 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜
+ timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級
+ keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級
+ qos: 1 # 蹇冭烦鍖呯骇鍒�
+ completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級
+ clientId: ztns02 # clientId
+ subscribe: /ztt/v3/2455221/publish # 璁㈤槄涓婚
+ client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿�
+
diff --git a/ruoyi-admin-ztns/src/main/resources/application-ztns.yml b/ruoyi-admin-ztns/src/main/resources/application-ztns.yml
index 1b88ba5..7c90b2b 100644
--- a/ruoyi-admin-ztns/src/main/resources/application-ztns.yml
+++ b/ruoyi-admin-ztns/src/main/resources/application-ztns.yml
@@ -163,13 +163,23 @@
# 澶囨敞 18083瀵嗙爜锛歾ttZTT123!@
mqtt:
- url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p
- username: 2455220 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚�
- password: 108300 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜
- timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級
- keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級
- qos: 1 # 蹇冭烦鍖呯骇鍒�
- completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級
- clientId: ztns # clientId
- subscribe: /ztt/v3/2455220/publish # 璁㈤槄涓婚
- client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿�
+ - url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p
+ username: 2455220 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚�
+ password: 108300 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜
+ timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級
+ keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級
+ qos: 1 # 蹇冭烦鍖呯骇鍒�
+ completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級
+ clientId: ztns # clientId
+ subscribe: /ztt/v3/2455220/publish # 璁㈤槄涓婚
+ client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿�
+ - url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p
+ username: 2455221 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚�
+ password: 108295 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜
+ timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級
+ keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級
+ qos: 1 # 蹇冭烦鍖呯骇鍒�
+ completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級
+ clientId: ztns # clientId
+ subscribe: /ztt/v3/2455221/publish # 璁㈤槄涓婚
+ client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿�
--
Gitblit v1.9.3