From 6036b750ac22213bdd1094daec3438b53b567381 Mon Sep 17 00:00:00 2001 From: Fabrice Date: Mon, 8 Apr 2024 17:13:02 -0500 Subject: [PATCH 1/3] added support for generic artemis AbstractConsumer --- pom.xml | 5 +-- .../sa/client/jms/AbstractConsumer.java | 31 +++++++++++++------ .../sa/client/jms/ConsumerInterface.java | 5 +-- 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/pom.xml b/pom.xml index cf3442e..432c289 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 io.2060 service-agent-java-client - 2.0.4 + 2.0.5 UTF-8 17 @@ -45,7 +45,8 @@ io.quarkus quarkus-rest-client-jackson - 3.6.4 + 3.9.2 + provided org.graalvm.sdk diff --git a/src/main/java/io/twentysixty/sa/client/jms/AbstractConsumer.java b/src/main/java/io/twentysixty/sa/client/jms/AbstractConsumer.java index c16dacb..b38e2f2 100644 --- a/src/main/java/io/twentysixty/sa/client/jms/AbstractConsumer.java +++ b/src/main/java/io/twentysixty/sa/client/jms/AbstractConsumer.java @@ -24,7 +24,7 @@ import io.twentysixty.sa.client.util.JsonUtil; -public class AbstractConsumer implements ConsumerInterface { +public class AbstractConsumer implements ConsumerInterface { private ConnectionFactory connectionFactory; @@ -174,11 +174,11 @@ public Uni consumer(UUID uuid) { } - logger.info("consumer: running " + uuid); + if (debug) logger.info("consumer: running " + uuid); try { - if (debug) logger.warn("consumer " + queueName + ": create session " + uuid ); + if (debug) logger.info("consumer " + queueName + ": create session " + uuid ); if (context == null) { @@ -187,13 +187,21 @@ public Uni consumer(UUID uuid) { } - if (debug) logger.warn("consumer " + queueName + ": session created " + uuid ); + if (debug) logger.info("consumer " + queueName + ": session created " + uuid ); if (queue == null) { queue = context.createQueue(queueName); } - if (debug) logger.warn("consumer " + queueName + ": create consumer " + uuid ); - JMSConsumer consumer = context.createConsumer(queue); + if (debug) logger.info("consumer " + queueName + ": create consumer " + uuid ); + JMSConsumer consumer = null; + String messageSelector = getMessageSelector(); + + if (messageSelector != null) { + consumer = context.createConsumer(queue, getMessageSelector()); + } else { + context.createConsumer(queue); + } + if (debug) logger.info("consumer " + queueName + ": waiting for message... " + uuid); @@ -238,7 +246,7 @@ public Uni consumer(UUID uuid) { } } try { - BaseMessage baseMessage = (BaseMessage) objMsg.getObject(); + M baseMessage = (M) objMsg.getObject(); this.receiveMessage(baseMessage); //messageResource.sendMessage(baseMessage); @@ -260,7 +268,7 @@ public Uni consumer(UUID uuid) { } } else { - if (debug) logger.warn("consumer " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": unkown event " + message); + if (debug) logger.info("consumer " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": unkown event " + message); context.commit(); } @@ -333,7 +341,7 @@ public void setDebug(boolean debug) { } @Override - public void receiveMessage(BaseMessage message) throws Exception { + public void receiveMessage(M message) throws Exception { // TODO Auto-generated method stub } @@ -345,7 +353,10 @@ public ConnectionFactory getConnectionFactory() { public void setConnectionFactory(ConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } - + @Override + public String getMessageSelector() { + return null; + } diff --git a/src/main/java/io/twentysixty/sa/client/jms/ConsumerInterface.java b/src/main/java/io/twentysixty/sa/client/jms/ConsumerInterface.java index d4c26d2..5c8fb25 100644 --- a/src/main/java/io/twentysixty/sa/client/jms/ConsumerInterface.java +++ b/src/main/java/io/twentysixty/sa/client/jms/ConsumerInterface.java @@ -3,14 +3,15 @@ import io.twentysixty.sa.client.model.message.BaseMessage; -public interface ConsumerInterface { +public interface ConsumerInterface { public void setExDelay(Long exDelay); public void setQueueName(String queueName); public void setThreads(Integer threads); public void setDebug(boolean debug); + public String getMessageSelector(); + public void receiveMessage(M message) throws Exception; - public void receiveMessage(BaseMessage message) throws Exception; } From 614330af158bcc7dfa1cd9b243b237ee9bace366 Mon Sep 17 00:00:00 2001 From: Fabrice Date: Mon, 8 Apr 2024 18:05:33 -0500 Subject: [PATCH 2/3] improved artemis consumer code --- .../sa/client/jms/AbstractConsumer.java | 540 ++++++++++-------- 1 file changed, 288 insertions(+), 252 deletions(-) diff --git a/src/main/java/io/twentysixty/sa/client/jms/AbstractConsumer.java b/src/main/java/io/twentysixty/sa/client/jms/AbstractConsumer.java index b38e2f2..59719e0 100644 --- a/src/main/java/io/twentysixty/sa/client/jms/AbstractConsumer.java +++ b/src/main/java/io/twentysixty/sa/client/jms/AbstractConsumer.java @@ -3,6 +3,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -26,299 +27,334 @@ public class AbstractConsumer implements ConsumerInterface { - private ConnectionFactory connectionFactory; + private ConnectionFactory connectionFactory; + - private Long exDelay; private String queueName; private Integer threads; private Boolean debug; - - - + + + private static final Logger logger = Logger.getLogger(AbstractConsumer.class); - private Map lockObjs = new HashMap(); - private Map runnings = new HashMap(); - private Map starteds = new HashMap(); - private Map contexts = new HashMap(); - - + private static Map lockObjs = new HashMap(); + private static Map runnings = new HashMap(); + private static Map starteds = new HashMap(); + private static Map contexts = new HashMap(); + + private static ExecutorService executor = Executors.newCachedThreadPool(); - - - protected void _onStart() { - - - for (int i=0; i consumer(UUID uuid) { - - JMSContext context = null; - Queue queue = null; - - Object lockObj = new Object(); - synchronized (lockObjs) { - lockObjs.put(uuid, lockObj); - } - synchronized (runnings) { - runnings.put(uuid, true); - } - - - long now = System.currentTimeMillis(); - synchronized (lockObj) { + + public Uni consumer(UUID uuid) { + + JMSContext context = null; + Queue queue = null; + + Object lockObj = new Object(); + synchronized (lockObjs) { + lockObjs.put(uuid, lockObj); + } + synchronized (runnings) { + runnings.put(uuid, true); + } + + + long now = System.currentTimeMillis(); + /*synchronized (lockObj) { try { lockObj.wait(10000l); } catch (InterruptedException e) { + + } + } + */ + + while (true) { + Boolean started = null; + synchronized(starteds) { + started = starteds.get(uuid); + + } + if ((started == null) || (!started)) { + break; + } + + + if (debug) logger.info("consumer: running " + uuid); + + try { + + if (debug) logger.info("consumer " + queueName + ": create session " + uuid ); + + + if (context == null) { + context = getConnectionFactory().createContext(Session.SESSION_TRANSACTED); + synchronized (contexts) { + contexts.put(uuid, context); + } + + } + + + if (debug) logger.info("consumer " + queueName + ": session created " + uuid ); + + if (queue == null) { + queue = context.createQueue(queueName); + } + if (debug) logger.info("consumer " + queueName + ": create consumer " + uuid ); + JMSConsumer consumer = null; + String messageSelector = getMessageSelector(); + + if (messageSelector != null) { + consumer = context.createConsumer(queue, getMessageSelector()); + } else { + consumer = context.createConsumer(queue); + } + + + + if (debug) logger.info("consumer " + queueName + ": waiting for message... " + uuid); + + + while (true) { + started = null; + + synchronized(starteds) { + started = starteds.get(uuid); + + } + if ((started == null) || (!started)) { + break; + } + + + now = System.currentTimeMillis(); + + if (debug) + logger.info("consumer: waiting for message... " + uuid + " " + (System.currentTimeMillis() - now)); + + Message message = consumer.receive(); + + if (message != null) { + if (debug) + logger.info("consumer: received message " + uuid + " " + (System.currentTimeMillis() - now)); + + + + //BaseMessage baseMessage = null; + + if (message instanceof ObjectMessage) { + + ObjectMessage objMsg = (ObjectMessage) message; + //baseMessage = (BaseMessage) objMsg.getObject(); + + if (debug) { + try { + logger.info(JsonUtil.serialize(objMsg.getObject(), false)); + } catch (JsonProcessingException e2) { + + } + } + try { + M baseMessage = (M) objMsg.getObject(); + this.receiveMessage(baseMessage); + //messageResource.sendMessage(baseMessage); + + + context.commit(); + if (debug) + logger.info("consumer: " + queueName + " after commit "+ uuid + " " + (System.currentTimeMillis() - now)); + + + } catch (Exception e) { + try { + logger.warn("consumer: " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": exception " + JsonUtil.serialize(objMsg, false), e); + } catch (JsonProcessingException e1) { + logger.warn("consumer: " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": exception", e); + } + context.rollback(); + //if (debug) + logger.info("consumer: " + queueName + " after rollback "+ uuid + " " + (System.currentTimeMillis() - now)); + + } + } else { + if (debug) logger.info("consumer " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": unkown event " + message); + context.commit(); + } + + + } else { + if (debug) + logger.info("consumer: no delivered message " + uuid + " " + (System.currentTimeMillis() - now)); + + synchronized (lockObj) { + try { + if (debug) logger.info("consumer: waiting thread " + uuid + " " + (System.currentTimeMillis() - now)); + lockObj.wait(1000); + } catch (InterruptedException e1) { + } + } + } + + + + } + consumer.close(); + context.close(); + consumer = null; + context = null; + } catch (Exception e) { + logger.error("", e); + try { + + context.close(); + } catch (Exception e1) { + + } + context = null; + queue = null; + + + + synchronized (lockObj) { + try { + lockObj.wait(exDelay); + } catch (InterruptedException e1) { + } + } } } - - - while (true) { - Boolean started = null; - synchronized(starteds) { - started = starteds.get(uuid); - - } - if ((started == null) || (!started)) { - break; - } - - - if (debug) logger.info("consumer: running " + uuid); - - try { - - if (debug) logger.info("consumer " + queueName + ": create session " + uuid ); - - - if (context == null) { - context = getConnectionFactory().createContext(Session.SESSION_TRANSACTED); - contexts.put(uuid, context); - } - - - if (debug) logger.info("consumer " + queueName + ": session created " + uuid ); - - if (queue == null) { - queue = context.createQueue(queueName); - } - if (debug) logger.info("consumer " + queueName + ": create consumer " + uuid ); - JMSConsumer consumer = null; - String messageSelector = getMessageSelector(); - - if (messageSelector != null) { - consumer = context.createConsumer(queue, getMessageSelector()); - } else { - context.createConsumer(queue); - } - - - - if (debug) logger.info("consumer " + queueName + ": waiting for message... " + uuid); - - - while (true) { - started = null; - synchronized(starteds) { - started = starteds.get(uuid); - - } - if ((started == null) || (!started)) { - break; - } - - - now = System.currentTimeMillis(); - - if (debug) - logger.info("consumer: waiting for message... " + uuid + " " + (System.currentTimeMillis() - now)); - - Message message = consumer.receive(); - - if (message != null) { - if (debug) - logger.info("consumer: received message " + uuid + " " + (System.currentTimeMillis() - now)); - - - - //BaseMessage baseMessage = null; - - if (message instanceof ObjectMessage) { - - ObjectMessage objMsg = (ObjectMessage) message; - //baseMessage = (BaseMessage) objMsg.getObject(); - - if (debug) { - try { - logger.info(JsonUtil.serialize(objMsg.getObject(), false)); - } catch (JsonProcessingException e2) { - - } - } - try { - M baseMessage = (M) objMsg.getObject(); - this.receiveMessage(baseMessage); - //messageResource.sendMessage(baseMessage); - - - context.commit(); - if (debug) - logger.info("consumer: " + queueName + " after commit "+ uuid + " " + (System.currentTimeMillis() - now)); - - - } catch (Exception e) { - try { - logger.warn("consumer: " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": exception " + JsonUtil.serialize(objMsg, false), e); - } catch (JsonProcessingException e1) { - logger.warn("consumer: " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": exception", e); - } - context.rollback(); - //if (debug) - logger.info("consumer: " + queueName + " after rollback "+ uuid + " " + (System.currentTimeMillis() - now)); - - } - } else { - if (debug) logger.info("consumer " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": unkown event " + message); - context.commit(); - } - - - } else { - if (debug) - logger.info("consumer: no delivered message " + uuid + " " + (System.currentTimeMillis() - now)); - - synchronized (lockObj) { - try { - if (debug) logger.info("consumer: waiting thread " + uuid + " " + (System.currentTimeMillis() - now)); - lockObj.wait(1000); - } catch (InterruptedException e1) { - } - } - } - - - - } - consumer.close(); - context.close(); - consumer = null; - context = null; - } catch (Exception e) { - try { - - context.close(); - } catch (Exception e1) { - - } - context = null; - queue = null; - - - - synchronized (lockObj) { - try { - lockObj.wait(exDelay); - } catch (InterruptedException e1) { - } - } - } - } - synchronized (runnings) { - runnings.put(uuid, false); - } - - return Uni.createFrom().voidItem(); - } + synchronized (runnings) { + runnings.put(uuid, false); + } + + return Uni.createFrom().voidItem(); + } @Override public void setExDelay(Long exDelay) { @@ -343,7 +379,7 @@ public void setDebug(boolean debug) { @Override public void receiveMessage(M message) throws Exception { // TODO Auto-generated method stub - + } public ConnectionFactory getConnectionFactory() { @@ -357,8 +393,8 @@ public void setConnectionFactory(ConnectionFactory connectionFactory) { public String getMessageSelector() { return null; } - - - - + + + + } \ No newline at end of file From dc3225422411989d8a3d62abdad9ee25c7e9e2bd Mon Sep 17 00:00:00 2001 From: Fabrice Date: Mon, 8 Apr 2024 18:25:35 -0500 Subject: [PATCH 3/3] fixed bug caused by static variables --- .../sa/client/jms/AbstractConsumer.java | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/twentysixty/sa/client/jms/AbstractConsumer.java b/src/main/java/io/twentysixty/sa/client/jms/AbstractConsumer.java index 59719e0..bef9940 100644 --- a/src/main/java/io/twentysixty/sa/client/jms/AbstractConsumer.java +++ b/src/main/java/io/twentysixty/sa/client/jms/AbstractConsumer.java @@ -39,10 +39,10 @@ public class AbstractConsumer implements ConsumerInterface { private static final Logger logger = Logger.getLogger(AbstractConsumer.class); - private static Map lockObjs = new HashMap(); - private static Map runnings = new HashMap(); - private static Map starteds = new HashMap(); - private static Map contexts = new HashMap(); + private Map lockObjs = new HashMap(); + private Map runnings = new HashMap(); + private Map starteds = new HashMap(); + private Map contexts = new HashMap(); private static ExecutorService executor = Executors.newCachedThreadPool(); @@ -81,7 +81,7 @@ private void stopConsumers() { } for (UUID uuid: keySet) { - logger.info("stopConsumers: stopping consumer " + uuid + " for " + queueName); + if (debug) logger.info("stopConsumers: stopping consumer " + uuid + " for " + queueName); setStoppedConsumer(uuid); JMSContext context = null; @@ -90,10 +90,18 @@ private void stopConsumers() { context = contexts.get(uuid); } - if (context != null) context.close(); + + + if (context != null) { + if (debug) logger.info("stopConsumers: closing context " + uuid); + context.close(); + } + if (debug) logger.info("stopConsumers: closing context done, calling stopConsumer " + uuid); stopConsumer(uuid); - + + if (debug) logger.info("stopConsumers: removing lockObj " + uuid); + synchronized (lockObjs) { lockObjs.remove(uuid); }