import get from 'lodash-es/get.js'
import each from 'lodash-es/each.js'
import isestr from 'wsemi/src/isestr.mjs'
import isearr from 'wsemi/src/isearr.mjs'
import ispm from 'wsemi/src/ispm.mjs'
import genID from 'wsemi/src/genID.mjs'
import genPm from 'wsemi/src/genPm.mjs'
import pmSeries from 'wsemi/src/pmSeries.mjs'
import WPubsubClient from './WPubsubClient.mjs'
/**
 * Build a request/response (RPC-style) communication layer on top of WPubsubClient.
 *
 * Every name in `funcs` is served on the topic `${serviceName}:${func}`. A request arriving on
 * one of those topics is emitted on the returned client as a `procFun` event carrying
 * `{ func, input, pm }`; the listener must call `pm.resolve(result)` or `pm.reject(reason)`,
 * and that outcome is published back on the same topic as the reply.
 * The returned client is also given a `reqServiceFunc(serviceName, func, input)` method which
 * publishes a request and returns a promise for the matching reply.
 *
 * The original header describes the underlying client as an MQTT client supporting persistent
 * connections, token authentication, automatic reconnection, subscribe and publish.
 *
 * @param {String} serviceName - Service name; may be empty when the instance only calls other services
 * @param {Array} funcs - Array of function names served by this instance; may be empty when the instance only calls other services
 * @param {Object} [opt={}] - Options, passed unchanged to WPubsubClient
 * @param {String} [opt.url='mqtt://localhost'] - MQTT broker URL
 * @param {Number} [opt.port=8080] - Broker port
 * @param {String} [opt.token=''] - Token used for authentication when connecting
 * @param {String} [opt.clientId] - Client ID; generated automatically when not given
 * @param {Number} [opt.timeReconnect=2000] - Delay before reconnecting after a disconnect (milliseconds)
 * @returns {Object} - The WPubsubClient instance (an event object documented as having `subscribe`, `unsubscribe`, `publish` and `clear`), extended with `reqServiceFunc`
 * @example
 *
 * let wpc = WPubsubClientComu('svc', ['add'], { url: 'mqtt://localhost', port: 8080 })
 *
 * //serve requests
 * wpc.on('procFun', ({ func, input, pm }) => {
 *     pm.resolve(input.a + input.b)
 * })
 *
 * //call a service function
 * let r = await wpc.reqServiceFunc('svc', 'add', { a: 1, b: 2 })
 *
 */
function WPubsubClientComu(serviceName, funcs, opt = {}) {

    //check serviceName, a placeholder is used so the instance can act as a caller only
    if (!isestr(serviceName)) {
        serviceName = '[empty]'
    }

    //check funcs, an empty list is allowed so the instance can act as a caller only
    if (!isearr(funcs)) {
        funcs = []
    }

    //topics served by this instance, and kpTopic mapping topic -> function name
    let topics = []
    let kpTopic = {}
    each(funcs, (func) => {
        let topic = `${serviceName}:${func}`
        topics.push(topic)
        kpTopic[topic] = func
    })

    //checkTopic, true when topic is served by this instance
    let checkTopic = (topic) => {
        return topics.indexOf(topic) >= 0
    }

    //kpPm, pending requests keyed by request id
    //a Map is used because ids are read from incoming messages and must never match Object.prototype keys
    let kpPm = new Map()

    //wpc
    let wpc = new WPubsubClient(opt)

    //readHeader, validate and extract the fields shared by request and reply messages
    let readHeader = (message) => {

        //id
        let id = get(message, 'id', '')
        if (!isestr(id)) {
            throw new Error(`invalid id`)
        }

        //func
        let func = get(message, 'func', '')
        if (!isestr(func)) {
            throw new Error(`invalid func`)
        }

        //mode
        let mode = get(message, 'mode', '')
        if (mode !== 'input' && mode !== 'output') {
            throw new Error(`invalid mode[${mode}]`)
        }

        return { id, func, mode }
    }

    //procSubscribe, subscribe to one topic or a list of topics, one after another
    let procSubscribe = async(tps) => {

        //wrap a single topic, Array.isArray is used so an empty list stays empty instead of becoming [[]]
        if (!Array.isArray(tps)) {
            tps = [tps]
        }

        //nothing to subscribe (caller-only instance)
        if (tps.length === 0) {
            return
        }

        await pmSeries(tps, async(topic) => {
            //a failed subscription is logged and does not stop the remaining ones
            await wpc.subscribe(topic, 2)
                .catch((err) => {
                    console.log(`subscribe topic[${topic}] catch`, err)
                })
        })

    }

    //subscribe to the topics served by this instance
    procSubscribe(topics)
        .catch((err) => {
            console.log(err) //subscribe errors are caught inside procSubscribe, not expected to trigger
        })

    //procInput, handle a request for a function served by this instance and publish the reply
    let procInput = async(topic, message) => {

        //ignore topics not served by this instance
        if (!checkTopic(topic)) {
            return
        }

        //header
        let { id, func, mode } = readHeader(message)
        if (mode !== 'input') {
            return
        }

        //run the handler: listeners of procFun settle pm to produce the result
        let output = null
        try {
            let pm = genPm()
            wpc.emit('procFun', {
                func: get(kpTopic, topic, ''),
                input: get(message, 'input', null),
                pm,
            })
            output = {
                state: 'success',
                msg: await pm,
            }
        }
        catch (err) {
            //NOTE(review): if err is an Error instance and the transport serialises with JSON, its message may be lost - confirm against WPubsubClient
            output = {
                state: 'error',
                msg: err,
            }
        }

        //stmg
        let stmg = {
            func,
            id,
            mode: 'output',
            output,
        }

        //publish the reply on the same topic
        await wpc.publish(topic, stmg, 2)
            .catch((err) => {
                console.log(`procInput publish topic[${topic}] for output catch`, err)
            })

    }

    //procOutput, settle the pending request matching a reply message
    let procOutput = async(topic, message) => {

        //header
        let { id, mode } = readHeader(message)
        if (mode !== 'output') {
            return
        }

        //pm, replies for requests not issued by this instance are ignored
        let pm = kpPm.get(id)
        if (!ispm(pm)) {
            return
        }

        //out, state, msg
        let out = get(message, 'output', null)
        let state = get(out, 'state', '')
        let msg = get(out, 'msg', null)

        //resolve or reject
        if (state === 'success') {
            pm.resolve(msg)
        }
        else if (state === 'error') {
            pm.reject(msg)
        }
        else {
            console.log('out', out)
            console.log('state', state)
            //reject the pending request as well, otherwise its caller would wait forever
            let err = new Error(`invalid state`)
            pm.reject(err)
            throw err
        }

    }

    //handle incoming messages
    wpc.on('message', ({ topic, message }) => {

        //deferred with setTimeout so the client is not blocked from handling other events
        setTimeout(() => {

            //procInput, malformed messages are reported here
            procInput(topic, message)
                .catch((err) => {
                    console.log(err)
                })

            //procOutput, malformed messages are reported here
            procOutput(topic, message)
                .catch((err) => {
                    console.log(err)
                })

        }, 1)

    })

    //reqServiceFunc, publish a request for serviceName:func and wait for its reply
    //resolves with the reply msg, rejects with the reply msg of an error reply or with the publish error
    let reqServiceFunc = async(serviceName, func, input) => {

        //id
        let id = genID()

        //stmg
        let stmg = {
            func,
            id,
            mode: 'input',
            input,
        }

        //topic
        let topic = `${serviceName}:${func}`

        //a topic not served by this instance must be subscribed first, otherwise the reply cannot be received
        if (!checkTopic(topic)) {
            await procSubscribe([topic])
                .catch((err) => {
                    console.log(err) //subscribe errors are caught inside procSubscribe, not expected to trigger
                })
        }

        //register the pending request before publishing, so a reply arriving before the publish acknowledgement is not dropped
        let pmm = genPm()
        kpPm.set(id, pmm)

        try {

            //a failed publish rejects the pending request
            wpc.publish(topic, stmg, 2)
                .catch((err) => {
                    pmm.reject(err)
                })

            //wait for the reply
            return await pmm

        }
        finally {
            //the pending entry is removed whether the request succeeded or failed
            kpPm.delete(id)
        }

    }

    //save
    wpc.reqServiceFunc = reqServiceFunc

    return wpc
}
export default WPubsubClientComu