From b4ce3e391a7edea07af8f544617909b0a295e6ca Mon Sep 17 00:00:00 2001 From: Rob Fuller Date: Mon, 9 May 2016 11:11:21 +0100 Subject: [PATCH] added filter ascoltatore for topic based routing --- README.md | 29 ++++++++ lib/ascoltatori.js | 2 + lib/filter_ascoltatore.js | 122 ++++++++++++++++++++++++++++++++ test/common.js | 8 +++ test/filter_ascoltatore_spec.js | 22 ++++++ 5 files changed, 183 insertions(+) create mode 100644 lib/filter_ascoltatore.js create mode 100644 test/filter_ascoltatore_spec.js diff --git a/README.md b/README.md index ce0c724..37d0e95 100644 --- a/README.md +++ b/README.md @@ -316,6 +316,35 @@ If you publish to a kafka topic that doesn't exist, that topic will be created u If you subscribe to a kafka topic that doesn't exist, that subscription will take affect only when something is published to the kafka topic through this ascoltatori. +### Filter (Topic Based Routing) + +Configure multiple backends and use regular expressions to select the backend to which a topic belongs. + +```javascript +var ascoltatori = require('ascoltatori'); +var settings = { + type: 'filter', + filters: [{accepts: /\$SYS/, ascoltatore: {type: "trie"}, + {accepts: /.*/, ascoltatore: { + type: 'kafka', + json: false, + kafka: require("kafka-node"), + connectString: "localhost:2181", + clientId: "ascoltatori", + groupId: "ascoltatori", + defaultEncoding: "utf8", + encodings: { + image: "buffer" + } + }} + ] +}; + +ascoltatori.build(settings, function (err, ascoltatore) { + // ... +}); +``` + ## Domain support Ascoltatori supports the [node.js domain API](http://nodejs.org/api/domain.html). diff --git a/lib/ascoltatori.js b/lib/ascoltatori.js index c544796..65a0d0d 100644 --- a/lib/ascoltatori.js +++ b/lib/ascoltatori.js @@ -21,6 +21,7 @@ module.exports.DecoratorAscoltatore = require("./decorator_ascoltatore"); module.exports.JSONAscoltatore = require("./json_ascoltatore"); module.exports.FileSystemAscoltatore = require("./filesystem_ascoltatore"); module.exports.KafkaAscoltatore = require("./kafka_ascoltatore"); +module.exports.FilterAscoltatore = require("./filter_ascoltatore"); /** * @@ -35,6 +36,7 @@ var classes = { "zmq": module.exports.ZeromqAscoltatore, "mongo": module.exports.MongoAscoltatore, "kafka": module.exports.KafkaAscoltatore, + "filter": module.exports.FilterAscoltatore, "filesystem": module.exports.FileSystemAscoltatore }; diff --git a/lib/filter_ascoltatore.js b/lib/filter_ascoltatore.js new file mode 100644 index 0000000..2c1cc6c --- /dev/null +++ b/lib/filter_ascoltatore.js @@ -0,0 +1,122 @@ +"use strict"; + +var AbstractAscoltatore = require("./abstract_ascoltatore"); +var util = require("./util"); +var defer = util.defer; +var debug = require("debug")("ascoltatori:filter"); +var ascoltatori = require('./ascoltatori'); +var TrieAscoltatore = require("./trie_ascoltatore"); +var async = require("async"); + +/** + * A FilterAscoltatore is a class that inherits from AbstractAscoltatore, + * delegating to the first ascoltatore which accepts a topic. + * + * @api public + */ +function FilterAscoltatore(settings) { + AbstractAscoltatore.call(this, settings); + + settings = settings || {}; + var filters = settings.filters || []; + debug(filters.length+" filters given"); + filters.push({accepts: /.*/, ascoltatore: {type: "trie"}}); + this.wrapped = []; + var that = this; + var xtors = []; + var xtor = function(filter){ + return function(callback){ + var wrapper = function(err,result){ + callback(err,{accepts: filter.accepts, ascoltatore: result}); + }; + ascoltatori.build(filter.ascoltatore,wrapper); + }; + }; + for(var i=0;i