Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
use steed
Browse files Browse the repository at this point in the history
  • Loading branch information
mcollina committed May 23, 2016
1 parent 786e701 commit 196206c
Show file tree
Hide file tree
Showing 20 changed files with 97 additions and 109 deletions.
18 changes: 9 additions & 9 deletions lib/amqp_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ var util = require("./util");
var wrap = util.wrap;
var TrieAscoltatore = require("./trie_ascoltatore");
var AbstractAscoltatore = require('./abstract_ascoltatore');
var async = require("async");
var steed = require("steed")();
var SubsCounter = require("./subs_counter");
var debug = require("debug")("ascoltatori:amqp");

Expand Down Expand Up @@ -71,7 +71,7 @@ AMQPAscoltatore.prototype._startConn = function() {

debug("connecting to " + this._opts.client);

async.series([
steed.series([

function(callback) {
that._client_conn.once("ready", wrap(callback));
Expand Down Expand Up @@ -106,7 +106,7 @@ AMQPAscoltatore.prototype._startConn = function() {
that._ascoltatore.publish(topic, message.data.toString());
});
that._queue.once("basicConsumeOk", function() {
util.defer(callback);
setImmediate(callback);
});
},

Expand Down Expand Up @@ -134,13 +134,13 @@ AMQPAscoltatore.prototype.subscribe = function subscribe(topic, callback, done)
// as advertised
setTimeout(function() {
debug("queue bound to topic " + topic);
util.defer(done);
setImmediate(done);
}, 5);
});

this._queue.bind(this._exchange, this._subTopic(topic));
} else {
util.defer(done);
setImmediate(done);
}

this._subs_counter.add(topic);
Expand All @@ -154,7 +154,7 @@ AMQPAscoltatore.prototype.publish = function publish(topic, message, done) {
debug("new message published to " + topic);

this._exchange.publish(this._pubTopic(topic), String(message));
util.defer(done);
setImmediate(done);
};

AMQPAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) {
Expand All @@ -168,12 +168,12 @@ AMQPAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, do
if (!this._subs_counter.include(topic)) {
this._queue.once("queueUnbindOk", function() {
debug("queue unbound to topic " + topic);
util.defer(done);
setImmediate(done);
});

this._queue.unbind(this._exchange, this._subTopic(topic));
} else {
util.defer(done);
setImmediate(done);
}

return this;
Expand Down Expand Up @@ -202,7 +202,7 @@ AMQPAscoltatore.prototype.close = function close(done) {
}

debug("closed");
util.defer(done);
setImmediate(done);
that.emit("closed");
};

Expand Down
2 changes: 1 addition & 1 deletion lib/ascoltatori.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ module.exports.build = function build(opts, done) {
}

if (done) {
module.exports.util.defer(function() {
setImmediate(function() {
result.once("ready", function() {
result.removeListener("error", done);
done(null, result);
Expand Down
22 changes: 11 additions & 11 deletions lib/behave_like_an_ascoltatore.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"use strict";

var wrap = require("./util").wrap;
var async = require("async");
var steed = require("steed")();

/**
* This is a shared mocha test for verifying that an
Expand Down Expand Up @@ -61,7 +61,7 @@ module.exports = function() {
var that = this;
that.instance.sub("hello", function() {});
setTimeout(function() {
// still, some Ascoltatore are async
// still, some Ascoltatore are steed
// and we must give them some time
done();
}, 10);
Expand Down Expand Up @@ -220,7 +220,7 @@ module.exports = function() {
funcToRemove = function(topic, value) {
throw "that should never run";
};
async.series([
steed.series([

function(cb) {
that.instance.sub("hello", funcToRemove, cb);
Expand All @@ -246,7 +246,7 @@ module.exports = function() {
funcToRemove = function(topic, value) {
throw "that should never run";
};
async.series([
steed.series([

function(cb) {
that.instance.sub("hello/42", wrap(done), cb);
Expand Down Expand Up @@ -289,7 +289,7 @@ module.exports = function() {
a.push(subscribe);
}

async.parallel(a, instance.publish.bind(instance, "hello", null));
steed.parallel(a, instance.publish.bind(instance, "hello", null));
});

it("should emit the ready event", function(done) {
Expand All @@ -311,7 +311,7 @@ module.exports = function() {
for (i = counter; i > 0; i = i - 1) {
a.push(this.instance.on.bind(this.instance, "ready", callback));
}
async.parallel(a);
steed.parallel(a);
});

it("should support removing a single listener", function(done) {
Expand All @@ -321,7 +321,7 @@ module.exports = function() {
funcToRemove = function(topic, value) {
throw "that should never run";
};
async.series([
steed.series([

function(cb) {
that.instance.sub("hello", wrap(done), cb);
Expand Down Expand Up @@ -373,7 +373,7 @@ module.exports = function() {

it("should allow the close method to be called twice", function(done) {
var that = this;
async.series([
steed.series([
this.instance.publish.bind(this.instance, "hello", "world"),
this.instance.close.bind(this.instance),
this.instance.close.bind(this.instance)
Expand Down Expand Up @@ -417,7 +417,7 @@ module.exports = function() {

// the nextTick hack is needed to skip out
// of mocha control
async.setImmediate(function() {
steed.setImmediate(function() {
that.instance.publish("throw");
});
});
Expand Down Expand Up @@ -446,7 +446,7 @@ module.exports = function() {
}, cb);
};

async.series([
steed.series([
sub, sub,

function(cb) {
Expand Down Expand Up @@ -477,7 +477,7 @@ module.exports = function() {
}, cb);
};

async.series([
steed.series([
function (cb) {
sub("a/+", cb);
},
Expand Down
8 changes: 4 additions & 4 deletions lib/decorator_ascoltatore.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"use strict";

var async = require("async");
var steed = require("steed")();
var AbstractAscoltatore = require("./abstract_ascoltatore");

/**
Expand Down Expand Up @@ -72,7 +72,7 @@ DecoratorAscoltatore.prototype.removeListener = function(event, callback) {

DecoratorAscoltatore.prototype.subscribe = function(topic, callback, done) {
var that = this;
async.waterfall([
steed.waterfall([

function(cb) {
that.wrapTopic(topic, cb);
Expand All @@ -92,7 +92,7 @@ DecoratorAscoltatore.prototype.subscribe = function(topic, callback, done) {

DecoratorAscoltatore.prototype.unsubscribe = function(topic, callback, done) {
var that = this;
async.waterfall([
steed.waterfall([

function(cb) {
that.wrapTopic(topic, cb);
Expand All @@ -112,7 +112,7 @@ DecoratorAscoltatore.prototype.unsubscribe = function(topic, callback, done) {

DecoratorAscoltatore.prototype.publish = function(topic, payload, options, done) {
var that = this;
async.waterfall([
steed.waterfall([

function(cb) {
that.wrapTopic(topic, cb);
Expand Down
9 changes: 4 additions & 5 deletions lib/event_emitter2_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

var AbstractAscoltatore = require("./abstract_ascoltatore");
var util = require("./util");
var defer = util.defer;
var debug = require("debug")("ascoltatori:ee2");
var EventEmitter2 = require("eventemitter2").EventEmitter2;
var ascoltatori = require('./ascoltatori');
Expand Down Expand Up @@ -43,7 +42,7 @@ EventEmitter2Ascoltatore.prototype.subscribe = function subscribe(topic, callbac
debug("registered new subscriber for topic " + topic);

this._event.on(this._subTopic(topic).replace(/^\//g, ''), callback);
defer(done);
setImmediate(done);
};

EventEmitter2Ascoltatore.prototype.publish = function (topic, message, options, done) {
Expand All @@ -52,7 +51,7 @@ EventEmitter2Ascoltatore.prototype.publish = function (topic, message, options,

this._event.emit(this._pubTopic(topic).replace(/^\//g, ''), topic, message, options);

defer(done);
setImmediate(done);
};

EventEmitter2Ascoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) {
Expand All @@ -62,7 +61,7 @@ EventEmitter2Ascoltatore.prototype.unsubscribe = function unsubscribe(topic, cal

this._event.off(this._subTopic(topic).replace(/^\//g, ''), callback);

defer(done);
setImmediate(done);
};

EventEmitter2Ascoltatore.prototype.close = function close(done) {
Expand All @@ -71,7 +70,7 @@ EventEmitter2Ascoltatore.prototype.close = function close(done) {

debug("closed");

defer(done);
setImmediate(done);
};

EventEmitter2Ascoltatore.prototype.registerDomain = function(domain) {
Expand Down
16 changes: 7 additions & 9 deletions lib/kafka_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ var TrieAscoltatore = require("./trie_ascoltatore");
var AbstractAscoltatore = require('./abstract_ascoltatore');
var debug = require("debug")("ascoltatori:kafka");
var Qlobber = require('qlobber').Qlobber;

var SubsCounter = require("./subs_counter");
var async = require("async");

/**
* KafkaAscoltatore is a class that inherits from AbstractAscoltatore.
Expand Down Expand Up @@ -194,7 +192,7 @@ KafkaAscoltatore.prototype._startConn = function(cb) {
debug("error in client",e);
that.emit("error", e);
});
util.defer(newcb);
setImmediate(newcb);
};
that._consumerClient.on('ready',function() {
that.withTopicOffsetsAdded(that._consumerClient,subscriptions,initConsumer);
Expand Down Expand Up @@ -228,7 +226,7 @@ KafkaAscoltatore.prototype.subscribe = function subscribe(topic, onMessageReceiv

this._ascoltatore.subscribe(topic, onMessageReceived);
if(subscribe_topics.length === 0){
util.defer(done);
setImmediate(done);
return;
}

Expand Down Expand Up @@ -256,7 +254,7 @@ KafkaAscoltatore.prototype.addKafkaSubscriptions = function addKafkaSubscription
return;
}
debug("registered new kafka subscriptions", subscribe_topics);
util.defer(done);
setImmediate(done);
},true);
});
};
Expand Down Expand Up @@ -334,7 +332,7 @@ KafkaAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, d

var newDone = function() {
debug("deregistered subscriber for topic " + subtopic);
util.defer(done);
setImmediate(done);
};

this._subs_matcher.remove(subtopic);
Expand Down Expand Up @@ -401,11 +399,11 @@ KafkaAscoltatore.prototype.createTopics = function createTopics(topics,callback)
this._producer.createTopics(topics, false, function (err, data) {
if(err){
debug("problem creating topics",err);
util.defer(function(){callback(err);});
setImmediate(function(){callback(err);});
return;
}
that._knowntopics = that._knowntopics.concat(topics);
util.defer(callback);
setImmediate(callback);
});
};

Expand All @@ -432,7 +430,7 @@ KafkaAscoltatore.prototype.close = function close(done) {
delete that._consumerClient;
delete that._consumer;
that.emit("closed");
util.defer(done);
setImmediate(done);
});
});
}else{
Expand Down
4 changes: 2 additions & 2 deletions lib/mongo_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ var debug = require("debug")("ascoltatori:mongodb");
var mongo = require('mongodb');
var MongoClient = require('mongodb').MongoClient;
var ObjectID = require('mongodb').ObjectID;
var async = require("async");
var steed = require("steed");

/**
* MongoAscoltatore is a class that inherits from AbstractAscoltatore.
Expand Down Expand Up @@ -331,7 +331,7 @@ MongoAscoltatore.prototype.close = function close(done) {

that._closed = true;

async.series([
steed.series([
function(cb) {
if (that._cursor) {
that._cursor.close(cb);
Expand Down
Loading

0 comments on commit 196206c

Please sign in to comment.