Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize sort #55

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ node_js:
- '4.2'

before_script:
- npm install juttle@^0.6.0
- npm install juttle@^0.7.0

script:
- npm run test-coverage
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and [SQLite](https://github.com/juttle/juttle-sqlite-adapter/) adapters.

* any filter expression `read sql` (note: `read sql | filter ...` is not optimized)
* `head` or `tail`
* `sort` when used without a `groupby` (note the `time` key will be deleted from any point)
* `reduce count()`, `sum()`, `min()`, `max()`, `sum()`, `avg()`, `count_unique()`
* `reduce by fieldname`
* `reduce -every :interval:`
Expand Down
22 changes: 22 additions & 0 deletions lib/optimize.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,28 @@ class Optimizer {
optimization_info.limit = limit;
return true;
}

static optimize_sort(read, sort, graph, optimization_info) {
if (optimization_info && optimization_info.type) {
logger.debug('optimization aborting -- cannot append sort optimization to prior', optimization_info.type, 'optimization');
return false;
}

let groupby = graph.node_get_option(sort, 'groupby');
if (groupby) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little more canonical to say if (graph.node_has_option(sort, 'groupby')) { in cases like this.

// we could optimize this of course, but the use case is too narrow/edge.
logger.debug('optimization aborting -- cannot optimize sort with a groupby field');
return false;
}

let columns = graph.node_get_option(sort, 'columns');
let formattedCols = columns.map((col) => { return _.pick(col, 'field', 'direction');});

optimization_info.type = 'sort';
optimization_info.columns = formattedCols;
optimization_info.limit = graph.node_get_option(sort, 'limit');
return true;
}

static optimize_reduce(read, reduce, graph, optimization_info) {
if (!graph.node_contains_only_options(reduce, ALLOWED_REDUCE_OPTIONS)) {
Expand Down
54 changes: 49 additions & 5 deletions lib/read.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,12 @@ class ReadSql extends AdapterRead {
this.addOptimizations(params.optimization_info);

this.baseQuery = this.baseQuery.from(options.table);

if (this.timeField && this.optimization_info.type !== 'reduce') {
this.baseQuery = this.baseQuery.orderBy(this.timeField, this.tailOptimized ? 'desc' : 'asc');
}

this.addSorting();
}

periodicLiveRead() {
return !!this.timeField;
return !!this.timeField && !this.sortFields;
}

getDbConnection() {
Expand Down Expand Up @@ -95,6 +93,20 @@ class ReadSql extends AdapterRead {
this.timeField = options.timeField ||
(options.from || options.to || options.last ? 'time' : undefined);
}

addSorting() {
if (this.sortFields) {
// adds multiple sort orders
this.sortFields.forEach((sortObj) => {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would put this down by line 128 in addOptimizations, seems a little scattered to have it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. I thought having all the order by statements together would be easiest to read since this function determines sort order. No?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're the boss! In my head even though they both use the order by clause they're pretty unrelated, I'd rather have all the code that deals with sort optimization together.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll think on it.

this.baseQuery = this.baseQuery.orderBy(sortObj.field, sortObj.direction);
});
return;
}

if (this.timeField && this.optimization_info.type !== 'reduce') {
this.baseQuery = this.baseQuery.orderBy(this.timeField, this.tailOptimized ? 'desc' : 'asc');
}
}

addFilters(filter_ast) {
let compiler = new FilterSQLCompiler({ baseQuery: this.baseQuery });
Expand All @@ -110,6 +122,15 @@ class ReadSql extends AdapterRead {
this.logger.debug(optimization_info.type + ' optimization, new max size: ', optimization_info.limit);
this.maxSize = optimization_info.limit;
this.tailOptimized = optimization_info.type === 'tail';
return;
}
if (optimization_info.type === 'sort') {
this.logger.debug(optimization_info.type + ' optimization, params: ', optimization_info);
this.sortFields = optimization_info.columns;
if (optimization_info.limit) {
this.maxSize = optimization_info.limit;
}
return;
}
if (optimization_info.type === 'reduce') {
let groupby = optimization_info.groupby;
Expand Down Expand Up @@ -217,6 +238,15 @@ class ReadSql extends AdapterRead {
} else {
query = query.offset(this.offsetCount);
}

if (this.sortBorderValue) {
query = query.where(
this.sortFields[0].field,
this.sortFields[0].direction === 'desc' ? '<=' : '>=',

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are we making sure we don't get duplicate points if there's a bunch of points with sortBorderValue as their value spanning two pages?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the way this works is that each result set consists of all values of the border value. For example, lets say the borderVals were 1,2,2,3,3,4,5 and the fetchSize is 4 then only 1,2,2 would be returned: the last value becomes the border value none of those will be in the result set. Then the next result set is >=3. In other words, points with the border value are not included in result set. If the entire fetch has the same border value then we throw an error.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, and that's in processPaginatedResults called with the sort field.

this.sortBorderValue
);
}

return query;
}

Expand Down Expand Up @@ -335,6 +365,14 @@ class ReadSql extends AdapterRead {
readEnd: this.timeField ? to : new JuttleMoment(Infinity)
};
}

if (this.sortFields) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can SQL not paginate sorted data?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean using offset? Yes but using offset when data could be coming in in real time could cause errors in the pagination. this method takes real time data into account so IMO it's a superior pagination method.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I don't understand what this is doing...if we read with a limit of X points a time range t1-t2 with 2X points in it, and optimize a sort on that, won't we return only the first X points but set the readEnd to t2 and never get the second page?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidvgalbraith you're absolutely right - the pagination was broken and untested. Fixed now. Take another look?

let res = this.processPaginatedResults(points, this.sortFields[0].field);
this.sortBorderValue = res.borderValue;
res.readEnd = null; //read again
return res;
}

//perform time-based pagination if timeField is indicated
if (this.timeField) {
let res = this.processPaginatedResults(points, this.timeField);
Expand Down Expand Up @@ -404,6 +442,12 @@ class ReadSql extends AdapterRead {
) {
points = this.parseTime(points, { timeField: this.timeField });
}

if (this.sortFields) {
_.each(points, (p) => {
delete p.time;
});
}

this.total_emitted_points += points.length;
return points;
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"istanbul": "^0.4.2",
"mocha": "^2.3.4"
},
"juttleAdapterAPI": "^0.5.0",
"juttleAdapterAPI": "^0.7.0",
"engines": {
"node": ">=4.2.0",
"npm": ">=2.14.7"
Expand Down
113 changes: 103 additions & 10 deletions test/optimize.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,99 @@ describe('test optimizations', function() {
before(function() {
return TestUtils.createTables(['logs']);
});

it('sort by field', function() {
return check_optimization_juttle({
program: 'read sql -table "logs" | sort code',
optimize_param: {
type: "sort",
columns: [{
direction: "",
field: 'code'
}],
},
massage: {sort: ['host', 'level']},
})
.then(function(result) {
expect(result.sinks.table).to.have.length.gt(5);
});
});
it('sort by field with pagination', function() {
return check_optimization_juttle({
program: 'read sql -fetchSize 40 -table "logs" | sort code',
optimize_param: {
type: "sort",
columns: [{
direction: "",
field: 'code'
}],
},
massage: {sort: ['host', 'level']},
})
.then(function(result) {
expect(result.sinks.table).to.have.length(150);
});
});
it('sort by field with pagination desc', function() {
return check_optimization_juttle({
program: 'read sql -fetchSize 40 -table "logs" | sort code -desc',
optimize_param: {
type: "sort",
columns: [{
direction: "desc",
field: 'code'
}],
},
massage: {sort: ['host', 'level']},
})
.then(function(result) {
expect(result.sinks.table).to.have.length(150);
});
});
it('sort by 2 fields', function() {
return check_optimization_juttle({
program: 'read sql -table "logs" | sort code, level',
optimize_param: {
type: "sort",
columns: [
{
direction: "",
field: 'code'
},
{
direction: "",
field: 'level'
}
],
},
massage: {sort: ['host']},
})
.then(function(result) {
expect(result.sinks.table).to.have.length.gt(5);
});
});
it('sort by 2 fields with order change', function() {
return check_optimization_juttle({
program: 'read sql -table "logs" | sort code -desc,level',
optimize_param: {
type: "sort",
columns: [
{
direction: "desc",
field: 'code'
},
{
direction: "",
field: 'level'
}
],
},
massage: {sort: ['host']},
})
.then(function(result) {
expect(result.sinks.table).to.have.length.gt(5);
});
});

it('head with positive number', function() {
return check_optimization_juttle({
Expand Down Expand Up @@ -131,7 +224,7 @@ describe('test optimizations', function() {
it('reduce avg, count, max, min, sum (as target s) by field aggregation', function() {
return check_optimization_juttle({
program: 'read sql -table "logs" | reduce avg(code), count(level), max(code), min(code), s = sum(code)',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand Down Expand Up @@ -184,7 +277,7 @@ describe('test optimizations', function() {
it('groupby', function() {
return check_optimization_juttle({
program: 'read sql -from :200 days ago: -table "logs" | reduce by level',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {},
Expand All @@ -201,7 +294,7 @@ describe('test optimizations', function() {
it('groupby and count', function() {
return check_optimization_juttle({
program: 'read sql -table "logs" | reduce count() by level',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand All @@ -220,7 +313,7 @@ describe('test optimizations', function() {
it('multiple groupby count', function() {
return check_optimization_juttle({
program: 'read sql -table "logs" | reduce count() by level,code',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand All @@ -244,7 +337,7 @@ describe('test optimizations', function() {
it('reduce every', function() {
return check_optimization_juttle({
program: 'read sql -from :20 days ago: -to :3 days ago: -table "logs" | reduce -every :week: count()',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand All @@ -264,7 +357,7 @@ describe('test optimizations', function() {
it('reduce every with timeframe smaller than every param', function() {
return check_optimization_juttle({
program: 'read sql -from :8 days ago: -to :4 days ago: -table "logs" | reduce -every :week: count(), a = avg(code)',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand All @@ -286,7 +379,7 @@ describe('test optimizations', function() {
//first opt elem has count = 0 (not included in unopt) and last time is -3d not full week. which is right?
return check_optimization_juttle({
program: 'read sql -from :20 days ago: -to :3 days ago: -table "logs" | reduce -every :week: -on :day 2: count()',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand All @@ -307,7 +400,7 @@ describe('test optimizations', function() {
it('reduce every multi-aggr', function() {
return check_optimization_juttle({
program: 'read sql -from :20 days ago: -to :3 days ago: -table "logs" | reduce -every :week: c = count(), a = avg(code)',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand All @@ -329,7 +422,7 @@ describe('test optimizations', function() {
return check_optimization_juttle({
program: 'read sql -from :60 hours ago: -to :12 hours ago: -table "logs" |' +
'reduce -every :hour: count(), a = avg(code), max(code), min(code), s = sum(code), count_unique(code)',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand All @@ -354,7 +447,7 @@ describe('test optimizations', function() {
it('reduce every with aggregation and groupby', function() {
return check_optimization_juttle({
program: 'read sql -from :20 days ago: -to :3 days ago: -table "logs" | reduce -every :week: a = avg(code) by level',
massage: true,
massage: {sort: ['code', 'level']},
optimize_param: {
type: 'reduce',
aggregations: {
Expand Down
20 changes: 12 additions & 8 deletions test/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,11 @@ var TestUtils = {
},
// What is performed here:
// - round values
massage: function(arr, shouldMassage) {
if (!shouldMassage) {
massage: function(arr, massageOptions) {
if (!massageOptions) {
return arr;
}
return _.chain(arr)
.sortBy('level')
.sortBy('code')
.each(function(pt) {
_.each(arr, function(pt) {
var k, v;
for (k in pt) {
v = pt[k];
Expand All @@ -167,8 +164,15 @@ var TestUtils = {
pt[k] = Math.round(v * 10000) / 10000;
}
}
})
.value();
});

if (massageOptions.sort) {
massageOptions.sort.forEach(function(sortField) {
arr = _.sortBy(arr, sortField);
});
}

return arr;
},
expectTimeSorted: function(result) {
var time;
Expand Down