From 6b00c94ac5d3a98094864a1e3348617daa4a678e Mon Sep 17 00:00:00 2001 From: Mohamed Ezzat Date: Wed, 25 May 2016 06:39:27 +0200 Subject: [PATCH] squid:S1213 - The members of an interface declaration or class should appear in a pre-defined order --- .../mqtt/client/CallbackConnection.java | 9 +-- .../java/org/fusesource/mqtt/client/MQTT.java | 73 +++++++++---------- .../mqtt/codec/MQTTProtocolCodec.java | 38 +++++----- .../org/fusesource/mqtt/codec/PUBREL.java | 8 +- 4 files changed, 63 insertions(+), 65 deletions(-) diff --git a/mqtt-client/src/main/java/org/fusesource/mqtt/client/CallbackConnection.java b/mqtt-client/src/main/java/org/fusesource/mqtt/client/CallbackConnection.java index 9aadb95..4f32621 100644 --- a/mqtt-client/src/main/java/org/fusesource/mqtt/client/CallbackConnection.java +++ b/mqtt-client/src/main/java/org/fusesource/mqtt/client/CallbackConnection.java @@ -71,7 +71,10 @@ * @author Hiram Chirino */ public class CallbackConnection { - + private boolean onRefillCalled =false; + public static final Task NOOP = Dispatch.NOOP; + private short nextMessageId = 1; + private static class Request { private final MQTTFrame frame; private final short id; @@ -411,7 +414,6 @@ public void onFailure(Throwable value) { } } - private boolean onRefillCalled =false; public void onSessionEstablished(Transport transport) { this.transport = transport; if( suspendCount.get() > 0 ) { @@ -741,7 +743,6 @@ private void send(Request request) { } } - private short nextMessageId = 1; private short getNextMessageId() { short rc = nextMessageId; nextMessageId++; @@ -858,8 +859,6 @@ private void processFrame(MQTTFrame frame) { } } - static public final Task NOOP = Dispatch.NOOP; - private void toReceiver(final PUBLISH publish) { if( listener !=null ) { try { diff --git a/mqtt-client/src/main/java/org/fusesource/mqtt/client/MQTT.java b/mqtt-client/src/main/java/org/fusesource/mqtt/client/MQTT.java index 2e1b3ca..67584f1 100644 --- a/mqtt-client/src/main/java/org/fusesource/mqtt/client/MQTT.java +++ b/mqtt-client/src/main/java/org/fusesource/mqtt/client/MQTT.java @@ -45,44 +45,7 @@ public class MQTT { private static final long KEEP_ALIVE = Long.parseLong(System.getProperty("mqtt.thread.keep_alive", Integer.toString(1000))); private static final long STACK_SIZE = Long.parseLong(System.getProperty("mqtt.thread.stack_size", Integer.toString(1024*512))); private static ThreadPoolExecutor blockingThreadPool; - - - public synchronized static ThreadPoolExecutor getBlockingThreadPool() { - if( blockingThreadPool == null ) { - blockingThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, KEEP_ALIVE, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadFactory() { - public Thread newThread(Runnable r) { - Thread rc = new Thread(null, r, "MQTT Task", STACK_SIZE); - rc.setDaemon(true); - return rc; - } - }) { - - @Override - public void shutdown() { - // we don't ever shutdown since we are shared.. - } - - @Override - public List shutdownNow() { - // we don't ever shutdown since we are shared.. - return Collections.emptyList(); - } - }; - } - return blockingThreadPool; - } - public synchronized static void setBlockingThreadPool(ThreadPoolExecutor pool) { - blockingThreadPool = pool; - } - private static final URI DEFAULT_HOST = createDefaultHost(); - private static URI createDefaultHost() { - try { - return new URI("tcp://127.0.0.1:1883"); - } catch (URISyntaxException e) { - return null; - } - } protected URI host = DEFAULT_HOST; protected URI localAddress; @@ -127,6 +90,42 @@ public MQTT(MQTT other) { this.tracer = other.tracer; } + public synchronized static ThreadPoolExecutor getBlockingThreadPool() { + if( blockingThreadPool == null ) { + blockingThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, KEEP_ALIVE, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread rc = new Thread(null, r, "MQTT Task", STACK_SIZE); + rc.setDaemon(true); + return rc; + } + }) { + + @Override + public void shutdown() { + // we don't ever shutdown since we are shared.. + } + + @Override + public List shutdownNow() { + // we don't ever shutdown since we are shared.. + return Collections.emptyList(); + } + }; + } + return blockingThreadPool; + } + public synchronized static void setBlockingThreadPool(ThreadPoolExecutor pool) { + blockingThreadPool = pool; + } + + private static URI createDefaultHost() { + try { + return new URI("tcp://127.0.0.1:1883"); + } catch (URISyntaxException e) { + return null; + } + } + public CallbackConnection callbackConnection() { if( !isCleanSession() && ( getClientId()==null || getClientId().length==0 )) { throw new IllegalArgumentException("The client id MUST be configured when clean session is set to false"); diff --git a/mqtt-client/src/main/java/org/fusesource/mqtt/codec/MQTTProtocolCodec.java b/mqtt-client/src/main/java/org/fusesource/mqtt/codec/MQTTProtocolCodec.java index a5d300c..3b96d42 100644 --- a/mqtt-client/src/main/java/org/fusesource/mqtt/codec/MQTTProtocolCodec.java +++ b/mqtt-client/src/main/java/org/fusesource/mqtt/codec/MQTTProtocolCodec.java @@ -37,6 +37,25 @@ public class MQTTProtocolCodec extends AbstractProtocolCodec { private int maxMessageLength = 1024*1024*100; + private final Action readHeader = new Action() { + public MQTTFrame apply() throws IOException { + int length = readLength(); + if( length >= 0 ) { + if( length > maxMessageLength) { + throw new IOException("The maximum message length was exceeded"); + } + byte header = readBuffer.get(readStart); + readStart = readEnd; + if( length > 0 ) { + nextDecodeAction = readBody(header, length); + } else { + return new MQTTFrame().header(header); + } + } + return null; + } + }; + public MQTTProtocolCodec() { this.bufferPools = BUFFER_POOLS; } @@ -76,25 +95,6 @@ protected Action initialDecodeAction() { return readHeader; } - private final Action readHeader = new Action() { - public MQTTFrame apply() throws IOException { - int length = readLength(); - if( length >= 0 ) { - if( length > maxMessageLength) { - throw new IOException("The maximum message length was exceeded"); - } - byte header = readBuffer.get(readStart); - readStart = readEnd; - if( length > 0 ) { - nextDecodeAction = readBody(header, length); - } else { - return new MQTTFrame().header(header); - } - } - return null; - } - }; - private int readLength() throws IOException { readEnd = readStart+2; // Header is at least 2 bytes.. int limit = readBuffer.position(); diff --git a/mqtt-client/src/main/java/org/fusesource/mqtt/codec/PUBREL.java b/mqtt-client/src/main/java/org/fusesource/mqtt/codec/PUBREL.java index 121ea7e..aab1dd6 100644 --- a/mqtt-client/src/main/java/org/fusesource/mqtt/codec/PUBREL.java +++ b/mqtt-client/src/main/java/org/fusesource/mqtt/codec/PUBREL.java @@ -38,14 +38,14 @@ public class PUBREL extends MessageSupport.HeaderBase implements Message, Acked private short messageId; - public byte messageType() { - return TYPE; - } - public PUBREL() { qos(QoS.AT_LEAST_ONCE); } + public byte messageType() { + return TYPE; + } + public PUBREL decode(MQTTFrame frame) throws ProtocolException { assert(frame.buffers.length == 1); header(frame.header());