zhuo
2025-02-22 ed8d0914b272da2006b2ec53cc7374d2e0734de8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.ruoyi.device.mqtt;
 
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.stereotype.Component;
 
@Component
@Slf4j
public class MQPublic {
 
    /**
     * 通信质量qos为0:表示WEB服务器向MQTT服务器只发送一次,不管MQTT服务器有没有收到
     * WEB服务器向MQTT服务器发布数据,此方法封装了publish函数
     * @param topic 向底层网关发送数据,官方话:称此为主题,向那个主题发送数据 网关:即主题,设备唯一标识
     * @param pushMessage WEB服务器向MQTT服务器发送的数据
     */
    public void publish_0(int qos, String topic, byte[] pushMessage) {
        publish(0, false, topic, pushMessage);
    }
 
    /**
     * 通信质量qos为1:表示WEB服务器向MQTT服务器发送数据,MQTT服务器一定会收到一次数据,如果MQTT服务器没有响应“收到数据”,那么WEB服务器就会一直发送数据
     * WEB服务器向MQTT服务器发布数据,调用此函数,此方法封装了publish函数,通信质量qos为1
     * @param topic 向底层网关发送数据,官方话:称此为主题,向那个主题发送数据 网关:即主题,设备唯一标识
     * @param pushMessage WEB服务器向MQTT服务器发送的数据
     */
    public void publish_1(String topic, byte[] pushMessage) {
        publish(1, false, topic, pushMessage);
    }
 
    /**
     *通信质量qos为2:表示WEB服务器向MQTT服务器发送数据,两者会进行至少两次请求/响应流程,避免数据在传输中的丢失,但是相应的也会消耗计算机中的资源
     * WEB服务器向MQTT服务器发布数据,此方法封装了publish函数,通信质量:2
     * @param topic 向底层网关发送数据,官方话:称此为主题,向那个主题发送数据 网关:即主题,设备唯一标识
     * @param pushMessage WEB服务器向MQTT服务器发送的数据
     */
    public void publish_2(String topic, byte[] pushMessage) {
        publish(2,false, topic, pushMessage);
    }
 
    /**
     * 发布函数:WEB服务器向MQTT服务器发送数据
     *
     * @param qos 通信质量
     * @param retained 默认:false-非持久化(是指一条消息消费完,就会被删除;持久化,消费完,还会保存在服务器中,当新的订阅者出现,继续给新订阅者消费)
     * @param topic 向底层网关发送数据,官方话:称此为主题,向那个主题发送数据 网关:即主题,设备唯一标识
     * @param pushMessage WEB服务器向MQTT服务器发送的数据
     */
    public void publish(int qos, boolean retained, String topic, byte[] pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        // 将String[]数组转换为byte数组发送
        message.setPayload(pushMessage);
        MqttTopic mTopic = MQClient.getClient().getTopic(topic);
        if (null == mTopic) {
            log.error("===============>>>MQTT {} 不存在<<=======================",topic);
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}