// WDataSourceFromCsv.mjs

import path from 'path'
import fs from 'fs'
import get from 'lodash-es/get.js'
import each from 'lodash-es/each.js'
import map from 'lodash-es/map.js'
import keys from 'lodash-es/keys.js'
import size from 'lodash-es/size.js'
import join from 'lodash-es/join.js'
import evem from 'wsemi/src/evem.mjs'
import isestr from 'wsemi/src/isestr.mjs'
import isfun from 'wsemi/src/isfun.mjs'
import ispm from 'wsemi/src/ispm.mjs'
import fsIsFile from 'wsemi/src/fsIsFile.mjs'
import pmQueue from 'wsemi/src/pmQueue.mjs'
import ltdtDiffByKey from 'wsemi/src/ltdtDiffByKey.mjs'
import fsWatchFolder from 'wsemi/src/fsWatchFolder.mjs'
import wdc from 'w-data-csv/src/WDataCsv.mjs'


/**
 * CSV-folder data source.
 *
 * Watches a folder through `fsWatchFolder`, keeps the parsed rows of every CSV
 * file in memory, and emits a `change` event whenever rows are inserted,
 * modified or deleted.
 *
 * @param {string} fd - Path of the folder to watch (handed to `fsWatchFolder`).
 * @param {Object} [opt={}] - Options.
 * @param {string[]} [opt.heads=[]] - Column names. When non-empty they are joined
 *     with `,` and prepended to each file's content as a header line before
 *     parsing, and rows whose column count differs are rebuilt with exactly
 *     these columns (missing values become `''`).
 * @param {string} [opt.key='time'] - Field name passed to `ltdtDiffByKey` when a
 *     changed file is compared with its previous content.
 * @param {Function} [opt.converter] - `(ltdt) => ltdtFmt`, sync or async. Converts
 *     parsed rows into the formatted rows exposed to consumers. Defaults to identity.
 * @returns {Object} The emitter created by `evem()`, extended with:
 *     - `select()`: async, resolves to the formatted rows of all tracked files concatenated;
 *     - `clear()`: calls `clear()` on the folder watcher.
 *     It emits `change` with `{ type: 'insert' | 'save' | 'del', ltdt, ltdtFmt }`.
 */
function WDataSourceFromCsv(fd, opt = {}) {

    //heads
    let heads = get(opt, 'heads', [])

    //hasHeads
    let hasHeads = size(heads) > 0

    //chead, header line prepended to every file when heads are supplied
    let chead = hasHeads ? join(heads, ',') : ''

    //key
    let key = get(opt, 'key')
    if (!isestr(key)) {
        key = 'time'
    }

    //converter, defaults to identity
    let converter = get(opt, 'converter')
    if (!isfun(converter)) {
        converter = (ltdt) => {
            return ltdt
        }
    }

    //getLtdtFmt, await accepts both plain values and promises returned by converter
    let getLtdtFmt = async (ltdt) => {
        return await converter(ltdt)
    }

    //ev
    let ev = evem()

    //memory, file base name -> { ltdt, ltdtFmt }
    //NOTE(review): keyed by base name only, so equally named files in different
    //sub-folders would collide if the watcher reports nested paths - confirm
    let memory = new Map()

    //readCsv, read and parse one file into an array of rows
    let readCsv = async (fp) => {

        //c, non-blocking read so a large file does not stall the event loop
        let c = await fs.promises.readFile(fp, 'utf8')
        if (isestr(chead)) {
            c = `${chead}\n${c}`
        }

        //ltdt, assumes parseCsv yields row objects keyed by the header line - TODO confirm
        let ltdt = await wdc.parseCsv(c)

        //normalise rows to the configured columns; skipped when no heads are
        //configured, otherwise every row would be reduced to an empty object
        if (hasHeads) {
            let ks = keys(get(ltdt, 0))
            if (size(ks) !== size(heads)) {
                ltdt = map(ltdt, (dt) => {
                    let _dt = {}
                    each(heads, (k) => {
                        _dt[k] = get(dt, k, '')
                    })
                    return _dt
                })
            }
        }

        return ltdt
    }

    //readInsert, handle a newly added file
    let readInsert = async (fp) => {

        //check
        if (!fsIsFile(fp)) {
            return
        }

        //fn
        let fn = path.basename(fp)

        //check
        if (memory.has(fn)) {
            throw new Error(`fn[${fn}] existed`)
        }

        //ltdtNew
        let ltdtNew = await readCsv(fp)

        //ltdtFmt
        let ltdtFmt = await getLtdtFmt(ltdtNew)

        //set
        memory.set(fn, {
            ltdt: ltdtNew,
            ltdtFmt,
        })

        //emit
        ev.emit('change', { type: 'insert', ltdt: ltdtNew, ltdtFmt })

    }

    //readSave, handle a modified file
    let readSave = async (fp) => {

        //check
        if (!fsIsFile(fp)) {
            return
        }

        //fn
        let fn = path.basename(fp)

        //check
        if (!memory.has(fn)) {
            throw new Error(`fn[${fn}] does not exist`)
        }

        //ltdtNew
        let ltdtNew = await readCsv(fp)

        //ltdtFmt
        let ltdtFmt = await getLtdtFmt(ltdtNew)

        //ltdtOld, captured before the entry is overwritten
        let { ltdt: ltdtOld } = memory.get(fn)

        //set, must happen before any emit so listeners calling select see new data
        memory.set(fn, {
            ltdt: ltdtNew,
            ltdtFmt,
        })

        //check, a previously empty file is reported as a plain insert
        if (size(ltdtOld) === 0) {
            ev.emit('change', { type: 'insert', ltdt: ltdtNew, ltdtFmt })
            return
        }

        //ltdtDiffByKey
        let r = ltdtDiffByKey(ltdtOld, ltdtNew, key)

        //emit, each category is reported independently, in the order add, diff, del
        let categories = [
            ['add', 'insert'],
            ['diff', 'save'],
            ['del', 'del'],
        ]
        for (let [field, type] of categories) {
            let rows = r[field]
            if (size(rows) > 0) {
                let rowsFmt = await getLtdtFmt(rows)
                ev.emit('change', { type, ltdt: rows, ltdtFmt: rowsFmt })
            }
        }

    }

    //remove, handle a deleted file
    let remove = async (fp) => {

        //fn
        let fn = path.basename(fp)

        //check, an untracked file gets the same explicit error as readSave
        //instead of an opaque destructuring TypeError
        if (!memory.has(fn)) {
            throw new Error(`fn[${fn}] does not exist`)
        }

        //ltdtOld
        let { ltdt: ltdtOld, ltdtFmt } = memory.get(fn)

        //delete
        memory.delete(fn)

        //emit
        ev.emit('change', { type: 'del', ltdt: ltdtOld, ltdtFmt })

    }

    //pmq, process one file event at a time
    let pmq = pmQueue(1)

    //tasks, watcher event type -> handler and the label used in its error log
    let tasks = new Map([
        ['add', { fn: readInsert, name: 'readInsert' }],
        ['change', { fn: readSave, name: 'readSave' }],
        ['unlink', { fn: remove, name: 'remove' }],
    ])

    //watcher
    let watcher = fsWatchFolder(fd)
    watcher.on('change', (msg) => {

        //task, other event types are ignored
        let task = tasks.get(get(msg, 'type'))
        if (!task) {
            return
        }

        //queue, failures are logged and do not stop the watcher
        pmq(task.fn, msg.fp)
            .catch((err) => {
                console.log(`${task.name} catch`, err)
            })

    })

    //select, formatted rows of all tracked files
    let select = async () => {
        let allData = []
        for (let r of memory.values()) {
            //push one by one, spreading a large array into push can exceed the argument limit
            for (let dt of r.ltdtFmt) {
                allData.push(dt)
            }
        }
        return allData
    }

    //clear
    let clear = () => {
        watcher.clear()
    }

    //save
    ev.select = select
    ev.clear = clear

    return ev
}


// Default export: the WDataSourceFromCsv factory function.
export default WDataSourceFromCsv