-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Changed connection to subscription for consistency. Started readies.
- Loading branch information
Roman Smolgovsky
committed
Mar 11, 2016
1 parent
2b43562
commit 5e45ddb
Showing
78 changed files
with
12,671 additions
and
189 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
# Kaazing JavaScript AMQP Client Libraries Facade | ||
Kaazing AMQP Client Libraries Facade: | ||
• Implements basic publish-subscribe functionality for AMQP protocol to help developers in getting started with their AMQP WebSocket projects | ||
• Provide developers with the reference implementations for using Kaazing AMQP JavaScript client libraries | ||
For more information see: | ||
- [How to Build JavaScript Clients Using Kaazing WebSocket Gateway][2] | ||
- [Use the Kaazing WebSocket Gateway JavaScript AMQP Client Library][3] | ||
|
||
## Organization of the library | ||
Library consists of amqpClientFunction that creates AmqpClient object. AmqpClient objects provides the following functionality: | ||
- **connect** function - connects client to Kaazing WebSocket AMQP gateway and on success returns via callback a _connection_ object that will be used to create subscriptions. | ||
- ** subscribe ** method of a _connecion_ object that creates publishing endpoint and subscribes to a subscription endpoint. Method returns _subscription_ object that is used for sending messages. | ||
- **sendMessage** method of a _subscription_ object - sends the message to a publishing endpoint | ||
- **close** function of a _subscription_ object - closes both publishing and subscription. | ||
- **disconnect** method - closes all subscriptions and disconnects client from Kaazing WebSocket AMQP gateway | ||
|
||
|
||
### **connect** function | ||
Connect function implements the following sequence: | ||
|
||
1. Create WebSocket and AMQP client factories | ||
|
||
```javascript | ||
var amqpClientFactory = new AmqpClientFactory(); | ||
var webSocketFactory; | ||
if ($gatewayModule && typeof($gatewayModule.WebSocketFactory) === "function") { | ||
webSocketFactory = createWebSocketFactory(); | ||
amqpClientFactory.setWebSocketFactory(webSocketFactory); | ||
} | ||
``` | ||
|
||
2. Create AMQP client | ||
|
||
```javascript | ||
amqpClient = amqpClientFactory.createAmqpClient(); | ||
``` | ||
|
||
3. Connect to Gateway using amqpClient connect function. Connect function uses has the following parameters: | ||
- Connection options. In most common cases it contains of URL, credentials and virtual host (set to ‘/‘) hat specifies the namespace for entities (exchanges and queues) referred to by the protocol. Note that this is not virtual hosting in the HTTP sense. | ||
- Callback function that will be called once connection is established. | ||
|
||
```javascript | ||
var credentials = {username: username, password: password}; | ||
var options = { | ||
url: url, | ||
virtualHost: "/", | ||
credentials: credentials | ||
}; | ||
amqpClient.connect(options, function(){ | ||
var connection=createConnectionObject(amqpClient,connectionInfo.username); | ||
connectedFunctionHandle(connection); | ||
}); | ||
``` | ||
|
||
### **subscribe** method of connection object | ||
1. Creates subscription object | ||
2. Initializes subscription object | ||
- Opens publishing and consumption (subscription) channels using amqpClient openChannel function that will call on success the callback function that is passed as a parameter: | ||
|
||
```javascript | ||
var openHandler=function(){ | ||
publishChannel = amqpClient.openChannel(this.publishChannelOpenHandler); | ||
consumeChannel = amqpClient.openChannel(this.consumeChannelOpenHandler); | ||
} | ||
``` | ||
Once the channels are created method returns the _subscription_ object via a callback. | ||
During the creation of the channels: | ||
- Publishing channel open handler declares AMQP Exchange of a _fanout_ type thus creating publishing endpoint. | ||
```javascript | ||
publishChannelOpenHandler:function(that){ | ||
that.publishChannel.declareExchange({exchange: that.topicPub, type: "fanout"}); | ||
} | ||
``` | ||
- Consumption (or subscription) channel open handler: | ||
1. Adds an event listener for “message” event providing the ability to receive messages. | ||
2. Declares subscription queue for the client. Library randomly generates the name for every client. | ||
3. Binds the queue to a subscription exchange with “broadcastkey” routing key. | ||
**Note** For fanout exchanges routing key is not used. For more information about exchanges and routing keys see: [https://www.rabbitmq.com/tutorials/amqp-concepts.html][1] | ||
4. Starts basic consumer. Basic consumer is started with noAck=true parameter so the client does not need to implement explicit acknowledgement. Another parameter - noLocal - controls whether the client wants to receive its own messages. | ||
|
||
```javascript | ||
consumeChannelOpenHandler:function(that{ | ||
consumeChannel.addEventListener("message", function(message) { | ||
var body = null; | ||
// Check how the payload was packaged since older browsers like IE7 don't | ||
// support ArrayBuffer. In those cases, a Kaazing ByteBuffer was used instead. | ||
if (typeof(ArrayBuffer) === "undefined") { | ||
body = message.getBodyAsByteBuffer().getString(Charset.UTF8); | ||
} | ||
else { | ||
body = arrayBufferToString(message.getBodyAsArrayBuffer()) | ||
} | ||
that.messageReceivedFunc(body);; | ||
}); | ||
that.consumeChannel.declareQueue({queue: that.queueName}) | ||
.bindQueue({queue: that.queueName, exchange: that.topicSub, routingKey: routingKey }) | ||
.consumeBasic({queue: that.queueName, consumerTag: that.clientId, noAck: true, noLocal:that.noLocal }) | ||
} | ||
``` | ||
### **sendMessage** function of a subscription object | ||
Function sets AMQP properties and sends the message to a publishing exchange using specified routing key. | ||
**Note:** As mentioned earlier, library creates a fanout type of exchange that does not use routing keys; thus library sets the value of the routing key to 'broadcast'. | ||
|
||
```javascript | ||
sendMessage:function(msg){ | ||
if (typeof msg ==="object"){ | ||
msg.clientId=this.clientId; | ||
msg=JSON.stringify(msg); | ||
} | ||
else{ | ||
handleException("Message "+msg+" should be an object!"); | ||
} | ||
|
||
var body = null; | ||
if (typeof(ArrayBuffer) === "undefined") { | ||
body = new ByteBuffer(); | ||
body.putString(msg, Charset.UTF8); | ||
body.flip(); | ||
} | ||
else { | ||
body = stringToArrayBuffer(msg); | ||
} | ||
var props = new AmqpProperties(); | ||
props.setContentType("text/plain"); | ||
props.setContentEncoding("UTF-8"); | ||
props.setDeliveryMode("1"); | ||
props.setMessageId((this.messageIdCounter++).toString()); | ||
props.setPriority("6"); | ||
props.setTimestamp(new Date()); | ||
props.setUserId(this.user); | ||
logInformation("sent","Sending message to "+this.topicPub+": "+ msg, "sent"); | ||
this.publishChannel.publishBasic({body: body, properties: props, exchange: this.topicPub, routingKey: routingKey}); | ||
} | ||
``` | ||
### **close** function of a subscription object | ||
Due to AMQP nature, nothing has to be performed. | ||
|
||
### **disconnect** function | ||
Disconnects the client from Kaazing WebSocket AMQP Gateway | ||
```javascript | ||
amqpClient.disconnect(); | ||
``` | ||
|
||
|
||
|
||
[1]: https://www.rabbitmq.com/tutorials/amqp-concepts.html | ||
[2]: http://developer.kaazing.com/documentation/amqp/4.0/dev-js/o_dev_js.html#keglibs | ||
[3]: http://developer.kaazing.com/documentation/amqp/4.0/dev-js/p_dev_js_client.html |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
# Kaazing Javas JMS Client Libraries Facade | ||
Kaazing Javat JMS Client Libraries Facade: | ||
* Implements basic publish-subscribe functionality for JMS to help developers in getting started with their JMS WebSocket projects | ||
* Provide developers with the reference implementations for using Kaazing AMQP Java client libraries | ||
|
||
For more information see: | ||
- [Build Java JMS Clients Using Kaazing WebSocket Gateway - JMS Edition](http://developer.kaazing.com/documentation/jms/4.0/dev-java/o_dev_java.html) | ||
- [Use the Kaazing WebSocket Gateway Java JMS Client API](http://developer.kaazing.com/documentation/jms/4.0/dev-java/p_dev_java_client.html) | ||
|
||
## Organization of the library | ||
UniversalClientFactory's create | ||
Library contains JMSUniversalClient class (implementing UniversalClient interface) that is created by UniversalClientFactory's _createUniversalClient_ method. JMSClient objects provides the following functionality: | ||
- **constructor** connects client to Kaazing WebSocket JMS gateway | ||
- **subscribe** function - subscribes to publishing and subscription endpoints and returns an instance of ClientSubscription object. | ||
- **close** function - close all subscriptions and connections | ||
|
||
### **constructor** | ||
Constructor implements the following sequence: | ||
|
||
1. Create JMS connection factory | ||
|
||
```javascript | ||
var jmsConnectionFactory = new JmsConnectionFactory(url); | ||
``` | ||
|
||
2. Create connection. createConnection function of JmsConnectionFactory takes three parameters: login, password and a callback function that will be called upon completion. Function returns the future that is checked in a callback function for exceptions. | ||
|
||
```javascript | ||
var connectionFuture = jmsConnectionFactory.createConnection(username, password, function () { | ||
if (!connectionFuture.exception) { | ||
try { | ||
connection = connectionFuture.getValue(); | ||
connection.setExceptionListener(handleException); | ||
|
||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | ||
|
||
connection.start(function () { | ||
var connectionObject=createConnectionObject(session, JMSClient); | ||
connectedFunctionHandle(connectionObject); | ||
}); | ||
} | ||
catch (e) { | ||
handleException(e); | ||
} | ||
} | ||
else { | ||
handleException(connectionFuture.exception); | ||
} | ||
}) | ||
``` | ||
|
||
3. Once connection is created, callback function does the following: | ||
1. Obtains the connection from the connectionFuture that was returned by createConection. | ||
2. Sets exception listener to handle exceptions. | ||
3. Creates session using createSession method. Session is created with auto-acknowledgement. | ||
4. Starts the connection using start function passing to it a callback function. | ||
|
||
4. Once connection is started, connection object is returned for the subscription to be created using __subscribe__ method. | ||
|
||
### **subscribe** method of connection object | ||
Method executed the following actions: | ||
|
||
- Creates publishing topic and producer to send messages | ||
|
||
```javascript | ||
var pubDest = session.createTopic(topicPub); | ||
var producer = session.createProducer(dest); | ||
``` | ||
- Creates subscription topic and consumer. | ||
_In order to prevent client from receiving its own messages consumer may be created with the query that will filter out the messages with the 'appId' string property set to this client application ID - a randomly generated GUID._ | ||
Once consumer is created, setMessageListener function is used to specify the function to be called when new message is received. | ||
|
||
```javascript | ||
var subDest = session.createTopic(topicSub); | ||
if (noLocalFlag) | ||
consumer = session.createConsumer(dest, "appId<>'" + appId + "'"); | ||
else | ||
consumer = session.createConsumer(dest); | ||
consumer.setMessageListener(function (message) { | ||
... obtain the message body ... | ||
|
||
rcvFunction(body); | ||
}); | ||
``` | ||
|
||
- Creates subscription object, adds it to the array of opened subscriptions and returns it via callback. | ||
|
||
### **sendMessage** function of a subscription object | ||
Function creates text message and sends it. In order to prevent client from receiving its own messages 'appId' string property may be set to this client application ID - a randomly generated GUID. | ||
|
||
```javascript | ||
sendMessage:function(msg){ | ||
var textMsg = session.createTextMessage(msg); | ||
if (noLocalFlag) | ||
textMsg.setStringProperty("appId", appId); | ||
try { | ||
var future = producer.send(textMsg, function () { | ||
if (future.exception) { | ||
handleException(future.exception); | ||
}; | ||
}); | ||
} catch (e) { | ||
handleException(e); | ||
} | ||
} | ||
``` | ||
|
||
### **close** function of a subscription object | ||
Function closes producer and consumer that were created during the subscription call. | ||
|
||
```javascript | ||
this.producer.close(function(){ | ||
this.consumer.close(function(){ | ||
}); | ||
}) | ||
``` | ||
### **disconnect** function | ||
Closes all subscriptions (causing closing of their producer and consumer), session and connection in a chain of callbacks. | ||
|
||
```javascript | ||
JMSClient.disconnect=function(){ | ||
for(var i=0;i<this.subscriptions.length;i++){ | ||
this.subscriptions[i].close(); | ||
} | ||
|
||
... Wait while all the subscriptions are closed... | ||
|
||
session.close(function () { | ||
connection.close(function () { | ||
|
||
}); | ||
}); | ||
} | ||
|
||
``` | ||
|
Oops, something went wrong.