WPubsubServer.mjs

import Aedes from 'aedes'
import aedesPersistenceLevel from 'aedes-persistence-level'
import net from 'net'
import { Level } from 'level'
import get from 'lodash-es/get.js'
import size from 'lodash-es/size.js'
import isestr from 'wsemi/src/isestr.mjs'
import iseobj from 'wsemi/src/iseobj.mjs'
import ispint from 'wsemi/src/ispint.mjs'
import isearr from 'wsemi/src/isearr.mjs'
import cstr from 'wsemi/src/cstr.mjs'
import cint from 'wsemi/src/cint.mjs'
import j2o from 'wsemi/src/j2o.mjs'
import evem from 'wsemi/src/evem.mjs'


/**
 * 建立一個支援 Token 驗證與 LevelDB 持久化儲存的 MQTT Server
 *
 * @param {Object} [opt={}] - 設定參數物件
 * @param {Number} [opt.port=8080] - 要監聽的 TCP 連接埠 (預設 8080)
 * @param {String} [opt.storage='./_db'] - 持久化資料儲存目錄 (LevelDB 路徑)
 * @param {Array} [opt.tokens=[]] - 可接受的連線 token 列表,若為空則允許所有連線
 * @returns {Object} - Aedes broker 實例物件
 * @example
 *
 * import w from 'wsemi'
 * import WPubsubServer from './src/WPubsubServer.mjs'
 *
 * let test = () => {
 *     let pm = w.genPm()
 *
 *     let ms = []
 *
 *     let opt = {
 *         port: 8080,
 *         storage: './_db',
 *         tokens: ['token-for-test'],
 *     }
 *     let wps = new WPubsubServer(opt)
 *     wps.on('server-listen', (msg) => {
 *         console.log('server-listen', msg)
 *         ms.push({ 'server-listen': msg })
 *     })
 *     wps.on('client-in', (clientId) => {
 *         console.log('client-in', clientId)
 *         ms.push({ 'client-in': clientId })
 *     })
 *     wps.on('client-out', (clientId) => {
 *         console.log('client-out', clientId)
 *         ms.push({ 'client-out': clientId })
 *     })
 *     wps.on('subscribe', (clientId, subscriptions) => {
 *         console.log('subscribe', clientId, subscriptions)
 *         ms.push({ 'subscribe': clientId, 'subscriptions': JSON.stringify(subscriptions) })
 *     })
 *     wps.on('publish', (clientId, topic, payload, qos) => {
 *         console.log('publish', clientId, topic, payload, qos)
 *         ms.push({ 'publish': clientId, topic, 'payload': payload, qos })
 *     })
 *     wps.on('server-error', (err) => {
 *         console.log('server-error', err)
 *     })
 *     wps.on('broker-error', (err) => {
 *         console.log('broker-error', err)
 *     })
 *     wps.on('client-error', (clientId, err) => {
 *         console.log('client-error', clientId, err)
 *     })
 *
 *     setTimeout(async () => {
 *         await wps.clear()
 *         console.log('ms', ms)
 *         pm.resolve(ms)
 *     }, 6000)
 *
 *     return pm
 * }
 * await test()
 *     .catch((err) => {
 *         console.log(err)
 *     })
 * // => ms [
 * //   { 'server-listen': { port: 8080 } },
 * //   { 'client-in': 'id-for-client' },
 * //   {
 * //     subscribe: 'id-for-client',
 * //     subscriptions: '[{"topic":"task","qos":2}]'
 * //   },
 * //   { publish: 'id-for-client', topic: 'task', payload: 'result', qos: 2 },
 * //   { 'client-out': 'id-for-client' }
 * // ]
 *
 */
function WPubsubServer(opt = {}) {

    //keyMsg
    let keyMsg = '__msg__'

    //port
    let port = get(opt, 'port')
    if (!ispint(port)) {
        port = 8080
    }
    port = cint(port)

    //storage
    let storage = get(opt, 'storage')
    if (!isestr(storage)) {
        storage = './_db' //LevelDB路徑
    }

    //tokens
    let tokens = get(opt, 'tokens')
    if (!isearr(tokens)) {
        tokens = []
    }
    let stTokens = new Set(tokens) //可用的合法 Token 列表

    //db
    let db = new Level(storage)

    //persistenceInstance
    let persistenceInstance = aedesPersistenceLevel(db)

    //broker
    let broker = Aedes({
        persistence: persistenceInstance,
        authenticate: (client, username, password, callback) => {
            // console.log('authenticate', client.id, username, password)
            // console.log('client.id', client.id)
            // console.log('username', username)
            // console.log('password', password)

            //check
            if (size(tokens) === 0) {
                return callback(null, true)
            }

            //check
            if (!isestr(username)) {
                let err = new Error('missing token')
                err.returnCode = 4 // MQTT 3.1.1: "Connection Refused: bad username or password"
                return callback(err, false)
            }

            //token
            let token = cstr(username)

            //check
            if (stTokens.has(token)) {
                // console.log('client conn', `token[${token}]`, `client.id[${client.id}]`)
                return callback(null, true)
            }

            //err
            let err = new Error('invalid token')
            err.returnCode = 4
            // console.log('client deny', `token[${token}]`, `client.id[${client.id}]`)
            return callback(err, false)
        }
    })

    //server
    let server = net.createServer(broker.handle)

    //listen
    server.listen(port, () => {
        // console.log(`Server started and listening on port ${port}`)
        ev.emit('server-listen', { port })
    })

    //error
    server.on('error', (err) => {
        // console.error('Server error:', err)
        ev.emit('server-error', err) // 也可以轉發出去
    })

    //ev
    let ev = evem()

    //客戶端連線
    broker.on('client', (client) => {
        // console.log('client in', `client.id[${client.id}]`)
        ev.emit('client-in', client.id)

        //error
        client.on('error', (err) => {
            // console.error(`Client error [${client.id}]:`, err)
            ev.emit('client-error', client.id, err)
        })

    })

    //客戶端訂閱
    broker.on('subscribe', (subscriptions, client) => {
        if (iseobj(client)) {
            // console.log('client subscribe', `client.id[${client.id}]`, `topics[${subscriptions.map(s => s.topic).join(', ')}]`)
            ev.emit('subscribe', client.id, subscriptions)
        }
    })

    //客戶端發佈訊息
    broker.on('publish', (packet, client) => {
        if (iseobj(client)) {
            // console.log('client publish', `client.id[${client.id}]`, `topic[${packet.topic}]`, `qos[${packet.qos}]`, packet.payload.toString())
            let _message = ''
            try {
                let j = packet.payload.toString() //mqtt接收payload時會變成Buffer, 得轉payload.toString()
                // console.log('payload j', j)
                let o = j2o(j)
                // console.log('payload o', o)
                _message = get(o, keyMsg, '')
            }
            catch (err) {
                console.log(err)
            }
            ev.emit('publish', client.id, packet.topic, _message, packet.qos)
        }
    })

    //客戶端斷線
    broker.on('clientDisconnect', (client) => {
        // console.log('client out', `client.id[${client.id}]`)
        ev.emit('client-out', client.id)
    })

    //error
    broker.on('error', (err) => {
        // console.error('Broker error:', err)
        ev.emit('broker-error', err) // 也可以轉發出去
    })

    //closeBroker
    let closeBroker = () => {
        return new Promise((resolve, reject) => {
            broker.close((err) => {
                if (err) {
                    return reject(err)
                }
                resolve()
            })
        })
    }

    //closeServer
    let closeServer = () => {
        return new Promise((resolve, reject) => {
            server.close((err) => {
                if (err) {
                    return reject(err)
                }
                resolve()
            })
        })
    }

    //clear
    let clear = async () => {
        await closeBroker()
        await closeServer()
    }

    //save
    ev.clear = clear

    return ev
}


export default WPubsubServer