WPubsubClientComu.mjs

import get from 'lodash-es/get.js'
import each from 'lodash-es/each.js'
import isestr from 'wsemi/src/isestr.mjs'
import isearr from 'wsemi/src/isearr.mjs'
import ispm from 'wsemi/src/ispm.mjs'
import genID from 'wsemi/src/genID.mjs'
import genPm from 'wsemi/src/genPm.mjs'
import pmSeries from 'wsemi/src/pmSeries.mjs'
import WPubsubClient from './WPubsubClient.mjs'


/**
 * 建立一個 MQTT 客戶端,支援持久連線、Token 驗證、自動重連、訂閱與發佈功能
 *
 * @param {String} serviceName - 服務名稱
 * @param {Array} funcs - 函數名稱陣列
 * @param {Object} [opt={}] - 設定選項
 * @param {String} [opt.url='mqtt://localhost'] - MQTT broker 連線 URL
 * @param {Number} [opt.port=8080] - Broker 連線 port
 * @param {String} [opt.token=''] - 連線時用來驗證的 Token
 * @param {String} [opt.clientId] - 指定 Client ID,若未指定則自動產生
 * @param {Number} [opt.timeReconnect=2000] - 斷線後重新連線的間隔時間(毫秒)
 * @returns {Object} - 傳回一個具有 `subscribe`、`unsubscribe`、`publish`、`clear` 方法的事件物件
 * @example

 */
function WPubsubClientComu(serviceName, funcs, opt = {}) {

    //check serviceName
    if (!isestr(serviceName)) {
        serviceName = '[empty]' //允許不當service只當client調用
    }

    //check funcs
    if (!isearr(funcs)) {
        funcs = [] //允許不當service只當client調用
    }

    //topics, kpTopic
    let topics = []
    let kpTopic = {}
    each(funcs, (func) => {
        let topic = `${serviceName}:${func}`
        topics.push(topic)
        kpTopic[topic] = func
    })
    // console.log('topics', topics)
    // console.log('kpTopic', kpTopic)

    //checkTopic
    let checkTopic = (topic) => {
        let b = topics.indexOf(topic) >= 0
        return b
    }

    //kpPm
    let kpPm = {}

    //wpc
    let wpc = new WPubsubClient(opt)

    //procSubscribe
    let procSubscribe = async(topics) => {

        //check
        if (!isearr(topics)) {
            topics = [topics]
        }

        await pmSeries(topics, async(topic) => {
            // console.log(`subscribe topic[${topic}]...`)

            //subscribe
            await wpc.subscribe(topic, 2)
                // .then((res) => {
                //     console.log(`subscribe topic[${topic}] then`, res)
                // })
                .catch((err) => {
                    console.log(`subscribe topic[${topic}] catch`, err)
                })
            // console.log(`subscribe topic[${topic}] done`)

        })
    }

    //訂閱topics
    procSubscribe(topics)
        .catch((err) => {
            console.log(err) //已於內部攔截, 預期不會觸發
        })

    //procInput
    let procInput = async(topic, message) => {
        // console.log('procInput...', topic, message)

        //check
        if (topics.indexOf(topic) < 0) {
            // console.log('kpTopicFun', kpTopicFun)
            // console.log('procInput', `topic[${topic}] is not in kpTopicFun`)
            return
        }

        //id
        let id = get(message, 'id', '')
        if (!isestr(id)) {
            throw new Error(`invalid id`)
        }

        //func
        let func = get(message, 'func', '')
        if (!isestr(func)) {
            throw new Error(`invalid func`)
        }

        //mode
        let mode = get(message, 'mode', '')
        if (mode !== 'input' && mode !== 'output') {
            throw new Error(`invalid mode[${mode}]`)
        }
        if (mode !== 'input') {
            return
        }

        //core
        let core = async(topic, message) => {

            //func
            let func = get(kpTopic, topic, '')
            // console.log('func', func)

            //inp
            let inp = get(message, 'input', null)
            // console.log('inp', inp)

            //pm
            let pm = genPm()

            //emit
            wpc.emit('procFun', { func, input: inp, pm }) //不能通過pm通知處理結束, 得要再調整

            //r
            let r = await pm
            // console.log('r', r)

            return r
        }

        //stmg
        let stmg = null
        // console.log('procInput core...', topic, message)
        await core(topic, message)
            .then((res) => {
                stmg = {
                    func,
                    id,
                    mode: 'output',
                    output: {
                        state: 'success',
                        msg: res,
                    },
                }
            })
            .catch((err) => {
                stmg = {
                    func,
                    id,
                    mode: 'output',
                    output: {
                        state: 'error',
                        msg: err,
                    },
                }
            })
        // console.log('procInput core done', topic, message)

        //publish, 回應output
        // console.log('procInput publish...', topic, stmg)
        await wpc.publish(topic, stmg, 2)
            // .then((res) => {
            //     console.log(`procInput publish topic[${topic}] for output then`, res)
            // })
            .catch((err) => {
                console.log(`procInput publish topic[${topic}] for output catch`, err)
            })
        // console.log('procInput publish done', topic, stmg)

        // console.log('procInput done', topic, message)
    }

    //procOutput
    let procOutput = async(topic, message) => {

        //id
        let id = get(message, 'id', '')
        if (!isestr(id)) {
            throw new Error(`invalid id`)
        }

        //func
        let func = get(message, 'func', '')
        if (!isestr(func)) {
            throw new Error(`invalid func`)
        }

        //mode
        let mode = get(message, 'mode', '')
        if (mode !== 'input' && mode !== 'output') {
            throw new Error(`invalid mode[${mode}]`)
        }
        if (mode !== 'output') {
            return
        }
        // console.log('procOutput...', topic, message)

        //pm
        let pm = get(kpPm, id, null)
        if (!ispm(pm)) {
            return
        }

        //out
        let out = get(message, 'output', null)
        // console.log('out', out)

        //state
        let state = get(out, 'state', '')
        // console.log('state', state)

        //msg
        let msg = get(out, 'msg', null)
        // console.log('msg', msg)

        //resolve or reject
        if (state === 'success') {
            // console.log('pm.resolve', msg)
            pm.resolve(msg)
        }
        else if (state === 'error') {
            // console.log('pm.reject', msg)
            pm.reject(msg)
        }
        else {
            console.log('out', out)
            console.log('state', state)
            throw new Error(`invalid state`)
        }

        // console.log('procOutput done', topic, message)
    }

    //處理message
    wpc.on('message', ({ topic, message }) => {
        // console.log(`message`, topic, message)

        //setTimeout脫勾避免卡住mqtt處理其他事件
        setTimeout(() => {

            //procInput
            procInput(topic, message)
                .catch((err) => {
                    console.log(err) //已於內部攔截, 預期不會觸發
                })

            //procOutput
            procOutput(topic, message)
                .catch((err) => {
                    console.log(err) //已於內部攔截, 預期不會觸發
                })

        }, 1)

    })

    //reqServiceFunc
    let reqServiceFunc = async(serviceName, func, input) => {
        // console.log('reqServiceFunc...', serviceName, func, input)

        //id
        let id = genID()

        //stmg
        let stmg = {
            func,
            id,
            mode: 'input',
            input,
        }

        //topic
        let topic = `${serviceName}:${func}`

        //procSubscribe, 若請求分析service不是自己提供, 就需要先訂閱才有辦法接收message事件
        if (!checkTopic(topic)) {
            // console.log('reqServiceFunc procSubscribe...', topic)
            await procSubscribe([topic])
                .catch((err) => {
                    console.log(err) //已於內部攔截, 預期不會觸發
                })
            // console.log('reqServiceFunc procSubscribe done', topic)
        }

        //publish
        // console.log('reqServiceFunc publish...', topic, stmg)
        let pmm = genPm()
        await wpc.publish(topic, stmg, 2)
            .then((res) => {
                // console.log(`reqServiceFunc publish topic[${topic}] then`, res)

                //save pmm and wait return
                kpPm[id] = pmm
                // console.log('save pmm', id, kpPm)

            })
            .catch((err) => {
                // console.log(`reqServiceFunc publish topic[${topic}] catch`, err)

                //reject
                pmm.reject(err)

            })
        // console.log('reqServiceFunc publish done', topic, stmg)

        //等待請求返回結果
        // console.log('reqServiceFunc wait pmm...', topic, stmg)
        let pm = genPm()
        await pmm
            .then((res) => {
                // console.log('reqServiceFunc transfer pm then', res)
                pm.resolve(res)
            })
            .catch((err) => {
                // console.log('reqServiceFunc transfer pm catch', err)
                pm.reject(err)
            })
            .finally(() => {
                //脫勾刪除, pmm結束(不論成功或失敗)皆須從kpPm內刪除
                setTimeout(() => {

                    //delete pmm
                    delete kpPm[id]

                }, 1)
            })
        // console.log('reqServiceFunc wait pmm done', topic, stmg)

        // console.log('reqServiceFunc done', serviceName, func)
        return pm
    }

    //save
    wpc.reqServiceFunc = reqServiceFunc

    return wpc
}


export default WPubsubClientComu