Skip to content

Commit

Permalink
Use the PUSH type in PubSub output if the RESP3 was enabled (apache#2159
Browse files Browse the repository at this point in the history
)
  • Loading branch information
git-hulk authored Mar 13, 2024
1 parent 2465803 commit ca1d5e0
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/commands/cmd_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class CommandMPublish : public Commander {

void SubscribeCommandReply(const Connection *conn, std::string *output, const std::string &name,
const std::string &sub_name, int num) {
output->append(redis::MultiLen(3));
output->append(conn->HeaderOfPush(3));
output->append(redis::BulkString(name));
output->append(sub_name.empty() ? conn->NilString() : BulkString(sub_name));
output->append(redis::Integer(num));
Expand Down
3 changes: 3 additions & 0 deletions src/server/redis_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ class Connection : public EvbufCallbackBase<Connection> {
std::string HeaderOfAttribute(T len) const {
return "|" + std::to_string(len) + CRLF;
}
std::string HeaderOfPush(int64_t len) const {
return protocol_version_ == RESP::v3 ? ">" + std::to_string(len) + CRLF : MultiLen(len);
}

using UnsubscribeCallback = std::function<void(std::string, int)>;
void SubscribeChannel(const std::string &channel);
Expand Down
52 changes: 27 additions & 25 deletions tests/gocase/unit/protocol/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,21 @@ func TestProtocolRESP2(t *testing.T) {
})
}

func handshakeWithRESP3(t *testing.T, c *util.TCPClient) {
require.NoError(t, c.WriteArgs("HELLO", "3"))
values := []string{"%6",
"$6", "server", "$5", "redis",
"$7", "version", "$5", "4.0.0",
"$5", "proto", ":3",
"$4", "mode", "$10", "standalone",
"$4", "role", "$6", "master",
"$7", "modules", "_",
}
for _, line := range values {
c.MustRead(t, line)
}
}

func TestProtocolRESP3(t *testing.T) {
srv := util.StartServer(t, map[string]string{
"resp3-enabled": "yes",
Expand All @@ -236,20 +251,9 @@ func TestProtocolRESP3(t *testing.T) {
require.NoError(t, c.Close())
require.NoError(t, rdb.Close())
}()
handshakeWithRESP3(t, c)

t.Run("debug protocol string", func(t *testing.T) {
require.NoError(t, c.WriteArgs("HELLO", "3"))
values := []string{"%6",
"$6", "server", "$5", "redis",
"$7", "version", "$5", "4.0.0",
"$5", "proto", ":3",
"$4", "mode", "$10", "standalone",
"$4", "role", "$6", "master",
"$7", "modules", "_",
}
for _, line := range values {
c.MustRead(t, line)
}

types := map[string][]string{
"string": {"$11", "Hello World"},
Expand Down Expand Up @@ -285,6 +289,17 @@ func TestProtocolRESP3(t *testing.T) {
c.MustRead(t, "_")
})

t.Run("should return PUSH type", func(t *testing.T) {
// use a new client to avoid affecting other tests
require.NoError(t, c.WriteArgs("SUBSCRIBE", "test-channel"))
c.MustRead(t, ">3")
c.MustRead(t, "$9")
c.MustRead(t, "subscribe")
c.MustRead(t, "$12")
c.MustRead(t, "test-channel")
c.MustRead(t, ":1")
})

t.Run("null array", func(t *testing.T) {
require.NoError(t, c.WriteArgs("ZRANK", "no-exists-zset", "m0", "WITHSCORE"))
c.MustRead(t, "_")
Expand All @@ -295,19 +310,6 @@ func TestProtocolRESP3(t *testing.T) {
Members: []redis.Z{{1, "one"}, {2, "two"}, {3, "three"}},
})

require.NoError(t, c.WriteArgs("HELLO", "3"))
values := []string{"%6",
"$6", "server", "$5", "redis",
"$7", "version", "$5", "4.0.0",
"$5", "proto", ":3",
"$4", "mode", "$10", "standalone",
"$4", "role", "$6", "master",
"$7", "modules", "_",
}
for _, line := range values {
c.MustRead(t, line)
}

// should return an array of strings if without score
require.NoError(t, c.WriteArgs("ZRANGE", "zset", "0", "-1"))
c.MustRead(t, "*3")
Expand Down
14 changes: 12 additions & 2 deletions tests/gocase/unit/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,18 @@ func receiveType[T any](t *testing.T, pubsub *redis.PubSub, typ T) T {
return msg.(T)
}

func TestPubSub(t *testing.T) {
srv := util.StartServer(t, map[string]string{})
func TestPubSubWithRESP2(t *testing.T) {
testPubSub(t, "no")
}

func TestPubSubWithRESP3(t *testing.T) {
testPubSub(t, "yes")
}

func testPubSub(t *testing.T, enabledRESP3 string) {
srv := util.StartServer(t, map[string]string{
"resp3-enabled": enabledRESP3,
})
defer srv.Close()

ctx := context.Background()
Expand Down

0 comments on commit ca1d5e0

Please sign in to comment.