WConverhpServer.mjs

// import fs from 'fs'
import Hapi from '@hapi/hapi'
import Inert from '@hapi/inert' //提供靜態檔案
import events from 'events'
import stream from 'stream'
import get from 'lodash-es/get.js'
import map from 'lodash-es/map.js'
import each from 'lodash-es/each.js'
import cloneDeep from 'lodash-es/cloneDeep.js'
import genPm from 'wsemi/src/genPm.mjs'
import alive from 'wsemi/src/alive.mjs'
import isstr from 'wsemi/src/isstr.mjs'
import iseobj from 'wsemi/src/iseobj.mjs'
import obj2u8arr from 'wsemi/src/obj2u8arr.mjs'
import u8arr2obj from 'wsemi/src/u8arr2obj.mjs'
import iser from 'wsemi/src/iser.mjs'


/**
 * 建立Hapi伺服器
 *
 * @class
 * @param {Object} [opt={}] 輸入設定物件,預設{}
 * @param {Integer} [opt.port=8080] 輸入Hapi伺服器所在port,預設8080
 * @param {String} [opt.apiName='api'] 輸入http API伺服器網址的api名稱,預設'api'
 * @returns {Object} 回傳通訊物件,可監聽事件open、error、clientChange、execute、broadcast、deliver,可使用函數broadcast
 * @example
 *
 * import WConverhpServer from 'w-converhp/dist/w-converhp-server.umd.js'
 *
 * let opt = {
 *     port: 8080,
 *     apiName: 'api',
 * }
 *
 * //new
 * let wo = new WConverhpServer(opt)
 *
 * wo.on('open', function() {
 *     console.log(`Server[port:${opt.port}]: open`)
 *
 *     //broadcast
 *     let n = 0
 *     setInterval(() => {
 *         n += 1
 *         let o = {
 *             text: `server broadcast hi(${n})`,
 *             data: new Uint8Array([66, 97, 115]), //support Uint8Array data
 *         }
 *         wo.broadcast(o, function (prog) {
 *             console.log('broadcast prog', prog)
 *         })
 *     }, 1000)
 *
 * })
 * wo.on('error', function(err) {
 *     console.log(`Server[port:${opt.port}]: error`, err)
 * })
 * wo.on('clientChange', function(num) {
 *     console.log(`Server[port:${opt.port}]: now clients: ${num}`)
 * })
 * wo.on('clientEnter', function(clientId, data) {
 *     console.log(`Server[port:${opt.port}]: client enter: ${clientId}`)
 *
 *     //deliver
 *     wo.deliver(clientId, `server deliver hi(${clientId})`)
 *
 * })
 * wo.on('clientLeave', function(clientId, data) {
 *     console.log(`Server[port:${opt.port}]: client leave: ${clientId}`)
 * })
 * wo.on('execute', function(func, input, pm) {
 *     //console.log(`Server[port:${opt.port}]: execute`, func, input)
 *     console.log(`Server[port:${opt.port}]: execute`, func)
 *
 *     try {
 *
 *         if (func === 'add') {
 *
 *             //save
 *             if (_.get(input, 'p.d.u8a', null)) {
 *                 // fs.writeFileSync(input.p.d.name, Buffer.from(input.p.d.u8a))
 *                 // console.log('writeFileSync input.p.d.name', input.p.d.name)
 *             }
 *
 *             let r = {
 *                 ab: input.p.a + input.p.b,
 *                 v: [11, 22.22, 'abc', { x: '21', y: 65.43, z: 'test中文' }],
 *                 file: {
 *                     name: 'zdata.b2',
 *                     u8a: new Uint8Array([66, 97, 115]),
 *                     //u8a: new Uint8Array(fs.readFileSync('C:\\Users\\Administrator\\Desktop\\z500mb.7z')),
 *                 },
 *             }
 *             pm.resolve(r)
 *         }
 *         else {
 *             console.log('invalid func')
 *             pm.reject('invalid func')
 *         }
 *
 *     }
 *     catch (err) {
 *         console.log('execute error', err)
 *         pm.reject('execute error')
 *     }
 *
 * })
 * wo.on('broadcast', function(data) {
 *     console.log(`Server[port:${opt.port}]: broadcast`, data)
 * })
 * wo.on('deliver', function(data) {
 *     console.log(`Server[port:${opt.port}]: deliver`, data)
 * })
 * wo.on('handler', function(data) {
 *     // console.log(`Server[port:${opt.port}]: handler`, data)
 * })
 *
 *
 */
function WConverhpServer(opt = {}) {
    let broadcastMessages = {}


    //default
    if (!opt.port) {
        opt.port = 8080
    }
    if (!opt.apiName) {
        opt.apiName = 'api'
    }


    //ee
    let ee = new events.EventEmitter()


    //ea
    let ea = alive()


    //ea broadcastMessages
    setInterval(() => {

        //now alive
        let nowAlive = ea.get()
        //console.log('nowAlive', nowAlive)

        //clientIds
        let clientIds = map(nowAlive, 'key')
        //console.log('clientIds', clientIds)

        //pick
        let t = {}
        each(clientIds, (clientId) => {
            t[clientId] = get(broadcastMessages, clientId, [])
        })
        broadcastMessages = t
        //console.log('broadcastMessages', broadcastMessages)

    }, 1000)


    //eeEmit
    function eeEmit(name, ...args) {
        setTimeout(() => {
            ee.emit(name, ...args)
        }, 1)
    }


    //server
    let server
    if (opt.serverHapi) {

        //use serverHapi
        server = opt.serverHapi

    }
    else {

        //create server
        server = Hapi.server({
            port: opt.port,
            //host: 'localhost',
            routes: {
                // cors: true,
                cors: {
                    origin: ['*'], //Access-Control-Allow-Origin
                    //credentials: true //Access-Control-Allow-Credentials
                }
            },
        })

    }


    /**
     * Hapi監聽開啟事件
     *
     * @memberof WConverhpServer
     * @example
     * wo.on('open', function() {
     *     ...
     * })
     */
    function onOpen() {} onOpen()
    function open() {
        eeEmit('open')
    }
    open()


    /**
     * Hapi監聽錯誤事件
     *
     * @memberof WConverhpServer
     * @param {*} err 傳入錯誤訊息
     * @example
     * wo.on('error', function(err) {
     *     ...
     * })
     */
    function onError() {} onError()
    function error(msg, err) {
        eeEmit('error', { msg, err })
    }


    /**
     * Hapi監聽客戶端上線事件
     *
     * @memberof WConverhpServer
     * @param {String} clientId 識別用使用者主鍵
     * @param {*} data 使用者request訊息
     * @example
     * wo.on('clientEnter', function(clientId, data) {
     *     ...
     * })
     */
    function onClientEnter() {} onClientEnter()
    /**
     * Hapi監聽客戶端下線事件
     *
     * @memberof WConverhpServer
     * @example
     * wo.on('clientLeave', function(clientId, data) {
     *     ...
     * })
     */
    function onClientLeave() {} onClientLeave()
    /**
     * Hapi監聽客戶端變更(上下線)事件
     *
     * @memberof WConverhpServer
     * @example
     * wo.on('clientChange', function(num) {
     *     ...
     * })
     */
    function onClientChange() {} onClientChange()
    ea.on('message', function({ eventName, key, data, now }) {
        //console.log({ eventName, key, data, now })
        if (eventName === 'enter') {
            eeEmit('clientEnter', key, data)
        }
        else if (eventName === 'leave') {
            eeEmit('clientLeave', key, data)
        }
        eeEmit('clientChange', now)
    })


    //dealData
    async function dealData(data) {
        //console.log('dealData', data)

        //pm, pmm
        let pm = genPm()
        let pmm = genPm()

        //_mode
        let _mode = get(data, '_mode', '')

        //重新處理回傳結果
        pmm
            .then((output) => {

                //add output
                data['output'] = output

                //delete input, 因input可能很大故回傳數據不包含原input
                delete data['input']

                //resolve
                pm.resolve(data)

            })
            .catch((err) => {
                pm.reject(err)
            })

        //emit
        if (_mode === 'execute') {

            //func
            let func = get(data, 'func', '')

            //input
            let input = get(data, 'input', null)

            //execute 執行
            eeEmit('execute', func, input, pmm)

        }
        else if (_mode === 'broadcast') {

            //input
            let input = get(data, 'input', null)

            //broadcast 廣播
            eeEmit('broadcast', input)

            //resolve
            pmm.resolve('ok')

        }
        else if (_mode === 'deliver') {

            //input
            let input = get(data, 'input', null)

            //deliver 發送
            eeEmit('deliver', input)

            //resolve
            pmm.resolve('ok')

        }
        else if (_mode === 'polling') {

            //clientId
            let clientId = get(data, 'clientId', '')

            //polling messages
            let pms = get(broadcastMessages, clientId, [])
            pms = cloneDeep(pms)

            //clear
            broadcastMessages[clientId] = []

            //resolve
            pmm.resolve(pms)

        }
        else {
            let msg = 'can not find _mode in data'
            error(msg, data)
            pm.reject(msg)
            pmm.reject()
        }

        return pm
    }


    //apiMain
    let apiMain = {
        path: '/' + opt.apiName,
        method: 'POST',
        // config: {
        //     payload: {
        //         output: 'stream',
        //         maxBytes: 1024 * 1024 * 1024, //1g
        //     },
        // },
        options: {
            payload: {
                maxBytes: 1024 * 1024 * 1024, //1g
                timeout: 3 * 60 * 1000, //3分鐘, 注意payload timeout必須小於socket timeout
                multipart: true, //hapi 19之後修改multipart預設值為false
            },
            timeout: {
                socket: 5 * 60 * 1000, //5分鐘
            },
        },
        handler: async function (req, res) {
            //console.log(req, res)
            //console.log('payload', req.payload)

            //發送原始接收訊息
            let headers = get(req, 'headers')
            headers = iseobj(headers) ? headers : ''
            let query = get(req, 'query')
            query = iseobj(query) ? query : ''
            eeEmit('handler', {
                headers,
                query,
            })

            // if (Math.random() < 0.5) {
            //     console.log('return code 500: Internal Server Error')
            //     return res.response('Internal Server Error').code(500)
            // }

            //bbInp
            let bbInp = get(req, 'payload.bb', null)
            // console.log('bbInp', bbInp)

            //check
            //console.log('isstr(bbInp)', isstr(bbInp))
            if (isstr(bbInp)) {
                bbInp = Buffer.from(bbInp, 'utf8') //收nodejs client的buffer會自動解析變成字串
            }

            //u8aInp
            let u8aInp = new Uint8Array(bbInp)
            //console.log('u8aInp', u8aInp)

            //u8arr2obj
            let inp = u8arr2obj(u8aInp)
            //console.log('inp', inp)

            //clientId
            let clientId = get(inp, 'clientId')

            //client
            let client = {
                headers: req.headers,
                info: req.info,
            }

            //trigger
            ea.trigger(clientId, client)

            //dealData
            let out = {}
            await dealData(inp)
                .then(function(msg) {
                    out.success = msg
                })
                .catch(function(msg) {
                    out.error = msg
                })

            //u8aOut
            let u8aOut = obj2u8arr(out)

            //stream
            let sm = new stream.Readable()
            sm._read = () => {}
            sm.push(u8aOut)
            sm.push(null)
            // if (out.success._mode !== 'polling') {
            //     console.log('out', out)
            //     console.log('u8aOut', u8aOut)
            // }

            return res.response(sm)
                .header('Cache-Control', 'no-cache, no-store, must-revalidate')
                .header('Content-Type', 'application/octet-stream')
                .header('Content-Length', sm.readableLength)
        },
    }


    /**
     * Hapi監聽客戶端執行事件
     *
     * @memberof WConverhpServer
     * @param {String} func 傳入執行函數名稱字串
     * @param {*} input 傳入執行函數之輸入數據
     * @param {Function} callback 傳入執行函數之回調函數
     * @param {Function} sendData 傳入執行函數之強制回傳函數,提供傳送任意訊息給客戶端,供由服器多次回傳數據之用
     * @example
     * wo.on('execute', function(func, input, callback, sendData) {
     *     ...
     * })
     */
    function onExecute() {} onExecute()


    /**
     *  Hapi監聽客戶端廣播事件
     *
     * @memberof WConverhpServer
     * @param {*} data 傳入廣播訊息
     * @example
     * wo.on('broadcast', function(data) {
     *     ...
     * })
     */
    function onBroadcast() {} onBroadcast()


    /**
     * Hapi監聽客戶端發送事件
     *
     * @memberof WConverhpServer
     * @param {*} data 傳入發送訊息
     * @example
     * wo.on('deliver', function(data) {
     *     ...
     * })
     */
    function onDeliver() {} onDeliver()


    /**
     * Hapi通訊物件對全客戶端廣播函數
     *
     * @memberof WConverhpServer
     * @function broadcast
     * @param {*} data 輸入廣播函數之輸入資訊
     * @param {Function} cb 輸入進度函數
     * @example
     * let data = {...}
     * wo.broadcast(data, cb)
     */
    ee.broadcast = function (data, cbProgress = function () {}) {

        //check, broadcastMessages受ea偵測頻率1s影響, 伺服器初始化後至少需1s才會有有效對象
        if (iser(broadcastMessages)) {
            //console.log('no client for broadcast')
            return
        }

        //modify broadcast data
        let t = {}
        each(broadcastMessages, (v, k) => {

            //push, 數據為陣列, 加入新廣播數據
            v.push({
                mode: 'broadcast',
                data,
            })

            //save
            t[k] = v

        })
        broadcastMessages = t

        //cbProgress, 無法馬上傳需等待客戶端輪詢接收, 故進度回調只能先回傳100%
        cbProgress(100)

    }


    /**
     * Hapi通訊物件對客戶端發送訊息函數
     *
     * @memberof WConverhpServer
     * @function deliver
     * @param {String} clientId 輸入識別用使用者主鍵字串
     * @param {*} data 輸入發送函數之輸入資訊
     * @param {Function} cb 輸入進度函數
     * @example
     * let clientId = '...'
     * let data = {...}
     * wo.deliver(clientId, data, cb)
     */
    ee.deliver = function (clientId, data, cbProgress = function () {}) {

        //bms, 此時有可能ea trigger為非同步, 尚未把
        let bms = get(broadcastMessages, clientId, [])

        //push
        bms.push({
            mode: 'deliver',
            data,
        })

        //modify broadcast data
        broadcastMessages[clientId] = bms

        //cbProgress, 無法馬上傳需等待客戶端輪詢接收, 故進度回調只能先回傳100%
        cbProgress(100)

    }


    async function startServer() {

        //register inert
        await server.register(Inert)

        //api
        let api = {
            method: 'GET',
            path: '/{file*}',
            handler: {
                directory: {
                    path: './'
                }
            },
        }

        //route
        server.route([api, apiMain])

        //start
        await server.start()

        console.log(`Server running at: ${server.info.uri}`)

    }

    if (opt.serverHapi) {
        opt.serverHapi.route(apiMain)
    }
    else {
        startServer()
    }

    return ee
}


export default WConverhpServer