From 615b917f347bac3010cb2eb31565f6c1cfc0a3d8 Mon Sep 17 00:00:00 2001 From: Jianjun Shen Date: Wed, 6 Dec 2023 11:52:41 -0800 Subject: [PATCH] Change secondary network Pod controller to subscribe to CNIServer events (#5767) The commit moves CNI Pod information store from CNIServer to secondary network PodController, and let PodController subscribe to CNIServer Pod events to maintain the Pod CNI information in the store. This way simplifies CNIServer and removes secondary network logic from it. Signed-off-by: Jianjun Shen --- cmd/antrea-agent/agent.go | 20 ++---- pkg/agent/cniserver/pod_configuration.go | 19 +++--- .../cniserver/pod_configuration_linux.go | 3 +- .../cniserver/pod_configuration_linux_test.go | 2 +- .../cniserver/pod_configuration_windows.go | 3 +- pkg/agent/cniserver/secondary.go | 2 +- pkg/agent/cniserver/server.go | 31 +-------- pkg/agent/cniserver/server_linux_test.go | 67 ++++--------------- pkg/agent/cniserver/server_windows_test.go | 4 +- .../secondarynetwork/cnipodcache/types.go | 1 - pkg/agent/secondarynetwork/init.go | 6 +- .../secondarynetwork/podwatch/controller.go | 52 ++++++++++---- .../podwatch/controller_test.go | 4 +- pkg/agent/types/event.go | 3 +- test/integration/agent/cniserver_test.go | 6 +- 15 files changed, 85 insertions(+), 138 deletions(-) diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 92b7ae82710..ee294a77672 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -60,7 +60,6 @@ import ( "antrea.io/antrea/pkg/agent/querier" "antrea.io/antrea/pkg/agent/route" "antrea.io/antrea/pkg/agent/secondarynetwork" - "antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache" "antrea.io/antrea/pkg/agent/servicecidr" "antrea.io/antrea/pkg/agent/stats" support "antrea.io/antrea/pkg/agent/supportbundlecollection" @@ -535,7 +534,6 @@ func run(o *Options) error { } var cniServer *cniserver.CNIServer - var cniPodInfoStore cnipodcache.CNIPodInfoStore var externalNodeController *externalnode.ExternalNodeController var localExternalNodeInformer cache.SharedIndexInformer @@ -554,17 +552,9 @@ func run(o *Options) error { networkConfig, networkReadyCh) - if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) { - cniPodInfoStore = cnipodcache.NewCNIPodInfoStore() - err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel, cniPodInfoStore) - if err != nil { - return fmt.Errorf("error initializing CNI server with cniPodInfoStore cache: %v", err) - } - } else { - err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel, nil) - if err != nil { - return fmt.Errorf("error initializing CNI server: %v", err) - } + err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel) + if err != nil { + return fmt.Errorf("error initializing CNI server with cniPodInfoStore cache: %v", err) } } else { listOptions := func(options *metav1.ListOptions) { @@ -700,8 +690,8 @@ func run(o *Options) error { if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) { if err := secondarynetwork.Initialize( o.config.ClientConnection, o.config.KubeAPIServerOverride, - k8sClient, localPodInformer.Get(), nodeConfig.Name, cniPodInfoStore, - stopCh, + k8sClient, localPodInformer.Get(), nodeConfig.Name, + podUpdateChannel, stopCh, &o.config.SecondaryNetwork, ovsdbConnection); err != nil { return fmt.Errorf("failed to initialize secondary network: %v", err) } diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index 9564fbb6e26..9f6398af48b 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -31,7 +31,6 @@ import ( "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/route" - "antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache" agenttypes "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" "antrea.io/antrea/pkg/ovs/ovsconfig" @@ -72,8 +71,6 @@ type podConfigurator struct { // podUpdateNotifier is used for notifying updates of local Pods to other components which may benefit from this // information, i.e. NetworkPolicyController, EgressController. podUpdateNotifier channel.Notifier - // consumed by secondary network creation. - podInfoStore cnipodcache.CNIPodInfoStore } func newPodConfigurator( @@ -86,7 +83,6 @@ func newPodConfigurator( isOvsHardwareOffloadEnabled bool, disableTXChecksumOffload bool, podUpdateNotifier channel.Notifier, - podInfoStore cnipodcache.CNIPodInfoStore, ) (*podConfigurator, error) { ifConfigurator, err := newInterfaceConfigurator(ovsDatapathType, isOvsHardwareOffloadEnabled, disableTXChecksumOffload) if err != nil { @@ -100,7 +96,6 @@ func newPodConfigurator( gatewayMAC: gatewayMAC, ifConfigurator: ifConfigurator, podUpdateNotifier: podUpdateNotifier, - podInfoStore: podInfoStore, }, nil } @@ -243,7 +238,8 @@ func (pc *podConfigurator) configureInterfaces( } var containerConfig *interfacestore.InterfaceConfig - if containerConfig, err = pc.connectInterfaceToOVS(podName, podNamespace, containerID, hostIface, containerIface, result.IPs, result.VLANID, containerAccess); err != nil { + if containerConfig, err = pc.connectInterfaceToOVS(podName, podNamespace, containerID, containerNetNS, + hostIface, containerIface, result.IPs, result.VLANID, containerAccess); err != nil { return fmt.Errorf("failed to connect to ovs for container %s: %v", containerID, err) } defer func() { @@ -486,7 +482,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain return nil } -func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName string, containerConfig *interfacestore.InterfaceConfig) error { +func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName, netNS string, containerConfig *interfacestore.InterfaceConfig) error { // create OVS Port and add attach container configuration into external_ids containerID := containerConfig.ContainerID klog.V(2).Infof("Adding OVS port %s for container %s", ovsPortName, containerID) @@ -519,8 +515,9 @@ func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName string, conta event := agenttypes.PodUpdate{ PodName: containerConfig.PodName, PodNamespace: containerConfig.PodNamespace, - IsAdd: true, ContainerID: containerConfig.ContainerID, + NetNS: netNS, + IsAdd: true, } pc.podUpdateNotifier.Notify(event) return nil @@ -548,8 +545,8 @@ func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interface event := agenttypes.PodUpdate{ PodName: containerConfig.PodName, PodNamespace: containerConfig.PodNamespace, - IsAdd: false, ContainerID: containerConfig.ContainerID, + IsAdd: false, } pc.podUpdateNotifier.Notify(event) klog.Infof("Removed interfaces for container %s", containerID) @@ -577,8 +574,8 @@ func (pc *podConfigurator) connectInterceptedInterface( if err = pc.routeClient.MigrateRoutesToGw(hostIface.Name); err != nil { return fmt.Errorf("connectInterceptedInterface failed to migrate: %w", err) } - _, err = pc.connectInterfaceToOVS(podName, podNamespace, containerID, hostIface, - containerIface, containerIPs, 0, containerAccess) + _, err = pc.connectInterfaceToOVS(podName, podNamespace, containerID, containerNetNS, + hostIface, containerIface, containerIPs, 0, containerAccess) return err } diff --git a/pkg/agent/cniserver/pod_configuration_linux.go b/pkg/agent/cniserver/pod_configuration_linux.go index 4376d32b1a6..1ca96ff2c33 100644 --- a/pkg/agent/cniserver/pod_configuration_linux.go +++ b/pkg/agent/cniserver/pod_configuration_linux.go @@ -29,6 +29,7 @@ func (pc *podConfigurator) connectInterfaceToOVS( podName string, podNamespace string, containerID string, + netNS string, hostIface *current.Interface, containerIface *current.Interface, ips []*current.IPConfig, @@ -38,7 +39,7 @@ func (pc *podConfigurator) connectInterfaceToOVS( // Use the outer veth interface name as the OVS port name. ovsPortName := hostIface.Name containerConfig := buildContainerConfig(ovsPortName, containerID, podName, podNamespace, containerIface, ips, vlanID) - return containerConfig, pc.connectInterfaceToOVSCommon(ovsPortName, containerConfig) + return containerConfig, pc.connectInterfaceToOVSCommon(ovsPortName, netNS, containerConfig) } func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) { diff --git a/pkg/agent/cniserver/pod_configuration_linux_test.go b/pkg/agent/cniserver/pod_configuration_linux_test.go index 1c1a8fefd4d..62e65069fc8 100644 --- a/pkg/agent/cniserver/pod_configuration_linux_test.go +++ b/pkg/agent/cniserver/pod_configuration_linux_test.go @@ -501,7 +501,7 @@ func createPodConfigurator(controller *gomock.Controller, testIfaceConfigurator mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) - configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) + configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) configurator.ifConfigurator = testIfaceConfigurator return configurator } diff --git a/pkg/agent/cniserver/pod_configuration_windows.go b/pkg/agent/cniserver/pod_configuration_windows.go index 1302e66bf58..9259b156d79 100644 --- a/pkg/agent/cniserver/pod_configuration_windows.go +++ b/pkg/agent/cniserver/pod_configuration_windows.go @@ -66,6 +66,7 @@ func (pc *podConfigurator) connectInterfaceToOVS( podName string, podNamespace string, containerID string, + netNS string, hostIface *current.Interface, containerIface *current.Interface, ips []*current.IPConfig, @@ -87,7 +88,7 @@ func (pc *podConfigurator) connectInterfaceToOVS( // HNSEndpoint/HostComputeEndpoint, the current implementation will still work. It will choose the synchronized // way to create OVS port. if hostInterfaceExistsFunc(hostIfAlias) { - return containerConfig, pc.connectInterfaceToOVSCommon(ovsPortName, containerConfig) + return containerConfig, pc.connectInterfaceToOVSCommon(ovsPortName, netNS, containerConfig) } klog.V(2).Infof("Adding OVS port %s for container %s", ovsPortName, containerID) ovsAttachInfo := BuildOVSPortExternalIDs(containerConfig) diff --git a/pkg/agent/cniserver/secondary.go b/pkg/agent/cniserver/secondary.go index ae01aed0dff..bfe94751e99 100644 --- a/pkg/agent/cniserver/secondary.go +++ b/pkg/agent/cniserver/secondary.go @@ -24,7 +24,7 @@ import ( ) func NewSecondaryInterfaceConfigurator(ovsBridgeClient ovsconfig.OVSBridgeClient) (*podConfigurator, error) { - return newPodConfigurator(ovsBridgeClient, nil, nil, nil, nil, ovsconfig.OVSDatapathSystem, false, false, nil, nil) + return newPodConfigurator(ovsBridgeClient, nil, nil, nil, nil, ovsconfig.OVSDatapathSystem, false, false, nil) } // ConfigureSriovSecondaryInterface configures a SR-IOV secondary interface for a Pod. diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index a99522d549d..1426fdc131c 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -39,7 +39,6 @@ import ( "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/route" - "antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache" "antrea.io/antrea/pkg/agent/util" cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1" "antrea.io/antrea/pkg/cni" @@ -115,7 +114,6 @@ type CNIServer struct { // Enable AntreaIPAM for secondary networks implementd by other CNIs. enableSecondaryNetworkIPAM bool disableTXChecksumOffload bool - secondaryNetworkEnabled bool networkConfig *config.NetworkConfig // networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it. networkReadyCh <-chan struct{} @@ -523,13 +521,6 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (* // mark success as true to avoid rollback success = true - if s.secondaryNetworkEnabled { - // Go cache the CNI server info at CNIConfigInfo cache, for podWatch usage - cniInfo := &cnipodcache.CNIConfigInfo{CNIVersion: cniVersion, PodName: podName, PodNamespace: podNamespace, - ContainerID: cniConfig.ContainerId, ContainerNetNS: netNS, PodCNIDeleted: false} - s.podConfigurator.podInfoStore.AddCNIConfigInfo(cniInfo) - } - return resultToResponse(cniResult), nil } @@ -558,16 +549,7 @@ func (s *CNIServer) cmdDel(_ context.Context, cniConfig *CNIConfig) (*cnipb.CniC return s.configInterfaceFailureResponse(err), nil } klog.InfoS("CmdDel for container succeeded", "container", cniConfig.ContainerId) - if s.secondaryNetworkEnabled { - podName := string(cniConfig.K8S_POD_NAME) - podNamespace := string(cniConfig.K8S_POD_NAMESPACE) - containerInfo := s.podConfigurator.podInfoStore.GetCNIConfigInfoByContainerID(podName, podNamespace, cniConfig.ContainerId) - if containerInfo != nil { - // Update PodCNIDeleted = true. - // This is to let Podwatch controller know that the CNI server cleaned up this Pod's primary network configuration. - s.podConfigurator.podInfoStore.SetPodCNIDeleted(containerInfo) - } - } + return &cnipb.CniCmdResponse{CniResult: []byte("")}, nil } @@ -652,21 +634,12 @@ func (s *CNIServer) Initialize( ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, podUpdateNotifier channel.Notifier, - podInfoStore cnipodcache.CNIPodInfoStore, ) error { var err error - // If podInfoStore is not nil, secondaryNetwork configuration is supported. - if podInfoStore != nil { - s.secondaryNetworkEnabled = true - } else { - s.secondaryNetworkEnabled = false - } - s.podConfigurator, err = newPodConfigurator( ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC, ovsBridgeClient.GetOVSDatapathType(), ovsBridgeClient.IsHardwareOffloadEnabled(), - s.disableTXChecksumOffload, - podUpdateNotifier, podInfoStore) + s.disableTXChecksumOffload, podUpdateNotifier) if err != nil { return fmt.Errorf("error during initialize podConfigurator: %v", err) } diff --git a/pkg/agent/cniserver/server_linux_test.go b/pkg/agent/cniserver/server_linux_test.go index 844f2b99361..4760fc2ab93 100644 --- a/pkg/agent/cniserver/server_linux_test.go +++ b/pkg/agent/cniserver/server_linux_test.go @@ -40,7 +40,6 @@ import ( "antrea.io/antrea/pkg/agent/interfacestore" openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" routetest "antrea.io/antrea/pkg/agent/route/testing" - "antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache" "antrea.io/antrea/pkg/agent/util" cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1" "antrea.io/antrea/pkg/ovs/ovsconfig" @@ -98,7 +97,7 @@ func TestValidatePrevResult(t *testing.T) { cniConfig.Ifname = ifname cniConfig.Netns = "invalid_netns" sriovVFDeviceID := "" - cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) + cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, prevResult, sriovVFDeviceID) checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "") }) @@ -109,7 +108,7 @@ func TestValidatePrevResult(t *testing.T) { cniConfig.Netns = "invalid_netns" sriovVFDeviceID := "0000:03:00.6" prevResult.Interfaces = []*current.Interface{hostIface, containerIface} - cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", true, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) + cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", true, false, channel.NewSubscribableChannel("PodUpdate", 100)) response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, prevResult, sriovVFDeviceID) checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "") }) @@ -122,7 +121,7 @@ func TestRemoveInterface(t *testing.T) { ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") - podConfigurator, err := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) + podConfigurator, err := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) require.Nil(t, err, "No error expected in podConfigurator constructor") containerMAC, _ := net.ParseMAC("aa:bb:cc:dd:ee:ff") @@ -191,7 +190,7 @@ func TestRemoveInterface(t *testing.T) { }) } -func newMockCNIServer(t *testing.T, controller *gomock.Controller, ipamDriver ipam.IPAMDriver, ipamType string, enableSecondaryNetworkIPAM, isChaining, secondaryNetworkEnabled bool) *CNIServer { +func newMockCNIServer(t *testing.T, controller *gomock.Controller, ipamDriver ipam.IPAMDriver, ipamType string, enableSecondaryNetworkIPAM, isChaining bool) *CNIServer { mockOVSBridgeClient = ovsconfigtest.NewMockOVSBridgeClient(controller) mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() @@ -203,13 +202,9 @@ func newMockCNIServer(t *testing.T, controller *gomock.Controller, ipamDriver ip gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") gateway := &config.GatewayConfig{Name: "", IPv4: gwIPv4, MAC: gwMAC} cniServer.nodeConfig = &config.NodeConfig{Name: "node1", PodIPv4CIDR: nodePodCIDRv4, GatewayConfig: gateway} - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) + cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) cniServer.enableSecondaryNetworkIPAM = enableSecondaryNetworkIPAM cniServer.isChaining = isChaining - cniServer.secondaryNetworkEnabled = secondaryNetworkEnabled - if secondaryNetworkEnabled { - cniServer.podConfigurator.podInfoStore = cnipodcache.NewCNIPodInfoStore() - } cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450} return cniServer } @@ -244,7 +239,6 @@ func TestCmdAdd(t *testing.T) { cniType string enableSecondaryNetworkIPAM bool isChaining bool - secondaryNetworkEnabled bool connectOVS bool migrateRoute bool addLocalIPAMRoute bool @@ -306,22 +300,11 @@ func TestCmdAdd(t *testing.T) { addLocalIPAMRoute: true, addLocalIPAMRouteError: fmt.Errorf("failed to configure route"), containerIfaceExist: false, - }, { - name: "add-secondary-network", - podName: "pod4", - ipamType: "test-cni-ipam", - ipamAdd: true, - enableSecondaryNetworkIPAM: false, - secondaryNetworkEnabled: true, - isChaining: false, - connectOVS: true, - addLocalIPAMRoute: true, - containerIfaceExist: true, }, } { t.Run(tc.name, func(t *testing.T) { defer mockGetNSPath(nil)() - cniserver := newMockCNIServer(t, controller, ipamMock, tc.ipamType, tc.enableSecondaryNetworkIPAM, tc.isChaining, tc.secondaryNetworkEnabled) + cniserver := newMockCNIServer(t, controller, ipamMock, tc.ipamType, tc.enableSecondaryNetworkIPAM, tc.isChaining) testIfaceConfigurator := newTestInterfaceConfigurator() requestMsg, hostInterfaceName := createCNIRequestAndInterfaceName(t, tc.podName, tc.cniType, ipamResult, tc.ipamType, true) testIfaceConfigurator.hostIfaceName = hostInterfaceName @@ -383,10 +366,6 @@ func TestCmdAdd(t *testing.T) { successResponse := resultToResponse(versionedResult) assert.Equal(t, successResponse, resp) } - if tc.secondaryNetworkEnabled { - cniConfigInfo := cniserver.podConfigurator.podInfoStore.GetCNIConfigInfoByContainerID(tc.podName, testPodNamespace, containerID) - assert.NotNil(t, cniConfigInfo) - } }) } } @@ -407,7 +386,6 @@ func TestCmdDel(t *testing.T) { cniType string enableSecondaryNetworkIPAM bool isChaining bool - secondaryNetworkEnabled bool disconnectOVS bool migrateRoute bool delLocalIPAMRoute bool @@ -470,20 +448,9 @@ func TestCmdDel(t *testing.T) { delLocalIPAMRoute: true, delLocalIPAMRouteError: fmt.Errorf("unable to delete flexible IPAM rule"), }, - { - name: "del-secondary-network", - podName: "pod4", - ipamType: "test-delete", - ipamDel: true, - enableSecondaryNetworkIPAM: false, - secondaryNetworkEnabled: true, - isChaining: false, - disconnectOVS: true, - delLocalIPAMRoute: true, - }, } { t.Run(tc.name, func(t *testing.T) { - cniserver := newMockCNIServer(t, controller, ipamMock, tc.ipamType, tc.enableSecondaryNetworkIPAM, tc.isChaining, tc.secondaryNetworkEnabled) + cniserver := newMockCNIServer(t, controller, ipamMock, tc.ipamType, tc.enableSecondaryNetworkIPAM, tc.isChaining) requestMsg, hostInterfaceName := createCNIRequestAndInterfaceName(t, tc.podName, tc.cniType, ipamResult, tc.ipamType, true) containerID := requestMsg.CniArgs.ContainerId containerIfaceConfig := interfacestore.NewContainerInterface(hostInterfaceName, containerID, tc.podName, testPodNamespace, containerVethMac, []net.IP{net.ParseIP("10.1.2.100")}, 0) @@ -492,11 +459,6 @@ func TestCmdDel(t *testing.T) { testIfaceConfigurator := newTestInterfaceConfigurator() testIfaceConfigurator.hostIfaceName = hostInterfaceName cniserver.podConfigurator.ifConfigurator = testIfaceConfigurator - if tc.secondaryNetworkEnabled { - cniInfo := &cnipodcache.CNIConfigInfo{CNIVersion: supportedCNIVersion, PodName: tc.podName, PodNamespace: testPodNamespace, - ContainerID: containerID, ContainerNetNS: netns, PodCNIDeleted: false} - cniserver.podConfigurator.podInfoStore.AddCNIConfigInfo(cniInfo) - } if tc.ipamDel { if tc.enableSecondaryNetworkIPAM { ipamSecondaryNetworkDel = func(cniArgs *cnipb.CniCmdArgs, k8sArgs *types.K8sArgs, networkConfig *types.NetworkConfig) error { @@ -530,11 +492,6 @@ func TestCmdDel(t *testing.T) { assert.Equal(t, emptyResponse, resp) } } - if tc.secondaryNetworkEnabled { - cniConfigInfo := cniserver.podConfigurator.podInfoStore.GetCNIConfigInfoByContainerID(tc.podName, testPodNamespace, containerID) - assert.NotNil(t, cniConfigInfo) - assert.True(t, cniConfigInfo.PodCNIDeleted) - } }) } } @@ -566,7 +523,7 @@ func TestCmdCheck(t *testing.T) { } t.Run("secondary-IPAM", func(t *testing.T) { ipamType := ipam.AntreaIPAMType - cniserver := newMockCNIServer(t, controller, ipamMock, ipamType, true, false, false) + cniserver := newMockCNIServer(t, controller, ipamMock, ipamType, true, false) requestMsg, _ := prepareRequest("pod0", "cniType", ipamType, false) ipamSecondaryNetworkCheck = func(cniArgs *cnipb.CniCmdArgs, k8sArgs *types.K8sArgs, networkConfig *types.NetworkConfig) error { return nil @@ -580,7 +537,7 @@ func TestCmdCheck(t *testing.T) { }) t.Run("secondary-IPAM-failure", func(t *testing.T) { ipamType := ipam.AntreaIPAMType - cniserver := newMockCNIServer(t, controller, ipamMock, ipamType, true, false, false) + cniserver := newMockCNIServer(t, controller, ipamMock, ipamType, true, false) requestMsg, _ := prepareRequest("pod0", "cniType", ipamType, false) ipamSecondaryNetworkCheck = func(cniArgs *cnipb.CniCmdArgs, k8sArgs *types.K8sArgs, networkConfig *types.NetworkConfig) error { return errors.New("failed to check secondary IPAM response") @@ -600,7 +557,7 @@ func TestCmdCheck(t *testing.T) { }) t.Run("chaining", func(t *testing.T) { ipamType := "test-check" - cniserver := newMockCNIServer(t, controller, ipamMock, ipamType, false, true, false) + cniserver := newMockCNIServer(t, controller, ipamMock, ipamType, false, true) requestMsg, hostInterfaceName := prepareRequest("pod1", "", ipamType, true) testIfaceConfigurator := newTestInterfaceConfigurator() testIfaceConfigurator.hostIfaceName = hostInterfaceName @@ -611,7 +568,7 @@ func TestCmdCheck(t *testing.T) { }) t.Run("check-general-cni", func(t *testing.T) { ipamType := "test-check" - cniserver := newMockCNIServer(t, controller, ipamMock, ipamType, false, false, false) + cniserver := newMockCNIServer(t, controller, ipamMock, ipamType, false, false) requestMsg, hostInterfaceName := prepareRequest("pod2", "", ipamType, true) testIfaceConfigurator := newTestInterfaceConfigurator() testIfaceConfigurator.hostIfaceName = hostInterfaceName @@ -638,7 +595,7 @@ func TestReconcile(t *testing.T) { cniServer := newCNIServer(t) cniServer.routeClient = mockRoute gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil) + cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100)) cniServer.podConfigurator.ifConfigurator = newTestInterfaceConfigurator() cniServer.nodeConfig = &config.NodeConfig{ Name: nodeName, diff --git a/pkg/agent/cniserver/server_windows_test.go b/pkg/agent/cniserver/server_windows_test.go index 5e26c3041e3..2786dd2a8ae 100644 --- a/pkg/agent/cniserver/server_windows_test.go +++ b/pkg/agent/cniserver/server_windows_test.go @@ -293,7 +293,7 @@ func newMockCNIServer(t *testing.T, controller *gomock.Controller, podUpdateNoti gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") gateway := &config.GatewayConfig{Name: "", IPv4: gwIPv4, MAC: gwMAC} cniServer.nodeConfig = &config.NodeConfig{Name: "node1", PodIPv4CIDR: nodePodCIDRv4, GatewayConfig: gateway} - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, podUpdateNotifier, nil) + cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, podUpdateNotifier) return cniServer } @@ -947,7 +947,7 @@ func TestReconcile(t *testing.T) { pod4IfaceName := "iface4" pod4Iface := containerIfaces["iface4"] waiter := newAsyncWaiter(pod4Iface.PodName, pod4Iface.ContainerID) - cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, waiter.notifier, nil) + cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, waiter.notifier) cniServer.nodeConfig = &config.NodeConfig{Name: nodeName} // Re-install Pod1 flows diff --git a/pkg/agent/secondarynetwork/cnipodcache/types.go b/pkg/agent/secondarynetwork/cnipodcache/types.go index 83fa41d30b7..657ce431c2d 100644 --- a/pkg/agent/secondarynetwork/cnipodcache/types.go +++ b/pkg/agent/secondarynetwork/cnipodcache/types.go @@ -15,7 +15,6 @@ package cnipodcache type CNIConfigInfo struct { - CNIVersion string PodName string PodNamespace string ContainerID string diff --git a/pkg/agent/secondarynetwork/init.go b/pkg/agent/secondarynetwork/init.go index eda73d7a824..abc3351c001 100644 --- a/pkg/agent/secondarynetwork/init.go +++ b/pkg/agent/secondarynetwork/init.go @@ -26,10 +26,10 @@ import ( "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/interfacestore" - "antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache" "antrea.io/antrea/pkg/agent/secondarynetwork/podwatch" agentconfig "antrea.io/antrea/pkg/config/agent" "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/antrea/pkg/util/channel" "antrea.io/antrea/pkg/util/k8s" ) @@ -46,7 +46,7 @@ func Initialize( k8sClient clientset.Interface, podInformer cache.SharedIndexInformer, nodeName string, - podCache cnipodcache.CNIPodInfoStore, + podUpdateSubscriber channel.Subscriber, stopCh <-chan struct{}, config *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB) error { @@ -66,7 +66,7 @@ func Initialize( // k8s.v1.cni.cncf.io/networks Annotation defined. if podWatchController, err := podwatch.NewPodController( k8sClient, netAttachDefClient, podInformer, - nodeName, podCache, ovsBridgeClient); err != nil { + nodeName, podUpdateSubscriber, ovsBridgeClient); err != nil { return err } else { go podWatchController.Run(stopCh) diff --git a/pkg/agent/secondarynetwork/podwatch/controller.go b/pkg/agent/secondarynetwork/podwatch/controller.go index 86d684bb833..fd6d9f8c500 100644 --- a/pkg/agent/secondarynetwork/podwatch/controller.go +++ b/pkg/agent/secondarynetwork/podwatch/controller.go @@ -37,10 +37,12 @@ import ( "antrea.io/antrea/pkg/agent/cniserver" "antrea.io/antrea/pkg/agent/cniserver/ipam" - "antrea.io/antrea/pkg/agent/cniserver/types" + cnitypes "antrea.io/antrea/pkg/agent/cniserver/types" cnipodcache "antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache" + "antrea.io/antrea/pkg/agent/types" crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/antrea/pkg/util/channel" ) const ( @@ -70,7 +72,7 @@ type InterfaceConfigurator interface { } type IPAMAllocator interface { - SecondaryNetworkAllocate(podOwner *crdv1a2.PodOwner, networkConfig *types.NetworkConfig) (*ipam.IPAMResult, error) + SecondaryNetworkAllocate(podOwner *crdv1a2.PodOwner, networkConfig *cnitypes.NetworkConfig) (*ipam.IPAMResult, error) SecondaryNetworkRelease(podOwner *crdv1a2.PodOwner) error } @@ -80,6 +82,7 @@ type PodController struct { queue workqueue.RateLimitingInterface podInformer cache.SharedIndexInformer nodeName string + podUpdateSubscriber channel.Subscriber podCache cnipodcache.CNIPodInfoStore ovsBridgeClient ovsconfig.OVSBridgeClient interfaceConfigurator InterfaceConfigurator @@ -92,7 +95,7 @@ func NewPodController( netAttachDefClient netdefclient.K8sCniCncfIoV1Interface, podInformer cache.SharedIndexInformer, nodeName string, - podCache cnipodcache.CNIPodInfoStore, + podUpdateSubscriber channel.Subscriber, ovsBridgeClient ovsconfig.OVSBridgeClient, ) (*PodController, error) { interfaceConfigurator, err := cniserver.NewSecondaryInterfaceConfigurator(ovsBridgeClient) @@ -105,7 +108,8 @@ func NewPodController( queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "podcontroller"), podInformer: podInformer, nodeName: nodeName, - podCache: podCache, + podUpdateSubscriber: podUpdateSubscriber, + podCache: cnipodcache.NewCNIPodInfoStore(), ovsBridgeClient: ovsBridgeClient, interfaceConfigurator: interfaceConfigurator, ipamAllocator: ipam.GetSecondaryNetworkAllocator(), @@ -118,11 +122,16 @@ func NewPodController( }, resyncPeriod, ) + // podUpdateSubscriber can be nil with test code. + if podUpdateSubscriber != nil { + // Subscribe Pod CNI add/del events. + podUpdateSubscriber.Subscribe(pc.processCNIUpdate) + } return &pc, nil } -func podKeyGet(pod *corev1.Pod) string { - return pod.Namespace + "/" + pod.Name +func podKeyGet(podName, podNamespace string) string { + return podNamespace + "/" + podName } func generatePodSecondaryIfaceName(podCNIInfo *cnipodcache.CNIConfigInfo) (string, error) { @@ -147,9 +156,8 @@ func (pc *PodController) deletePodSecondaryNetwork(podCNIInfo *cnipodcache.CNICo // NOTE: SR-IOV VF interface clean-up, upon Pod delete will be handled by SR-IOV device // plugin. Not handled here. for iface, interfaceInfo := range podCNIInfo.Interfaces { - klog.V(1).InfoS("Deleting secondary interface", - "Pod", klog.KRef(podCNIInfo.PodNamespace, podCNIInfo.PodName), - "interface", iface, "interfaceInfo", *interfaceInfo) + klog.InfoS("Deleting secondary interface", + "Pod", klog.KRef(podCNIInfo.PodNamespace, podCNIInfo.PodName), "interface", iface) if interfaceInfo.NetworkType == vlanNetworkType { if err := pc.interfaceConfigurator.DeleteVLANSecondaryInterface(podCNIInfo.ContainerID, interfaceInfo.HostInterfaceName, interfaceInfo.OVSPortUUID); err != nil { @@ -188,10 +196,29 @@ func (pc *PodController) enqueuePod(obj interface{}) { return } } - podKey := podKeyGet(pod) + podKey := podKeyGet(pod.Name, pod.Namespace) pc.queue.Add(podKey) } +// processCNIUpdate will be called when CNIServer publishes a Pod update event. +func (pc *PodController) processCNIUpdate(e interface{}) { + event := e.(types.PodUpdate) + if event.IsAdd { + // Go cache the CNI server info at CNIConfigInfo cache, for podWatch usage + cniInfo := &cnipodcache.CNIConfigInfo{PodName: event.PodName, PodNamespace: event.PodNamespace, + ContainerID: event.ContainerID, ContainerNetNS: event.NetNS, PodCNIDeleted: false} + pc.podCache.AddCNIConfigInfo(cniInfo) + } else { + containerInfo := pc.podCache.GetCNIConfigInfoByContainerID(event.PodName, event.PodNamespace, event.ContainerID) + if containerInfo != nil { + // Update PodCNIDeleted = true. + // This is to let Podwatch controller know that the CNI server cleaned up this Pod's primary network configuration. + pc.podCache.SetPodCNIDeleted(containerInfo) + } + } + pc.queue.Add(podKeyGet(event.PodName, event.PodNamespace)) +} + // handleAddUpdatePod handles Pod Add, Update events and updates annotation if required. func (pc *PodController) handleAddUpdatePod(obj interface{}) error { var err error @@ -217,7 +244,7 @@ func (pc *PodController) handleAddUpdatePod(obj interface{}) error { // Avoid processing Pod annotation, if we already have at least one secondary network successfully configured on this Pod. // We do not support/handle Annotation updates yet. if len(podCNIInfo.Interfaces) > 0 { - klog.V(1).InfoS("Secondary network already configured on this Pod and annotation update not supported, skipping update", "pod", klog.KObj(pod)) + klog.V(1).InfoS("Secondary network already configured on this Pod and annotation update not supported, skipping update", "Pod", klog.KObj(pod)) return nil } @@ -232,7 +259,7 @@ func (pc *PodController) handleAddUpdatePod(obj interface{}) error { err = pc.configurePodSecondaryNetwork(pod, networklist, podCNIInfo) if err != nil { - klog.ErrorS(err, "Error when configuring secondary network", "pod", klog.KObj(pod)) + klog.ErrorS(err, "Error when configuring secondary network", "Pod", klog.KObj(pod)) if len(podCNIInfo.Interfaces) == 0 { // Return error to requeue and retry. return err @@ -251,6 +278,7 @@ func (pc *PodController) handleRemovePod(key string) error { for _, info := range podCNIInfo { // Delete all secondary interfaces and release IPAM. if err = pc.deletePodSecondaryNetwork(info); err != nil { + klog.ErrorS(err, "Error when deleting secondary network", "Pod", klog.KRef(info.PodNamespace, info.PodName)) // Return error to requeue Pod delete. return err } else { diff --git a/pkg/agent/secondarynetwork/podwatch/controller_test.go b/pkg/agent/secondarynetwork/podwatch/controller_test.go index 71f2e16ddfe..63572d4353c 100644 --- a/pkg/agent/secondarynetwork/podwatch/controller_test.go +++ b/pkg/agent/secondarynetwork/podwatch/controller_test.go @@ -209,8 +209,8 @@ func TestPodControllerRun(t *testing.T) { netdefclient, informerFactory.Core().V1().Pods().Informer(), testNode, - podCache, - nil) + nil, nil) + podController.podCache = podCache podController.interfaceConfigurator = interfaceConfigurator podController.ipamAllocator = mockIPAM diff --git a/pkg/agent/types/event.go b/pkg/agent/types/event.go index eea70f66e42..297a4efb709 100644 --- a/pkg/agent/types/event.go +++ b/pkg/agent/types/event.go @@ -17,6 +17,7 @@ package types type PodUpdate struct { PodNamespace string PodName string - IsAdd bool ContainerID string + NetNS string + IsAdd bool } diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index 846980eec0b..1852f4dbea1 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -574,7 +574,7 @@ func newTester() *cmdAddDelTester { routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, tester.networkReadyCh) - tester.server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100), nil) + tester.server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100)) ctx := context.Background() tester.ctx = ctx return tester @@ -795,7 +795,7 @@ func TestCNIServerChaining(t *testing.T) { ifaceStore := interfacestore.NewInterfaceStore() ovsServiceMock.EXPECT().IsHardwareOffloadEnabled().Return(false).AnyTimes() ovsServiceMock.EXPECT().GetOVSDatapathType().Return(ovsconfig.OVSDatapathSystem).AnyTimes() - err = server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100), nil) + err = server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100)) testRequire.Nil(err) } @@ -928,7 +928,7 @@ func TestCNIServerGCForHostLocalIPAM(t *testing.T) { ) // call Initialize, which will run reconciliation and perform host-local IPAM garbage collection - server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100), nil) + server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100)) getIPs := func(cID string) []net.IP { ipamStore, err := disk.New("antrea", "")