Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Add Route Reconciliation #1749

Draft
wants to merge 13 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ docker-login: ## Logs into a docker registry using {DOCKER,QUAY}_{USERNAME,PASSW

push: container docker-login ## Pushes a Docker container image to a registry.
@echo Starting kube-router container image push.
$(DOCKER) push "$(REGISTRY_DEV):$(IMG_TAG)"
$(DOCKER) push "$(REGISTRY_DEV):$(subst /,,$(IMG_TAG))"
@echo Finished kube-router container image push.

push-manifest:
Expand Down
4 changes: 2 additions & 2 deletions pkg/bgp/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strconv"
"strings"

"github.com/cloudnativelabs/kube-router/v2/pkg/utils"
"github.com/cloudnativelabs/kube-router/v2/pkg"
gobgp "github.com/osrg/gobgp/v3/pkg/packet/bgp"
)

Expand All @@ -21,7 +21,7 @@ const (
// GenerateRouterID will generate a router ID based upon the user's configuration (or lack there of) and the node's
// primary IP address if the user has not specified. If the user has configured the router ID as "generate" then we
// will generate a router ID based upon fnv hashing the node's primary IP address.
func GenerateRouterID(nodeIPAware utils.NodeIPAware, configRouterID string) (string, error) {
func GenerateRouterID(nodeIPAware pkg.NodeIPAware, configRouterID string) (string, error) {
switch {
case configRouterID == "generate":
h := fnv.New32a()
Expand Down
55 changes: 55 additions & 0 deletions pkg/bgp/path.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package bgp

import (
"github.com/cloudnativelabs/kube-router/v2/pkg"
gobgpapi "github.com/osrg/gobgp/v3/api"
"k8s.io/klog/v2"
)

type PathHandler struct {
PeerLister PeerLister
RouteInjector pkg.RouteInjector
RouteSyncer pkg.RouteSyncer
TunnelCleaner pkg.TunnelCleaner
}

func (ph *PathHandler) Changed(path *gobgpapi.Path) error {
klog.V(2).Infof("Path Looks Like: %s", path.String())
dst, nextHop, err := ParsePath(path)
if err != nil {
return err
}
tunnelName := ph.TunnelCleaner.GenerateTunnelName(nextHop.String())

// If we've made it this far, then it is likely that the node is holding a destination route for this path already.
// If the path we've received from GoBGP is a withdrawal, we should clean up any lingering routes that may exist
// on the host (rather than creating a new one or updating an existing one), and then return.
if path.IsWithdraw {
klog.V(2).Infof("Removing route: '%s via %s' from peer in the routing table", dst, nextHop)

// The path might be withdrawn because the peer became unestablished or it may be withdrawn because just the
// path was withdrawn. Check to see if the peer is still established before deciding whether to clean the
// tunnel and tunnel routes or whether to just delete the destination route.
peerEstablished, err := IsPeerEstablished(ph.PeerLister, nextHop.String())
if err != nil {
klog.Errorf("encountered error while checking peer status: %v", err)
}
if err == nil && !peerEstablished {
klog.V(1).Infof("Peer '%s' was not found any longer, removing tunnel and routes",
nextHop.String())
// Also delete route from state map so that it doesn't get re-synced after deletion
ph.RouteSyncer.DelInjectedRoute(dst)
ph.TunnelCleaner.CleanupTunnel(dst, tunnelName)
return nil
}

// Also delete route from state map so that it doesn't get re-synced after deletion
ph.RouteSyncer.DelInjectedRoute(dst)
return nil
}

// If this is not a withdraw, then we need to process the route. This takes care of creating any necessary tunnels,
// and adding any necessary host routes depending on the user's config
_, err = ph.RouteInjector.InjectRoute(dst, nextHop)
return err
}
27 changes: 27 additions & 0 deletions pkg/bgp/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package bgp

import (
"context"
"fmt"

api "github.com/osrg/gobgp/v3/api"
)

type PeerLister interface {
ListPeer(ctx context.Context, r *api.ListPeerRequest, fn func(*api.Peer)) error
}

func IsPeerEstablished(pl PeerLister, peerIP string) (bool, error) {
var peerConnected bool
peerFunc := func(peer *api.Peer) {
if peer.Conf.NeighborAddress == peerIP && peer.State.SessionState == api.PeerState_ESTABLISHED {
peerConnected = true
}
}
err := pl.ListPeer(context.Background(), &api.ListPeerRequest{Address: peerIP}, peerFunc)
if err != nil {
return false, fmt.Errorf("unable to list peers to see if tunnel & routes need to be removed: %v", err)
}

return peerConnected, nil
}
3 changes: 2 additions & 1 deletion pkg/cmd/kube-router.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"syscall"
"time"

"github.com/cloudnativelabs/kube-router/v2/pkg"
"github.com/cloudnativelabs/kube-router/v2/pkg/controllers/lballoc"
"github.com/cloudnativelabs/kube-router/v2/pkg/controllers/netpol"
"github.com/cloudnativelabs/kube-router/v2/pkg/controllers/proxy"
Expand Down Expand Up @@ -85,7 +86,7 @@ func (kr *KubeRouter) Run() error {
os.Exit(0)
}

healthChan := make(chan *healthcheck.ControllerHeartbeat, healthControllerChannelLength)
healthChan := make(chan *pkg.ControllerHeartbeat, healthControllerChannelLength)
defer close(healthChan)
stopCh := make(chan struct{})

Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/lballoc/lballoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/cloudnativelabs/kube-router/v2/pkg"
"github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck"
"github.com/cloudnativelabs/kube-router/v2/pkg/options"
v1core "k8s.io/api/core/v1"
Expand Down Expand Up @@ -432,7 +433,7 @@ func (lbc *LoadBalancerController) allocator() {
}
}

func (lbc *LoadBalancerController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat,
func (lbc *LoadBalancerController) Run(healthChan chan<- *pkg.ControllerHeartbeat,
stopCh <-chan struct{}, wg *sync.WaitGroup) {
isLeader := false
isLeaderChan := make(chan bool)
Expand Down Expand Up @@ -461,7 +462,7 @@ func (lbc *LoadBalancerController) Run(healthChan chan<- *healthcheck.Controller
}
case <-timer.C:
timer.Reset(time.Minute)
healthcheck.SendHeartBeat(healthChan, "LBC")
healthcheck.SendHeartBeat(healthChan, pkg.HeartBeatCompLoadBalancerController)
if isLeader {
go lbc.walkServices()
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/controllers/netpol/network_policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/cloudnativelabs/kube-router/v2/pkg"
"github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck"
"github.com/cloudnativelabs/kube-router/v2/pkg/metrics"
"github.com/cloudnativelabs/kube-router/v2/pkg/options"
Expand Down Expand Up @@ -63,15 +64,15 @@ var (

// NetworkPolicyController struct to hold information required by NetworkPolicyController
type NetworkPolicyController struct {
krNode utils.NodeIPAndFamilyAware
krNode pkg.NodeIPAndFamilyAware
serviceClusterIPRanges []net.IPNet
serviceExternalIPRanges []net.IPNet
serviceLoadBalancerIPRanges []net.IPNet
serviceNodePortRange string
mu sync.Mutex
syncPeriod time.Duration
MetricsEnabled bool
healthChan chan<- *healthcheck.ControllerHeartbeat
healthChan chan<- *pkg.ControllerHeartbeat
fullSyncRequestChan chan struct{}
ipsetMutex *sync.Mutex

Expand Down Expand Up @@ -153,7 +154,7 @@ type protocol2eps map[string]numericPort2eps
type namedPort2eps map[string]protocol2eps

// Run runs forever till we receive notification on stopCh
func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{},
func (npc *NetworkPolicyController) Run(healthChan chan<- *pkg.ControllerHeartbeat, stopCh <-chan struct{},
wg *sync.WaitGroup) {
t := time.NewTicker(npc.syncPeriod)
defer t.Stop()
Expand Down Expand Up @@ -241,7 +242,7 @@ func (npc *NetworkPolicyController) fullPolicySync() {
}
}

healthcheck.SendHeartBeat(npc.healthChan, "NPC")
healthcheck.SendHeartBeat(npc.healthChan, pkg.HeartBeatCompNetworkPolicyController)
start := time.Now()
syncVersion := strconv.FormatInt(start.UnixNano(), syncVersionBase)
defer func() {
Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/proxy/hairpin_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/cloudnativelabs/kube-router/v2/pkg"
"github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck"
"github.com/cloudnativelabs/kube-router/v2/pkg/utils"
"github.com/vishvananda/netns"
Expand All @@ -27,7 +28,7 @@ type hairpinController struct {
}

func (hpc *hairpinController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup,
healthChan chan<- *healthcheck.ControllerHeartbeat) {
healthChan chan<- *pkg.ControllerHeartbeat) {
defer wg.Done()
klog.Infof("Starting hairping controller (handles setting hairpin_mode for veth interfaces)")

Expand All @@ -54,7 +55,7 @@ func (hpc *hairpinController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup,
endpointIP, err)
}
case <-t.C:
healthcheck.SendHeartBeat(healthChan, "HPC")
healthcheck.SendHeartBeat(healthChan, pkg.HeartBeatCompHairpinController)
}
}
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/controllers/proxy/network_services_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"syscall"
"time"

"github.com/cloudnativelabs/kube-router/v2/pkg"
"github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck"
"github.com/cloudnativelabs/kube-router/v2/pkg/metrics"
"github.com/cloudnativelabs/kube-router/v2/pkg/options"
Expand Down Expand Up @@ -108,7 +109,7 @@ const (

// NetworkServicesController struct stores information needed by the controller
type NetworkServicesController struct {
krNode utils.NodeAware
krNode pkg.NodeAware
syncPeriod time.Duration
mu sync.Mutex
serviceMap serviceInfoMap
Expand Down Expand Up @@ -226,7 +227,7 @@ type endpointSliceInfo struct {
type endpointSliceInfoMap map[string][]endpointSliceInfo

// Run periodically sync ipvs configuration to reflect desired state of services and endpoints
func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat,
func (nsc *NetworkServicesController) Run(healthChan chan<- *pkg.ControllerHeartbeat,
stopCh <-chan struct{}, wg *sync.WaitGroup) {
t := time.NewTicker(nsc.syncPeriod)
defer t.Stop()
Expand Down Expand Up @@ -340,7 +341,7 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.Control
}

case perform := <-nsc.syncChan:
healthcheck.SendHeartBeat(healthChan, "NSC")
healthcheck.SendHeartBeat(healthChan, pkg.HeartBeatCompNetworkServicesController)
switch perform {
case synctypeAll:
klog.V(1).Info("Performing requested full sync of services")
Expand All @@ -365,18 +366,18 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.Control
nsc.mu.Unlock()
}
if err == nil {
healthcheck.SendHeartBeat(healthChan, "NSC")
healthcheck.SendHeartBeat(healthChan, pkg.HeartBeatCompNetworkServicesController)
}

case <-t.C:
klog.V(1).Info("Performing periodic sync of ipvs services")
healthcheck.SendHeartBeat(healthChan, "NSC")
healthcheck.SendHeartBeat(healthChan, pkg.HeartBeatCompNetworkServicesController)
err := nsc.doSync()
if err != nil {
klog.Errorf("Error during periodic ipvs sync in network service controller. Error: " + err.Error())
klog.Errorf("Skipping sending heartbeat from network service controller as periodic sync failed.")
} else {
healthcheck.SendHeartBeat(healthChan, "NSC")
healthcheck.SendHeartBeat(healthChan, pkg.HeartBeatCompNetworkServicesController)
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/controllers/routing/bgp_policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cloudnativelabs/kube-router/v2/pkg/utils"
gobgpapi "github.com/osrg/gobgp/v3/api"
gobgp "github.com/osrg/gobgp/v3/pkg/server"
"github.com/stretchr/testify/assert"
)

type PolicyTestCase struct {
Expand Down Expand Up @@ -1457,10 +1458,11 @@ func Test_AddPolicies(t *testing.T) {
}

err := testcase.nrc.startBgpServer(false)
if !reflect.DeepEqual(err, testcase.startBGPServerErr) {
t.Logf("expected err when invoking startBGPServer(): %v", testcase.startBGPServerErr)
t.Logf("actual err from startBGPServer() received: %v", err)
t.Fatal("unexpected error")
if testcase.startBGPServerErr != nil && err == nil {
t.Errorf("expected error when starting BGP server, got nil on testcase: %s", testcase.name)
}
if err != nil {
assert.EqualError(t, testcase.startBGPServerErr, err.Error())
}
// If the server was not expected to start we should stop here as the rest of the tests are unimportant
if testcase.startBGPServerErr != nil {
Expand Down
7 changes: 4 additions & 3 deletions pkg/controllers/routing/ecmp_vip.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ func getServiceObject(obj interface{}) (svc *v1core.Service) {
}

func (nrc *NetworkRoutingController) handleServiceUpdate(svcOld, svcNew *v1core.Service) {
klog.V(2).Infof("Handling update for service: %s", svcNew)
if !nrc.bgpServerStarted {
klog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync",
klog.V(1).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync",
svcNew.Namespace, svcNew.Name)
return
}
Expand All @@ -156,9 +157,9 @@ func (nrc *NetworkRoutingController) handleServiceUpdate(svcOld, svcNew *v1core.
}

func (nrc *NetworkRoutingController) handleServiceDelete(oldSvc *v1core.Service) {

klog.V(2).Infof("Handling delete for service: %s", oldSvc)
if !nrc.bgpServerStarted {
klog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync",
klog.V(1).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync",
oldSvc.Namespace, oldSvc.Name)
return
}
Expand Down
Loading