Skip to content

Commit

Permalink
Merge pull request #108 from ohkinozomu/raft-log-level
Browse files Browse the repository at this point in the history
Add configurable Raft log level
  • Loading branch information
wind-c authored Aug 4, 2024
2 parents 65a405a + a754c55 commit 64b89ae
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 42 deletions.
23 changes: 20 additions & 3 deletions cluster/raft/etcd/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"os"
"strconv"
"strings"
"time"

"github.com/wind-c/comqtt/v2/cluster/log"
Expand Down Expand Up @@ -108,7 +109,7 @@ func Setup(conf *config.Cluster, notifyCh chan<- *message.Message) (*Peer, error
stopC: make(chan struct{}),
httpStopC: make(chan struct{}),
httpDoneC: make(chan struct{}),
logger: getZapLogger(),
logger: getZapLogger(conf.RaftLogLevel),
}

go peer.startRaft()
Expand Down Expand Up @@ -181,7 +182,23 @@ func (p *Peer) DelByNode(node string) int {
return p.kvStore.DelByNode(node)
}

func getZapLogger() *zap.Logger {
func mapRaftLogLevelToZap(raftLogLevel string) zapcore.Level {
raftLogLevel = strings.ToLower(raftLogLevel)
switch raftLogLevel {
case "debug":
return zapcore.DebugLevel
case "info":
return zapcore.InfoLevel
case "warn":
return zapcore.WarnLevel
case "error":
return zapcore.ErrorLevel
default:
return zapcore.ErrorLevel
}
}

func getZapLogger(raftLogLevel string) *zap.Logger {
encoderCfg := zapcore.EncoderConfig{
MessageKey: "msg",
LevelKey: "level",
Expand All @@ -190,7 +207,7 @@ func getZapLogger() *zap.Logger {
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.StringDurationEncoder,
}
core := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), zapcore.AddSync(log.Writer()), zapcore.ErrorLevel)
core := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), zapcore.AddSync(log.Writer()), mapRaftLogLevelToZap(raftLogLevel))
return zap.New(core)
}

Expand Down
43 changes: 4 additions & 39 deletions cluster/raft/hashicorp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ type peerEntry struct {
func Setup(conf *config.Cluster, notifyCh chan<- *message.Message) (*Peer, error) {
config := raft.DefaultConfig()
config.LocalID = raft.ServerID(conf.NodeName)
config.LogLevel = "ERROR"
if conf.RaftLogLevel == "" {
conf.RaftLogLevel = "ERROR"
}
config.LogLevel = conf.RaftLogLevel
config.LogOutput = log.Writer()
//config.ShutdownOnRemove = true // Enable shutdown on removal
//config.SnapshotInterval = 30 * time.Second // Check every 30 seconds to see if there are enough new entries for a snapshot, can be overridden
Expand Down Expand Up @@ -231,12 +234,6 @@ func (p *Peer) IsApplyRight() bool {
return p.raft.State() == raft.Leader
}

// leaderLastContact returns the time of last contact by a leader.
// This only makes sense if we are currently a follower.
func (p *Peer) leaderLastContact() time.Time {
return p.raft.LastContact()
}

func (p *Peer) GetLeader() (addr, id string) {
leaderAddr, leaderId := p.raft.LeaderWithID()
addr = string(leaderAddr)
Expand Down Expand Up @@ -376,35 +373,3 @@ func (p *Peer) waitForLeader(timeout time.Duration) (string, error) {
}
}
}

// waitForAppliedIndex blocks until a given log index has been applied,
// or the timeout expires.
func (p *Peer) waitForAppliedIndex(idx uint64, timeout time.Duration) error {
tck := time.NewTicker(appliedWaitDelay)
defer tck.Stop()
tmr := time.NewTimer(timeout)
defer tmr.Stop()

for {
select {
case <-tck.C:
if p.raft.AppliedIndex() >= idx {
return nil
}
case <-tmr.C:
return fmt.Errorf("timeout expired")
}
}
}

// waitForApplied waits for all Raft log entries to to be applied to the
// underlying database.
func (p *Peer) waitForApplied(timeout time.Duration) error {
if timeout == 0 {
return nil
}
if err := p.waitForAppliedIndex(p.raft.LastIndex(), timeout); err != nil {
return errors.New("timeout waiting for initial logs application")
}
return nil
}
1 change: 1 addition & 0 deletions cmd/cluster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func realMain(ctx context.Context) error {
flag.IntVar(&cfg.Cluster.BindPort, "gossip-port", 7946, "this port is used to discover nodes in a cluster")
flag.IntVar(&cfg.Cluster.RaftPort, "raft-port", 8946, "this port is used for raft peer communication")
flag.BoolVar(&cfg.Cluster.RaftBootstrap, "raft-bootstrap", false, "should be `true` for the first node of the cluster. It can elect a leader without any other nodes being present.")
flag.StringVar(&cfg.Cluster.RaftLogLevel, "raft-log-level", "error", "Raft log level, with supported values debug, info, warn, error.")
flag.StringVar(&members, "members", "", "seeds member list of cluster,such as 192.168.0.103:7946,192.168.0.104:7946")
flag.BoolVar(&cfg.Cluster.GrpcEnable, "grpc-enable", false, "grpc is used for raft transport and reliable communication between nodes")
flag.IntVar(&cfg.Cluster.GrpcPort, "grpc-port", 17946, "grpc communication port between nodes")
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ type Cluster struct {
RaftPort int `yaml:"raft-port" json:"raft-port"`
RaftDir string `yaml:"raft-dir" json:"raft-dir"`
RaftBootstrap bool `yaml:"raft-bootstrap" json:"raft-bootstrap"`
RaftLogLevel string `yaml:"raft-log-level" json:"raft-log-level"`
GrpcEnable bool `yaml:"grpc-enable" json:"grpc-enable"`
GrpcPort int `yaml:"grpc-port" json:"grpc-port"`
InboundPoolSize int `yaml:"inbound-pool-size" json:"inbound-pool-size"`
Expand Down

0 comments on commit 64b89ae

Please sign in to comment.