Middleware-based components for building a custom MQTT broker.
In development, not yet stable.
mqtt-stack is available on npm. First, install it:
npm install mqtt-stack --save
Then require it in your project:
var mqttStack = require('mqtt-stack');
Instantiate a stack:
var stack = new mqttStack.Stack();
Register each middleware you want to use by calling the use method:
stack.use(new mqttStack.MemoryBackend());
In broad terms, mqtt-stack middlewares are components that listen to the MQTT connection stream and perform actions according to their specific broker functionality. A Middleware base class implementation is available in the module exports, and developers are encouraged to inherit from it. mqtt-stack middlewares may implement the following interface methods.
The standard object constructor takes a configuration object as its argument. If the middleware inherits from the Middleware base class, the super(config, defaults) call sets up the config member attribute of the middleware object.
class MyMiddleware extends mqttStack.Middleware {
  constructor(config) {
    // Default values used for any option missing from config.
    const defaults = {some: 'value'};
    // Calling super sets the this.config and this.name attributes.
    super(config, defaults);
  }
}
Method is called once a new connection is established.
Method is called once a packet is received. Once the middleware finishes its action, it should either call the next function to propagate to the next middleware or call the done function to terminate propagation.
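The propagation rule can be illustrated with a small standalone sketch. It does not use mqtt-stack itself: runHandlers is a hypothetical helper, and the handle(client, packet, next, done) argument order is an assumption made for illustration.

```javascript
// Hypothetical helper, not part of mqtt-stack: passes a packet through each
// middleware's handle() in order until one calls done() or the list ends.
function runHandlers(middlewares, client, packet, finished) {
  const handlers = middlewares.filter((m) => typeof m.handle === 'function');
  let index = 0;

  // done() stops propagation immediately.
  function done(err) {
    finished(err || null);
  }

  // next() hands the packet to the following middleware, if any.
  function next(err) {
    if (err || index === handlers.length) return done(err);
    handlers[index++].handle(client, packet, next, done);
  }

  next();
}

const seen = [];
const logger = {
  handle(client, packet, next) {
    seen.push('logger:' + packet.cmd);
    next(); // let later middlewares see the packet
  }
};
const pingResponder = {
  handle(client, packet, next, done) {
    if (packet.cmd === 'pingreq') {
      seen.push('ping:answered');
      return done(); // consumed here, later middlewares are skipped
    }
    next();
  }
};
const tail = {
  handle(client, packet, next) {
    seen.push('tail:' + packet.cmd);
    next();
  }
};

runHandlers([logger, pingResponder, tail], {}, { cmd: 'pingreq' }, () => {
  seen.push('finished');
});
// seen is now ['logger:pingreq', 'ping:answered', 'finished']
```

Because pingResponder calls done, the tail middleware never sees the PINGREQ packet. Any other packet type would travel through all three.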
Other than these interface methods, a middleware may handle a stack callback by exposing a method with the callback's name. For instance, see the OutboundManager middleware (path: src/middlewares/outbound_manager.js) for its forwardMessage callback handler. The ctx argument is an object that contains any data relevant to handling the callback. store is an output argument that is updated by callback handlers. done terminates the callback chain and returns control to the caller of the callback.
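As a rough illustration of how handlers are found by name, the sketch below dispatches a callback to every object that exposes a matching method. It is standalone and not mqtt-stack's own code: the execute function, the (ctx, store, next, done) argument order, and the use of an array for store are all assumptions.

```javascript
// Hypothetical dispatcher, not mqtt-stack's own code: invokes the method
// called `name` on every middleware that exposes one.
function execute(middlewares, name, ctx, store, callback) {
  const handlers = middlewares.filter((m) => typeof m[name] === 'function');
  let index = 0;

  // done() ends the chain and hands the store back to the caller.
  function done(err) {
    callback(err || null, store);
  }

  // next() moves on to the following handler, if any.
  function next(err) {
    if (err || index === handlers.length) return done(err);
    const middleware = handlers[index++];
    middleware[name](ctx, store, next, done);
  }

  next();
}

// Illustrative middlewares that all handle the same callback name.
const cache = {
  lookupRetainedMessages(ctx, store, next) {
    if (ctx.topic === 'sensors/temp') {
      store.push({ topic: ctx.topic, payload: '21' });
    }
    next(); // allow other handlers to contribute
  }
};
const terminator = {
  lookupRetainedMessages(ctx, store, next, done) {
    done(); // stop the chain; handlers after this one are not called
  }
};
const neverCalled = {
  lookupRetainedMessages(ctx, store, next) {
    store.push({ topic: ctx.topic, payload: 'unreachable' });
    next();
  }
};

let result;
execute([cache, terminator, neverCalled], 'lookupRetainedMessages',
  { topic: 'sensors/temp' }, [], (err, store) => {
    result = store;
  });
// result holds a single entry, the one written by `cache`
```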
mqtt-stack provides some built-in middlewares that offer basic MQTT broker functionality. Keep in mind that these middlewares are not mandatory; on the contrary, they are designed to be easily replaceable.
Simple authentication binder middleware that executes the authenticateConnection callback handler with {client, packet, username, password} context if the client is not authenticated.
Simple authorization binder middleware that executes the authorizePacket callback handler with {client, packet} context for every received packet.
Simple connection management middleware. It observes the connection status.
- When the connection is closed gracefully, it executes the cleanDisconnect callback handler with {client} context.
- When the connection is closed unexpectedly, it executes the uncleanDisconnect callback handler with {client} context.
- It exposes a closeClient callback handler that terminates the client connection.
Handles the client's PUBLISH command by executing the relayMessage callback handler with {client, packet, topic, payload} context. Once the callback handler finishes, it sends a PUBACK message to the client if the packet's QoS is 1.
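The PUBLISH flow described here could look roughly like the following standalone sketch. The execute and send parameters are hypothetical stand-ins for the stack's callback dispatcher and the client's packet writer, the packet field names (qos, messageId) follow the common mqtt-packet object layout, and skipping the acknowledgement on error is an assumption.

```javascript
// `execute` and `send` are hypothetical stand-ins, injected so the sketch
// runs without a broker.
function handlePublish(execute, send, client, packet) {
  const ctx = { client, packet, topic: packet.topic, payload: packet.payload };
  execute('relayMessage', ctx, {}, (err) => {
    if (err) return; // assumption: no acknowledgement when relaying failed
    if (packet.qos === 1) {
      send({ cmd: 'puback', messageId: packet.messageId });
    }
  });
}

// Stubs that record what happened.
const sent = [];
const relayed = [];
const stubExecute = (name, ctx, store, callback) => {
  relayed.push(name + ':' + ctx.topic);
  callback(null, store);
};
const stubSend = (packet) => sent.push(packet);

handlePublish(stubExecute, stubSend, {},
  { cmd: 'publish', topic: 'a/b', payload: 'hi', qos: 1, messageId: 7 });
handlePublish(stubExecute, stubSend, {},
  { cmd: 'publish', topic: 'a/c', payload: 'yo', qos: 0 });
// Both messages were relayed; only the QoS 1 publish produced a PUBACK.
```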
Manages the life span of the client connection. Once the client's CONNECT command is received, if it contains a keepalive duration, the middleware bounds the lifetime of the connection to this duration and resets the timer on every received packet. It executes the closeClient callback handler with {client} context if no packet is received within the keepalive time frame.
It also responds to received PINGREQ commands with PINGRESP.
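The keepalive bookkeeping can be sketched without real timers by passing timestamps in explicitly. KeepaliveTracker is a hypothetical class, not mqtt-stack's implementation. The 1.5 grace factor comes from the MQTT 3.1.1 rule that a server disconnects after one and a half keepalive periods without a packet; whether mqtt-stack applies that factor is an assumption.

```javascript
// Hypothetical tracker: timestamps are in milliseconds and supplied by the
// caller, which keeps the logic deterministic.
class KeepaliveTracker {
  constructor(keepaliveSeconds, now, graceFactor = 1.5) {
    this.limitMs = keepaliveSeconds * 1000 * graceFactor;
    this.lastPacketAt = now;
  }

  // Call for every received packet to restart the countdown.
  packetReceived(now) {
    this.lastPacketAt = now;
  }

  // A keepalive of 0 disables the timeout entirely.
  isExpired(now) {
    return this.limitMs > 0 && now - this.lastPacketAt > this.limitMs;
  }
}

const tracker = new KeepaliveTracker(10, 0); // 10 s keepalive, 15 s limit
const beforeLimit = tracker.isExpired(14000);   // false
tracker.packetReceived(14000);                  // countdown restarts here
const afterReset = tracker.isExpired(28000);    // false, 14 s since last packet
const pastLimit = tracker.isExpired(29001);     // true, more than 15 s elapsed
const disabled = new KeepaliveTracker(0, 0).isExpired(1e9); // false
```

In a broker, the point where isExpired turns true is where the closeClient callback handler would be executed.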
Sends the last will packet if the client is disconnected unexpectedly. Once the client's CONNECT command is received, if it contains a last will packet, the will packet is stored. This middleware exposes an uncleanDisconnect callback handler that sends the will packet.
Simple non-persistent backend storage middleware. It stores clients' subscription lists and topics' retained messages in memory. It exposes the following callback handlers:
- storeSubscription: stores that ctx.client is subscribed to ctx.topic with the ctx.qos QoS level.
- removeSubscription: removes the subscription record of ctx.client for topic ctx.topic.
- clearSubscriptions: removes all stored subscription data for ctx.client.
- lookupSubscriptions: returns all stored subscription data for ctx.client in the store argument.
- storeRetainedMessage: clears the previous retained message of ctx.topic and, if ctx.packet.payload is not empty, stores ctx.packet as the new retained message.
- lookupRetainedMessages: returns the stored retained message of ctx.topic in the store argument.
- relayMessage: relays ctx.packet to subscribers of ctx.packet.topic by executing the forwardMessage callback handler with context {client, packet}.
- subscribeTopic: subscribes ctx.client to ctx.topic with the QoS level defined by ctx.qos.
- unsubscribeTopic: unsubscribes ctx.client from ctx.topic.
- storeOfflineMessage: stores ctx.packet for offline ctx.client with ctx.messageId.
- lookupOfflineMessages: returns all messages stored for ctx.client in the store argument.
- removeOfflineMessages: removes messages with IDs in the list ctx.messageIds stored for the client with ID ctx.clientId.
- clearOfflineMessages: removes all messages stored for the client with ID ctx.clientId.
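Since the built-in middlewares are meant to be replaceable, a replacement backend only has to expose methods with the same callback names. The sketch below covers the subscription and retained-message handlers only. TinyMemoryStore is a hypothetical class, not the library's MemoryBackend; keying by the client object, treating store as an array, and the (ctx, store, next) argument order are assumptions, and topic wildcards are not handled.

```javascript
// Hypothetical in-memory store, keyed by the client object for brevity.
class TinyMemoryStore {
  constructor() {
    this.subscriptions = new Map(); // client -> Map(topic -> qos)
    this.retained = new Map();      // topic -> packet
  }

  storeSubscription(ctx, store, next) {
    if (!this.subscriptions.has(ctx.client)) {
      this.subscriptions.set(ctx.client, new Map());
    }
    this.subscriptions.get(ctx.client).set(ctx.topic, ctx.qos);
    next();
  }

  removeSubscription(ctx, store, next) {
    const topics = this.subscriptions.get(ctx.client);
    if (topics) topics.delete(ctx.topic);
    next();
  }

  clearSubscriptions(ctx, store, next) {
    this.subscriptions.delete(ctx.client);
    next();
  }

  // Results are appended to `store`, assumed here to be an array.
  lookupSubscriptions(ctx, store, next) {
    const topics = this.subscriptions.get(ctx.client) || new Map();
    for (const [topic, qos] of topics) store.push({ topic, qos });
    next();
  }

  // An empty payload only clears the previous retained message.
  storeRetainedMessage(ctx, store, next) {
    this.retained.delete(ctx.topic);
    const payload = ctx.packet.payload;
    if (payload && payload.length > 0) this.retained.set(ctx.topic, ctx.packet);
    next();
  }

  lookupRetainedMessages(ctx, store, next) {
    if (this.retained.has(ctx.topic)) store.push(this.retained.get(ctx.topic));
    next();
  }
}

const backend = new TinyMemoryStore();
const client = { id: 'c1' };
const noop = () => {};
backend.storeSubscription({ client, topic: 'a/b', qos: 1 }, {}, noop);
backend.storeSubscription({ client, topic: 'a/c', qos: 0 }, {}, noop);
backend.removeSubscription({ client, topic: 'a/c' }, {}, noop);
const subs = [];
backend.lookupSubscriptions({ client }, subs, noop);
// subs is [{ topic: 'a/b', qos: 1 }]
```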
Manages outgoing messages. Handles the client's PUBACK command. Exposes a forwardMessage callback handler that publishes a message to the client.
Simple event bridge that links an event emitter to the client connection. The event emitter should be set by calling the setClientHandler method before the middleware is used.
Manages retained messages for topics. If the client's PUBLISH command has the retain flag set, it executes the storeRetainedMessage callback handler with {client, topic of packet, packet} context.
It exposes a subscribeTopic callback handler that first executes the lookupRetainedMessages callback handler with {topic} context and then, if the topic has a retained message, executes the forwardMessage handler with {client, retained packet} context.
Manages the client's session and calls callbacks to manage the stored subscriptions for clean and unclean clients. Once the client's CONNECT command is received:
- If it contains the clean flag, the session manager does not store its subscriptions for later connections and also executes the clearSubscriptions callback handler with {client, packet, clientId} context to destroy the client's previous session. It then sends CONNACK to the client.
- If the clean flag is false or absent, the session manager first executes the lookupSubscriptions callback handler with {client, packet, clientId} context to retrieve the old subscription list, then executes the subscribeTopic callback handler for each subscription in the list with {client, packet, topic, subscription QoS} context to restore the old subscriptions. After the session is restored, CONNACK is sent to the client.
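The two CONNECT branches can be sketched as a single function. openSession is a hypothetical name, execute is an injected stand-in for the stack's callback dispatcher, and the packet fields clean and clientId as well as the {topic, qos} shape of stored subscriptions are assumptions.

```javascript
// Hypothetical sketch of the clean / unclean session flow.
function openSession(execute, client, packet, sendConnack) {
  const ctx = { client, packet, clientId: packet.clientId };

  if (packet.clean) {
    // Clean session: discard whatever was stored for this client.
    return execute('clearSubscriptions', ctx, {}, () => sendConnack());
  }

  // Unclean session: look up old subscriptions and restore each one.
  const found = [];
  execute('lookupSubscriptions', ctx, found, () => {
    let pending = found.length;
    if (pending === 0) return sendConnack();
    found.forEach((sub) => {
      const subCtx = { client, packet, topic: sub.topic, qos: sub.qos };
      execute('subscribeTopic', subCtx, {}, () => {
        // CONNACK goes out only after the last subscription is restored.
        if (--pending === 0) sendConnack();
      });
    });
  });
}

// Stub dispatcher that records the order of callbacks.
const calls = [];
function fakeExecute(name, ctx, store, callback) {
  calls.push(ctx.topic ? name + ':' + ctx.topic : name);
  if (name === 'lookupSubscriptions') store.push({ topic: 'a/b', qos: 1 });
  callback(null, store);
}

openSession(fakeExecute, {}, { clientId: 'c1', clean: false }, () => {
  calls.push('connack');
});
// calls is ['lookupSubscriptions', 'subscribeTopic:a/b', 'connack']
```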
Manages the client's SUBSCRIBE and UNSUBSCRIBE commands. For subscribe, it executes the subscribeTopic callback handler with {client, packet, topic, QoS level} context, then SUBACK is sent to the client. For unsubscribe, it executes the unsubscribeTopic callback handler with {client, packet, topic} context, then UNSUBACK is sent to the client.
Unit tests are available in the test folder. The project includes mqtt-spec as a git submodule, and the stack is tested against the MQTT specification using the mqtt-spec module. Very primitive benchmarking shows more than 20k messages per second on a loopback network.
mqtt-stack is an OPEN Open Source Project. This means that:
Individuals making significant and valuable contributions are given commit-access to the project to contribute as they see fit. This project is more like an open wiki than a standard guarded open source project.
See the CONTRIBUTING.md file for more details.
mqtt-stack is only possible due to the excellent work of the following contributors:
Joël Gähwiler | GitHub/256dpi | Twitter/@256dpi |
---|---|---|
Matteo Collina | GitHub/mcollina | Twitter/@matteocollina |
M Kamil Sulubulut | GitHub/kokeksibir | Twitter/@kokeksibir |
MIT