Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the transport layer. #23

Merged
merged 2 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions cmd/tool/visualize/visualize.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,36 @@ package main
import (
"flag"
"fmt"
"log/slog"
"os"
"time"

"github.com/danl5/goelect/pkg/config"
"github.com/danl5/goelect/pkg/consensus"
"github.com/danl5/goelect/pkg/model"
"github.com/danl5/goelect/pkg/transport/rpc"
)

var (
outputPath = flag.String("o", "./fsm_visual", "output path")
)

func main() {
c, _ := consensus.NewConsensus(&config.Config{
ConnectTimeout: 10 * time.Second,
Peers: []config.NodeConfig{},
}, nil, model.ElectNode{})
rpcTransport, _ := rpc.NewRPC(slog.Default())
c, _ := consensus.NewConsensus(
model.ElectNode{
Node: model.Node{
ID: "test",
Address: "test",
},
},
rpcTransport,
&rpc.Config{},
&config.Config{
ConnectTimeout: 10 * time.Second,
Peers: []config.NodeConfig{},
},
nil)
visualStr := c.Visualize()

f, err := os.OpenFile(*outputPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
Expand Down
65 changes: 24 additions & 41 deletions elect.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/danl5/goelect/pkg/consensus"
"github.com/danl5/goelect/pkg/log"
"github.com/danl5/goelect/pkg/model"
"github.com/danl5/goelect/pkg/rpc"
)

const (
Expand All @@ -23,7 +22,11 @@ const (
)

// NewElect creates a new Elect instance
func NewElect(cfg *ElectConfig, logger log.Logger) (*Elect, error) {
func NewElect(
trans model.Transport,
transConfig model.TransportConfig,
cfg *ElectConfig,
logger log.Logger) (*Elect, error) {
var peers []config.NodeConfig
for _, n := range cfg.Peers {
peers = append(peers, config.NodeConfig{
Expand All @@ -48,27 +51,31 @@ func NewElect(cfg *ElectConfig, logger log.Logger) (*Elect, error) {
}

// new consensus instance
crh, err := consensus.NewConsensusRpcHandler(&config.Config{
ElectTimeout: time.Duration(electTimeout) * time.Millisecond,
HeartBeatInterval: time.Duration(heartbeatInterval) * time.Millisecond,
ConnectTimeout: time.Duration(connectTimeout) * time.Second,
Peers: peers,
}, logger, model.ElectNode{
Node: model.Node{
Address: cfg.Node.Address,
ID: cfg.Node.ID,
Tags: cfg.Node.Tags,
c, err := consensus.NewConsensus(
model.ElectNode{
Node: model.Node{
Address: cfg.Node.Address,
ID: cfg.Node.ID,
Tags: cfg.Node.Tags,
},
NoVote: cfg.Node.NoVote,
},
NoVote: cfg.Node.NoVote,
})
trans,
transConfig,
&config.Config{
ElectTimeout: time.Duration(electTimeout) * time.Millisecond,
HeartBeatInterval: time.Duration(heartbeatInterval) * time.Millisecond,
ConnectTimeout: time.Duration(connectTimeout) * time.Second,
Peers: peers,
}, logger)
if err != nil {
return nil, err
}
return &Elect{
cfg: cfg,
logger: logger,
callBackTimeout: cfg.CallBackTimeout,
consensus: crh,
consensus: c,
callBacks: cfg.CallBacks,
errChan: make(chan error, 10),
}, nil
Expand All @@ -81,7 +88,7 @@ type Elect struct {
// callBackTimeout is the timeout for the callbacks
callBackTimeout int
// consensus is a pointer to an RpcHandler which encapsulates the implementation of the consensus algorithm.
consensus *consensus.RpcHandler
consensus *consensus.Consensus
// errChan is a channel for errors
errChan chan error

Expand All @@ -94,12 +101,6 @@ type Elect struct {
// Run is the main function of the Elect struct
// It starts the RPC server, runs the consensus algorithm.
func (e *Elect) Run() error {
// start the RPC server
err := e.startServer()
if err != nil {
e.logger.Error("elect, failed to start rpc server", "error", err.Error())
return err
}

// run the consensus algorithm
stateChan, err := e.consensus.Run()
Expand All @@ -122,7 +123,7 @@ func (e *Elect) Errors() <-chan error {

// CurrentState returns current node state
func (e *Elect) CurrentState() string {
return e.consensus.CurrentState().State.String()
return e.consensus.CurrentState().String()
}

// ClusterState returns current cluster state
Expand All @@ -145,24 +146,6 @@ func (e *Elect) Leader() (string, error) {
return l.ID, nil
}

func (e *Elect) startServer() error {
rpcSvr, err := rpc.NewRpcServer(e.logger)
if err != nil {
return err
}

go func() {
err = rpcSvr.Start(e.cfg.Node.Address, e.consensus)
if err != nil {
e.logger.Error("elect, failed to start rpc server", "error", err.Error())
return
}
}()

e.logger.Info("start rpc server")
return nil
}

func (e *Elect) sendError(err error) {
select {
case e.errChan <- err:
Expand Down
158 changes: 96 additions & 62 deletions examples/onenode/node.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
/*
Usage:

Single node:
go run node.go
Three nodes:
go run node.go --nodeaddr=127.0.0.1:9981 --peers=127.0.0.1:9981,127.0.0.1:9982,127.0.0.1:9983
go run node.go --nodeaddr=127.0.0.1:9982 --peers=127.0.0.1:9981,127.0.0.1:9982,127.0.0.1:9983
go run node.go --nodeaddr=127.0.0.1:9983 --peers=127.0.0.1:9981,127.0.0.1:9982,127.0.0.1:9983
*/
package main

import (
Expand All @@ -10,6 +20,7 @@ import (

"github.com/danl5/goelect"
"github.com/danl5/goelect/pkg/model"
"github.com/danl5/goelect/pkg/transport/rpc"
)

var (
Expand All @@ -20,68 +31,61 @@ var (
peers = flag.String("peers", "127.0.0.1:9981", "peers node address separated by comma")
)

// Callback functions for state transitions
func enterLeader(ctx context.Context, st model.StateTransition) error {
fmt.Println("enter leader,", st.State, st.SrcState)
return nil
}

func leaveLeader(ctx context.Context, st model.StateTransition) error {
fmt.Println("leave leader,", st.State, st.SrcState)
return nil
}

func enterFollower(ctx context.Context, st model.StateTransition) error {
fmt.Println("enter follower,", st.State, st.SrcState)
return nil
}

func leaveFollower(ctx context.Context, st model.StateTransition) error {
fmt.Println("leave follower,", st.State, st.SrcState)
return nil
}

func enterCandidate(ctx context.Context, st model.StateTransition) error {
fmt.Println("enter candidate,", st.State, st.SrcState)
return nil
}

func leaveCandidate(ctx context.Context, st model.StateTransition) error {
fmt.Println("leave candidate,", st.State, st.SrcState)
return nil
}

func newElect() (*goelect.Elect, error) {
pAddrs := strings.Split(*peers, ",")
if len(pAddrs) == 0 {
peerAddrs := strings.Split(*peers, ",")
if len(peerAddrs) == 0 {
panic("peers is empty")
}

var peerNodes []goelect.Node
for _, pa := range pAddrs {
for _, pa := range peerAddrs {
peerNodes = append(peerNodes, goelect.Node{Address: pa, ID: pa})
}

e, err := goelect.NewElect(&goelect.ElectConfig{
ElectTimeout: 200,
HeartBeatInterval: 150,
ConnectTimeout: 10,
Peers: peerNodes,
// state transition callbacks
CallBacks: &goelect.StateCallBacks{
EnterLeader: enterLeader,
LeaveLeader: leaveLeader,
EnterFollower: enterFollower,
LeaveFollower: leaveFollower,
EnterCandidate: enterCandidate,
LeaveCandidate: leaveCandidate,
},
// self node
Node: goelect.Node{
Address: *nodeAddress,
ID: *nodeAddress,
},
}, slog.Default())
logger := slog.Default()

// rpc transport
rpcTransport, err := rpc.NewRPC(logger)
if err != nil {
return nil, err
}
// rpc transport config
rpcTransportConfig := &rpc.Config{
ServerCAs: nil,
ServerKey: "",
ServerCert: "",
ServerSkipVerify: false,
ClientCAs: nil,
ClientCert: "",
ClientKey: "",
ClientSkipVerify: false,
ConnectTimeout: 0,
}

// new elect
e, err := goelect.NewElect(
rpcTransport,
rpcTransportConfig,
&goelect.ElectConfig{
ElectTimeout: 200,
HeartBeatInterval: 150,
ConnectTimeout: 10,
Peers: peerNodes,
// state transition callbacks
CallBacks: &goelect.StateCallBacks{
EnterLeader: enterLeader,
LeaveLeader: leaveLeader,
EnterFollower: enterFollower,
LeaveFollower: leaveFollower,
EnterCandidate: enterCandidate,
LeaveCandidate: leaveCandidate,
},
// self node
Node: goelect.Node{
Address: *nodeAddress,
ID: *nodeAddress,
},
}, logger)
if err != nil {
return nil, err
}
Expand All @@ -98,31 +102,61 @@ func main() {
}

// run the elect
go func() {
err = e.Run()
if err != nil {
panic(err)
}
}()
err = e.Run()
if err != nil {
panic(err)
}

tk := time.NewTicker(5 * time.Second)
defer tk.Stop()
for {
select {
case <-tk.C:
// get and print the cluster state
cs, _ := e.ClusterState()
fmt.Println("Node\tState\t")
for addr, n := range cs.Nodes {
fmt.Println(addr, n.State.String())
}
fmt.Println()

// get and print the leader
leaderNode, _ := e.Leader()
fmt.Println("Leader:", leaderNode)

fmt.Println()
isLeader := e.IsLeader()
fmt.Println("IsLeader:", isLeader)
fmt.Println()
}
}
}

// Callback functions for state transitions
func enterLeader(ctx context.Context, st model.StateTransition) error {
fmt.Println("enter leader,", st.State, st.SrcState)
return nil
}

func leaveLeader(ctx context.Context, st model.StateTransition) error {
fmt.Println("leave leader,", st.State, st.SrcState)
return nil
}

func enterFollower(ctx context.Context, st model.StateTransition) error {
fmt.Println("enter follower,", st.State, st.SrcState)
return nil
}

func leaveFollower(ctx context.Context, st model.StateTransition) error {
fmt.Println("leave follower,", st.State, st.SrcState)
return nil
}

func enterCandidate(ctx context.Context, st model.StateTransition) error {
fmt.Println("enter candidate,", st.State, st.SrcState)
return nil
}

func leaveCandidate(ctx context.Context, st model.StateTransition) error {
fmt.Println("leave candidate,", st.State, st.SrcState)
return nil
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ go 1.21

require (
github.com/looplab/fsm v1.0.1
github.com/mitchellh/mapstructure v1.5.0
github.com/silenceper/pool v1.0.0
github.com/stretchr/testify v1.9.0
github.com/ugorji/go/codec v1.2.12
golang.org/x/sync v0.7.0
)

Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/looplab/fsm v1.0.1 h1:OEW0ORrIx095N/6lgoGkFkotqH6s7vaFPsgjLAaF5QU=
github.com/looplab/fsm v1.0.1/go.mod h1:PmD3fFvQEIsjMEfvZdrCDZ6y8VwKTwWNjlpEr6IKPO4=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/silenceper/pool v1.0.0 h1:JTCaA+U6hJAA0P8nCx+JfsRCHMwLTfatsm5QXelffmU=
Expand All @@ -17,6 +19,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
Loading
Loading