package com.ruoyi.device.mqtt; import com.ruoyi.device.service.CollectBridgeService; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; @Component @Slf4j public class MQCallback implements MqttCallback { private final MQClient mqClient; // MQTT连接数据 private final DeviceMqttConfig mqConfig; // yml配置数据 private static MQCallback mqCallback; @Resource private CollectBridgeService collectBridgeService; @PostConstruct public void init() { mqCallback = this; // 初使化时将已静态化的configParam实例化 mqCallback.collectBridgeService = this.collectBridgeService; } public MQCallback(MQClient mqClient, DeviceMqttConfig mqConfig) { this.mqClient = mqClient; this.mqConfig = mqConfig; } /** 连接丢失后,一般在这里面进行重连 **/ @SneakyThrows @Override public void connectionLost(Throwable cause) { /** 连接丢失后,一般在这里面进行重连 **/ if (mqClient != null) { while (true) { try { log.info("==============》》》[MQTT] 连接丢失,尝试重连..."); MQClient mqttPushClient = new MQClient(); mqttPushClient.connect(mqConfig); if (mqClient.getClient().isConnected()) { log.info("=============>>重连成功"); } break; } catch (Exception e) { log.error("=============>>>[MQTT] 连接断开,重连失败!<<============="); continue; } } } log.info(cause.getMessage()); } /** * MQTT服务器向WEB服务器发送的数据会执行到这里面,官方话称为:订阅后的消息 * @param topic 主题:也称为底层网关唯一标识 * @param message 信息 */ @Override public void messageArrived(String topic, MqttMessage message) { try { String parse = new String(message.getPayload()); switch (topic){ case "/ztt/v3/2455220/publish": //耐丝:直流电阻数据解析 mqCallback.collectBridgeService.addBridgeValueByNS(parse); break; case "/ztt/v3/2455221/publish": //耐丝:伸长率数据解析 log.info("伸长率消息体:{}",parse); break; } // 填充采集数据 // JSONObject jsonObject = JSONObject.parseObject(parse); // mqCallback.collectBridgeService.addBridgeValue(jsonObject); } catch (Exception e) { e.printStackTrace(); log.info("============》》接收消息主题异常 : " + e.getMessage()); } } /** * WEB服务器向MQTT服务器发送的数据会执行到这里面 * 官方话称为:发布后会执行到这里 * @param token 连接token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { // log.info("==========发布信息={}==========", token.isComplete()); } }