Skip to content

Commit

Permalink
Merge pull request #136 from e-ucm/newTopologies
Browse files Browse the repository at this point in the history
Updated topology start to use specific flux.yaml
  • Loading branch information
Victorma authored Jun 28, 2018
2 parents 4e424cf + 6b2155e commit 80498a9
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 111 deletions.
2 changes: 1 addition & 1 deletion app.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var connectToDB = function () {

if (!isTest) {
kafkaService.createTopic(app.config.kafka.topicName);
stormService.startTopology(app.config.storm.defaultAnalysisName, app.config.kafka.topicName);
stormService.startTopology(app.config.storm.defaultAnalysisName, app.config.storm.defaultAnalysisFolder, app.config.kafka.topicName);
}
}
});
Expand Down
3 changes: 1 addition & 2 deletions config-example.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ exports.lrs = {
};
exports.storm = {
defaultAnalysisName: '{{defaultAnalysisName}}',
realtimeJar: '{{realtimeJar}}',
defaultAnalysisFolder: '{{defaultAnalysisFolder}}',
path: '{{stormPath}}',
nimbusHost: '{{nimbusHost}}',
fluxYaml: '{{fluxYaml}}',
analysisFolder: '{{analysisFolder}}',
elasticsearchHost: '{{elasticsearchHost}}',
elasticsearchPort: '{{elasticsearchPort}}'
Expand Down
6 changes: 2 additions & 4 deletions config-values.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ exports.defaultValues = {
lrsUsername: 'openlrs', // Used for 'basic' authentication
lrsPassword: 'openlrs',
useLrs: process.env.USE_LRS || false,
realtimeJar: '/home/eucm/hlocal/rage2/rage-analytics-realtime/target/realtime-jar-with-dependencies.jar',
defaultAnalysisFolder: '/home/eucm/hlocal/rage2/rage-analytics-realtime/target/realtime-jar-with-dependencies.jar',
stormPath: '/home/eucm/hlocal/rage/gleaner/storm/apache-storm-1.1.1/bin',
nimbusHost: 'localhost',
nimbusPort: '6627',
Expand All @@ -133,7 +133,6 @@ exports.defaultValues = {
kafkaTopicName: 'defaultkafkaanalysistopic',
myHost: process.env.MY_HOST || 'localhost',
defaultAnalysisName: 'defaultstormanalysis',
fluxYaml: 'flux.yml',
analysisFolder: './analysis',
elasticsearchHost: 'localhost',
elasticsearchPort: 9200,
Expand Down Expand Up @@ -167,7 +166,7 @@ exports.testValues = {
lrsUsername: 'openlrs',
lrsPassword: 'openlrs',
useLrs: process.env.USE_LRS || false,
realtimeJar: '/home/eucm/hlocal/rage/gleaner/gleaner-realtime/target/realtime-jar-with-dependencies.jar',
defaultAnalysisFolder: '/home/eucm/hlocal/rage/gleaner/gleaner-realtime/target/realtime-jar-with-dependencies.jar',
stormPath: '/home/eucm/hlocal/rage/gleaner/storm/apache-storm-1.1.1/bin',
nimbusHost: 'localhost',
nimbusPort: '6627',
Expand All @@ -177,7 +176,6 @@ exports.testValues = {
kafkaTopicName: 'defaultkafkaanalysistopictest',
myHost: process.env.MY_HOST || 'localhost',
defaultAnalysisName: 'defaultstormanalysistest',
fluxYaml: 'flux.yml',
analysisFolder: './analysis',
elasticsearchHost: 'localhost',
elasticsearchPort: 9200,
Expand Down
15 changes: 0 additions & 15 deletions default-flux.yml

This file was deleted.

129 changes: 42 additions & 87 deletions lib/services/storm.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
var Q = require('q');
var shell = require('shelljs');
var Fs = require('fs');
var Path = require('path');
var Handlebars = require('handlebars');

var analysis = require('../analysis');

var storm = function (stormConfig, zookeeperUrl) {

var genYMLAndLaunch = function (src, ymlPath, stormJar, config, callback) {
var genYMLAndLaunch = function (src, ymlPath, stormJar, config) {
var deferred = Q.defer();

var configTemplate = Handlebars.compile(src);
Fs.writeFile(ymlPath, configTemplate(config), function (err) {
Expand All @@ -19,7 +17,7 @@ var storm = function (stormConfig, zookeeperUrl) {
message: 'Failed to write' + ymlPath + ' file, ' + err,
code: 0
};
return callback(retErr);
return deferred.reject(retErr);
}

// Execute Topology
Expand All @@ -30,109 +28,66 @@ var storm = function (stormConfig, zookeeperUrl) {
{async: true},
function (code, stdout, stderr) {
if (code !== 0) {
return callback({
return deferred.reject({
message: 'Failed to end topology, code: ' + code +
', stderr: ' + stderr + ', stdout: ' + stdout,
code: code
});
}
if (stderr) {
return callback({
return deferred.reject({
message: 'Failed to end topology but exit code was 0, stderr: ' +
stderr + ', stdout: ' + stdout
});
}
callback(null, stdout);
deferred.resolve(stdout);
});
});

return deferred.promise;
};

return {
startTopology: function (topologyName, kafkaTopicName, versionId) {
return analysis.findById(analysis.toObjectID(versionId))
.then(function (analysis) {

var deferred = Q.defer();

var config = {
topologyName: topologyName,
kafkaTopicName: kafkaTopicName,
zookeeperURL: zookeeperUrl,
elasticsearchUrl: stormConfig.elasticsearchHost
};

var configTemplatePath = Path.resolve(__dirname, '../../default-flux.yml');
var configPath = Path.resolve(__dirname, '../../' + stormConfig.fluxYaml);
var fsOptions = {
encoding: 'utf-8'
};

var genDefaultYAMLAndLaunch = function (src) {
// If the Uploaded flux.yml file could be read, try to start the topology
genYMLAndLaunch(src, configPath, stormConfig.realtimeJar, config, function (err, output) {
if (err) {
return deferred.reject(new Error(err.message));
}
deferred.resolve(output);
});
};
// Check if the files in the uploaded /analysis folder exist
// If they exist, use them (expected: realtime.jar and flux.yaml)
startTopology: function (topologyName, analysisFolder, kafkaTopicName) {
var config = {
topologyName: topologyName,
kafkaTopicName: kafkaTopicName,
zookeeperURL: zookeeperUrl,
elasticsearchUrl: stormConfig.elasticsearchHost
};
var fsOptions = {
encoding: 'utf-8'
};

var tryLoadYAML = function(ymlPath) {
var defyml = Q.defer();

Fs.readFile(ymlPath, fsOptions, function (err, src) {
if (err) {
// If the default flux.yml file couldn't be read, return an error;
return defyml.reject(new Error('Failed to read ' + ymlPath));
}

function startDefault() {
// If the uploaded flux.yml file could not be read, try to start the default topology
Fs.readFile(configTemplatePath, fsOptions, function (err, src) {
defyml.resolve(src);
});

if (err) {
// If the default flux.yml file couldn't be read, return an error;
return deferred.reject(new Error('Failed to read ' + configTemplatePath + ' template.'));
}
return defyml.promise;
};
// Check if the files in the uploaded /analysis folder exist
// If they exist, use them (expected: realtime.jar and flux.yaml)

genDefaultYAMLAndLaunch(src);
});
}
function startAnalysis(folder) {
var ymlpath = folder + 'flux.yml';

if (analysis && analysis.fluxPath) {
// Try to read the uploaded flux.yml file
Fs.readFile(analysis.fluxPath, fsOptions, function (err, src) {

if (err) {
console.log('Failed to read ' + analysis.fluxPath + ' template.' +
' Proceeding with the default file: ' + configTemplatePath);

startDefault();
} else {

// If the Uploaded flux.yml file could be read, try to start the topology
genYMLAndLaunch(src, configPath, analysis.realtimePath, config, function (err, output) {
if (err) {
// Something failed when launching the uploaded topology or generating the uploaded flux.yml
console.log(err.message);

// If the uploaded flux.yml file could not be read, try to start the default topology
Fs.readFile(configTemplatePath, fsOptions, function (err, src) {

if (err) {
// If the default flux.yml file couldn't be read, return an error;
return deferred.reject(new Error('Failed to read ' + configTemplatePath + ' template.'));
}
genDefaultYAMLAndLaunch(src);
});
return;
}
deferred.resolve(output);
});
}
});
} else {
console.log('There is no uploaded analysis.' +
' Proceeding with the default file: ' + configTemplatePath);
// If the uploaded flux.yml file could not be read, try to start the default topology
return tryLoadYAML(ymlpath)
.then(function(src) {
return genYMLAndLaunch(src, folder + 'fluxinstance.yml', folder + 'realtime.jar', config);
});
}

startDefault();
}

return deferred.promise;
});
return startAnalysis(analysisFolder);
},
endTopology: function (sessionId) {
var deferred = Q.defer();
Expand Down
6 changes: 4 additions & 2 deletions test/tests/configs.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ describe('Config files validations', function () {
should(config.lrs.username).be.a.String();
should(config.lrs.password).be.a.String();
should(config.storm).be.an.Object();
should(config.storm.realtimeJar).be.a.String();
should(config.storm.defaultAnalysisFolder).be.a.String();
should(config.storm.defaultAnalysisName).be.a.String();
should(config.storm.path).be.a.String();
should(config.storm.nimbusHost).be.a.String();
should(config.kafka).be.an.Object();
Expand Down Expand Up @@ -142,7 +143,8 @@ describe('Config files validations', function () {
should(testConfig.lrs.username).be.a.String();
should(testConfig.lrs.password).be.a.String();
should(testConfig.storm).be.an.Object();
should(testConfig.storm.realtimeJar).be.a.String();
should(testConfig.storm.defaultAnalysisFolder).be.a.String();
should(testConfig.storm.defaultAnalysisName).be.a.String();
should(testConfig.storm.path).be.a.String();
should(testConfig.storm.nimbusHost).be.a.String();
should(testConfig.kafka).be.an.Object();
Expand Down

0 comments on commit 80498a9

Please sign in to comment.