import WebSocket, { WebSocketServer } from 'ws'
import get from 'lodash-es/get.js'
// import genPm from 'wsemi/src/genPm.mjs'
// import urlParse from 'wsemi/src/urlParse.mjs'
import evem from 'wsemi/src/evem.mjs'
import sendSplitData from './sendSplitData.mjs'
import mergeSplitData from './mergeSplitData.mjs'
/**
* 建立WebSocket伺服器
*
* @class
* @param {Object} [opt={}] 輸入設定物件,預設{}
* @param {Integer} [opt.port=8080] 輸入WebSocket伺服器所在port,預設8080
* @param {Integer} [opt.strSplitLength=1000000] 輸入傳輸封包長度整數,預設為1000000
* @returns {Object} 回傳通訊物件,可監聽事件open、error、clientChange、execute、broadcast、deliver,可使用函數broadcast
* @example
*
* import WConverwsServer from 'w-converws/dist/w-converws-server.umd.js'
*
* let opt = {
* port: 8080,
* }
*
* //new
* let wo = new WConverwsServer(opt)
*
* wo.on('open', function() {
* console.log(`Server[port:${opt.port}]: open`)
*
* //broadcast
* // let n = 0
* // setInterval(() => {
* // n += 1
* // let o = {
* // text: `server broadcast hi(${n})`,
* // data: new Uint8Array([66, 97, 115]), //support Uint8Array data
* // }
* // wo.broadcast(o, function (prog) {
* // console.log('broadcast prog', prog)
* // })
* // }, 1000)
*
* })
* wo.on('error', function(err) {
* console.log(`Server[port:${opt.port}]: error`, err)
* })
* wo.on('clientChange', function(clients) {
* console.log(`Server[port:${opt.port}]: now clients: ${clients.length}`)
* })
* wo.on('execute', function(func, input, callback) {
* console.log(`Server[port:${opt.port}]: execute`, func, input)
*
* if (func === 'add') {
* let r = input.p1 + input.p2
* callback(r)
* }
*
* })
* wo.on('broadcast', function(data) {
* console.log(`Server[port:${opt.port}]: broadcast`, data)
* })
* wo.on('deliver', function(data) {
* console.log(`Server[port:${opt.port}]: deliver`, data)
* })
*
*/
function WConverwsServer(opt = {}) {
//default
if (!opt.port) {
opt.port = 8080
}
if (!opt.strSplitLength) {
opt.strSplitLength = 1000000
}
//ee
let ee = evem()
//eeEmit
function eeEmit(name, ...args) {
setTimeout(() => {
ee.emit(name, ...args)
}, 1)
}
//serverSettings
let serverSettings = {
port: opt.port,
perMessageDeflate: {
zlibDeflateOptions: {
// See zlib defaults.
chunkSize: 1024,
memLevel: 7,
level: 3
},
zlibInflateOptions: {
chunkSize: 10 * 1024
},
// Other options settable:
clientNoContextTakeover: true, // Defaults to negotiated value.
serverNoContextTakeover: true, // Defaults to negotiated value.
serverMaxWindowBits: 10, // Defaults to negotiated value.
// Below options specified as default values.
concurrencyLimit: 10, // Limits zlib concurrency for perf.
threshold: 1024 // Size (in bytes) below which messages
// should not be compressed if context takeover is disabled.
},
}
//wss
let wss = null
try {
wss = new WebSocketServer(serverSettings)
}
catch (err) {
error({ msg: 'can not create websocket', err })
}
//check
if (!wss) {
return ee
}
/**
* WebSocket監聽開啟事件
*
* @memberof WConverwsServer
* @example
* wo.on('open', function() {
* ...
* })
*/
function onOpen() {} onOpen()
function open() {
eeEmit('open')
}
open()
/**
* WebSocket監聽錯誤事件
*
* @memberof WConverwsServer
* @param {*} err 傳入錯誤訊息
* @example
* wo.on('error', function(err) {
* ...
* })
*/
function onError() {} onError()
function error(err) {
eeEmit('error', err)
}
/**
* WebSocket監聽客戶端變更(上下線)事件
*
* @memberof WConverwsServer
* @example
* wo.on('clientChange', function(clients) {
* ...
* })
*/
function onClientChange() {} onClientChange()
function clientChange() {
eeEmit('clientChange', clients)
}
//connection
let clients = []
wss.on('connection', function(wsc, req) {
//console.log('connection', wsc)
//console.log('connection', req.connection.remoteAddress)
//client connected
clients.push(wsc)
clientChange()
//client close
wsc.on('close', function () {
//刪除ws
clients = clients.filter(function(wst) {
return wst !== wsc
})
clientChange()
})
function sendData(data, cbProgress) {
//console.log('sendData', data)
if (wsc.readyState === WebSocket.OPEN) {
//sendSplitData
sendSplitData(wsc, opt.strSplitLength, data, cbProgress, function (err) {
error({ msg: 'can not send message', err })
})
}
}
//bind for execute
wsc.sendData = sendData
function parserData(data) {
//console.log('parserData', data)
//_mode
let _mode = get(data, '_mode', '')
//cbResult for execute
function cbResult(output) {
//add output
data['output'] = output
//delete input, 因input可能很大故回傳數據不包含原input
delete data['input']
//sendData
sendData(data, null) //回傳執行結果就不處理進度回調
}
//emit
if (_mode === 'execute') {
//func
let func = get(data, 'func', '')
//input
let input = get(data, 'input')
//execute 執行
eeEmit('execute', func, input, cbResult, sendData)
}
else if (_mode === 'broadcast') {
//broadcast 廣播
eeEmit('broadcast', get(data, 'data'))
}
else if (_mode === 'deliver') {
//deliver 交付
eeEmit('deliver', get(data, 'data'))
}
else {
error({ msg: 'can not find _mode in data', err: data })
}
}
function message(message) {
//console.log('message', message)
//mergeSplitData
mergeSplitData(message, parserData)
}
wsc.on('message', message)
function triggerBroadcast(data, cbProgress) {
//msg
let msg = {
_mode: 'broadcast',
data,
}
//sendData
sendData(msg, cbProgress)
}
//triggerBroadcast, 需對全部客戶端廣播, 不能清除過去監聽事件
ee.on('triggerBroadcast', triggerBroadcast)
})
/**
* WebSocket監聽客戶端執行事件
*
* @memberof WConverwsServer
* @param {String} func 傳入執行函數名稱字串
* @param {*} input 傳入執行函數之輸入數據
* @param {Function} callback 傳入執行函數之回調函數
* @param {Function} sendData 傳入執行函數之強制回傳函數,提供傳送任意訊息給客戶端,供由服器多次回傳數據之用
* @example
* wo.on('execute', function(func, input, callback, sendData) {
* ...
* })
*/
function onExecute() {} onExecute()
/**
* WebSocket監聽客戶端廣播事件
*
* @memberof WConverwsServer
* @param {*} data 傳入廣播訊息
* @example
* wo.on('broadcast', function(data) {
* ...
* })
*/
function onBroadcast() {} onBroadcast()
/**
* WebSocket監聽客戶端交付事件
*
* @memberof WConverwsServer
* @param {*} data 傳入交付訊息
* @example
* wo.on('deliver', function(data) {
* ...
* })
*/
function onDeliver() {} onDeliver()
/**
* WebSocket通訊物件對客戶端廣播函數
*
* @memberof WConverwsServer
* @function broadcast
* @param {*} data 輸入廣播函數之輸入資訊
* @example
* let data = {...}
* wo.broadcast(data)
*/
ee.broadcast = function (data, cbProgress = function () {}) {
eeEmit('triggerBroadcast', data, cbProgress)
}
return ee
}
export default WConverwsServer