pmQueue.mjs

import get from 'lodash-es/get.js'
import isBoolean from 'lodash-es/isBoolean.js'
import genPm from './genPm.mjs'
import genID from './genID.mjs'
import pm2resolve from './pm2resolve.mjs'
import queue from './queue.mjs'
import isfun from './isfun.mjs'
import isp0int from './isp0int.mjs'
import cint from './cint.mjs'
import delay from './delay.mjs'


/**
 * 通過佇列限制與呼叫非同步(Promise)函數,可推入不同之非同步函數,將一併受限
 *
 * 通過new建構,呼叫時輸入不同之非同步函數,以及其輸入參數,會推入佇列後並循序等待執行
 *
 * 可限制同時運行的非同步函數總數量(takeLimit>0),可只取最後呼叫的非同步函數進行防抖功能(takeLast=true),代表前面的呼叫皆自動轉為catch,而其回傳訊息為物件{reason:'cancelled'}
 *
 * Unit Test: {@link https://github.com/yuda-lyu/wsemi/blob/master/test/pmQueue.test.mjs Github}
 * @memberOf wsemi
 * @param {Integer} [takeLimit=0] 輸入同時處理數量整數,預設0,代表無限制
 * @param {Boolean} [takeLast=false] 輸入多次觸發時是否只取最後呼叫的非同步函數,預設false,搭配takeLimit=0進行非同步函數防抖
 * @returns {Function} 回傳Function,第1參數為非同步函數,第2參數之後為欲輸入非同步函數之參數。回傳為Promise,resolve回傳成功結果而reject回傳失敗訊息
 * @example
 *
 * async function topAsync() {
 *
 *     async function test1() {
 *         return new Promise((resolve, reject) => {
 *
 *             let ms = []
 *             let pmq = pmQueue(1) //同時處理1個
 *
 *             function fun1(v) {
 *                 console.log('call fun1')
 *                 ms.push('call fun1')
 *                 return new Promise(function(resolve, reject) {
 *                     setTimeout(function() {
 *                         ms.push('fun1 resolve: ' + v)
 *                         resolve('#' + v)
 *                     }, 300)
 *                 })
 *             }
 *
 *             function fun2(v) {
 *                 console.log('call fun2')
 *                 ms.push('call fun2')
 *                 return new Promise(function(resolve, reject) {
 *                     setTimeout(function() {
 *                         ms.push('fun2 resolve: ' + v)
 *                         resolve('#' + v)
 *                     }, 200)
 *                 })
 *             }
 *
 *             function fun3(v) {
 *                 console.log('call fun3')
 *                 ms.push('call fun3')
 *                 return new Promise(function(resolve, reject) {
 *                     setTimeout(function() {
 *                         ms.push('fun3 resolve: ' + v)
 *                         resolve('#' + v)
 *                     }, 100)
 *                 })
 *             }
 *
 *             pmq(fun1, 'inp1')
 *                 .then(function(msg) {
 *                     console.log('t1 then', msg)
 *                     ms.push('t1 then: ' + msg)
 *                 })
 *                 .catch(function(msg) {
 *                     console.log('t1 catch', msg)
 *                     ms.push('t1 catch: ' + 'reason ' + msg.reason)
 *                 })
 *             pmq(fun2, 'inp2')
 *                 .then(function(msg) {
 *                     console.log('t2 then', msg)
 *                     ms.push('t2 then: ' + msg)
 *                 })
 *                 .catch(function(msg) {
 *                     console.log('t2 catch', msg)
 *                     ms.push('t2 catch: ' + 'reason ' + msg.reason)
 *                 })
 *             pmq(fun3, 'inp3')
 *                 .then(function(msg) {
 *                     console.log('t3 then', msg)
 *                     ms.push('t3 then: ' + msg)
 *                 })
 *                 .catch(function(msg) {
 *                     console.log('t3 catch', msg)
 *                     ms.push('t3 catch: ' + 'reason ' + msg.reason)
 *                 })
 *             setTimeout(function() {
 *                 resolve(ms)
 *             }, 700)
 *         })
 *     }
 *     console.log('test1')
 *     let r1 = await test1()
 *     console.log(JSON.stringify(r1))
 *     // test1
 *     // call fun1
 *     // t1 then #inp1
 *     // call fun2
 *     // t2 then #inp2
 *     // call fun3
 *     // t3 then #inp3
 *     // ["call fun1","fun1 resolve: inp1","t1 then: #inp1","call fun2","fun2 resolve: inp2","t2 then: #inp2","call fun3","fun3 resolve: inp3","t3 then: #inp3"]
 *
 *     async function test2() {
 *         return new Promise((resolve, reject) => {
 *
 *             let ms = []
 *             let pmq = pmQueue(2) //同時處理2個
 *
 *             function fun1(v) {
 *                 console.log('call fun1')
 *                 ms.push('call fun1')
 *                 return new Promise(function(resolve, reject) {
 *                     setTimeout(function() {
 *                         ms.push('fun1 resolve: ' + v)
 *                         resolve('#' + v)
 *                     }, 300)
 *                 })
 *             }
 *
 *             function fun2(v) {
 *                 console.log('call fun2')
 *                 ms.push('call fun2')
 *                 return new Promise(function(resolve, reject) {
 *                     setTimeout(function() {
 *                         ms.push('fun2 resolve: ' + v)
 *                         resolve('#' + v)
 *                     }, 200)
 *                 })
 *             }
 *
 *             function fun3(v) {
 *                 console.log('call fun3')
 *                 ms.push('call fun3')
 *                 return new Promise(function(resolve, reject) {
 *                     setTimeout(function() {
 *                         ms.push('fun3 resolve: ' + v)
 *                         resolve('#' + v)
 *                     }, 100)
 *                 })
 *             }
 *
 *             pmq(fun1, 'inp1')
 *                 .then(function(msg) {
 *                     console.log('t1 then', msg)
 *                     ms.push('t1 then: ' + msg)
 *                 })
 *                 .catch(function(msg) {
 *                     console.log('t1 catch', msg)
 *                     ms.push('t1 catch: ' + 'reason ' + msg.reason)
 *                 })
 *             pmq(fun2, 'inp2')
 *                 .then(function(msg) {
 *                     console.log('t2 then', msg)
 *                     ms.push('t2 then: ' + msg)
 *                 })
 *                 .catch(function(msg) {
 *                     console.log('t2 catch', msg)
 *                     ms.push('t2 catch: ' + 'reason ' + msg.reason)
 *                 })
 *             pmq(fun3, 'inp3')
 *                 .then(function(msg) {
 *                     console.log('t3 then', msg)
 *                     ms.push('t3 then: ' + msg)
 *                 })
 *                 .catch(function(msg) {
 *                     console.log('t3 catch', msg)
 *                     ms.push('t3 catch: ' + 'reason ' + msg.reason)
 *                 })
 *             setTimeout(function() {
 *                 resolve(ms)
 *             }, 700)
 *         })
 *     }
 *     console.log('test2')
 *     let r2 = await test2()
 *     console.log(JSON.stringify(r2))
 *     // test2
 *     // call fun1
 *     // call fun2
 *     // t2 then #inp2
 *     // call fun3
 *     // t1 then #inp1
 *     // t3 then #inp3
 *     // ["call fun1","call fun2","fun2 resolve: inp2","t2 then: #inp2","call fun3","fun1 resolve: inp1","t1 then: #inp1","fun3 resolve: inp3","t3 then: #inp3"]
 *
 *     async function test3() {
 *         return new Promise((resolve, reject) => {
 *
 *             let ms = []
 *             let pmq = pmQueue(null) //同時處理全部
 *
 *             function fun1(v) {
 *                 console.log('call fun1')
 *                 ms.push('call fun1')
 *                 return new Promise(function(resolve, reject) {
 *                     setTimeout(function() {
 *                         ms.push('fun1 resolve: ' + v)
 *                         resolve('#' + v)
 *                     }, 300)
 *                 })
 *             }
 *
 *             function fun2(v) {
 *                 console.log('call fun2')
 *                 ms.push('call fun2')
 *                 return new Promise(function(resolve, reject) {
 *                     setTimeout(function() {
 *                         ms.push('fun2 resolve: ' + v)
 *                         resolve('#' + v)
 *                     }, 200)
 *                 })
 *             }
 *
 *             function fun3(v) {
 *                 console.log('call fun3')
 *                 ms.push('call fun3')
 *                 return new Promise(function(resolve, reject) {
 *                     setTimeout(function() {
 *                         ms.push('fun3 resolve: ' + v)
 *                         resolve('#' + v)
 *                     }, 100)
 *                 })
 *             }
 *
 *             pmq(fun1, 'inp1')
 *                 .then(function(msg) {
 *                     console.log('t1 then', msg)
 *                     ms.push('t1 then: ' + msg)
 *                 })
 *                 .catch(function(msg) {
 *                     console.log('t1 catch', msg)
 *                     ms.push('t1 catch: ' + 'reason ' + msg.reason)
 *                 })
 *             pmq(fun2, 'inp2')
 *                 .then(function(msg) {
 *                     console.log('t2 then', msg)
 *                     ms.push('t2 then: ' + msg)
 *                 })
 *                 .catch(function(msg) {
 *                     console.log('t2 catch', msg)
 *                     ms.push('t2 catch: ' + 'reason ' + msg.reason)
 *                 })
 *             pmq(fun3, 'inp3')
 *                 .then(function(msg) {
 *                     console.log('t3 then', msg)
 *                     ms.push('t3 then: ' + msg)
 *                 })
 *                 .catch(function(msg) {
 *                     console.log('t3 catch', msg)
 *                     ms.push('t3 catch: ' + 'reason ' + msg.reason)
 *                 })
 *             setTimeout(function() {
 *                 resolve(ms)
 *             }, 700)
 *         })
 *     }
 *     console.log('test3')
 *     let r3 = await test3()
 *     console.log(JSON.stringify(r3))
 *     // test3
 *     // call fun1
 *     // call fun2
 *     // call fun3
 *     // t3 then #inp3
 *     // t2 then #inp2
 *     // t1 then #inp1
 *     // ["call fun1","call fun2","call fun3","fun3 resolve: inp3","t3 then: #inp3","fun2 resolve: inp2","t2 then: #inp2","fun1 resolve: inp1","t1 then: #inp1"]
 *
 *     async function test4() {
 *         return new Promise((resolve, reject) => {
 *
 *             let ms = []
 *             let pmq = pmQueue(null, true) //同時處理全部, 但只拿最後執行者的結果
 *
 *             function fun1(v) {
 *                 console.log('call fun1')
 *                 ms.push('call fun1')
 *                 return new Promise(function(resolve, reject) {
 *                     setTimeout(function() {
 *                         ms.push('fun1 resolve: ' + v)
 *                         resolve('#' + v)
 *                     }, 300)
 *                 })
 *             }
 *
 *             function fun2(v) {
 *                 console.log('call fun2')
 *                 ms.push('call fun2')
 *                 return new Promise(function(resolve, reject) {
 *                     setTimeout(function() {
 *                         ms.push('fun2 resolve: ' + v)
 *                         resolve('#' + v)
 *                     }, 200)
 *                 })
 *             }
 *
 *             function fun3(v) {
 *                 console.log('call fun3')
 *                 ms.push('call fun3')
 *                 return new Promise(function(resolve, reject) {
 *                     setTimeout(function() {
 *                         ms.push('fun3 resolve: ' + v)
 *                         resolve('#' + v)
 *                     }, 100)
 *                 })
 *             }
 *
 *             pmq(fun1, 'inp1')
 *                 .then(function(msg) {
 *                     console.log('t1 then', msg)
 *                     ms.push('t1 then: ' + msg)
 *                 })
 *                 .catch(function(msg) {
 *                     console.log('t1 catch', msg)
 *                     ms.push('t1 catch: ' + 'reason ' + msg.reason)
 *                 })
 *             pmq(fun2, 'inp2')
 *                 .then(function(msg) {
 *                     console.log('t2 then', msg)
 *                     ms.push('t2 then: ' + msg)
 *                 })
 *                 .catch(function(msg) {
 *                     console.log('t2 catch', msg)
 *                     ms.push('t2 catch: ' + 'reason ' + msg.reason)
 *                 })
 *             pmq(fun3, 'inp3')
 *                 .then(function(msg) {
 *                     console.log('t3 then', msg)
 *                     ms.push('t3 then: ' + msg)
 *                 })
 *                 .catch(function(msg) {
 *                     console.log('t3 catch', msg)
 *                     ms.push('t3 catch: ' + 'reason ' + msg.reason)
 *                 })
 *             setTimeout(function() {
 *                 resolve(ms)
 *             }, 700)
 *         })
 *     }
 *     console.log('test4')
 *     let r4 = await test4()
 *     console.log(JSON.stringify(r4))
 *     // test4
 *     // call fun1
 *     // call fun2
 *     // call fun3
 *     // t3 then #inp3
 *     // t2 catch { reason: 'cancelled' }
 *     // t1 catch { reason: 'cancelled' }
 *     // ["call fun1","call fun2","call fun3","fun3 resolve: inp3","t3 then: #inp3","fun2 resolve: inp2","t2 catch: reason cancelled","fun1 resolve: inp1","t1 catch: reason cancelled"]
 *
 * }
 * topAsync().catch(() => {})
 *
 */
function pmQueue(takeLimit = 0, takeLast = false) {

    function ClsPmQueue(takeLimit, takeLast) {
        let gid = null

        //takeLimit
        if (!isp0int(takeLimit)) {
            takeLimit = 0
        }
        takeLimit = cint(takeLimit)

        //takeLast
        if (!isBoolean(takeLast)) {
            takeLast = false
        }

        //queue
        let q = queue(takeLimit)

        //message
        q.on('message', async function(qs) {
        //console.log('message', qs)

            //get
            let v = q.get()
            if (!v) {
                return
            }

            //delay, 因emit與cb出去後外面promise會慢一點, 故執行前delay釋放同步調用, 使前一個佇列先完成promise
            await delay(1)

            //id
            let id = get(v, 'id')

            //fun
            let fun = get(v, 'fun')

            //input
            let input = get(v, 'input')

            //res
            let res
            if (!isfun(fun)) {
                res = {
                    state: 'error',
                    msg: 'fun is not a function',
                }
            }
            else {
                res = await pm2resolve(fun)(...input)
            }

            //emit
            q.emit(id, res)

            //cb
            q.cb()

        })

        //run
        function run(fun, ...input) {

            //pm
            let pm = genPm()

            //id
            let id = genID()

            //save gid
            gid = id

            //p
            let p = {
                id,
                fun,
                input,
            }

            //push
            q.push(p)

            //once
            q.once(id, (res) => {
            //console.log('once', id, res)
                if (takeLast) {
                    if (id === gid) {
                        if (res.state === 'success') {
                            pm.resolve(res.msg)
                        }
                        else {
                            pm.reject(res.msg)
                        }
                    }
                    else {
                        pm.reject({ reason: 'cancelled' })
                    }
                }
                else {
                    if (res.state === 'success') {
                        pm.resolve(res.msg)
                    }
                    else {
                        pm.reject(res.msg)
                    }
                }
            })

            return pm
        }

        return run
    }

    return new ClsPmQueue(takeLimit, takeLast)
}


export default pmQueue