diff --git a/java/KaazingAMQPClientLibrariesFacade.md b/java/KaazingAMQPClientLibrariesFacade.md index ecc807a..906e277 100644 --- a/java/KaazingAMQPClientLibrariesFacade.md +++ b/java/KaazingAMQPClientLibrariesFacade.md @@ -1,154 +1,261 @@ -# Kaazing JavaScript AMQP Client Libraries Facade -Kaazing AMQP Client Libraries Facade: -• Implements basic publish-subscribe functionality for AMQP protocol to help developers in getting started with their AMQP WebSocket projects -• Provide developers with the reference implementations for using Kaazing AMQP JavaScript client libraries +# Kaazing Java AMQP Client Libraries Facade +Kaazing Java AMQP Client Libraries Facade simplifies the interaction with Kaazing Java AMQP Client libraries that enable the developers to interact with [AMQP 0-9-1](https://www.rabbitmq.com/tutorials/amqp-concepts.html) brokers via WebSockets. + +Kaazing Java AMQP Client Libraries Facade: +* Implements basic publish-subscribe functionality for AMQP to help developers in getting started with their AMQP WebSocket projects +* Provide developers with the reference implementations for using Kaazing AMQP Java client libraries + For more information see: -- [How to Build JavaScript Clients Using Kaazing WebSocket Gateway][2] -- [Use the Kaazing WebSocket Gateway JavaScript AMQP Client Library][3] +- [Build Java AMQP Clients Using Kaazing WebSocket Gateway](http://kaazing.com/doc/amqp/4.0/dev-java/o_dev_java.html) +- [Use the Kaazing WebSocket Gateway Java AMQP Client API](http://kaazing.com/doc/amqp/4.0/dev-java/p_dev_java_client.html) ## Organization of the library -Library consists of amqpClientFunction that creates AmqpClient object. AmqpClient objects provides the following functionality: -- **connect** function - connects client to Kaazing WebSocket AMQP gateway and on success returns via callback a _connection_ object that will be used to create subscriptions. -- ** subscribe ** method of a _connecion_ object that creates publishing endpoint and subscribes to a subscription endpoint. Method returns _subscription_ object that is used for sending messages. -- **sendMessage** method of a _subscription_ object - sends the message to a publishing endpoint -- **close** function of a _subscription_ object - closes both publishing and subscription. -- **disconnect** method - closes all subscriptions and disconnects client from Kaazing WebSocket AMQP gateway +UniversalClientFactory's create +Library contains AmqpUniversalClient class (implementing UniversalClient interface) that is created by UniversalClientFactory's _createUniversalClient_ method. AmqpUniversalClient object provides the following functionality: +- **constructor** of AmqpUniversalClient - connects client to Kaazing WebSocket AMQP gateway +- **subscribe** method of AmqpUniversalClient - subscribes to publishing and subscription endpoints and returns an instance of ClientSubscription object. +- **sendMessage** method of ClientSubscription - sends the instances of Serialized Objects to the publishing endpoint +- **disconnect** method of ClientSubscription - closes subscription to publishing and subscription endpoints for this subscription +- **close** method of AmqpUniversalClient - closes all subscriptions and connections -### **connect** function +### **constructor** of AmqpUniversalClient Connect function implements the following sequence: -1. Create WebSocket and AMQP client factories +- Create AMQP client factory and AMQP client - ```javascript - var amqpClientFactory = new AmqpClientFactory(); - var webSocketFactory; - if ($gatewayModule && typeof($gatewayModule.WebSocketFactory) === "function") { - webSocketFactory = createWebSocketFactory(); - amqpClientFactory.setWebSocketFactory(webSocketFactory); - } - ``` +```java + amqpClientFactory = AmqpClientFactory.createAmqpClientFactory(); + amqpClient = amqpClientFactory.createAmqpClient(); +``` +- Register connection listeners +```java + ... + amqpClient.addConnectionListener(new ConnectionListener() { -2. Create AMQP client + @Override + public void onConnectionOpen(ConnectionEvent e) { + LOGGER.info("Connected to "+url+" message "+e.getMessage()); + fConnected = true; + latch.countDown(); + } - ```javascript - amqpClient = amqpClientFactory.createAmqpClient(); - ``` - -3. Connect to Gateway using amqpClient connect function. Connect function uses has the following parameters: - - Connection options. In most common cases it contains of URL, credentials and virtual host (set to ‘/‘) hat specifies the namespace for entities (exchanges and queues) referred to by the protocol. Note that this is not virtual hosting in the HTTP sense. - - Callback function that will be called once connection is established. - - ```javascript - var credentials = {username: username, password: password}; - var options = { - url: url, - virtualHost: "/", - credentials: credentials - }; - amqpClient.connect(options, function(){ - var connection=createConnectionObject(amqpClient,connectionInfo.username); - connectedFunctionHandle(connection); - }); - ``` + @Override + public void onConnectionError(ConnectionEvent e) { + LOGGER.error("Connection error to url "+url+"... "+e.getMessage()); + errorsListener.onException(new ClientException("Error connecting to " + url + ": " + e.getMessage())); + latch.countDown(); + } + + @Override + public void onConnectionClose(ConnectionEvent e) { + for (AmqpClientSubscription conn : connections) { + try { + conn.disconnect(); + } catch (ClientException e1) { + errorsListener.onException(new ClientException("Error closing client connection: "+conn.getSubscriptionIdentifier(), e1)); + LOGGER.error("Error closing client connection: "+conn.getSubscriptionIdentifier(), e1); + } + } + LOGGER.info("Closed connection to "+url+"."); + } + + @Override + public void onConnecting(ConnectionEvent e) { + + + } + }); + ... +``` + We use onConnectionOpen and onConnectionError listeners to wait until connection is either established or failed; we use the countdown latch to wait for either of these events. +- Establish connection using provided login and password +```java + ... + amqpClient.connect(this.url, "/", login, password); + try { + latch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new ClientException("Exception wating for connection latch connecting to " + this.url, e); + } + if (!fConnected) { + throw new ClientException("Connection to " + this.url + " was not established in 10 sec."); + } + ... +``` -### **subscribe** method of connection object -1. Creates subscription object -2. Initializes subscription object - - Opens publishing and consumption (subscription) channels using amqpClient openChannel function that will call on success the callback function that is passed as a parameter: +### **subscribe** method of of AmqpUniversalClient object +Method executed the following actions: +- Opens publishing channel +```java + ... + AmqpChannel pubChannel = this.amqpClient.openChannel(); + ... +``` +- Adds publishing channel listeners +```java + ... + pubChannel.addChannelListener(new ChannelAdapter() { + @Override + public void onClose(ChannelEvent e) { + + } + + @Override + public void onError(final ChannelEvent e) { + errorsListener.onException(new ClientException("Error creating publishing channel for " + pubTopicName + ": " + e.getMessage())); + LOGGER.error("Error creating publishing channel "+pubTopicName+" for connection to "+url+": "+e.getMessage()); + connectionsLatch.countDown(); + } + + @Override + public void onDeclareExchange(ChannelEvent e) { + LOGGER.info("Created channel "+pubTopicName+" for url "+url); + fPubOpened = true; + connectionsLatch.countDown(); + } + + @Override + public void onOpen(ChannelEvent e) { + pubChannel.declareExchange(pubTopicName, "fanout", false, false, false, null); + } + }); + ... +``` + Opening the channel will trigger _onOpen_ event listener where we declare an exchange. Successful declaring of an exchange will result in triggering of onDeclareExchange event listener and is an indication of a success; onError listener is triggered in an event of any error. We use the countdown latch to wait for either of these events. +- Opens subscription channel +```java + ... + AmqpChannel subChannel = this.amqpClient.openChannel(); + ... +``` +- Registers subscription channel events listeners: +```java + ... + subChannel.addChannelListener(new ChannelAdapter() { + @Override + public void onError(final ChannelEvent e) { + errorsListener.onException(new ClientException("Error creating subscription channel " + subTopicName + ": " + e.getMessage())); + LOGGER.error("Error creating subscription channel "+subTopicName+" for url "+url+": "+e.getMessage()); + connectionsLatch.countDown(); + } + + @Override + public void onConsumeBasic(ChannelEvent e) { + LOGGER.info("Created subscription channel "+subTopicName+" for url "+url); + fSubOpened = true; + connectionsLatch.countDown(); + } + + @Override + public void onMessage(final ChannelEvent e) { + ... + + } + + @Override + public void onOpen(ChannelEvent e) { + subChannel.declareQueue(queueName, false, false, false, false, false, null) + .bindQueue(queueName, subTopicName, ROUTING_KEY, false, null) + .consumeBasic(queueName, clientId, noLocal, false, false, false, null); + } + }); + ... +``` - ```javascript - var openHandler=function(){ - publishChannel = amqpClient.openChannel(this.publishChannelOpenHandler); - consumeChannel = amqpClient.openChannel(this.consumeChannelOpenHandler); - } - ``` - - Once the channels are created method returns the _subscription_ object via a callback. - During the creation of the channels: - - Publishing channel open handler declares AMQP Exchange of a _fanout_ type thus creating publishing endpoint. - - ```javascript - publishChannelOpenHandler:function(that){ - that.publishChannel.declareExchange({exchange: that.topicPub, type: "fanout"}); - } - ``` - - Consumption (or subscription) channel open handler: - 1. Adds an event listener for “message” event providing the ability to receive messages. - 2. Declares subscription queue for the client. Library randomly generates the name for every client. - 3. Binds the queue to a subscription exchange with “broadcastkey” routing key. - - **Note** For fanout exchanges routing key is not used. For more information about exchanges and routing keys see: [https://www.rabbitmq.com/tutorials/amqp-concepts.html][1] +Once the channel is successfully opened, onOpen event listener will be called where we: + - declare a new queue + - bind the queue to an exchange with 'broadcast' routing key + - start basic consumer for the queue. + **Note** For fanout exchanges routing key is not used. For more information about exchanges and routing keys see: [https://www.rabbitmq.com/tutorials/amqp-concepts.html](https://www.rabbitmq.com/tutorials/amqp-concepts.html). - 4. Starts basic consumer. Basic consumer is started with noAck=true parameter so the client does not need to implement explicit acknowledgement. Another parameter - noLocal - controls whether the client wants to receive its own messages. +Once consumer is started, onConsumeBasic event listener is called which is an indication that channel is successfully opened. onError listener is triggered in an event of any error. We use the countdown latch to wait for either of these events. +onMessage event listener is called every time we will receive a message from an exchange. In this method we: + - Allocates the buffer and read the data from the event body that contains serialized object. + - Deserialize the object and make sure it an instance of our AmqpMessageEnvelope. The reason to use an envelope it contains application identifier that is used for messages filtering for the AMQP brokers that do not support noLocal functionality (functionality that prevents the client to received its own messages when publishing and subscription endpoints are the same). + - calls onMessage method of MessagesListener object passing it received object. - ```javascript - consumeChannelOpenHandler:function(that{ - consumeChannel.addEventListener("message", function(message) { - var body = null; - // Check how the payload was packaged since older browsers like IE7 don't - // support ArrayBuffer. In those cases, a Kaazing ByteBuffer was used instead. - if (typeof(ArrayBuffer) === "undefined") { - body = message.getBodyAsByteBuffer().getString(Charset.UTF8); - } - else { - body = arrayBufferToString(message.getBodyAsArrayBuffer()) - } - that.messageReceivedFunc(body);; - }); - that.consumeChannel.declareQueue({queue: that.queueName}) - .bindQueue({queue: that.queueName, exchange: that.topicSub, routingKey: routingKey }) - .consumeBasic({queue: that.queueName, consumerTag: that.clientId, noAck: true, noLocal:that.noLocal }) - } - ``` +```java + ... + byte[] bytes = new byte[e.getBody().remaining()]; + e.getBody().get(bytes); + try + { + Serializable object=Utils.deserialize(bytes); + if (!(object instanceof AmqpMessageEnvelope)){ + errorsListener.onException(new ClientException("Received object is not an instance of AmqpMessageEnvelope; received from " + subTopicName)); + LOGGER.error("Received object is not an instance of AmqpMessageEnvelope; received from " + subTopicName +" for url"+url); + return; + } + AmqpMessageEnvelope messageEnvelope=(AmqpMessageEnvelope)object; + if (noLocal && messageEnvelope.getClientId().equals(appId)){ + LOGGER.debug("Received message ["+messageEnvelope.toString()+"] on topic "+subTopicName+", connection to "+url+" is ignored as it came from the same client and noLocal is set!"); + return; + } + LOGGER.debug("Received message ["+messageEnvelope.getData().toString()+"] on topic "+subTopicName+", connection to "+url); + messageListener.onMessage(messageEnvelope.getData()); + } + catch (ClassNotFoundException | IOException e1) { + errorsListener.onException(new ClientException("Cannot deserialize an object from the message received from " + subTopicName, e1)); + LOGGER.error("Cannot deserialize an object from the message received from " + subTopicName +" for url"+url); + return; + } + ... +``` + +Once the channels are opened, they are stored in an AmqpClientSubscription object (subclass of ClientSubscription object) for future use. Created instance of ClientSubscription object is registered with AmqpUniversalClient. -### **sendMessage** function of a subscription object -Function sets AMQP properties and sends the message to a publishing exchange using specified routing key. +### **sendMessage** method of AmqpClientSubscription object +Method sets AMQP properties and sends the message to a publishing exchange using specified routing key. **Note:** As mentioned earlier, library creates a fanout type of exchange that does not use routing keys; thus library sets the value of the routing key to 'broadcast'. +We use AmqpMessageEnvelope to store application identifier that may be needed for filtering of the message. +Serialized object is stored in the ByteBuffer that is sent to the channel along with AMQP properties. + +```java + ... + AmqpMessageEnvelope messageEnvelope=new AmqpMessageEnvelope(this.appId, message); + byte[] serializedObject; + try { + serializedObject = Utils.serialize(messageEnvelope); + } catch (IOException e) { + throw new ClientException("Cannot serialize message " + message + " to send over subscription " + this.getSubscriptionIdentifier(), e); + } + ByteBuffer buffer = ByteBuffer.allocate(serializedObject.length); + buffer.put(serializedObject); + buffer.flip(); -```javascript -sendMessage:function(msg){ - if (typeof msg ==="object"){ - msg.clientId=this.clientId; - msg=JSON.stringify(msg); - } - else{ - handleException("Message "+msg+" should be an object!"); - } - - var body = null; - if (typeof(ArrayBuffer) === "undefined") { - body = new ByteBuffer(); - body.putString(msg, Charset.UTF8); - body.flip(); - } - else { - body = stringToArrayBuffer(msg); - } - var props = new AmqpProperties(); - props.setContentType("text/plain"); - props.setContentEncoding("UTF-8"); - props.setDeliveryMode("1"); - props.setMessageId((this.messageIdCounter++).toString()); - props.setPriority("6"); - props.setTimestamp(new Date()); - props.setUserId(this.user); - logInformation("sent","Sending message to "+this.topicPub+": "+ msg, "sent"); - this.publishChannel.publishBasic({body: body, properties: props, exchange: this.topicPub, routingKey: routingKey}); - } + Timestamp ts = new Timestamp(System.currentTimeMillis()); + AmqpProperties props = new AmqpProperties(); + props.setMessageId("1"); + props.setCorrelationId("4"); + props.setAppId(appId); + props.setUserId(userId); + props.setPriority(6); + props.setDeliveryMode(1); + props.setTimestamp(ts); + + this.pubChannel.publishBasic(buffer, props, this.pubChannelName, AmqpUniversalClient.ROUTING_KEY, false, false); + ... ``` -### **close** function of a subscription object -Due to AMQP nature, nothing has to be performed. +### **disconnect** method of AmqpClientSubscription object +Deletes the declared subscription queue and closes the channels +```java + ... + this.subChannel.deleteQueue(this.queueName, false, false, false); -### **disconnect** function -Disconnects the client from Kaazing WebSocket AMQP Gateway -```javascript -amqpClient.disconnect(); + this.pubChannel.closeChannel(0, "", 0, 0); + this.subChannel.closeChannel(0, "", 0, 0); + ... ``` - - -[1]: https://www.rabbitmq.com/tutorials/amqp-concepts.html -[2]: http://developer.kaazing.com/documentation/amqp/4.0/dev-js/o_dev_js.html#keglibs -[3]: http://developer.kaazing.com/documentation/amqp/4.0/dev-js/p_dev_js_client.html +### **close** method of AmqpUniversalClient object +Disconnects all opened subscriptions, disconnects Amqp client. +```java + ... + for (AmqpClientSubscription conn : this.connections) { + conn.disconnect(); + } + this.amqpClient.disconnect(); + ... +``` \ No newline at end of file diff --git a/java/KaazingJMSClientLibrariesFacade.md b/java/KaazingJMSClientLibrariesFacade.md index 3f805fd..4fb2743 100644 --- a/java/KaazingJMSClientLibrariesFacade.md +++ b/java/KaazingJMSClientLibrariesFacade.md @@ -1,7 +1,9 @@ -# Kaazing Javas JMS Client Libraries Facade -Kaazing Javat JMS Client Libraries Facade: +# Kaazing Java JMS Client Libraries Facade +Kaazing Java JMS Client Libraries Facade simplifies the interaction with Kaazing Java JMS Client libraries that enable the developers to interact with [JMS](http://docs.oracle.com/javaee/6/tutorial/doc/bncdx.html) brokers via WebSockets. + +Kaazing Java JMS Client Libraries Facade: * Implements basic publish-subscribe functionality for JMS to help developers in getting started with their JMS WebSocket projects -* Provide developers with the reference implementations for using Kaazing AMQP Java client libraries +* Provide developers with the reference implementations for using Kaazing JMS Java client libraries For more information see: - [Build Java JMS Clients Using Kaazing WebSocket Gateway - JMS Edition](http://developer.kaazing.com/documentation/jms/4.0/dev-java/o_dev_java.html) @@ -9,6 +11,253 @@ For more information see: ## Organization of the library UniversalClientFactory's create +<<<<<<< HEAD +Library contains JMSUniversalClient class (implementing UniversalClient interface) that is created by UniversalClientFactory's _createUniversalClient_ method. JMSUniversalClient object provides the following functionality: +- **constructor** of JMSUniversalClient - connects client to Kaazing WebSocket JMS gateway +- **subscribe** method of JMSUniversalClient - subscribes to publishing and subscription endpoints and returns an instance of ClientSubscription object. +- **sendMessage** method of ClientSubscription - sends the instances of Serialized Objects to the publishing endpoint +- **disconnect** method of ClientSubscription - closes subscription to publishing and subscription endpoints for this subscription +- **close** method of JMSUniversalClient - closes all subscriptions and connections + +### **constructor** of JMSUniversalClient +Constructor implements the following sequence: + +- Locate JMS connection factory using JNDI: + +```java + Properties env = new Properties(); + env.setProperty("java.naming.factory.initial", "com.kaazing.gateway.jms.client.JmsInitialContextFactory"); + + try { + jndiInitialContext = new InitialContext(env); + } catch (NamingException e1) { + throw new ClientException("Error creating initial context factory for JMS!", e1); + } + env.put(JmsInitialContext.CONNECTION_TIMEOUT, "15000"); + try { + connectionFactory = (ConnectionFactory) jndiInitialContext.lookup("ConnectionFactory"); + } catch (NamingException e) { + throw new ClientException("Error locating connection factory for JMS!", e); + } + JmsConnectionFactory jmsConnectionFactory = (JmsConnectionFactory) connectionFactory; +``` + +- Create connection. To create connection: + - Set the gateway URL connection for JMS connection factory. + - Create WebSocket factory + - Create connection passing login and password: +```java + ... + jmsConnectionFactory.setGatewayLocation(url); + WebSocketFactory webSocketFactory = jmsConnectionFactory.getWebSocketFactory(); + webSocketFactory.setDefaultRedirectPolicy(HttpRedirectPolicy.ALWAYS); + try { + connection = connectionFactory.createConnection(login, password); + } catch (JMSException e) { + throw new ClientException("Error connecting to gateway with " + url.toString() + ", credentials " + login + "/" + password, e); + } + ... +``` +- Register the ErrorsListener object that is passed to the constructor with the connection to be a listener for any errors and exceptions: +```java + ... + try { + connection.setExceptionListener(this); + } catch (JMSException e) { + throw new ClientException("Error setting exceptions listener. Connection: " + url.toString() + ", credentials " + login + "/" + password, e); + } + ... +``` + +- Create __session__ in auto-acknowledgement mode. In this mode session automatically acknowledges a client's receipt of a message either when the session has successfully returned from a call to receive or when the message listener the session has called to process the message successfully returns. +```java + ... + try { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } catch (JMSException e) { + throw new ClientException("Error creating session. Connection: " + url.toString() + ", credentials " + login + "/" + password, e); + } + ... +``` +- Start the connection +```java + ... + try { + connection.start(); + } + catch (JMSException e) { + throw new ClientException("Error starting connection: " + url.toString() + ", credentials " + login + "/" + password, e); + } + ... +``` + +Once object is successfully constructed it is ready to create subscriptions. + + +### **subscribe** method of of JMSUniversalClient object +Method executed the following actions: + +- Creates subscription destination + +```java + ... + Destination subDestination; + try { + subDestination = (Destination) jndiInitialContext.lookup("/topic/" + subTopicName); + } catch (NamingException e) { + throw new ClientException("Cannot locate subscription topic " + subTopicName, e); + } + ... +``` +- Creates message Consumer. + _In order to prevent client from receiving its own messages consumer may be created with the query that will filter out the messages with the 'appId' string property set to this client application ID - a randomly generated GUID._ +```java + ... + MessageConsumer consumer; + try { + if (noLocal){ + clientId=UUID.randomUUID().toString(); + consumer = session.createConsumer(subDestination, "clientId<>'"+clientId+"'"); + } + else + consumer = session.createConsumer(subDestination); + } catch (JMSException e) { + throw new ClientException("Cannot create consumer for subscription topic " + subTopicName, e); + } + ... +``` +- Registers an instance of ... passed to the method to be a message listener. +```java + ... + try + { + consumer.setMessageListener(new MessageListenerImpl(messageListener, this.errorsListener, subTopicName)); + } catch (JMSException e) { + throw new ClientException("Cannot create messages listener for subscription topic " + subTopicName, e); + } + ... +``` + + We use MessageLinstenerImpl wrapper class that implements MessageListener to convert the ByteMessage object received from the wire to an instance of Serializable object that was sent. +```java + ... + if (message instanceof BytesMessage) { + try { + BytesMessage bytesMessage = ((BytesMessage) message); + long len = bytesMessage.getBodyLength(); + byte b[] = new byte[(int) len]; + bytesMessage.readBytes(b); + Serializable object; + try { + object = Utils.deserialize(b); + LOGGER.debug("Received message ["+object.toString()+"] on topic "+destination+", connection to "+url); + this.listener.onMessage(object); + } catch (ClassNotFoundException | IOException e) { + this.errorsListener.onException(new ClientException("Cannot deserialize an object from the message received from " + destination, e)); + LOGGER.error("Cannot deserialize an object from the message received from " + destination+" connection to "+url, e); + return; + } + } catch (JMSException e) { + this.errorsListener.onException(new ClientException("Error receiving message from destination " + destination, e)); + LOGGER.error("Error receiving message from destination " + destination+" connection to "+url, e); + } + } else { + this.errorsListener.onException(new ClientException("Received a message of unexpected type " + message.getClass().getName() + " for a destination " + destination)); + LOGGER.error("Received a message of unexpected type " + message.getClass().getName() + destination+" connection to "+url); + } + ... +``` + +- Create publishing destination and producer + +```java + ... + Destination pubDestination; + try { + pubDestination = (Destination) jndiInitialContext.lookup("/topic/" + pubTopicName); + } catch (NamingException e) { + throw new ClientException("Cannot locate publishing topic " + pubTopicName, e); + } + MessageProducer producer; + try { + producer = session.createProducer(pubDestination); + } catch (JMSException e) { + throw new ClientException("Cannot create producer for publishing topic " + pubTopicName, e); + } + ... +``` + + Session, producer and consumer are stored in an JMSClientSubscription object (subclass of ClientSubscription object) for future use. Created instance of ClientSubscription object is registered with JMSUniversalClient. + +### **sendMessage** method of JMSClientSubscription object +Function creates a binary message and sends it using the following steps. + - Creates bytes message and serializes Java object to it. + ```java + BytesMessage bytesMessage; + try { + bytesMessage = session.createBytesMessage(); + } catch (JMSException e) { + throw new ClientException("Subscription: "+this.getSubscriptionIdentifier()+" - Cannot create a BytesMessage for an object " + message.toString(), e); + } + try { + bytesMessage.writeBytes(Utils.serialize(message)); + } catch (JMSException e) { + throw new ClientException("Subscription: "+this.getSubscriptionIdentifier()+" - Cannot write bytes to set the payload of a BytesMessage for an object " + message.toString(), e); + } catch (IOException e) { + throw new ClientException("Subscription: "+this.getSubscriptionIdentifier()+" - Cannot serialize an object " + message.toString(), e); + } + ``` + - _If needed_, sets string property clientId. This property will be used to ignore receiving your own messages which may be applicable if publishing and subscription endpoints are the same. + ```java + ... + if (clientId!=null){ + try { + bytesMessage.setStringProperty("clientId", clientId); + } catch (JMSException e) { + throw new ClientException("Subscription: "+this.getSubscriptionIdentifier()+" - Cannot set a string property client id to "+clientId+" for an object " + message.toString(), e); + } + } + ... + ``` + - Send message to the publishing endpoing + ```java + ... + try { + producer.send(bytesMessage); + } catch (JMSException e) { + throw new ClientException("Subscription: "+this.getSubscriptionIdentifier()+" - Cannot sent and object message for an object " + message.toString(), e); + } + ... + ``` + +### **disconnect** method of JMSClientSubscription object +Closes producer and consumer. + +```java + try { + this.producer.close(); + } catch (JMSException e) { + throw new ClientException("Subscription: "+this.getSubscriptionIdentifier()+" - Error closing producer.",e); + } + try { + this.consumer.close(); + } catch (JMSException e) { + throw new ClientException("Subscription: "+this.getSubscriptionIdentifier()+" - Error closing consumer.",e); + } +``` + +**close** method of JMSUniversalClient +Disconnects all opened subscriptions, closes session and connection. + +```java + for(JMSClientSubscription connection: this.connections){ + connection.disconnect(); + } + this.connection.stop(); + this.session.close(); + this.connection.close(); +``` +======= Library contains JMSUniversalClient class (implementing UniversalClient interface) that is created by UniversalClientFactory's _createUniversalClient_ method. JMSClient objects provides the following functionality: - **constructor** connects client to Kaazing WebSocket JMS gateway - **subscribe** function - subscribes to publishing and subscription endpoints and returns an instance of ClientSubscription object. @@ -135,3 +384,4 @@ Closes all subscriptions (causing closing of their producer and consumer), sessi ``` +>>>>>>> kaazing/develop diff --git a/java/README.md b/java/README.md index c230ec4..9104201 100644 --- a/java/README.md +++ b/java/README.md @@ -96,7 +96,7 @@ The Kaazing Universal WebSocket clients depend on the Kaazing WebSocket Gateway ![][image-1] As shown on the diagram above, Kaazing Universal Client works as following: -- Instantiate required Client Facade Library based on specified protocol that will interact with necessary Kaazing Java Client Libraries +- Instantiates required Client Facade Library based on specified protocol that will interact with necessary Kaazing Java Client Libraries - Pass the data to and from the Kaazing Java Client libraries via instantiated Client Facade Library For more information about Client Facade libraries see diff --git a/java/src/main/java/com/kaazing/client/universal/AmqpUniversalClient.java b/java/src/main/java/com/kaazing/client/universal/AmqpUniversalClient.java index 4321c3a..63da712 100644 --- a/java/src/main/java/com/kaazing/client/universal/AmqpUniversalClient.java +++ b/java/src/main/java/com/kaazing/client/universal/AmqpUniversalClient.java @@ -185,10 +185,9 @@ public void onMessage(final ChannelEvent e) { @Override public void onOpen(ChannelEvent e) { - // TODO: decide on what to do with noLocal subChannel.declareQueue(queueName, false, false, false, false, false, null) - .bindQueue(queueName, subTopicName, ROUTING_KEY, false, null) - .consumeBasic(queueName, clientId, noLocal, false, false, false, null); + .bindQueue(queueName, subTopicName, ROUTING_KEY, false, null) + .consumeBasic(queueName, clientId, noLocal, false, false, false, null); } }); diff --git a/java/src/main/java/com/kaazing/client/universal/JMSUniversalClient.java b/java/src/main/java/com/kaazing/client/universal/JMSUniversalClient.java index 37ef643..68bc078 100644 --- a/java/src/main/java/com/kaazing/client/universal/JMSUniversalClient.java +++ b/java/src/main/java/com/kaazing/client/universal/JMSUniversalClient.java @@ -128,7 +128,7 @@ public ClientSubscription subscribe(String pubTopicName, String subTopicName, Me try { pubDestination = (Destination) jndiInitialContext.lookup("/topic/" + pubTopicName); } catch (NamingException e) { - throw new ClientException("Cannot locate publicshing topic " + pubTopicName, e); + throw new ClientException("Cannot locate publishing topic " + pubTopicName, e); } MessageProducer producer; try { diff --git a/javascript/KaazingAMQPClientLibrariesFacade.md b/javascript/KaazingAMQPClientLibrariesFacade.md index ecc807a..75ae381 100644 --- a/javascript/KaazingAMQPClientLibrariesFacade.md +++ b/javascript/KaazingAMQPClientLibrariesFacade.md @@ -1,4 +1,6 @@ # Kaazing JavaScript AMQP Client Libraries Facade +Kaazing JavaScript AMQP Client Libraries Facade simplifies the interaction with Kaazing JavaScript AMQP Client libraries that enable the developers to interact with [AMQP 0-9-1](https://www.rabbitmq.com/tutorials/amqp-concepts.html) brokers via WebSockets. + Kaazing AMQP Client Libraries Facade: • Implements basic publish-subscribe functionality for AMQP protocol to help developers in getting started with their AMQP WebSocket projects • Provide developers with the reference implementations for using Kaazing AMQP JavaScript client libraries @@ -9,37 +11,37 @@ For more information see: ## Organization of the library Library consists of amqpClientFunction that creates AmqpClient object. AmqpClient objects provides the following functionality: - **connect** function - connects client to Kaazing WebSocket AMQP gateway and on success returns via callback a _connection_ object that will be used to create subscriptions. -- ** subscribe ** method of a _connecion_ object that creates publishing endpoint and subscribes to a subscription endpoint. Method returns _subscription_ object that is used for sending messages. +- ** subscribe ** method of a _connection_ object that creates publishing endpoint and subscribes to a subscription endpoint. Method returns via callback a _subscription_ object that is used for sending messages. - **sendMessage** method of a _subscription_ object - sends the message to a publishing endpoint -- **close** function of a _subscription_ object - closes both publishing and subscription. -- **disconnect** method - closes all subscriptions and disconnects client from Kaazing WebSocket AMQP gateway +- **disconnect** function of a _subscription_ object - closes both publishing and subscription. +- **close** method - closes all subscriptions and disconnects client from Kaazing WebSocket AMQP gateway ### **connect** function Connect function implements the following sequence: -1. Create WebSocket and AMQP client factories +- Create WebSocket and AMQP client factories - ```javascript +```javascript var amqpClientFactory = new AmqpClientFactory(); var webSocketFactory; if ($gatewayModule && typeof($gatewayModule.WebSocketFactory) === "function") { webSocketFactory = createWebSocketFactory(); amqpClientFactory.setWebSocketFactory(webSocketFactory); } - ``` +``` -2. Create AMQP client +- Create AMQP client - ```javascript +```javascript amqpClient = amqpClientFactory.createAmqpClient(); - ``` +``` -3. Connect to Gateway using amqpClient connect function. Connect function uses has the following parameters: +- Connect to Gateway using amqpClient connect function. Connect function uses has the following parameters: - Connection options. In most common cases it contains of URL, credentials and virtual host (set to ‘/‘) hat specifies the namespace for entities (exchanges and queues) referred to by the protocol. Note that this is not virtual hosting in the HTTP sense. - Callback function that will be called once connection is established. - ```javascript +```javascript var credentials = {username: username, password: password}; var options = { url: url, @@ -50,64 +52,64 @@ Connect function implements the following sequence: var connection=createConnectionObject(amqpClient,connectionInfo.username); connectedFunctionHandle(connection); }); - ``` +``` ### **subscribe** method of connection object -1. Creates subscription object -2. Initializes subscription object - - Opens publishing and consumption (subscription) channels using amqpClient openChannel function that will call on success the callback function that is passed as a parameter: +- Creates and initializes subscription object +- Opens publishing and consumption (subscription) channels using amqpClient openChannel function that will call on success the callback function that is passed as a parameter: - ```javascript - var openHandler=function(){ - publishChannel = amqpClient.openChannel(this.publishChannelOpenHandler); - consumeChannel = amqpClient.openChannel(this.consumeChannelOpenHandler); - } - ``` +```javascript + var openHandler=function(){ + publishChannel = amqpClient.openChannel(this.publishChannelOpenHandler); + consumeChannel = amqpClient.openChannel(this.consumeChannelOpenHandler); + } +``` Once the channels are created method returns the _subscription_ object via a callback. During the creation of the channels: - - Publishing channel open handler declares AMQP Exchange of a _fanout_ type thus creating publishing endpoint. +- Publishing channel open handler declares AMQP Exchange of a _fanout_ type thus creating publishing endpoint. - ```javascript - publishChannelOpenHandler:function(that){ - that.publishChannel.declareExchange({exchange: that.topicPub, type: "fanout"}); - } - ``` - - Consumption (or subscription) channel open handler: - 1. Adds an event listener for “message” event providing the ability to receive messages. - 2. Declares subscription queue for the client. Library randomly generates the name for every client. - 3. Binds the queue to a subscription exchange with “broadcastkey” routing key. +```javascript + publishChannelOpenHandler:function(that){ + that.publishChannel.declareExchange({exchange: that.topicPub, type: "fanout"}); + } +``` + +- Consumption (or subscription) channel open handler: + 1. Adds an event listener for “message” event providing the ability to receive messages. + 2. Declares subscription queue for the client. Library randomly generates the name for every client. + 3. Binds the queue to a subscription exchange with “broadcastkey” routing key. - **Note** For fanout exchanges routing key is not used. For more information about exchanges and routing keys see: [https://www.rabbitmq.com/tutorials/amqp-concepts.html][1] + **Note** For fanout exchanges routing key is not used. For more information about exchanges and routing keys see: [https://www.rabbitmq.com/tutorials/amqp-concepts.html][1] - 4. Starts basic consumer. Basic consumer is started with noAck=true parameter so the client does not need to implement explicit acknowledgement. Another parameter - noLocal - controls whether the client wants to receive its own messages. + 4. Starts basic consumer. Basic consumer is started with noAck=true parameter so the client does not need to implement explicit acknowledgement. Another parameter - noLocal - controls whether the client wants to receive its own messages. - ```javascript - consumeChannelOpenHandler:function(that{ - consumeChannel.addEventListener("message", function(message) { - var body = null; - // Check how the payload was packaged since older browsers like IE7 don't - // support ArrayBuffer. In those cases, a Kaazing ByteBuffer was used instead. - if (typeof(ArrayBuffer) === "undefined") { - body = message.getBodyAsByteBuffer().getString(Charset.UTF8); - } - else { - body = arrayBufferToString(message.getBodyAsArrayBuffer()) - } - that.messageReceivedFunc(body);; - }); - that.consumeChannel.declareQueue({queue: that.queueName}) - .bindQueue({queue: that.queueName, exchange: that.topicSub, routingKey: routingKey }) - .consumeBasic({queue: that.queueName, consumerTag: that.clientId, noAck: true, noLocal:that.noLocal }) - } - ``` +```javascript + consumeChannelOpenHandler:function(that{ + consumeChannel.addEventListener("message", function(message) { + var body = null; + // Check how the payload was packaged since older browsers like IE7 don't + // support ArrayBuffer. In those cases, a Kaazing ByteBuffer was used instead. + if (typeof(ArrayBuffer) === "undefined") { + body = message.getBodyAsByteBuffer().getString(Charset.UTF8); + } + else { + body = arrayBufferToString(message.getBodyAsArrayBuffer()); + } + that.messageReceivedFunc(body); + }); + that.consumeChannel.declareQueue({queue: that.queueName}) + .bindQueue({queue: that.queueName, exchange: that.topicSub, routingKey: routingKey }) + .consumeBasic({queue: that.queueName, consumerTag: that.clientId, noAck: true, noLocal:that.noLocal }) + } +``` ### **sendMessage** function of a subscription object Function sets AMQP properties and sends the message to a publishing exchange using specified routing key. **Note:** As mentioned earlier, library creates a fanout type of exchange that does not use routing keys; thus library sets the value of the routing key to 'broadcast'. ```javascript -sendMessage:function(msg){ + sendMessage:function(msg){ if (typeof msg ==="object"){ msg.clientId=this.clientId; msg=JSON.stringify(msg); @@ -135,16 +137,32 @@ sendMessage:function(msg){ props.setUserId(this.user); logInformation("sent","Sending message to "+this.topicPub+": "+ msg, "sent"); this.publishChannel.publishBasic({body: body, properties: props, exchange: this.topicPub, routingKey: routingKey}); - } + } ``` -### **close** function of a subscription object -Due to AMQP nature, nothing has to be performed. +### **disconnect** function of a subscription object +Deletes declared subscription queue and closes the channels: +```javascript + ... + var config = { + replyCode: 0, + replyText, '', + classId: 0, + methodId: 0 + }; + this.consumeChannel.deleteQueue({queue:this.queueName, ifEmpty: false}, function(){ + this.consumeChannel.closeChannel(config, function(){ + this.publishChannel.closeChannel(config, function(){ + + }); + }); + }); +``` -### **disconnect** function +### **close** function Disconnects the client from Kaazing WebSocket AMQP Gateway ```javascript -amqpClient.disconnect(); + amqpClient.disconnect(); ``` diff --git a/javascript/KaazingJMSClientLibrariesFacade.md b/javascript/KaazingJMSClientLibrariesFacade.md index bc8f7ec..e867982 100644 --- a/javascript/KaazingJMSClientLibrariesFacade.md +++ b/javascript/KaazingJMSClientLibrariesFacade.md @@ -1,28 +1,31 @@ # Kaazing Javascript JMS Client Libraries Facade +Kaazing JavaScript JMS Client Libraries Facade simplifies the interaction with Kaazing JavaScript JMS Client libraries that enable the developers to interact with [JMS](http://docs.oracle.com/javaee/6/tutorial/doc/bncdx.html) brokers via WebSockets. + Kaazing JavaScript JMS Client Libraries Facade: * Implements basic publish-subscribe functionality for JMS to help developers in getting started with their JMS WebSocket projects * Provide developers with the reference implementations for using Kaazing JMS JavaScript client libraries For more information see: -- [Build JavaScript JMS Clients Using Kaazing WebSocket Gateway - JMS Edition](http://developer.kaazing.com/documentation/jms/4.0/dev-js/o_dev_js.html) -- [Use the Kaazing WebSocket Gateway JavaScript JMS Client API](http://developer.kaazing.com/documentation/jms/4.0/dev-js/p_dev_js_client.html) +- [Build JavaScript JMS Clients Using Kaazing WebSocket Gateway - JMS Edition](http://kaazing.com/doc/jms/4.0/dev-js/o_dev_js.html) +- [Use the Kaazing WebSocket Gateway JavaScript JMS Client API](http://kaazing.com/doc/jms/4.0/dev-js/p_dev_js_client.html) ## Organization of the library -Library consists of jmsClientFunction that creates JMSClient object. JMSClient objects provides the following functionality: -- **connect** function - connects client to Kaazing WebSocket JMS gateway and creates a __connection__ object -- **disconnect** function - disconnects client from Kaazing WebSocket JMS gateway -- **sendMessage** function - sends the message to a publishing endpoint +- **connect** function - connects client to Kaazing WebSocket JMS gateway and on success returns via callback a _connection_ object that will be used to create subscriptions. +- ** subscribe ** method of a _connection_ object that creates publishing endpoint and subscribes to a subscription endpoint. Method returns via callback a _subscription_ object that is used for sending messages. +- **sendMessage** method of a _subscription_ object - sends the message to a publishing endpoint +- **disconnect** function of a _subscription_ object - closes both publishing and subscription. +- **close** method - closes all subscriptions and disconnects client from Kaazing WebSocket AMQP gateway ### **connect** function Connect function implements the following sequence: -1. Create JMS connection factory +- Create JMS connection factory ```javascript var jmsConnectionFactory = new JmsConnectionFactory(url); ``` -2. Create connection. createConnection function of JmsConnectionFactory takes three parameters: login, password and a callback function that will be called upon completion. Function returns the future that is checked in a callback function for exceptions. +- Create connection. createConnection function of JmsConnectionFactory takes three parameters: login, password and a callback function that will be called upon completion. Function returns the future that is checked in a callback function for exceptions. ```javascript var connectionFuture = jmsConnectionFactory.createConnection(username, password, function () { @@ -48,13 +51,13 @@ Connect function implements the following sequence: }) ``` -3. Once connection is created, callback function does the following: + Once connection is created, callback function does the following: 1. Obtains the connection from the connectionFuture that was returned by createConection. 2. Sets exception listener to handle exceptions. 3. Creates session using createSession method. Session is created with auto-acknowledgement. 4. Starts the connection using start function passing to it a callback function. -4. Once connection is started, connection object is returned for the subscription to be created using __subscribe__ method. +- Once connection is started, connection object is returned for the subscription to be created using __subscribe__ method. ### **subscribe** method of connection object Method executed the following actions: @@ -104,7 +107,7 @@ Function creates text message and sends it. In order to prevent client from rece } ``` -### **close** function of a subscription object +### **disconnect** function of a subscription object Function closes producer and consumer that were created during the subscription call. ```javascript @@ -114,8 +117,8 @@ Function closes producer and consumer that were created during the subscription }) ``` -### **disconnect** function -Closes all subscriptions (causing closing of their producer and consumer), session and connection in a chain of callbacks. +### **close** function +Closes all subscriptions (causing closing of their producer and consumer), stops the connection and then closes session and connection in a chain of callbacks. ```javascript JMSClient.disconnect=function(){ @@ -125,11 +128,13 @@ Closes all subscriptions (causing closing of their producer and consumer), sessi ... Wait while all the subscriptions are closed... - session.close(function () { - connection.close(function () { + connection.stop(function(){ + session.close(function () { + connection.close(function () { + }); + }); }); - }); } ``` diff --git a/javascript/jsdoc/AmqpUniversalClient.js.html b/javascript/jsdoc/AmqpUniversalClient.js.html index 06e1558..187a6c6 100644 --- a/javascript/jsdoc/AmqpUniversalClient.js.html +++ b/javascript/jsdoc/AmqpUniversalClient.js.html @@ -60,7 +60,7 @@

Source: AmqpUniversalClient.js

* Provides communication services with AMQP server. Created within amqpClientFunction constructor. * @class */ - var AmqpClient = {}; + var AmqpClient = {subscriptions:[]}; var errorFunction=null; var amqpClient=null; @@ -117,7 +117,9 @@

Source: AmqpUniversalClient.js

clientId:null, messageIdCounter:0, user:user, + closed:null, messageReceivedFunc:messageReceivedFunc, + subscribed:false, init:function(subscribedCallback){ this.queueName="client" + Math.floor(Math.random() * 1000000); this.clientId=appId; @@ -129,6 +131,8 @@

Source: AmqpUniversalClient.js

logInformation("INFO", "OPEN: Consume Channel"); this.consumeChannel = amqpClient.openChannel(function(){that.consumeChannelOpenHandler(that)}); $.when(this.publishChannelOpened, this.consumeChannelOpened).done(function(){ + that.closed=$.Deferred(); + that.subscribed=true; subscribedCallback(that); }); @@ -239,18 +243,40 @@

Source: AmqpUniversalClient.js

props.setUserId(this.user); logInformation("sent","Sending message to "+this.topicPub+": "+ msg, "sent"); this.publishChannel.publishBasic({body: body, properties: props, exchange: this.topicPub, routingKey: routingKey}); + }, + disconnect:function(){ + if (!this.subscribed){ + this.closed.resolve(); + } + else{ + this.subscribed=false; + var config = { + replyCode: 0, + replyText: '', + classId: 0, + methodId: 0 + }; + this.consumeChannel.deleteQueue({queue:this.queueName, ifEmpty: false}, function(){ + this.consumeChannel.closeChannel(config, function(){ + this.publishChannel.closeChannel(config, function(){ + this.closed.resolve(); + }); + }); + }); + } } }; return SubscriptionObject; } - var createConnectionObject=function(amqpClient, user){ + var createConnectionObject=function(connection, amqpClient, user){ /** * Contains infomration about established connection. * @class */ var ConnectionObject = { + connection:connection, user:user, amqpClient:amqpClient, /** @@ -264,7 +290,9 @@

Source: AmqpUniversalClient.js

subscribe:function(topicPub, topicSub, messageReceivedFunc, noLocal, subscribedCallbackFunction){ logInformation("INFO","CONNECTED!!!"); var subscription=createSubscriptionObject(this.amqpClient, topicPub, topicSub, noLocal, messageReceivedFunc, this.user); + var that=this; subscription.init(function(subscription){ + that.connection.subscriptions.push(subscription); subscribedCallbackFunction(subscription); }); } @@ -301,8 +329,9 @@

Source: AmqpUniversalClient.js

credentials: credentials }; try{ + var that=this; amqpClient.connect(options, function(){ - var connection=createConnectionObject(amqpClient,connectionInfo.username); + var connection=createConnectionObject(that, amqpClient,connectionInfo.username); connectedFunctionHandle(connection); }); } @@ -314,8 +343,13 @@

Source: AmqpUniversalClient.js

/** * Disconnects from Kaazing WebSocket AMQP Gateway */ - AmqpClient.disconnect=function(){ - amqpClient.disconnect(); + AmqpClient.close=function(){ + for(var i=0;i<this.subscriptions.length;i++){ + this.subscriptions[i].disconnect(); + } + $.when.apply($,this.subscriptions).then(function() { + amqpClient.disconnect(); + }); } return AmqpClient; @@ -336,7 +370,7 @@

Home

Classes