Skip to content

Commit

Permalink
add multi-threading capability to DiceDB (DiceDB#709)
Browse files Browse the repository at this point in the history
Signed-off-by: soumya-codes <[email protected]>
  • Loading branch information
soumya-codes authored Sep 26, 2024
1 parent 55160a2 commit 7b36268
Show file tree
Hide file tree
Showing 35 changed files with 2,275 additions and 284 deletions.
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ var baseConfig = Config{
MaxMemory: 0,
EvictionPolicy: EvictAllKeysLFU,
EvictionRatio: 0.40,
KeysLimit: 10000,
KeysLimit: 20000000,
AOFFile: "./dice-master.aof",
PersistenceEnabled: true,
WriteAOFOnCleanup: true,
WriteAOFOnCleanup: false,
LFULogFactor: 10,
LogLevel: "info",
PrettyPrintLogs: false,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require gotest.tools/v3 v3.5.1

require (
github.com/bytedance/sonic/loader v0.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
Expand Down Expand Up @@ -41,6 +40,7 @@ require (
require (
github.com/axiomhq/hyperloglog v0.2.0
github.com/bytedance/sonic v1.12.1
github.com/cespare/xxhash/v2 v2.2.0
github.com/cockroachdb/swiss v0.0.0-20240612210725-f4de07ae6964
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13
github.com/dicedb/go-dice v0.0.0-20240820180649-d97f15fca831
Expand Down
13 changes: 6 additions & 7 deletions integration_tests/commands/async/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ import (
"sync"
"time"

"github.com/dicedb/dice/internal/shard"

"github.com/dicedb/dice/config"
"github.com/dicedb/dice/internal/clientio"

derrors "github.com/dicedb/dice/internal/errors"
"github.com/dicedb/dice/internal/server"

"github.com/dicedb/dice/config"
"github.com/dicedb/dice/internal/shard"
dstore "github.com/dicedb/dice/internal/store"
"github.com/dicedb/dice/testutils"
redis "github.com/dicedb/go-dice"
Expand Down Expand Up @@ -121,7 +119,8 @@ func RunTestServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption
const totalRetries = 100
var err error
watchChan := make(chan dstore.WatchEvent, config.DiceConfig.Server.KeysLimit)
shardManager := shard.NewShardManager(1, watchChan, opt.Logger)
gec := make(chan error)
shardManager := shard.NewShardManager(1, watchChan, gec, opt.Logger)
// Initialize the AsyncServer
testServer := server.NewAsyncServer(shardManager, watchChan, opt.Logger)

Expand Down Expand Up @@ -167,7 +166,7 @@ func RunTestServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption
go func() {
defer wg.Done()
if err := testServer.Run(ctx); err != nil {
if errors.Is(err, server.ErrAborted) {
if errors.Is(err, derrors.ErrAborted) {
cancelShardManager()
return
}
Expand Down
9 changes: 5 additions & 4 deletions integration_tests/commands/http/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"sync"
"time"

"github.com/dicedb/dice/internal/querywatcher"

"github.com/dicedb/dice/config"
derrors "github.com/dicedb/dice/internal/errors"
"github.com/dicedb/dice/internal/querywatcher"
"github.com/dicedb/dice/internal/server"
"github.com/dicedb/dice/internal/shard"
dstore "github.com/dicedb/dice/internal/store"
Expand Down Expand Up @@ -90,8 +90,9 @@ func RunHTTPServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption
config.DiceConfig.Network.IOBufferLength = 16
config.DiceConfig.Server.WriteAOFOnCleanup = false

globalErrChannel := make(chan error)
watchChan := make(chan dstore.WatchEvent, config.DiceConfig.Server.KeysLimit)
shardManager := shard.NewShardManager(1, watchChan, opt.Logger)
shardManager := shard.NewShardManager(1, watchChan, globalErrChannel, opt.Logger)
queryWatcherLocal := querywatcher.NewQueryManager(opt.Logger)
config.HTTPPort = opt.Port
// Initialize the HTTPServer
Expand All @@ -118,7 +119,7 @@ func RunHTTPServer(ctx context.Context, wg *sync.WaitGroup, opt TestServerOption
err := testServer.Run(ctx)
if err != nil {
cancelShardManager()
if errors.Is(err, server.ErrAborted) {
if errors.Is(err, derrors.ErrAborted) {
return
}
if err.Error() != "http: Server closed" {
Expand Down
3 changes: 1 addition & 2 deletions internal/auth/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func (session *Session) Validate(username, password string) error {
return fmt.Errorf("WRONGPASS invalid username-password pair or user is disabled")
}

func (session *Session) Expire() (err error) {
func (session *Session) Expire() {
session.Status = SessionStatusExpired
return
}
5 changes: 1 addition & 4 deletions internal/auth/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,7 @@ func TestSessionValidate(t *testing.T) {

func TestSessionExpire(t *testing.T) {
session := NewSession()
err := session.Expire()
if err != nil {
t.Errorf("Session.Expire() returned an error: %v", err)
}
session.Expire()
if session.Status != SessionStatusExpired {
t.Errorf("Session.Expire() did not set status to Expired. Got %v, want %v", session.Status, SessionStatusExpired)
}
Expand Down
11 changes: 11 additions & 0 deletions internal/clientio/iohandler/iohandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package iohandler

import (
"context"
)

type IOHandler interface {
Read(ctx context.Context) ([]byte, error)
Write(ctx context.Context, response []byte) error
Close() error
}
185 changes: 185 additions & 0 deletions internal/clientio/iohandler/netconn/netconn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package netconn

import (
"bufio"
"context"
"errors"
"fmt"
"io"
"log/slog"
"net"
"os"
"syscall"
"time"

"github.com/dicedb/dice/internal/clientio/iohandler"
)

const (
maxRequestSize = 512 * 1024 // 512 KB
readBufferSize = 4 * 1024 // 4 KB
idleTimeout = 10 * time.Minute
)

var (
ErrRequestTooLarge = errors.New("request too large")
ErrIdleTimeout = errors.New("connection idle timeout")
ErrorClosed = errors.New("connection closed")
)

// IOHandler handles I/O operations for a network connection
type IOHandler struct {
fd int
file *os.File
conn net.Conn
reader *bufio.Reader
writer *bufio.Writer
logger *slog.Logger
}

var _ iohandler.IOHandler = (*IOHandler)(nil)

// NewIOHandler creates a new IOHandler from a file descriptor
func NewIOHandler(clientFD int, logger *slog.Logger) (*IOHandler, error) {
file := os.NewFile(uintptr(clientFD), "client-connection")
if file == nil {
return nil, fmt.Errorf("failed to create file from file descriptor")
}

// Ensure the file is closed if we exit this function with an error
var conn net.Conn
defer func() {
if conn == nil {
// Only close the file if we haven't successfully created a net.Conn
err := file.Close()
if err != nil {
logger.Warn("Error closing file in NewIOHandler:", slog.Any("error", err))
}
}
}()

var err error
conn, err = net.FileConn(file)
if err != nil {
return nil, fmt.Errorf("failed to create net.Conn from file descriptor: %w", err)
}

return &IOHandler{
fd: clientFD,
file: file,
conn: conn,
reader: bufio.NewReader(conn),
writer: bufio.NewWriter(conn),
logger: logger,
}, nil
}

func NewIOHandlerWithConn(conn net.Conn) *IOHandler {
return &IOHandler{
conn: conn,
reader: bufio.NewReader(conn),
writer: bufio.NewWriter(conn),
}
}

func (h *IOHandler) FileDescriptor() int {
return h.fd
}

// ReadRequest reads data from the network connection
func (h *IOHandler) Read(ctx context.Context) ([]byte, error) {
var data []byte
buf := make([]byte, readBufferSize)

for {
select {
case <-ctx.Done():
return data, ctx.Err()
default:
err := h.conn.SetReadDeadline(time.Now().Add(idleTimeout))
if err != nil {
return nil, fmt.Errorf("error setting read deadline: %w", err)
}

n, err := h.reader.Read(buf)
if n > 0 {
data = append(data, buf[:n]...)
}
if err != nil {
switch {
case errors.Is(err, syscall.EAGAIN), errors.Is(err, syscall.EWOULDBLOCK), errors.Is(err, io.EOF):
// No more data to read at this time
return data, nil
case errors.Is(err, net.ErrClosed), errors.Is(err, syscall.EPIPE), errors.Is(err, syscall.ECONNRESET):
h.logger.Error("Connection closed", slog.Any("error", err))
cerr := h.Close()
if cerr != nil {
h.logger.Warn("Error closing connection", slog.Any("error", errors.Join(err, cerr)))
}
return nil, ErrorClosed
case errors.Is(err, syscall.ETIMEDOUT):
h.logger.Info("Connection idle timeout", slog.Any("error", err))
cerr := h.Close()
if cerr != nil {
h.logger.Warn("Error closing connection", slog.Any("error", errors.Join(err, cerr)))
}
return nil, ErrIdleTimeout
default:
h.logger.Error("Error reading from connection", slog.Any("error", err))
return nil, fmt.Errorf("error reading request: %w", err)
}
}

if len(data) > maxRequestSize {
h.logger.Warn("Request too large", slog.Any("size", len(data)))
return nil, ErrRequestTooLarge
}

// If we've read less than the buffer size, we've likely got all the data
if n < len(buf) {
return data, nil
}
}
}
}

// WriteResponse writes the response back to the network connection
func (h *IOHandler) Write(ctx context.Context, response []byte) error {
errChan := make(chan error, 1)

go func(errChan chan error) {
_, err := h.writer.Write(response)
if err == nil {
err = h.writer.Flush()
}

errChan <- err
}(errChan)

select {
case <-ctx.Done():
return ctx.Err()
case err := <-errChan:
if err != nil {
if errors.Is(err, net.ErrClosed) || errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
cerr := h.Close()
if cerr != nil {
err = errors.Join(err, cerr)
}

h.logger.Error("Connection closed", slog.Any("error", err))
return err
}

return fmt.Errorf("error writing response: %w", err)
}
}

return nil
}

// Close underlying network connection
func (h *IOHandler) Close() error {
h.logger.Info("Closing connection")
return errors.Join(h.conn.Close(), h.file.Close())
}
Loading

0 comments on commit 7b36268

Please sign in to comment.