diff --git a/docker-compose.yaml b/docker-compose.yaml index a7f88e91..a44c39fe 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -43,207 +43,207 @@ services: - ./volumes/standalone_node:/var/lib/echovault networks: - testnet - - cluster_node_1: - container_name: cluster_node_1 - build: - context: . - dockerfile: Dockerfile.dev - environment: - - PORT=7480 - - RAFT_PORT=8000 - - ML_PORT=7946 - - KEY=/etc/ssl/certs/echovault/server1.key - - CERT=/etc/ssl/certs/echovault/server1.crt - - SERVER_ID=1 - - PLUGIN_DIR=/usr/local/lib/echovault - - DATA_DIR=/var/lib/echovault - - IN_MEMORY=false - - TLS=true - - MTLS=true - - BOOTSTRAP_CLUSTER=true - - ACL_CONFIG=/etc/config/echovault/acl.yml - - REQUIRE_PASS=false - - FORWARD_COMMAND=true - - SNAPSHOT_THRESHOLD=1000 - - SNAPSHOT_INTERVAL=5m30s - - RESTORE_SNAPSHOT=false - - RESTORE_AOF=false - # List of server cert/key pairs - - CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key - - CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key - # List of client certificate authorities - - CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt - ports: - - "7480:7480" - - "7945:7946" - - "8000:8000" - volumes: - - ./config/acl.yml:/etc/config/echovault/acl.yml - - ./volumes/cluster_node_1:/var/lib/echovault - networks: - - testnet - - cluster_node_2: - container_name: cluster_node_2 - build: - context: . - dockerfile: Dockerfile.dev - environment: - - PORT=7480 - - RAFT_PORT=8000 - - ML_PORT=7946 - - KEY=/etc/ssl/certs/echovault/server1.key - - CERT=/etc/ssl/certs/echovault/server1.crt - - SERVER_ID=2 - - JOIN_ADDR=cluster_node_1:7946 - - PLUGIN_DIR=/usr/local/lib/echovault - - DATA_DIR=/var/lib/echovault - - IN_MEMORY=false - - TLS=true - - MTLS=true - - BOOTSTRAP_CLUSTER=false - - ACL_CONFIG=/etc/config/echovault/acl.yml - - REQUIRE_PASS=false - - FORWARD_COMMAND=true - - SNAPSHOT_THRESHOLD=1000 - - SNAPSHOT_INTERVAL=5m30s - - RESTORE_SNAPSHOT=false - - RESTORE_AOF=false - # List of server cert/key pairs - - CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key - - CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key - # List of client certificate authorities - - CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt - ports: - - "7481:7480" - - "7947:7946" - - "8001:8000" - volumes: - - ./config/acl.yml:/etc/config/echovault/acl.yml - - ./volumes/cluster_node_2:/var/lib/echovault - networks: - - testnet - - cluster_node_3: - container_name: cluster_node_3 - build: - context: . - dockerfile: Dockerfile.dev - environment: - - PORT=7480 - - RAFT_PORT=8000 - - ML_PORT=7946 - - KEY=/etc/ssl/certs/echovault/server1.key - - CERT=/etc/ssl/certs/echovault/server1.crt - - SERVER_ID=3 - - JOIN_ADDR=cluster_node_1:7946 - - PLUGIN_DIR=/usr/local/lib/echovault - - DATA_DIR=/var/lib/echovault - - IN_MEMORY=false - - TLS=true - - MTLS=true - - BOOTSTRAP_CLUSTER=false - - ACL_CONFIG=/etc/config/echovault/acl.yml - - REQUIRE_PASS=false - - FORWARD_COMMAND=true - - SNAPSHOT_THRESHOLD=1000 - - SNAPSHOT_INTERVAL=5m30s - - RESTORE_SNAPSHOT=false - - RESTORE_AOF=false - # List of server cert/key pairs - - CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key - - CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key - # List of client certificate authorities - - CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt - ports: - - "7482:7480" - - "7948:7946" - - "8002:8000" - volumes: - - ./config/acl.yml:/etc/config/echovault/acl.yml - - ./volumes/cluster_node_3:/var/lib/echovault - networks: - - testnet - - cluster_node_4: - container_name: cluster_node_4 - build: - context: . - dockerfile: Dockerfile.dev - environment: - - PORT=7480 - - RAFT_PORT=8000 - - ML_PORT=7946 - - KEY=/etc/ssl/certs/echovault/server1.key - - CERT=/etc/ssl/certs/echovault/server1.crt - - SERVER_ID=4 - - JOIN_ADDR=cluster_node_1:7946 - - PLUGIN_DIR=/usr/local/lib/echovault - - DATA_DIR=/var/lib/echovault - - IN_MEMORY=false - - TLS=true - - MTLS=true - - BOOTSTRAP_CLUSTER=false - - ACL_CONFIG=/etc/config/echovault/acl.yml - - REQUIRE_PASS=false - - FORWARD_COMMAND=true - - SNAPSHOT_THRESHOLD=1000 - - SNAPSHOT_INTERVAL=5m30s - - RESTORE_SNAPSHOT=false - - RESTORE_AOF=false - # List of server cert/key pairs - - CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key - - CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key - # List of client certificate authorities - - CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt - ports: - - "7483:7480" - - "7949:7946" - - "8003:8000" - volumes: - - ./config/acl.yml:/etc/config/echovault/acl.yml - - ./volumes/cluster_node_4:/var/lib/echovault - networks: - - testnet - - cluster_node_5: - container_name: cluster_node_5 - build: - context: . - dockerfile: Dockerfile.dev - environment: - - PORT=7480 - - RAFT_PORT=8000 - - ML_PORT=7946 - - KEY=/etc/ssl/certs/echovault/server1.key - - CERT=/etc/ssl/certs/echovault/server1.crt - - SERVER_ID=5 - - JOIN_ADDR=cluster_node_1:7946 - - PLUGIN_DIR=/usr/local/lib/echovault - - DATA_DIR=/var/lib/echovault - - IN_MEMORY=false - - TLS=true - - MTLS=true - - BOOTSTRAP_CLUSTER=false - - ACL_CONFIG=/etc/config/echovault/acl.yml - - REQUIRE_PASS=false - - FORWARD_COMMAND=true - - SNAPSHOT_THRESHOLD=1000 - - SNAPSHOT_INTERVAL=5m30s - - RESTORE_SNAPSHOT=false - - RESTORE_AOF=false - # List of server cert/key pairs - - CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key - - CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key - # List of client certificate authorities - - CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt - ports: - - "7484:7480" - - "7950:7946" - - "8004:8000" - volumes: - - ./config/acl.yml:/etc/config/echovault/acl.yml - - ./volumes/cluster_node_5:/var/lib/echovault - networks: - - testnet \ No newline at end of file +# +# cluster_node_1: +# container_name: cluster_node_1 +# build: +# context: . +# dockerfile: Dockerfile.dev +# environment: +# - PORT=7480 +# - RAFT_PORT=8000 +# - ML_PORT=7946 +# - KEY=/etc/ssl/certs/echovault/server1.key +# - CERT=/etc/ssl/certs/echovault/server1.crt +# - SERVER_ID=1 +# - PLUGIN_DIR=/usr/local/lib/echovault +# - DATA_DIR=/var/lib/echovault +# - IN_MEMORY=false +# - TLS=true +# - MTLS=true +# - BOOTSTRAP_CLUSTER=true +# - ACL_CONFIG=/etc/config/echovault/acl.yml +# - REQUIRE_PASS=false +# - FORWARD_COMMAND=true +# - SNAPSHOT_THRESHOLD=1000 +# - SNAPSHOT_INTERVAL=5m30s +# - RESTORE_SNAPSHOT=false +# - RESTORE_AOF=false +# # List of server cert/key pairs +# - CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key +# - CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key +# # List of client certificate authorities +# - CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt +# ports: +# - "7480:7480" +# - "7945:7946" +# - "8000:8000" +# volumes: +# - ./config/acl.yml:/etc/config/echovault/acl.yml +# - ./volumes/cluster_node_1:/var/lib/echovault +# networks: +# - testnet +# +# cluster_node_2: +# container_name: cluster_node_2 +# build: +# context: . +# dockerfile: Dockerfile.dev +# environment: +# - PORT=7480 +# - RAFT_PORT=8000 +# - ML_PORT=7946 +# - KEY=/etc/ssl/certs/echovault/server1.key +# - CERT=/etc/ssl/certs/echovault/server1.crt +# - SERVER_ID=2 +# - JOIN_ADDR=cluster_node_1:7946 +# - PLUGIN_DIR=/usr/local/lib/echovault +# - DATA_DIR=/var/lib/echovault +# - IN_MEMORY=false +# - TLS=true +# - MTLS=true +# - BOOTSTRAP_CLUSTER=false +# - ACL_CONFIG=/etc/config/echovault/acl.yml +# - REQUIRE_PASS=false +# - FORWARD_COMMAND=true +# - SNAPSHOT_THRESHOLD=1000 +# - SNAPSHOT_INTERVAL=5m30s +# - RESTORE_SNAPSHOT=false +# - RESTORE_AOF=false +# # List of server cert/key pairs +# - CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key +# - CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key +# # List of client certificate authorities +# - CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt +# ports: +# - "7481:7480" +# - "7947:7946" +# - "8001:8000" +# volumes: +# - ./config/acl.yml:/etc/config/echovault/acl.yml +# - ./volumes/cluster_node_2:/var/lib/echovault +# networks: +# - testnet +# +# cluster_node_3: +# container_name: cluster_node_3 +# build: +# context: . +# dockerfile: Dockerfile.dev +# environment: +# - PORT=7480 +# - RAFT_PORT=8000 +# - ML_PORT=7946 +# - KEY=/etc/ssl/certs/echovault/server1.key +# - CERT=/etc/ssl/certs/echovault/server1.crt +# - SERVER_ID=3 +# - JOIN_ADDR=cluster_node_1:7946 +# - PLUGIN_DIR=/usr/local/lib/echovault +# - DATA_DIR=/var/lib/echovault +# - IN_MEMORY=false +# - TLS=true +# - MTLS=true +# - BOOTSTRAP_CLUSTER=false +# - ACL_CONFIG=/etc/config/echovault/acl.yml +# - REQUIRE_PASS=false +# - FORWARD_COMMAND=true +# - SNAPSHOT_THRESHOLD=1000 +# - SNAPSHOT_INTERVAL=5m30s +# - RESTORE_SNAPSHOT=false +# - RESTORE_AOF=false +# # List of server cert/key pairs +# - CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key +# - CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key +# # List of client certificate authorities +# - CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt +# ports: +# - "7482:7480" +# - "7948:7946" +# - "8002:8000" +# volumes: +# - ./config/acl.yml:/etc/config/echovault/acl.yml +# - ./volumes/cluster_node_3:/var/lib/echovault +# networks: +# - testnet +# +# cluster_node_4: +# container_name: cluster_node_4 +# build: +# context: . +# dockerfile: Dockerfile.dev +# environment: +# - PORT=7480 +# - RAFT_PORT=8000 +# - ML_PORT=7946 +# - KEY=/etc/ssl/certs/echovault/server1.key +# - CERT=/etc/ssl/certs/echovault/server1.crt +# - SERVER_ID=4 +# - JOIN_ADDR=cluster_node_1:7946 +# - PLUGIN_DIR=/usr/local/lib/echovault +# - DATA_DIR=/var/lib/echovault +# - IN_MEMORY=false +# - TLS=true +# - MTLS=true +# - BOOTSTRAP_CLUSTER=false +# - ACL_CONFIG=/etc/config/echovault/acl.yml +# - REQUIRE_PASS=false +# - FORWARD_COMMAND=true +# - SNAPSHOT_THRESHOLD=1000 +# - SNAPSHOT_INTERVAL=5m30s +# - RESTORE_SNAPSHOT=false +# - RESTORE_AOF=false +# # List of server cert/key pairs +# - CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key +# - CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key +# # List of client certificate authorities +# - CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt +# ports: +# - "7483:7480" +# - "7949:7946" +# - "8003:8000" +# volumes: +# - ./config/acl.yml:/etc/config/echovault/acl.yml +# - ./volumes/cluster_node_4:/var/lib/echovault +# networks: +# - testnet +# +# cluster_node_5: +# container_name: cluster_node_5 +# build: +# context: . +# dockerfile: Dockerfile.dev +# environment: +# - PORT=7480 +# - RAFT_PORT=8000 +# - ML_PORT=7946 +# - KEY=/etc/ssl/certs/echovault/server1.key +# - CERT=/etc/ssl/certs/echovault/server1.crt +# - SERVER_ID=5 +# - JOIN_ADDR=cluster_node_1:7946 +# - PLUGIN_DIR=/usr/local/lib/echovault +# - DATA_DIR=/var/lib/echovault +# - IN_MEMORY=false +# - TLS=true +# - MTLS=true +# - BOOTSTRAP_CLUSTER=false +# - ACL_CONFIG=/etc/config/echovault/acl.yml +# - REQUIRE_PASS=false +# - FORWARD_COMMAND=true +# - SNAPSHOT_THRESHOLD=1000 +# - SNAPSHOT_INTERVAL=5m30s +# - RESTORE_SNAPSHOT=false +# - RESTORE_AOF=false +# # List of server cert/key pairs +# - CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key +# - CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key +# # List of client certificate authorities +# - CLIENT_CA_1=/etc/ssl/certs/echovault/client/rootCA.crt +# ports: +# - "7484:7480" +# - "7950:7946" +# - "8004:8000" +# volumes: +# - ./config/acl.yml:/etc/config/echovault/acl.yml +# - ./volumes/cluster_node_5:/var/lib/echovault +# networks: +# - testnet \ No newline at end of file diff --git a/src/server/aof/aof_store.go b/src/server/aof/aof_store.go deleted file mode 100644 index 45bbb4e8..00000000 --- a/src/server/aof/aof_store.go +++ /dev/null @@ -1,101 +0,0 @@ -package aof - -import ( - "bufio" - "bytes" - "context" - "errors" - "fmt" - "io" - "net" - "os" - "sync" -) - -type AppendStore struct { - rw io.ReadWriter - mut sync.Mutex - handleCommand func(ctx context.Context, command []byte, conn *net.Conn, replay bool) ([]byte, error) -} - -func NewAppendStore() AppendStore { - return AppendStore{} -} - -func (store *AppendStore) Write(command []byte) error { - store.mut.Lock() - defer store.mut.Unlock() - _, err := store.rw.Write(command) - return err -} - -func (store *AppendStore) Sync() error { - store.mut.Lock() - store.mut.Unlock() - file, ok := store.rw.(*os.File) - if ok { - return file.Sync() - } - return nil -} - -func (store *AppendStore) Restore(ctx context.Context) error { - store.mut.Lock() - defer store.mut.Unlock() - - buf := bufio.NewReader(store.rw) - - var commands [][]byte - var line []byte - - for { - b, _, err := buf.ReadLine() - if err != nil && errors.Is(err, io.EOF) { - break - } else if err != nil { - return err - } - if len(b) <= 0 { - line = append(line, []byte("\r\n\r\n")...) - commands = append(commands, line) - line = []byte{} - continue - } - if len(line) > 0 { - line = append(line, append([]byte("\r\n"), bytes.TrimLeft(b, "\x00")...)...) - continue - } - line = append(line, bytes.TrimLeft(b, "\x00")...) - } - - for _, c := range commands { - if _, err := store.handleCommand(ctx, c, nil, true); err != nil { - return err - } - } - - return nil -} - -func (store *AppendStore) Truncate() error { - rw, ok := store.rw.(interface { - Truncate(size int64) error - }) - if !ok { - fmt.Println("could not truncate AOF file") - } - if err := rw.Truncate(0); err != nil { - return err - } - return nil -} - -func (store *AppendStore) Close() error { - store.mut.Lock() - defer store.mut.Unlock() - file, ok := store.rw.(*os.File) - if !ok { - return nil - } - return file.Close() -} diff --git a/src/server/aof/aof_engine.go b/src/server/aof/engine.go similarity index 54% rename from src/server/aof/aof_engine.go rename to src/server/aof/engine.go index ae8035f3..b6712391 100644 --- a/src/server/aof/aof_engine.go +++ b/src/server/aof/engine.go @@ -2,15 +2,12 @@ package aof import ( "context" + logstore "github.com/echovault/echovault/src/server/aof/log" + "github.com/echovault/echovault/src/server/aof/preamble" "github.com/echovault/echovault/src/utils" - "io" "log" "net" - "os" - "path" - "strings" "sync" - "time" ) // This package handles AOF logging in standalone mode only. @@ -32,11 +29,11 @@ type Engine struct { mut sync.Mutex logChan chan []byte logCount uint64 - preambleStore *PreambleStore - appendStore AppendStore + preambleStore *preamble.PreambleStore + appendStore *logstore.AppendStore } -func NewAOFEngine(opts Opts, appendRW io.ReadWriter, preambleRW io.ReadWriter) (*Engine, error) { +func NewAOFEngine(opts Opts, appendRW logstore.AppendReadWriter, preambleRW preamble.PreambleReadWriter) (*Engine, error) { engine := &Engine{ options: opts, mut: sync.Mutex{}, @@ -44,23 +41,12 @@ func NewAOFEngine(opts Opts, appendRW io.ReadWriter, preambleRW io.ReadWriter) ( logCount: 0, } - // Obtain preamble file handler - if preambleRW == nil { - f, err := os.OpenFile( - path.Join(engine.options.Config.DataDir, "aof", "preamble.bin"), - os.O_WRONLY|os.O_CREATE|os.O_APPEND, - os.ModePerm) - if err != nil { - return nil, err - } - preambleRW = f - } - // Setup Preamble engine - engine.preambleStore = NewPreambleStore( - WithReadWriter(preambleRW), - WithGetStateFunc(opts.GetState), - WithSetValueFunc(func(key string, value interface{}) { + engine.preambleStore = preamble.NewPreambleStore( + preamble.WithDirectory(engine.options.Config.DataDir), + preamble.WithReadWriter(preambleRW), + preamble.WithGetStateFunc(opts.GetState), + preamble.WithSetValueFunc(func(key string, value interface{}) { if _, err := engine.options.CreateKeyAndLock(context.Background(), key); err != nil { log.Println(err) } @@ -69,28 +55,18 @@ func NewAOFEngine(opts Opts, appendRW io.ReadWriter, preambleRW io.ReadWriter) ( }), ) - // 1. Create AOF directory if it does not exist. - if err := os.MkdirAll(path.Join(engine.options.Config.DataDir, "aof"), os.ModePerm); err != nil { - return nil, err - } - - // 2. Setup storage engine. - engine.appendStore = AppendStore{ - rw: appendRW, - mut: sync.Mutex{}, - } - - // If out is not provided by the caller, then create/open the new AOF file based on the configuration. - if appendRW == nil { - f, err := os.OpenFile( - path.Join(engine.options.Config.DataDir, "aof", "log.aof"), - os.O_WRONLY|os.O_CREATE|os.O_APPEND, - os.ModePerm) - if err != nil { - return nil, err - } - engine.appendStore.rw = f - } + // Setup AOF log store engine + engine.appendStore = logstore.NewAppendStore( + logstore.WithDirectory(engine.options.Config.DataDir), + logstore.WithStrategy(engine.options.Config.AOFSyncStrategy), + logstore.WithReadWriter(appendRW), + logstore.WithHandleCommandFunc(func(command []byte) { + _, err := engine.options.HandleCommand(context.Background(), command, nil, true) + if err != nil { + log.Println(err) + } + }), + ) // 3. Start the goroutine to pick up queued commands in order to write them to the file. // LogCommand will get the open file handler from the struct top perform the AOF operation. @@ -100,27 +76,9 @@ func NewAOFEngine(opts Opts, appendRW io.ReadWriter, preambleRW io.ReadWriter) ( if err := engine.appendStore.Write(c); err != nil { log.Println(err) } - if strings.EqualFold(engine.options.Config.AOFSyncStrategy, "always") { - if err := engine.appendStore.Sync(); err != nil { - log.Println(err) - } - } } }() - // 4. Start another goroutine that takes handles syncing the content to the file system. - // No need to start this goroutine if sync strategy is anything other than 'everysec'. - if strings.EqualFold(engine.options.Config.AOFSyncStrategy, "everysec") { - go func() { - for { - if err := engine.appendStore.Sync(); err != nil { - log.Println(err) - } - <-time.After(1 * time.Second) - } - }() - } - return engine, nil } diff --git a/src/server/aof/log/store.go b/src/server/aof/log/store.go new file mode 100644 index 00000000..892ecd90 --- /dev/null +++ b/src/server/aof/log/store.go @@ -0,0 +1,164 @@ +package log + +import ( + "bufio" + "bytes" + "context" + "errors" + "io" + "log" + "os" + "path" + "strings" + "sync" + "time" +) + +type AppendReadWriter interface { + io.ReadWriter + io.Closer + Truncate(size int64) error + Sync() error +} + +type AppendStore struct { + strategy string // Append file sync strategy. Can only be "always", "everysec", or "no + mut sync.Mutex // Store mutex + rw AppendReadWriter // The ReadWriter used to persist and load the log + directory string // The directory for the AOF file if we must create one + handleCommand func(command []byte) // Function to handle command read from AOF log after restore +} + +func WithStrategy(strategy string) func(store *AppendStore) { + return func(store *AppendStore) { + store.strategy = strategy + } +} + +func WithReadWriter(rw AppendReadWriter) func(store *AppendStore) { + return func(store *AppendStore) { + store.rw = rw + } +} + +func WithDirectory(directory string) func(store *AppendStore) { + return func(store *AppendStore) { + store.directory = directory + } +} + +func WithHandleCommandFunc(f func(command []byte)) func(store *AppendStore) { + return func(store *AppendStore) { + store.handleCommand = f + } +} + +func NewAppendStore(options ...func(store *AppendStore)) *AppendStore { + store := &AppendStore{ + directory: "", + strategy: "everysec", + rw: nil, + mut: sync.Mutex{}, + handleCommand: func(command []byte) { + // No-Op + }, + } + + for _, option := range options { + option(store) + } + + // If rw is nil, use a default file at the provided directory + if store.rw == nil { + f, err := os.OpenFile(path.Join(store.directory, "aof", "log.aof"), os.O_RDWR|os.O_CREATE|os.O_APPEND, os.ModePerm) + if err != nil { + log.Println(err) + } + store.rw = f + } + + // Start another goroutine that takes handles syncing the content to the file system. + // No need to start this goroutine if sync strategy is anything other than 'everysec'. + if strings.EqualFold(store.strategy, "everysec") { + go func() { + for { + if err := store.Sync(); err != nil { + log.Println(err) + } + <-time.After(1 * time.Second) + } + }() + } + return store +} + +func (store *AppendStore) Write(command []byte) error { + store.mut.Lock() + defer store.mut.Unlock() + if _, err := store.rw.Write(command); err != nil { + return err + } + if strings.EqualFold(store.strategy, "always") { + if err := store.Sync(); err != nil { + return err + } + } + return nil +} + +func (store *AppendStore) Sync() error { + store.mut.Lock() + store.mut.Unlock() + return store.rw.Sync() +} + +func (store *AppendStore) Restore(ctx context.Context) error { + store.mut.Lock() + defer store.mut.Unlock() + + buf := bufio.NewReader(store.rw) + + var commands [][]byte + var line []byte + + for { + b, _, err := buf.ReadLine() + if err != nil && errors.Is(err, io.EOF) { + break + } else if err != nil { + return err + } + if len(b) <= 0 { + line = append(line, []byte("\r\n\r\n")...) + commands = append(commands, line) + line = []byte{} + continue + } + if len(line) > 0 { + line = append(line, append([]byte("\r\n"), bytes.TrimLeft(b, "\x00")...)...) + continue + } + line = append(line, bytes.TrimLeft(b, "\x00")...) + } + + for _, c := range commands { + store.handleCommand(c) + } + + return nil +} + +func (store *AppendStore) Truncate() error { + store.mut.Lock() + defer store.mut.Unlock() + if err := store.rw.Truncate(0); err != nil { + return err + } + return nil +} + +func (store *AppendStore) Close() error { + store.mut.Lock() + defer store.mut.Unlock() + return store.rw.Close() +} diff --git a/src/server/aof/preamble_store.go b/src/server/aof/preamble/store.go similarity index 55% rename from src/server/aof/preamble_store.go rename to src/server/aof/preamble/store.go index c1a352e7..d8ac4487 100644 --- a/src/server/aof/preamble_store.go +++ b/src/server/aof/preamble/store.go @@ -1,23 +1,30 @@ -package aof +package preamble import ( - "context" "encoding/json" - "errors" "io" "log" "os" + "path" "sync" ) +type PreambleReadWriter interface { + io.ReadWriteSeeker + io.Closer + Truncate(size int64) error + Sync() error +} + type PreambleStore struct { - rw io.ReadWriter - mut sync.Mutex - getState func() map[string]interface{} - setValue func(key string, value interface{}) + rw PreambleReadWriter + mut sync.Mutex + directory string + getState func() map[string]interface{} + setValue func(key string, value interface{}) } -func WithReadWriter(rw io.ReadWriter) func(store *PreambleStore) { +func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) { return func(store *PreambleStore) { store.rw = rw } @@ -35,10 +42,17 @@ func WithSetValueFunc(f func(key string, value interface{})) func(store *Preambl } } +func WithDirectory(directory string) func(store *PreambleStore) { + return func(store *PreambleStore) { + store.directory = directory + } +} + func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore { store := &PreambleStore{ - rw: nil, - mut: sync.Mutex{}, + rw: nil, + mut: sync.Mutex{}, + directory: "", getState: func() map[string]interface{} { // No-Op by default return nil @@ -52,6 +66,15 @@ func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore { option(store) } + // If rw is nil, create the default + if store.rw == nil { + f, err := os.OpenFile(path.Join(store.directory, "aof", "preamble.bin"), os.O_RDWR|os.O_CREATE, os.ModePerm) + if err != nil { + log.Println(err) + } + store.rw = f + } + return store } @@ -67,14 +90,11 @@ func (store *PreambleStore) CreatePreamble() error { } // Truncate the preamble first - rw, ok := store.rw.(interface { - Truncate(size int64) error - }) - if !ok { - return errors.New("could not truncate preamble file") + if err = store.rw.Truncate(0); err != nil { + return err } - - if err = rw.Truncate(0); err != nil { + // Seek to the beginning of the file after truncating + if _, err = store.rw.Seek(0, 0); err != nil { return err } @@ -82,18 +102,15 @@ func (store *PreambleStore) CreatePreamble() error { return err } - // If the rw is a file, sync it immediately - file, ok := store.rw.(*os.File) - if ok { - if err = file.Sync(); err != nil { - log.Println(err) - } + // Sync the changes + if err = store.rw.Sync(); err != nil { + return err } return nil } -func (store *PreambleStore) Restore(ctx context.Context) error { +func (store *PreambleStore) Restore() error { if store.rw == nil { return nil } @@ -115,3 +132,9 @@ func (store *PreambleStore) Restore(ctx context.Context) error { return nil } + +func (store *PreambleStore) Close() error { + store.mut.Lock() + defer store.mut.Unlock() + return store.rw.Close() +} diff --git a/src/server/server.go b/src/server/server.go index 477834fd..1e423349 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -105,7 +105,7 @@ func NewServer(opts Opts) *Server { KeyUnlock: server.KeyUnlock, SetValue: server.SetValue, HandleCommand: server.handleCommand, - }, nil) + }, nil, nil) if err != nil { log.Println(err) }