Skip to content

Commit

Permalink
v1.25
Browse files Browse the repository at this point in the history
 - added delayed message dispatch
 - combined / cleaned code and constants
 - added tests for retained and delayed messages
 - fixed test for transforming subscriptions
  • Loading branch information
matsfunk committed Jan 3, 2022
1 parent 2af1fc2 commit 8f811be
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 37 deletions.
2 changes: 1 addition & 1 deletion client/test/AllTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
@RunWith(Suite.class)
@SuiteClasses({ ClientCallTest.class, ClientConnectionTest.class, ClientLoadTest.class, ClientRequestTest.class,
ClientSpatialTest.class, ClientVariableTest.class, ClientDataTest.class, ClientPasswordTest.class,
ClientSubscriptionTest.class })
ClientSubscriptionTest.class, ClientMessageRetainDelayTest.class })
public class AllTests {

}
134 changes: 134 additions & 0 deletions client/test/ClientMessageRetainDelayTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.junit.Test;

import nl.tue.id.oocsi.client.OOCSIClient;
import nl.tue.id.oocsi.client.protocol.DataHandler;
import nl.tue.id.oocsi.client.protocol.OOCSIMessage;

public class ClientMessageRetainDelayTest {

@Test
public void testRetainedMessage() throws InterruptedException {
final List<String> list = new ArrayList<String>();

OOCSIClient o1a = new OOCSIClient("test_message_retain1a");
o1a.connect("localhost", 4444);
assertTrue(o1a.isConnected());

OOCSIClient o1b = new OOCSIClient("test_message_retain1b");
o1b.connect("localhost", 4444);
assertTrue(o1b.isConnected());
o1b.subscribe("channel_retain", new DataHandler() {
public void receive(String sender, Map<String, Object> data, long timestamp) {
// do nothing
}
});

// send retained message
new OOCSIMessage(o1a, "channel_retain").data("_RETAIN", 10).data("test", true).send();

// disconnect to ensure that channel would normally be closed
o1a.disconnect();
o1b.disconnect();

// connect with new client
OOCSIClient o2 = new OOCSIClient("test_message_retain2");
o2.connect("localhost", 4444);
assertTrue(o2.isConnected());

// subscribe and wait
o2.subscribe("channel_retain", new DataHandler() {
public void receive(String sender, Map<String, Object> data, long timestamp) {
list.add(sender + data.toString());
}
});
Thread.sleep(200);

// check results
assertEquals(1, list.size());
assertTrue(list.get(0).contains("true"));
}

@Test
public void testRetainedMessageTimeout() throws InterruptedException {
final List<String> list = new ArrayList<String>();

OOCSIClient o1a = new OOCSIClient("test_message_retain3a");
o1a.connect("localhost", 4444);
assertTrue(o1a.isConnected());

OOCSIClient o1b = new OOCSIClient("test_message_retain3b");
o1b.connect("localhost", 4444);
assertTrue(o1b.isConnected());
o1b.subscribe("channel_retain", new DataHandler() {
public void receive(String sender, Map<String, Object> data, long timestamp) {
// do nothing
}
});

// send retained message
new OOCSIMessage(o1a, "channel_retain").data("_RETAIN", 1).data("test", true).send();

// disconnect to ensure that channel would normally be closed
o1a.disconnect();
o1b.disconnect();
Thread.sleep(1100);

// connect with new client
OOCSIClient o2 = new OOCSIClient("test_message_retain4");
o2.connect("localhost", 4444);
assertTrue(o2.isConnected());

// subscribe and wait
o2.subscribe("channel_retain", new DataHandler() {
public void receive(String sender, Map<String, Object> data, long timestamp) {
list.add(sender + data.toString());
}
});
Thread.sleep(200);

// check that no retained message was received
assertEquals(0, list.size());
}

@Test
public void testDelayedMessage() throws InterruptedException {
final List<String> list = new ArrayList<String>();

OOCSIClient o1 = new OOCSIClient("test_message_delay1");
o1.connect("localhost", 4444);
assertTrue(o1.isConnected());

// send retained message
new OOCSIMessage(o1, "channel_delay").data("_DELAY", 1).data("test", true).send();
o1.disconnect();

OOCSIClient o2 = new OOCSIClient("test_message_delay2");
o2.connect("localhost", 4444);
assertTrue(o2.isConnected());
o2.subscribe("channel_delay", new DataHandler() {
public void receive(String sender, Map<String, Object> data, long timestamp) {
list.add(sender + data.toString());
}
});

// wait briefly
Thread.sleep(100);

// check that no retained message was received
assertEquals(0, list.size());

// wait for at least 1sec, better a little longer because the status task might need more time to run through
Thread.sleep(2000);

// check that no retained message was received
assertEquals(1, list.size());
}

}
16 changes: 14 additions & 2 deletions client/test/ClientSubscriptionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ public void testAggregatedTransformMinMax() throws InterruptedException {
assertTrue(o1.isConnected());

// o1.subscribe("channel_filter", new DataHandler() {
o1.subscribe("channel_transform[transform(maxval,max(size,5));transform(minval,min(pos,3))]",
o1.subscribe("channel_transform[transform(maxval,EMAX(size,5));transform(minval,EMIN(pos,3))]",
new DataHandler() {
public void receive(String sender, Map<String, Object> data, long timestamp) {
list.add(sender + data.toString());
Expand All @@ -541,11 +541,17 @@ public void receive(String sender, Map<String, Object> data, long timestamp) {
assertEquals(0, list.size());

new OOCSIMessage(o2, "channel_transform").data("size", 20).data("pos", 2).send();
Thread.sleep(10);
new OOCSIMessage(o2, "channel_transform").data("size", 10).data("pos", 4).send();
Thread.sleep(10);
new OOCSIMessage(o2, "channel_transform").data("size", 10).data("pos", 4).send();
Thread.sleep(10);
new OOCSIMessage(o2, "channel_transform").data("size", 10).data("pos", 4).send();
Thread.sleep(10);
new OOCSIMessage(o2, "channel_transform").data("size", 10).data("pos", 2).send();
Thread.sleep(10);
new OOCSIMessage(o2, "channel_transform").data("size", 10).data("pos", 4).send();
Thread.sleep(10);
new OOCSIMessage(o2, "channel_transform").data("size", 10).data("pos", 4).send();

Thread.sleep(200);
Expand All @@ -556,8 +562,14 @@ public void receive(String sender, Map<String, Object> data, long timestamp) {
assertTrue(list.get(0).contains("minval"));
assertTrue(list.get(0).contains("=2.0"));

assertTrue(list.get(2).contains("minval"));
assertTrue(list.get(2).toString(), list.get(2).contains("=2.0"));

assertTrue(list.get(3).contains("minval"));
assertTrue(list.get(3).contains("=4.0"));
assertTrue(list.get(3).toString(), list.get(3).contains("=4.0"));

assertTrue(list.get(4).contains("maxval"));
assertTrue(list.get(4).contains("=20.0"));

assertTrue(list.get(5).contains("maxval"));
assertTrue(list.get(5).contains("=10.0"));
Expand Down
Binary file modified server/dist/OOCSI_server.jar
Binary file not shown.
19 changes: 18 additions & 1 deletion server/src/nl/tue/id/oocsi/server/OOCSIServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public class OOCSIServer extends Server {

// constants
public static final String VERSION = "1.24";
public static final String VERSION = "1.25";

// defaults for different services
private int maxClients = 100;
Expand Down Expand Up @@ -422,6 +422,23 @@ public void statusTask() {
closeStaleClients();
closeEmptyChannels();

// dispatch delayed messages
synchronized (delayedMessages) {
final Date now = new Date();
delayedMessages.values().removeIf(message -> {
if (message.timestamp.before(now)) {
Channel c = getChannel(message.recipient);
if (c != null && c.validate(message.recipient)) {
c.send(message);
}

// remove message from map
return true;
}
return false;
});
}

long afterCleans = System.currentTimeMillis();

// keep-alive ping-pong with socket clients
Expand Down
6 changes: 2 additions & 4 deletions server/src/nl/tue/id/oocsi/server/model/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
public class Channel {

private static final String RETAIN_MESSAGE = "_RETAIN";

protected final Date created = new Date();
protected final ChangeListener presence;
protected final Map<String, Channel> subChannels = new ConcurrentHashMap<String, Channel>();
Expand Down Expand Up @@ -107,8 +105,8 @@ public void send(Message message) {
retainedMessage = null;

// check for retained message flag and store message
if (message.data.containsKey(RETAIN_MESSAGE)) {
Object retainTimeoutRaw = message.data.getOrDefault(RETAIN_MESSAGE, "0");
if (message.data.containsKey(Message.RETAIN_MESSAGE)) {
Object retainTimeoutRaw = message.data.getOrDefault(Message.RETAIN_MESSAGE, "0");
try {
// retrieve timeout and restrict timeout to 2 days
long timeout = Long.parseLong(retainTimeoutRaw.toString());
Expand Down
21 changes: 19 additions & 2 deletions server/src/nl/tue/id/oocsi/server/model/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -26,6 +27,7 @@ public class Server extends Channel {
protected final Map<String, Client> clients = new ConcurrentHashMap<String, Client>();
protected final Protocol protocol;
protected final PresenceTracker presence;
protected final Map<String, Message> delayedMessages;

/**
* create new server data structure
Expand All @@ -38,13 +40,28 @@ public Server(PresenceTracker presence) {

// start protocol controller
protocol = new Protocol(this);

// create map for delayed messages
delayedMessages = new HashMap<>();
}

@Override
public void send(Message message) {
// disable direct send
}

/**
* dispatch a delayed message; will replace earlier delayed and not yet delivered messages for this client
*
* @param sender
* @param message
*/
public void sendDelayedMessage(String sender, Message message) {
synchronized (delayedMessages) {
delayedMessages.put(sender, message);
}
}

/**
* retrieve client from client list
*
Expand Down Expand Up @@ -177,8 +194,8 @@ protected void closeStaleClients() {
long now = System.currentTimeMillis();
for (Client client : clients.values()) {
if (client.lastAction() + 120000 < now || !client.isConnected()) {
OOCSIServer.log("Client " + client.getName()
+ " has not responded for 120 secs and will be disconnected");
OOCSIServer
.log("Client " + client.getName() + " has not responded for 120 secs and will be disconnected");

// remove from presence tracking if tracking
presence.timeout(client.getName(), client.getName());
Expand Down
8 changes: 8 additions & 0 deletions server/src/nl/tue/id/oocsi/server/protocol/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ public class Message implements Serializable {
* id to be able to serialize across client and server
*/
private static final long serialVersionUID = 7907711514783823619L;
/**
* message attribute key to flag a message that should be retained on a channel (for n seconds)
*/
public static final String RETAIN_MESSAGE = "_RETAIN";
/**
* message attribute key to flag a message that should be delivered with a delay (in at least n seconds)
*/
public static final String DELAY_MESSAGE = "_DELAY";

/**
* id of sender (individual client)
Expand Down
Loading

0 comments on commit 8f811be

Please sign in to comment.