diff --git a/client/test/AllTests.java b/client/test/AllTests.java index 0a5247a..83c5481 100644 --- a/client/test/AllTests.java +++ b/client/test/AllTests.java @@ -5,7 +5,7 @@ @RunWith(Suite.class) @SuiteClasses({ ClientCallTest.class, ClientConnectionTest.class, ClientLoadTest.class, ClientRequestTest.class, ClientSpatialTest.class, ClientVariableTest.class, ClientDataTest.class, ClientPasswordTest.class, - ClientSubscriptionTest.class }) + ClientSubscriptionTest.class, ClientMessageRetainDelayTest.class }) public class AllTests { } diff --git a/client/test/ClientMessageRetainDelayTest.java b/client/test/ClientMessageRetainDelayTest.java new file mode 100644 index 0000000..fe052b5 --- /dev/null +++ b/client/test/ClientMessageRetainDelayTest.java @@ -0,0 +1,134 @@ +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.junit.Test; + +import nl.tue.id.oocsi.client.OOCSIClient; +import nl.tue.id.oocsi.client.protocol.DataHandler; +import nl.tue.id.oocsi.client.protocol.OOCSIMessage; + +public class ClientMessageRetainDelayTest { + + @Test + public void testRetainedMessage() throws InterruptedException { + final List list = new ArrayList(); + + OOCSIClient o1a = new OOCSIClient("test_message_retain1a"); + o1a.connect("localhost", 4444); + assertTrue(o1a.isConnected()); + + OOCSIClient o1b = new OOCSIClient("test_message_retain1b"); + o1b.connect("localhost", 4444); + assertTrue(o1b.isConnected()); + o1b.subscribe("channel_retain", new DataHandler() { + public void receive(String sender, Map data, long timestamp) { + // do nothing + } + }); + + // send retained message + new OOCSIMessage(o1a, "channel_retain").data("_RETAIN", 10).data("test", true).send(); + + // disconnect to ensure that channel would normally be closed + o1a.disconnect(); + o1b.disconnect(); + + // connect with new client + OOCSIClient o2 = new OOCSIClient("test_message_retain2"); + o2.connect("localhost", 4444); + assertTrue(o2.isConnected()); + + // subscribe and wait + o2.subscribe("channel_retain", new DataHandler() { + public void receive(String sender, Map data, long timestamp) { + list.add(sender + data.toString()); + } + }); + Thread.sleep(200); + + // check results + assertEquals(1, list.size()); + assertTrue(list.get(0).contains("true")); + } + + @Test + public void testRetainedMessageTimeout() throws InterruptedException { + final List list = new ArrayList(); + + OOCSIClient o1a = new OOCSIClient("test_message_retain3a"); + o1a.connect("localhost", 4444); + assertTrue(o1a.isConnected()); + + OOCSIClient o1b = new OOCSIClient("test_message_retain3b"); + o1b.connect("localhost", 4444); + assertTrue(o1b.isConnected()); + o1b.subscribe("channel_retain", new DataHandler() { + public void receive(String sender, Map data, long timestamp) { + // do nothing + } + }); + + // send retained message + new OOCSIMessage(o1a, "channel_retain").data("_RETAIN", 1).data("test", true).send(); + + // disconnect to ensure that channel would normally be closed + o1a.disconnect(); + o1b.disconnect(); + Thread.sleep(1100); + + // connect with new client + OOCSIClient o2 = new OOCSIClient("test_message_retain4"); + o2.connect("localhost", 4444); + assertTrue(o2.isConnected()); + + // subscribe and wait + o2.subscribe("channel_retain", new DataHandler() { + public void receive(String sender, Map data, long timestamp) { + list.add(sender + data.toString()); + } + }); + Thread.sleep(200); + + // check that no retained message was received + assertEquals(0, list.size()); + } + + @Test + public void testDelayedMessage() throws InterruptedException { + final List list = new ArrayList(); + + OOCSIClient o1 = new OOCSIClient("test_message_delay1"); + o1.connect("localhost", 4444); + assertTrue(o1.isConnected()); + + // send retained message + new OOCSIMessage(o1, "channel_delay").data("_DELAY", 1).data("test", true).send(); + o1.disconnect(); + + OOCSIClient o2 = new OOCSIClient("test_message_delay2"); + o2.connect("localhost", 4444); + assertTrue(o2.isConnected()); + o2.subscribe("channel_delay", new DataHandler() { + public void receive(String sender, Map data, long timestamp) { + list.add(sender + data.toString()); + } + }); + + // wait briefly + Thread.sleep(100); + + // check that no retained message was received + assertEquals(0, list.size()); + + // wait for at least 1sec, better a little longer because the status task might need more time to run through + Thread.sleep(2000); + + // check that no retained message was received + assertEquals(1, list.size()); + } + +} diff --git a/client/test/ClientSubscriptionTest.java b/client/test/ClientSubscriptionTest.java index 3e3c30c..031b081 100644 --- a/client/test/ClientSubscriptionTest.java +++ b/client/test/ClientSubscriptionTest.java @@ -524,7 +524,7 @@ public void testAggregatedTransformMinMax() throws InterruptedException { assertTrue(o1.isConnected()); // o1.subscribe("channel_filter", new DataHandler() { - o1.subscribe("channel_transform[transform(maxval,max(size,5));transform(minval,min(pos,3))]", + o1.subscribe("channel_transform[transform(maxval,EMAX(size,5));transform(minval,EMIN(pos,3))]", new DataHandler() { public void receive(String sender, Map data, long timestamp) { list.add(sender + data.toString()); @@ -541,11 +541,17 @@ public void receive(String sender, Map data, long timestamp) { assertEquals(0, list.size()); new OOCSIMessage(o2, "channel_transform").data("size", 20).data("pos", 2).send(); + Thread.sleep(10); new OOCSIMessage(o2, "channel_transform").data("size", 10).data("pos", 4).send(); + Thread.sleep(10); new OOCSIMessage(o2, "channel_transform").data("size", 10).data("pos", 4).send(); + Thread.sleep(10); new OOCSIMessage(o2, "channel_transform").data("size", 10).data("pos", 4).send(); + Thread.sleep(10); new OOCSIMessage(o2, "channel_transform").data("size", 10).data("pos", 2).send(); + Thread.sleep(10); new OOCSIMessage(o2, "channel_transform").data("size", 10).data("pos", 4).send(); + Thread.sleep(10); new OOCSIMessage(o2, "channel_transform").data("size", 10).data("pos", 4).send(); Thread.sleep(200); @@ -556,8 +562,14 @@ public void receive(String sender, Map data, long timestamp) { assertTrue(list.get(0).contains("minval")); assertTrue(list.get(0).contains("=2.0")); + assertTrue(list.get(2).contains("minval")); + assertTrue(list.get(2).toString(), list.get(2).contains("=2.0")); + assertTrue(list.get(3).contains("minval")); - assertTrue(list.get(3).contains("=4.0")); + assertTrue(list.get(3).toString(), list.get(3).contains("=4.0")); + + assertTrue(list.get(4).contains("maxval")); + assertTrue(list.get(4).contains("=20.0")); assertTrue(list.get(5).contains("maxval")); assertTrue(list.get(5).contains("=10.0")); diff --git a/server/dist/OOCSI_server.jar b/server/dist/OOCSI_server.jar index 65d3633..6732215 100644 Binary files a/server/dist/OOCSI_server.jar and b/server/dist/OOCSI_server.jar differ diff --git a/server/src/nl/tue/id/oocsi/server/OOCSIServer.java b/server/src/nl/tue/id/oocsi/server/OOCSIServer.java index 25f88fb..3712dd5 100644 --- a/server/src/nl/tue/id/oocsi/server/OOCSIServer.java +++ b/server/src/nl/tue/id/oocsi/server/OOCSIServer.java @@ -29,7 +29,7 @@ public class OOCSIServer extends Server { // constants - public static final String VERSION = "1.24"; + public static final String VERSION = "1.25"; // defaults for different services private int maxClients = 100; @@ -422,6 +422,23 @@ public void statusTask() { closeStaleClients(); closeEmptyChannels(); + // dispatch delayed messages + synchronized (delayedMessages) { + final Date now = new Date(); + delayedMessages.values().removeIf(message -> { + if (message.timestamp.before(now)) { + Channel c = getChannel(message.recipient); + if (c != null && c.validate(message.recipient)) { + c.send(message); + } + + // remove message from map + return true; + } + return false; + }); + } + long afterCleans = System.currentTimeMillis(); // keep-alive ping-pong with socket clients diff --git a/server/src/nl/tue/id/oocsi/server/model/Channel.java b/server/src/nl/tue/id/oocsi/server/model/Channel.java index 4e36b36..f1ab0a0 100644 --- a/server/src/nl/tue/id/oocsi/server/model/Channel.java +++ b/server/src/nl/tue/id/oocsi/server/model/Channel.java @@ -16,8 +16,6 @@ */ public class Channel { - private static final String RETAIN_MESSAGE = "_RETAIN"; - protected final Date created = new Date(); protected final ChangeListener presence; protected final Map subChannels = new ConcurrentHashMap(); @@ -107,8 +105,8 @@ public void send(Message message) { retainedMessage = null; // check for retained message flag and store message - if (message.data.containsKey(RETAIN_MESSAGE)) { - Object retainTimeoutRaw = message.data.getOrDefault(RETAIN_MESSAGE, "0"); + if (message.data.containsKey(Message.RETAIN_MESSAGE)) { + Object retainTimeoutRaw = message.data.getOrDefault(Message.RETAIN_MESSAGE, "0"); try { // retrieve timeout and restrict timeout to 2 days long timeout = Long.parseLong(retainTimeoutRaw.toString()); diff --git a/server/src/nl/tue/id/oocsi/server/model/Server.java b/server/src/nl/tue/id/oocsi/server/model/Server.java index f692b2d..91a8634 100644 --- a/server/src/nl/tue/id/oocsi/server/model/Server.java +++ b/server/src/nl/tue/id/oocsi/server/model/Server.java @@ -2,6 +2,7 @@ import java.util.Collection; import java.util.Date; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -26,6 +27,7 @@ public class Server extends Channel { protected final Map clients = new ConcurrentHashMap(); protected final Protocol protocol; protected final PresenceTracker presence; + protected final Map delayedMessages; /** * create new server data structure @@ -38,6 +40,9 @@ public Server(PresenceTracker presence) { // start protocol controller protocol = new Protocol(this); + + // create map for delayed messages + delayedMessages = new HashMap<>(); } @Override @@ -45,6 +50,18 @@ public void send(Message message) { // disable direct send } + /** + * dispatch a delayed message; will replace earlier delayed and not yet delivered messages for this client + * + * @param sender + * @param message + */ + public void sendDelayedMessage(String sender, Message message) { + synchronized (delayedMessages) { + delayedMessages.put(sender, message); + } + } + /** * retrieve client from client list * @@ -177,8 +194,8 @@ protected void closeStaleClients() { long now = System.currentTimeMillis(); for (Client client : clients.values()) { if (client.lastAction() + 120000 < now || !client.isConnected()) { - OOCSIServer.log("Client " + client.getName() - + " has not responded for 120 secs and will be disconnected"); + OOCSIServer + .log("Client " + client.getName() + " has not responded for 120 secs and will be disconnected"); // remove from presence tracking if tracking presence.timeout(client.getName(), client.getName()); diff --git a/server/src/nl/tue/id/oocsi/server/protocol/Message.java b/server/src/nl/tue/id/oocsi/server/protocol/Message.java index baab285..aba9230 100644 --- a/server/src/nl/tue/id/oocsi/server/protocol/Message.java +++ b/server/src/nl/tue/id/oocsi/server/protocol/Message.java @@ -17,6 +17,14 @@ public class Message implements Serializable { * id to be able to serialize across client and server */ private static final long serialVersionUID = 7907711514783823619L; + /** + * message attribute key to flag a message that should be retained on a channel (for n seconds) + */ + public static final String RETAIN_MESSAGE = "_RETAIN"; + /** + * message attribute key to flag a message that should be delivered with a delay (in at least n seconds) + */ + public static final String DELAY_MESSAGE = "_DELAY"; /** * id of sender (individual client) diff --git a/server/src/nl/tue/id/oocsi/server/protocol/Protocol.java b/server/src/nl/tue/id/oocsi/server/protocol/Protocol.java index d86e6e7..a7f8474 100644 --- a/server/src/nl/tue/id/oocsi/server/protocol/Protocol.java +++ b/server/src/nl/tue/id/oocsi/server/protocol/Protocol.java @@ -27,7 +27,7 @@ */ public class Protocol { - Server server; + private final Server server; /** * create new protocol @@ -89,15 +89,7 @@ else if (inputLine.startsWith("sendraw")) { String message = tokens[2]; Channel c = server.getChannel(recipient); - Map map = parseJSONMessage(message); - if (c != null && c.accept(recipient)) { - c.send(new Message(sender.getName(), recipient, new Date(), map)); - } else { - // log if not private message - if (!Channel.isPrivate(recipient)) { - OOCSIServer.logEvent(sender.getName(), recipient, "?", map, new Date()); - } - } + dispatchMessage(sender, recipient, c, parseJSONMessage(message)); } } // create new message from Java serialized input @@ -119,26 +111,11 @@ else if (inputLine.startsWith("send")) { // no function message map = new HashMap<>(); - if (c != null && c.accept(recipient)) { - c.send(new Message(sender.getName(), recipient, new Date(), map)); - } else { - // log if not private message - if (!Channel.isPrivate(recipient)) { - OOCSIServer.logEvent(sender.getName(), recipient, "?", map, new Date()); - } - } } // only send if there is useful data if (map != null) { - if (c != null && c.accept(recipient)) { - c.send(new Message(sender.getName(), recipient, new Date(), map)); - } else { - // log if not private message - if (!Channel.isPrivate(recipient)) { - OOCSIServer.logEvent(sender.getName(), recipient, "?", map, new Date()); - } - } + dispatchMessage(sender, recipient, c, map); } } } @@ -161,6 +138,58 @@ else if (inputLine.startsWith(".")) { return ""; } + /** + * dispatch a message in the system; check for delayed messages and log messages that are sent to empty channels + * + * @param sender + * @param recipient + * @param c outgoing channel + * @param map message data as map + */ + private void dispatchMessage(Client sender, String recipient, Channel c, Map map) { + // check for delayed message by requesting the _DELAY attribute that provides the requested delay in seconds + if (map.containsKey(Message.DELAY_MESSAGE)) { + long delayTime = 0; + // check for valid delay time + Object delay = map.get(Message.DELAY_MESSAGE); + // extract delay time from Number + if (delay instanceof Number) { + Number delayMS = (Number) delay; + delayTime = delayMS.longValue(); + } + // extract delay time from String + else if (delay instanceof String) { + String delayMS = (String) delay; + try { + delayTime = Long.parseLong(delayMS); + } catch (Exception e) { + delayTime = 0; + } + } + + // send with specified delay in seconds + if (delayTime > 0) { + server.sendDelayedMessage(recipient, new Message(sender.getName(), recipient, + new Date(System.currentTimeMillis() + delayTime * 1000), map)); + } + // normal dispatch for broken _DELAY + else if (c != null && c.accept(recipient)) { + c.send(new Message(sender.getName(), recipient, new Date(), map)); + } + } + // no delay --> normal dispatch + else if (c != null && c.accept(recipient)) { + c.send(new Message(sender.getName(), recipient, new Date(), map)); + } + // only log if no messages will go out to the channel + else { + // log if not private message + if (!Channel.isPrivate(recipient)) { + OOCSIServer.logEvent(sender.getName(), recipient, "-", map, new Date()); + } + } + } + /** * parse a JSON message --> convert JsonNode (ObjectNode) to Map that can later be serialized as * Json again @@ -201,7 +230,7 @@ public static Map parseJSONMessage(String message) { } } catch (JsonMappingException e) { } catch (JsonProcessingException e) { - } + } return map; }