Skip to content

Commit

Permalink
SelectHandle now emits initialization errors
Browse files Browse the repository at this point in the history
  • Loading branch information
numtel committed May 6, 2015
1 parent caddb16 commit 4d1bc67
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 57 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ The `LivePG.prototype.select()` method returns an instance of the `SelectHandle`

The `SelectHandle` class inherits from `EventEmitter`, providing an `update` event on each result set change with two arguments: `diff` and `data`. `diff` contains a description of which rows have been `added`, `moved`, `removed`, and `copied`. `data` contains an array of the full result set.

An `error` event is emitted from the `SelectHandle` for initialization errors.

## Getting started with the examples

1. Run `npm install` to download dependent packages.
Expand Down
88 changes: 51 additions & 37 deletions lib/LivePG.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ var LivePG = (function (_EventEmitter) {
var handle = new SelectHandle(this, queryHash);

// Perform initialization asynchronously
this._initSelect(query, params, triggers, queryHash, handle)['catch'](this._error);
this._initSelect(query, params, triggers, queryHash, handle)['catch'](function (error) {
return handle.emit('error', error);
});

return handle;
}
Expand Down Expand Up @@ -272,7 +274,7 @@ var LivePG = (function (_EventEmitter) {

// Initial results from cache
handle.emit('update', { removed: null, moved: null, copied: null, added: queryBuffer.data }, queryBuffer.data);
context$2$0.next = 54;
context$2$0.next = 61;
break;

case 8:
Expand All @@ -290,109 +292,121 @@ var LivePG = (function (_EventEmitter) {
case 11:
pgHandle = context$2$0.sent;
tablesUsed = undefined;
context$2$0.prev = 13;

if (!(queryHash in this.tablesUsedCache)) {
context$2$0.next = 17;
context$2$0.next = 18;
break;
}

tablesUsed = this.tablesUsedCache[queryHash];
context$2$0.next = 21;
context$2$0.next = 22;
break;

case 17:
context$2$0.next = 19;
case 18:
context$2$0.next = 20;
return common.getQueryDetails(pgHandle.client, query);

case 19:
case 20:
tablesUsed = context$2$0.sent;

this.tablesUsedCache[queryHash] = tablesUsed;

case 21:
case 22:
_iteratorNormalCompletion3 = true;
_didIteratorError3 = false;
_iteratorError3 = undefined;
context$2$0.prev = 24;
context$2$0.prev = 25;
_iterator3 = _getIterator(tablesUsed);

case 26:
case 27:
if (_iteratorNormalCompletion3 = (_step3 = _iterator3.next()).done) {
context$2$0.next = 38;
context$2$0.next = 39;
break;
}

table = _step3.value;

if (table in this.allTablesUsed) {
context$2$0.next = 34;
context$2$0.next = 35;
break;
}

this.allTablesUsed[table] = [queryHash];
context$2$0.next = 32;
context$2$0.next = 33;
return common.createTableTrigger(pgHandle.client, table, this.channel);

case 32:
context$2$0.next = 35;
case 33:
context$2$0.next = 36;
break;

case 34:
case 35:
if (this.allTablesUsed[table].indexOf(queryHash) === -1) {
this.allTablesUsed[table].push(queryHash);
}

case 35:
case 36:
_iteratorNormalCompletion3 = true;
context$2$0.next = 26;
context$2$0.next = 27;
break;

case 38:
context$2$0.next = 44;
case 39:
context$2$0.next = 45;
break;

case 40:
context$2$0.prev = 40;
context$2$0.t1 = context$2$0['catch'](24);
case 41:
context$2$0.prev = 41;
context$2$0.t1 = context$2$0['catch'](25);
_didIteratorError3 = true;
_iteratorError3 = context$2$0.t1;

case 44:
context$2$0.prev = 44;
case 45:
context$2$0.prev = 45;
context$2$0.prev = 46;

if (!_iteratorNormalCompletion3 && _iterator3['return']) {
_iterator3['return']();
}

case 47:
context$2$0.prev = 47;
case 48:
context$2$0.prev = 48;

if (!_didIteratorError3) {
context$2$0.next = 50;
context$2$0.next = 51;
break;
}

throw _iteratorError3;

case 50:
return context$2$0.finish(47);

case 51:
return context$2$0.finish(44);
return context$2$0.finish(48);

case 52:
return context$2$0.finish(45);

pgHandle.done();
case 53:

// Retrieve initial results
this.waitingToUpdate.push(queryHash);
context$2$0.next = 60;
break;

case 54:
case 56:
context$2$0.prev = 56;
context$2$0.t2 = context$2$0['catch'](13);

pgHandle.done();
throw context$2$0.t2;

case 60:

pgHandle.done();

case 61:
case 'end':
return context$2$0.stop();
}
}, null, this, [[24, 40, 44, 52], [45,, 47, 51]]);
}, null, this, [[13, 56], [25, 41, 45, 53], [46,, 48, 52]]);
}
}, {
key: '_updateQuery',
Expand Down Expand Up @@ -437,9 +451,9 @@ var LivePG = (function (_EventEmitter) {

case 17:
context$2$0.prev = 17;
context$2$0.t2 = context$2$0['catch'](13);
context$2$0.t3 = context$2$0['catch'](13);
_didIteratorError4 = true;
_iteratorError4 = context$2$0.t2;
_iteratorError4 = context$2$0.t3;

case 21:
context$2$0.prev = 21;
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name" : "pg-live-select",
"version" : "0.0.8",
"version" : "0.0.9",
"description" : "Live updating PostgreSQL SELECT statements",
"main" : "lib/LivePG.js",
"license" : "MIT",
Expand Down
41 changes: 24 additions & 17 deletions src/LivePG.es6
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class LivePG extends EventEmitter {

// Perform initialization asynchronously
this._initSelect(query, params, triggers, queryHash, handle)
.catch(this._error)
.catch(error => handle.emit('error', error))

return handle
}
Expand Down Expand Up @@ -156,28 +156,35 @@ class LivePG extends EventEmitter {

let pgHandle = await common.getClient(this.connStr)
let tablesUsed
if(queryHash in this.tablesUsedCache) {
tablesUsed = this.tablesUsedCache[queryHash]
}
else {
tablesUsed = await common.getQueryDetails(pgHandle.client, query)
this.tablesUsedCache[queryHash] = tablesUsed
}

for(let table of tablesUsed) {
if(!(table in this.allTablesUsed)) {
this.allTablesUsed[table] = [ queryHash ]
await common.createTableTrigger(pgHandle.client, table, this.channel)
try {
if(queryHash in this.tablesUsedCache) {
tablesUsed = this.tablesUsedCache[queryHash]
}
else if(this.allTablesUsed[table].indexOf(queryHash) === -1) {
this.allTablesUsed[table].push(queryHash)
else {
tablesUsed = await common.getQueryDetails(pgHandle.client, query)
this.tablesUsedCache[queryHash] = tablesUsed
}

for(let table of tablesUsed) {
if(!(table in this.allTablesUsed)) {
this.allTablesUsed[table] = [ queryHash ]
await common.createTableTrigger(pgHandle.client, table, this.channel)
}
else if(this.allTablesUsed[table].indexOf(queryHash) === -1) {
this.allTablesUsed[table].push(queryHash)
}
}

// Retrieve initial results
this.waitingToUpdate.push(queryHash)
}
catch(error) {
pgHandle.done()
throw error
}

pgHandle.done()

// Retrieve initial results
this.waitingToUpdate.push(queryHash)
}
}

Expand Down
7 changes: 7 additions & 0 deletions test/fixtures/variousQueries.es6
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,13 @@ exports.cases.stopped = {
]
}

exports.cases.relation_error = {
query: `SELECT score FROM scores_invalid ORDER BY score DESC`,
events: [
{ error: /error: relation "scores_relation_error_invalid" does not exist/ }
]
}

let newName = randomString.alphaLower(BIG_PAYLOAD_LENGTH)
exports.cases.bigPayload = {
query: `SELECT big_name FROM big_payload ORDER BY id ASC`,
Expand Down
18 changes: 16 additions & 2 deletions test/variousQueries.es6
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ exports.variousQueries = function(test) {
.catch(error => console.error(error))
.then(result => {
var updateLog = [] // Cache for any updates to this query
var errorLog = []
var nextLogPos = 0 // Length at last action performed
var select = liveDb.select(query).on('update',
(diff, data) => updateLog.push({ diff, data }))
var select = liveDb.select(query)
.on('update', (diff, data) => updateLog.push({ diff, data }))
.on('error', error => errorLog.push(error))

// For each event, check values or perform action, then continue
var processEvents = (callback, index) => {
Expand Down Expand Up @@ -72,6 +74,18 @@ exports.variousQueries = function(test) {
processEvents(callback, index + 1)
}
break
case 'error':
if(errorLog.length === 0) {
// No error yet, wait longer
setTimeout(() => {
processEvents(callback, index)
}, 100)
}
else {
test.ok(errorLog[0].toString().match(data) !== null)
// Move to next event
processEvents(callback, index + 1)
}
case 'unchanged':
setTimeout(() => {
test.equal(updateLog.length, nextLogPos,
Expand Down

0 comments on commit 4d1bc67

Please sign in to comment.