Skip to content

Commit

Permalink
DiceDB#553: Add Support for QWATCH Command with HTTP (DiceDB#614)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucifercr07 authored Sep 25, 2024
1 parent 79a97ee commit 55160a2
Show file tree
Hide file tree
Showing 16 changed files with 572 additions and 157 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ require (
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand Down Expand Up @@ -45,6 +44,7 @@ require (
github.com/cockroachdb/swiss v0.0.0-20240612210725-f4de07ae6964
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13
github.com/dicedb/go-dice v0.0.0-20240820180649-d97f15fca831
github.com/google/go-cmp v0.6.0
github.com/ohler55/ojg v1.24.0
github.com/pelletier/go-toml/v2 v2.2.3
github.com/rs/xid v1.6.0
Expand Down
8 changes: 6 additions & 2 deletions integration_tests/commands/http/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ func TestMain(m *testing.M) {
Port: 8083,
Logger: logger,
}
RunHTTPServer(context.Background(), &wg, opts)
ctx, cancel := context.WithCancel(context.Background())
RunHTTPServer(ctx, &wg, opts)

// Wait for the server to start
time.Sleep(2 * time.Second)
Expand All @@ -38,6 +39,9 @@ func TestMain(m *testing.M) {
Body: map[string]interface{}{},
})

cancel()
wg.Wait()
os.Exit(exitCode)
if exitCode != 0 {
os.Exit(exitCode)
}
}
140 changes: 140 additions & 0 deletions integration_tests/commands/http/qwatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package http

import (
"bytes"
"encoding/json"
"github.com/stretchr/testify/assert"
"io"
"net/http"
"sync"
"testing"
"time"
)

var qWatchQuery = "SELECT $key, $value WHERE $key LIKE \"match:100:*\" AND $value > 10 ORDER BY $value DESC LIMIT 3"

func TestQWatch(t *testing.T) {
exec := NewHTTPCommandExecutor()

testCases := []TestCase{
{
name: "QWATCH Register Bad Request",
commands: []HTTPCommand{
{Command: "QWATCH", Body: map[string]interface{}{}},
},
expected: []interface{}{
[]interface{}{},
},
errorExpected: true,
},
{
name: "QWATCH Register",
commands: []HTTPCommand{
{Command: "QWATCH", Body: map[string]interface{}{"query": qWatchQuery}},
},
expected: []interface{}{
[]interface{}{
"qwatch",
"SELECT $key, $value WHERE $key like 'match:100:*' and $value > 10 ORDER BY $value desc LIMIT 3",
// Empty array, as the initial result will be empty
[]interface{}{},
},
},
errorExpected: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
exec.FireCommand(HTTPCommand{
Command: "DEL",
Body: map[string]interface{}{"key": "match:100:user"},
})

for i, cmd := range tc.commands {
result, err := exec.FireCommand(cmd)
if tc.errorExpected {
assert.NotNil(t, err)
} else {
assert.Equal(t, tc.expected[i], result)
}
}
})
}
}

func TestQwatchWithSSE(t *testing.T) {
exec := NewHTTPCommandExecutor()
const key = "match:100:user:3"
const val = 15

qwatchResponseReceived := make(chan struct{}, 2)
var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()

resp, err := http.Post("http://localhost:8083/qwatch", "application/json",
bytes.NewBuffer([]byte(`{
"query": "SELECT $key, $value WHERE $key like 'match:100:*' and $value > 10 ORDER BY $value desc LIMIT 3"
}`)))
if err != nil {
t.Errorf("Failed to start QWATCH: %v", err)
return
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
t.Errorf("Failed to close Resp body: %v", err)
}
}(resp.Body)

decoder := json.NewDecoder(resp.Body)
expectedResponses := []interface{}{
[]interface{}{
"qwatch",
"SELECT $key, $value WHERE $key like 'match:100:*' and $value > 10 ORDER BY $value desc LIMIT 3",
[]interface{}{},
},
map[string]interface{}{
"cmd": "qwatch",
"query": "SELECT $key, $value WHERE $key like 'match:100:*' and $value > 10 ORDER BY $value desc LIMIT 3",
"data": []interface{}{[]interface{}{key, float64(val)}},
},
}

for responseCount := 0; responseCount < 2; responseCount++ {
var result interface{}
if err := decoder.Decode(&result); err != nil {
if err == io.EOF {
break
}
t.Errorf("Error reading SSE response: %v", err)
return
}

assert.Equal(t, result, expectedResponses[responseCount])
qwatchResponseReceived <- struct{}{}
}
}()

time.Sleep(2 * time.Second)

setCmd := HTTPCommand{
Command: "SET",
Body: map[string]interface{}{"key": key, "value": val},
}
result, _ := exec.FireCommand(setCmd)
assert.Equal(t, result, "OK")

for i := 0; i < 2; i++ {
select {
case <-qwatchResponseReceived:
case <-time.After(10 * time.Second):
t.Fatalf("Timed out waiting for SSE response")
}
}

wg.Wait()
}
14 changes: 7 additions & 7 deletions integration_tests/commands/http/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (
)

type TestCase struct {
name string
commands []HTTPCommand
expected []interface{}
name string
commands []HTTPCommand
expected []interface{}
errorExpected bool
}

func TestSet(t *testing.T) {
Expand Down Expand Up @@ -47,14 +48,13 @@ func TestSet(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// deleteTestKeys([]string{"k"}, store)
exec.FireCommand(HTTPCommand{
Command: "DEL",
Body: map[string]interface{}{"key": "k"},
})

for i, cmd := range tc.commands {
result := exec.FireCommand(cmd)
result, _ := exec.FireCommand(cmd)
assert.DeepEqual(t, tc.expected[i], result)
}
})
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestSetWithOptions(t *testing.T) {
exec.FireCommand(HTTPCommand{Command: "DEL", Body: map[string]interface{}{"key": "k1"}})
exec.FireCommand(HTTPCommand{Command: "DEL", Body: map[string]interface{}{"key": "k2"}})
for i, cmd := range tc.commands {
result := exec.FireCommand(cmd)
result, _ := exec.FireCommand(cmd)
assert.Equal(t, tc.expected[i], result)
}
})
Expand Down Expand Up @@ -240,7 +240,7 @@ func TestSetWithExat(t *testing.T) {
})

for i, cmd := range tc.commands {
result := exec.FireCommand(cmd)
result, _ := exec.FireCommand(cmd)
if cmd.Command == "TTL" {
assert.Assert(t, result.(float64) <= tc.expected[i].(float64))
} else {
Expand Down
27 changes: 18 additions & 9 deletions integration_tests/commands/http/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"sync"
"time"

"github.com/dicedb/dice/internal/querywatcher"

"github.com/dicedb/dice/config"
"github.com/dicedb/dice/internal/server"
"github.com/dicedb/dice/internal/shard"
Expand Down Expand Up @@ -48,35 +50,36 @@ type HTTPCommand struct {
Body map[string]interface{}
}

func (e *HTTPCommandExecutor) FireCommand(cmd HTTPCommand) interface{} {
func (e *HTTPCommandExecutor) FireCommand(cmd HTTPCommand) (interface{}, error) {
command := strings.ToUpper(cmd.Command)
body, err := json.Marshal(cmd.Body)

// Handle error during JSON marshaling
if err != nil {
return fmt.Sprintf("ERR failed to marshal command body: %v", err)
return nil, err
}

ctx := context.Background()
req, err := http.NewRequestWithContext(ctx, "POST", e.baseURL+"/"+command, bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
return nil, fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")

resp, err := e.httpClient.Do(req)

if err != nil {
return err.Error()
return nil, err
}
defer resp.Body.Close()

var result interface{}
err = json.NewDecoder(resp.Body).Decode(&result)
if err != nil {
return nil
return nil, err
}
return result

return result, nil
}

func (e *HTTPCommandExecutor) Name() string {
Expand All @@ -89,19 +92,25 @@ func RunHTTPServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption

watchChan := make(chan dstore.WatchEvent, config.DiceConfig.Server.KeysLimit)
shardManager := shard.NewShardManager(1, watchChan, opt.Logger)
queryWatcherLocal := querywatcher.NewQueryManager(opt.Logger)
config.HTTPPort = opt.Port
// Initialize the AsyncServer
testServer := server.NewHTTPServer(shardManager, watchChan, opt.Logger)
// Initialize the HTTPServer
testServer := server.NewHTTPServer(shardManager, opt.Logger)
// Inform the user that the server is starting
fmt.Println("Starting the test server on port", config.HTTPPort)

shardManagerCtx, cancelShardManager := context.WithCancel(ctx)
wg.Add(1)
go func() {
defer wg.Done()
shardManager.Run(shardManagerCtx)
}()

wg.Add(1)
go func() {
defer wg.Done()
queryWatcherLocal.Run(ctx, watchChan)
}()

// Start the server in a goroutine
wg.Add(1)
go func() {
Expand Down
29 changes: 23 additions & 6 deletions internal/comm/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,27 @@ import (
"github.com/dicedb/dice/internal/cmd"
)

type QwatchResponse struct {
ClientIdentifierID uint32
Result interface{}
Error error
}

type Client struct {
io.ReadWriter
Fd int
Cqueue cmd.RedisCmds
IsTxn bool
Session *auth.Session
HTTPQwatchResponseChan chan QwatchResponse // Response channel to send back the operation response
Fd int
Cqueue cmd.RedisCmds
IsTxn bool
Session *auth.Session
ClientIdentifierID uint32
}

func (c Client) Write(b []byte) (int, error) {
func (c *Client) Write(b []byte) (int, error) {
return syscall.Write(c.Fd, b)
}

func (c Client) Read(b []byte) (int, error) {
func (c *Client) Read(b []byte) (int, error) {
return syscall.Read(c.Fd, b)
}

Expand All @@ -44,3 +52,12 @@ func NewClient(fd int) *Client {
Session: auth.NewSession(),
}
}

func NewHTTPQwatchClient(qwatchResponseChan chan QwatchResponse, clientIdentifierID uint32) *Client {
return &Client{
Cqueue: make(cmd.RedisCmds, 0),
Session: auth.NewSession(),
ClientIdentifierID: clientIdentifierID,
HTTPQwatchResponseChan: qwatchResponseChan,
}
}
17 changes: 9 additions & 8 deletions internal/eval/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ var (
}

pingCmdMeta = DiceCmdMeta{
Name: "PING",
Info: `PING returns with an encoded "PONG" If any message is added with the ping command,the message will be returned.`,
Arity: -1,
IsMigrated: true,
Name: "PING",
Info: `PING returns with an encoded "PONG" If any message is added with the ping command,the message will be returned.`,
Arity: -1,
// TODO: Move this to true once compatible with HTTP server
IsMigrated: false,
Eval: evalPING,
}

Expand Down Expand Up @@ -250,8 +251,8 @@ var (
Retrieves the keys of a JSON object stored at path specified.
Null reply: If the key doesn't exist or has expired.
Error reply: If the number of arguments is incorrect or the stored value is not a JSON type.`,
Eval: evalJSONOBJKEYS,
Arity: 2,
Eval: evalJSONOBJKEYS,
Arity: 2,
}
jsonarrpopCmdMeta = DiceCmdMeta{
Name: "JSON.ARRPOP",
Expand Down Expand Up @@ -601,8 +602,8 @@ var (
}
hsetnxCmdMeta = DiceCmdMeta{
Name: "HSETNX",
Info: `Sets field in the hash stored at key to value, only if field does not yet exist.
If key does not exist, a new key holding a hash is created. If field already exists,
Info: `Sets field in the hash stored at key to value, only if field does not yet exist.
If key does not exist, a new key holding a hash is created. If field already exists,
this operation has no effect.`,
Eval: evalHSETNX,
Arity: 4,
Expand Down
Loading

0 comments on commit 55160a2

Please sign in to comment.