Skip to content

Commit

Permalink
gossip map fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Apr 8, 2024
1 parent 70c8559 commit 2a5372b
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 31 deletions.
60 changes: 30 additions & 30 deletions examples/cluster-gossip/node1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func main() {

func coloredConsoleLogging(system *actor.ActorSystem) *slog.Logger {
return slog.New(tint.NewHandler(os.Stdout, &tint.Options{
Level: slog.LevelInfo,
Level: slog.LevelError,
TimeFormat: time.RFC3339,
AddSource: true,
})).With("lib", "Proto.Actor").
Expand All @@ -56,35 +56,35 @@ func coloredConsoleLogging(system *actor.ActorSystem) *slog.Logger {
func startNode() *cluster.Cluster {
system := actor.NewActorSystem(actor.WithLoggerFactory(coloredConsoleLogging))

//system.EventStream.Subscribe(func(evt interface{}) {
// switch msg := evt.(type) {
//
// //subscribe to Cluster Topology changes
// case *cluster.ClusterTopology:
// fmt.Printf("\nClusterTopology %v\n\n", msg)
//
// //subscribe to Gossip updates, specifically MemberHeartbeat
// case *cluster.GossipUpdate:
// if msg.Key != "heartbeat" {
// return
// }
//
// heartbeat := &cluster.MemberHeartbeat{}
//
// fmt.Printf("Member %v\n", msg.MemberID)
// fmt.Printf("Sequence Number %v\n", msg.SeqNumber)
//
// unpackErr := msg.Value.UnmarshalTo(heartbeat)
// if unpackErr != nil {
// fmt.Printf("Unpack error %v\n", unpackErr)
// } else {
// //loop over as.ActorCount map
// for k, v := range heartbeat.ActorStatistics.ActorCount {
// fmt.Printf("ActorCount %v %v\n", k, v)
// }
// }
// }
//})
system.EventStream.Subscribe(func(evt interface{}) {
switch msg := evt.(type) {

//subscribe to Cluster Topology changes
case *cluster.ClusterTopology:
fmt.Printf("\nClusterTopology %v\n\n", msg)

//subscribe to Gossip updates, specifically MemberHeartbeat
case *cluster.GossipUpdate:
if msg.Key != "heartbeat" {
return
}

heartbeat := &cluster.MemberHeartbeat{}

fmt.Printf("Member %v\n", msg.MemberID)
fmt.Printf("Sequence Number %v\n", msg.SeqNumber)

unpackErr := msg.Value.UnmarshalTo(heartbeat)
if unpackErr != nil {
fmt.Printf("Unpack error %v\n", unpackErr)
} else {
//loop over as.ActorCount map
for k, v := range heartbeat.ActorStatistics.ActorCount {
fmt.Printf("ActorCount %v %v\n", k, v)
}
}
}
})

provider, _ := consul.New()
lookup := disthash.New()
Expand Down
18 changes: 17 additions & 1 deletion examples/cluster-gossip/node2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func main() {

func coloredConsoleLogging(system *actor.ActorSystem) *slog.Logger {
return slog.New(tint.NewHandler(os.Stdout, &tint.Options{
Level: slog.LevelDebug,
Level: slog.LevelError,
TimeFormat: time.RFC3339,
AddSource: true,
})).With("lib", "Proto.Actor").
Expand All @@ -37,6 +37,22 @@ func coloredConsoleLogging(system *actor.ActorSystem) *slog.Logger {

func startNode() *cluster.Cluster {
system := actor.NewActorSystem(actor.WithLoggerFactory(coloredConsoleLogging))
system.EventStream.Subscribe(func(evt interface{}) {
switch msg := evt.(type) {

//subscribe to Cluster Topology changes
case *cluster.ClusterTopology:
fmt.Printf("\nClusterTopology %v\n\n", msg)

//subscribe to Gossip updates, specifically MemberHeartbeat
case *cluster.GossipUpdate:
if msg.Key != "someGossipEntry" {
return
}

fmt.Printf("GossipUpdate %v\n", msg)
}
})

provider, _ := consul.New()
lookup := disthash.New()
Expand Down

0 comments on commit 2a5372b

Please sign in to comment.