Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

added filter ascoltatore for topic based routing #149

Open
wants to merge 1 commit into
base: v3-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,35 @@ If you publish to a kafka topic that doesn't exist, that topic will be created u

If you subscribe to a kafka topic that doesn't exist, that subscription will take affect only when something is published to the kafka topic through this ascoltatori.

### Filter (Topic Based Routing)

Configure multiple backends and use regular expressions to select the backend to which a topic belongs.

```javascript
var ascoltatori = require('ascoltatori');
var settings = {
type: 'filter',
filters: [{accepts: /\$SYS/, ascoltatore: {type: "trie"},
{accepts: /.*/, ascoltatore: {
type: 'kafka',
json: false,
kafka: require("kafka-node"),
connectString: "localhost:2181",
clientId: "ascoltatori",
groupId: "ascoltatori",
defaultEncoding: "utf8",
encodings: {
image: "buffer"
}
}}
]
};

ascoltatori.build(settings, function (err, ascoltatore) {
// ...
});
```

## Domain support

Ascoltatori supports the [node.js domain API](http://nodejs.org/api/domain.html).
Expand Down
2 changes: 2 additions & 0 deletions lib/ascoltatori.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module.exports.DecoratorAscoltatore = require("./decorator_ascoltatore");
module.exports.JSONAscoltatore = require("./json_ascoltatore");
module.exports.FileSystemAscoltatore = require("./filesystem_ascoltatore");
module.exports.KafkaAscoltatore = require("./kafka_ascoltatore");
module.exports.FilterAscoltatore = require("./filter_ascoltatore");

/**
*
Expand All @@ -35,6 +36,7 @@ var classes = {
"zmq": module.exports.ZeromqAscoltatore,
"mongo": module.exports.MongoAscoltatore,
"kafka": module.exports.KafkaAscoltatore,
"filter": module.exports.FilterAscoltatore,
"filesystem": module.exports.FileSystemAscoltatore
};

Expand Down
122 changes: 122 additions & 0 deletions lib/filter_ascoltatore.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"use strict";

var AbstractAscoltatore = require("./abstract_ascoltatore");
var util = require("./util");
var defer = util.defer;
var debug = require("debug")("ascoltatori:filter");
var ascoltatori = require('./ascoltatori');
var TrieAscoltatore = require("./trie_ascoltatore");
var async = require("async");

/**
* A FilterAscoltatore is a class that inherits from AbstractAscoltatore,
* delegating to the first ascoltatore which accepts a topic.
*
* @api public
*/
function FilterAscoltatore(settings) {
AbstractAscoltatore.call(this, settings);

settings = settings || {};
var filters = settings.filters || [];
debug(filters.length+" filters given");
filters.push({accepts: /.*/, ascoltatore: {type: "trie"}});
this.wrapped = [];
var that = this;
var xtors = [];
var xtor = function(filter){
return function(callback){
var wrapper = function(err,result){
callback(err,{accepts: filter.accepts, ascoltatore: result});
};
ascoltatori.build(filter.ascoltatore,wrapper);
};
};
for(var i=0;i<filters.length;i++){
xtors.push(xtor(filters[i]));
}

async.parallel(xtors, function(err,results){
if(err){
that.emit("error",err);
} else {
that.wrapped = results;
for(var i=0;i<results.length;i++){
debug("filter "+i+" is "+results[i].accepts.toString());
}
that.emit("ready");
}
});

}


/**
* See AbstractAscoltatore for the public API definitions.
*
* @api private
*/

FilterAscoltatore.prototype = Object.create(AbstractAscoltatore.prototype);

FilterAscoltatore.prototype._filter = function filter(topic) {
for(var i=0;i<this.wrapped.length;i++){
if(this.wrapped[i].accepts.test(topic)){
debug("filter "+this.wrapped[i].accepts.toString()+" accepts "+topic);
return this.wrapped[i].ascoltatore;
}
}
};

FilterAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) {
this._raiseIfClosed();
this._filter(topic).subscribe(topic,callback,done);
};

FilterAscoltatore.prototype.publish = function (topic, message, options, done) {
this._raiseIfClosed();
this._filter(topic).publish(topic,message,options,done);
};

FilterAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) {
this._raiseIfClosed();
this._filter(topic).unsubscribe(topic,callback,done);
};

FilterAscoltatore.prototype.close = function close(done) {
if(this._closed){
defer(done);
return;
}
var closers = [];
var closer = function(delegate){
return function(callback){delegate.close(callback);};
}
for(var i=0;i<this.wrapped.length;i++){
var delegate = this.wrapped[i].ascoltatore;
closers.push(closer(delegate));
}
var that = this;
async.parallel(closers, function(err,results){
that.emit("closed");
defer(done);
});
};

FilterAscoltatore.prototype.registerDomain = function(domain) {

if (!this._publish) {
this._publish = this.publish;
}

this.publish = domain.bind(this._publish);
};

util.aliasAscoltatore(FilterAscoltatore.prototype);

/**
* Exports the FilterAscoltatore.
*
* @api public
*/
module.exports = FilterAscoltatore;
8 changes: 8 additions & 0 deletions test/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ global.prefixSettings = function() {
r.args = [this.separator + "myprefix"];
return r;
};
global.filterSettings = function() {
return {
args: [{
filters: [{accepts: /hello/, ascoltatore: {type: "trie"}},
{accepts: /hebida/, ascoltatore: {type: "trie"}}]
}]
};
};

var mosca = require("mosca");

Expand Down
22 changes: 22 additions & 0 deletions test/filter_ascoltatore_spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@

describeAscoltatore("filter", function(){
it("should publish messages to the filtered delegate (hello)", function(done) {
var that = this;
this.instance.wrapped[0].ascoltatore.subscribe("/hello", wrap(done), function() {
that.instance.publish("/hello", "world");
});
});
it("should publish messages to the filtered delegate (hebida)", function(done) {
var that = this;
this.instance.wrapped[1].ascoltatore.subscribe("/hebida", wrap(done), function() {
that.instance.publish("/hebida", "hoobida");
});
});
it("should publish messages to a default delegate", function(done) {
var that = this;
this.instance.wrapped[2].ascoltatore.subscribe("/cheese", wrap(done), function() {
that.instance.publish("/cheese", "yum");
});
});

});