zss
2025-04-17 e9bf5a0070b5fbdf9afd4475adbf51e46f579040
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package com.ruoyi.device.mqtt;
 
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
 
@Component
@Slf4j
public class MQClient {
 
    private static MqttClient client;
    public static MqttClient getClient() {
        return client;
    }
    public static void setClient(MqttClient client) {
        MQClient.client = client;
    }
 
    /**
     * WEB服务器连接MQTT服务器的配置
     * @param userName 账号
     * @param password 密码
     * @param outTime 超时时间
     * @param KeepAlive 心跳检测时间
     * @return
     */
    private MqttConnectOptions getOption(String userName, String password, int outTime, int KeepAlive) {
        MqttConnectOptions option = new MqttConnectOptions();
        // 设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接
        option.setCleanSession(true);
        // 设置连接的用户名
        option.setUserName(userName);
        // 设置连接的密码
        option.setPassword(password.toCharArray());
        // 设置超时时间 单位为秒
        option.setConnectionTimeout(outTime);
        // 设置会话心跳时间 单位为秒 服务器会每隔(1.5*keepTime)秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        option.setKeepAliveInterval(KeepAlive);
        // setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
        // option.setWill(topic, "close".getBytes(), 2, true);
        //设置最大速度
        option.setMaxInflight(1000);
        log.info("================>>>MQTT连接认证成功<<======================");
        return option;
    }
 
    /**
     * WEB服务器连接MQTT服务器函数
     * @param mqttConfig yml中MQTT的配置
     */
    public void connect(MQConfig mqttConfig) throws MqttException {
        client = new MqttClient(mqttConfig.getUrl(), mqttConfig.getClientId(), new MemoryPersistence());
        MqttConnectOptions options = getOption(mqttConfig.getUsername(), mqttConfig.getPassword(),
                mqttConfig.getTimeout(), mqttConfig.getKeepAlive());
        MQClient.setClient(client);
        //连接失败调用回调函数,重新连接
        client.setCallback(new MQCallback<Object>(this, mqttConfig));
        if (!client.isConnected()) {
            client.connect(options);
            // 订阅主题
            MQSubscribe.subscribe_0(mqttConfig.getSubscribe());
            log.info("================>>>MQTT连接成功<<======================");
        } else {// 这里的逻辑是如果连接不成功就重新连接
            client.disconnect();
            client.connect(options);
        }
    }
 
    /**
     * WEB服务器与MQTT服务器的断线重连
     * @throws Exception
     */
    public Boolean reConnect() throws Exception {
        Boolean isConnected = false;
        if (null != client) {
            client.connect();
            if (client.isConnected()) {
                isConnected = true;
            }
        }
        return isConnected;
    }
 
    /**
     * 异常关闭服务,WEB服务器与MQTT服务器的断开连接函数
     */
    @SneakyThrows
    public void close(){
        client.close();
        client.disconnect();
        log.info("================>>异常关闭与mqtt服务器的连接<<======================");
    }
}