Skip to content

Commit

Permalink
Merge pull request #75 from EchoVault/fix/cluster-advertise
Browse files Browse the repository at this point in the history
Fixed cluster discovery logic and cofiguration
  • Loading branch information
kelvinmwinuka authored Jun 19, 2024
2 parents 75baaa5 + 9c2b4d4 commit f3cf011
Show file tree
Hide file tree
Showing 10 changed files with 2,328 additions and 2,331 deletions.
3 changes: 1 addition & 2 deletions Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ WORKDIR /opt/echovault/bin
CMD "./server" \
"--bind-addr" "${BIND_ADDR}" \
"--port" "${PORT}" \
"--memberlist-port" "${ML_PORT}" \
"--raft-port" "${RAFT_PORT}" \
"--discovery-port" "${DISCOVERY_PORT}" \
"--server-id" "${SERVER_ID}" \
"--join-addr" "${JOIN_ADDR}" \
"--data-dir" "${DATA_DIR}" \
Expand Down
9 changes: 0 additions & 9 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,6 @@ func main() {

ctx := context.WithValue(context.Background(), internal.ContextServerID("ServerID"), conf.ServerID)

// Default BindAddr if it's not specified
if conf.BindAddr == "" {
if addr, err := internal.GetIPAddress(); err != nil {
log.Fatal(err)
} else {
conf.BindAddr = addr
}
}

cancelCh := make(chan os.Signal, 1)
signal.Notify(cancelCh, syscall.SIGINT, syscall.SIGTERM, os.Interrupt)

Expand Down
4,390 changes: 2,199 additions & 2,191 deletions coverage/coverage.out

Large diffs are not rendered by default.

30 changes: 12 additions & 18 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ services:
context: .
dockerfile: Dockerfile.dev
environment:
- BIND_ADDR=0.0.0.0
- PORT=7480
- RAFT_PORT=8000
- ML_PORT=7946
- DISCOVERY_PORT=7946
- SERVER_ID=1
- PLUGIN_DIR=/usr/local/lib/echovault
- DATA_DIR=/var/lib/echovault
Expand Down Expand Up @@ -44,7 +44,6 @@ services:
ports:
- "7480:7480"
- "7946:7946"
- "7999:8000"
volumes:
- ./volumes/config:/etc/echovault/config
- ./volumes/modules:/lib/echovault/modules
Expand All @@ -58,9 +57,9 @@ services:
context: .
dockerfile: Dockerfile.dev
environment:
- BIND_ADDR=0.0.0.0
- PORT=7480
- RAFT_PORT=8000
- ML_PORT=7946
- DISCOVERY_PORT=7946
- SERVER_ID=1
- JOIN_ADDR=2/cluster_node_2:7946
- DATA_DIR=/var/lib/echovault
Expand Down Expand Up @@ -90,7 +89,6 @@ services:
ports:
- "7481:7480"
- "7945:7946"
- "8000:8000"
volumes:
- ./volumes/config:/etc/echovault/config
- ./volumes/plugins:/lib/echovault/plugins
Expand All @@ -104,9 +102,9 @@ services:
context: .
dockerfile: Dockerfile.dev
environment:
- BIND_ADDR=0.0.0.0
- PORT=7480
- RAFT_PORT=8000
- ML_PORT=7946
- DISCOVERY_PORT=7946
- SERVER_ID=2
- JOIN_ADDR=3/cluster_node_3:7946
- DATA_DIR=/var/lib/echovault
Expand Down Expand Up @@ -136,7 +134,6 @@ services:
ports:
- "7482:7480"
- "7947:7946"
- "8001:8000"
volumes:
- ./volumes/config:/etc/echovault/config
- ./volumes/plugins:/lib/echovault/plugins
Expand All @@ -150,9 +147,9 @@ services:
context: .
dockerfile: Dockerfile.dev
environment:
- BIND_ADDR=0.0.0.0
- PORT=7480
- RAFT_PORT=8000
- ML_PORT=7946
- DISCOVERY_PORT=7946
- SERVER_ID=3
- JOIN_ADDR=4/cluster_node_4:7946
- DATA_DIR=/var/lib/echovault
Expand Down Expand Up @@ -182,7 +179,6 @@ services:
ports:
- "7483:7480"
- "7948:7946"
- "8002:8000"
volumes:
- ./volumes/config:/etc/echovault/config
- ./volumes/plugins:/lib/echovault/plugins
Expand All @@ -196,9 +192,9 @@ services:
context: .
dockerfile: Dockerfile.dev
environment:
- BIND_ADDR=0.0.0.0
- PORT=7480
- RAFT_PORT=8000
- ML_PORT=7946
- DISCOVERY_PORT=7946
- SERVER_ID=4
- JOIN_ADDR=5/cluster_node_5:7946
- DATA_DIR=/var/lib/echovault
Expand Down Expand Up @@ -228,7 +224,6 @@ services:
ports:
- "7484:7480"
- "7949:7946"
- "8003:8000"
volumes:
- ./volumes/config:/etc/echovault/config
- ./volumes/plugins:/lib/echovault/plugins
Expand All @@ -242,9 +237,9 @@ services:
context: .
dockerfile: Dockerfile.dev
environment:
- BIND_ADDR=0.0.0.0
- PORT=7480
- RAFT_PORT=8000
- ML_PORT=7946
- DISCOVERY_PORT=7946
- SERVER_ID=5
- JOIN_ADDR=1/cluster_node_1:7946
- DATA_DIR=/var/lib/echovault
Expand Down Expand Up @@ -274,7 +269,6 @@ services:
ports:
- "7485:7480"
- "7950:7946"
- "8004:8000"
volumes:
- ./volumes/config:/etc/echovault/config
- ./volumes/plugins:/lib/echovault/plugins
Expand Down
23 changes: 7 additions & 16 deletions echovault/echovault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ type ClientServerPair struct {
serverId string
bindAddr string
port int
raftPort int
mlPort int
discoveryPort int
bootstrapCluster bool
forwardCommand bool
joinAddr string
Expand Down Expand Up @@ -79,8 +78,7 @@ func setupServer(
bindAddr,
joinAddr string,
port,
raftPort,
mlPort int,
discoveryPort int,
) (*EchoVault, error) {
conf := DefaultConfig()
conf.DataDir = dataDir
Expand All @@ -89,8 +87,7 @@ func setupServer(
conf.JoinAddr = joinAddr
conf.Port = uint16(port)
conf.ServerID = serverId
conf.RaftBindPort = uint16(raftPort)
conf.MemberListBindPort = uint16(mlPort)
conf.DiscoveryPort = uint16(discoveryPort)
conf.BootstrapCluster = bootstrapCluster
conf.EvictionPolicy = constants.NoEviction

Expand All @@ -109,8 +106,7 @@ func setupNode(node *ClientServerPair, isLeader bool, errChan *chan error) {
node.bindAddr,
node.joinAddr,
node.port,
node.raftPort,
node.mlPort,
node.discoveryPort,
)
if err != nil {
*errChan <- fmt.Errorf("could not start server; %v", err)
Expand Down Expand Up @@ -161,17 +157,13 @@ func makeCluster(size int) ([]ClientServerPair, error) {
forwardCommand := i < len(pairs)-1 // The last node will not forward commands to the cluster leader.
joinAddr := ""
if !bootstrapCluster {
joinAddr = fmt.Sprintf("%s/%s:%d", pairs[0].serverId, pairs[0].bindAddr, pairs[0].mlPort)
joinAddr = fmt.Sprintf("%s/%s:%d", pairs[0].serverId, pairs[0].bindAddr, pairs[0].discoveryPort)
}
port, err := internal.GetFreePort()
if err != nil {
return nil, fmt.Errorf("could not get free port: %v", err)
}
raftPort, err := internal.GetFreePort()
if err != nil {
return nil, fmt.Errorf("could not get free raft port: %v", err)
}
memberlistPort, err := internal.GetFreePort()
discoveryPort, err := internal.GetFreePort()
if err != nil {
return nil, fmt.Errorf("could not get free memberlist port: %v", err)
}
Expand All @@ -181,8 +173,7 @@ func makeCluster(size int) ([]ClientServerPair, error) {
serverId: serverId,
bindAddr: bindAddr,
port: port,
raftPort: raftPort,
mlPort: memberlistPort,
discoveryPort: discoveryPort,
bootstrapCluster: bootstrapCluster,
forwardCommand: forwardCommand,
joinAddr: joinAddr,
Expand Down
120 changes: 65 additions & 55 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,33 @@ import (
)

type Config struct {
TLS bool `json:"TLS" yaml:"TLS"`
MTLS bool `json:"MTLS" yaml:"MTLS"`
CertKeyPairs [][]string `json:"CertKeyPairs" yaml:"CertKeyPairs"`
ClientCAs []string `json:"ClientCAs" yaml:"ClientCAs"`
Port uint16 `json:"Port" yaml:"Port"`
ServerID string `json:"ServerId" yaml:"ServerId"`
JoinAddr string `json:"JoinAddr" yaml:"JoinAddr"`
BindAddr string `json:"BindAddr" yaml:"BindAddr"`
RaftBindPort uint16 `json:"RaftPort" yaml:"RaftPort"`
MemberListBindPort uint16 `json:"MlPort" yaml:"MlPort"`
DataDir string `json:"DataDir" yaml:"DataDir"`
BootstrapCluster bool `json:"BootstrapCluster" yaml:"BootstrapCluster"`
AclConfig string `json:"AclConfig" yaml:"AclConfig"`
ForwardCommand bool `json:"ForwardCommand" yaml:"ForwardCommand"`
RequirePass bool `json:"RequirePass" yaml:"RequirePass"`
Password string `json:"Password" yaml:"Password"`
SnapShotThreshold uint64 `json:"SnapshotThreshold" yaml:"SnapshotThreshold"`
SnapshotInterval time.Duration `json:"SnapshotInterval" yaml:"SnapshotInterval"`
RestoreSnapshot bool `json:"RestoreSnapshot" yaml:"RestoreSnapshot"`
RestoreAOF bool `json:"RestoreAOF" yaml:"RestoreAOF"`
AOFSyncStrategy string `json:"AOFSyncStrategy" yaml:"AOFSyncStrategy"`
MaxMemory uint64 `json:"MaxMemory" yaml:"MaxMemory"`
EvictionPolicy string `json:"EvictionPolicy" yaml:"EvictionPolicy"`
EvictionSample uint `json:"EvictionSample" yaml:"EvictionSample"`
EvictionInterval time.Duration `json:"EvictionInterval" yaml:"EvictionInterval"`
Modules []string `json:"Plugins" yaml:"Plugins"`
TLS bool `json:"TLS" yaml:"TLS"`
MTLS bool `json:"MTLS" yaml:"MTLS"`
CertKeyPairs [][]string `json:"CertKeyPairs" yaml:"CertKeyPairs"`
ClientCAs []string `json:"ClientCAs" yaml:"ClientCAs"`
Port uint16 `json:"Port" yaml:"Port"`
ServerID string `json:"ServerId" yaml:"ServerId"`
JoinAddr string `json:"JoinAddr" yaml:"JoinAddr"`
BindAddr string `json:"BindAddr" yaml:"BindAddr"`
DataDir string `json:"DataDir" yaml:"DataDir"`
BootstrapCluster bool `json:"BootstrapCluster" yaml:"BootstrapCluster"`
AclConfig string `json:"AclConfig" yaml:"AclConfig"`
ForwardCommand bool `json:"ForwardCommand" yaml:"ForwardCommand"`
RequirePass bool `json:"RequirePass" yaml:"RequirePass"`
Password string `json:"Password" yaml:"Password"`
SnapShotThreshold uint64 `json:"SnapshotThreshold" yaml:"SnapshotThreshold"`
SnapshotInterval time.Duration `json:"SnapshotInterval" yaml:"SnapshotInterval"`
RestoreSnapshot bool `json:"RestoreSnapshot" yaml:"RestoreSnapshot"`
RestoreAOF bool `json:"RestoreAOF" yaml:"RestoreAOF"`
AOFSyncStrategy string `json:"AOFSyncStrategy" yaml:"AOFSyncStrategy"`
MaxMemory uint64 `json:"MaxMemory" yaml:"MaxMemory"`
EvictionPolicy string `json:"EvictionPolicy" yaml:"EvictionPolicy"`
EvictionSample uint `json:"EvictionSample" yaml:"EvictionSample"`
EvictionInterval time.Duration `json:"EvictionInterval" yaml:"EvictionInterval"`
Modules []string `json:"Plugins" yaml:"Plugins"`
DiscoveryPort uint16 `json:"DiscoveryPort" yaml:"DiscoveryPort"`
RaftBindAddr string
RaftBindPort uint16
}

func GetConfig() (Config, error) {
Expand Down Expand Up @@ -148,9 +149,8 @@ There is no limit by default.`, func(memory string) error {
port := flag.Int("port", 7480, "Port to use. Default is 7480")
serverId := flag.String("server-id", "1", "EchoVault ID in raft cluster. Leave empty for client.")
joinAddr := flag.String("join-addr", "", "Address of cluster member in a cluster to you want to join.")
bindAddr := flag.String("bind-addr", "", "Address to bind the echovault to.")
raftBindPort := flag.Uint("raft-port", 7481, "Port to use for intra-cluster communication. Leave on the client.")
mlBindPort := flag.Uint("memberlist-port", 7946, "Port to use for memberlist communication.")
bindAddr := flag.String("bind-addr", "127.0.0.1", "Address to bind the echovault to.")
discoveryPort := flag.Uint("discovery-port", 7946, "Port to use for memberlist cluster discovery.")
dataDir := flag.String("data-dir", ".", "Directory to store snapshots and logs.")
bootstrapCluster := flag.Bool("bootstrap-cluster", false, "Whether this instance should bootstrap a new cluster.")
aclConfig := flag.String("acl-config", "", "ACL config file path.")
Expand Down Expand Up @@ -184,33 +184,43 @@ It is a plain text value by default but you can provide a SHA256 hash by adding

flag.Parse()

raftBindAddr, e := internal.GetIPAddress()
if e != nil {
return Config{}, e
}
raftBindPort, e := internal.GetFreePort()
if e != nil {
return Config{}, e
}

conf := Config{
CertKeyPairs: certKeyPairs,
ClientCAs: clientCAs,
TLS: *tls,
MTLS: *mtls,
Port: uint16(*port),
ServerID: *serverId,
JoinAddr: *joinAddr,
BindAddr: *bindAddr,
RaftBindPort: uint16(*raftBindPort),
MemberListBindPort: uint16(*mlBindPort),
DataDir: *dataDir,
BootstrapCluster: *bootstrapCluster,
AclConfig: *aclConfig,
ForwardCommand: *forwardCommand,
RequirePass: *requirePass,
Password: *password,
SnapShotThreshold: *snapshotThreshold,
SnapshotInterval: *snapshotInterval,
RestoreSnapshot: *restoreSnapshot,
RestoreAOF: *restoreAOF,
AOFSyncStrategy: aofSyncStrategy,
MaxMemory: maxMemory,
EvictionPolicy: evictionPolicy,
EvictionSample: *evictionSample,
EvictionInterval: *evictionInterval,
Modules: modules,
CertKeyPairs: certKeyPairs,
ClientCAs: clientCAs,
TLS: *tls,
MTLS: *mtls,
Port: uint16(*port),
ServerID: *serverId,
JoinAddr: *joinAddr,
BindAddr: *bindAddr,
DataDir: *dataDir,
BootstrapCluster: *bootstrapCluster,
AclConfig: *aclConfig,
ForwardCommand: *forwardCommand,
RequirePass: *requirePass,
Password: *password,
SnapShotThreshold: *snapshotThreshold,
SnapshotInterval: *snapshotInterval,
RestoreSnapshot: *restoreSnapshot,
RestoreAOF: *restoreAOF,
AOFSyncStrategy: aofSyncStrategy,
MaxMemory: maxMemory,
EvictionPolicy: evictionPolicy,
EvictionSample: *evictionSample,
EvictionInterval: *evictionInterval,
Modules: modules,
DiscoveryPort: uint16(*discoveryPort),
RaftBindAddr: raftBindAddr,
RaftBindPort: uint16(raftBindPort),
}

if len(*config) > 0 {
Expand Down
Loading

0 comments on commit f3cf011

Please sign in to comment.