diff --git a/README.md b/README.md index 33f709e..61c882c 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ and [SQLite](https://github.com/juttle/juttle-sqlite-adapter/) adapters. * any filter expression `read sql` (note: `read sql | filter ...` is not optimized) * `head` or `tail` +* `sort` when used without a `groupby` (note the `time` key will be deleted from any point) * `reduce count()`, `sum()`, `min()`, `max()`, `sum()`, `avg()`, `count_unique()` * `reduce by fieldname` * `reduce -every :interval:` diff --git a/lib/optimize.js b/lib/optimize.js index d4be9cf..7b9e334 100644 --- a/lib/optimize.js +++ b/lib/optimize.js @@ -106,6 +106,28 @@ class Optimizer { optimization_info.limit = limit; return true; } + + static optimize_sort(read, sort, graph, optimization_info) { + if (optimization_info && optimization_info.type) { + logger.debug('optimization aborting -- cannot append sort optimization to prior', optimization_info.type, 'optimization'); + return false; + } + + let groupby = graph.node_get_option(sort, 'groupby'); + if (groupby) { + // we could optimize this of course, but the use case is too narrow/edge. + logger.debug('optimization aborting -- cannot optimize sort with a groupby field'); + return false; + } + + let columns = graph.node_get_option(sort, 'columns'); + let formattedCols = columns.map((col) => { return _.pick(col, 'field', 'direction');}); + + optimization_info.type = 'sort'; + optimization_info.columns = formattedCols; + optimization_info.limit = graph.node_get_option(sort, 'limit'); + return true; + } static optimize_reduce(read, reduce, graph, optimization_info) { if (!graph.node_contains_only_options(reduce, ALLOWED_REDUCE_OPTIONS)) { diff --git a/lib/read.js b/lib/read.js index 84296dd..63ac7c4 100644 --- a/lib/read.js +++ b/lib/read.js @@ -48,14 +48,12 @@ class ReadSql extends AdapterRead { this.addOptimizations(params.optimization_info); this.baseQuery = this.baseQuery.from(options.table); - - if (this.timeField && this.optimization_info.type !== 'reduce') { - this.baseQuery = this.baseQuery.orderBy(this.timeField, this.tailOptimized ? 'desc' : 'asc'); - } + + this.addSorting(); } periodicLiveRead() { - return !!this.timeField; + return !!this.timeField && !this.sortFields; } getDbConnection() { @@ -95,6 +93,20 @@ class ReadSql extends AdapterRead { this.timeField = options.timeField || (options.from || options.to || options.last ? 'time' : undefined); } + + addSorting() { + if (this.sortFields) { + // adds multiple sort orders + this.sortFields.forEach((sortObj) => { + this.baseQuery = this.baseQuery.orderBy(sortObj.field, sortObj.direction); + }); + return; + } + + if (this.timeField && this.optimization_info.type !== 'reduce') { + this.baseQuery = this.baseQuery.orderBy(this.timeField, this.tailOptimized ? 'desc' : 'asc'); + } + } addFilters(filter_ast) { let compiler = new FilterSQLCompiler({ baseQuery: this.baseQuery }); @@ -110,6 +122,15 @@ class ReadSql extends AdapterRead { this.logger.debug(optimization_info.type + ' optimization, new max size: ', optimization_info.limit); this.maxSize = optimization_info.limit; this.tailOptimized = optimization_info.type === 'tail'; + return; + } + if (optimization_info.type === 'sort') { + this.logger.debug(optimization_info.type + ' optimization, params: ', optimization_info); + this.sortFields = optimization_info.columns; + if (optimization_info.limit) { + this.maxSize = optimization_info.limit; + } + return; } if (optimization_info.type === 'reduce') { let groupby = optimization_info.groupby; @@ -335,6 +356,13 @@ class ReadSql extends AdapterRead { readEnd: this.timeField ? to : new JuttleMoment(Infinity) }; } + + if (this.sortFields) { + let res = this.processPaginatedResults(points, this.sortFields[0].field); + res.readEnd = to; + return res; + } + //perform time-based pagination if timeField is indicated if (this.timeField) { let res = this.processPaginatedResults(points, this.timeField); @@ -404,6 +432,12 @@ class ReadSql extends AdapterRead { ) { points = this.parseTime(points, { timeField: this.timeField }); } + + if (this.sortFields) { + _.each(points, (p) => { + delete p.time; + }); + } this.total_emitted_points += points.length; return points; diff --git a/test/optimize.spec.js b/test/optimize.spec.js index 460cfdd..7e95d37 100644 --- a/test/optimize.spec.js +++ b/test/optimize.spec.js @@ -9,6 +9,68 @@ describe('test optimizations', function() { before(function() { return TestUtils.createTables(['logs']); }); + + it('sort by field', function() { + return check_optimization_juttle({ + program: 'read sql -table "logs" | sort code | head 10', + optimize_param: { + type: "sort", + columns: [{ + direction: "", + field: 'code' + }], + }, + massage: {sort: ['host', 'level']}, + }) + .then(function(result) { + expect(result.sinks.table).to.have.length.gt(5); + }); + }); + it('sort by 2 fields', function() { + return check_optimization_juttle({ + program: 'read sql -table "logs" | sort code, level | head 10', + optimize_param: { + type: "sort", + columns: [ + { + direction: "", + field: 'code' + }, + { + direction: "", + field: 'level' + } + ], + }, + massage: {sort: ['host']}, + }) + .then(function(result) { + expect(result.sinks.table).to.have.length.gt(5); + }); + }); + it('sort by 2 fields with order change', function() { + return check_optimization_juttle({ + program: 'read sql -table "logs" | sort code -desc,level | head 20', + optimize_param: { + type: "sort", + columns: [ + { + direction: "desc", + field: 'code' + }, + { + direction: "", + field: 'level' + } + ], + }, + massage: {sort: ['host']}, + }) + .then(function(result) { + expect(result.sinks.table).to.have.length.gt(5); + }); + }); + it('head with positive number', function() { return check_optimization_juttle({ @@ -131,7 +193,7 @@ describe('test optimizations', function() { it('reduce avg, count, max, min, sum (as target s) by field aggregation', function() { return check_optimization_juttle({ program: 'read sql -table "logs" | reduce avg(code), count(level), max(code), min(code), s = sum(code)', - massage: true, + massage: {sort: ['code', 'level']}, optimize_param: { type: 'reduce', aggregations: { @@ -184,7 +246,7 @@ describe('test optimizations', function() { it('groupby', function() { return check_optimization_juttle({ program: 'read sql -from :200 days ago: -table "logs" | reduce by level', - massage: true, + massage: {sort: ['code', 'level']}, optimize_param: { type: 'reduce', aggregations: {}, @@ -201,7 +263,7 @@ describe('test optimizations', function() { it('groupby and count', function() { return check_optimization_juttle({ program: 'read sql -table "logs" | reduce count() by level', - massage: true, + massage: {sort: ['code', 'level']}, optimize_param: { type: 'reduce', aggregations: { @@ -220,7 +282,7 @@ describe('test optimizations', function() { it('multiple groupby count', function() { return check_optimization_juttle({ program: 'read sql -table "logs" | reduce count() by level,code', - massage: true, + massage: {sort: ['code', 'level']}, optimize_param: { type: 'reduce', aggregations: { @@ -244,7 +306,7 @@ describe('test optimizations', function() { it('reduce every', function() { return check_optimization_juttle({ program: 'read sql -from :20 days ago: -to :3 days ago: -table "logs" | reduce -every :week: count()', - massage: true, + massage: {sort: ['code', 'level']}, optimize_param: { type: 'reduce', aggregations: { @@ -264,7 +326,7 @@ describe('test optimizations', function() { it('reduce every with timeframe smaller than every param', function() { return check_optimization_juttle({ program: 'read sql -from :8 days ago: -to :4 days ago: -table "logs" | reduce -every :week: count(), a = avg(code)', - massage: true, + massage: {sort: ['code', 'level']}, optimize_param: { type: 'reduce', aggregations: { @@ -286,7 +348,7 @@ describe('test optimizations', function() { //first opt elem has count = 0 (not included in unopt) and last time is -3d not full week. which is right? return check_optimization_juttle({ program: 'read sql -from :20 days ago: -to :3 days ago: -table "logs" | reduce -every :week: -on :day 2: count()', - massage: true, + massage: {sort: ['code', 'level']}, optimize_param: { type: 'reduce', aggregations: { @@ -307,7 +369,7 @@ describe('test optimizations', function() { it('reduce every multi-aggr', function() { return check_optimization_juttle({ program: 'read sql -from :20 days ago: -to :3 days ago: -table "logs" | reduce -every :week: c = count(), a = avg(code)', - massage: true, + massage: {sort: ['code', 'level']}, optimize_param: { type: 'reduce', aggregations: { @@ -329,7 +391,7 @@ describe('test optimizations', function() { return check_optimization_juttle({ program: 'read sql -from :60 hours ago: -to :12 hours ago: -table "logs" |' + 'reduce -every :hour: count(), a = avg(code), max(code), min(code), s = sum(code), count_unique(code)', - massage: true, + massage: {sort: ['code', 'level']}, optimize_param: { type: 'reduce', aggregations: { @@ -354,7 +416,7 @@ describe('test optimizations', function() { it('reduce every with aggregation and groupby', function() { return check_optimization_juttle({ program: 'read sql -from :20 days ago: -to :3 days ago: -table "logs" | reduce -every :week: a = avg(code) by level', - massage: true, + massage: {sort: ['code', 'level']}, optimize_param: { type: 'reduce', aggregations: { diff --git a/test/utils.js b/test/utils.js index 508b0ae..6ccda4a 100644 --- a/test/utils.js +++ b/test/utils.js @@ -149,14 +149,11 @@ var TestUtils = { }, // What is performed here: // - round values - massage: function(arr, shouldMassage) { - if (!shouldMassage) { + massage: function(arr, massageOptions) { + if (!massageOptions) { return arr; } - return _.chain(arr) - .sortBy('level') - .sortBy('code') - .each(function(pt) { + _.each(arr, function(pt) { var k, v; for (k in pt) { v = pt[k]; @@ -167,8 +164,15 @@ var TestUtils = { pt[k] = Math.round(v * 10000) / 10000; } } - }) - .value(); + }); + + if (massageOptions.sort) { + massageOptions.sort.forEach(function(sortField) { + arr = _.sortBy(sortField); + }); + } + + return arr; }, expectTimeSorted: function(result) { var time;