Skip to content

Commit

Permalink
Merge pull request #47 from EchoVault/chore/echovault-test
Browse files Browse the repository at this point in the history
Chore/echovault test
  • Loading branch information
kelvinmwinuka authored Jun 9, 2024
2 parents aa99038 + 55b9bc6 commit cf874a3
Show file tree
Hide file tree
Showing 11 changed files with 1,470 additions and 730 deletions.
1,269 changes: 632 additions & 637 deletions coverage/coverage.out

Large diffs are not rendered by default.

397 changes: 397 additions & 0 deletions echovault/api_acl_test.go

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions echovault/echovault.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@ func NewEchoVault(options ...func(echovault *EchoVault)) (*EchoVault, error) {
if echovault.config.EvictionPolicy != constants.NoEviction {
go func() {
ticker := time.NewTicker(echovault.config.EvictionInterval)
defer func() {
ticker.Stop()
}()
for {
select {
case <-ticker.C:
Expand Down Expand Up @@ -549,11 +552,9 @@ func (server *EchoVault) rewriteAOF() error {
if server.rewriteAOFInProgress.Load() {
return errors.New("aof rewrite in progress")
}
go func() {
if err := server.aofEngine.RewriteLog(); err != nil {
log.Println(err)
}
}()
if err := server.aofEngine.RewriteLog(); err != nil {
return err
}
return nil
}

Expand Down
153 changes: 96 additions & 57 deletions echovault/echovault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,61 +894,7 @@ func Test_Standalone(t *testing.T) {
wantLastSave int
}{
{
name: "1. Snapshot with TCP connection",
dataDir: path.Join(dataDir, "with_tcp_connection"),
values: map[string]string{
"key1": "value1",
"key2": "value2",
"key3": "value3",
"key4": "value4",
},
snapshotFunc: func(mockServer *EchoVault) error {
// Start the server's TCP listener
go func() {
mockServer.Start()
}()
conn, err := internal.GetConnection("localhost", int(mockServer.config.Port))
if err != nil {
return err
}
defer func() {
_ = conn.Close()
}()
client := resp.NewConn(conn)
if err = client.WriteArray([]resp.Value{resp.StringValue("SAVE")}); err != nil {
return err
}
res, _, err := client.ReadValue()
if err != nil {
return err
}
if !strings.EqualFold(res.String(), "ok") {
return fmt.Errorf("expected save response to be \"OK\", got \"%s\"", res.String())
}
return nil
},
lastSaveFunc: func(mockServer *EchoVault) (int, error) {
conn, err := internal.GetConnection("localhost", int(mockServer.config.Port))
if err != nil {
return 0, err
}
defer func() {
_ = conn.Close()
}()
client := resp.NewConn(conn)
if err = client.WriteArray([]resp.Value{resp.StringValue("LASTSAVE")}); err != nil {
return 0, err
}
res, _, err := client.ReadValue()
if err != nil {
return 0, err
}
return res.Integer(), nil
},
wantLastSave: int(clock.NewClock().Now().UnixMilli()),
},
{
name: "2. Snapshot in embedded instance",
name: "1. Snapshot in embedded instance",
dataDir: path.Join(dataDir, "embedded_instance"),
values: map[string]string{
"key5": "value5",
Expand Down Expand Up @@ -1047,8 +993,101 @@ func Test_Standalone(t *testing.T) {
}
})

t.Run("Test_AOF", func(t *testing.T) {
t.Run("Test_AOFRestore", func(t *testing.T) {
t.Parallel()
// TODO: Implemented AOF persistence and restore.

ticker := time.NewTicker(50 * time.Millisecond)

dataDir := path.Join(".", "testdata", "test_aof")
t.Cleanup(func() {
_ = os.RemoveAll(dataDir)
ticker.Stop()
})

// Prepare data for testing.
data := map[string]map[string]string{
"before-rewrite": {
"key1": "value1",
"key2": "value2",
"key3": "value3",
"key4": "value4",
},
"after-rewrite": {
"key3": "value3-updated",
"key4": "value4-updated",
"key5": "value5",
"key6": "value6",
},
"expected-values": {
"key1": "value1",
"key2": "value2",
"key3": "value3-updated",
"key4": "value4-updated",
"key5": "value5",
"key6": "value6",
},
}

conf := DefaultConfig()
conf.RestoreAOF = true
conf.DataDir = dataDir
conf.AOFSyncStrategy = "always"

mockServer, err := NewEchoVault(WithConfig(conf))
if err != nil {
t.Error(err)
return
}

// Perform write commands from "before-rewrite"
for key, value := range data["before-rewrite"] {
if _, _, err := mockServer.Set(key, value, SetOptions{}); err != nil {
t.Error(err)
return
}
}

// Yield
<-ticker.C

// Rewrite AOF
if _, err := mockServer.RewriteAOF(); err != nil {
t.Error(err)
return
}

// Perform write commands from "after-rewrite"
for key, value := range data["after-rewrite"] {
if _, _, err := mockServer.Set(key, value, SetOptions{}); err != nil {
t.Error(err)
return
}
}

// Yield
<-ticker.C

// Shutdown the EchoVault instance
mockServer.ShutDown()

// Start another instance of EchoVault
mockServer, err = NewEchoVault(WithConfig(conf))
if err != nil {
t.Error(err)
return
}

// Check if the servers contains the keys and values from "expected-values"
for key, value := range data["expected-values"] {
res, err := mockServer.Get(key)
if err != nil {
t.Error(err)
return
}
if res != value {
t.Errorf("expected value at key \"%s\" to be \"%s\", got \"%s\"", key, value, res)
return
}
}
})
}
2 changes: 1 addition & 1 deletion internal/aof/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (engine *Engine) RewriteLog() error {
engine.startRewriteFunc()
defer engine.finishRewriteFunc()

// Create AOF preamble
// Create AOF preamble.
if err := engine.preambleStore.CreatePreamble(); err != nil {
return fmt.Errorf("rewrite log -> create preamble error: %+v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/aof/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

func marshalRespCommand(command []string) []byte {
return []byte(fmt.Sprintf(
"*%d\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n$%d\r\n%s", len(command),
"*%d\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n", len(command),
len(command[0]), command[0],
len(command[1]), command[1],
len(command[2]), command[2],
Expand Down Expand Up @@ -151,11 +151,11 @@ func Test_AOFEngine(t *testing.T) {
}

if len(wantRestoredState) != len(restoredState) {
t.Errorf("expected restored state to be lenght %d, got %d", len(wantRestoredState), len(restoredState))
t.Errorf("expected restored state to be length %d, got %d", len(wantRestoredState), len(restoredState))
for key, data := range restoredState {
want, ok := wantRestoredState[key]
if !ok {
t.Errorf("could not find key %s in expected state state", key)
t.Errorf("could not find key %s in expected state", key)
}
if want.Value != data.Value {
t.Errorf("expected value %v for key %s, got %v", want.Value, key, data.Value)
Expand Down
10 changes: 6 additions & 4 deletions internal/aof/log/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ func NewAppendStore(options ...func(store *AppendStore)) (*AppendStore, error) {
// No need to start this goroutine if sync strategy is anything other than 'everysec'.
if strings.EqualFold(store.strategy, "everysec") {
go func() {
ticker := time.NewTicker(1 * time.Second)
defer func() {
ticker.Stop()
}()
for {
store.mut.Lock()
if err := store.Sync(); err != nil {
Expand All @@ -113,7 +117,7 @@ func NewAppendStore(options ...func(store *AppendStore)) (*AppendStore, error) {
break
}
store.mut.Unlock()
<-store.clock.After(1 * time.Second)
<-ticker.C
}
}()
}
Expand All @@ -130,9 +134,7 @@ func (store *AppendStore) Write(command []byte) error {
store.mut.Lock()
defer store.mut.Unlock()

// Add new line before writing to AOF file.
out := append(command, []byte("\r\n")...)
if _, err := store.rw.Write(out); err != nil {
if _, err := store.rw.Write(command); err != nil {
return err
}

Expand Down
5 changes: 1 addition & 4 deletions internal/aof/log/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

func marshalRespCommand(command []string) []byte {
return []byte(fmt.Sprintf(
"*%d\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n$%d\r\n%s", len(command),
"*%d\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n", len(command),
len(command[0]), command[0],
len(command[1]), command[1],
len(command[2]), command[2],
Expand Down Expand Up @@ -138,9 +138,6 @@ func Test_AppendStore(t *testing.T) {
}()

ticker := time.NewTicker(200 * time.Millisecond)
defer func() {
ticker.Stop()
}()

select {
case <-done:
Expand Down
Loading

0 comments on commit cf874a3

Please sign in to comment.