Skip to content

Commit

Permalink
Allow Node SNAT for Static Egress case
Browse files Browse the repository at this point in the history
Implemented best effort scenario, where in case of
static Egress also, if there is no egress node then
the packets will be sent using normal Node SNAT, as
in case of dynamic Egress.

Signed-off-by: Pulkit Jain <[email protected]>
  • Loading branch information
jainpulkit22 committed Dec 13, 2024
1 parent 9a1a089 commit afe7285
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 27 deletions.
64 changes: 37 additions & 27 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ var emptyWatch = watch.NewEmptyWatch()

var newIPAssigner = ipassigner.NewIPAssigner

var egressNodeAvailability = hasEgressNode

// egressState keeps the actual state of an Egress that has been realized.
type egressState struct {
// The actual egress IP of the Egress. If it's different from the desired IP, there is an update to EgressIP, and we
Expand Down Expand Up @@ -989,6 +991,12 @@ func (c *EgressController) updateEgressStatus(egress *crdv1b1.Egress, egressIP s
return nil
}

func hasEgressNode(egress *crdv1b1.Egress) bool {
if egress.Status.EgressNode == "" {
return false
}
return true
}
func (c *EgressController) syncEgress(egressName string) error {
startTime := time.Now()
defer func() {
Expand Down Expand Up @@ -1118,39 +1126,41 @@ func (c *EgressController) syncEgress(egressName string) error {
}()

egressIP := net.ParseIP(eState.egressIP)
// Install SNAT flows for desired Pods.
for pod := range pods {
eState.pods.Insert(pod)
stalePods.Delete(pod)
if egressNodeAvailability(egress) {
// Install SNAT flows for desired Pods.
for pod := range pods {
eState.pods.Insert(pod)
stalePods.Delete(pod)

// If the Egress is not the effective one for the Pod, do nothing.
if !c.bindPodEgress(pod, egressName) {
continue
}

// If the Egress is not the effective one for the Pod, do nothing.
if !c.bindPodEgress(pod, egressName) {
continue
}
// Get the Pod's openflow port.
parts := strings.Split(pod, "/")
podNamespace, podName := parts[0], parts[1]
ifaces := c.ifaceStore.GetContainerInterfacesByPod(podName, podNamespace)
if len(ifaces) == 0 {
klog.Infof("Interfaces of Pod %s/%s not found", podNamespace, podName)
continue
}

// Get the Pod's openflow port.
parts := strings.Split(pod, "/")
podNamespace, podName := parts[0], parts[1]
ifaces := c.ifaceStore.GetContainerInterfacesByPod(podName, podNamespace)
if len(ifaces) == 0 {
klog.Infof("Interfaces of Pod %s/%s not found", podNamespace, podName)
continue
ofPort := ifaces[0].OFPort
if eState.ofPorts.Has(ofPort) {
staleOFPorts.Delete(ofPort)
continue
}
if err := c.ofClient.InstallPodSNATFlows(uint32(ofPort), egressIP, mark); err != nil {
return err
}
eState.ofPorts.Insert(ofPort)
}

ofPort := ifaces[0].OFPort
if eState.ofPorts.Has(ofPort) {
staleOFPorts.Delete(ofPort)
continue
}
if err := c.ofClient.InstallPodSNATFlows(uint32(ofPort), egressIP, mark); err != nil {
// Uninstall SNAT flows for stale Pods.
if err := c.uninstallPodFlows(egressName, eState, staleOFPorts, stalePods); err != nil {
return err
}
eState.ofPorts.Insert(ofPort)
}

// Uninstall SNAT flows for stale Pods.
if err := c.uninstallPodFlows(egressName, eState, staleOFPorts, stalePods); err != nil {
return err
}
return nil
}
Expand Down
27 changes: 27 additions & 0 deletions pkg/agent/controller/egress/egress_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,15 @@ func TestSyncEgress(t *testing.T) {
},
},
}

egressNodeAvailability = func(egress *crdv1b1.Egress) bool {
return true
}

defer func() {
egressNodeAvailability = hasEgressNode
}()

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
initObjects := []runtime.Object{tt.existingEgress}
Expand Down Expand Up @@ -1195,6 +1204,15 @@ func TestPodUpdateShouldSyncEgress(t *testing.T) {
{Pod: &cpv1b2.PodReference{Name: "pendingPod", Namespace: "ns1"}},
},
}

egressNodeAvailability = func(egress *crdv1b1.Egress) bool {
return true
}

defer func() {
egressNodeAvailability = hasEgressNode
}()

c := newFakeController(t, []runtime.Object{egress})
stopCh := make(chan struct{})
defer close(stopCh)
Expand Down Expand Up @@ -1327,6 +1345,15 @@ func TestSyncOverlappingEgress(t *testing.T) {
{Pod: &cpv1b2.PodReference{Name: "pod4", Namespace: "ns4"}},
},
}

egressNodeAvailability = func(egress *crdv1b1.Egress) bool {
return true
}

defer func() {
egressNodeAvailability = hasEgressNode
}()

c := newFakeController(t, []runtime.Object{egress1, egress2, egress3})
stopCh := make(chan struct{})
defer close(stopCh)
Expand Down

0 comments on commit afe7285

Please sign in to comment.