Skip to content

Commit

Permalink
Handle array of messages in subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
sankarcheppali committed Aug 26, 2023
1 parent 7af02d5 commit d71e016
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 3 deletions.
13 changes: 10 additions & 3 deletions src/main/java/redis/clients/jedis/JedisPubSubBase.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package redis.clients.jedis;

import static java.util.stream.Collectors.*;
import static redis.clients.jedis.Protocol.ResponseKeyword.*;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.exceptions.JedisException;
Expand Down Expand Up @@ -133,10 +135,15 @@ private void process() {
onUnsubscribe(enchannel, subscribedChannels);
} else if (Arrays.equals(MESSAGE.getRaw(), resp)) {
final byte[] bchannel = (byte[]) listReply.get(1);
final byte[] bmesg = (byte[]) listReply.get(2);
final T enchannel = (bchannel == null) ? null : encode(bchannel);
final T enmesg = (bmesg == null) ? null : encode(bmesg);
onMessage(enchannel, enmesg);
final Object bmesg = listReply.get(2);
if (bmesg instanceof List) {
final List<T> enmesgs = ((List<byte[]>) bmesg).stream().map(this::encode).collect(toList());
enmesgs.forEach(msg -> onMessage(enchannel, msg));
} else {
final T enmesg = (bmesg == null) ? null : encode((byte[]) bmesg);
onMessage(enchannel, enmesg);
}
} else if (Arrays.equals(PMESSAGE.getRaw(), resp)) {
final byte[] bpattern = (byte[]) listReply.get(1);
final byte[] bchannel = (byte[]) listReply.get(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import redis.clients.jedis.BinaryJedisPubSub;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.SafeEncoder;

Expand All @@ -39,6 +40,20 @@ public void run() {
t.start();
}

private void setKeyValue(final String key, final String value) {
Thread t = new Thread(new Runnable() {
public void run() {
try {
Jedis j = createJedis();
j.set(key, value);
j.disconnect();
} catch (Exception ex) {
}
}
});
t.start();
}

@Test
public void subscribe() throws InterruptedException {
jedis.subscribe(new JedisPubSub() {
Expand Down Expand Up @@ -457,6 +472,28 @@ public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {
jedis.subscribe(pubsub, SafeEncoder.encode("foo"));
}

@Test
public void binarySubscribeToKeySpaceInvalidateChannel() {
final String keySpaceChannel = "__redis__:invalidate";
final String keyPrefix = "barPrefix:";
final String key = keyPrefix+"bar";
final BinaryJedisPubSub pubsub = new BinaryJedisPubSub() {
public void onMessage(byte[] channel, byte[] message) {
assertArrayEquals(SafeEncoder.encode(keySpaceChannel), channel);
assertArrayEquals(SafeEncoder.encode(key), message);
unsubscribe(channel);
}

public void onSubscribe(byte[] channel, int subscribedChannels) {
setKeyValue(key,"foo");
}
};
// get client id
String clientId = String.valueOf(jedis.clientId());
jedis.sendCommand(Protocol.Command.CLIENT,"TRACKING","ON","REDIRECT",clientId,"BCAST","PREFIX",keyPrefix);
jedis.subscribe(pubsub, SafeEncoder.encode(keySpaceChannel));
}

@Test(expected = JedisException.class)
public void unsubscribeWhenNotSusbscribed() throws InterruptedException {
JedisPubSub pubsub = new JedisPubSub() {
Expand Down

0 comments on commit d71e016

Please sign in to comment.