Multiplex multiple message-oriented protocols over a stream
Install with npm:
npm install protomux
const Protomux = require('protomux')
const c = require('compact-encoding')
// By framed stream, it has be a stream that preserves the messages, ie something that length prefixes
// like @hyperswarm/secret-stream
const mux = new Protomux(aStreamThatFrames)
// Now add some protocol channels
const cool = mux.createChannel({
protocol: 'cool-protocol',
id: Buffer.from('optional binary id'),
onopen () {
console.log('the other side opened this protocol!')
},
onclose () {
console.log('either side closed the protocol')
}
})
// And add some messages
const one = cool.addMessage({
encoding: c.string,
onmessage (m) {
console.log('recv message (1)', m)
}
})
const two = cool.addMessage({
encoding: c.bool,
onmessage (m) {
console.log('recv message (2)', m)
}
})
// open the channel
cool.open()
// And send some data
one.send('a string')
two.send(true)
Makes a new instance. stream
should be a framed stream, preserving the messages written.
options
include:
{
// Called when the muxer needs to allocate a message that is written, defaults to Buffer.allocUnsafe.
alloc (size) {}
}
Helper to accept either an existing muxer instance or a stream (which creates a new one).
const channel = mux.createChannel([options])
Adds a new protocol channel.
options
include:
{
// Used to match the protocol
protocol: 'name of the protocol',
// Optional additional binary id to identify this channel
id: buffer,
// Optional encoding for a handshake
handshake: encoding,
// Optional array of message types to send/receive.
messages: [],
// Called when the remote side adds this protocol.
// Errors here are caught and forwarded to stream.destroy
async onopen (handshake) {},
// Called when the channel closes - ie the remote side closes or rejects this protocol or we closed it.
// Errors here are caught and forwarded to stream.destroy
async onclose () {},
// Called after onclose when all pending promises have been resolved.
async ondestroy () {}
}
Sessions are paired based on a queue, so the first remote channel with the same protocol
and id
.
mux.createChannel
returnsnull
if the channel should not be opened, it's a duplicate channel or the remote has already closed this one. To have multiple sessions with the sameprotocol
andid
, setunique: false
as an option.
Boolean that indicates if the channel is opened.
Registers a callback to be called every time a new channel is requested.
Unregisters the pair callback.
Opens the channel.
Adds/registers a message type for a specific encoding. Options include:
{
// compact-encoding specifying how to encode/decode this message
encoding: c.binary,
// Called when the remote side sends a message.
// Errors here are caught and forwared to stream.destroy
async onmessage (message) { }
}
Sends a message.
The function that is called when a message arrives.
The encoding for this message.
Closes the protocol channel.
Corking the protocol channel, makes it buffer messages and sends them all in a batch when it uncorks.
Uncorks and send the batch.
Same as channel.cork
but on the muxer instance.
Same as channel.uncork
but on the muxer instance.
The muxer instance is iterable so all channels can be iterated.