Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace herolog with slog and add the inline subscription feature #73

Merged
merged 10 commits into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/runtests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.20'
go-version: '1.21'

- name: Build
run: go build ./...
Expand Down
60 changes: 24 additions & 36 deletions cluster/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@ package cluster
import (
"bytes"
"context"
"net"
"path"
"path/filepath"
"strconv"

"github.com/panjf2000/ants/v2"
"github.com/wind-c/comqtt/v2/cluster/discovery"
"github.com/wind-c/comqtt/v2/cluster/discovery/mlist"
"github.com/wind-c/comqtt/v2/cluster/discovery/serf"
"github.com/wind-c/comqtt/v2/cluster/log/zero"
"github.com/wind-c/comqtt/v2/cluster/log"
"github.com/wind-c/comqtt/v2/cluster/message"
"github.com/wind-c/comqtt/v2/cluster/raft"
"github.com/wind-c/comqtt/v2/cluster/raft/etcd"
Expand All @@ -21,10 +26,6 @@ import (
"github.com/wind-c/comqtt/v2/config"
"github.com/wind-c/comqtt/v2/mqtt"
"github.com/wind-c/comqtt/v2/mqtt/packets"
"net"
"path"
"path/filepath"
"strconv"
)

const (
Expand Down Expand Up @@ -119,7 +120,7 @@ func (a *Agent) Start() (err error) {
if err := a.grpcService.StartRpcServer(); err != nil {
return err
}
zero.Info().Str("addr", net.JoinHostPort(a.Config.BindAddr, strconv.Itoa(a.Config.GrpcPort))).Msg("grpc listen at")
log.Info("grpc listen at", "addr", net.JoinHostPort(a.Config.BindAddr, strconv.Itoa(a.Config.GrpcPort)))
}

// init goroutine pool
Expand Down Expand Up @@ -176,17 +177,16 @@ func (a *Agent) Stop() {
}

// stop raft
zero.Info().Msg("stopping raft...")
log.Info("stopping raft...")
a.raftPeer.Stop()
zero.Info().Msg("raft stopped")
log.Info("raft stopped")

// stop node
zero.Info().Msg("stopping node...")
log.Info("stopping node...")
a.membership.Stop()
a.grpcService.StopRpcServer()
zero.Info().Msg("grpc server stopped")
zero.Info().Msg("node stopped")
zero.Close()
log.Info("grpc server stopped")
log.Info("node stopped")
}

func (a *Agent) BindMqttServer(server *mqtt.Server) {
Expand Down Expand Up @@ -264,7 +264,7 @@ func (a *Agent) raftApplyListener() {
} else {
continue
}
zero.Info().Str("from", msg.NodeID).Str("filter", filter).Uint8("type", msg.Type).Msg("apply listening")
log.Info("apply listening", "from", msg.NodeID, "filter", filter, "type", msg.Type)
case <-a.ctx.Done():
return
}
Expand Down Expand Up @@ -306,10 +306,10 @@ func (a *Agent) getPeersFile() string {

func (a *Agent) genNodesFile() {
if err := discovery.GenNodesFile(a.getNodesFile(), a.membership.Members()); err != nil {
zero.Error().Err(err).Msg("gen nodes file")
log.Error("gen nodes file", "error", err)
}
if err := a.raftPeer.GenPeersFile(a.getPeersFile()); err != nil {
zero.Error().Err(err).Msg("gen peers file")
log.Error("gen peers file", "error", err)
}
}

Expand Down Expand Up @@ -461,39 +461,27 @@ func (a *Agent) processOutboundPacket(pk *packets.Packet) {
}

func OnJoinLog(nodeId, addr, prompt string, err error) {
logEvent := zero.Info()
if err != nil {
logEvent.Err(err)
}
logEvent.Str("node", nodeId).Str("addr", addr).Msg(prompt)
log.Info(prompt, "error", err, "addr", addr)
}

func OnApplyLog(leaderId, nodeId string, tp byte, filter []byte, prompt string, err error) {
logEvent := zero.Info()
if err != nil {
logEvent.Err(err)
}
logEvent.Str("leader", leaderId).Str("from", nodeId).Uint8("type", tp).Bytes("filter", filter).Msg(prompt)
log.Info(prompt, "error", err, "leader", leaderId, "from", nodeId, "type", tp, "filter", filter)
}

func OnPublishPacketLog(direction byte, nodeId, cid, topic string, pid uint16) {
logEvent := zero.Info()
if direction == DirectionInbound {
logEvent.Str("d", "inbound").Str("from", nodeId)
log.Info("publish message", "d", "inbound", "from", nodeId, "cid", cid, "pid", pid, "topic", topic)
} else {
logEvent.Str("d", "outbound").Str("to", nodeId)
log.Info("publish message", "d", "outbound", "to", nodeId, "cid", cid, "pid", pid, "topic", topic)
}
logEvent.Str("cid", cid).Uint16("pid", pid).Str("topic", topic).Msg("publish message")
}

func OnConnectPacketLog(direction byte, node, clientId string) {
logEvent := zero.Info()
if direction == DirectionInbound {
logEvent.Str("d", "inbound").Str("from", node)
log.Info("connection notification", "d", "inbound", "from", node, "cid", clientId)
} else {
logEvent.Str("d", "outbound").Str("to", node)
log.Info("connection notification", "d", "outbound", "to", node, "cid", clientId)
}
logEvent.Str("cid", clientId).Msg("connection notification")
}

func (a *Agent) Join(nodeName, addr string) error {
Expand All @@ -513,15 +501,15 @@ func (a *Agent) Leave() error {

func (a *Agent) AddRaftPeer(id, addr string) {
a.raftPeer.Join(id, addr)
zero.Info().Str("nid", id).Str("addr", addr).Msg("add peer")
log.Info("add peer", "nid", id, "addr", addr)
}

func (a *Agent) RemoveRaftPeer(id string) {
a.raftPeer.Leave(id)
zero.Info().Str("nid", id).Msg("remove peer")
log.Info("remove peer", "nid", id)
}

func (a *Agent) GetValue(key string) []string {
zero.Info().Str("key", key).Msg("get value")
log.Info("get value", "key", key)
return a.raftPeer.Lookup(key)
}
9 changes: 5 additions & 4 deletions cluster/discovery/mlist/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ package mlist

import (
"encoding/json"
"github.com/hashicorp/memberlist"
"github.com/wind-c/comqtt/v2/cluster/log/zero"
mqtt "github.com/wind-c/comqtt/v2/mqtt"
"sync"
"time"

"github.com/hashicorp/memberlist"
"github.com/wind-c/comqtt/v2/cluster/log"
mqtt "github.com/wind-c/comqtt/v2/mqtt"
)

// Maximum number of messages to be held in the queue.
Expand Down Expand Up @@ -126,7 +127,7 @@ func (d *Delegate) handleQueueDepth() {
case <-time.After(15 * time.Minute):
n := d.Broadcasts.NumQueued()
if n > maxQueueSize {
zero.Info().Int("current", n).Int("limit", maxQueueSize).Msg("delete messages")
log.Info("delete messages", "current", n, "limit", maxQueueSize)
d.Broadcasts.Prune(maxQueueSize)
}
}
Expand Down
4 changes: 2 additions & 2 deletions cluster/discovery/mlist/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package mlist
import (
"github.com/hashicorp/memberlist"
"github.com/wind-c/comqtt/v2/cluster/discovery"
"github.com/wind-c/comqtt/v2/cluster/log/zero"
"github.com/wind-c/comqtt/v2/cluster/log"
)

//type Event struct {
Expand Down Expand Up @@ -50,5 +50,5 @@ func (n *NodeEvents) NotifyUpdate(node *memberlist.Node) {
}

func onLog(node *memberlist.Node, prompt string) {
zero.Info().Str("node", node.Name).Str("addr", node.Addr.String()).Msg(prompt)
log.Info(prompt, "node", node.Name, "addr", node.Addr.String())
}
15 changes: 8 additions & 7 deletions cluster/discovery/mlist/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
package mlist

import (
"net"
"time"

"github.com/hashicorp/memberlist"
mb "github.com/wind-c/comqtt/v2/cluster/discovery"
"github.com/wind-c/comqtt/v2/cluster/log/zero"
"github.com/wind-c/comqtt/v2/cluster/log"
"github.com/wind-c/comqtt/v2/config"
"github.com/wind-c/comqtt/v2/mqtt"
"net"
"time"
)

type Membership struct {
Expand All @@ -25,7 +26,7 @@ type Membership struct {

func wrapOptions(conf *config.Cluster) *memberlist.Config {
opts := make([]Option, 3)
opts[0] = WithLogOutput(zero.Logger(), LogLevelInfo) //Used to filter memberlist logs
opts[0] = WithLogOutput(log.Writer(), LogLevelInfo) //Used to filter memberlist logs
opts[1] = WithBindPort(conf.BindPort)
opts[2] = WithHandoffQueueDepth(conf.QueueDepth)
if conf.NodeName != "" {
Expand Down Expand Up @@ -62,7 +63,7 @@ func (m *Membership) Setup() error {
return err
}
}
zero.Info().Str("addr", m.LocalAddr()).Int("port", m.config.BindPort).Msg("local member")
log.Info("local member", "addr", m.LocalAddr(), "port", m.config.BindPort)

return nil
}
Expand Down Expand Up @@ -153,7 +154,7 @@ func (m *Membership) SendToOthers(msg []byte) {
continue // skip self
}
if err := m.send(node, msg); err != nil {
zero.Error().Err(err).Str("from", m.config.NodeName).Str("to", node.Name).Msg("send to others")
log.Error("send to others", "error", err, "from", m.config.NodeName, "to", node.Name)
}
}
}
Expand All @@ -163,7 +164,7 @@ func (m *Membership) SendToNode(nodeName string, msg []byte) error {
for _, node := range m.aliveMembers() {
if node.Name == nodeName {
if err := m.send(node, msg); err != nil {
zero.Error().Err(err).Str("from", m.config.NodeName).Str("to", nodeName).Msg("send to node")
log.Error("send to others", "error", err, "from", m.config.NodeName, "to", nodeName)
return err
}
}
Expand Down
28 changes: 10 additions & 18 deletions cluster/discovery/serf/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@
package serf

import (
"strconv"

"github.com/hashicorp/logutils"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
mb "github.com/wind-c/comqtt/v2/cluster/discovery"
"github.com/wind-c/comqtt/v2/cluster/log/zero"
"github.com/wind-c/comqtt/v2/cluster/log"
"github.com/wind-c/comqtt/v2/config"
"github.com/wind-c/comqtt/v2/mqtt"
"io"
"os"
"strconv"
)

const (
Expand Down Expand Up @@ -42,16 +41,11 @@ func wrapOptions(conf *config.Cluster, ech chan serf.Event) *serf.Config {
if conf.QueueDepth != 0 {
config.MaxQueueDepth = conf.QueueDepth
}
var logger io.Writer
if zero.Logger() != nil {
logger = zero.Logger()
} else {
logger = os.Stderr
}

filter := &logutils.LevelFilter{
Levels: []logutils.LogLevel{LogLevelDebug, LogLevelWarn, LogLevelError, LogLevelInfo},
MinLevel: logutils.LogLevel(LogLevelError),
Writer: logger,
Writer: log.Writer(),
}
config.MemberlistConfig.LogOutput = filter
config.LogOutput = filter
Expand Down Expand Up @@ -93,8 +87,7 @@ func (m *Membership) Setup() (err error) {
}
}
}
zero.Info().Str("addr", m.LocalAddr()).Int("port", m.config.BindPort).Msg("local member")

log.Info("local member", "addr", m.LocalAddr(), "port", m.config.BindPort)
return
}

Expand Down Expand Up @@ -125,11 +118,11 @@ func (m *Membership) Stat() map[string]int64 {
func (m *Membership) Stop() {
err := m.serf.Leave()
if err != nil {
zero.Error().Err(err).Msg("serf leave")
log.Error("serf leave", "error", err)
}
err = m.serf.Shutdown()
if err != nil {
zero.Error().Err(err).Msg("serf shutdown")
log.Error("serf shutdown", "error", err)
}
// this shuts down the event loop, note that this can't be called multiple times
// if we need to do so, we could use a bool, sync.Once or recover from the panic
Expand Down Expand Up @@ -189,7 +182,6 @@ func (m *Membership) eventLoop() {
continue
}
m.msgCh <- ue.Payload
//zero.Info().Str("name", ue.Name).Msg("serf message")
case serf.EventQuery:
q := e.(*serf.Query)
if q.SourceNode() == m.config.NodeName {
Expand Down Expand Up @@ -217,7 +209,7 @@ func (m *Membership) SendToNode(nodeName string, msg []byte) error {
qp := m.serf.DefaultQueryParams()
qp.FilterNodes = m.otherNames(nodeName)
if _, err := m.serf.Query(m.config.NodeName, msg, qp); err != nil {
zero.Error().Err(err).Str("from", m.config.NodeName).Str("to", nodeName).Msg("send to node")
log.Error("send to node", "error", err, "from", m.config.NodeName, "to", nodeName)
return err
}

Expand All @@ -229,7 +221,7 @@ func (m *Membership) Broadcast(msg []byte) {
}

func onLog(node *serf.Member, prompt string) {
zero.Info().Str("node", node.Name).Str("addr", node.Addr.String()).Msg(prompt)
log.Info(prompt, "node", node.Name, "addr", node.Addr.String())
}

func (m *Membership) isLocal(member serf.Member) bool {
Expand Down
Loading