Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
Merge pull request #151 from mcollina/steed
Browse files Browse the repository at this point in the history
Removed domains and switch to steed
  • Loading branch information
mcollina committed May 28, 2016
2 parents 786e701 + 855905b commit e2c7bcc
Show file tree
Hide file tree
Showing 24 changed files with 97 additions and 241 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ language: node_js
sudo: false
node_js:
- "6"
- "5"
- "4"
- "0.12"
script:
Expand Down
28 changes: 0 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,34 +316,6 @@ If you publish to a kafka topic that doesn't exist, that topic will be created u

If you subscribe to a kafka topic that doesn't exist, that subscription will take affect only when something is published to the kafka topic through this ascoltatori.

## Domain support

Ascoltatori supports the [node.js domain API](http://nodejs.org/api/domain.html).
Use it calling the `registerDomain` function on your Ascoltatore and it will take
care of routing the exceptions to the given domain. Look at this example:

```javascript
var ascoltatori = require('ascoltatori');
var domain = require('domain');

var d = domain.create();
d.on('error', function() {
console.log(arguments);
});

ascoltatori.build(function (err, ascoltatore) {
ascoltatore.registerDomain(d);

ascoltatore.subscribe('hello/*', function() {
throw new Error();
});

ascoltatore.publish('hello/42', 'a message', function() {
console.log('message published');
});
});
```


## Debugging

Expand Down
11 changes: 0 additions & 11 deletions lib/abstract_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,6 @@ AbstractAscoltatore.prototype.close = function(done) {
throw new Error("Subclass to implement");
};

/**
* You can register a nodejs domain so that every callback is
* jailed and cannot crash the process.
*
* @param {Domain} domain the node.js error domain to use.
* @api public
*/
AbstractAscoltatore.prototype.registerDomain = function(domain) {
this._ascoltatore.registerDomain(domain);
};

AbstractAscoltatore.prototype._subTopic = function(topic) {
if (this._reInSeparator) {
topic = topic.replace(this._reInSeparator,
Expand Down
19 changes: 10 additions & 9 deletions lib/amqp_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

var util = require("./util");
var wrap = util.wrap;
var defer = util.defer;
var TrieAscoltatore = require("./trie_ascoltatore");
var AbstractAscoltatore = require('./abstract_ascoltatore');
var async = require("async");
var steed = require("steed")();
var SubsCounter = require("./subs_counter");
var debug = require("debug")("ascoltatori:amqp");

Expand Down Expand Up @@ -71,7 +72,7 @@ AMQPAscoltatore.prototype._startConn = function() {

debug("connecting to " + this._opts.client);

async.series([
steed.series([

function(callback) {
that._client_conn.once("ready", wrap(callback));
Expand Down Expand Up @@ -106,7 +107,7 @@ AMQPAscoltatore.prototype._startConn = function() {
that._ascoltatore.publish(topic, message.data.toString());
});
that._queue.once("basicConsumeOk", function() {
util.defer(callback);
defer(callback);
});
},

Expand Down Expand Up @@ -134,13 +135,13 @@ AMQPAscoltatore.prototype.subscribe = function subscribe(topic, callback, done)
// as advertised
setTimeout(function() {
debug("queue bound to topic " + topic);
util.defer(done);
defer(done);
}, 5);
});

this._queue.bind(this._exchange, this._subTopic(topic));
} else {
util.defer(done);
defer(done);
}

this._subs_counter.add(topic);
Expand All @@ -154,7 +155,7 @@ AMQPAscoltatore.prototype.publish = function publish(topic, message, done) {
debug("new message published to " + topic);

this._exchange.publish(this._pubTopic(topic), String(message));
util.defer(done);
defer(done);
};

AMQPAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) {
Expand All @@ -168,12 +169,12 @@ AMQPAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, do
if (!this._subs_counter.include(topic)) {
this._queue.once("queueUnbindOk", function() {
debug("queue unbound to topic " + topic);
util.defer(done);
defer(done);
});

this._queue.unbind(this._exchange, this._subTopic(topic));
} else {
util.defer(done);
defer(done);
}

return this;
Expand Down Expand Up @@ -202,7 +203,7 @@ AMQPAscoltatore.prototype.close = function close(done) {
}

debug("closed");
util.defer(done);
defer(done);
that.emit("closed");
};

Expand Down
2 changes: 1 addition & 1 deletion lib/ascoltatori.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ module.exports.build = function build(opts, done) {
}

if (done) {
module.exports.util.defer(function() {
setImmediate(function() {
result.once("ready", function() {
result.removeListener("error", done);
done(null, result);
Expand Down
71 changes: 11 additions & 60 deletions lib/behave_like_an_ascoltatore.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"use strict";

var wrap = require("./util").wrap;
var async = require("async");
var steed = require("steed")();

/**
* This is a shared mocha test for verifying that an
Expand All @@ -17,12 +17,7 @@ var async = require("async");
*/
module.exports = function() {

var domain, expect;

// we need to require this here
// so we do not force a dependency on the new
// domain stuff
domain = require("domain");
var expect;

// you MUST depend on chai only if you plan to run
// this test
Expand Down Expand Up @@ -61,7 +56,7 @@ module.exports = function() {
var that = this;
that.instance.sub("hello", function() {});
setTimeout(function() {
// still, some Ascoltatore are async
// still, some Ascoltatore are steed
// and we must give them some time
done();
}, 10);
Expand Down Expand Up @@ -220,7 +215,7 @@ module.exports = function() {
funcToRemove = function(topic, value) {
throw "that should never run";
};
async.series([
steed.series([

function(cb) {
that.instance.sub("hello", funcToRemove, cb);
Expand All @@ -246,7 +241,7 @@ module.exports = function() {
funcToRemove = function(topic, value) {
throw "that should never run";
};
async.series([
steed.series([

function(cb) {
that.instance.sub("hello/42", wrap(done), cb);
Expand Down Expand Up @@ -289,7 +284,7 @@ module.exports = function() {
a.push(subscribe);
}

async.parallel(a, instance.publish.bind(instance, "hello", null));
steed.parallel(a, instance.publish.bind(instance, "hello", null));
});

it("should emit the ready event", function(done) {
Expand All @@ -311,7 +306,7 @@ module.exports = function() {
for (i = counter; i > 0; i = i - 1) {
a.push(this.instance.on.bind(this.instance, "ready", callback));
}
async.parallel(a);
steed.parallel(a);
});

it("should support removing a single listener", function(done) {
Expand All @@ -321,7 +316,7 @@ module.exports = function() {
funcToRemove = function(topic, value) {
throw "that should never run";
};
async.series([
steed.series([

function(cb) {
that.instance.sub("hello", wrap(done), cb);
Expand Down Expand Up @@ -373,57 +368,13 @@ module.exports = function() {

it("should allow the close method to be called twice", function(done) {
var that = this;
async.series([
steed.series([
this.instance.publish.bind(this.instance, "hello", "world"),
this.instance.close.bind(this.instance),
this.instance.close.bind(this.instance)
], done);
});

// skip the tests, we need to drop these anyway
describe.skip("wrapping uncaughtException", function() {
var uncaughtExceptionHandler, dm;

beforeEach(function(done) {
dm = domain.create();

var listeners = process.listeners("uncaughtException");
uncaughtExceptionHandler = listeners[listeners.length - 1];
process.removeListener("uncaughtException", uncaughtExceptionHandler);
done();
});

afterEach(function(done) {
process.on("uncaughtException", uncaughtExceptionHandler);
dm.dispose();
done();
});

it("should support domains", function(done) {
var that = this;

dm.on("error", function(err) {
expect(err.message).to.equal("ahaha");
done();
});

that.instance.registerDomain(dm);

that.instance.subscribe("throw", function() {
throw new Error("ahaha");
}, function() {
// we need to properly wait that the subscribe
// has happened correctly

// the nextTick hack is needed to skip out
// of mocha control
async.setImmediate(function() {
that.instance.publish("throw");
});
});
});
});

it("should not deliver message twice for double subscription", function(done) {
var that = this,
count = 2,
Expand All @@ -446,7 +397,7 @@ module.exports = function() {
}, cb);
};

async.series([
steed.series([
sub, sub,

function(cb) {
Expand Down Expand Up @@ -477,7 +428,7 @@ module.exports = function() {
}, cb);
};

async.series([
steed.series([
function (cb) {
sub("a/+", cb);
},
Expand Down
12 changes: 4 additions & 8 deletions lib/decorator_ascoltatore.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"use strict";

var async = require("async");
var steed = require("steed")();
var AbstractAscoltatore = require("./abstract_ascoltatore");

/**
Expand Down Expand Up @@ -72,7 +72,7 @@ DecoratorAscoltatore.prototype.removeListener = function(event, callback) {

DecoratorAscoltatore.prototype.subscribe = function(topic, callback, done) {
var that = this;
async.waterfall([
steed.waterfall([

function(cb) {
that.wrapTopic(topic, cb);
Expand All @@ -92,7 +92,7 @@ DecoratorAscoltatore.prototype.subscribe = function(topic, callback, done) {

DecoratorAscoltatore.prototype.unsubscribe = function(topic, callback, done) {
var that = this;
async.waterfall([
steed.waterfall([

function(cb) {
that.wrapTopic(topic, cb);
Expand All @@ -112,7 +112,7 @@ DecoratorAscoltatore.prototype.unsubscribe = function(topic, callback, done) {

DecoratorAscoltatore.prototype.publish = function(topic, payload, options, done) {
var that = this;
async.waterfall([
steed.waterfall([

function(cb) {
that.wrapTopic(topic, cb);
Expand All @@ -134,10 +134,6 @@ DecoratorAscoltatore.prototype.close = function(done) {
this._ascoltatore.close(done);
};

DecoratorAscoltatore.prototype.registerDomain = function(domain) {
this._ascoltatore.registerDomain(domain);
};

DecoratorAscoltatore.prototype.unsub = function(topic, callback, done) {
this.unsubscribe(topic, callback, done);
};
Expand Down
10 changes: 0 additions & 10 deletions lib/event_emitter2_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,6 @@ EventEmitter2Ascoltatore.prototype.close = function close(done) {
defer(done);
};

EventEmitter2Ascoltatore.prototype.registerDomain = function(domain) {
debug("registered domain");

if (!this._publish) {
this._publish = this.publish;
}

this.publish = domain.bind(this._publish);
};

util.aliasAscoltatore(EventEmitter2Ascoltatore.prototype);

/**
Expand Down
17 changes: 0 additions & 17 deletions lib/filesystem_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ FileSystemAscoltatore.prototype.subscribe = function (topic, callback, done)

var f = cb;

if (this._domain) {
f = this._domain.bind(cb);
}

callback[this._dehnd] = callback[this._dehnd] || f;

this._fsq.subscribe(this._subTopic(topic),
Expand Down Expand Up @@ -87,19 +83,6 @@ FileSystemAscoltatore.prototype.close = function (done)
this._fsq.stop_watching(done);
};

FileSystemAscoltatore.prototype.registerDomain = function (domain)
{
debug('registered domain');

this._domain = domain;

var ths = this;

domain.on('error', function () {
ths._fsq.stop_watching();
});
};

util.aliasAscoltatore(FileSystemAscoltatore.prototype);

/**
Expand Down
Loading

0 comments on commit e2c7bcc

Please sign in to comment.