gaoluyang
3 天以前 92230c9a97dc9ce9df3313d11d26999c04bb6b26
src/plugins/mqttclient.ts
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,165 @@
import { generateUUID } from "@/utils/geek";
import * as mqtt from "mqtt/dist/mqtt";
const enableJSONDecoding = true // å¼€å¯JSON解析消息,需要开启JSON解析消息才能开启uuid和event
const enableJSONEncoding = true // å¼€å¯JSON消息编码
const enableUUID = true // éœ€è¦æŽ¥æ”¶ä¿¡æ¯ä¸­åŒ…含uuid字段,uuid优先级高于event
type MqttConnectOptions = {
    clean: boolean,
    connectTimeout: number,
    clientId: string, // è®¤è¯ä¿¡æ¯
    keepalive: number,
    protocolId?: string, // é»˜è®¤ 'MQTT'
    protocolVersion?: number, // é»˜è®¤4
    username?: string,
    password?: string,
}
type MqttConnectConfig = {
    url?: string,
    options: MqttConnectOptions
}
interface MqttClient {
    connected:boolean,
    on: {
        (event: "connect", fun: () => void): void;
        (event: "reconnect", fun: () => void): void;
        (event: "disconnect", fun: () => void): void;
        (event: "offline", fun: () => void): void;
        (event: "message", fun: (topic: string, message: any, packet?: any) => void): void;
        (event: "packetsend", fun: (packet: any) => void): void;
        (event: "packetreceive", fun: (packet: any) => void): void;
        (event: "error", fun: (err: any) => void): void;
    };
    publish: {
        (topic: string, message: any, options?: any, callback?: (err: any) => void): void;
    }
    subscribe: {
        (topic: string, options?: any, callback?: (err: any, granted: { topic: string, qos: any }) => void): void
    }
    unsubscribe: {
        (topic: string, callback?: (err: any) => void): void
    }
    end(force?:boolean, options?:any, callback?: (err: any) => void): void
}
let _client: MqttClient;
let _callback: { [key: string]: (data: any) => void } = {}
let onmassage = (topic:string,message:string)=>{}
export default {
    /**
     * è¿žæŽ¥websocket
     * æœ€ç®€å•的用法就是传入{url:"ws://demo"}
     * å½“连接成功后触发回调函数
     */
    connect(config: MqttConnectConfig) {
        return new Promise<void>((resolve, reject) => {
            if(!_client || !_client.connected){
                _client = mqtt.connect(config.url, config.options);
                _client.on('connect', resolve);
                _client.on('error', reject);
                _client.on('message', (topic,message) => {
                    if (enableJSONDecoding) {
                        let data = JSON.parse(message)
                        if (enableUUID && (data || {}).uuid !== undefined) {
                            _callback[data.uuid](data)
                            delete _callback[data.uuid]
                        }
                    }
                    onmassage(topic,message)
                })
            }
        })
    },
    /**
     * å‘送信息
     * @param msg æ¶ˆæ¯ï¼Œä¼šè¢«å¤„理成json字符串
     * @param uuid å”¯ä¸€æ ‡è¯†,可以传入uuid,也可以传入true自动生成uuid,flase表示该消息不需要单独处理
     * @returns
     */
    send(topic: string, msg: any, uuid: string | boolean = false, options: any = undefined) {
        return new Promise((resolve, reject) => {
            const useUUID = enableUUID && uuid != undefined && uuid != "" && uuid != false
            if (useUUID) {
                if (uuid === true) {
                    msg.uuid = generateUUID()
                    _callback[msg.uuid] = resolve
                } else {
                    _callback[uuid] = resolve
                }
            }
            if(enableJSONEncoding){
                msg = JSON.stringify(msg)
            }
            _client.publish(topic, msg, options, err => {
                if (err) {
                    reject(err)
                } else if (uuid === false) {
                    resolve(err)
                }
            })
        })
    },
    /**
     * å…³é—­è¿žæŽ¥
     * @returns å…³é—­è¿žæŽ¥çš„Promise,回调函数只会运行一次
     */
    close(force?: boolean, options?: any, callback?: (err: any) => void) {
        return new Promise<void>((resolve, reject) => {
            _client.end(force,options,()=>{
                resolve()
            })
        })
    },
    /**
     * è®¢é˜…事件
     * @param event è¦ç›‘听的事件
     * @returns åœ¨å›žè°ƒå‡½æ•°ä¸­å¤„理事件
     */
    subscribe(topic: string, options: string | undefined = undefined) {
        return new Promise<void>((resolve, reject) => {
            _client.subscribe(topic, options, err => {
                if (err) {
                    reject(err)
                } else {
                    resolve()
                }
            })
        })
    },
    /**
     * å–消订阅事件
     * @param event è¦å–消监听的事件
     */
    unsubscribe(topic: string) {
        return new Promise<void>((resolve, reject) => {
            _client.unsubscribe(topic, err => {
                if (err) {
                    reject(err)
                } else {
                    resolve()
                }
            })
        })
    },
    /**
     * å®šä¹‰é»˜è®¤ç›‘听事件
     * @param callback é»˜è®¤ç›‘听事件的处理函数
     */
    onMessage(callback: (topic:string,message:string) => void) {
        onmassage = callback
    },
    /**
     * å®šä¹‰å¼‚常事件
     * @param callback é»˜è®¤å¼‚常事件的处理函数
     */
    onError(callback: (data: any) => void) {
        _client.on('error', callback)
    },
    /**
     * å®šä¹‰å…³é—­äº‹ä»¶
     * @param callback é»˜è®¤å…³é—­äº‹ä»¶çš„处理函数
     */
    onClose(callback: () => void) {
        _client.on('disconnect', callback)
    }
}