diff --git a/cluster/raft/etcd/peer.go b/cluster/raft/etcd/peer.go index 15de44e..5d8c1b1 100644 --- a/cluster/raft/etcd/peer.go +++ b/cluster/raft/etcd/peer.go @@ -12,6 +12,7 @@ import ( "net/http" "os" "strconv" + "strings" "time" "github.com/wind-c/comqtt/v2/cluster/log" @@ -108,7 +109,7 @@ func Setup(conf *config.Cluster, notifyCh chan<- *message.Message) (*Peer, error stopC: make(chan struct{}), httpStopC: make(chan struct{}), httpDoneC: make(chan struct{}), - logger: getZapLogger(), + logger: getZapLogger(conf.RaftLogLevel), } go peer.startRaft() @@ -181,7 +182,23 @@ func (p *Peer) DelByNode(node string) int { return p.kvStore.DelByNode(node) } -func getZapLogger() *zap.Logger { +func mapRaftLogLevelToZap(raftLogLevel string) zapcore.Level { + raftLogLevel = strings.ToLower(raftLogLevel) + switch raftLogLevel { + case "debug": + return zapcore.DebugLevel + case "info": + return zapcore.InfoLevel + case "warn": + return zapcore.WarnLevel + case "error": + return zapcore.ErrorLevel + default: + return zapcore.ErrorLevel + } +} + +func getZapLogger(raftLogLevel string) *zap.Logger { encoderCfg := zapcore.EncoderConfig{ MessageKey: "msg", LevelKey: "level", @@ -190,7 +207,7 @@ func getZapLogger() *zap.Logger { EncodeTime: zapcore.ISO8601TimeEncoder, EncodeDuration: zapcore.StringDurationEncoder, } - core := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), zapcore.AddSync(log.Writer()), zapcore.ErrorLevel) + core := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), zapcore.AddSync(log.Writer()), mapRaftLogLevelToZap(raftLogLevel)) return zap.New(core) } diff --git a/cluster/raft/hashicorp/peer.go b/cluster/raft/hashicorp/peer.go index 0780938..e760c55 100644 --- a/cluster/raft/hashicorp/peer.go +++ b/cluster/raft/hashicorp/peer.go @@ -74,7 +74,10 @@ type peerEntry struct { func Setup(conf *config.Cluster, notifyCh chan<- *message.Message) (*Peer, error) { config := raft.DefaultConfig() config.LocalID = raft.ServerID(conf.NodeName) - config.LogLevel = "ERROR" + if conf.RaftLogLevel == "" { + conf.RaftLogLevel = "ERROR" + } + config.LogLevel = conf.RaftLogLevel config.LogOutput = log.Writer() //config.ShutdownOnRemove = true // Enable shutdown on removal //config.SnapshotInterval = 30 * time.Second // Check every 30 seconds to see if there are enough new entries for a snapshot, can be overridden @@ -231,12 +234,6 @@ func (p *Peer) IsApplyRight() bool { return p.raft.State() == raft.Leader } -// leaderLastContact returns the time of last contact by a leader. -// This only makes sense if we are currently a follower. -func (p *Peer) leaderLastContact() time.Time { - return p.raft.LastContact() -} - func (p *Peer) GetLeader() (addr, id string) { leaderAddr, leaderId := p.raft.LeaderWithID() addr = string(leaderAddr) @@ -376,35 +373,3 @@ func (p *Peer) waitForLeader(timeout time.Duration) (string, error) { } } } - -// waitForAppliedIndex blocks until a given log index has been applied, -// or the timeout expires. -func (p *Peer) waitForAppliedIndex(idx uint64, timeout time.Duration) error { - tck := time.NewTicker(appliedWaitDelay) - defer tck.Stop() - tmr := time.NewTimer(timeout) - defer tmr.Stop() - - for { - select { - case <-tck.C: - if p.raft.AppliedIndex() >= idx { - return nil - } - case <-tmr.C: - return fmt.Errorf("timeout expired") - } - } -} - -// waitForApplied waits for all Raft log entries to to be applied to the -// underlying database. -func (p *Peer) waitForApplied(timeout time.Duration) error { - if timeout == 0 { - return nil - } - if err := p.waitForAppliedIndex(p.raft.LastIndex(), timeout); err != nil { - return errors.New("timeout waiting for initial logs application") - } - return nil -} diff --git a/cmd/cluster/main.go b/cmd/cluster/main.go index a34989c..e8a3886 100644 --- a/cmd/cluster/main.go +++ b/cmd/cluster/main.go @@ -69,6 +69,7 @@ func realMain(ctx context.Context) error { flag.IntVar(&cfg.Cluster.BindPort, "gossip-port", 7946, "this port is used to discover nodes in a cluster") flag.IntVar(&cfg.Cluster.RaftPort, "raft-port", 8946, "this port is used for raft peer communication") flag.BoolVar(&cfg.Cluster.RaftBootstrap, "raft-bootstrap", false, "should be `true` for the first node of the cluster. It can elect a leader without any other nodes being present.") + flag.StringVar(&cfg.Cluster.RaftLogLevel, "raft-log-level", "error", "Raft log level, with supported values debug, info, warn, error.") flag.StringVar(&members, "members", "", "seeds member list of cluster,such as 192.168.0.103:7946,192.168.0.104:7946") flag.BoolVar(&cfg.Cluster.GrpcEnable, "grpc-enable", false, "grpc is used for raft transport and reliable communication between nodes") flag.IntVar(&cfg.Cluster.GrpcPort, "grpc-port", 17946, "grpc communication port between nodes") diff --git a/config/config.go b/config/config.go index 5fe93f4..2970eee 100644 --- a/config/config.go +++ b/config/config.go @@ -141,6 +141,7 @@ type Cluster struct { RaftPort int `yaml:"raft-port" json:"raft-port"` RaftDir string `yaml:"raft-dir" json:"raft-dir"` RaftBootstrap bool `yaml:"raft-bootstrap" json:"raft-bootstrap"` + RaftLogLevel string `yaml:"raft-log-level" json:"raft-log-level"` GrpcEnable bool `yaml:"grpc-enable" json:"grpc-enable"` GrpcPort int `yaml:"grpc-port" json:"grpc-port"` InboundPoolSize int `yaml:"inbound-pool-size" json:"inbound-pool-size"`