forked from elastic/elasticsearch-hadoop
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cluster.gradle
410 lines (365 loc) · 17.3 KB
/
cluster.gradle
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
import org.apache.tools.ant.DefaultLogger
import org.apache.tools.ant.taskdefs.condition.Os
import java.nio.file.Paths
/*
* This is mostly adapted from the main Elasticsearch project, but slimmed down to
* avoid all the extra baggage of dealing with mixed version clusters or multiple
* nodes.
*/
configurations {
esfixture
}
def version = project.hasProperty("es-version") ? project.getProperty("es-version") : esVersion
dependencies {
esfixture group: 'org.elasticsearch.distribution.zip', name: 'elasticsearch', version: version, ext: "zip"
}
// Optionally allow user to disable the fixture
def useFixture = Boolean.parseBoolean(project.hasProperty("run-node") ? project.getProperty("run-node") : "true")
if (useFixture) {
// Depends on project already containing an "integrationTest"
// task, as well as javaHome configured
createClusterFor(project.tasks.getByName("integrationTest"), version)
} else {
project.tasks.getByName("integrationTest") {
systemProperty "test.disable.local.es", "true"
}
}
def createClusterFor(Task integrationTest, String version) {
// Version settings
def majorVersion = version.tokenize(".").get(0).toInteger()
// General settings
def clusterName = "elasticsearch-fixture"
def nodeName = "node-1"
def numNodes = 1
def useMinimumMasterNodes = true
// Executable information
def executable = Os.isFamily(Os.FAMILY_WINDOWS) ? "cmd" : "sh"
def buffer = new ByteArrayOutputStream()
// Elasticsearch settings
def esEnvVariables = [ 'JAVA_HOME' : project.javaHome ]
def esArgs = []
// Directories & Files (indented to illustrate layout)
def baseDir = new File(rootProject.buildDir, "$clusterName-$version")
def pidFile = new File(baseDir, "es.pid")
def homeDir = new File(baseDir, "$nodeName")
def confDir = new File(homeDir, "config")
def scriptsDir = new File(confDir, 'scripts')
def configFile = new File(confDir, 'elasticsearch.yml')
def logsDir = new File(homeDir, 'logs')
def httpPortsFile = new File(logsDir, 'http.ports')
def transportPortsFile = new File(logsDir, 'transport.ports')
def cwd = new File(baseDir, "cwd")
def startLog = new File(cwd, 'run.log')
def failedMarker = new File(cwd, 'run.failed')
// Clean out the working directory as well as any installed nodes
Task cleanESDir = project.tasks.create(name: 'cleanESDirectory', type: Delete) {
delete homeDir
delete cwd
doLast {
cwd.mkdirs()
}
}
// Check to make sure that any previous nodes aren't still around
// Makes sure that if the pidFile exists it's actually tied to an ES Node
Task checkPrevious = project.tasks.create(name: "checkPrevious", type: Exec, dependsOn: cleanESDir) {
onlyIf { pidFile.exists() }
// the pid file won't actually be read until execution time, since the read is wrapped within an inner closure of the GString
ext.pid = "${ -> pidFile.getText('UTF-8').trim()}"
File jps
def jpsCommand = Os.isFamily(Os.FAMILY_WINDOWS) ? "jps.exe" : "jps"
jps = Paths.get(project.javaHome.toString(), "bin/" + jpsCommand).toFile()
if (!jps.exists()) {
throw new GradleException("jps executable not found; ensure that you're running Gradle with the JDK rather than the JRE")
}
commandLine jps, '-l'
standardOutput = new ByteArrayOutputStream()
doLast {
String out = standardOutput.toString()
if (out.contains("${pid} org.elasticsearch.bootstrap.Elasticsearch") == false) {
logger.error('jps -l')
logger.error(out)
logger.error("pid file: ${pidFile}")
logger.error("pid: ${pid}")
throw new GradleException("jps -l did not report any process with org.elasticsearch.bootstrap.Elasticsearch\n" +
"Did you run gradle clean? Maybe an old pid file is still lying around.")
} else {
logger.info(out)
}
}
}
// Stop any previously running nodes
Task stopPrevious = project.tasks.create(name: "stopPrevious", type: Exec, dependsOn: checkPrevious) {
onlyIf { pidFile.exists() }
ext.pid = "${ -> pidFile.getText('UTF-8').trim()}"
doFirst {
logger.info("Shutting down external node with pid ${pid}")
}
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
setExecutable('Taskkill')
args '/PID', pid, '/F'
} else {
setExecutable('kill')
args '-9', pid
}
doLast {
project.delete(pidFile)
}
}
// Extract the contents of the Elasticsearch zip file to the node's directory
Task extractES = project.tasks.create(name: "extractES", type: Copy, dependsOn: stopPrevious) {
dependsOn project.configurations.esfixture
from {
project.zipTree(project.configurations.esfixture.singleFile)
}
into baseDir
doLast {
new File(baseDir, "elasticsearch-$version").renameTo(homeDir)
}
}
// Write the contents of the node's configuration file
Map esConfig = [
'cluster.name' : clusterName,
'node.name' : "node-1",
'pidfile' : pidFile.toString(),
'path.home' : homeDir.toString(),
"path.data" : new File(homeDir, "data").toString(),
"http.port" : "9500-9599",
"transport.tcp.port" : "9600-9699",
"node.ingest" : "true"
]
// Version specific configurations
if (majorVersion <= 2) {
esConfig.put("transport.type","local")
esConfig.put("http.type","netty3")
esConfig.put("script.inline", "true")
esConfig.put("script.indexed", "true")
} else if (majorVersion == 5) {
esConfig.put("transport.type","netty4")
esConfig.put("http.type","netty4")
esConfig.put("script.inline", "true")
} else if (majorVersion >= 6) {
esConfig.put("transport.type","netty4")
esConfig.put("http.type","netty4")
}
// we set min master nodes to the total number of nodes in the cluster and
// basically skip initial state recovery to allow the cluster to form using a realistic master election
if (useMinimumMasterNodes && numNodes > 1) {
esConfig['discovery.zen.minimum_master_nodes'] = "$numNodes"
esConfig['discovery.initial_state_timeout'] = '0s' // don't wait for state.. just start up quickly
}
esConfig['node.max_local_storage_nodes'] = "$numNodes"
// Default the watermarks to absurdly low to prevent the tests from failing on nodes without enough disk space
esConfig['cluster.routing.allocation.disk.watermark.low'] = '1b'
esConfig['cluster.routing.allocation.disk.watermark.high'] = '1b'
Task writeConfig = project.tasks.create(name: "writeConfig", type: DefaultTask, dependsOn: extractES)
writeConfig.doFirst {
logger.info("Configuring ${configFile}")
configFile.setText(esConfig.collect { key, value -> "${key}: ${value}" }.join('\n'), 'UTF-8')
// Also write a script to a file for use in tests
if (majorVersion <= 2) {
scriptsDir.mkdirs()
new File(scriptsDir, "increment.groovy").setText("ctx._source.counter+=1", 'UTF-8')
} else if (majorVersion == 5) {
scriptsDir.mkdirs()
new File(scriptsDir, "increment.painless").setText("ctx._source.counter = ctx._source.getOrDefault('counter', 0) + 1", 'UTF-8')
}
}
// Start the node up
// this closure is converted into ant nodes by groovy's AntBuilder
Closure antRunner = { AntBuilder ant ->
ant.exec(executable: executable, spawn: true, dir: cwd, taskname: 'elasticsearch') {
esEnvVariables.each { key, value -> env(key: key, value: value) }
esArgs.each { arg(value: it) }
}
}
// this closure is the actual code to run elasticsearch
Task start = project.tasks.create(name: "startES", type: DefaultTask, dependsOn: writeConfig)
start.doLast {
// Wrap elasticsearch start command inside another shell script which redirects
// output of the real script to keep the streams open
def wrapperScript
def esScript
// output script data to a file
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
executable = 'cmd'
esArgs.add('/C')
esArgs.add('"') // quote the entire command
wrapperScript = new File(cwd, "run.bat")
esScript = new File(homeDir, 'bin/elasticsearch.bat')
} else {
executable = 'sh'
wrapperScript = new File(cwd, "run")
esScript = new File(homeDir, 'bin/elasticsearch')
}
esArgs.add("${wrapperScript}")
if (majorVersion < 5) {
esArgs.addAll("--node.portsfile=true")
} else {
esArgs.addAll("-E", "node.portsfile=true")
}
String argsPasser = '"$@"'
String exitMarker = "; if [ \$? != 0 ]; then touch run.failed; fi"
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
argsPasser = '%*'
exitMarker = "\r\n if \"%errorlevel%\" neq \"0\" ( type nul >> run.failed )"
}
wrapperScript.setText("\"${esScript}\" ${argsPasser} > run.log 2>&1 ${exitMarker}", 'UTF-8')
// Debugging multi line string to be logged explaining the node's information
String esCommandString = "\nNode 1 configuration:\n"
esCommandString += "|-----------------------------------------\n"
esCommandString += "| cwd: ${cwd}\n"
esCommandString += "| command: ${executable} ${esArgs.join(' ')}\n"
esCommandString += '| environment:\n'
esEnvVariables.each { k, v -> esCommandString += "| ${k}: ${v}\n" }
esCommandString += "|\n| [${wrapperScript.name}]\n"
wrapperScript.eachLine('UTF-8', { line -> esCommandString += " ${line}\n"})
esCommandString += '|\n| [elasticsearch.yml]\n'
configFile.eachLine('UTF-8', { line -> esCommandString += "| ${line}\n" })
esCommandString += "|-----------------------------------------"
esCommandString.eachLine { line -> logger.info(line) }
if (logger.isInfoEnabled()) {
runAntCommand(project, antRunner, System.out, System.err)
} else {
// buffer the output, we may not need to print it
PrintStream captureStream = new PrintStream(buffer, true, "UTF-8")
runAntCommand(project, antRunner, captureStream, captureStream)
}
}
// Task to stop the node
Task stop = project.tasks.create(name: "stopES", type: Exec) {
onlyIf { pidFile.exists() }
// the pid file won't actually be read until execution time, since the read is wrapped within an inner closure of the GString
ext.pid = "${ -> pidFile.getText('UTF-8').trim()}"
doFirst {
logger.info("Shutting down external node with pid ${pid}")
}
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
setExecutable('Taskkill')
args '/PID', pid, '/F'
} else {
setExecutable('kill')
args '-9', pid
}
doLast {
project.delete(pidFile)
}
}
// Task to wait for the node to be available
Task wait = project.tasks.create(name: 'waitForCluster', dependsOn: start)
wait.doLast {
// Make sure to put "ant." in front of the conditions. Without it, in some weird
// cases it uses calls on the Project object instead of the AntBuilder object.
project.ant.waitfor(maxwait: '30', maxwaitunit: 'second', checkevery: '500', checkeveryunit: 'millisecond', timeoutproperty: "failed${name}") {
ant.or {
ant.resourceexists {
ant.file(file: failedMarker.toString())
}
ant.and {
ant.resourceexists {
ant.file(file: pidFile.toString())
}
ant.resourceexists {
ant.file(file: httpPortsFile.toString())
}
ant.resourceexists {
ant.file(file: transportPortsFile.toString())
}
}
}
}
boolean anyNodeFailed = failedMarker.exists()
if (ant.properties.containsKey("failed${name}".toString()) || anyNodeFailed) {
def failureInfo = new FailureInfo(failedMarker, pidFile, httpPortsFile, transportPortsFile, startLog, buffer)
waitFailed(project, failureInfo, logger, 'Failed to start elasticsearch')
}
Closure waitChecker = { AntBuilder ant ->
File tmpFile = new File(cwd, 'wait.success')
String waitUrl = "http://${httpPortsFile.readLines("UTF-8").get(0)}/_cluster/health?wait_for_status=yellow"
ant.echo(message: "==> [${new Date()}] checking health: ${waitUrl}",
level: 'info')
// checking here for wait_for_nodes to be >= the number of nodes because its possible
// this cluster is attempting to connect to nodes created by another task (same cluster name),
// so there will be more nodes in that case in the cluster state
ant.get(src: waitUrl,
dest: tmpFile.toString(),
ignoreerrors: true, // do not fail on error, so logging buffers can be flushed by the wait task
retries: 10)
return tmpFile.exists()
}
boolean success
if (logger.isInfoEnabled()) {
success = runAntCommand(project, waitChecker, System.out, System.err)
} else {
PrintStream captureStream = new PrintStream(buffer, true, "UTF-8")
success = runAntCommand(project, waitChecker, captureStream, captureStream)
}
if (success == false) {
def failureInfo = new FailureInfo(failedMarker, pidFile, httpPortsFile, transportPortsFile, startLog, buffer)
waitFailed(project, failureInfo, logger, 'Elasticsearch cluster failed to pass wait condition')
}
}
// Configure the integration test with the ports file, and
// surround it with the es fixture.
integrationTest.configure {
// Configure the integration tests with a system property describing where the ports file is
systemProperty 'es.test.ports.file.location', httpPortsFile.toString()
}
integrationTest.dependsOn(wait)
integrationTest.finalizedBy(stop)
}
static Object runAntCommand(Project project, Closure command, PrintStream outputStream, PrintStream errorStream) {
DefaultLogger listener = new DefaultLogger(
errorPrintStream: errorStream,
outputPrintStream: outputStream,
messageOutputLevel: org.apache.tools.ant.Project.MSG_INFO)
project.ant.project.addBuildListener(listener)
Object retVal = command(project.ant)
project.ant.project.removeBuildListener(listener)
return retVal
}
class FailureInfo {
File failedMarker
File pidFile
File httpPortsFile
File transportPortsFile
File startLog
ByteArrayOutputStream buffer
FailureInfo(File failedMarker, File pidFile, File httpPortsFile, File transportPortsFile,
File startLog, ByteArrayOutputStream buffer) {
this.failedMarker = failedMarker
this.pidFile = pidFile
this.httpPortsFile = httpPortsFile
this.transportPortsFile = transportPortsFile
this.startLog = startLog
this.buffer = buffer
}
}
static void waitFailed(Project project, FailureInfo node, Logger logger, String msg) {
logger.error("Node 1 output:")
logger.error("|-----------------------------------------")
logger.error("| failure marker exists: ${node.failedMarker.exists()}")
logger.error("| pid file exists: ${node.pidFile.exists()}")
logger.error("| http ports file exists: ${node.httpPortsFile.exists()}")
logger.error("| transport ports file exists: ${node.transportPortsFile.exists()}")
// the waitfor failed, so dump any output we got (if info logging this goes directly to stdout)
logger.error("|\n| [ant output]")
node.buffer.toString('UTF-8').eachLine { line -> logger.error("| ${line}") }
// also dump the log file for the startup script (which will include ES logging output to stdout)
if (node.startLog.exists()) {
logger.error("|\n| [log]")
node.startLog.eachLine { line -> logger.error("| ${line}") }
}
if (node.pidFile.exists() && node.failedMarker.exists() == false &&
(node.httpPortsFile.exists() == false || node.transportPortsFile.exists() == false)) {
logger.error("|\n| [jstack]")
String pid = node.pidFile.getText('UTF-8')
ByteArrayOutputStream output = new ByteArrayOutputStream()
project.exec {
commandLine = ["${project.javaHome}/bin/jstack", pid]
standardOutput = output
}
output.toString('UTF-8').eachLine { line -> logger.error("| ${line}") }
}
logger.error("|-----------------------------------------")
throw new GradleException(msg)
}