WDataScheduler.mjs

import fs from 'fs'
import get from 'lodash-es/get.js'
import size from 'lodash-es/size.js'
import iseobj from 'wsemi/src/iseobj.mjs'
import isestr from 'wsemi/src/isestr.mjs'
import isp0int from 'wsemi/src/isp0int.mjs'
import isbol from 'wsemi/src/isbol.mjs'
import isfun from 'wsemi/src/isfun.mjs'
import ispm from 'wsemi/src/ispm.mjs'
import cdbl from 'wsemi/src/cdbl.mjs'
import str2b64 from 'wsemi/src/str2b64.mjs'
import ltdtDiffByKey from 'wsemi/src/ltdtDiffByKey.mjs'
import now2str from 'wsemi/src/now2str.mjs'
import evem from 'wsemi/src/evem.mjs'
import pmSeries from 'wsemi/src/pmSeries.mjs'
import delay from 'wsemi/src/delay.mjs'
import waitFun from 'wsemi/src/waitFun.mjs'
import getErrorMessage from 'wsemi/src/getErrorMessage.mjs'
import fsIsFile from 'wsemi/src/fsIsFile.mjs'
import fsIsFolder from 'wsemi/src/fsIsFolder.mjs'
import fsDeleteFile from 'wsemi/src/fsDeleteFile.mjs'
import fsCreateFolder from 'wsemi/src/fsCreateFolder.mjs'
import fsTaskCp from 'wsemi/src/fsTaskCp.mjs'
import fsTreeFolder from 'wsemi/src/fsTreeFolder.mjs'
import WSyslog from 'w-syslog/src/WSyslog.mjs'
import ot from 'dayjs'


/**
 * 基於檔案之數據變更驅動任務建構器
 *
 * @param {Object} [opt={}] 輸入設定物件,預設{}
 * @param {String} [opt.keyId='keyId'] 輸入各筆數據之主鍵字串,預設'keyId'
 * @param {String} [opt.fdTagRemove='./_tagRemove'] 輸入暫存標記為刪除數據資料夾字串,預設'./_tagRemove'
 * @param {String} [opt.fdTaskCpActualSrc='./_taskCpActualSrc'] 輸入儲存來源端之完整任務狀態資料夾字串,預設'./_taskCpActualSrc'
 * @param {String} [opt.fdTaskCpSrc='./_taskCpSrc'] 輸入儲存來源端之觸發任務狀態資料夾字串,預設'./_taskCpSrc'
 * @param {String} [opt.fdLog='./_logs'] 輸入儲存log資料夾字串,預設'./_logs'
 * @param {Function} [opt.funGetNew=null] 輸入取得最新數據之hash數據處理函數,回傳資料陣列,為必須,預設null
 * @param {Function} [opt.funGetCurrent=null] 輸入取得既有數據之hash數據處理函數,回傳資料陣列,為必須,預設null
 * @param {Function} [opt.funAdd=null] 輸入當有新資料時,需要連動處理之函數,預設null
 * @param {Function} [opt.funModify=null] 輸入當有資料需更新時,需要連動處理之函數,預設null
 * @param {Function} [opt.funRemove=null] 輸入當有資料需刪除時,需要連動處理之函數,預設null
 * @param {Function} [opt.funAfterStart=null] 輸入偵測程序剛開始啟動時,需要處理之函數,預設null
 * @param {Function} [opt.funBeforeEnd=null] 輸入偵測程序要結束前,需要處理之函數,預設null
 * @param {Number} [opt.timeToleranceRemove=0] 輸入刪除任務之防抖時長,單位ms,預設0,代表不使用
 * @param {String} [opt.eventNameProcCallfunGetNew='proc-callfun-getNew'] 輸入觸發事件與紀錄log事件名稱字串,預設'proc-callfun-getNew'
 * @param {String} [opt.eventNameProcCallfunGetCurrent='proc-callfun-getCurrent'] 輸入觸發事件與紀錄log事件名稱字串,預設'proc-callfun-getCurrent'
 * @param {String} [opt.eventNameProcAddCallfunAdd='proc-add-callfun-add'] 輸入觸發事件與紀錄log事件名稱字串,預設'proc-add-callfun-add'
 * @param {String} [opt.eventNameProcDiffCallfunModify='proc-diff-callfun-modify'] 輸入觸發事件與紀錄log事件名稱字串,預設'proc-diff-callfun-modify'
 * @param {String} [opt.eventNameProcRemoveCallfunRemove='proc-remove-callfun-remove'] 輸入觸發事件與紀錄log事件名稱字串,預設'proc-remove-callfun-remove'
 * @param {String} [opt.eventNameProcCallfunAfterStart='proc-callfun-afterStart'] 輸入觸發事件與紀錄log事件名稱字串,預設'proc-callfun-afterStart'
 * @param {String} [opt.eventNameProcCallfunBeforeEnd='proc-callfun-beforeEnd'] 輸入觸發事件與紀錄log事件名稱字串,預設'proc-callfun-beforeEnd'
 * @param {String} [opt.eventNameProcCoreDetect='proc-coreDetect'] 輸入觸發事件與紀錄log事件名稱字串,預設'proc-coreDetect'
 * @param {String} [opt.eventNameProcRetakeRemove='proc-retake-remove'] 輸入觸發事件與紀錄log事件名稱字串,預設'proc-retake-remove'
 * @param {String} [opt.eventNameProcCoreRetake='proc-coreRetake'] 輸入觸發事件與紀錄log事件名稱字串,預設'proc-coreRetake'
 * @returns {Object} 回傳事件物件,可呼叫函數on監聽change事件,可呼叫函數srlog額外進行事件紀錄
 * @example
 *
 * import w from 'wsemi'
 * import WDataScheduler from './src/WDataScheduler.mjs'
 *
 * //fdTagRemove
 * let fdTagRemove = './_tagRemove'
 * w.fsCleanFolder(fdTagRemove)
 *
 * //fdTaskCpActualSrc
 * let fdTaskCpActualSrc = './_taskCpActualSrc'
 * w.fsCleanFolder(fdTaskCpActualSrc)
 *
 * //fdTaskCpSrc
 * let fdTaskCpSrc = './_taskCpSrc'
 * w.fsCleanFolder(fdTaskCpSrc)
 *
 * //funGetNew
 * let funGetNew = async() => {
 *     let items = [
 *         {
 *             'id': '114115',
 *             'hash': 'abc',
 *         },
 *     ]
 *     return items
 * }
 *
 * //funGetCurrent
 * let funGetCurrent = async() => {
 *     let items = [
 *         {
 *             'id': '114115',
 *             'hash': 'abc',
 *         },
 *         {
 *             'id': '114116',
 *             'hash': 'def',
 *         },
 *     ]
 *     return items
 * }
 *
 * //funRemove
 * let funRemove = async(v) => {
 *     //do somethings
 * }
 *
 * //funAdd
 * let funAdd = async(v) => {
 *     //do somethings
 * }
 *
 * //funModify
 * let funModify = async(v) => {
 *     //do somethings
 * }
 *
 * let opt = {
 *     fdTagRemove,
 *     fdTaskCpActualSrc,
 *     fdTaskCpSrc,
 *     funGetNew,
 *     funGetCurrent,
 *     funRemove,
 *     funAdd,
 *     funModify,
 * }
 * let ev = await WDataScheduler(opt)
 *     .catch((err) => {
 *         console.log(err)
 *     })
 * ev.on('change', (msg) => {
 *     delete msg.type
 *     delete msg.timeRunStart
 *     delete msg.timeRunEnd
 *     delete msg.timeRunSpent
 *     console.log('change', msg)
 * })
 * // change { event: 'start', msg: 'running...' }
 * // change { event: 'proc-callfun-getNew', msg: 'start...' }
 * // change { event: 'proc-callfun-getNew', num: 2, msg: 'done' }
 * // change { event: 'proc-callfun-getCurrent', msg: 'start...' }
 * // change { event: 'proc-callfun-getCurrent', num: 1, msg: 'done' }
 * // change { event: 'proc-compare', msg: 'start...' }
 * // change {
 * //   event: 'proc-compare',
 * //   numRemove: 0,
 * //   numAdd: 1,
 * //   numModify: 0,
 * //   numSame: 1,
 * //   msg: 'done'
 * // }
 * // change { event: 'proc-add-callfun-add', id: '114116', msg: 'start...' }
 * // change { event: 'proc-add-callfun-add', id: '114116', msg: 'done' }
 * // change { event: 'end', msg: 'done' }
 *
 */
let WDataScheduler = async(opt = {}) => {

    //keyId
    let keyId = get(opt, 'keyId')
    if (!isestr(keyId)) {
        keyId = `id`
    }

    //fdTagRemove, 暫存標記為刪除數據資料夾
    let fdTagRemove = get(opt, 'fdTagRemove')
    if (!isestr(fdTagRemove)) {
        fdTagRemove = './_tagRemove'
    }

    //fdTaskCpActualSrc, 儲存完整任務狀態資料夾
    let fdTaskCpActualSrc = get(opt, 'fdTaskCpActualSrc')
    if (!isestr(fdTaskCpActualSrc)) {
        fdTaskCpActualSrc = './_taskCpActualSrc'
    }
    if (!fsIsFolder(fdTaskCpActualSrc)) {
        fsCreateFolder(fdTaskCpActualSrc)
    }

    //fdTaskCpSrc, 儲存觸發任務狀態資料夾
    let fdTaskCpSrc = get(opt, 'fdTaskCpSrc')
    if (!isestr(fdTaskCpSrc)) {
        fdTaskCpSrc = './_taskCpSrc'
    }
    if (!fsIsFolder(fdTaskCpSrc)) {
        fsCreateFolder(fdTaskCpSrc)
    }

    //fdLog
    let fdLog = get(opt, 'fdLog')
    if (!isestr(fdLog)) {
        fdLog = './_logs'
    }
    if (!fsIsFolder(fdLog)) {
        fsCreateFolder(fdLog)
    }

    //funGetNew
    let funGetNew = get(opt, 'funGetNew')
    if (!isfun(funGetNew)) {
        throw new Error(`invalid funGetNew`)
    }

    //funGetCurrent
    let funGetCurrent = get(opt, 'funGetCurrent')
    if (!isfun(funGetCurrent)) {
        throw new Error(`invalid funGetCurrent`)
    }

    //funAdd
    let funAdd = get(opt, 'funAdd')

    //funModify
    let funModify = get(opt, 'funModify')

    //funRemove
    let funRemove = get(opt, 'funRemove')

    //funAfterStart
    let funAfterStart = get(opt, 'funAfterStart')

    //funBeforeEnd
    let funBeforeEnd = get(opt, 'funBeforeEnd')

    // //timeToleranceAdd
    // let timeToleranceAdd = get(opt, 'timeToleranceAdd')
    // if (!isp0int(timeToleranceAdd)) {
    //     timeToleranceAdd = 60 * 60 * 1000
    // }
    // timeToleranceAdd = cdbl(timeToleranceAdd)

    // //timeToleranceModify
    // let timeToleranceModify = get(opt, 'timeToleranceModify')
    // if (!isp0int(timeToleranceModify)) {
    //     timeToleranceModify = 60 * 60 * 1000
    // }
    // timeToleranceModify = cdbl(timeToleranceModify)

    //timeToleranceRemove
    let timeToleranceRemove = get(opt, 'timeToleranceRemove')
    if (!isp0int(timeToleranceRemove)) {
        timeToleranceRemove = 0
    }
    timeToleranceRemove = cdbl(timeToleranceRemove)

    //延遲偵測是否創建資料夾
    // if (timeToleranceModify >= 0 && !fsIsFolder(fdTagModify)) {
    //     fsCreateFolder(fdTagModify)
    // }
    if (timeToleranceRemove > 0 && !fsIsFolder(fdTagRemove)) {
        fsCreateFolder(fdTagRemove)
    }

    //eventNameProcCallfunGetNew
    let eventNameProcCallfunGetNew = get(opt, 'eventNameProcCallfunGetNew')
    if (!isestr(eventNameProcCallfunGetNew)) {
        eventNameProcCallfunGetNew = 'proc-callfun-getNew'
    }

    //eventNameProcCallfunGetCurrent
    let eventNameProcCallfunGetCurrent = get(opt, 'eventNameProcCallfunGetCurrent')
    if (!isestr(eventNameProcCallfunGetCurrent)) {
        eventNameProcCallfunGetCurrent = 'proc-callfun-getCurrent'
    }

    //eventNameProcAddCallfunAdd
    let eventNameProcAddCallfunAdd = get(opt, 'eventNameProcAddCallfunAdd')
    if (!isestr(eventNameProcAddCallfunAdd)) {
        eventNameProcAddCallfunAdd = 'proc-add-callfun-add'
    }

    //eventNameProcDiffCallfunModify
    let eventNameProcDiffCallfunModify = get(opt, 'eventNameProcDiffCallfunModify')
    if (!isestr(eventNameProcDiffCallfunModify)) {
        eventNameProcDiffCallfunModify = 'proc-diff-callfun-modify'
    }

    //eventNameProcRemoveCallfunRemove
    let eventNameProcRemoveCallfunRemove = get(opt, 'eventNameProcRemoveCallfunRemove')
    if (!isestr(eventNameProcRemoveCallfunRemove)) {
        eventNameProcRemoveCallfunRemove = 'proc-remove-callfun-remove'
    }

    //eventNameProcCallfunAfterStart
    let eventNameProcCallfunAfterStart = get(opt, 'eventNameProcCallfunAfterStart')
    if (!isestr(eventNameProcCallfunAfterStart)) {
        eventNameProcCallfunAfterStart = 'proc-callfun-afterStart'
    }

    //eventNameProcCallfunBeforeEnd
    let eventNameProcCallfunBeforeEnd = get(opt, 'eventNameProcCallfunBeforeEnd')
    if (!isestr(eventNameProcCallfunBeforeEnd)) {
        eventNameProcCallfunBeforeEnd = 'proc-callfun-beforeEnd'
    }

    //eventNameProcCoreDetect
    let eventNameProcCoreDetect = get(opt, 'eventNameProcCoreDetect')
    if (!isestr(eventNameProcCoreDetect)) {
        eventNameProcCoreDetect = 'proc-coreDetect'
    }

    //eventNameProcRetakeRemove
    let eventNameProcRetakeRemove = get(opt, 'eventNameProcRetakeRemove')
    if (!isestr(eventNameProcRetakeRemove)) {
        eventNameProcRetakeRemove = 'proc-retake-remove'
    }

    //eventNameProcCoreRetake
    let eventNameProcCoreRetake = get(opt, 'eventNameProcCoreRetake')
    if (!isestr(eventNameProcCoreRetake)) {
        eventNameProcCoreRetake = 'proc-coreRetake'
    }

    //ev
    let ev = evem()

    //_srlog
    let _srlog = WSyslog({
        fdLog,
        interval: 'hr',
    })

    //srlog
    let srlog = {
        info: (msg) => {
            _srlog.info(msg)
            ev.emit('change', { type: 'info', ...msg })
        },
        warn: (msg) => {
            _srlog.warn(msg)
            ev.emit('change', { type: 'warn', ...msg })
        },
        error: (msg) => {
            _srlog.error(msg)
            ev.emit('change', { type: 'error', ...msg })
        },
    }

    //addTagCore
    let addTagCore = async (mode, fn, hash = '') => {

        //fdTag
        let fdTag = ''
        if (mode === 'add') {
            // fdTag = fdTagAdd
            throw new Error(`does not support add`)
        }
        else if (mode === 'modify') {
            // fdTag = fdTagModify
            throw new Error(`does not support modify`)
        }
        else if (mode === 'remove') {
            fdTag = fdTagRemove
        }
        else {
            throw new Error(`invalid mode[${mode}]`)
        }

        //fp, 增加/變更/刪除清單資料夾內之檔案路徑
        let fp = `${fdTag}/${fn}`

        // //check, 防抖要能刷新起算時間, 故要持續writeFileSync
        // if (fsIsFile(fp)) {
        //     return
        // }

        //o
        let o = {
            time: now2str(),
            hash,
        }

        //writeFileSync
        fs.writeFileSync(fp, JSON.stringify(o), 'utf8')

    }

    // //addTagAdd
    // let addTagAdd = async (fn) => {
    //     let r = await addTagCore('add', fn)
    //     return r
    // }

    // //addTagModify
    // let addTagModify = async (fn, hash) => {
    //     let r = await addTagCore('modify', fn, hash)
    //     return r
    // }

    //addTagRemove
    let addTagRemove = async (fn) => {
        let r = await addTagCore('remove', fn)
        return r
    }

    //checkTagCore
    let checkTagCore = async (mode, fn, opt = {}) => {

        //checkTime
        let checkTime = get(opt, 'checkTime')
        if (!isbol(checkTime)) {
            checkTime = true
        }

        //timeNow
        let timeNow = get(opt, 'timeNow')

        //fdTag, timeTolerance
        let fdTag = ''
        let timeTolerance = 0
        if (mode === 'add') {
            // fdTag = fdTagAdd
            // timeTolerance = timeToleranceAdd
            throw new Error(`does not support add`)
        }
        else if (mode === 'modify') {
            // fdTag = fdTagModify
            // timeTolerance = timeToleranceModify
            throw new Error(`does not support modify`)
        }
        else if (mode === 'remove') {
            fdTag = fdTagRemove
            timeTolerance = timeToleranceRemove
        }
        else {
            throw new Error(`invalid mode[${mode}]`)
        }

        //fp, 增加/變更/刪除清單資料夾內之檔案路徑
        let fp = `${fdTag}/${fn}`

        //check, 預期要有檔案, 若無檔案無法讀取內容
        if (!fsIsFile(fp)) {
            return {
                b: false,
                msg: `fp[${fp}] is not a file`,
            }
        }

        //checkTime
        if (!checkTime) {
            return {
                b: true,
                msg: `fp[${fp}] exists`,
            }
        }

        //check
        if (!iseobj(timeNow)) {
            throw new Error(`invalid timeNow`) //預期調用checkTagCore時若checkTime=true就一定要給timeNow, 沒給就報錯確保程式一定要通過單元測試
        }

        //j
        let j = fs.readFileSync(fp, 'utf8')

        //o
        let o = JSON.parse(j)

        //time
        let time = get(o, 'time')

        //ts
        let ts = ot(time)

        //te
        let te = timeNow

        //num
        let num = te.diff(ts, 'millisecond')

        //b
        let b = num >= timeTolerance
        // console.log(mode, fn, `num[${num}] >= timeTolerance[${timeTolerance}]`, b, 'checkTime', checkTime)
        if (b) {
            return {
                b,
                msg: `time difference[${num}] >= time tolerance[${timeTolerance}]`,
            }
        }
        else {
            return {
                b,
                msg: `allowable time difference`,
            }
        }
    }

    // //checkTagAdd
    // let checkTagAdd = async (fn, opt = {}) => {
    //     let r = await checkTagCore('add', fn, opt)
    //     return r
    // }

    // //checkTagModify
    // let checkTagModify = async (fn, opt = {}) => {
    //     let r = await checkTagCore('modify', fn, opt)
    //     return r
    // }

    //checkTagRemove
    let checkTagRemove = async (fn, opt = {}) => {
        let r = await checkTagCore('remove', fn, opt)
        return r
    }

    //releaseTagCore
    let releaseTagCore = async (mode, fn) => {

        //fdTag
        let fdTag = ''
        if (mode === 'add') {
            // fdTag = fdTagAdd
            throw new Error(`does not support add`)
        }
        else if (mode === 'modify') {
            // fdTag = fdTagModify
            throw new Error(`does not support modify`)
        }
        else if (mode === 'remove') {
            fdTag = fdTagRemove
        }
        else {
            throw new Error(`invalid mode[${mode}]`)
        }

        //fp, 增加/變更/刪除清單資料夾內之檔案路徑
        let fp = `${fdTag}/${fn}`

        //check, 若有檔案才刪除
        if (!fsIsFile(fp)) {
            return
        }

        //j
        let j = fs.readFileSync(fp, 'utf8')

        //o
        let o = JSON.parse(j)

        //fsDeleteFile
        let r = fsDeleteFile(fp)

        //check
        if (r.error) {
            throw new Error(r.error)
        }

        return o
    }

    // //releaseTagAdd
    // let releaseTagAdd = async (fn) => {
    //     let r = await releaseTagCore('add', fn)
    //     return r
    // }

    // //releaseTagModify
    // let releaseTagModify = async (fn) => {
    //     let r = await releaseTagCore('modify', fn)
    //     return r
    // }

    //releaseTagRemove
    let releaseTagRemove = async (fn) => {
        let r = await releaseTagCore('remove', fn)
        return r
    }

    //coreDetect
    let otkActualSrc = null
    let otkSrc = null
    let coreDetect = async() => {

        //因core執行初期ev尚未回傳給外部監聽, 故須delay延遲脫勾
        await delay(1)

        srlog.info({ event: 'start', msg: 'running...' })

        //ts
        let ts = ot()

        //calcTimeRun
        let calcTimeRun = () => {

            //te
            let te = ot()

            //s
            let s = te.diff(ts, 'second')

            //r
            let r = {
                timeRunStart: ts.format('YYYY-MM-DDTHH:mm:ssZ'),
                timeRunEnd: te.format('YYYY-MM-DDTHH:mm:ssZ'),
                timeRunSpent: `${s}s`,
            }

            return r
        }

        //fsTaskCp
        let otkActual = fsTaskCp(fdTaskCpActualSrc, fdTaskCpActualSrc) //因不使用偵測端, 故設定fdTar=fdSrc

        //otkActualSrc
        otkActualSrc = otkActual.buildSrc()
        // otkActualSrc.on('set', (msg) => {
        //     // console.log(`src send task...`, msg)
        // })
        // otkActualSrc.on('remove', (msg) => {
        //     // console.log(`src send task...`, msg)
        // })

        //fsTaskCp
        let otk = fsTaskCp(fdTaskCpSrc, fdTaskCpSrc) //因不使用偵測端, 故設定fdTar=fdSrc

        //otkSrc
        otkSrc = otk.buildSrc()
        // otkSrc.on('set', (msg) => {
        //     // console.log(`src send task...`, msg)
        // })
        // otkSrc.on('remove', (msg) => {
        //     // console.log(`src send task...`, msg)
        // })

        //msgExit
        let msgExit = ''

        //stageAfterStart
        let stageAfterStart = async() => {

            //funAfterStart
            if (isfun(funAfterStart)) {
                try {
                    srlog.info({ event: eventNameProcCallfunAfterStart, msg: 'start...' })

                    //funAfterStart
                    let q = funAfterStart()
                    if (ispm(q)) {
                        q = await q
                    }

                    srlog.info({ event: eventNameProcCallfunAfterStart, msg: 'done' })
                }
                catch (err) {
                    console.log(err)
                    srlog.error({ event: eventNameProcCallfunAfterStart, msg: getErrorMessage(err) })
                    msgExit = 'error at proc-callfun-afterStart'
                }
            }

            //check
            if (isestr(msgExit)) {
                console.log(`error occurred, task canceled`) //程序發生錯誤, 不進行後續動作
                srlog.info({ event: 'cancel-stage-afterStart', msg: msgExit })
                return
            }

            return null
        }
        if (!isestr(msgExit)) { //開始前階段一定無錯誤(無msgExit), 一定會執行
            await stageAfterStart()
                .catch(() => {})
        }

        //stageMain
        let stageMain = async() => {

            //itemsAtt
            let itemsAtt = []
            if (true) {
                try {
                    srlog.info({ event: eventNameProcCallfunGetNew, msg: 'start...' })

                    //funGetNew
                    let q = funGetNew()
                    if (ispm(q)) {
                        q = await q
                    }

                    //save
                    itemsAtt = q

                    srlog.info({ event: eventNameProcCallfunGetNew, num: size(q), msg: 'done' })
                }
                catch (err) {
                    console.log(err)
                    srlog.error({ event: eventNameProcCallfunGetNew, msg: getErrorMessage(err) })
                    msgExit = 'error at proc-callfun-getNew'
                }
            }

            //check
            if (isestr(msgExit)) {
                console.log(`error occurred, task canceled`) //程序發生錯誤, 不進行後續動作
                srlog.info({ event: 'cancel-stage-main', msg: msgExit })
                return
            }

            //itemsCur
            let itemsCur = []
            if (true) {
                try {
                    srlog.info({ event: eventNameProcCallfunGetCurrent, msg: 'start...' })

                    //funGetCurrent
                    let q = funGetCurrent()
                    if (ispm(q)) {
                        q = await q
                    }

                    //save
                    itemsCur = q

                    srlog.info({ event: eventNameProcCallfunGetCurrent, num: size(q), msg: 'done' })
                }
                catch (err) {
                    console.log(err)
                    srlog.error({ event: eventNameProcCallfunGetCurrent, msg: getErrorMessage(err) })
                    msgExit = 'error at proc-callfun-getCurrent'
                }
            }

            //check
            if (isestr(msgExit)) {
                console.log(`error occurred, task canceled`) //程序發生錯誤, 不進行後續動作
                srlog.info({ event: 'cancel-stage-main', msg: msgExit })
                return
            }

            //check, itemsAtt第1次執行預期一定有數據, 第n次執行無數據視為非預期被清空須報錯
            if (size(itemsAtt) === 0) {
                console.log(`invalid data, task canceled`) //程序發生錯誤, 不進行後續動作
                srlog.info({ event: 'cancel-stage-main', msg: 'no data' })
                return
            }

            // //check, itemsCur第1次執行會無數據, 不能偵測報錯
            // if (size(itemsCur) === 0) {
            //     console.log(`invalid data, task canceled`) //程序發生錯誤, 不進行後續動作
            //     srlog.info({ event: 'cancel-stage-main',  msg: 'no data' })
            //     return
            // }

            // //check
            // if (size(itemsCur) - size(itemsAtt) > 10) {
            //     console.log(`difference between the data before and after is too large, size(itemsCur)[${size(itemsCur)}]-size(itemsAtt)[${size(itemsAtt)}]>10`) //當前取得數據與已儲存數據之數量差距超過10, 不進行後續動作, 當前取得數據筆數[${size(itemsAtt)}], 已儲存數據筆數[${size(itemsCur)}]
            //     srlog.info({ event: 'cancel-stage-main',  msg: 'difference between the data before and after is too large' })
            //     return
            // }

            //ltdtDiffByKey
            let r = null
            if (true) {
                try {
                    srlog.info({ event: 'proc-compare', msg: 'start...' })

                    //ltdtSrc, ltdtTar
                    let ltdtSrc = itemsAtt
                    // console.log('ltdtSrc', ltdtSrc)
                    let ltdtTar = itemsCur
                    // console.log('ltdtTar', ltdtTar)

                    //ltdtDiffByKey
                    r = ltdtDiffByKey(ltdtTar, ltdtSrc, keyId, { withInfor: false })
                    // console.log('ltdtDiffByKey', r)
                    //   del: [ {...} ],
                    //   add: [ {...} ],
                    //   same: [ {...} ],
                    //   diff: [ {...} ],
                    let numDel = size(r.del)
                    let numAdd = size(r.add)
                    let numDiff = size(r.diff)
                    let numSame = size(r.same)

                    srlog.info({ event: 'proc-compare', numRemove: numDel, numAdd, numModify: numDiff, numSame, msg: 'done' })
                }
                catch (err) {
                    console.log(err)
                    srlog.error({ event: 'proc-compare', msg: getErrorMessage(err) })
                    msgExit = 'error at proc-compare'
                }
            }

            //check
            if (isestr(msgExit)) {
                console.log(`error occurred, task canceled`) //程序發生錯誤, 不進行後續動作
                srlog.info({ event: 'cancel-stage-main', msg: msgExit })
                return
            }

            //check
            if (!iseobj(r)) {
                console.log(`can not calculate the difference, task canceled`) //無法計算當前取得與已儲存數據之差異, 不進行後續動作
                srlog.info({ event: 'cancel-stage-main', msg: 'can not calculate the difference between the data before and after' })
                return
            }

            //check
            if (size(r.del) === 0 && size(r.add) === 0 && size(r.diff) === 0) {
                //當前取得與已儲存數據無差異, 不進行後續動作
                srlog.info({ event: 'cancel-stage-main', msg: 'no difference' })
                return
            }

            //trigger del
            await pmSeries(r.del, async(v) => {

                //check
                if (isestr(msgExit)) {
                    return
                }

                try {
                    srlog.info({ event: eventNameProcRemoveCallfunRemove, [keyId]: v[keyId], msg: 'start...' })

                    //funRemove
                    if (isfun(funRemove)) {
                        let q = funRemove(v)
                        if (ispm(q)) {
                            q = await q
                        }
                    }

                    // srlog.info({ event: eventNameProcRemoveCallfunRemove, [keyId]: v[keyId], msg: 'done' })
                }
                catch (err) {
                    console.log(err)
                    srlog.error({ event: eventNameProcRemoveCallfunRemove, [keyId]: v[keyId], msg: getErrorMessage(err) })
                    msgExit = 'error at proc-remove-callfun-remove'
                }

                //check
                if (isestr(msgExit)) {
                    return
                }

                try {

                    //otkActualSrc.remove
                    otkActualSrc.remove(v[keyId])

                    //check
                    if (timeToleranceRemove <= 0) {
                        //若未給予刪除誤差

                        //otkSrc.remove
                        otkSrc.remove(v[keyId])

                        srlog.info({ event: eventNameProcRemoveCallfunRemove, [keyId]: v[keyId], msg: 'done' })
                    }
                    else {
                        //若有給予刪除誤差

                        //checkTagRemove
                        let r = await checkTagRemove(v[keyId], { checkTime: false })

                        //check, 是否位於刪除清單
                        if (r.b) {
                            //若位於刪除清單, 不處理
                        }
                        else {
                            //若非位於刪除清單, 呼叫addTagRemove新增[刪除tag]

                            //addTagRemove
                            await addTagRemove(v[keyId])

                            srlog.info({ event: eventNameProcRemoveCallfunRemove, [keyId]: v[keyId], msg: 'add-tag' })
                        }

                    }

                }
                catch (err) {
                    console.log(err)
                    srlog.error({ event: eventNameProcRemoveCallfunRemove, [keyId]: v[keyId], msg: getErrorMessage(err) })
                    msgExit = 'error at proc-remove-callfun-remove'
                }

            })

            //check
            if (isestr(msgExit)) {
                console.log(`error occurred, task canceled`) //程序發生錯誤, 不進行後續動作
                srlog.info({ event: 'cancel-stage-main', msg: msgExit })
                return
            }

            //trigger add
            await pmSeries(r.add, async(v) => {

                //check
                if (isestr(msgExit)) {
                    return
                }

                try {
                    srlog.info({ event: eventNameProcAddCallfunAdd, [keyId]: v[keyId], msg: 'start...' })

                    //funAdd
                    if (isfun(funAdd)) {
                        let q = funAdd(v)
                        if (ispm(q)) {
                            q = await q
                        }
                    }

                    // srlog.info({ event: eventNameProcAddCallfunAdd, [keyId]: v[keyId], msg: 'done' })
                }
                catch (err) {
                    console.log(err)
                    srlog.error({ event: eventNameProcAddCallfunAdd, [keyId]: v[keyId], msg: getErrorMessage(err) })
                    msgExit = 'error at proc-add-callfun-add'
                }

                //check
                if (isestr(msgExit)) {
                    return
                }

                try {

                    //hash
                    let hash = str2b64(JSON.stringify(v))

                    //otkActualSrc.set
                    otkActualSrc.set(v[keyId], hash)

                    //otkSrc.set, 因可能有非預期中斷與重啟問題, 故一律呼叫otkSrc.set, 確保跟實際otkActualSrc.set能有一致任務清單
                    otkSrc.set(v[keyId], hash)

                    //check
                    if (timeToleranceRemove > 0) {
                        //若有給予刪除誤差

                        //checkTagRemove
                        let r = await checkTagRemove(v[keyId], { checkTime: false })

                        //check, 是否位於刪除清單
                        if (r.b) {
                            //若位於刪除清單, 呼叫releaseTagRemove清除[刪除tag]

                            //releaseTagRemove
                            await releaseTagRemove(v[keyId])

                            srlog.info({ event: eventNameProcAddCallfunAdd, [keyId]: v[keyId], msg: 'release-tag' })
                        }
                        else {
                            //若非位於刪除清單, 不處理
                        }

                    }

                    srlog.info({ event: eventNameProcAddCallfunAdd, [keyId]: v[keyId], msg: 'done' })
                }
                catch (err) {
                    console.log(err)
                    srlog.error({ event: eventNameProcAddCallfunAdd, [keyId]: v[keyId], msg: getErrorMessage(err) })
                    msgExit = 'error at proc-add-callfun-add'
                }

            })

            //check
            if (isestr(msgExit)) {
                console.log(`error occurred, task canceled`) //程序發生錯誤, 不進行後續動作
                srlog.info({ event: 'cancel-stage-main', msg: msgExit })
                return
            }

            //trigger diff
            await pmSeries(r.diff, async(v) => {

                //check
                if (isestr(msgExit)) {
                    return
                }

                try {
                    srlog.info({ event: eventNameProcDiffCallfunModify, [keyId]: v[keyId], msg: 'start...' })

                    //funModify
                    if (isfun(funModify)) {
                        let q = funModify(v)
                        if (ispm(q)) {
                            q = await q
                        }
                    }

                    // srlog.info({ event: eventNameProcDiffCallfunModify, [keyId]: v[keyId], msg: 'done' })
                }
                catch (err) {
                    console.log(err)
                    srlog.error({ event: eventNameProcDiffCallfunModify, [keyId]: v[keyId], msg: getErrorMessage(err) })
                    msgExit = 'error at proc-diff-callfun-modify'
                }

                //check
                if (isestr(msgExit)) {
                    return
                }

                try {

                    //hash
                    let hash = str2b64(JSON.stringify(v))

                    //otkActualSrc.set
                    otkActualSrc.set(v[keyId], hash)

                    //otkSrc.set, 因可能有非預期中斷與重啟問題, 故一律呼叫otkSrc.set, 確保跟實際otkActualSrc.set能有一致任務清單
                    otkSrc.set(v[keyId], hash)

                    srlog.info({ event: eventNameProcDiffCallfunModify, [keyId]: v[keyId], msg: 'done' })
                }
                catch (err) {
                    console.log(err)
                    srlog.error({ event: eventNameProcDiffCallfunModify, [keyId]: v[keyId], msg: getErrorMessage(err) })
                    msgExit = 'error at proc-diff-callfun-modify'
                }

            })

            //check
            if (isestr(msgExit)) {
                console.log(`error occurred, task canceled`) //程序發生錯誤, 不進行後續動作
                srlog.info({ event: 'cancel-stage-main', msg: msgExit })
                return
            }

            return null
        }
        if (!isestr(msgExit)) { //若開始前階段有錯誤發生(有msgExit), 則不執行
            await stageMain()
                .catch(() => {})
        }

        //stageBeforeEnd
        let stageBeforeEnd = async() => {

            //funBeforeEnd
            if (isfun(funBeforeEnd)) {
                try {
                    srlog.info({ event: eventNameProcCallfunBeforeEnd, msg: 'start...' })

                    //funBeforeEnd
                    let q = funBeforeEnd()
                    if (ispm(q)) {
                        q = await q
                    }

                    srlog.info({ event: eventNameProcCallfunBeforeEnd, msg: 'done' })
                }
                catch (err) {
                    console.log(err)
                    srlog.error({ event: eventNameProcCallfunBeforeEnd, msg: getErrorMessage(err) })
                    msgExit = 'error at proc-callfun-beforeEnd'
                }
            }

            //check
            if (isestr(msgExit)) {
                console.log(`error occurred, task canceled`) //程序發生錯誤, 不進行後續動作
                srlog.info({ event: 'cancel-stage-beforeEnd', msg: msgExit })
                return
            }

            return null
        }
        if (true) { //不論有無錯誤, 一定會執行
            await stageBeforeEnd()
                .catch(() => {})
        }

        srlog.info({ event: 'end', ...calcTimeRun(), msg: 'done' })
    }

    let pmDetect = coreDetect()
        .catch((err) => {
            console.log(err)
            srlog.error({ event: eventNameProcCoreDetect, msg: getErrorMessage(err) })
        })

    let coreRetake = async() => {

        //wait
        await waitFun(() => {
            return otkSrc !== null
        })

        //timeNow
        let timeNow = ot()

        //vfpsRemove
        let vfpsRemove = fsTreeFolder(fdTagRemove, 1)

        // //vfpsModify
        // let vfpsModify = fsTreeFolder(fdTagModify, 1)

        //check
        // if (size(vfpsRemove) === 0 && size(vfpsModify) === 0) {
        //     return
        // }
        if (size(vfpsRemove) === 0) {
            return
        }

        await pmSeries(vfpsRemove, async(v) => {

            //checkTagRemove
            let r = await checkTagRemove(v.name, { timeNow, checkTime: true }) //v.name對應v[keyId]

            //b
            let b1 = r.b //刪除任務之時間已超過容許值門檻
            let b2 = !isestr(otkActualSrc.get(v.name)) //v.name對應v[keyId], 執行偵測完整任務內已不存在v[keyId]
            let b3 = isestr(otkSrc.get(v.name)) //v.name對應v[keyId], 執行偵測觸發任務內仍存在v[keyId]
            let b = b1 && b2 && b3 //因可能有非預期中斷與重啟問題, 故須同時檢測
            // console.log('b1', b1, 'b2', b2, 'b3', b3, 'b', b)

            //check
            if (b) {
                //已滿足刪除任務所有條件, 呼叫releaseTagRemove清除[刪除tag], 與呼叫otkSrc.remove

                await releaseTagRemove(v.name) //v.name對應v[keyId]

                srlog.info({ event: eventNameProcRetakeRemove, [keyId]: v.name, from: 'debounce', msg: 'release-tag' })

                otkSrc.remove(v.name) //v.name對應v[keyId]

                srlog.info({ event: eventNameProcRetakeRemove, [keyId]: v.name, from: 'debounce', msg: 'done' })

            }
            else {
                //時間差未超過容許值門檻, 不處理
            }

        })

        // await pmSeries(vfpsModify, async(v) => {

        //     //checkTagModify
        //     let r = await checkTagModify(v.name, { timeNow, checkTime: true }) //v.name對應v[keyId]

        //     //check, 變更任務之時間是否已超過容許值門檻
        //     if (r.b) {

        //         //時間差已超過容許值門檻, 則須呼叫releaseTagModify偵測與清除變更tag, 再呼叫otkSrc.set
        //         let rModify = await releaseTagModify(v.name)
        //         otkSrc.set(v.name, rModify.hash) //v.name對應v[keyId]

        //         srlog.info({ event: 'proc-diff-modify', [keyId]: v.name, from: 'debounce', msg: 'done' })

        //     }
        //     else {
        //         //時間差未超過容許值門檻, 不處理
        //     }

        // })

    }

    let pmRetake = coreRetake()
        .catch((err) => {
            console.log(err)
            srlog.error({ event: eventNameProcCoreRetake, msg: getErrorMessage(err) })
        })

    //save
    ev.srlog = srlog

    //emit end
    Promise.all([pmDetect, pmRetake])
        .finally(() => {
            ev.emit('end')
        })

    return ev
}


export default WDataScheduler