diff --git a/src/server/aof/engine.go b/src/server/aof/engine.go index 5b643f49..58a96d50 100644 --- a/src/server/aof/engine.go +++ b/src/server/aof/engine.go @@ -1,71 +1,120 @@ package aof import ( - "context" logstore "github.com/echovault/echovault/src/server/aof/log" "github.com/echovault/echovault/src/server/aof/preamble" - "github.com/echovault/echovault/src/utils" "log" - "net" "sync" ) // This package handles AOF logging in standalone mode only. // Logging in clusters is handled in the raft layer. -type Opts struct { - Config utils.Config - GetState func() map[string]interface{} - StartRewriteAOF func() - FinishRewriteAOF func() - CreateKeyAndLock func(ctx context.Context, key string) (bool, error) - KeyUnlock func(key string) - SetValue func(ctx context.Context, key string, value interface{}) - HandleCommand func(ctx context.Context, command []byte, conn *net.Conn, replay bool) ([]byte, error) -} - type Engine struct { - options Opts + syncStrategy string + directory string + preambleRW preamble.PreambleReadWriter + appendRW logstore.AppendReadWriter + mut sync.Mutex logChan chan []byte logCount uint64 preambleStore *preamble.PreambleStore appendStore *logstore.AppendStore + + startRewrite func() + finishRewrite func() + getState func() map[string]interface{} + setValue func(key string, value interface{}) + handleCommand func(command []byte) +} + +func WithStrategy(strategy string) func(engine *Engine) { + return func(engine *Engine) { + engine.syncStrategy = strategy + } +} + +func WithDirectory(directory string) func(engine *Engine) { + return func(engine *Engine) { + engine.directory = directory + } +} + +func WithStartRewriteFunc(f func()) func(engine *Engine) { + return func(engine *Engine) { + engine.startRewrite = f + } +} + +func WithFinishRewriteFunc(f func()) func(engine *Engine) { + return func(engine *Engine) { + engine.finishRewrite = f + } +} + +func WithGetStateFunc(f func() map[string]interface{}) func(engine *Engine) { + return func(engine *Engine) { + engine.getState = f + } } -func NewAOFEngine(opts Opts, appendRW logstore.AppendReadWriter, preambleRW preamble.PreambleReadWriter) (*Engine, error) { +func WithSetValueFunc(f func(key string, value interface{})) func(engine *Engine) { + return func(engine *Engine) { + engine.setValue = f + } +} + +func WithHandleCommandFunc(f func(command []byte)) func(engine *Engine) { + return func(engine *Engine) { + engine.handleCommand = f + } +} + +func WithPreambleReadWriter(rw preamble.PreambleReadWriter) func(engine *Engine) { + return func(engine *Engine) { + engine.preambleRW = rw + } +} + +func WithAppendReadWriter(rw logstore.AppendReadWriter) func(engine *Engine) { + return func(engine *Engine) { + engine.appendRW = rw + } +} + +func NewAOFEngine(options ...func(engine *Engine)) *Engine { engine := &Engine{ - options: opts, - mut: sync.Mutex{}, - logChan: make(chan []byte, 4096), - logCount: 0, + syncStrategy: "everysec", + directory: "", + mut: sync.Mutex{}, + logChan: make(chan []byte, 4096), + logCount: 0, + startRewrite: func() {}, + finishRewrite: func() {}, + getState: func() map[string]interface{} { return nil }, + setValue: func(key string, value interface{}) {}, + handleCommand: func(command []byte) {}, + } + + for _, option := range options { + option(engine) } // Setup Preamble engine engine.preambleStore = preamble.NewPreambleStore( - preamble.WithDirectory(engine.options.Config.DataDir), - preamble.WithReadWriter(preambleRW), - preamble.WithGetStateFunc(opts.GetState), - preamble.WithSetValueFunc(func(key string, value interface{}) { - if _, err := engine.options.CreateKeyAndLock(context.Background(), key); err != nil { - log.Println(err) - } - engine.options.SetValue(context.Background(), key, value) - engine.options.KeyUnlock(key) - }), + preamble.WithDirectory(engine.directory), + preamble.WithReadWriter(engine.preambleRW), + preamble.WithGetStateFunc(engine.getState), + preamble.WithSetValueFunc(engine.setValue), ) // Setup AOF log store engine engine.appendStore = logstore.NewAppendStore( - logstore.WithDirectory(engine.options.Config.DataDir), - logstore.WithStrategy(engine.options.Config.AOFSyncStrategy), - logstore.WithReadWriter(appendRW), - logstore.WithHandleCommandFunc(func(command []byte) { - _, err := engine.options.HandleCommand(context.Background(), command, nil, true) - if err != nil { - log.Println(err) - } - }), + logstore.WithDirectory(engine.directory), + logstore.WithStrategy(engine.syncStrategy), + logstore.WithReadWriter(engine.appendRW), + logstore.WithHandleCommandFunc(engine.handleCommand), ) // 3. Start the goroutine to pick up queued commands in order to write them to the file. @@ -79,7 +128,7 @@ func NewAOFEngine(opts Opts, appendRW logstore.AppendReadWriter, preambleRW prea } }() - return engine, nil + return engine } func (engine *Engine) QueueCommand(command []byte) { @@ -90,8 +139,8 @@ func (engine *Engine) RewriteLog() error { engine.mut.Lock() defer engine.mut.Unlock() - engine.options.StartRewriteAOF() - defer engine.options.FinishRewriteAOF() + engine.startRewrite() + defer engine.finishRewrite() // Create AOF preamble if err := engine.preambleStore.CreatePreamble(); err != nil { diff --git a/src/server/server.go b/src/server/server.go index 22584bdb..94f431c3 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -95,21 +95,27 @@ func NewServer(opts Opts) *Server { SetValue: server.SetValue, }) // Set up standalone AOF engine - - engine, err := aof.NewAOFEngine(aof.Opts{ - Config: opts.Config, - GetState: server.GetState, - StartRewriteAOF: server.StartRewriteAOF, - FinishRewriteAOF: server.FinishRewriteAOF, - CreateKeyAndLock: server.CreateKeyAndLock, - KeyUnlock: server.KeyUnlock, - SetValue: server.SetValue, - HandleCommand: server.handleCommand, - }, nil, nil) - if err != nil { - log.Println(err) - } - server.AOFEngine = engine + server.AOFEngine = aof.NewAOFEngine( + aof.WithDirectory(opts.Config.DataDir), + aof.WithStrategy(opts.Config.AOFSyncStrategy), + aof.WithStartRewriteFunc(server.StartRewriteAOF), + aof.WithFinishRewriteFunc(server.FinishRewriteAOF), + aof.WithGetStateFunc(server.GetState), + aof.WithSetValueFunc(func(key string, value interface{}) { + if _, err := server.CreateKeyAndLock(context.Background(), key); err != nil { + log.Println(err) + return + } + server.SetValue(context.Background(), key, value) + server.KeyUnlock(key) + }), + aof.WithHandleCommandFunc(func(command []byte) { + _, err := server.handleCommand(context.Background(), command, nil, true) + if err != nil { + log.Println(err) + } + }), + ) } return server }