Skip to content

Commit

Permalink
Fixed closing method.
Browse files Browse the repository at this point in the history
  • Loading branch information
Roman Smolgovsky committed Mar 15, 2016
1 parent 92cd091 commit 5c6322e
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 28 deletions.
49 changes: 34 additions & 15 deletions javascript/src/AmqpUniversalClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var amqpClientFunction=function(logInformation){
* Provides communication services with AMQP server. Created within amqpClientFunction constructor.
* @class
*/
var AmqpClient = {};
var AmqpClient = {subscriptions:[]};
var errorFunction=null;
var amqpClient=null;

Expand Down Expand Up @@ -89,7 +89,9 @@ var amqpClientFunction=function(logInformation){
clientId:null,
messageIdCounter:0,
user:user,
closed:null,
messageReceivedFunc:messageReceivedFunc,
subscribed:false,
init:function(subscribedCallback){
this.queueName="client" + Math.floor(Math.random() * 1000000);
this.clientId=appId;
Expand All @@ -101,6 +103,8 @@ var amqpClientFunction=function(logInformation){
logInformation("INFO", "OPEN: Consume Channel");
this.consumeChannel = amqpClient.openChannel(function(){that.consumeChannelOpenHandler(that)});
$.when(this.publishChannelOpened, this.consumeChannelOpened).done(function(){
that.closed=$.Deferred();
that.subscribed=true;
subscribedCallback(that);
});

Expand Down Expand Up @@ -213,31 +217,38 @@ var amqpClientFunction=function(logInformation){
this.publishChannel.publishBasic({body: body, properties: props, exchange: this.topicPub, routingKey: routingKey});
},
disconnect:function(){
var config = {
replyCode: 0,
replyText, '',
classId: 0,
methodId: 0
};
this.consumeChannel.deleteQueue({queue:this.queueName, ifEmpty: false}, function(){
this.consumeChannel.closeChannel(config, function(){
this.publishChannel.closeChannel(config, function(){

if (!this.subscribed){
this.closed.resolve();
}
else{
this.subscribed=false;
var config = {
replyCode: 0,
replyText: '',
classId: 0,
methodId: 0
};
this.consumeChannel.deleteQueue({queue:this.queueName, ifEmpty: false}, function(){
this.consumeChannel.closeChannel(config, function(){
this.publishChannel.closeChannel(config, function(){
this.closed.resolve();
});
});
});
});
}
}
};
return SubscriptionObject;

}

var createConnectionObject=function(amqpClient, user){
var createConnectionObject=function(connection, amqpClient, user){
/**
* Contains infomration about established connection.
* @class
*/
var ConnectionObject = {
connection:connection,
user:user,
amqpClient:amqpClient,
/**
Expand All @@ -251,7 +262,9 @@ var amqpClientFunction=function(logInformation){
subscribe:function(topicPub, topicSub, messageReceivedFunc, noLocal, subscribedCallbackFunction){
logInformation("INFO","CONNECTED!!!");
var subscription=createSubscriptionObject(this.amqpClient, topicPub, topicSub, noLocal, messageReceivedFunc, this.user);
var that=this;
subscription.init(function(subscription){
that.connection.subscriptions.push(subscription);
subscribedCallbackFunction(subscription);
});
}
Expand Down Expand Up @@ -288,8 +301,9 @@ var amqpClientFunction=function(logInformation){
credentials: credentials
};
try{
var that=this;
amqpClient.connect(options, function(){
var connection=createConnectionObject(amqpClient,connectionInfo.username);
var connection=createConnectionObject(that, amqpClient,connectionInfo.username);
connectedFunctionHandle(connection);
});
}
Expand All @@ -302,7 +316,12 @@ var amqpClientFunction=function(logInformation){
* Disconnects from Kaazing WebSocket AMQP Gateway
*/
AmqpClient.close=function(){
amqpClient.disconnect();
for(var i=0;i<this.subscriptions.length;i++){
this.subscriptions[i].disconnect();
}
$.when.apply($,this.subscriptions).then(function() {
amqpClient.disconnect();
});
}

return AmqpClient;
Expand Down
24 changes: 15 additions & 9 deletions javascript/src/JMSUniversalClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var jmsClientFunction=function(logInformation){
noLocal:noLocal,
messagesToSend:[],
inSend:false,
subscribed:false,
subscribed:true,
closed:$.Deferred(),
sendMessageOverTheWire:function(){
var msg = this.messagesToSend.pop();
Expand Down Expand Up @@ -92,16 +92,15 @@ var jmsClientFunction=function(logInformation){
* Closes the subscrpiption and releases all the resources.
*/
disconnect:function(){
if (this.subscribed){
if (!this.subscribed){
this.closed.resolve();
}
else{
this.producer.close(function(){
this.consumer.close(function(){
this.subscribed=false;
this.closed.resolve();
});
})
this.producer.close();
this.consumer.close(function(){
this.subscribed=false;
this.closed.resolve();
});
}
}
};
Expand All @@ -127,6 +126,13 @@ var jmsClientFunction=function(logInformation){
* @param subscribedCallbackFunction {function} callback function if a format function(SubcriptionObject) to be called when SubsriptionObject is created.
*/
subscribe:function(topicPub, topicSub, messageReceivedFunc, noLocal, subscribedCallbackFunction){
if (!topicPub.startsWith("/topic/")){
topicPub="/topic/"+topicPub;
}
if (!topicSub.startsWith("/topic/")){
topicSub="/topic/"+topicSub;
}

var pubDest = this.session.createTopic(topicPub);
var producer = this.session.createProducer(pubDest);
logInformation("INFO","Producer for "+topicPub+" is ready! AppID=" + appId);
Expand Down Expand Up @@ -218,7 +224,7 @@ var jmsClientFunction=function(logInformation){
*/
JMSClient.close=function(){
for(var i=0;i<this.subscriptions.length;i++){
this.subscriptions[i].close();
this.subscriptions[i].disconnect();
}
$.when.apply($,this.subscriptions).then(function() {
connection.stop(function(){
Expand Down
4 changes: 2 additions & 2 deletions javascript/src/JavascriptUniversalClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ var UniversalClientDef=function(protocol){
/**
* Disconnects from Kaazing WebSocket Gateway
*/
JavascriptUniversalClient.disconnect=function(){
client.disconnect();
JavascriptUniversalClient.close=function(){
client.close();
}

return JavascriptUniversalClient;
Expand Down
4 changes: 2 additions & 2 deletions javascript/src/JavascriptUniversalClientNPM.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ var UniversalClientDef=function(protocol){
/**
* Disconnects from Kaazing WebSocket Gateway
*/
JavascriptUniversalClient.disconnect=function(){
client.disconnect();
JavascriptUniversalClient.close=function(){
client.close();
}

return JavascriptUniversalClient;
Expand Down

0 comments on commit 5c6322e

Please sign in to comment.