import path from 'path'
import fs from 'fs'
import get from 'lodash-es/get.js'
import each from 'lodash-es/each.js'
import map from 'lodash-es/map.js'
import keys from 'lodash-es/keys.js'
import size from 'lodash-es/size.js'
import join from 'lodash-es/join.js'
import evem from 'wsemi/src/evem.mjs'
import isestr from 'wsemi/src/isestr.mjs'
import isfun from 'wsemi/src/isfun.mjs'
import ispm from 'wsemi/src/ispm.mjs'
import fsIsFile from 'wsemi/src/fsIsFile.mjs'
import pmQueue from 'wsemi/src/pmQueue.mjs'
import ltdtDiffByKey from 'wsemi/src/ltdtDiffByKey.mjs'
import fsWatchFolder from 'wsemi/src/fsWatchFolder.mjs'
import wdc from 'w-data-csv/src/WDataCsv.mjs'
/**
 * CSV-file data source.
 *
 * Watches a folder through fsWatchFolder, keeps the parsed rows of every CSV
 * file in memory (indexed by file base name) and emits a 'change' event on the
 * returned emitter whenever rows are inserted, modified or deleted.
 *
 * Each 'change' event carries `{ type, ltdt, ltdtFmt }`, where `type` is one of
 * 'insert', 'save' or 'del', `ltdt` holds the affected parsed rows and
 * `ltdtFmt` is the result of running `converter` on those rows.
 *
 * @param {string} fd - Path of the folder to watch.
 * @param {Object} [opt={}] - Options.
 * @param {string[]} [opt.heads=[]] - Column names. When non-empty they are
 *   joined with ',' and prepended to each file's content as a header line
 *   before parsing; if the first parsed row then has a different number of
 *   keys, every row is rebuilt with exactly these columns ('' for missing).
 * @param {string} [opt.key='time'] - Field name handed to ltdtDiffByKey to
 *   match rows of the previous and current file content.
 * @param {Function} [opt.converter] - Sync or async function that receives an
 *   array of parsed rows and returns the formatted array; defaults to identity.
 * @returns {EventEmitter} Emitter created by evem(), extended with
 *   `select()` (async, returns the formatted rows of all tracked files) and
 *   `clear()` (calls the watcher's `clear()`).
 */
function WDataSourceFromCsv(fd, opt = {}) {

    // Column names; when empty no header line is prepended and rows are kept as parsed.
    let heads = get(opt, 'heads', [])

    // Header line built from heads ('' when no heads were given).
    let chead = size(heads) > 0 ? join(heads, ',') : ''

    // Field used to diff old and new rows.
    let key = get(opt, 'key')
    if (!isestr(key)) {
        key = 'time'
    }

    // Row formatter; falls back to identity.
    let converter = get(opt, 'converter')
    if (!isfun(converter)) {
        converter = (ltdt) => ltdt
    }

    // Runs the converter and awaits its result when it returns a promise.
    let getLtdtFmt = async (ltdt) => {
        let ltdtFmt = converter(ltdt)
        if (ispm(ltdtFmt)) {
            ltdtFmt = await ltdtFmt
        }
        return ltdtFmt
    }

    // Emitter returned to the caller.
    let ev = evem()

    // Per-file state: base name -> { ltdt, ltdtFmt }.
    let memory = new Map()

    // Reads and parses one CSV file into an array of row objects.
    let readCsv = async (fp) => {

        // Non-blocking read so a large file does not stall the event loop.
        let c = await fs.promises.readFile(fp, 'utf8')
        if (isestr(chead)) {
            c = chead + '\n' + c
        }

        let ltdt = await wdc.parseCsv(c)

        // Normalize rows to the configured columns. This must only happen when
        // heads were actually supplied: with an empty heads list the rebuild
        // would replace every row by an empty object.
        if (size(heads) > 0 && size(keys(get(ltdt, 0))) !== size(heads)) {
            ltdt = map(ltdt, (dt) => {
                let _dt = {}
                each(heads, (k) => {
                    _dt[k] = get(dt, k, '')
                })
                return _dt
            })
        }

        return ltdt
    }

    // Handles a newly added file: load it, remember it, emit all rows as 'insert'.
    let readInsert = async (fp) => {

        // Ignore anything that is not a regular file.
        if (!fsIsFile(fp)) {
            return
        }

        let fn = path.basename(fp)

        // A file may only be inserted once.
        if (memory.has(fn)) {
            throw new Error(`fn[${fn}] existed`)
        }

        let ltdtNew = await readCsv(fp)
        let ltdtFmt = await getLtdtFmt(ltdtNew)

        memory.set(fn, {
            ltdt: ltdtNew,
            ltdtFmt,
        })

        ev.emit('change', { type: 'insert', ltdt: ltdtNew, ltdtFmt })
    }

    // Handles a modified file: reload it and emit the row-level differences.
    let readSave = async (fp) => {

        // Ignore anything that is not a regular file.
        if (!fsIsFile(fp)) {
            return
        }

        let fn = path.basename(fp)

        // Only files that were inserted before can be updated.
        if (!memory.has(fn)) {
            throw new Error(`fn[${fn}] does not exist`)
        }

        let ltdtNew = await readCsv(fp)
        let ltdtFmt = await getLtdtFmt(ltdtNew)

        let { ltdt: ltdtOld } = memory.get(fn)

        // The new snapshot must be stored before anything is emitted.
        memory.set(fn, {
            ltdt: ltdtNew,
            ltdtFmt,
        })

        // Nothing was stored before: the whole content counts as inserted.
        if (size(ltdtOld) === 0) {
            ev.emit('change', { type: 'insert', ltdt: ltdtNew, ltdtFmt })
            return
        }

        let r = ltdtDiffByKey(ltdtOld, ltdtNew, key)

        // One update can contain all three kinds of change, so each kind is
        // checked and emitted independently (deliberately not else-if).
        if (size(r.add) > 0) {
            let ltdtAddFmt = await getLtdtFmt(r.add)
            ev.emit('change', { type: 'insert', ltdt: r.add, ltdtFmt: ltdtAddFmt })
        }
        if (size(r.diff) > 0) {
            let ltdtDiffFmt = await getLtdtFmt(r.diff)
            ev.emit('change', { type: 'save', ltdt: r.diff, ltdtFmt: ltdtDiffFmt })
        }
        if (size(r.del) > 0) {
            let ltdtDelFmt = await getLtdtFmt(r.del)
            ev.emit('change', { type: 'del', ltdt: r.del, ltdtFmt: ltdtDelFmt })
        }
    }

    // Handles a removed file: forget it and emit all of its rows as 'del'.
    let remove = async (fp) => {

        let fn = path.basename(fp)

        // Fail with a clear message instead of a destructuring TypeError when
        // the file was never tracked (same error style as readSave).
        if (!memory.has(fn)) {
            throw new Error(`fn[${fn}] does not exist`)
        }

        let { ltdt: ltdtOld, ltdtFmt } = memory.get(fn)

        memory.delete(fn)

        ev.emit('change', { type: 'del', ltdt: ltdtOld, ltdtFmt })
    }

    // Queue created with a limit of 1 so file events are handled one at a time.
    let pmq = pmQueue(1)

    // Watcher event type -> handler and the label used in its error log.
    let tasks = new Map([
        ['add', { name: 'readInsert', fun: readInsert }],
        ['change', { name: 'readSave', fun: readSave }],
        ['unlink', { name: 'remove', fun: remove }],
    ])

    let watcher = fsWatchFolder(fd)
    watcher.on('change', (msg) => {
        let task = tasks.get(get(msg, 'type'))
        if (!task) {
            return // other event types are ignored
        }
        pmq(task.fun, msg.fp)
            .catch((err) => {
                console.log(`${task.name} catch`, err)
            })
    })

    // Returns the formatted rows of all tracked files as one flat array.
    let select = async () => {
        let allData = []
        for (let r of memory.values()) {
            // Append row by row: push(...rows) passes every row as a call
            // argument and throws a RangeError for very large files.
            for (let row of r.ltdtFmt) {
                allData.push(row)
            }
        }
        return allData
    }

    // Delegates to the watcher's clear(); presumably stops watching (confirm in fsWatchFolder).
    let clear = () => {
        watcher.clear()
    }

    // Expose the public methods on the emitter.
    ev.select = select
    ev.clear = clear

    return ev
}
export default WDataSourceFromCsv