import cloneDeep from 'lodash-es/cloneDeep.js'
import get from 'lodash-es/get.js'
import map from 'lodash-es/map.js'
import genID from 'wsemi/src/genID.mjs'
import WConverwsServer from 'w-converws/src/WConverwsServer.mjs' //rollup編譯時得剔除ws與events
import WOrm from 'w-orm-mongodb/src/WOrmMongodb.mjs' //rollup編譯時得剔除mongodb與stream
/**
* 建立WebSocket伺服器
*
* Inherit: {@link https://yuda-lyu.github.io/w-converws/WConverwsServer.html WConverwsServer}
*
* @see {@link https://yuda-lyu.github.io/w-converws/WConverwsServer.html WConverwsServer}
*
* @class
* @param {Object} [opt={}] 輸入設定物件,預設{}
* @param {String} [opt.url='mongodb://127.0.0.1:27017'] 輸入連接資料庫字串,預設'mongodb://127.0.0.1:27017'
* @param {String} [opt.db='wqws'] 輸入使用資料庫名稱字串,預設'wqws'
* @param {String} [opt.cl='queues'] 輸入使用資料表名稱字串,預設'queues'
* @param {Integer} [opt.port=8080] 輸入WebSocket伺服器所在port,預設8080
* @param {Function} opt.authenticate 輸入使用者身份認證函數,供伺服器端驗證之用,函數會傳入使用者端連線之token參數,回傳為Promise,resolve(true)為驗證通過,resolve(false)為驗證不通過
* @returns {Object} 回傳通訊物件,可監聽事件open、error、clientChange、execute、broadcast、deliver,可使用函數broadcast
* @example
*
* import WRunqwsServer from 'w-runqws/dist/w-runqws-server.umd.js'
*
* let opt = {
* mongoUrl: 'mongodb://username:password@127.0.0.1:27017',
* mongoDb: 'wqws',
* mongoCl: 'queues',
* port: 8080,
* authenticate: async function(token) {
* //使用token驗證使用者身份
* return new Promise(function(resolve, reject) {
* setTimeout(function() {
* resolve(true)
* }, 1000)
* })
* },
* }
*
* //new
* let wo = new WRunqwsServer(opt)
*
* wo.on('open', function() {
* console.log(`Server running at: ws://localhost:${opt.port}`)
*
* // //broadcast
* // let n = 0
* // setInterval(() => {
* // n += 1
* // wo.broadcast(`server: broadcast: hi(${n})`)
* // }, 1000)
*
* })
* wo.on('error', function(err) {
* console.log(`Server[port:${opt.port}]: error`, err)
* })
* wo.on('clientChange', function(clients) {
* console.log(`Server[port:${opt.port}]: now clients: ${clients.length}`)
* })
* wo.on('execute', function(func, input, cb) {
* console.log(`Server[port:${opt.port}]: execute`, func, input)
*
* // if (func === 'add') {
* // let r = input.p1 + input.p2
* // cb(r)
* // }
*
* })
* wo.on('broadcast', function(data) {
* console.log(`Server[port:${opt.port}]: broadcast`, data)
* })
* wo.on('deliver', function(data) {
* console.log(`Server[port:${opt.port}]: deliver`, data)
* })
*
*/
function WRunqwsServer(opt = {}) {
//cloneDeep
opt = cloneDeep(opt)
//default
if (!opt.mongoUrl) {
opt.mongoUrl = 'mongodb://127.0.0.1:27017'
}
if (!opt.mongoDb) {
opt.mongoDb = 'wqws'
}
if (!opt.mongoCl) {
opt.mongoCl = 'queues'
}
if (!opt.port) {
opt.port = 8080
}
//worm
let worm = WOrm({
url: opt.mongoUrl,
db: opt.mongoDb,
cl: opt.mongoCl,
})
//new
let wcs = new WConverwsServer(opt)
//execute
wcs.on('execute', async function(func, _intput, callback, sendData) {
//console.log('execute', func, _intput)
//topic
let topic = get(_intput, 'topic', null)
//deliverQueue
function deliverQueue(queue) {
//console.log('deliverQueue', queue)
//data
let data = {
_mode: 'deliver',
data: queue,
}
//sendData
sendData(data)
}
//_output
let _output
//func
if (func === 'pushQueue') {
//id
let id = genID()
//input
let input = get(_intput, 'input', null)
//option
let option = get(_intput, 'option')
//r
let r = {
topic,
id,
input,
output: null,
transData: get(option, 'transData', null),
timeStart: get(option, 'timeStart', null),
timeExp: get(option, 'timeExp', null),
timeTakeStart: get(option, 'timeTakeStart', null),
timeTakeExp: get(option, 'timeTakeExp', null),
state: 'ready',
}
//insert queue
await worm.insert(r)
.then(function(msg) {
//console.log('pushQueue then', msg)
//emit
wcs.emit(topic, {
topic,
id,
input: r.input,
output: r.output,
state: r.state,
})
//_output
_output = 'success'
})
.catch(function(msg) {
//console.log('pushQueue catch', msg)
//_output
_output = { err: msg }
})
//callback
callback(_output)
}
else if (func === 'modifyQueue') {
//id
let id = get(_intput, 'id', null)
//check
if (id !== null) {
//input
let input = get(_intput, 'input', null)
//output
let output = get(_intput, 'output', null)
//state
let state = get(_intput, 'state', null)
//r
let r = {
id,
output,
state,
}
//save queue
await worm.save(r, { atomic: true })
.then(function(msg) {
//console.log('modifyQueue then', msg)
//emit
wcs.emit(topic, {
topic,
id,
input,
output,
state,
})
//_output
_output = 'success'
})
.catch(function(msg) {
//console.log('modifyQueue catch', msg)
//_output
_output = { err: msg }
})
}
else {
//_output
_output = { err: `can not find id` }
}
//callback
callback(_output)
}
else if (func === 'subTopic') {
//bind
wcs.deliverQueue = deliverQueue
//on
wcs.on(topic, wcs.deliverQueue)
//_output
_output = 'success'
//callback
callback(_output)
}
else if (func === 'unsubTopic') {
//off
wcs.off(topic, wcs.deliverQueue)
//unbind
wcs.deliverQueue = null
//_output
_output = 'success'
//callback
callback(_output)
}
else if (func === 'delQueueByTopic') {
//del queue by topic
await worm.delAll({ topic })
.then(function(msg) {
//console.log('delQueueByTopic then', msg)
//_output
_output = 'success'
})
.catch(function(msg) {
//console.log('delQueueByTopic catch', msg)
//_output
_output = { err: msg }
})
//callback
callback(_output)
}
else if (func === 'delQueueByID') {
//id
let id = get(_intput, 'id', null)
//check
if (id !== null) {
//del queue by id
await worm.delAll({ id })
.then(function(msg) {
//console.log('delQueueByID then', msg)
//_output
_output = 'success'
})
.catch(function(msg) {
//console.log('delQueueByID catch', msg)
//_output
_output = { err: msg }
})
}
else {
//_output
_output = { err: `can not find id` }
}
//callback
callback(_output)
}
else if (func === 'delQueueByIDs') {
//ids
let ids = get(_intput, 'ids', null)
//pms
let pms = map(ids, function(id) {
//del queue by id
return worm.delAll({ id })
})
//map
await Promise.all(pms)
.then(function(msg) {
//console.log('delQueueByIDs then', msg)
//_output
_output = 'success'
})
.catch(function(msg) {
//console.log('delQueueByIDs catch', msg)
//_output
_output = { err: msg }
})
//callback
callback(_output)
}
else if (func === 'delQueueByMatches') {
//find
let find = get(_intput, 'find', null)
//del queue by find
await worm.delAll(find)
.then(function(msg) {
//console.log('delQueueByMatches then', msg)
//_output
_output = 'success'
})
.catch(function(msg) {
//console.log('delQueueByMatches catch', msg)
//_output
_output = { err: msg }
})
//callback
callback(_output)
}
else if (func === 'getQueueByTopic') {
//get queue by topic
await worm.select({ topic })
.then(function(msg) {
//console.log('getQueueByTopic then', msg)
//_output
_output = msg
})
.catch(function(msg) {
//console.log('getQueueByTopic catch', msg)
//_output
_output = { err: msg }
})
//callback
callback(_output)
}
else if (func === 'getQueueByID') {
//id
let id = get(_intput, 'id', null)
//check
if (id !== null) {
//get queue by id
await worm.select({ id })
.then(function(msg) {
//console.log('getQueueByID then', msg)
//_output
_output = msg
})
.catch(function(msg) {
//console.log('getQueueByID catch', msg)
//_output
_output = { err: msg }
})
}
else {
//_output
_output = { err: `can not find id` }
}
//callback
callback(_output)
}
else if (func === 'getQueueByIDs') {
//ids
let ids = get(_intput, 'ids', null)
//pms
let pms = map(ids, function(id) {
//get queue by id
return worm.select({ id })
})
//map
await Promise.all(pms)
.then(function(msg) {
//console.log('getQueueByIDs then', msg)
//_output
_output = msg
})
.catch(function(msg) {
//console.log('getQueueByIDs catch', msg)
//_output
_output = { err: msg }
})
//callback
callback(_output)
}
else if (func === 'getQueueByMatches') {
//find
let find = get(_intput, 'find', null)
//get queue by find
await worm.select(find)
.then(function(msg) {
//console.log('getQueueByMatches then', msg)
//_output
_output = msg
})
.catch(function(msg) {
//console.log('getQueueByMatches catch', msg)
//_output
_output = { err: msg }
})
//callback
callback(_output)
}
else {
//其他執行函數
}
})
return wcs
}
export default WRunqwsServer