From 2a5372b5b465b3bb030dd26086cb5840465e7354 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Mon, 8 Apr 2024 20:08:28 +0200 Subject: [PATCH] gossip map fixes --- examples/cluster-gossip/node1/main.go | 60 +++++++++++++-------------- examples/cluster-gossip/node2/main.go | 18 +++++++- 2 files changed, 47 insertions(+), 31 deletions(-) diff --git a/examples/cluster-gossip/node1/main.go b/examples/cluster-gossip/node1/main.go index 8d612ba5..6e10ce1d 100644 --- a/examples/cluster-gossip/node1/main.go +++ b/examples/cluster-gossip/node1/main.go @@ -46,7 +46,7 @@ func main() { func coloredConsoleLogging(system *actor.ActorSystem) *slog.Logger { return slog.New(tint.NewHandler(os.Stdout, &tint.Options{ - Level: slog.LevelInfo, + Level: slog.LevelError, TimeFormat: time.RFC3339, AddSource: true, })).With("lib", "Proto.Actor"). @@ -56,35 +56,35 @@ func coloredConsoleLogging(system *actor.ActorSystem) *slog.Logger { func startNode() *cluster.Cluster { system := actor.NewActorSystem(actor.WithLoggerFactory(coloredConsoleLogging)) - //system.EventStream.Subscribe(func(evt interface{}) { - // switch msg := evt.(type) { - // - // //subscribe to Cluster Topology changes - // case *cluster.ClusterTopology: - // fmt.Printf("\nClusterTopology %v\n\n", msg) - // - // //subscribe to Gossip updates, specifically MemberHeartbeat - // case *cluster.GossipUpdate: - // if msg.Key != "heartbeat" { - // return - // } - // - // heartbeat := &cluster.MemberHeartbeat{} - // - // fmt.Printf("Member %v\n", msg.MemberID) - // fmt.Printf("Sequence Number %v\n", msg.SeqNumber) - // - // unpackErr := msg.Value.UnmarshalTo(heartbeat) - // if unpackErr != nil { - // fmt.Printf("Unpack error %v\n", unpackErr) - // } else { - // //loop over as.ActorCount map - // for k, v := range heartbeat.ActorStatistics.ActorCount { - // fmt.Printf("ActorCount %v %v\n", k, v) - // } - // } - // } - //}) + system.EventStream.Subscribe(func(evt interface{}) { + switch msg := evt.(type) { + + //subscribe to Cluster Topology changes + case *cluster.ClusterTopology: + fmt.Printf("\nClusterTopology %v\n\n", msg) + + //subscribe to Gossip updates, specifically MemberHeartbeat + case *cluster.GossipUpdate: + if msg.Key != "heartbeat" { + return + } + + heartbeat := &cluster.MemberHeartbeat{} + + fmt.Printf("Member %v\n", msg.MemberID) + fmt.Printf("Sequence Number %v\n", msg.SeqNumber) + + unpackErr := msg.Value.UnmarshalTo(heartbeat) + if unpackErr != nil { + fmt.Printf("Unpack error %v\n", unpackErr) + } else { + //loop over as.ActorCount map + for k, v := range heartbeat.ActorStatistics.ActorCount { + fmt.Printf("ActorCount %v %v\n", k, v) + } + } + } + }) provider, _ := consul.New() lookup := disthash.New() diff --git a/examples/cluster-gossip/node2/main.go b/examples/cluster-gossip/node2/main.go index 1043295c..d6498a6c 100644 --- a/examples/cluster-gossip/node2/main.go +++ b/examples/cluster-gossip/node2/main.go @@ -28,7 +28,7 @@ func main() { func coloredConsoleLogging(system *actor.ActorSystem) *slog.Logger { return slog.New(tint.NewHandler(os.Stdout, &tint.Options{ - Level: slog.LevelDebug, + Level: slog.LevelError, TimeFormat: time.RFC3339, AddSource: true, })).With("lib", "Proto.Actor"). @@ -37,6 +37,22 @@ func coloredConsoleLogging(system *actor.ActorSystem) *slog.Logger { func startNode() *cluster.Cluster { system := actor.NewActorSystem(actor.WithLoggerFactory(coloredConsoleLogging)) + system.EventStream.Subscribe(func(evt interface{}) { + switch msg := evt.(type) { + + //subscribe to Cluster Topology changes + case *cluster.ClusterTopology: + fmt.Printf("\nClusterTopology %v\n\n", msg) + + //subscribe to Gossip updates, specifically MemberHeartbeat + case *cluster.GossipUpdate: + if msg.Key != "someGossipEntry" { + return + } + + fmt.Printf("GossipUpdate %v\n", msg) + } + }) provider, _ := consul.New() lookup := disthash.New()