Skip to content

Commit

Permalink
Install Pod flows after PortStatus message is received (#6763)
Browse files Browse the repository at this point in the history
This change introduces a worker in `podConfigurator` to listen for the
OpenFlow PortStatus messages generated when new OpenFlow ports are allocated
in OVS. After receiving a message, the Windows antrea-agent will install Pod-related
OpenFlow entries. If the OpenFlow port is not allocated within 30s after the CmdAdd
request is handled, a K8s event with type "NetworkNotReady" is added on the Pod.
Whenever the Pod networking forwarding rules are installed, a K8s event with type
"NetworkIsReady" is added.

Signed-off-by: Wenying Dong <[email protected]>
  • Loading branch information
wenyingd committed Jan 17, 2025
1 parent 9f52958 commit 75daaa8
Show file tree
Hide file tree
Showing 25 changed files with 1,142 additions and 158 deletions.
1 change: 1 addition & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ func run(o *Options) error {
o.config.CNISocket,
o.config.HostProcPathPrefix,
nodeConfig,
localPodInformer.Get(),
k8sClient,
routeClient,
isChaining,
Expand Down
11 changes: 5 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module antrea.io/antrea
go 1.23.0

require (
antrea.io/libOpenflow v0.14.0
antrea.io/ofnet v0.12.0
antrea.io/libOpenflow v0.15.0
antrea.io/ofnet v0.14.0
github.com/ClickHouse/clickhouse-go/v2 v2.6.1
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/Mellanox/sriovnet v1.1.0
Expand Down Expand Up @@ -113,10 +113,9 @@ require (
github.com/aws/smithy-go v1.12.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cenk/hub v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cenkalti/hub v1.0.1 // indirect
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cenkalti/hub v1.0.2 // indirect
github.com/cenkalti/rpc2 v1.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/chai2010/gettext-go v1.0.2 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
Expand Down
22 changes: 10 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
antrea.io/libOpenflow v0.14.0 h1:6MS1E52nGQyz/AJ8j3OrotgAc/1ubef5vc5i8ytIxFE=
antrea.io/libOpenflow v0.14.0/go.mod h1:U8YR0ithWrjwLdUUhyeCsYnlKg/mEFjG5CbPNt1V+j0=
antrea.io/ofnet v0.12.0 h1:pgygAsEZJUPK/kGmeuIesDh5hoGLpYeavSLdG/Dp8Ao=
antrea.io/ofnet v0.12.0/go.mod h1:MB3qaInEimj+T8qtpBcIQK+EqeNiY1S/WbUdGk+TzNg=
antrea.io/libOpenflow v0.15.0 h1:wGk+IVCf8piGZgC4+lbf4qfGrJG5ikzfq5Y1T5LzqmI=
antrea.io/libOpenflow v0.15.0/go.mod h1:Mq1JEjYrb6eTVA7qjZRj9plVTKcsLM8wnQ87sLLYuiQ=
antrea.io/ofnet v0.14.0 h1:BGOqg5DdRkvxpBqyoEgWmvGd4EvpacdU/Py1s6qOvSc=
antrea.io/ofnet v0.14.0/go.mod h1:W5JPYFFcRM7tLwsItgmsKqIhtW/QofyIeNsUIecFaBo=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down Expand Up @@ -120,14 +120,12 @@ github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdn
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/cenk/hub v1.0.1 h1:RBwXNOF4a8KjD8BJ08XqN8KbrqaGiQLDrgvUGJSHuPA=
github.com/cenk/hub v1.0.1/go.mod h1:rJM1LNAW0ppT8FMMuPK6c2NP/R2nH/UthtuRySSaf6Y=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cenkalti/hub v1.0.1 h1:UMtjc6dHSaOQTO15SVA50MBIR9zQwvsukQupDrkIRtg=
github.com/cenkalti/hub v1.0.1/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0lpHs=
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa h1:t+iWhuJE2aropY4uxKMVbyP+IJ29o422f7YAd73aTjg=
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cenkalti/hub v1.0.2 h1:Nqv9TNaA9boeO2wQFW8o87BY3zKthtnzXmWGmJqhAV8=
github.com/cenkalti/hub v1.0.2/go.mod h1:8LAFAZcCasb83vfxatMUnZHRoQcffho2ELpHb+kaTJU=
github.com/cenkalti/rpc2 v1.0.3 h1:OkMsNP/sP9seN1VRCLqhX1xkVGHPoLwWS6fZR14Ji/k=
github.com/cenkalti/rpc2 v1.0.3/go.mod h1:2yfU5b86vOr16+iY1jN3MvT6Kxc9Nf8j5iZWwUf7iaw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down
21 changes: 11 additions & 10 deletions hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,17 @@ function generate_antrea_client_code {
--go-header-file hack/boilerplate/license_header.go.txt

$GOPATH/bin/deepcopy-gen \
--input-dirs "${ANTREA_PKG}/pkg/apis/controlplane" \
--input-dirs "${ANTREA_PKG}/pkg/apis/controlplane/v1beta2" \
--input-dirs "${ANTREA_PKG}/pkg/apis/system/v1beta1" \
--input-dirs "${ANTREA_PKG}/pkg/apis/crd/v1alpha1" \
--input-dirs "${ANTREA_PKG}/pkg/apis/crd/v1alpha2" \
--input-dirs "${ANTREA_PKG}/pkg/apis/crd/v1beta1" \
--input-dirs "${ANTREA_PKG}/pkg/apis/stats" \
--input-dirs "${ANTREA_PKG}/pkg/apis/stats/v1alpha1" \
-O zz_generated.deepcopy \
--go-header-file hack/boilerplate/license_header.go.txt
--output-file zz_generated.deepcopy.go \
--go-header-file hack/boilerplate/license_header.go.txt \
"${ANTREA_PKG}/pkg/apis/controlplane" \
"${ANTREA_PKG}/pkg/apis/controlplane/v1beta2" \
"${ANTREA_PKG}/pkg/apis/system/v1beta1" \
"${ANTREA_PKG}/pkg/apis/crd/v1alpha1" \
"${ANTREA_PKG}/pkg/apis/crd/v1alpha2" \
"${ANTREA_PKG}/pkg/apis/crd/v1beta1" \
"${ANTREA_PKG}/pkg/apis/stats" \
"${ANTREA_PKG}/pkg/apis/stats/v1alpha1" \
"${ANTREA_PKG}/pkg/agent/interfacestore"

$GOPATH/bin/conversion-gen \
--input-dirs "${ANTREA_PKG}/pkg/apis/controlplane/v1beta2,${ANTREA_PKG}/pkg/apis/controlplane/" \
Expand Down
155 changes: 146 additions & 9 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,24 @@
package cniserver

import (
"bytes"
"encoding/json"
"fmt"
"net"
"strings"
"sync"
"time"

"antrea.io/libOpenflow/openflow15"
current "github.com/containernetworking/cni/pkg/types/100"
"github.com/containernetworking/cni/pkg/version"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/cniserver/ipam"
Expand Down Expand Up @@ -61,6 +69,10 @@ const (

var (
getNSPath = util.GetNSPath
// retryInterval is the interval to re-install Pod OpenFlow entries if any error happened.
// Note, using a variable rather than constant for retryInterval because we may use a shorter time in the
// test code.
retryInterval = 5 * time.Second
)

type podConfigurator struct {
Expand All @@ -76,9 +88,19 @@ type podConfigurator struct {
// isSecondaryNetwork is true if this instance of podConfigurator is used to configure
// Pod secondary network interfaces.
isSecondaryNetwork bool

containerAccess *containerAccessArbitrator
eventBroadcaster record.EventBroadcaster
recorder record.EventRecorder
podListerSynced cache.InformerSynced
podLister v1.PodLister
kubeClient clientset.Interface
unreadyPortQueue workqueue.TypedDelayingInterface[string]
statusCh chan *openflow15.PortStatus
}

func newPodConfigurator(
kubeClient clientset.Interface,
ovsBridgeClient ovsconfig.OVSBridgeClient,
ofClient openflow.Client,
routeClient route.Interface,
Expand All @@ -88,20 +110,27 @@ func newPodConfigurator(
isOvsHardwareOffloadEnabled bool,
disableTXChecksumOffload bool,
podUpdateNotifier channel.Notifier,
podInformer cache.SharedIndexInformer,
containerAccess *containerAccessArbitrator,
) (*podConfigurator, error) {
ifConfigurator, err := newInterfaceConfigurator(ovsDatapathType, isOvsHardwareOffloadEnabled, disableTXChecksumOffload)
if err != nil {
return nil, err
}
return &podConfigurator{
pc := &podConfigurator{
ovsBridgeClient: ovsBridgeClient,
ofClient: ofClient,
routeClient: routeClient,
ifaceStore: ifaceStore,
gatewayMAC: gatewayMAC,
ifConfigurator: ifConfigurator,
podUpdateNotifier: podUpdateNotifier,
}, nil
kubeClient: kubeClient,
containerAccess: containerAccess,
}
// Initiate the PortStatus message listener. This function is a no-op except on Windows.
pc.initPortStatusMonitor(podInformer)
return pc, nil
}

func parseContainerIPs(ipcs []*current.IPConfig) ([]net.IP, error) {
Expand Down Expand Up @@ -166,13 +195,13 @@ func getContainerIPsString(ips []net.IP) string {
// not created for a Pod interface.
func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *interfacestore.OVSPortConfig) *interfacestore.InterfaceConfig {
if portData.ExternalIDs == nil {
klog.V(2).Infof("OVS port %s has no external_ids", portData.Name)
klog.V(2).InfoS("OVS port has no external_ids", "port", portData.Name)
return nil
}

containerID, found := portData.ExternalIDs[ovsExternalIDContainerID]
if !found {
klog.V(2).Infof("OVS port %s has no %s in external_ids", portData.Name, ovsExternalIDContainerID)
klog.V(2).InfoS("OVS port has no containerID in external_ids", "port", portData.Name)
return nil
}

Expand All @@ -187,8 +216,7 @@ func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *in

containerMAC, err := net.ParseMAC(portData.ExternalIDs[ovsExternalIDMAC])
if err != nil {
klog.Errorf("Failed to parse MAC address from OVS external config %s: %v",
portData.ExternalIDs[ovsExternalIDMAC], err)
klog.ErrorS(err, "Failed to parse MAC address from OVS external config")
}
podName, _ := portData.ExternalIDs[ovsExternalIDPodName]
podNamespace, _ := portData.ExternalIDs[ovsExternalIDPodNamespace]
Expand Down Expand Up @@ -279,7 +307,7 @@ func (pc *podConfigurator) createOVSPort(ovsPortName string, ovsAttachInfo map[s
func (pc *podConfigurator) removeInterfaces(containerID string) error {
containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID)
if !found {
klog.V(2).Infof("Did not find the port for container %s in local cache", containerID)
klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID)
return nil
}

Expand Down Expand Up @@ -498,7 +526,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
// disconnectInterfaceFromOVS disconnects an existing interface from ovs br-int.
func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interfacestore.InterfaceConfig) error {
containerID := containerConfig.ContainerID
klog.V(2).Infof("Deleting Openflow entries for container %s", containerID)
klog.V(2).InfoS("Deleting Openflow entries for container", "container", containerID)
if !pc.isSecondaryNetwork {
if err := pc.ofClient.UninstallPodFlows(containerConfig.InterfaceName); err != nil {
return fmt.Errorf("failed to delete Openflow entries for container %s: %v", containerID, err)
Expand All @@ -513,6 +541,7 @@ func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interface
if err := pc.ovsBridgeClient.DeletePort(containerConfig.PortUUID); err != nil {
return fmt.Errorf("failed to delete OVS port for container %s interface %s: %v", containerID, containerConfig.InterfaceName, err)
}

// Remove container configuration from cache.
pc.ifaceStore.DeleteInterface(containerConfig)
if !pc.isSecondaryNetwork {
Expand Down Expand Up @@ -558,7 +587,7 @@ func (pc *podConfigurator) connectInterceptedInterface(
func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace, containerID string) error {
containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID)
if !found {
klog.V(2).Infof("Did not find the port for container %s in local cache", containerID)
klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID)
return nil
}
for _, ip := range containerConfig.IPs {
Expand All @@ -570,3 +599,111 @@ func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace,
return pc.disconnectInterfaceFromOVS(containerConfig)
// TODO recover pre-connect state? repatch vethpair to original bridge etc ?? to make first CNI happy??
}

func (pc *podConfigurator) processNextWorkItem() bool {
key, quit := pc.unreadyPortQueue.Get()
if quit {
return false
}
defer pc.unreadyPortQueue.Done(key)

if err := pc.updateUnreadyPod(key); err != nil {
klog.ErrorS(err, "Failed install OpenFlow entries for OVS port interface", "name", key)
// Put the item back on the workqueue to handle any transient errors.
pc.unreadyPortQueue.AddAfter(key, retryInterval)
}
return true
}

func (pc *podConfigurator) updateUnreadyPod(ovsPort string) error {
ifConfig, found := pc.ifaceStore.GetInterfaceByName(ovsPort)
if !found {
klog.InfoS("Interface config is not found, skip processing the port", "name", ovsPort)
return nil
}

pc.containerAccess.lockContainer(ifConfig.ContainerID)
defer pc.containerAccess.unlockContainer(ifConfig.ContainerID)
// Get the InterfaceConfig again after the lock to avoid race conditions.
ifConfig, found = pc.ifaceStore.GetInterfaceByName(ovsPort)
if !found {
klog.InfoS("Interface config is not found, skip processing the port", "name", ovsPort)
return nil
}

if ifConfig.OFPort == 0 {
// Add Pod not-ready event if the pod flows are not successfully installed, and the OpenFlow port is not allocated.
// Returns error so that we can have a retry after 5s.
pc.recordPodEvent(ifConfig, false)
return fmt.Errorf("pod's OpenFlow port is not ready yet")
}

// Install OpenFlow entries for the Pod.
klog.V(2).InfoS("Setting up Openflow entries for OVS port", "port", ovsPort)
if err := pc.ofClient.InstallPodFlows(ovsPort, ifConfig.IPs, ifConfig.MAC, uint32(ifConfig.OFPort), ifConfig.VLANID, nil); err != nil {
// Add Pod not-ready event if the pod flows installation fails.
// Returns error so that we can have a retry after 5s.
pc.recordPodEvent(ifConfig, false)
return fmt.Errorf("failed to add Openflow entries for OVS port %s: %v", ovsPort, err)
}

// Notify the Pod update event to required components.
event := agenttypes.PodUpdate{
PodName: ifConfig.PodName,
PodNamespace: ifConfig.PodNamespace,
IsAdd: true,
ContainerID: ifConfig.ContainerID,
}
pc.podUpdateNotifier.Notify(event)

pc.recordPodEvent(ifConfig, true)
return nil
}

func (pc *podConfigurator) recordPodEvent(ifConfig *interfacestore.InterfaceConfig, installed bool) {
pod, err := pc.podLister.Pods(ifConfig.PodNamespace).Get(ifConfig.PodName)
if err != nil {
klog.InfoS("Unable to get Pod, skip recording Pod event", "Pod", klog.KRef(ifConfig.PodNamespace, ifConfig.PodName))
return
}

if installed {
// Add normal event to record Pod network is ready.
pc.recorder.Eventf(pod, corev1.EventTypeNormal, "NetworkReady", "Installed Pod network forwarding rules")
return
}

pc.recorder.Eventf(pod, corev1.EventTypeWarning, "NetworkNotReady", "Pod network forwarding rules not installed")
}

func (pc *podConfigurator) processPortStatusMessage(status *openflow15.PortStatus) {
// Update Pod OpenFlow entries only after the OpenFlow port state is live.
if status.Desc.State != openflow15.PS_LIVE {
return
}
ovsPort := string(bytes.Trim(status.Desc.Name, "\x00"))
ofPort := status.Desc.PortNo

ifConfig, found := pc.ifaceStore.GetInterfaceByName(ovsPort)
if !found {
klog.InfoS("Interface config is not found", "ovsPort", ovsPort)
return
}

func() {
pc.containerAccess.lockContainer(ifConfig.ContainerID)
defer pc.containerAccess.unlockContainer(ifConfig.ContainerID)
// Get the InterfaceConfig again after the lock to avoid race conditions.
ifConfig, found = pc.ifaceStore.GetInterfaceByName(ovsPort)
if !found {
klog.InfoS("Interface config is not found", "ovsPort", ovsPort)
return
}
// Update interface config with the ofPort.
newIfConfig := ifConfig.DeepCopy()
newIfConfig.OVSPortConfig.OFPort = int32(ofPort)
pc.ifaceStore.UpdateInterface(newIfConfig)
}()

pc.unreadyPortQueue.Add(ovsPort)
}
9 changes: 9 additions & 0 deletions pkg/agent/cniserver/pod_configuration_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"

current "github.com/containernetworking/cni/pkg/types/100"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/cniserver/ipam"
Expand Down Expand Up @@ -113,3 +114,11 @@ func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.Inte
klog.Warningf("Interface for Pod %s/%s not found in the interface store", ifaceConfig.PodNamespace, ifaceConfig.PodName)
}
}

func (pc *podConfigurator) initPortStatusMonitor(_ cache.SharedIndexInformer) {

}

func (pc *podConfigurator) Run(stopCh <-chan struct{}) {
<-stopCh
}
2 changes: 1 addition & 1 deletion pkg/agent/cniserver/pod_configuration_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func createPodConfigurator(controller *gomock.Controller, testIfaceConfigurator
mockOFClient = openflowtest.NewMockClient(controller)
ifaceStore = interfacestore.NewInterfaceStore()
mockRoute = routetest.NewMockInterface(controller)
configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100))
configurator, _ := newPodConfigurator(nil, mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil, nil)
configurator.ifConfigurator = testIfaceConfigurator
return configurator
}
Expand Down
Loading

0 comments on commit 75daaa8

Please sign in to comment.