Skip to content

Commit

Permalink
More reliable instance sync
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Nov 22, 2024
1 parent 3cb12bf commit 9863697
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 4 deletions.
57 changes: 54 additions & 3 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@ import (
"math"
"os"
"os/signal"
"sort"
"strconv"
"strings"
"sync"
"syscall"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/rabbitmq/omq/pkg/common"
"github.com/rabbitmq/omq/pkg/config"
"github.com/rabbitmq/omq/pkg/log"
Expand Down Expand Up @@ -250,7 +256,7 @@ func RootCmd() *cobra.Command {
rootCmd.PersistentFlags().StringArrayVar(&amqpAppPropertyFilters, "amqp-app-property-filter", []string{}, "AMQP application property filters, eg. key1=$p:prefix")
rootCmd.PersistentFlags().StringArrayVar(&amqpPropertyFilters, "amqp-property-filter", []string{}, "AMQP property filters, eg. key1=$p:prefix")
rootCmd.PersistentFlags().IntVar(&cfg.ExpectedInstances, "expected-instances", 1, "The number of instances to synchronize")
rootCmd.PersistentFlags().StringVar(&cfg.SyncName, "expected-instances-dns", "", "The DNS name that will return members to synchronize with")
rootCmd.PersistentFlags().StringVar(&cfg.SyncName, "expected-instances-endpoint", "", "The DNS name that will return members to synchronize with")

rootCmd.AddCommand(amqp_amqp)
rootCmd.AddCommand(amqp_stomp)
Expand Down Expand Up @@ -352,18 +358,57 @@ func join_cluster(expectedInstance int, serviceName string) {
}

if serviceName == "" {
log.Error("when --expected-instances is set, --expected-instances-dns must be set")
log.Error("when --expected-instances is set, --expected-instances-endpoint must be set")
os.Exit(1)
}

// creates the in-cluster config
config, err := rest.InClusterConfig()
if err != nil {
panic(err.Error())
}
// creates the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}

namespace := os.Getenv("MY_POD_NAMESPACE")
var endpoints *v1.Endpoints
var nodeCount int
for {
// wait until endpoints returns the expected number of instances
log.Info("getting endpoints", "name", serviceName)
endpoints, err = clientset.CoreV1().Endpoints(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
if err != nil || len(endpoints.Subsets) == 0 {
log.Error("failed to retrieve endpoints; retrying...", "name", serviceName, "error", err)
time.Sleep(time.Second)
continue
}
nodeCount = len(endpoints.Subsets[0].Addresses)
if nodeCount >= expectedInstance {
log.Info("reached the expected number of instances", "expected instances", expectedInstance, "current instances", nodeCount)
break
}
log.Info("waiting for the expected number of IPs to be returned from the endpoint", "exepcted", expectedInstance, "current", nodeCount)
time.Sleep(time.Second)
}

ips := make([]string, len(endpoints.Subsets[0].Addresses))
for i, node := range endpoints.Subsets[0].Addresses {
ips[i] = node.IP
}
ips = sort.StringSlice(ips)

Check failure on line 401 in cmd/root.go

View workflow job for this annotation

GitHub Actions / Lint

SA4029: sort.StringSlice is a type, not a function, and sort.StringSlice(ips) doesn't sort your values; consider using sort.Strings instead (staticcheck)

log.Info("IPs found", "all", ips, "selected", ips[0])
list, err := memberlist.Create(memberlist.DefaultLANConfig())
if err != nil {
panic("Failed to create memberlist: " + err.Error())
}

// join the cluster
for {
_, err = list.Join([]string{serviceName})
_, err = list.Join([]string{ips[0]})
if err == nil {
break
}
Expand All @@ -382,6 +427,12 @@ func join_cluster(expectedInstance int, serviceName string) {
time.Sleep(time.Second)
}

go func() {
time.Sleep(30 * time.Second)
log.Info("leaving the cluster")
_ = list.Leave(time.Second)
_ = list.Shutdown()
}()
}

func startConsumers(ctx context.Context, consumerProto config.Protocol, wg *sync.WaitGroup) {
Expand Down
34 changes: 33 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,21 @@ require (
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/charmbracelet/lipgloss v1.0.0 // indirect
github.com/charmbracelet/x/ansi v0.5.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.4 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.1.3 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20241101162523-b92577c0c142 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
Expand All @@ -39,27 +49,49 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-sockaddr v1.0.7 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/miekg/dns v1.1.62 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/muesli/termenv v0.15.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/streadway/amqp v1.1.0 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f // indirect
golang.org/x/mod v0.22.0 // indirect
golang.org/x/net v0.31.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sync v0.9.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/term v0.26.0 // indirect
golang.org/x/text v0.20.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.27.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.31.3 // indirect
k8s.io/apimachinery v0.31.3 // indirect
k8s.io/client-go v0.31.3 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

require (
Expand Down
Loading

0 comments on commit 9863697

Please sign in to comment.