package com.ruoyi.device.mqtt; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.stereotype.Component; @Component @Slf4j public class MQClient { private static MqttClient client; public static MqttClient getClient() { return client; } public static void setClient(MqttClient client) { MQClient.client = client; } /** * WEB服务器连接MQTT服务器的配置 * @param userName 账号 * @param password 密码 * @param outTime 超时时间 * @param KeepAlive 心跳检测时间 * @return */ private MqttConnectOptions getOption(String userName, String password, int outTime, int KeepAlive) { MqttConnectOptions option = new MqttConnectOptions(); // 设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接 option.setCleanSession(true); // 设置连接的用户名 option.setUserName(userName); // 设置连接的密码 option.setPassword(password.toCharArray()); // 设置超时时间 单位为秒 option.setConnectionTimeout(outTime); // 设置会话心跳时间 单位为秒 服务器会每隔(1.5*keepTime)秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 option.setKeepAliveInterval(KeepAlive); // setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 // option.setWill(topic, "close".getBytes(), 2, true); //设置最大速度 option.setMaxInflight(1000); log.info("================>>>MQTT连接认证成功<<======================"); return option; } /** * WEB服务器连接MQTT服务器函数 * @param mqttConfig yml中MQTT的配置 */ public void connect(MQConfig mqttConfig) throws MqttException { client = new MqttClient(mqttConfig.getUrl(), mqttConfig.getClientId(), new MemoryPersistence()); MqttConnectOptions options = getOption(mqttConfig.getUsername(), mqttConfig.getPassword(), mqttConfig.getTimeout(), mqttConfig.getKeepAlive()); MQClient.setClient(client); //连接失败调用回调函数,重新连接 client.setCallback(new MQCallback(this, mqttConfig)); if (!client.isConnected()) { client.connect(options); // 订阅主题 MQSubscribe.subscribe_0(mqttConfig.getSubscribe()); log.info("================>>>MQTT连接成功<<======================"); } else {// 这里的逻辑是如果连接不成功就重新连接 client.disconnect(); client.connect(options); } } /** * WEB服务器与MQTT服务器的断线重连 * @throws Exception */ public Boolean reConnect() throws Exception { Boolean isConnected = false; if (null != client) { client.connect(); if (client.isConnected()) { isConnected = true; } } return isConnected; } /** * 异常关闭服务,WEB服务器与MQTT服务器的断开连接函数 */ @SneakyThrows public void close(){ client.close(); client.disconnect(); log.info("================>>异常关闭与mqtt服务器的连接<<======================"); } }