diff --git a/sets/sets.go b/sets/sets.go index 0343052..6e6bdbc 100644 --- a/sets/sets.go +++ b/sets/sets.go @@ -7,6 +7,7 @@ import ( "sync" "inet.af/netaddr" + v1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" @@ -138,8 +139,6 @@ func NewKubernetesSet(ctx context.Context, f informers.SharedInformerFactory, se port = "5060" } - informer := f.Discovery().V1().EndpointSlices() - s := &kubernetesSet{ id: setID, namespace: namespace, @@ -147,6 +146,8 @@ func NewKubernetesSet(ctx context.Context, f informers.SharedInformerFactory, se port: port, } + informer := f.Discovery().V1().EndpointSlices() + informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: s.addFunc, UpdateFunc: s.updateFunc, @@ -217,7 +218,7 @@ func (s *kubernetesSet) Close() {} func (s *kubernetesSet) State() *State { return &State{ - ID: s.id, + ID: s.id, Endpoints: s.endpoints, } } @@ -245,6 +246,187 @@ func (s *kubernetesSet) IsMember(addr string, port uint32) bool { return false } +// legacyKubernetesSet represents a dispatcher set whose +// data should be derived from Kubernetes. +type legacyKubernetesSet struct { + // id is the dispatch set index for this set + id int + + endpoints []*Endpoint + + // callbacks is the set of functions which should be called when the endpoint membership changes. + callbacks []func(*State) + + // name is the name of the Kubernetes Endpoint List + // from which the dispatcher endpoints should be derived. + name string + + // namespace is the namespace in which the Endpoint + // should be found. + namespace string + + port string + + mu sync.Mutex +} + +// NewLegacyKunbernetesSet returns a new Kubernetes-based dispatcher set using the older Endpoints method. +// If the server is running an older version of Kubernetes which does not support v1.EndpointSlices ( 0 { + if ep.Port != port { + return false + } + } + return true + } + } + return false +} + +func flattenEndpoints(refPort string, epList *v1.Endpoints) (out []*Endpoint, err error) { + parsedPortNumber, err := strconv.Atoi(refPort) + if err != nil { + parsedPortNumber = 0 + } + + portNumber := uint32(parsedPortNumber) + + for _, ss := range epList.Subsets { + + if portNumber == 0 { + for _, p := range ss.Ports { + if p.Name == "" { + continue + } + + if p.Name == refPort { + if p.Port > 0 { + portNumber = uint32(p.Port) + } + } + } + + if portNumber == 0 { + return nil, fmt.Errorf("failed to find port %s in Endpoints %q", refPort, epList.Name) + } + } + + for _, addr := range ss.Addresses { + out = append(out, &Endpoint{ + Address: addr.IP, + Port: portNumber, + }) + } + } + + return out, nil +} + func flattenEndpointSlice(refPort string, epSlice *discoveryv1.EndpointSlice) (out []*Endpoint, err error) { portNumber, err := strconv.Atoi(refPort) if err != nil {