Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ES6 syntax #1163

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 32 additions & 35 deletions lib/backends/analysis-status.js
Original file line number Diff line number Diff line change
@@ -1,38 +1,35 @@
'use strict';

var PSQL = require('cartodb-psql');

function AnalysisStatusBackend () {
}

module.exports = AnalysisStatusBackend;

AnalysisStatusBackend.prototype.getNodeStatus = function (nodeId, dbParams, callback) {
var statusQuery = [
'SELECT node_id, status, updated_at, last_error_message as error_message',
'FROM cartodb.cdb_analysis_catalog where node_id = \'' + nodeId + '\''
].join(' ');

var pg = new PSQL(dbParams);

pg.query(statusQuery, function (err, result) {
if (err) {
return callback(err, result);
}

result = result || {};

var rows = result.rows || [];

var statusResponse = rows[0] || {
node_id: nodeId,
status: 'unknown'
};

if (statusResponse.status !== 'failed') {
delete statusResponse.error_message;
}

return callback(null, statusResponse);
}, true); // use read-only transaction
const PSQL = require('cartodb-psql');

module.exports = class AnalysisStatusBackend {
getNodeStatus (nodeId, dbParams, callback) {
const statusQuery = [
'SELECT node_id, status, updated_at, last_error_message as error_message',
`FROM cartodb.cdb_analysis_catalog where node_id = '${nodeId}'`
].join(' ');

const pg = new PSQL(dbParams);
const readOnlyTransaction = true;

pg.query(statusQuery, (err, result) => {
if (err) {
return callback(err, result);
}

result = result || {};

const rows = result.rows || [];
const statusResponse = rows[0] || {
node_id: nodeId,
status: 'unknown'
};

if (statusResponse.status !== 'failed') {
delete statusResponse.error_message;
}

return callback(null, statusResponse);
}, readOnlyTransaction);
}
};
145 changes: 72 additions & 73 deletions lib/backends/analysis.js
Original file line number Diff line number Diff line change
@@ -1,95 +1,94 @@
'use strict';

var _ = require('underscore');
var camshaft = require('camshaft');
var fs = require('fs');
const camshaft = require('camshaft');
const fs = require('fs');

var REDIS_LIMITS = {
const REDIS_LIMITS = {
DB: 5,
PREFIX: 'limits:analyses:' // + username
PREFIX: 'limits:analyses' // + username
};

function AnalysisBackend (metadataBackend, options) {
this.metadataBackend = metadataBackend;
this.options = options || {};
this.options.limits = this.options.limits || {};
this.setBatchConfig(this.options.batch);
this.setLoggerConfig(this.options.logger);
}

module.exports = AnalysisBackend;

AnalysisBackend.prototype.setBatchConfig = function (options) {
var batchConfig = options || {};
batchConfig.endpoint = batchConfig.endpoint || 'http://127.0.0.1:8080/api/v1/sql/job';
batchConfig.inlineExecution = batchConfig.inlineExecution || false;
batchConfig.hostHeaderTemplate = batchConfig.hostHeaderTemplate || '{{=it.username}}.localhost.lan';
this.batchConfig = batchConfig;
};

AnalysisBackend.prototype.setLoggerConfig = function (options) {
this.loggerConfig = options || {};
module.exports = class AnalysisBackend {
constructor (metadataBackend, options) {
this.metadataBackend = metadataBackend;
this.options = options || {};
this.options.limits = this.options.limits || {};
this.setBatchConfig(this.options.batch);
this.setLoggerConfig(this.options.logger);
}

if (this.loggerConfig.filename) {
this.stream = fs.createWriteStream(this.loggerConfig.filename, { flags: 'a', encoding: 'utf8' });
setBatchConfig (config = {}) {
const batchConfig = config;
// TODO: use Object.assign instead
batchConfig.endpoint = batchConfig.endpoint || 'http://127.0.0.1:8080/api/v1/sql/job';
batchConfig.inlineExecution = batchConfig.inlineExecution || false;
batchConfig.hostHeaderTemplate = batchConfig.hostHeaderTemplate || '{{=it.username}}.localhost.lan';
this.batchConfig = batchConfig;
}

process.on('SIGHUP', function () {
if (this.stream) {
this.stream.destroy();
}
setLoggerConfig (options = {}) {
this.loggerConfig = options;

if (this.loggerConfig.filename) {
this.stream = fs.createWriteStream(this.loggerConfig.filename, { flags: 'a', encoding: 'utf8' });
}.bind(this));

process.on('SIGHUP', () => {
if (this.stream) {
this.stream.destroy();
}

this.stream = fs.createWriteStream(this.loggerConfig.filename, { flags: 'a', encoding: 'utf8' });
});
}
}
};

AnalysisBackend.prototype.create = function (analysisConfiguration, analysisDefinition, callback) {
analysisConfiguration.batch.endpoint = this.batchConfig.endpoint;
analysisConfiguration.batch.inlineExecution = this.batchConfig.inlineExecution;
analysisConfiguration.batch.hostHeaderTemplate = this.batchConfig.hostHeaderTemplate;
create (analysisConfiguration, analysisDefinition, callback) {
analysisConfiguration.batch.endpoint = this.batchConfig.endpoint;
analysisConfiguration.batch.inlineExecution = this.batchConfig.inlineExecution;
analysisConfiguration.batch.hostHeaderTemplate = this.batchConfig.hostHeaderTemplate;

analysisConfiguration.logger = {
stream: this.stream ? this.stream : process.stdout
};
analysisConfiguration.logger = {
stream: this.stream ? this.stream : process.stdout
};

this.getAnalysesLimits(analysisConfiguration.user, function (err, limits) {
if (err) {}
analysisConfiguration.limits = limits || {};
camshaft.create(analysisConfiguration, analysisDefinition, callback);
});
};
this.getAnalysesLimits(analysisConfiguration.user, (err, limits) => {
if (err) {}
analysisConfiguration.limits = limits || {};
camshaft.create(analysisConfiguration, analysisDefinition, callback);
});
}

AnalysisBackend.prototype.getAnalysesLimits = function (username, callback) {
var self = this;
getAnalysesLimits (username, callback) {
const analysesLimits = {
analyses: {
// buffer: {
// timeout: 1000,
// maxNumberOfRows: 1e6
// }
}
};

var analysesLimits = {
analyses: {
// buffer: {
// timeout: 1000,
// maxNumberOfRows: 1e6
// }
}
};
Object.keys(this.options.limits).forEach((analysisTypeOrTag) => {
analysesLimits.analyses[analysisTypeOrTag] = Object.assign({}, this.options.limits[analysisTypeOrTag]);
});

Object.keys(self.options.limits).forEach(function (analysisTypeOrTag) {
analysesLimits.analyses[analysisTypeOrTag] = _.extend({}, self.options.limits[analysisTypeOrTag]);
});
const analysesLimitsKey = `${REDIS_LIMITS.PREFIX}:${username}`;
this.metadataBackend.redisCmd(REDIS_LIMITS.DB, 'HGETALL', [analysesLimitsKey], (err, analysesTimeouts) => {
if (err) {
global.logger.error(err);
return callback(null, analysesLimits);
}

var analysesLimitsKey = REDIS_LIMITS.PREFIX + username;
this.metadataBackend.redisCmd(REDIS_LIMITS.DB, 'HGETALL', [analysesLimitsKey], function (err, analysesTimeouts) {
if (err) {}
// analysesTimeouts wil be something like: { moran: 3000, intersection: 5000 }
analysesTimeouts = analysesTimeouts || {};
analysesTimeouts = analysesTimeouts || {};

Object.keys(analysesTimeouts).forEach(function (analysisType) {
analysesLimits.analyses[analysisType] = _.defaults(
{
// analysesTimeouts wil be something like: { moran: 3000, intersection: 5000 }
Object.keys(analysesTimeouts).forEach((analysisType) => {
analysesLimits.analyses[analysisType] = Object.assign(analysesLimits.analyses[analysisType] || {}, {
timeout: Number.isFinite(+analysesTimeouts[analysisType]) ? +analysesTimeouts[analysisType] : 0
},
analysesLimits.analyses[analysisType]
);
});
});
});

return callback(null, analysesLimits);
});
return callback(null, analysesLimits);
});
}
};
Loading