Skip to content

Commit

Permalink
optimize sort
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladimir Bukhin committed Mar 19, 2016
1 parent 9ae8050 commit 9e2ede3
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 23 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and [SQLite](https://github.com/juttle/juttle-sqlite-adapter/) adapters.

* any filter expression `read sql` (note: `read sql | filter ...` is not optimized)
* `head` or `tail`
* `sort` when used without a `groupby` (note the `time` key will be deleted from any point)
* `reduce count()`, `sum()`, `min()`, `max()`, `sum()`, `avg()`, `count_unique()`
* `reduce by fieldname`
* `reduce -every :interval:`
Expand Down
22 changes: 22 additions & 0 deletions lib/optimize.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,28 @@ class Optimizer {
optimization_info.limit = limit;
return true;
}

static optimize_sort(read, sort, graph, optimization_info) {
if (optimization_info && optimization_info.type) {
logger.debug('optimization aborting -- cannot append sort optimization to prior', optimization_info.type, 'optimization');
return false;
}

let groupby = graph.node_get_option(sort, 'groupby');
if (groupby) {
// we could optimize this of course, but the use case is too narrow/edge.
logger.debug('optimization aborting -- cannot optimize sort with a groupby field');
return false;
}

let columns = graph.node_get_option(sort, 'columns');
let formattedCols = columns.map((col) => { return _.pick(col, 'field', 'direction');});

optimization_info.type = 'sort';
optimization_info.columns = formattedCols;
optimization_info.limit = graph.node_get_option(sort, 'limit');
return true;
}

static optimize_reduce(read, reduce, graph, optimization_info) {
if (!graph.node_contains_only_options(reduce, ALLOWED_REDUCE_OPTIONS)) {
Expand Down
44 changes: 39 additions & 5 deletions lib/read.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,12 @@ class ReadSql extends AdapterRead {
this.addOptimizations(params.optimization_info);

this.baseQuery = this.baseQuery.from(options.table);

if (this.timeField && this.optimization_info.type !== 'reduce') {
this.baseQuery = this.baseQuery.orderBy(this.timeField, this.tailOptimized ? 'desc' : 'asc');
}

this.addSorting();
}

periodicLiveRead() {
return !!this.timeField;
return !!this.timeField && !this.sortFields;
}

getDbConnection() {
Expand Down Expand Up @@ -95,6 +93,20 @@ class ReadSql extends AdapterRead {
this.timeField = options.timeField ||
(options.from || options.to || options.last ? 'time' : undefined);
}

addSorting() {
if (this.sortFields) {
// adds multiple sort orders
this.sortFields.forEach((sortObj) => {
this.baseQuery = this.baseQuery.orderBy(sortObj.field, sortObj.direction);
});
return;
}

if (this.timeField && this.optimization_info.type !== 'reduce') {
this.baseQuery = this.baseQuery.orderBy(this.timeField, this.tailOptimized ? 'desc' : 'asc');
}
}

addFilters(filter_ast) {
let compiler = new FilterSQLCompiler({ baseQuery: this.baseQuery });
Expand All @@ -110,6 +122,15 @@ class ReadSql extends AdapterRead {
this.logger.debug(optimization_info.type + ' optimization, new max size: ', optimization_info.limit);
this.maxSize = optimization_info.limit;
this.tailOptimized = optimization_info.type === 'tail';
return;
}
if (optimization_info.type === 'sort') {
this.logger.debug(optimization_info.type + ' optimization, params: ', optimization_info);
this.sortFields = optimization_info.columns;
if (optimization_info.limit) {
this.maxSize = optimization_info.limit;
}
return;
}
if (optimization_info.type === 'reduce') {
let groupby = optimization_info.groupby;
Expand Down Expand Up @@ -335,6 +356,13 @@ class ReadSql extends AdapterRead {
readEnd: this.timeField ? to : new JuttleMoment(Infinity)
};
}

if (this.sortFields) {
let res = this.processPaginatedResults(points, this.sortFields[0].field);
res.readEnd = to;
return res;
}

//perform time-based pagination if timeField is indicated
if (this.timeField) {
let res = this.processPaginatedResults(points, this.timeField);
Expand Down Expand Up @@ -404,6 +432,12 @@ class ReadSql extends AdapterRead {
) {
points = this.parseTime(points, { timeField: this.timeField });
}

if (this.sortFields) {
_.each(points, (p) => {
delete p.time;
});
}

this.total_emitted_points += points.length;
return points;
Expand Down
82 changes: 72 additions & 10 deletions test/optimize.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,68 @@ describe('test optimizations', function() {
before(function() {
return TestUtils.createTables(['logs']);
});

it('sort by field', function() {
return check_optimization_juttle({
program: 'read sql -table "logs" | sort code | head 10',
optimize_param: {
type: "sort",
columns: [{
direction: "",
field: 'code'
}],
},
massage: {sort: ['host', 'level']},
})
.then(function(result) {
expect(result.sinks.table).to.have.length.gt(5);
});
});
it('sort by 2 fields', function() {
return check_optimization_juttle({
program: 'read sql -table "logs" | sort code, level | head 10',
optimize_param: {
type: "sort",
columns: [
{
direction: "",
field: 'code'
},
{
direction: "",
field: 'level'
}
],
},
massage: {sort: ['host']},
})
.then(function(result) {
expect(result.sinks.table).to.have.length.gt(5);
});
});
it('sort by 2 fields with order change', function() {
return check_optimization_juttle({
program: 'read sql -table "logs" | sort code -desc,level | head 20',
optimize_param: {
type: "sort",
columns: [
{
direction: "desc",
field: 'code'
},
{
direction: "",
field: 'level'
}
],
},
massage: {sort: ['host']},
})
.then(function(result) {
expect(result.sinks.table).to.have.length.gt(5);
});
});


it('head with positive number', function() {
return check_optimization_juttle({
Expand Down Expand Up @@ -131,7 +193,7 @@ describe('test optimizations', function() {
it('reduce avg, count, max, min, sum (as target s) by field aggregation', function() {
return check_optimization_juttle({
program: 'read sql -table "logs" | reduce avg(code), count(level), max(code), min(code), s = sum(code)',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand Down Expand Up @@ -184,7 +246,7 @@ describe('test optimizations', function() {
it('groupby', function() {
return check_optimization_juttle({
program: 'read sql -from :200 days ago: -table "logs" | reduce by level',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {},
Expand All @@ -201,7 +263,7 @@ describe('test optimizations', function() {
it('groupby and count', function() {
return check_optimization_juttle({
program: 'read sql -table "logs" | reduce count() by level',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand All @@ -220,7 +282,7 @@ describe('test optimizations', function() {
it('multiple groupby count', function() {
return check_optimization_juttle({
program: 'read sql -table "logs" | reduce count() by level,code',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand All @@ -244,7 +306,7 @@ describe('test optimizations', function() {
it('reduce every', function() {
return check_optimization_juttle({
program: 'read sql -from :20 days ago: -to :3 days ago: -table "logs" | reduce -every :week: count()',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand All @@ -264,7 +326,7 @@ describe('test optimizations', function() {
it('reduce every with timeframe smaller than every param', function() {
return check_optimization_juttle({
program: 'read sql -from :8 days ago: -to :4 days ago: -table "logs" | reduce -every :week: count(), a = avg(code)',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand All @@ -286,7 +348,7 @@ describe('test optimizations', function() {
//first opt elem has count = 0 (not included in unopt) and last time is -3d not full week. which is right?
return check_optimization_juttle({
program: 'read sql -from :20 days ago: -to :3 days ago: -table "logs" | reduce -every :week: -on :day 2: count()',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand All @@ -307,7 +369,7 @@ describe('test optimizations', function() {
it('reduce every multi-aggr', function() {
return check_optimization_juttle({
program: 'read sql -from :20 days ago: -to :3 days ago: -table "logs" | reduce -every :week: c = count(), a = avg(code)',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand All @@ -329,7 +391,7 @@ describe('test optimizations', function() {
return check_optimization_juttle({
program: 'read sql -from :60 hours ago: -to :12 hours ago: -table "logs" |' +
'reduce -every :hour: count(), a = avg(code), max(code), min(code), s = sum(code), count_unique(code)',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand All @@ -354,7 +416,7 @@ describe('test optimizations', function() {
it('reduce every with aggregation and groupby', function() {
return check_optimization_juttle({
program: 'read sql -from :20 days ago: -to :3 days ago: -table "logs" | reduce -every :week: a = avg(code) by level',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand Down
20 changes: 12 additions & 8 deletions test/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,11 @@ var TestUtils = {
},
// What is performed here:
// - round values
massage: function(arr, shouldMassage) {
if (!shouldMassage) {
massage: function(arr, massageOptions) {
if (!massageOptions) {
return arr;
}
return _.chain(arr)
.sortBy('level')
.sortBy('code')
.each(function(pt) {
_.each(arr, function(pt) {
var k, v;
for (k in pt) {
v = pt[k];
Expand All @@ -167,8 +164,15 @@ var TestUtils = {
pt[k] = Math.round(v * 10000) / 10000;
}
}
})
.value();
});

if (massageOptions.sort) {
massageOptions.sort.forEach(function(sortField) {
arr = _.sortBy(sortField);
});
}

return arr;
},
expectTimeSorted: function(result) {
var time;
Expand Down

0 comments on commit 9e2ede3

Please sign in to comment.