Skip to content

Commit

Permalink
rewriteAOF method in echovault.go now handles the rewrite synchronous…
Browse files Browse the repository at this point in the history
…ly. Removed newline character in Write method of append store. Added test case for REWRITEAOF command and restore from AOF.
  • Loading branch information
kelvinmwinuka committed Jun 8, 2024
1 parent 7661ab1 commit cb99ff8
Show file tree
Hide file tree
Showing 8 changed files with 1,858 additions and 1,601 deletions.
3,169 changes: 1,585 additions & 1,584 deletions coverage/coverage.out

Large diffs are not rendered by default.

8 changes: 3 additions & 5 deletions echovault/echovault.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,11 +552,9 @@ func (server *EchoVault) rewriteAOF() error {
if server.rewriteAOFInProgress.Load() {
return errors.New("aof rewrite in progress")
}
go func() {
if err := server.aofEngine.RewriteLog(); err != nil {
log.Println(err)
}
}()
if err := server.aofEngine.RewriteLog(); err != nil {
return err

Check warning on line 556 in echovault/echovault.go

View check run for this annotation

Codecov / codecov/patch

echovault/echovault.go#L556

Added line #L556 was not covered by tests
}
return nil
}

Expand Down
95 changes: 94 additions & 1 deletion echovault/echovault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,99 @@ func Test_Standalone(t *testing.T) {

t.Run("Test_AOFRestore", func(t *testing.T) {
t.Parallel()
// TODO: Implemented AOF persistence and restore.

ticker := time.NewTicker(50 * time.Millisecond)

dataDir := path.Join(".", "testdata", "test_aof")
t.Cleanup(func() {
_ = os.RemoveAll(dataDir)
ticker.Stop()
})

// Prepare data for testing.
data := map[string]map[string]string{
"before-rewrite": {
"key1": "value1",
"key2": "value2",
"key3": "value3",
"key4": "value4",
},
"after-rewrite": {
"key3": "value3-updated",
"key4": "value4-updated",
"key5": "value5",
"key6": "value6",
},
"expected-values": {
"key1": "value1",
"key2": "value2",
"key3": "value3-updated",
"key4": "value4-updated",
"key5": "value5",
"key6": "value6",
},
}

conf := DefaultConfig()
conf.RestoreAOF = true
conf.DataDir = dataDir
conf.AOFSyncStrategy = "always"

mockServer, err := NewEchoVault(WithConfig(conf))
if err != nil {
t.Error(err)
return
}

// Perform write commands from "before-rewrite"
for key, value := range data["before-rewrite"] {
if _, _, err := mockServer.Set(key, value, SetOptions{}); err != nil {
t.Error(err)
return
}
}

// Yield
<-ticker.C

// Rewrite AOF
if _, err := mockServer.RewriteAOF(); err != nil {
t.Error(err)
return
}

// Perform write commands from "after-rewrite"
for key, value := range data["after-rewrite"] {
if _, _, err := mockServer.Set(key, value, SetOptions{}); err != nil {
t.Error(err)
return
}
}

// Yield
<-ticker.C

// Shutdown the EchoVault instance
mockServer.ShutDown()

// Start another instance of EchoVault
mockServer, err = NewEchoVault(WithConfig(conf))
if err != nil {
t.Error(err)
return
}

// Check if the servers contains the keys and values from "expected-values"
for key, value := range data["expected-values"] {
res, err := mockServer.Get(key)
if err != nil {
t.Error(err)
return
}
if res != value {
t.Errorf("expected value at key \"%s\" to be \"%s\", got \"%s\"", key, value, res)
return
}
}
})
}
2 changes: 1 addition & 1 deletion internal/aof/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (engine *Engine) RewriteLog() error {
engine.startRewriteFunc()
defer engine.finishRewriteFunc()

// Create AOF preamble
// Create AOF preamble.
if err := engine.preambleStore.CreatePreamble(); err != nil {
return fmt.Errorf("rewrite log -> create preamble error: %+v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/aof/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

func marshalRespCommand(command []string) []byte {
return []byte(fmt.Sprintf(
"*%d\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n$%d\r\n%s", len(command),
"*%d\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n", len(command),
len(command[0]), command[0],
len(command[1]), command[1],
len(command[2]), command[2],
Expand Down Expand Up @@ -151,11 +151,11 @@ func Test_AOFEngine(t *testing.T) {
}

if len(wantRestoredState) != len(restoredState) {
t.Errorf("expected restored state to be lenght %d, got %d", len(wantRestoredState), len(restoredState))
t.Errorf("expected restored state to be length %d, got %d", len(wantRestoredState), len(restoredState))
for key, data := range restoredState {
want, ok := wantRestoredState[key]
if !ok {
t.Errorf("could not find key %s in expected state state", key)
t.Errorf("could not find key %s in expected state", key)
}
if want.Value != data.Value {
t.Errorf("expected value %v for key %s, got %v", want.Value, key, data.Value)
Expand Down
4 changes: 1 addition & 3 deletions internal/aof/log/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,7 @@ func (store *AppendStore) Write(command []byte) error {
store.mut.Lock()
defer store.mut.Unlock()

// Add new line before writing to AOF file.
out := append(command, []byte("\r\n")...)
if _, err := store.rw.Write(out); err != nil {
if _, err := store.rw.Write(command); err != nil {
return err
}

Expand Down
5 changes: 1 addition & 4 deletions internal/aof/log/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

func marshalRespCommand(command []string) []byte {
return []byte(fmt.Sprintf(
"*%d\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n$%d\r\n%s", len(command),
"*%d\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n", len(command),
len(command[0]), command[0],
len(command[1]), command[1],
len(command[2]), command[2],
Expand Down Expand Up @@ -138,9 +138,6 @@ func Test_AppendStore(t *testing.T) {
}()

ticker := time.NewTicker(200 * time.Millisecond)
defer func() {
ticker.Stop()
}()

select {
case <-done:
Expand Down
170 changes: 170 additions & 0 deletions internal/modules/admin/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,4 +825,174 @@ func Test_AdminCommands(t *testing.T) {
})
}
})

t.Run("Test REWRITEAOF command", func(t *testing.T) {
t.Parallel()

ticker := time.NewTicker(100 * time.Millisecond)

dataDir := path.Join(".", "testdata", "test_aof")
t.Cleanup(func() {
_ = os.RemoveAll(dataDir)
ticker.Stop()
})

// Prepare data for testing.
data := map[string]map[string]string{
"before-rewrite": {
"key1": "value1",
"key2": "value2",
"key3": "value3",
"key4": "value4",
},
"after-rewrite": {
"key3": "value3-updated",
"key4": "value4-updated",
"key5": "value5",
"key6": "value6",
},
"expected-values": {
"key1": "value1",
"key2": "value2",
"key3": "value3-updated",
"key4": "value4-updated",
"key5": "value5",
"key6": "value6",
},
}

port, err := internal.GetFreePort()
if err != nil {
t.Error(err)
return
}

conf := echovault.DefaultConfig()
conf.BindAddr = "localhost"
conf.Port = uint16(port)
conf.RestoreAOF = true
conf.DataDir = dataDir
conf.AOFSyncStrategy = "always"

// Start new server
mockServer, err := echovault.NewEchoVault(echovault.WithConfig(conf))
if err != nil {
t.Error(err)
return
}
go func() {
mockServer.Start()
}()

// Get client connection
conn, err := internal.GetConnection("localhost", port)
if err != nil {
t.Error(err)
return
}
client := resp.NewConn(conn)

// Perform write commands from "before-rewrite"
for key, value := range data["before-rewrite"] {
if err := client.WriteArray([]resp.Value{
resp.StringValue("SET"),
resp.StringValue(key),
resp.StringValue(value),
}); err != nil {
t.Error(err)
return
}
res, _, err := client.ReadValue()
if err != nil {
t.Error(err)
return
}
if !strings.EqualFold(res.String(), "ok") {
t.Errorf("expected response OK, got \"%s\"", res.String())
}
}

// Yield
<-ticker.C

// Rewrite AOF
if err := client.WriteArray([]resp.Value{resp.StringValue("REWRITEAOF")}); err != nil {
t.Error(err)
return
}
res, _, err := client.ReadValue()
if err != nil {
t.Error(err)
return
}
if !strings.EqualFold(res.String(), "ok") {
t.Errorf("expected response OK, got \"%s\"", res.String())
}

// Perform write commands from "after-rewrite"
for key, value := range data["after-rewrite"] {
if err := client.WriteArray([]resp.Value{
resp.StringValue("SET"),
resp.StringValue(key),
resp.StringValue(value),
}); err != nil {
t.Error(err)
return
}
res, _, err := client.ReadValue()
if err != nil {
t.Error(err)
return
}
if !strings.EqualFold(res.String(), "ok") {
t.Errorf("expected response OK, got \"%s\"", res.String())
}
}

// Yield
<-ticker.C

// Shutdown the EchoVault instance and close current client connection
mockServer.ShutDown()
_ = conn.Close()

// Start another instance of EchoVault
mockServer, err = echovault.NewEchoVault(echovault.WithConfig(conf))
if err != nil {
t.Error(err)
return
}
go func() {
mockServer.Start()
}()

// Get a new client connection
conn, err = internal.GetConnection("localhost", port)
if err != nil {
t.Error(err)
return
}
client = resp.NewConn(conn)

// Check if the servers contains the keys and values from "expected-values"
for key, value := range data["expected-values"] {
if err := client.WriteArray([]resp.Value{resp.StringValue("GET"), resp.StringValue(key)}); err != nil {
t.Error(err)
return
}
res, _, err := client.ReadValue()
if err != nil {
t.Error(err)
return
}
if res.String() != value {
t.Errorf("expected value at key \"%s\" to be \"%s\", got \"%s\"", key, value, res)
return
}
}

// Shutdown server and close client connection
_ = conn.Close()
mockServer.ShutDown()
})
}

0 comments on commit cb99ff8

Please sign in to comment.