forked from elasticsearch-dump/elasticsearch-dump
-
Notifications
You must be signed in to change notification settings - Fork 0
/
elasticdump.js
85 lines (71 loc) · 2.7 KB
/
elasticdump.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
const http = require('http')
const https = require('https')
const TransportProcessor = require('./lib/processor')
const { promisify } = require('util')
const ioHelper = require('./lib/ioHelper')
const _ = require('lodash')
class ElasticDump extends TransportProcessor {
constructor (input, output, options) {
super()
if (arguments.length === 1 && _.isPlainObject(arguments[0])) {
options = input
output = options.output
input = options.input
}
this.input = input
this.output = output
this.options = options
this.modifiers = []
this.sModififiers = []
if (output !== '$' && (this.options.toLog === null || this.options.toLog === undefined)) {
this.options.toLog = true
}
this.validationErrors = this.validateOptions()
if (this.options.searchBodyTemplate) {
this.sModififiers = this.generateModifiers(this.options.searchBodyTemplate)
const state = {}
this.applyModifiers([state], this.sModififiers)
this.options.searchBody = state.searchBody
}
if (options.maxSockets) {
this.log(`globally setting maxSockets=${options.maxSockets}`)
http.globalAgent.maxSockets = options.maxSockets
https.globalAgent.maxSockets = options.maxSockets
}
ioHelper(this, 'input')
ioHelper(this, 'output')
if (this.options.type === 'data' && this.options.transform) {
this.modifiers = this.generateModifiers(this.options.transform)
}
}
dump (callback, continuing, limit, offset, totalWrites) {
if (this.validationErrors.length > 0) {
this.emit('error', { errors: this.validationErrors })
callback(new Error('There was an error starting this dump'))
return
}
// promisify helpers
this.get = promisify(this.output.get).bind(this.input)
this.set = promisify(this.output.set).bind(this.output)
if (!limit) { limit = this.options.limit }
if (!offset) { offset = this.options.offset }
if (!totalWrites) { totalWrites = 0 }
if (continuing !== true) {
this.log('starting dump')
if (this.options.offset) {
this.log(`Warning: offsetting ${this.options.offset} rows.`)
this.log(' * Using an offset doesn\'t guarantee that the offset rows have already been written, please refer to the HELP text.')
}
if (this.modifiers.length) {
this.log(`Will modify documents using these scripts: ${this.options.transform}`)
}
}
this._loop(limit, offset, totalWrites)
.then((totalWrites) => {
if (typeof callback === 'function') { return callback(null, totalWrites) }
}, (error) => {
if (typeof callback === 'function') { return callback(error/*, totalWrites */) }
})
}
}
module.exports = ElasticDump