diff --git a/pom.xml b/pom.xml index cf3442e..432c289 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 io.2060 service-agent-java-client - 2.0.4 + 2.0.5 UTF-8 17 @@ -45,7 +45,8 @@ io.quarkus quarkus-rest-client-jackson - 3.6.4 + 3.9.2 + provided org.graalvm.sdk diff --git a/src/main/java/io/twentysixty/sa/client/jms/AbstractConsumer.java b/src/main/java/io/twentysixty/sa/client/jms/AbstractConsumer.java index c16dacb..bef9940 100644 --- a/src/main/java/io/twentysixty/sa/client/jms/AbstractConsumer.java +++ b/src/main/java/io/twentysixty/sa/client/jms/AbstractConsumer.java @@ -3,6 +3,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -24,293 +25,344 @@ import io.twentysixty.sa.client.util.JsonUtil; -public class AbstractConsumer implements ConsumerInterface { +public class AbstractConsumer implements ConsumerInterface { + + private ConnectionFactory connectionFactory; - private ConnectionFactory connectionFactory; - private Long exDelay; private String queueName; private Integer threads; private Boolean debug; - - - + + + private static final Logger logger = Logger.getLogger(AbstractConsumer.class); private Map lockObjs = new HashMap(); private Map runnings = new HashMap(); private Map starteds = new HashMap(); private Map contexts = new HashMap(); - - + + private static ExecutorService executor = Executors.newCachedThreadPool(); - - - protected void _onStart() { - - - for (int i=0; i consumer(UUID uuid) { - - JMSContext context = null; - Queue queue = null; - - Object lockObj = new Object(); - synchronized (lockObjs) { - lockObjs.put(uuid, lockObj); - } - synchronized (runnings) { - runnings.put(uuid, true); - } - - - long now = System.currentTimeMillis(); - synchronized (lockObj) { + + public Uni consumer(UUID uuid) { + + JMSContext context = null; + Queue queue = null; + + Object lockObj = new Object(); + synchronized (lockObjs) { + lockObjs.put(uuid, lockObj); + } + synchronized (runnings) { + runnings.put(uuid, true); + } + + + long now = System.currentTimeMillis(); + /*synchronized (lockObj) { try { lockObj.wait(10000l); } catch (InterruptedException e) { + + } + } + */ + + while (true) { + Boolean started = null; + synchronized(starteds) { + started = starteds.get(uuid); + + } + if ((started == null) || (!started)) { + break; + } + + + if (debug) logger.info("consumer: running " + uuid); + + try { + + if (debug) logger.info("consumer " + queueName + ": create session " + uuid ); + + + if (context == null) { + context = getConnectionFactory().createContext(Session.SESSION_TRANSACTED); + synchronized (contexts) { + contexts.put(uuid, context); + } + + } + + + if (debug) logger.info("consumer " + queueName + ": session created " + uuid ); + + if (queue == null) { + queue = context.createQueue(queueName); + } + if (debug) logger.info("consumer " + queueName + ": create consumer " + uuid ); + JMSConsumer consumer = null; + String messageSelector = getMessageSelector(); + + if (messageSelector != null) { + consumer = context.createConsumer(queue, getMessageSelector()); + } else { + consumer = context.createConsumer(queue); + } + + + + if (debug) logger.info("consumer " + queueName + ": waiting for message... " + uuid); + + + while (true) { + started = null; + + synchronized(starteds) { + started = starteds.get(uuid); + + } + if ((started == null) || (!started)) { + break; + } + + + now = System.currentTimeMillis(); + + if (debug) + logger.info("consumer: waiting for message... " + uuid + " " + (System.currentTimeMillis() - now)); + + Message message = consumer.receive(); + + if (message != null) { + if (debug) + logger.info("consumer: received message " + uuid + " " + (System.currentTimeMillis() - now)); + + + + //BaseMessage baseMessage = null; + + if (message instanceof ObjectMessage) { + + ObjectMessage objMsg = (ObjectMessage) message; + //baseMessage = (BaseMessage) objMsg.getObject(); + + if (debug) { + try { + logger.info(JsonUtil.serialize(objMsg.getObject(), false)); + } catch (JsonProcessingException e2) { + + } + } + try { + M baseMessage = (M) objMsg.getObject(); + this.receiveMessage(baseMessage); + //messageResource.sendMessage(baseMessage); + + + context.commit(); + if (debug) + logger.info("consumer: " + queueName + " after commit "+ uuid + " " + (System.currentTimeMillis() - now)); + + + } catch (Exception e) { + try { + logger.warn("consumer: " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": exception " + JsonUtil.serialize(objMsg, false), e); + } catch (JsonProcessingException e1) { + logger.warn("consumer: " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": exception", e); + } + context.rollback(); + //if (debug) + logger.info("consumer: " + queueName + " after rollback "+ uuid + " " + (System.currentTimeMillis() - now)); + + } + } else { + if (debug) logger.info("consumer " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": unkown event " + message); + context.commit(); + } + + + } else { + if (debug) + logger.info("consumer: no delivered message " + uuid + " " + (System.currentTimeMillis() - now)); + + synchronized (lockObj) { + try { + if (debug) logger.info("consumer: waiting thread " + uuid + " " + (System.currentTimeMillis() - now)); + lockObj.wait(1000); + } catch (InterruptedException e1) { + } + } + } + + + + } + consumer.close(); + context.close(); + consumer = null; + context = null; + } catch (Exception e) { + logger.error("", e); + try { + + context.close(); + } catch (Exception e1) { + + } + context = null; + queue = null; + + + + synchronized (lockObj) { + try { + lockObj.wait(exDelay); + } catch (InterruptedException e1) { + } + } } } - - - while (true) { - Boolean started = null; - synchronized(starteds) { - started = starteds.get(uuid); - - } - if ((started == null) || (!started)) { - break; - } - - - logger.info("consumer: running " + uuid); - - try { - - if (debug) logger.warn("consumer " + queueName + ": create session " + uuid ); - - - if (context == null) { - context = getConnectionFactory().createContext(Session.SESSION_TRANSACTED); - contexts.put(uuid, context); - } - - - if (debug) logger.warn("consumer " + queueName + ": session created " + uuid ); - - if (queue == null) { - queue = context.createQueue(queueName); - } - if (debug) logger.warn("consumer " + queueName + ": create consumer " + uuid ); - JMSConsumer consumer = context.createConsumer(queue); - - - if (debug) logger.info("consumer " + queueName + ": waiting for message... " + uuid); - - - while (true) { - started = null; - synchronized(starteds) { - started = starteds.get(uuid); - - } - if ((started == null) || (!started)) { - break; - } - - - now = System.currentTimeMillis(); - - if (debug) - logger.info("consumer: waiting for message... " + uuid + " " + (System.currentTimeMillis() - now)); - - Message message = consumer.receive(); - - if (message != null) { - if (debug) - logger.info("consumer: received message " + uuid + " " + (System.currentTimeMillis() - now)); - - - - //BaseMessage baseMessage = null; - - if (message instanceof ObjectMessage) { - - ObjectMessage objMsg = (ObjectMessage) message; - //baseMessage = (BaseMessage) objMsg.getObject(); - - if (debug) { - try { - logger.info(JsonUtil.serialize(objMsg.getObject(), false)); - } catch (JsonProcessingException e2) { - - } - } - try { - BaseMessage baseMessage = (BaseMessage) objMsg.getObject(); - this.receiveMessage(baseMessage); - //messageResource.sendMessage(baseMessage); - - - context.commit(); - if (debug) - logger.info("consumer: " + queueName + " after commit "+ uuid + " " + (System.currentTimeMillis() - now)); - - - } catch (Exception e) { - try { - logger.warn("consumer: " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": exception " + JsonUtil.serialize(objMsg, false), e); - } catch (JsonProcessingException e1) { - logger.warn("consumer: " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": exception", e); - } - context.rollback(); - //if (debug) - logger.info("consumer: " + queueName + " after rollback "+ uuid + " " + (System.currentTimeMillis() - now)); - - } - } else { - if (debug) logger.warn("consumer " + queueName + " "+ uuid + " " + (System.currentTimeMillis() - now)+ ": unkown event " + message); - context.commit(); - } - - - } else { - if (debug) - logger.info("consumer: no delivered message " + uuid + " " + (System.currentTimeMillis() - now)); - - synchronized (lockObj) { - try { - if (debug) logger.info("consumer: waiting thread " + uuid + " " + (System.currentTimeMillis() - now)); - lockObj.wait(1000); - } catch (InterruptedException e1) { - } - } - } - - - - } - consumer.close(); - context.close(); - consumer = null; - context = null; - } catch (Exception e) { - try { - - context.close(); - } catch (Exception e1) { - - } - context = null; - queue = null; - - - - synchronized (lockObj) { - try { - lockObj.wait(exDelay); - } catch (InterruptedException e1) { - } - } - } - } - synchronized (runnings) { - runnings.put(uuid, false); - } - - return Uni.createFrom().voidItem(); - } + synchronized (runnings) { + runnings.put(uuid, false); + } + + return Uni.createFrom().voidItem(); + } @Override public void setExDelay(Long exDelay) { @@ -333,9 +385,9 @@ public void setDebug(boolean debug) { } @Override - public void receiveMessage(BaseMessage message) throws Exception { + public void receiveMessage(M message) throws Exception { // TODO Auto-generated method stub - + } public ConnectionFactory getConnectionFactory() { @@ -345,9 +397,12 @@ public ConnectionFactory getConnectionFactory() { public void setConnectionFactory(ConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } - - - - - + @Override + public String getMessageSelector() { + return null; + } + + + + } \ No newline at end of file diff --git a/src/main/java/io/twentysixty/sa/client/jms/ConsumerInterface.java b/src/main/java/io/twentysixty/sa/client/jms/ConsumerInterface.java index d4c26d2..5c8fb25 100644 --- a/src/main/java/io/twentysixty/sa/client/jms/ConsumerInterface.java +++ b/src/main/java/io/twentysixty/sa/client/jms/ConsumerInterface.java @@ -3,14 +3,15 @@ import io.twentysixty.sa.client.model.message.BaseMessage; -public interface ConsumerInterface { +public interface ConsumerInterface { public void setExDelay(Long exDelay); public void setQueueName(String queueName); public void setThreads(Integer threads); public void setDebug(boolean debug); + public String getMessageSelector(); + public void receiveMessage(M message) throws Exception; - public void receiveMessage(BaseMessage message) throws Exception; }