From 69310a7de3d963c2bf46250b0965a2c7e8532f1e Mon Sep 17 00:00:00 2001 From: zouyu <2723363702@qq.com> Date: 星期二, 29 七月 2025 13:40:56 +0800 Subject: [PATCH] mqtt调整:实现订阅多个mqtt服务 --- cnas-device/src/main/java/com/ruoyi/device/mqtt/MQConfig.java | 57 +---------- ruoyi-admin-ztns/src/main/resources/application-ztns.yml | 30 ++++-- cnas-device/src/main/java/com/ruoyi/device/mqtt/MQBean.java | 10 +- ruoyi-admin-ztns/src/main/java/com/ruoyi/web/MqttApplicationRunner.java | 21 ++- cnas-device/src/main/java/com/ruoyi/device/mqtt/DeviceMqttConfig.java | 72 ++++++++++++++ cnas-device/src/main/java/com/ruoyi/device/mqtt/MQClient.java | 6 cnas-device/src/main/java/com/ruoyi/device/mqtt/MQCallback.java | 35 +++--- ruoyi-admin-ztns/src/main/resources/application-druid.yml | 31 ++++-- 8 files changed, 157 insertions(+), 105 deletions(-) diff --git a/cnas-device/src/main/java/com/ruoyi/device/mqtt/DeviceMqttConfig.java b/cnas-device/src/main/java/com/ruoyi/device/mqtt/DeviceMqttConfig.java new file mode 100644 index 0000000..4b654ed --- /dev/null +++ b/cnas-device/src/main/java/com/ruoyi/device/mqtt/DeviceMqttConfig.java @@ -0,0 +1,72 @@ +package com.ruoyi.device.mqtt; + +import lombok.Data; +import org.springframework.stereotype.Component; + +/** + * mqtt杩炴帴鍙傛暟瀹炰綋瀵硅薄 + */ +@Data +@Component +public class DeviceMqttConfig { + + /** + * MQTT-鏈嶅姟绔�-IP + */ + // @Value("${mqtt.url}") + private String url; + + /** + * MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚� + */ + // @Value("${mqtt.username}") + private String username; + + /** + * MQTT-鏈嶅姟绔�-瀵嗙爜 + */ + // @Value("${mqtt.password}") + private String password; + + /** + * 瓒呮椂鏃堕棿 + */ + // @Value("${mqtt.timeout}") + private int timeout; + + /** + * 蹇冭烦妫�娴嬫椂闂� + */ + // @Value("${mqtt.keepalive}") + private int keepalive; + + /** + * 蹇冭烦鍖呯骇鍒� + */ + // @Value("${mqtt.qos}") + private int qos; + + /** + * 鏈嶅姟绔繛鎺ヨ秴鏃舵椂闂� + */ + // @Value("${mqtt.completion-timeout}") + private int completionTimeout; + + /** + * clientId + */ + // @Value("${mqtt.clientId}") + private String clientId; + + /** + * 璁㈤槄涓婚 + */ + // @Value("${mqtt.subscribe}") + private String subscribe; + + /** + * mqtt杩炴帴寮�鍏� + */ + private Boolean client; + +} diff --git a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQBean.java b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQBean.java index d429673..c9157f9 100644 --- a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQBean.java +++ b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQBean.java @@ -6,9 +6,9 @@ @Component public class MQBean { - @Bean("mqClient") // 鍚姩WEB鏈嶅姟鍣ㄧ殑鏃跺�欒皟鐢ㄦ鏂规硶鍒濆鍖� - public MQClient myMQTTClient(){ - MQClient mqClient = new MQClient(); - return mqClient; - } +// @Bean("mqClient") // 鍚姩WEB鏈嶅姟鍣ㄧ殑鏃跺�欒皟鐢ㄦ鏂规硶鍒濆鍖� +// public MQClient myMQTTClient(){ +// MQClient mqClient = new MQClient(); +// return mqClient; +// } } diff --git a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQCallback.java b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQCallback.java index 9cc678f..f85eab0 100644 --- a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQCallback.java +++ b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQCallback.java @@ -1,15 +1,8 @@ package com.ruoyi.device.mqtt; -import cn.hutool.core.collection.CollectionUtil; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.alibaba.fastjson.TypeReference; -import com.ruoyi.device.constant.DCResistanceMqttConstants; import com.ruoyi.device.service.CollectBridgeService; -import com.ruoyi.device.vo.DCResistanceMqttVO; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; @@ -17,17 +10,14 @@ import javax.annotation.PostConstruct; import javax.annotation.Resource; -import java.util.List; -import java.util.Map; -import java.util.Objects; @Component @Slf4j -public class MQCallback<component> implements MqttCallback { +public class MQCallback implements MqttCallback { - private MQClient mqClient; // MQTT杩炴帴鏁版嵁 + private final MQClient mqClient; // MQTT杩炴帴鏁版嵁 - private MQConfig mqConfig; // yml閰嶇疆鏁版嵁 + private final DeviceMqttConfig mqConfig; // yml閰嶇疆鏁版嵁 private static MQCallback mqCallback; @@ -41,7 +31,7 @@ mqCallback.collectBridgeService = this.collectBridgeService; } - public MQCallback(MQClient mqClient, MQConfig mqConfig) { + public MQCallback(MQClient mqClient, DeviceMqttConfig mqConfig) { this.mqClient = mqClient; this.mqConfig = mqConfig; } @@ -74,17 +64,24 @@ * MQTT鏈嶅姟鍣ㄥ悜WEB鏈嶅姟鍣ㄥ彂閫佺殑鏁版嵁浼氭墽琛屽埌杩欓噷闈紝瀹樻柟璇濈О涓猴細璁㈤槄鍚庣殑娑堟伅 * @param topic 涓婚锛氫篃绉颁负搴曞眰缃戝叧鍞竴鏍囪瘑 * @param message 淇℃伅 - * @throws Exception 鎶ラ敊 */ @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { + public void messageArrived(String topic, MqttMessage message) { try { String parse = new String(message.getPayload()); -// JSONObject jsonObject = JSONObject.parseObject(parse); + switch (topic){ + case "/ztt/v3/2455220/publish": + //鑰愪笣锛氱洿娴佺數闃绘暟鎹В鏋� + mqCallback.collectBridgeService.addBridgeValueByNS(parse); + break; + case "/ztt/v3/2455221/publish": + //鑰愪笣锛氫几闀跨巼鏁版嵁瑙f瀽 + log.info("浼搁暱鐜囨秷鎭綋锛歿}",parse); + break; + } // 濉厖閲囬泦鏁版嵁 +// JSONObject jsonObject = JSONObject.parseObject(parse); // mqCallback.collectBridgeService.addBridgeValue(jsonObject); - //鑰愪笣锛氱洿娴佺數闃绘暟鎹В鏋� - mqCallback.collectBridgeService.addBridgeValueByNS(parse); } catch (Exception e) { e.printStackTrace(); diff --git a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQClient.java b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQClient.java index 1c3c548..9f002c5 100644 --- a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQClient.java +++ b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQClient.java @@ -52,13 +52,13 @@ * WEB鏈嶅姟鍣ㄨ繛鎺QTT鏈嶅姟鍣ㄥ嚱鏁� * @param mqttConfig yml涓璏QTT鐨勯厤缃� */ - public void connect(MQConfig mqttConfig) throws MqttException { + public void connect(DeviceMqttConfig mqttConfig) throws MqttException { client = new MqttClient(mqttConfig.getUrl(), mqttConfig.getClientId(), new MemoryPersistence()); MqttConnectOptions options = getOption(mqttConfig.getUsername(), mqttConfig.getPassword(), - mqttConfig.getTimeout(), mqttConfig.getKeepAlive()); + mqttConfig.getTimeout(), mqttConfig.getKeepalive()); MQClient.setClient(client); //杩炴帴澶辫触璋冪敤鍥炶皟鍑芥暟锛岄噸鏂拌繛鎺� - client.setCallback(new MQCallback<Object>(this, mqttConfig)); + client.setCallback(new MQCallback(this, mqttConfig)); if (!client.isConnected()) { client.connect(options); // 璁㈤槄涓婚 diff --git a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQConfig.java b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQConfig.java index a88b29c..c00e36c 100644 --- a/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQConfig.java +++ b/cnas-device/src/main/java/com/ruoyi/device/mqtt/MQConfig.java @@ -1,64 +1,19 @@ package com.ruoyi.device.mqtt; import lombok.Data; -import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; + +import java.util.List; @Component @Data +@ConfigurationProperties(prefix = "") public class MQConfig { /** - * MQTT-鏈嶅姟绔�-IP + * 璇诲彇yml鐨刴qtt閰嶇疆淇℃伅 */ - @Value("${mqtt.url}") - private String url; + private List<DeviceMqttConfig> mqtt; - /** - * MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚� - */ - @Value("${mqtt.username}") - private String username; - - /** - * MQTT-鏈嶅姟绔�-瀵嗙爜 - */ - @Value("${mqtt.password}") - private String password; - - /** - * 瓒呮椂鏃堕棿 - */ - @Value("${mqtt.timeout}") - private int timeout; - - /** - * 蹇冭烦妫�娴嬫椂闂� - */ - @Value("${mqtt.keepalive}") - private int keepAlive; - - /** - * 蹇冭烦鍖呯骇鍒� - */ - @Value("${mqtt.qos}") - private int qos; - - /** - * 鏈嶅姟绔繛鎺ヨ秴鏃舵椂闂� - */ - @Value("${mqtt.completion-timeout}") - private int completionTimeout; - - /** - * clientId - */ - @Value("${mqtt.clientId}") - private String clientId; - - /** - * 璁㈤槄涓婚 - */ - @Value("${mqtt.subscribe}") - private String subscribe; } diff --git a/ruoyi-admin-ztns/src/main/java/com/ruoyi/web/MqttApplicationRunner.java b/ruoyi-admin-ztns/src/main/java/com/ruoyi/web/MqttApplicationRunner.java index 854d84a..7474bee 100644 --- a/ruoyi-admin-ztns/src/main/java/com/ruoyi/web/MqttApplicationRunner.java +++ b/ruoyi-admin-ztns/src/main/java/com/ruoyi/web/MqttApplicationRunner.java @@ -1,13 +1,16 @@ package com.ruoyi.web; +import com.ruoyi.device.mqtt.DeviceMqttConfig; import com.ruoyi.device.mqtt.MQClient; import com.ruoyi.device.mqtt.MQConfig; import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttException; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; + +import java.util.Objects; @Component @Slf4j @@ -16,14 +19,18 @@ @Autowired private MQConfig mqConfig; - @Value("${mqtt.client}") - private Boolean client; +// @Value("${mqtt.client}") +// private Boolean client; @Override - public void run(ApplicationArguments args) throws Exception { - if (client) { - MQClient mqttPushClient = new MQClient(); - mqttPushClient.connect(mqConfig); + public void run(ApplicationArguments args) throws MqttException { + if(Objects.nonNull(mqConfig)){ + for (DeviceMqttConfig deviceMqttConfig : mqConfig.getMqtt()) { + if (deviceMqttConfig.getClient()) { + MQClient mqttPushClient = new MQClient(); + mqttPushClient.connect(deviceMqttConfig); + } + } } } } diff --git a/ruoyi-admin-ztns/src/main/resources/application-druid.yml b/ruoyi-admin-ztns/src/main/resources/application-druid.yml index 0cee7d2..fef6444 100644 --- a/ruoyi-admin-ztns/src/main/resources/application-druid.yml +++ b/ruoyi-admin-ztns/src/main/resources/application-druid.yml @@ -162,13 +162,24 @@ # 澶囨敞 18083瀵嗙爜锛歾ttZTT123!@ mqtt: - url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p - username: 2455220 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚� - password: 108300 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜 - timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級 - keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級 - qos: 1 # 蹇冭烦鍖呯骇鍒� - completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級 - clientId: ztns # clientId - subscribe: /ztt/v3/2455220/publish # 璁㈤槄涓婚 - client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿� + - url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p + username: 2455220 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚� + password: 108300 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜 + timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級 + keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級 + qos: 1 # 蹇冭烦鍖呯骇鍒� + completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級 + clientId: ztns01 # clientId + subscribe: /ztt/v3/2455220/publish # 璁㈤槄涓婚 + client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿� + - url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p + username: 2455221 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚� + password: 108295 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜 + timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級 + keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級 + qos: 1 # 蹇冭烦鍖呯骇鍒� + completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級 + clientId: ztns02 # clientId + subscribe: /ztt/v3/2455221/publish # 璁㈤槄涓婚 + client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿� + diff --git a/ruoyi-admin-ztns/src/main/resources/application-ztns.yml b/ruoyi-admin-ztns/src/main/resources/application-ztns.yml index 1b88ba5..7c90b2b 100644 --- a/ruoyi-admin-ztns/src/main/resources/application-ztns.yml +++ b/ruoyi-admin-ztns/src/main/resources/application-ztns.yml @@ -163,13 +163,23 @@ # 澶囨敞 18083瀵嗙爜锛歾ttZTT123!@ mqtt: - url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p - username: 2455220 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚� - password: 108300 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜 - timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級 - keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級 - qos: 1 # 蹇冭烦鍖呯骇鍒� - completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級 - clientId: ztns # clientId - subscribe: /ztt/v3/2455220/publish # 璁㈤槄涓婚 - client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿� + - url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p + username: 2455220 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚� + password: 108300 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜 + timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級 + keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級 + qos: 1 # 蹇冭烦鍖呯骇鍒� + completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級 + clientId: ztns # clientId + subscribe: /ztt/v3/2455220/publish # 璁㈤槄涓婚 + client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿� + - url: tcp://mqtt-ztt.zttiot.com:1883 # 鏈嶅姟鍣╥p + username: 2455221 # MQTT-鏈嶅姟绔�-鐢ㄦ埛鍚�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍鍚� + password: 108295 # MQTT-鏈嶅姟绔�-瀵嗙爜锛�,鍚庢湡浼氫慨鏀逛负鐢ㄦ埛鐧诲綍瀵嗙爜 + timeout: 100 # 瓒呮椂鏃堕棿 锛堝崟浣嶏細绉掞級 + keepalive: 60 # 蹇冭烦 锛堝崟浣嶏細绉掞級 + qos: 1 # 蹇冭烦鍖呯骇鍒� + completion-timeout: 3000 # 杩炴帴瓒呮椂鏃堕棿锛堝崟浣嶏細绉掞級 + clientId: ztns # clientId + subscribe: /ztt/v3/2455221/publish # 璁㈤槄涓婚 + client: true # 濡傛灉寮�鍙戦渶瑕佸惎鍔ㄦ祴璇曪紝闇�瑕佹敼涓篺alse涓嶇劧浼氫竴鐩存姤閿� -- Gitblit v1.9.3