Skip to content

Commit

Permalink
Implemented a Clock interface that is injected everywhere time.Now an…
Browse files Browse the repository at this point in the history
…d time.After are used.
  • Loading branch information
kelvinmwinuka committed Apr 4, 2024
1 parent f4d0f2e commit 1e421cb
Show file tree
Hide file tree
Showing 13 changed files with 1,177 additions and 1,069 deletions.
1,871 changes: 936 additions & 935 deletions coverage/coverage.out

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions internal/aof/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/echovault/echovault/internal"
logstore "github.com/echovault/echovault/internal/aof/log"
"github.com/echovault/echovault/internal/aof/preamble"
"github.com/echovault/echovault/internal/clock"
"log"
"sync"
)
Expand All @@ -27,6 +28,7 @@ import (
// Logging in replication clusters is handled in the raft layer.

type Engine struct {
clock clock.Clock
syncStrategy string
directory string
preambleRW preamble.PreambleReadWriter
Expand All @@ -45,6 +47,12 @@ type Engine struct {
handleCommand func(command []byte)
}

func WithClock(clock clock.Clock) func(engine *Engine) {
return func(engine *Engine) {
engine.clock = clock
}
}

func WithStrategy(strategy string) func(engine *Engine) {
return func(engine *Engine) {
engine.syncStrategy = strategy
Expand Down Expand Up @@ -101,6 +109,7 @@ func WithAppendReadWriter(rw logstore.AppendReadWriter) func(engine *Engine) {

func NewAOFEngine(options ...func(engine *Engine)) *Engine {
engine := &Engine{
clock: clock.NewClock(),
syncStrategy: "everysec",
directory: "",
mut: sync.Mutex{},
Expand All @@ -121,6 +130,7 @@ func NewAOFEngine(options ...func(engine *Engine)) *Engine {

// Setup Preamble engine
engine.preambleStore = preamble.NewPreambleStore(
preamble.WithClock(engine.clock),
preamble.WithDirectory(engine.directory),
preamble.WithReadWriter(engine.preambleRW),
preamble.WithGetStateFunc(engine.getStateFunc),
Expand All @@ -129,6 +139,7 @@ func NewAOFEngine(options ...func(engine *Engine)) *Engine {

// Setup AOF log store engine
engine.appendStore = logstore.NewAppendStore(
logstore.WithClock(engine.clock),
logstore.WithDirectory(engine.directory),
logstore.WithStrategy(engine.syncStrategy),
logstore.WithReadWriter(engine.appendRW),
Expand Down
11 changes: 10 additions & 1 deletion internal/aof/log/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"bytes"
"errors"
"fmt"
"github.com/echovault/echovault/internal/clock"
"io"
"log"
"os"
Expand All @@ -36,13 +37,20 @@ type AppendReadWriter interface {
}

type AppendStore struct {
clock clock.Clock
strategy string // Append file sync strategy. Can only be "always", "everysec", or "no
mut sync.Mutex // Store mutex
rw AppendReadWriter // The ReadWriter used to persist and load the log
directory string // The directory for the AOF file if we must create one
handleCommand func(command []byte) // Function to handle command read from AOF log after restore
}

func WithClock(clock clock.Clock) func(store *AppendStore) {
return func(store *AppendStore) {
store.clock = clock
}
}

func WithStrategy(strategy string) func(store *AppendStore) {
return func(store *AppendStore) {
store.strategy = strategy
Expand All @@ -69,6 +77,7 @@ func WithHandleCommandFunc(f func(command []byte)) func(store *AppendStore) {

func NewAppendStore(options ...func(store *AppendStore)) *AppendStore {
store := &AppendStore{
clock: clock.NewClock(),
directory: "",
strategy: "everysec",
rw: nil,
Expand Down Expand Up @@ -103,7 +112,7 @@ func NewAppendStore(options ...func(store *AppendStore)) *AppendStore {
log.Println(fmt.Errorf("new append store error: %+v", err))
break
}
<-time.After(1 * time.Second)
<-store.clock.After(1 * time.Second)
}
}()
}
Expand Down
12 changes: 10 additions & 2 deletions internal/aof/preamble/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (
"encoding/json"
"fmt"
"github.com/echovault/echovault/internal"
"github.com/echovault/echovault/internal/clock"
"io"
"log"
"os"
"path"
"sync"
"time"
)

type PreambleReadWriter interface {
Expand All @@ -34,13 +34,20 @@ type PreambleReadWriter interface {
}

type PreambleStore struct {
clock clock.Clock
rw PreambleReadWriter
mut sync.Mutex
directory string
getStateFunc func() map[string]internal.KeyData
setKeyDataFunc func(key string, data internal.KeyData)
}

func WithClock(clock clock.Clock) func(store *PreambleStore) {
return func(store *PreambleStore) {
store.clock = clock
}
}

func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) {
return func(store *PreambleStore) {
store.rw = rw
Expand All @@ -67,6 +74,7 @@ func WithDirectory(directory string) func(store *PreambleStore) {

func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore {
store := &PreambleStore{
clock: clock.NewClock(),
rw: nil,
mut: sync.Mutex{},
directory: "",
Expand Down Expand Up @@ -166,7 +174,7 @@ func (store *PreambleStore) Close() error {
func (store *PreambleStore) filterExpiredKeys(state map[string]internal.KeyData) map[string]internal.KeyData {
var keysToDelete []string
for k, v := range state {
if v.ExpireAt.Before(time.Now()) {
if v.ExpireAt.Before(store.clock.Now()) {
keysToDelete = append(keysToDelete, k)
}
}
Expand Down
41 changes: 41 additions & 0 deletions internal/clock/clock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package clock

import (
"os"
"strings"
"time"
)

type Clock interface {
Now() time.Time
After(d time.Duration) <-chan time.Time
}

func NewClock() Clock {
// If we're in a test environment, return the mock clock.
if strings.Contains(os.Args[0], ".test") {
return MockClock{}
}
return RealClock{}
}

type RealClock struct{}

func (RealClock) Now() time.Time {
return time.Now()
}

func (RealClock) After(d time.Duration) <-chan time.Time {
return time.After(d)
}

type MockClock struct{}

func (MockClock) Now() time.Time {
t, _ := time.Parse(time.RFC3339, "2036-01-02T15:04:05+07:00")
return t
}

func (MockClock) After(d time.Duration) <-chan time.Time {
return time.After(d)
}
13 changes: 11 additions & 2 deletions internal/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"github.com/echovault/echovault/internal"
"github.com/echovault/echovault/internal/clock"
"io"
"io/fs"
"log"
Expand All @@ -37,6 +38,7 @@ type Manifest struct {
}

type Engine struct {
clock clock.Clock
changeCount uint64
directory string
snapshotInterval time.Duration
Expand All @@ -49,6 +51,12 @@ type Engine struct {
setKeyDataFunc func(key string, data internal.KeyData)
}

func WithClock(clock clock.Clock) func(engine *Engine) {
return func(engine *Engine) {
engine.clock = clock
}
}

func WithDirectory(directory string) func(engine *Engine) {
return func(engine *Engine) {
engine.directory = directory
Expand Down Expand Up @@ -105,6 +113,7 @@ func WithSetKeyDataFunc(f func(key string, data internal.KeyData)) func(engine *

func NewSnapshotEngine(options ...func(engine *Engine)) *Engine {
engine := &Engine{
clock: clock.NewClock(),
changeCount: 0,
directory: "",
snapshotInterval: 5 * time.Minute,
Expand All @@ -128,7 +137,7 @@ func NewSnapshotEngine(options ...func(engine *Engine)) *Engine {
if engine.snapshotInterval != 0 {
go func() {
for {
<-time.After(engine.snapshotInterval)
<-engine.clock.After(engine.snapshotInterval)
if engine.changeCount == engine.snapshotThreshold {
if err := engine.TakeSnapshot(); err != nil {
log.Println(err)
Expand All @@ -146,7 +155,7 @@ func (engine *Engine) TakeSnapshot() error {
defer engine.finishSnapshotFunc()

// Extract current time
now := time.Now()
now := engine.clock.Now()
msec := now.UnixNano() / int64(time.Millisecond)

// Update manifest file to indicate the latest snapshot.
Expand Down
Loading

0 comments on commit 1e421cb

Please sign in to comment.