WPubsubClient.mjs

import mqtt from 'mqtt'
// import mqtt from 'mqtt/build/index.js' //nodejs執行時真正入口, 為package.json內main, 但因路徑與自動補.js導致錯誤無法使用
// import mqtt from '../node_modules/mqtt/build/index.js' //nodejs執行時真正入口, 為package.json內main, 因新版import模組無法識別.js會再另外補.js, 以及相對路徑無法識別問題, 故須改用此格式import
import get from 'lodash-es/get.js'
import isestr from 'wsemi/src/isestr.mjs'
import ispint from 'wsemi/src/ispint.mjs'
import cint from 'wsemi/src/cint.mjs'
import j2o from 'wsemi/src/j2o.mjs'
import evem from 'wsemi/src/evem.mjs'
import genID from 'wsemi/src/genID.mjs'
import genPm from 'wsemi/src/genPm.mjs'
import waitFun from 'wsemi/src/waitFun.mjs'


/**
 * 建立一個 MQTT 客戶端,支援持久連線、Token 驗證、自動重連、訂閱與發佈功能
 *
 * @param {Object} [opt={}] - 設定選項
 * @param {String} [opt.url='mqtt://localhost'] - MQTT broker 連線 URL
 * @param {Number} [opt.port=8080] - Broker 連線 port
 * @param {String} [opt.token=''] - 連線時用來驗證的 Token
 * @param {String} [opt.clientId] - 指定 Client ID,若未指定則自動產生
 * @param {Number} [opt.timeReconnect=2000] - 斷線後重新連線的間隔時間(毫秒)
 * @returns {Object} - 傳回一個具有 `subscribe`、`unsubscribe`、`publish`、`clear` 方法的事件物件
 * @example
 *
 * import w from 'wsemi'
 * import WPubsubClient from './src/WPubsubClient.mjs'
 * // import WPubsubClient from './dist/w-pubsub-client.umd.js'
 * // import WPubsubClient from './dist/w-pubsub-client.wk.umd.js'
 *
 * let test = async () => {
 *     let pm = w.genPm()
 *
 *     let ms = []
 *
 *     let clientId = 'id-for-client'
 *
 *     let opt = {
 *         port: 8080,
 *         token: 'token-for-test',
 *         clientId,
 *     }
 *     let wpc = new WPubsubClient(opt)
 *     // console.log('wpc', wpc)
 *
 *     let topic = 'task'
 *
 *     wpc.on('connect', () => {
 *         console.log('connect')
 *         ms.push({ clientId: `connect` })
 *     })
 *     wpc.on('reconnect', () => {
 *         console.log('reconnect')
 *     })
 *     wpc.on('offline', () => {
 *         console.log('offline')
 *     })
 *     wpc.on('message', ({ topic, message }) => {
 *         console.log(`message`, topic, message)
 *         ms.push({ clientId: `receive topic[${topic}], message[${message}]` })
 *     })
 *     wpc.on('close', () => {
 *         console.log('close')
 *         ms.push({ clientId: `close` })
 *     })
 *     wpc.on('end', () => {
 *         console.log('end')
 *     })
 *     wpc.on('error', (err) => {
 *         console.log('error', err)
 *     })
 *
 *     await wpc.subscribe(topic, 2)
 *         .then((res) => {
 *             console.log('subscribe then', res)
 *             ms.push({ clientId: `subscribe`, subscriptions: JSON.stringify(res) })
 *         })
 *         .catch((err) => {
 *             console.log('subscribe catch', err)
 *         })
 *
 *     await wpc.publish(topic, 'result', 2)
 *         .then((res) => {
 *             console.log('publish then', res)
 *             ms.push({ clientId: `publish`, res })
 *         })
 *         .catch((err) => {
 *             console.log('publish catch', err)
 *         })
 *
 *     setTimeout(async() => {
 *         await wpc.clear()
 *         try { //使用worker版時要另外呼叫terminate中止
 *             wpc.terminate()
 *         }
 *         catch (err) {}
 *         console.log('ms', ms)
 *         pm.resolve(ms)
 *     }, 5000)
 *
 *     return pm
 * }
 * await test()
 *     .catch((err) => {
 *         console.log(err)
 *     })
 * // => ms [
 * //   { clientId: 'connect' },
 * //   {
 * //     clientId: 'subscribe',
 * //     subscriptions: '[{"topic":"task","qos":2}]'
 * //   },
 * //   { clientId: 'publish', res: 'done' },
 * //   { clientId: 'receive topic[task]' },
 * //   { clientId: 'close' }
 * // ]
 *
 */
function WPubsubClient(opt = {}) {

    //keyMsg
    let keyMsg = '__msg__'

    //url
    let url = get(opt, 'url')
    if (!isestr(url)) {
        url = 'mqtt://localhost'
    }

    //port
    let port = get(opt, 'port')
    if (!ispint(port)) {
        port = 8080
    }
    port = cint(port)

    //token
    let token = get(opt, 'token')
    if (!isestr(token)) {
        token = ''
    }

    //clientId
    let clientId = get(opt, 'clientId')
    if (!isestr(clientId)) {
        clientId = `cl-${genID()}`
    }

    //timeReconnect
    let timeReconnect = get(opt, 'timeReconnect')
    if (!ispint(timeReconnect)) {
        timeReconnect = 2000
    }
    timeReconnect = cint(timeReconnect)

    //urlBroker
    let urlBroker = `${url}:${port}`

    //client
    let client = mqtt.connect(urlBroker, {
        clientId,
        username: token, //提供token驗證
        password: '', //不提供
        clean: false, //設定持久Session(離線補收)
        reconnectPeriod: timeReconnect, //斷線後自動重連時間
    })

    //ev
    let ev = evem()

    //online
    let online = false

    //connect
    client.on('connect', () => {
        // console.log(`client in`, clientId)
        online = true
        ev.emit('connect')
    })

    //reconnect, 自動重連期間每次retry都會觸發一次
    client.on('reconnect', () => {
        // console.log(`client reconnect`, clientId)
        ev.emit('reconnect')
    })

    //subscribe
    let subscribe = async(topic, qos = 2) => {
        //qos:
        //0, 最多送一次
        //1, 至少送一次, 保證送到但可能重複送
        //2, 剛好送一次, 保證只送一次且不重複

        //pm
        let pm = genPm()

        //wait online
        await waitFun(() => {
            return online
        })

        //訂閱主題
        client.subscribe(topic, { qos }, (err, granted) => {
            // granted => [
            //   { topic: 'generate/report', qos: 2 },
            //   { topic: 'news/update', qos: 1 }
            // ]
            if (err) {
                pm.reject(err)
            }
            else {
                pm.resolve(granted)
            }
        })

        return pm
    }

    //unsubscribe
    let unsubscribe = async(topic) => {

        //pm
        let pm = genPm()

        //wait online
        await waitFun(() => {
            return online
        })

        //取消訂閱主題
        client.unsubscribe(topic, (err) => {
            if (err) {
                pm.reject(err)
            }
            else {
                pm.resolve()
            }
        })

        return pm
    }

    //publish
    let publish = async (topic, msg, qos = 2) => {
        //qos:
        //0, 最多送一次
        //1, 至少送一次, 保證送到但可能重複送
        //2, 剛好送一次, 保證只送一次且不重複

        //wait online
        await waitFun(() => {
            return online
        })

        //payload, 型別可支援: String, Buffer, Uint8Array, Number, Object(要JSON.stringify)
        let payload = JSON.stringify({ [keyMsg]: msg }) //封裝至msg可簡化使用型別, 僅支援Object, String, Number, Boolean
        // console.log('msg', msg)
        // console.log('payload', payload)

        //pm
        let pm = genPm()

        //發布主題
        client.publish(topic, payload, { qos }, (err) => {
            if (err) {
                pm.reject(err)
            }
            else {
                pm.resolve('done')
            }
        })

        return pm
    }

    //message
    client.on('message', (topic, message) => {
        // console.log(`client receive`, clientId, topic, message)
        let _message = ''
        try {
            let j = message.toString() //mqtt接收message時會變成Buffer, 得轉message.toString()
            // console.log('message j', j)
            let o = j2o(j)
            // console.log('message o', o)
            _message = get(o, keyMsg, '')
        }
        catch (err) {
            console.log(err)
        }
        ev.emit('message', { topic, message: _message })
    })

    // //simplifyError
    // let simplifyError = (err) => {
    //     if (err instanceof AggregateError && Array.isArray(err.errors)) {
    //         return err.errors.map(e => `[${e.address}:${e.port}] ${e.code}`).join(' | ')
    //     }
    //     if (err instanceof Error) {
    //         return err.message || String(err)
    //     }
    //     return String(err)
    // }

    //error
    client.on('error', (err) => {
        // console.log(`client error`, clientId, err.message)
        ev.emit('error', err)
        // ev.emit('error', simplifyError(err), err)
    })

    //offline, client判定已離線無法再與broker溝通時觸發
    client.on('offline', () => {
        // console.log(`client offline`, clientId)
        online = false
        ev.emit('offline')
    })

    //close
    client.on('close', () => {
        // console.log(`client close`, clientId)
        online = false
        ev.emit('close')
    })

    //end, 呼叫client.end()後觸發
    client.on('end', () => {
        // console.log(`client end`, clientId)
        online = false
        ev.emit('end')
    })

    //clear
    let clear = () => {
        return new Promise((resolve, reject) => {
            client.end(false, {}, (err) => {
                if (err) {
                    return reject(err)
                }
                resolve()
            })
        })
    }

    //save
    ev.subscribe = subscribe
    ev.unsubscribe = unsubscribe
    ev.publish = publish
    ev.clear = clear

    return ev
}


export default WPubsubClient