Skip to content

Commit

Permalink
Delay removal of flow-restore-wait
Browse files Browse the repository at this point in the history
Until a set of "essential" flows has been installed. At the moment, we
include NetworkPolicy flows (using podNetworkWait as the signal), Pod
forwarding flows (reconciled by the CNIServer), and Node routing flows
(installed by the NodeRouteController). This set can be extended in the
future if desired.

We leverage the wrapper around sync.WaitGroup which was introduced
previously in antrea-io#5777. It simplifies unit testing, and we can achieve some
symmetry with podNetworkWait.

We can also start leveraging this new wait group
(flowRestoreCompleteWait) as the signal to delete flows from previous
rounds. However, at the moment this is incomplete, as we don't wait for
all controllers to signal that they have installed initial flows.

Because the NodeRouteController does not have an initial "reconcile"
operation (like the CNIServer) to install flows for the initial Node
list, we instead rely on a different mechanims provided by upstream K8s
for controllers. When registering event handlers, we can request for the
ADD handler to include a boolean flag indicating whether the object is
part of the initial list retrieved by the informer. Using this
mechanism, we can reliably signal through flowRestoreCompleteWait when
this initial list of Nodes has been synced at least once.

This change is possible because of antrea-io#6361, which removed the dependency
on the proxy (kube-proxy or AntreaProxy) to access the Antrea
Controller. Prior to antrea-io#6361, there would have been a circular dependency
in the case where kube-proxy was removed: flow-restore-wait will not be
removed until the Pod network is "ready", which will not happen until
the NetworkPolicy controller has started its watchers, and that depends
on antrea Service reachability which depends on flow-restore-wait being
removed.

Fixes antrea-io#6338

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas committed May 31, 2024
1 parent 264372d commit 01b3092
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 86 deletions.
48 changes: 33 additions & 15 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,20 @@ func run(o *Options) error {
// Processes that enable Pod network should wait for it.
podNetworkWait := utilwait.NewGroup()

// flowRestoreCompleteWait is used to wait until "essential" flows have been installed
// successfully in OVS. These flows include NetworkPolicy flows (guaranteed by
// podNetworkWait), Pod forwarding flows and flows installed by the
// NodeRouteController. Additional requirements may be added in the future.
flowRestoreCompleteWait := utilwait.NewGroup()
// We ensure that flowRestoreCompleteWait.Wait() cannot return until podNetworkWait.Wait()
// returns. This is not strictly necessary because it is guatanteed by the CNIServer Pod
// reconciliation logic but it helps with readability.
flowRestoreCompleteWait.Increment()
go func() {
defer flowRestoreCompleteWait.Done()
podNetworkWait.Wait()
}()

// set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
// exits, we will force exit.
Expand Down Expand Up @@ -299,6 +313,7 @@ func run(o *Options) error {
egressConfig,
serviceConfig,
podNetworkWait,
flowRestoreCompleteWait,
stopCh,
o.nodeType,
o.config.ExternalNode.ExternalNodeNamespace,
Expand Down Expand Up @@ -332,6 +347,7 @@ func run(o *Options) error {
nodeConfig,
agentInitializer.GetWireGuardClient(),
ipsecCertController,
flowRestoreCompleteWait,
)
}

Expand Down Expand Up @@ -596,7 +612,8 @@ func run(o *Options) error {
enableAntreaIPAM,
o.config.DisableTXChecksumOffload,
networkConfig,
podNetworkWait)
podNetworkWait,
flowRestoreCompleteWait)

err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel)
if err != nil {
Expand Down Expand Up @@ -638,20 +655,6 @@ func run(o *Options) error {
o.enableAntreaProxy)
}

// TODO: we should call this after installing flows for initial node routes
// and initial NetworkPolicies so that no packets will be mishandled.
if err := agentInitializer.FlowRestoreComplete(); err != nil {
return err
}
// ConnectUplinkToOVSBridge must be run immediately after FlowRestoreComplete
if connectUplinkToBridge {
// Restore network config before shutdown. ovsdbConnection must be alive when restore.
defer agentInitializer.RestoreOVSBridge()
if err := agentInitializer.ConnectUplinkToOVSBridge(); err != nil {
return fmt.Errorf("failed to connect uplink to OVS bridge: %w", err)
}
}

if err := antreaClientProvider.RunOnce(); err != nil {
return err
}
Expand Down Expand Up @@ -848,6 +851,21 @@ func run(o *Options) error {
go mcStrechedNetworkPolicyController.Run(stopCh)
}

klog.InfoS("Waiting for flow restoration to complete")
flowRestoreCompleteWait.Wait()
if err := agentInitializer.FlowRestoreComplete(); err != nil {
return err
}
klog.InfoS("Flow restoration complete")
// ConnectUplinkToOVSBridge must be run immediately after FlowRestoreComplete
if connectUplinkToBridge {
// Restore network config before shutdown. ovsdbConnection must be alive when restore.
defer agentInitializer.RestoreOVSBridge()
if err := agentInitializer.ConnectUplinkToOVSBridge(); err != nil {
return fmt.Errorf("failed to connect uplink to OVS bridge: %w", err)
}
}

// statsCollector collects stats and reports to the antrea-controller periodically. For now it's only used for
// NetworkPolicy stats and Multicast stats.
if features.DefaultFeatureGate.Enabled(features.NetworkPolicyStats) {
Expand Down
73 changes: 40 additions & 33 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,14 @@ type Initializer struct {
enableAntreaProxy bool
// podNetworkWait should be decremented once the Node's network is ready.
// The CNI server will wait for it before handling any CNI Add requests.
podNetworkWait *utilwait.Group
stopCh <-chan struct{}
nodeType config.NodeType
externalNodeNamespace string
podNetworkWait *utilwait.Group
// flowRestoreCompleteWait is used to indicate that required flows have
// been installed. We use it to determine whether flows from previous
// rounds can be deleted.
flowRestoreCompleteWait *utilwait.Group
stopCh <-chan struct{}
nodeType config.NodeType
externalNodeNamespace string
}

func NewInitializer(
Expand All @@ -154,6 +158,7 @@ func NewInitializer(
egressConfig *config.EgressConfig,
serviceConfig *config.ServiceConfig,
podNetworkWait *utilwait.Group,
flowRestoreCompleteWait *utilwait.Group,
stopCh <-chan struct{},
nodeType config.NodeType,
externalNodeNamespace string,
Expand All @@ -163,29 +168,30 @@ func NewInitializer(
enableL7FlowExporter bool,
) *Initializer {
return &Initializer{
ovsBridgeClient: ovsBridgeClient,
ovsCtlClient: ovsCtlClient,
client: k8sClient,
crdClient: crdClient,
ifaceStore: ifaceStore,
ofClient: ofClient,
routeClient: routeClient,
ovsBridge: ovsBridge,
hostGateway: hostGateway,
mtu: mtu,
networkConfig: networkConfig,
wireGuardConfig: wireGuardConfig,
egressConfig: egressConfig,
serviceConfig: serviceConfig,
l7NetworkPolicyConfig: &config.L7NetworkPolicyConfig{},
podNetworkWait: podNetworkWait,
stopCh: stopCh,
nodeType: nodeType,
externalNodeNamespace: externalNodeNamespace,
connectUplinkToBridge: connectUplinkToBridge,
enableAntreaProxy: enableAntreaProxy,
enableL7NetworkPolicy: enableL7NetworkPolicy,
enableL7FlowExporter: enableL7FlowExporter,
ovsBridgeClient: ovsBridgeClient,
ovsCtlClient: ovsCtlClient,
client: k8sClient,
crdClient: crdClient,
ifaceStore: ifaceStore,
ofClient: ofClient,
routeClient: routeClient,
ovsBridge: ovsBridge,
hostGateway: hostGateway,
mtu: mtu,
networkConfig: networkConfig,
wireGuardConfig: wireGuardConfig,
egressConfig: egressConfig,
serviceConfig: serviceConfig,
l7NetworkPolicyConfig: &config.L7NetworkPolicyConfig{},
podNetworkWait: podNetworkWait,
flowRestoreCompleteWait: flowRestoreCompleteWait,
stopCh: stopCh,
nodeType: nodeType,
externalNodeNamespace: externalNodeNamespace,
connectUplinkToBridge: connectUplinkToBridge,
enableAntreaProxy: enableAntreaProxy,
enableL7NetworkPolicy: enableL7NetworkPolicy,
enableL7FlowExporter: enableL7FlowExporter,
}
}

Expand Down Expand Up @@ -529,13 +535,14 @@ func (i *Initializer) initOpenFlowPipeline() error {
// the new round number), otherwise we would disrupt the dataplane. Unfortunately,
// the time required for convergence may be large and there is no simple way to
// determine when is a right time to perform the cleanup task.
// TODO: introduce a deterministic mechanism through which the different entities
// responsible for installing flows can notify the agent that this deletion
// operation can take place. A waitGroup can be created here and notified when
// full sync in agent networkpolicy controller is complete. This would signal NP
// flows have been synced once. Other mechanisms are still needed for node flows
// fullSync check.
// We took a first step towards introducing a deterministic mechanism through which
// the different entities responsible for installing flows can notify the agent that
// this deletion operation can take place. i.flowRestoreCompleteWait.Wait() will
// block until some key flows (NetworkPolicy flows, Pod flows, Node route flows)
// have been installed. But not all entities responsible for installing flows
// currently use this wait group, so we block for a minimum of 10 seconds.
time.Sleep(10 * time.Second)
i.flowRestoreCompleteWait.Wait()
klog.Info("Deleting stale flows from previous round if any")
if err := i.ofClient.DeleteStaleFlows(); err != nil {
klog.Errorf("Error when deleting stale flows from previous round: %v", err)
Expand Down
13 changes: 11 additions & 2 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net"
"strings"
"sync"

current "github.com/containernetworking/cni/pkg/types/100"
"github.com/containernetworking/cni/pkg/version"
Expand Down Expand Up @@ -404,7 +405,7 @@ func parsePrevResult(conf *types.NetworkConfig) error {
return nil
}

func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator, podNetworkWait *wait.Group) error {
func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator, podNetworkWait, flowRestoreCompleteWait *wait.Group) error {
// desiredPods is the set of Pods that should be present, based on the
// current list of Pods got from the Kubernetes API.
desiredPods := sets.New[string]()
Expand All @@ -413,6 +414,8 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
// knownInterfaces is the list of interfaces currently in the local cache.
knownInterfaces := pc.ifaceStore.GetInterfacesByType(interfacestore.ContainerInterface)

var podWg sync.WaitGroup

for _, pod := range pods {
// Skip Pods for which we are not in charge of the networking.
if pod.Spec.HostNetwork {
Expand All @@ -436,7 +439,9 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
missingIfConfigs = append(missingIfConfigs, containerConfig)
continue
}
podWg.Add(1)
go func(containerID, pod, namespace string) {
defer podWg.Done()
// Do not install Pod flows until all preconditions are met.
podNetworkWait.Wait()
// To avoid race condition with CNIServer CNI event handlers.
Expand All @@ -452,7 +457,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
// We rely on the interface cache / store - which is initialized from the persistent
// OVSDB - to map the Pod to its interface configuration. The interface
// configuration includes the parameters we need to replay the flows.
klog.V(4).InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName)
klog.InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName)
if err := pc.ofClient.InstallPodFlows(
containerConfig.InterfaceName,
containerConfig.IPs,
Expand All @@ -473,6 +478,10 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
// interface should no longer be in store after the call to removeInterfaces
}
}
go func() {
defer flowRestoreCompleteWait.Done()
podWg.Wait()
}()
if len(missingIfConfigs) > 0 {
pc.reconcileMissingPods(missingIfConfigs, containerAccess)
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/agent/cniserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ type CNIServer struct {
networkConfig *config.NetworkConfig
// podNetworkWait notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it.
podNetworkWait *wait.Group
// flowRestoreCompleteWait will be decremented and Pod reconciliation is completed.
flowRestoreCompleteWait *wait.Group
}

var supportedCNIVersionSet map[string]bool
Expand Down Expand Up @@ -630,7 +632,7 @@ func New(
routeClient route.Interface,
isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool,
networkConfig *config.NetworkConfig,
podNetworkWait *wait.Group,
podNetworkWait, flowRestoreCompleteWait *wait.Group,
) *CNIServer {
return &CNIServer{
cniSocket: cniSocket,
Expand All @@ -646,6 +648,7 @@ func New(
enableSecondaryNetworkIPAM: enableSecondaryNetworkIPAM,
networkConfig: networkConfig,
podNetworkWait: podNetworkWait,
flowRestoreCompleteWait: flowRestoreCompleteWait.Increment(),
}
}

Expand Down Expand Up @@ -748,7 +751,7 @@ func (s *CNIServer) interceptCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse,
// installing Pod flows, so as part of this reconciliation process we retrieve the Pod list from the
// K8s apiserver and replay the necessary flows.
func (s *CNIServer) reconcile() error {
klog.InfoS("Reconciliation for CNI server")
klog.InfoS("Starting reconciliation for CNI server")
// For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from
// the watch cache in kube-apiserver.
pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
Expand All @@ -759,7 +762,7 @@ func (s *CNIServer) reconcile() error {
return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err)
}

return s.podConfigurator.reconcile(pods.Items, s.containerAccess, s.podNetworkWait)
return s.podConfigurator.reconcile(pods.Items, s.containerAccess, s.podNetworkWait, s.flowRestoreCompleteWait)
}

func init() {
Expand Down
11 changes: 6 additions & 5 deletions pkg/agent/cniserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,11 +761,12 @@ func translateRawPrevResult(prevResult *current.Result, cniVersion string) (map[

func newCNIServer(t *testing.T) *CNIServer {
cniServer := &CNIServer{
cniSocket: testSocket,
nodeConfig: testNodeConfig,
serverVersion: cni.AntreaCNIVersion,
containerAccess: newContainerAccessArbitrator(),
podNetworkWait: wait.NewGroup(),
cniSocket: testSocket,
nodeConfig: testNodeConfig,
serverVersion: cni.AntreaCNIVersion,
containerAccess: newContainerAccessArbitrator(),
podNetworkWait: wait.NewGroup(),
flowRestoreCompleteWait: wait.NewGroup().Increment(),
}
cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450}
return cniServer
Expand Down
Loading

0 comments on commit 01b3092

Please sign in to comment.