Skip to content

Commit

Permalink
Change secondary network Pod controller to subscribe to CNIServer eve…
Browse files Browse the repository at this point in the history
…nts (#5767)

The commit moves CNI Pod information store from CNIServer to secondary
network PodController, and let PodController subscribe to CNIServer Pod
events to maintain the Pod CNI information in the store. This way
simplifies CNIServer and removes secondary network logic from it.

Signed-off-by: Jianjun Shen <[email protected]>
  • Loading branch information
jianjuns authored Dec 6, 2023
1 parent f2a39da commit 615b917
Show file tree
Hide file tree
Showing 15 changed files with 85 additions and 138 deletions.
20 changes: 5 additions & 15 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ import (
"antrea.io/antrea/pkg/agent/querier"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/secondarynetwork"
"antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache"
"antrea.io/antrea/pkg/agent/servicecidr"
"antrea.io/antrea/pkg/agent/stats"
support "antrea.io/antrea/pkg/agent/supportbundlecollection"
Expand Down Expand Up @@ -535,7 +534,6 @@ func run(o *Options) error {
}

var cniServer *cniserver.CNIServer
var cniPodInfoStore cnipodcache.CNIPodInfoStore
var externalNodeController *externalnode.ExternalNodeController
var localExternalNodeInformer cache.SharedIndexInformer

Expand All @@ -554,17 +552,9 @@ func run(o *Options) error {
networkConfig,
networkReadyCh)

if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
cniPodInfoStore = cnipodcache.NewCNIPodInfoStore()
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel, cniPodInfoStore)
if err != nil {
return fmt.Errorf("error initializing CNI server with cniPodInfoStore cache: %v", err)
}
} else {
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel, nil)
if err != nil {
return fmt.Errorf("error initializing CNI server: %v", err)
}
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel)
if err != nil {
return fmt.Errorf("error initializing CNI server with cniPodInfoStore cache: %v", err)
}
} else {
listOptions := func(options *metav1.ListOptions) {
Expand Down Expand Up @@ -700,8 +690,8 @@ func run(o *Options) error {
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
if err := secondarynetwork.Initialize(
o.config.ClientConnection, o.config.KubeAPIServerOverride,
k8sClient, localPodInformer.Get(), nodeConfig.Name, cniPodInfoStore,
stopCh,
k8sClient, localPodInformer.Get(), nodeConfig.Name,
podUpdateChannel, stopCh,
&o.config.SecondaryNetwork, ovsdbConnection); err != nil {
return fmt.Errorf("failed to initialize secondary network: %v", err)
}
Expand Down
19 changes: 8 additions & 11 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache"
agenttypes "antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/ovs/ovsconfig"
Expand Down Expand Up @@ -72,8 +71,6 @@ type podConfigurator struct {
// podUpdateNotifier is used for notifying updates of local Pods to other components which may benefit from this
// information, i.e. NetworkPolicyController, EgressController.
podUpdateNotifier channel.Notifier
// consumed by secondary network creation.
podInfoStore cnipodcache.CNIPodInfoStore
}

func newPodConfigurator(
Expand All @@ -86,7 +83,6 @@ func newPodConfigurator(
isOvsHardwareOffloadEnabled bool,
disableTXChecksumOffload bool,
podUpdateNotifier channel.Notifier,
podInfoStore cnipodcache.CNIPodInfoStore,
) (*podConfigurator, error) {
ifConfigurator, err := newInterfaceConfigurator(ovsDatapathType, isOvsHardwareOffloadEnabled, disableTXChecksumOffload)
if err != nil {
Expand All @@ -100,7 +96,6 @@ func newPodConfigurator(
gatewayMAC: gatewayMAC,
ifConfigurator: ifConfigurator,
podUpdateNotifier: podUpdateNotifier,
podInfoStore: podInfoStore,
}, nil
}

Expand Down Expand Up @@ -243,7 +238,8 @@ func (pc *podConfigurator) configureInterfaces(
}

var containerConfig *interfacestore.InterfaceConfig
if containerConfig, err = pc.connectInterfaceToOVS(podName, podNamespace, containerID, hostIface, containerIface, result.IPs, result.VLANID, containerAccess); err != nil {
if containerConfig, err = pc.connectInterfaceToOVS(podName, podNamespace, containerID, containerNetNS,
hostIface, containerIface, result.IPs, result.VLANID, containerAccess); err != nil {
return fmt.Errorf("failed to connect to ovs for container %s: %v", containerID, err)
}
defer func() {
Expand Down Expand Up @@ -486,7 +482,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
return nil
}

func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName string, containerConfig *interfacestore.InterfaceConfig) error {
func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName, netNS string, containerConfig *interfacestore.InterfaceConfig) error {
// create OVS Port and add attach container configuration into external_ids
containerID := containerConfig.ContainerID
klog.V(2).Infof("Adding OVS port %s for container %s", ovsPortName, containerID)
Expand Down Expand Up @@ -519,8 +515,9 @@ func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName string, conta
event := agenttypes.PodUpdate{
PodName: containerConfig.PodName,
PodNamespace: containerConfig.PodNamespace,
IsAdd: true,
ContainerID: containerConfig.ContainerID,
NetNS: netNS,
IsAdd: true,
}
pc.podUpdateNotifier.Notify(event)
return nil
Expand Down Expand Up @@ -548,8 +545,8 @@ func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interface
event := agenttypes.PodUpdate{
PodName: containerConfig.PodName,
PodNamespace: containerConfig.PodNamespace,
IsAdd: false,
ContainerID: containerConfig.ContainerID,
IsAdd: false,
}
pc.podUpdateNotifier.Notify(event)
klog.Infof("Removed interfaces for container %s", containerID)
Expand Down Expand Up @@ -577,8 +574,8 @@ func (pc *podConfigurator) connectInterceptedInterface(
if err = pc.routeClient.MigrateRoutesToGw(hostIface.Name); err != nil {
return fmt.Errorf("connectInterceptedInterface failed to migrate: %w", err)
}
_, err = pc.connectInterfaceToOVS(podName, podNamespace, containerID, hostIface,
containerIface, containerIPs, 0, containerAccess)
_, err = pc.connectInterfaceToOVS(podName, podNamespace, containerID, containerNetNS,
hostIface, containerIface, containerIPs, 0, containerAccess)
return err
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/cniserver/pod_configuration_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func (pc *podConfigurator) connectInterfaceToOVS(
podName string,
podNamespace string,
containerID string,
netNS string,
hostIface *current.Interface,
containerIface *current.Interface,
ips []*current.IPConfig,
Expand All @@ -38,7 +39,7 @@ func (pc *podConfigurator) connectInterfaceToOVS(
// Use the outer veth interface name as the OVS port name.
ovsPortName := hostIface.Name
containerConfig := buildContainerConfig(ovsPortName, containerID, podName, podNamespace, containerIface, ips, vlanID)
return containerConfig, pc.connectInterfaceToOVSCommon(ovsPortName, containerConfig)
return containerConfig, pc.connectInterfaceToOVSCommon(ovsPortName, netNS, containerConfig)
}

func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/cniserver/pod_configuration_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func createPodConfigurator(controller *gomock.Controller, testIfaceConfigurator
mockOFClient = openflowtest.NewMockClient(controller)
ifaceStore = interfacestore.NewInterfaceStore()
mockRoute = routetest.NewMockInterface(controller)
configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil)
configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100))
configurator.ifConfigurator = testIfaceConfigurator
return configurator
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/cniserver/pod_configuration_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (pc *podConfigurator) connectInterfaceToOVS(
podName string,
podNamespace string,
containerID string,
netNS string,
hostIface *current.Interface,
containerIface *current.Interface,
ips []*current.IPConfig,
Expand All @@ -87,7 +88,7 @@ func (pc *podConfigurator) connectInterfaceToOVS(
// HNSEndpoint/HostComputeEndpoint, the current implementation will still work. It will choose the synchronized
// way to create OVS port.
if hostInterfaceExistsFunc(hostIfAlias) {
return containerConfig, pc.connectInterfaceToOVSCommon(ovsPortName, containerConfig)
return containerConfig, pc.connectInterfaceToOVSCommon(ovsPortName, netNS, containerConfig)
}
klog.V(2).Infof("Adding OVS port %s for container %s", ovsPortName, containerID)
ovsAttachInfo := BuildOVSPortExternalIDs(containerConfig)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/cniserver/secondary.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

func NewSecondaryInterfaceConfigurator(ovsBridgeClient ovsconfig.OVSBridgeClient) (*podConfigurator, error) {
return newPodConfigurator(ovsBridgeClient, nil, nil, nil, nil, ovsconfig.OVSDatapathSystem, false, false, nil, nil)
return newPodConfigurator(ovsBridgeClient, nil, nil, nil, nil, ovsconfig.OVSDatapathSystem, false, false, nil)
}

// ConfigureSriovSecondaryInterface configures a SR-IOV secondary interface for a Pod.
Expand Down
31 changes: 2 additions & 29 deletions pkg/agent/cniserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache"
"antrea.io/antrea/pkg/agent/util"
cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1"
"antrea.io/antrea/pkg/cni"
Expand Down Expand Up @@ -115,7 +114,6 @@ type CNIServer struct {
// Enable AntreaIPAM for secondary networks implementd by other CNIs.
enableSecondaryNetworkIPAM bool
disableTXChecksumOffload bool
secondaryNetworkEnabled bool
networkConfig *config.NetworkConfig
// networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it.
networkReadyCh <-chan struct{}
Expand Down Expand Up @@ -523,13 +521,6 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
// mark success as true to avoid rollback
success = true

if s.secondaryNetworkEnabled {
// Go cache the CNI server info at CNIConfigInfo cache, for podWatch usage
cniInfo := &cnipodcache.CNIConfigInfo{CNIVersion: cniVersion, PodName: podName, PodNamespace: podNamespace,
ContainerID: cniConfig.ContainerId, ContainerNetNS: netNS, PodCNIDeleted: false}
s.podConfigurator.podInfoStore.AddCNIConfigInfo(cniInfo)
}

return resultToResponse(cniResult), nil
}

Expand Down Expand Up @@ -558,16 +549,7 @@ func (s *CNIServer) cmdDel(_ context.Context, cniConfig *CNIConfig) (*cnipb.CniC
return s.configInterfaceFailureResponse(err), nil
}
klog.InfoS("CmdDel for container succeeded", "container", cniConfig.ContainerId)
if s.secondaryNetworkEnabled {
podName := string(cniConfig.K8S_POD_NAME)
podNamespace := string(cniConfig.K8S_POD_NAMESPACE)
containerInfo := s.podConfigurator.podInfoStore.GetCNIConfigInfoByContainerID(podName, podNamespace, cniConfig.ContainerId)
if containerInfo != nil {
// Update PodCNIDeleted = true.
// This is to let Podwatch controller know that the CNI server cleaned up this Pod's primary network configuration.
s.podConfigurator.podInfoStore.SetPodCNIDeleted(containerInfo)
}
}

return &cnipb.CniCmdResponse{CniResult: []byte("")}, nil
}

Expand Down Expand Up @@ -652,21 +634,12 @@ func (s *CNIServer) Initialize(
ofClient openflow.Client,
ifaceStore interfacestore.InterfaceStore,
podUpdateNotifier channel.Notifier,
podInfoStore cnipodcache.CNIPodInfoStore,
) error {
var err error
// If podInfoStore is not nil, secondaryNetwork configuration is supported.
if podInfoStore != nil {
s.secondaryNetworkEnabled = true
} else {
s.secondaryNetworkEnabled = false
}

s.podConfigurator, err = newPodConfigurator(
ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC,
ovsBridgeClient.GetOVSDatapathType(), ovsBridgeClient.IsHardwareOffloadEnabled(),
s.disableTXChecksumOffload,
podUpdateNotifier, podInfoStore)
s.disableTXChecksumOffload, podUpdateNotifier)
if err != nil {
return fmt.Errorf("error during initialize podConfigurator: %v", err)
}
Expand Down
Loading

0 comments on commit 615b917

Please sign in to comment.