import Aedes from 'aedes'
import AedesServerFactory from 'aedes-server-factory'
import net from 'net'
import keys from 'lodash-es/keys.js'
import get from 'lodash-es/get.js'
import genPm from 'wsemi/src/genPm.mjs'
import haskey from 'wsemi/src/haskey.mjs'
import j2o from 'wsemi/src/j2o.mjs'
import isfun from 'wsemi/src/isfun.mjs'
import ispint from 'wsemi/src/ispint.mjs'
import cint from 'wsemi/src/cint.mjs'
import arrHas from 'wsemi/src/arrHas.mjs'
/**
* 建立MQTT伺服器
*
* @param {Object} opt 輸入設定參數物件
* @param {Integer} [opt.port=8080] 輸入MQTT提供給nodejs服務的port,預設8080
* @param {Integer} [opt.portWeb=opt.port+10] 輸入MQTT提供給web服務的port,預設為opt.port+10,也就是8090
* @param {Function} opt.authenticate 輸入使用者身份認證函數,供伺服器端驗證之用,函數會傳入使用者端連線之token參數,回傳為Promise,resolve(true)為驗證通過,resolve(false)為驗證不通過
* @param {Object} [opt.funcs={}] 輸入伺服器端供使用者端呼叫之函數物件,各key為函數名稱,對應value為函數本體。各函數之輸入需為單一物件,而各函數回傳皆為Promise,可通過resolve與reject回傳結果,預設{}
* @example
*
* import WComorMqttServer from 'w-comor-mqtt/dist/w-comor-mqtt-server.umd.js'
*
* function random(min, max) {
* return Math.floor(Math.random() * max) + min
* }
*
* let opt = {
* port: 8080, //for mqtt nodejs server
* portWeb: 8090, //for mqtt web server
* authenticate: function(token) {
* //使用token驗證使用者身份
* return new Promise(function(resolve, reject) {
* setTimeout(function() {
* resolve(true)
* }, 1000)
* })
* },
* filterFuncs: function(token, funcs) {
* //使用token驗證使用者身份與過濾可用funcs
* return new Promise(function(resolve, reject) {
* funcs = funcs.filter(function(v) {
* return v.indexOf('Hide') < 0
* })
* resolve(funcs)
* })
* },
* onClientChange: function(clients, opt) {
* console.log(`Server[port:${opt.port}][port for web:${opt.portWeb}] now clients: ${clients.length}`)
* },
* funcs: {
* 'group.plus': function({ p1, p2 }) {
* return new Promise(function(resolve, reject) {
* setTimeout(function() {
* resolve(p1 * p2)
* }, random(100, 3000))
* })
* },
* 'group.div': function({ p1, p2 }) {
* return new Promise(function(resolve, reject) {
* setTimeout(function() {
* resolve(p1 / p2)
* }, random(100, 3000))
* })
* },
* 'add': function({ p1, p2 }) {
* return new Promise(function(resolve, reject) {
* setTimeout(function() {
* resolve(p1 + p2)
* }, random(100, 3000))
* })
* },
* 'addHide': function({ p1, p2 }) {
* return new Promise(function(resolve, reject) {
* setTimeout(function() {
* resolve(p1 + p2)
* }, random(100, 3000))
* })
* },
* 'minu': function({ p1, p2 }) {
* return new Promise(function(resolve, reject) {
* setTimeout(function() {
* resolve(p1 - p2)
* }, random(100, 3000))
* })
* },
* },
* }
*
* new WComorMqttServer(opt)
*
*/
function WComorMqttServer(opt) {
//port
let port = get(opt, 'port')
if (!ispint(port)) {
port = 8080
}
port = cint(port)
//portWeb
let portWeb = get(opt, 'portWeb')
if (!ispint(portWeb)) {
portWeb = port + 10
}
portWeb = cint(portWeb)
//funcs
let funcs = []
if (haskey(opt, 'funcs')) {
funcs = keys(opt['funcs'])
}
//authenticate
function authenticate(token) {
let pm = genPm()
if (isfun(opt.authenticate)) {
opt.authenticate(token)
.then(function(vd) {
pm.resolve(vd)
})
}
else {
pm.resolve(true)
}
return pm
}
//aedes
let aedes = Aedes()
//client
let clients = []
aedes.on('client', function(client) {
// console.log('aedes client')
//push
clients.push(client)
if (isfun(opt.onClientChange)) {
opt.onClientChange(clients, opt)
}
})
//clientDisconnected
aedes.on('clientDisconnected', function(client) {
// console.log('aedes clientDisconnected')
//刪除client
clients = clients.filter(function(c) {
return c !== client
})
if (isfun(opt.onClientChange)) {
opt.onClientChange(clients, opt)
}
})
//subscribe
aedes.on('subscribe', function (subscriptions, client) {
// console.log('aedes subscribe', subscriptions)
})
//unsubscribe
aedes.on('unsubscribe', function (subscriptions, client) {
// console.log('aedes unsubscribe', subscriptions)
})
//publish, 處理全部message
aedes.on('publish', function(packet, client) {
// console.log('aedes publish')
//topic
let topic = packet.topic
//pl
let pl = packet.payload.toString('utf8')
//check
// console.log(`pl.indexOf('"output"') >= 0`, pl.indexOf('"output"') >= 0)
if (pl.indexOf('"output"') >= 0) {
return
}
//check
// console.log(`topic.indexOf('topic|') >= 0`, topic.indexOf('topic|') >= 0)
if (topic.indexOf('topic|') >= 0) {
//console.log('get client publish', pl)
//topicUQid
let topicUQid = topic
//data
let data = j2o(pl)
//execFunction
execFunction(topicUQid, data)
.catch((err) => {
console.log('execFunction catch', err)
})
}
})
//publish
function publish(topicUQid, data) {
// console.log('publish', topicUQid, data)
let packet = {
topic: topicUQid,
payload: JSON.stringify(data),
qos: 2, // 0, 1, or 2
retain: false //false不保留,true則將此訊息保留,除了發送給當前訂閱者之外,當有新訂閱者時,則將最後為1的訊息發給該新訂閱者
}
aedes.publish(packet, function(err) {
// console.log('aedes.publish err', topicUQid, err)
if (err) {
console.log('publish: error:', err)
}
})
}
//execFunction
async function execFunction(topicUQid, data) {
// console.log('execFunction', topicUQid, data)
//token
let token = get(data, 'token', '')
//vd
let vd = await authenticate(token)
// console.log('authenticate', vd, token)
//check
if (vd) {
//func
let func = get(data, 'func', '')
//input
let input = get(data, 'input')
//getFuncs
if (func === 'getFuncs') {
if (isfun(opt.filterFuncs)) {
funcs = await opt.filterFuncs(token, funcs)
// console.log('filterFuncs', funcs)
}
//add output
data['output'] = { sys: 'sys', funcs }
}
//call
else if (arrHas(funcs, func)) {
if (isfun(opt['funcs'][func])) {
//call func in opt.funcs
let output = await opt['funcs'][func](input)
//add output
data['output'] = output
}
else {
//add output
data['output'] = { err: `${func} is not a function` }
}
}
else {
//publish no func
//add output
data['output'] = { err: `can not find: ${func}` }
}
}
else {
//publish no authenticate
//add output
data['output'] = { err: `can not authenticate token: ${token}` }
}
//delete input, 因input可能很大故回傳數據不包含原input
delete data['input']
//publish data
publish(topicUQid, data)
}
//setup
function setup() {
//authenticate
aedes.authenticate = async function (client, username, password, callback) {
//console.log('authenticate', username, password.toString('utf8'))
//token, from username
let token = username
//vd
let vd = await authenticate(token)
//callback
callback(null, vd)
}
}
// server.on('ready', setup)
//serverNode
let serverNode = net.createServer(aedes.handle)
serverNode.listen(port, function () {
setup()
console.log(`Server running for node at: mqtt://localhost:${port}`)
})
//serverBrowser
let serverBrowser = AedesServerFactory.createServer(aedes, {
ws: true, //經由websocket提供給瀏覽器端通訊
})
serverBrowser.listen(portWeb, function () {
console.log(`Server running for browser at: mqtt://localhost:${portWeb}`)
})
}
export default WComorMqttServer