From 91cb09da648adcb46b68c55e5dfad4aa11af123c Mon Sep 17 00:00:00 2001
From: Quan Tian
Date: Thu, 16 Nov 2023 16:41:03 +0800
Subject: [PATCH 1/5] Store NetworkPolicy in filesystem as fallback data source

In the previous implementation, traffic from/to a Pod may bypass
NetworkPolicies applied to the Pod in a time window when the agent restarts,
because realizing NetworkPolicies and enabling forwarding are asynchronous.

This patch stores NetworkPolicy data in files when they are received, and
makes antrea-agent fall back to using the files as the data source if it
can't connect to antrea-controller on startup. This prevents a security
regression: a NetworkPolicy that has been realized on a Node will continue
to work even if antrea-controller is not available after antrea-agent
restarts.

The benchmark results of the storage's operations are as below:

BenchmarkFileStoreAddNetworkPolicy-40     70383      16102 ns/op      520 B/op      9 allocs/op
BenchmarkFileStoreAddAppliedToGroup-40    45382      25880 ns/op     3019 B/op      9 allocs/op
BenchmarkFileStoreAddAddressGroup-40       7400     180000 ns/op    49538 B/op      9 allocs/op
BenchmarkFileStoreReplaceAll-40              10  127088004 ns/op 17815943 B/op  33099 allocs/op

The disk usage when storing 1k NetworkPolicies, AddressGroups, and
AppliedToGroups created by BenchmarkFileStoreReplaceAll is as below:

16M     /var/run/antrea-test/file-store/address-groups
4.0M    /var/run/antrea-test/file-store/applied-to-groups
4.0M    /var/run/antrea-test/file-store/network-policies

Signed-off-by: Quan Tian
---
 cmd/antrea-agent/agent.go                          |   2 +
 pkg/agent/controller/networkpolicy/cache.go        |  32 ++-
 .../controller/networkpolicy/cache_test.go         |  10 +-
 .../controller/networkpolicy/filestore.go          | 134 ++++++++++++
 .../networkpolicy/filestore_test.go                | 190 ++++++++++++++++++
 .../networkpolicy/networkpolicy_controller.go      | 158 +++++++++++++--
 .../networkpolicy_controller_test.go               | 184 ++++++++++++++++-
 test/e2e/networkpolicy_test.go                     |  95 ++++++++-
 8 files changed, 781 insertions(+), 24 deletions(-)
 create mode 100644 pkg/agent/controller/networkpolicy/filestore.go
 create mode 100644 pkg/agent/controller/networkpolicy/filestore_test.go

diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go
index 1bef425d4b5..f431efd0b91 100644
--- a/cmd/antrea-agent/agent.go
+++ b/cmd/antrea-agent/agent.go
@@ -20,6 +20,7 @@ import (
 "net"
 "time"

+ "github.com/spf13/afero"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/apimachinery/pkg/fields"
 "k8s.io/apimachinery/pkg/util/sets"
@@ -446,6 +447,7 @@ func run(o *Options) error {
 antreaClientProvider,
 ofClient,
 ifaceStore,
+ afero.NewOsFs(),
 nodeKey,
 podUpdateChannel,
 externalEntityUpdateChannel,
diff --git a/pkg/agent/controller/networkpolicy/cache.go b/pkg/agent/controller/networkpolicy/cache.go
index 70dd3a711f0..2eb162f7a37 100644
--- a/pkg/agent/controller/networkpolicy/cache.go
+++ b/pkg/agent/controller/networkpolicy/cache.go
@@ -551,13 +551,14 @@ func (c *ruleCache) addAddressGroupLocked(group *v1beta.AddressGroup) error {

 // PatchAddressGroup updates a cached *v1beta.AddressGroup.
 // The rules referencing it will be regarded as dirty.
-func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) error {
+// It returns a copy of the patched AddressGroup, or an error if the AddressGroup doesn't exist.
+func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) (*v1beta.AddressGroup, error) { c.addressSetLock.Lock() defer c.addressSetLock.Unlock() groupMemberSet, exists := c.addressSetByGroup[patch.Name] if !exists { - return fmt.Errorf("AddressGroup %v doesn't exist in cache, can't be patched", patch.Name) + return nil, fmt.Errorf("AddressGroup %v doesn't exist in cache, can't be patched", patch.Name) } for i := range patch.AddedGroupMembers { groupMemberSet.Insert(&patch.AddedGroupMembers[i]) @@ -567,7 +568,16 @@ func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) error { } c.onAddressGroupUpdate(patch.Name) - return nil + + members := make([]v1beta.GroupMember, 0, len(groupMemberSet)) + for _, member := range groupMemberSet { + members = append(members, *member) + } + group := &v1beta.AddressGroup{ + ObjectMeta: patch.ObjectMeta, + GroupMembers: members, + } + return group, nil } // DeleteAddressGroup deletes a cached *v1beta.AddressGroup. @@ -639,13 +649,14 @@ func (c *ruleCache) addAppliedToGroupLocked(group *v1beta.AppliedToGroup) error // PatchAppliedToGroup updates a cached *v1beta.AppliedToGroupPatch. // The rules referencing it will be regarded as dirty. -func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) error { +// It returns a copy of the patched AppliedToGroup, or an error if the AppliedToGroup doesn't exist. +func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) (*v1beta.AppliedToGroup, error) { c.appliedToSetLock.Lock() defer c.appliedToSetLock.Unlock() memberSet, exists := c.appliedToSetByGroup[patch.Name] if !exists { - return fmt.Errorf("AppliedToGroup %v doesn't exist in cache, can't be patched", patch.Name) + return nil, fmt.Errorf("AppliedToGroup %v doesn't exist in cache, can't be patched", patch.Name) } for i := range patch.AddedGroupMembers { memberSet.Insert(&patch.AddedGroupMembers[i]) @@ -654,7 +665,16 @@ func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) error memberSet.Delete(&patch.RemovedGroupMembers[i]) } c.onAppliedToGroupUpdate(patch.Name) - return nil + + members := make([]v1beta.GroupMember, 0, len(memberSet)) + for _, member := range memberSet { + members = append(members, *member) + } + group := &v1beta.AppliedToGroup{ + ObjectMeta: patch.ObjectMeta, + GroupMembers: members, + } + return group, nil } // DeleteAppliedToGroup deletes a cached *v1beta.AppliedToGroup. 
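
The two Patch functions now return a self-contained copy of the patched group so that the caller can persist it without racing with the rule cache. Persistence itself is handled by the fileStore introduced in filestore.go below. A minimal round-trip sketch of the mechanism (an illustration only, not code from this patch; it reuses the unexported newFileStore/save/loadAll from filestore.go and the package-level scheme, codecs, dataPath and networkPoliciesDir from networkpolicy_controller.go, so it would have to live in that same package):

    func demoFileStoreRoundTrip() error {
        ps := protobuf.NewSerializer(scheme, scheme)
        // Encode and decode as v1beta2, the only storage version used so far.
        codec := codecs.CodecForVersions(ps, ps, v1beta2.SchemeGroupVersion, v1beta2.SchemeGroupVersion)
        // BasePathFs confines every file to the agent's data directory.
        fs := afero.NewBasePathFs(afero.NewMemMapFs(), dataPath)
        store, err := newFileStore(fs, networkPoliciesDir, codec)
        if err != nil {
            return err
        }
        policy := &v1beta2.NetworkPolicy{
            ObjectMeta: metav1.ObjectMeta{Name: "policy1", UID: "uid1"},
        }
        // save writes (or overwrites) the file named after the object's UID.
        if err := store.save(policy); err != nil {
            return err
        }
        // loadAll is what the watcher's FallbackFunc calls on startup when the
        // watch to antrea-controller cannot be established.
        objects, err := store.loadAll()
        if err != nil {
            return err
        }
        klog.InfoS("Loaded objects from fallback store", "count", len(objects))
        return nil
    }

Once the watch to antrea-controller succeeds again, the init events go through ReplaceFunc, which rewrites the directory wholesale, so stale files cannot outlive a successful full sync.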
diff --git a/pkg/agent/controller/networkpolicy/cache_test.go b/pkg/agent/controller/networkpolicy/cache_test.go index 0ced8235e26..dc68f2f5b13 100644 --- a/pkg/agent/controller/networkpolicy/cache_test.go +++ b/pkg/agent/controller/networkpolicy/cache_test.go @@ -1039,7 +1039,7 @@ func TestRuleCachePatchAppliedToGroup(t *testing.T) { for _, rule := range tt.rules { c.rules.Add(rule) } - err := c.PatchAppliedToGroup(tt.args) + ret, err := c.PatchAppliedToGroup(tt.args) if (err == nil) == tt.expectedErr { t.Fatalf("Got error %v, expected %t", err, tt.expectedErr) } @@ -1048,6 +1048,9 @@ func TestRuleCachePatchAppliedToGroup(t *testing.T) { } actualPods, _ := c.appliedToSetByGroup[tt.args.Name] assert.ElementsMatch(t, tt.expectedPods, actualPods.Items(), "stored Pods not equal") + if !tt.expectedErr { + assert.Equal(t, len(ret.GroupMembers), len(actualPods)) + } }) } } @@ -1116,7 +1119,7 @@ func TestRuleCachePatchAddressGroup(t *testing.T) { for _, rule := range tt.rules { c.rules.Add(rule) } - err := c.PatchAddressGroup(tt.args) + ret, err := c.PatchAddressGroup(tt.args) if (err == nil) == tt.expectedErr { t.Fatalf("Got error %v, expected %t", err, tt.expectedErr) } @@ -1125,6 +1128,9 @@ func TestRuleCachePatchAddressGroup(t *testing.T) { } actualAddresses, _ := c.addressSetByGroup[tt.args.Name] assert.ElementsMatch(t, tt.expectedAddresses, actualAddresses.Items(), "stored addresses not equal") + if !tt.expectedErr { + assert.Equal(t, len(ret.GroupMembers), len(actualAddresses)) + } }) } } diff --git a/pkg/agent/controller/networkpolicy/filestore.go b/pkg/agent/controller/networkpolicy/filestore.go new file mode 100644 index 00000000000..702a6b163f8 --- /dev/null +++ b/pkg/agent/controller/networkpolicy/filestore.go @@ -0,0 +1,134 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package networkpolicy + +import ( + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + + "github.com/spf13/afero" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog/v2" +) + +// fileStore encodes and stores runtime.Objects in files. Each object will be stored in a separate file under the given +// directory. +type fileStore struct { + fs afero.Fs + // The directory to store the files. + dir string + // serializer knows how to encode and decode the objects. + serializer runtime.Serializer +} + +func newFileStore(fs afero.Fs, dir string, serializer runtime.Serializer) (*fileStore, error) { + s := &fileStore{ + fs: fs, + dir: dir, + serializer: serializer, + } + klog.V(2).InfoS("Creating directory for NetworkPolicy cache", "dir", dir) + if err := s.fs.MkdirAll(dir, 0o600); err != nil { + return nil, err + } + return s, nil +} + +// save stores the given object in file with the object's UID as the file name, overwriting any existing content if the +// file already exists. Note the method may update the object's GroupVersionKind in-place during serialization. 
+func (s fileStore) save(item runtime.Object) error { + object := item.(metav1.Object) + path := filepath.Join(s.dir, string(object.GetUID())) + file, err := s.fs.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600) + if err != nil { + return fmt.Errorf("error opening file for writing object %v: %w", object.GetUID(), err) + } + defer file.Close() + // Encode may update the object's GroupVersionKind in-place during serialization. + err = s.serializer.Encode(item, file) + if err != nil { + return fmt.Errorf("error writing object %v to file: %w", object.GetUID(), err) + } + return nil +} + +// delete removes the file with the object's UID as the file name if it exists. +func (s fileStore) delete(item runtime.Object) error { + object := item.(metav1.Object) + path := filepath.Join(s.dir, string(object.GetUID())) + err := s.fs.Remove(path) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + return nil +} + +// replaceAll replaces all files under the directory with the given objects. Existing files not in the given objects +// will be removed. Note the method may update the object's GroupVersionKind in-place during serialization. +func (s fileStore) replaceAll(items []runtime.Object) error { + if err := s.fs.RemoveAll(s.dir); err != nil { + return err + } + if err := s.fs.MkdirAll(s.dir, 0o600); err != nil { + return err + } + for _, item := range items { + if err := s.save(item); err != nil { + return err + } + } + return nil +} + +func (s fileStore) loadAll() ([]runtime.Object, error) { + var objects []runtime.Object + err := afero.Walk(s.fs, s.dir, func(path string, info fs.FileInfo, err error) error { + if info.IsDir() { + return nil + } + file, err2 := s.fs.Open(path) + if err2 != nil { + return err2 + } + defer file.Close() + data, err2 := io.ReadAll(file) + if err2 != nil { + return err2 + } + + object, gkv, err2 := s.serializer.Decode(data, nil, nil) + // If the data is corrupted somehow, we still want to load other data and continue the process. + if err2 != nil { + klog.ErrorS(err2, "Failed to decode data from file, ignore it", "file", path) + return nil + } + // Note: we haven't stored a different version so far but version conversion should be performed when the used + // version is upgraded in the future. + klog.V(2).InfoS("Loaded object from file", "gkv", gkv, "object", object) + objects = append(objects, object) + return nil + }) + if err != nil { + return nil, err + } + return objects, nil +} diff --git a/pkg/agent/controller/networkpolicy/filestore_test.go b/pkg/agent/controller/networkpolicy/filestore_test.go new file mode 100644 index 00000000000..71ef6a59c13 --- /dev/null +++ b/pkg/agent/controller/networkpolicy/filestore_test.go @@ -0,0 +1,190 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package networkpolicy
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/google/uuid"
+ "github.com/spf13/afero"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/serializer/protobuf"
+ "k8s.io/apimachinery/pkg/types"
+
+ "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
+)
+
+const (
+ testDataPath = "/var/run/antrea-test/file-store"
+)
+
+// Set it to NewMemMapFs as the file system may not be writable.
+// Change it to NewOsFs to evaluate performance when writing to disk.
+var newFS = afero.NewMemMapFs
+
+func newFakeFileStore(tb testing.TB, dir string) *fileStore {
+ serializer := protobuf.NewSerializer(scheme, scheme)
+ codec := codecs.CodecForVersions(serializer, serializer, v1beta2.SchemeGroupVersion, v1beta2.SchemeGroupVersion)
+ // Create a new FS for every fileStore to avoid interaction between tests.
+ fs := afero.NewBasePathFs(newFS(), testDataPath)
+ s, err := newFileStore(fs, dir, codec)
+ assert.NoError(tb, err)
+ return s
+}
+
+func TestFileStore(t *testing.T) {
+ policy1 := newNetworkPolicy("policy1", "uid1", []string{"addressGroup1"}, nil, []string{"appliedToGroup1"}, nil)
+ policy2 := newNetworkPolicy("policy2", "uid2", []string{"addressGroup2"}, nil, []string{"appliedToGroup2"}, nil)
+ policy3 := newNetworkPolicy("policy3", "uid3", []string{"addressGroup3"}, nil, []string{"appliedToGroup3"}, nil)
+ updatedPolicy2 := policy2.DeepCopy()
+ updatedPolicy2.AppliedToGroups = []string{"foo"}
+
+ tests := []struct {
+ name string
+ ops func(*fileStore)
+ expectedObjects []runtime.Object
+ }{
+ {
+ name: "add",
+ ops: func(store *fileStore) {
+ store.save(policy1)
+ store.save(policy2)
+ store.save(policy3)
+ },
+ expectedObjects: []runtime.Object{policy1, policy2, policy3},
+ },
+ {
+ name: "update",
+ ops: func(store *fileStore) {
+ store.save(policy1)
+ store.save(policy2)
+ store.save(updatedPolicy2)
+ },
+ expectedObjects: []runtime.Object{policy1, updatedPolicy2},
+ },
+ {
+ name: "delete",
+ ops: func(store *fileStore) {
+ store.save(policy1)
+ store.save(policy2)
+ store.delete(policy2)
+ },
+ expectedObjects: []runtime.Object{policy1},
+ },
+ {
+ name: "replace",
+ ops: func(store *fileStore) {
+ store.save(policy1)
+ store.save(policy2)
+ store.replaceAll([]runtime.Object{updatedPolicy2, policy3})
+ },
+ expectedObjects: []runtime.Object{updatedPolicy2, policy3},
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ s := newFakeFileStore(t, networkPoliciesDir)
+ tt.ops(s)
+ gotObjects, err := s.loadAll()
+ require.NoError(t, err)
+ assert.Equal(t, tt.expectedObjects, gotObjects)
+ })
+ }
+}
+
+func BenchmarkFileStoreAddNetworkPolicy(b *testing.B) {
+ policy := newNetworkPolicy("policy1", types.UID(uuid.New().String()), []string{uuid.New().String()}, nil, []string{uuid.New().String()}, nil)
+ s := newFakeFileStore(b, networkPoliciesDir)
+ b.ResetTimer()
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ s.save(policy)
+ }
+}
+
+func BenchmarkFileStoreAddAppliedToGroup(b *testing.B) {
+ members := make([]v1beta2.GroupMember, 0, 100)
+ for i := 0; i < 100; i++ {
+ members = append(members, *newAppliedToGroupMemberPod(fmt.Sprintf("pod-%d", i), "namespace"))
+ }
+ atg := newAppliedToGroup(uuid.New().String(), members)
+ s := newFakeFileStore(b, appliedToGroupsDir)
+ b.ResetTimer()
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ s.save(atg)
+ }
+}
+
+func BenchmarkFileStoreAddAddressGroup(b *testing.B) {
+ members := make([]v1beta2.GroupMember, 0,
1000) + for i := 0; i < 1000; i++ { + members = append(members, *newAddressGroupPodMember(fmt.Sprintf("pod-%d", i), "namespace", "192.168.0.1")) + } + ag := newAddressGroup(uuid.New().String(), members) + s := newFakeFileStore(b, addressGroupsDir) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + s.save(ag) + } +} + +func BenchmarkFileStoreReplaceAll(b *testing.B) { + nps := make([]runtime.Object, 0, 1000) + atgs := make([]runtime.Object, 0, 1000) + ags := make([]runtime.Object, 0, 1000) + for i := 0; i < 1000; i++ { + policyName := uuid.New().String() + addressGroupName := uuid.New().String() + appliedToGroupName := uuid.New().String() + nps = append(nps, newNetworkPolicy(policyName, types.UID(policyName), []string{addressGroupName}, nil, []string{appliedToGroupName}, nil)) + + var atgMembers []v1beta2.GroupMember + for j := 0; j < 100; j++ { + atgMembers = append(atgMembers, *newAppliedToGroupMemberPod(fmt.Sprintf("pod-%d", j), "namespace")) + } + atg := newAppliedToGroup(appliedToGroupName, atgMembers) + atgs = append(atgs, atg) + + var agMembers []v1beta2.GroupMember + podNum := 100 + if i < 10 { + podNum = 10000 + } else if i < 110 { + podNum = 1000 + } + for j := 0; j < podNum; j++ { + agMembers = append(agMembers, *newAddressGroupPodMember(fmt.Sprintf("pod-%d", j), "namespace", "192.168.0.1")) + } + ag := newAddressGroup(addressGroupName, agMembers) + ags = append(ags, ag) + } + + networkPolicyStore := newFakeFileStore(b, networkPoliciesDir) + appliedToGroupStore := newFakeFileStore(b, appliedToGroupsDir) + addressGroupStore := newFakeFileStore(b, addressGroupsDir) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + networkPolicyStore.replaceAll(nps) + appliedToGroupStore.replaceAll(atgs) + addressGroupStore.replaceAll(ags) + } +} diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index 813b3ab0f07..2e144bdb94b 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -23,9 +23,12 @@ import ( "time" "antrea.io/ofnet/ofctrl" + "github.com/spf13/afero" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/runtime/serializer/protobuf" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/util/workqueue" @@ -39,6 +42,7 @@ import ( "antrea.io/antrea/pkg/agent/openflow" proxytypes "antrea.io/antrea/pkg/agent/proxy/types" "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/apis/controlplane/install" "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/querier" "antrea.io/antrea/pkg/util/channel" @@ -58,6 +62,13 @@ const ( dnsInterceptRuleID = uint32(1) ) +const ( + dataPath = "/var/run/antrea/networkpolicy" + networkPoliciesDir = "network-policies" + appliedToGroupsDir = "applied-to-groups" + addressGroupsDir = "address-groups" +) + type L7RuleReconciler interface { AddRule(ruleID, policyName string, vlanID uint32, l7Protocols []v1beta2.L7Protocol, enableLogging bool) error DeleteRule(ruleID string, vlanID uint32) error @@ -65,6 +76,15 @@ type L7RuleReconciler interface { var emptyWatch = watch.NewEmptyWatch() +var ( + scheme = runtime.NewScheme() + codecs = serializer.NewCodecFactory(scheme) +) + +func init() { + install.Install(scheme) +} + type packetInAction func(*ofctrl.PacketIn) error // 
Controller is responsible for watching Antrea AddressGroups, AppliedToGroups, @@ -130,6 +150,12 @@ type Controller struct { tunPort uint32 nodeConfig *config.NodeConfig + // The fileStores store runtime.Objects in files and use them as the fallback data source when agent can't connect + // to antrea-controller on startup. + networkPolicyStore *fileStore + appliedToGroupStore *fileStore + addressGroupStore *fileStore + logPacketAction packetInAction rejectRequestAction packetInAction storeDenyConnectionAction packetInAction @@ -139,6 +165,7 @@ type Controller struct { func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, + fs afero.Fs, nodeName string, podUpdateSubscriber channel.Subscriber, externalEntityUpdateSubscriber channel.Subscriber, @@ -179,8 +206,8 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, c.l7VlanIDAllocator = newL7VlanIDAllocator() } + var err error if antreaPolicyEnabled { - var err error if c.fqdnController, err = newFQDNController(ofClient, idAllocator, dnsServerOverride, c.enqueueRule, v4Enabled, v6Enabled, gwPort); err != nil { return nil, err } @@ -192,6 +219,23 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, c.reconciler = newReconciler(ofClient, ifaceStore, idAllocator, c.fqdnController, groupCounters, v4Enabled, v6Enabled, antreaPolicyEnabled, multicastEnabled) c.ruleCache = newRuleCache(c.enqueueRule, podUpdateSubscriber, externalEntityUpdateSubscriber, groupIDUpdates, nodeType) + + serializer := protobuf.NewSerializer(scheme, scheme) + codec := codecs.CodecForVersions(serializer, serializer, v1beta2.SchemeGroupVersion, v1beta2.SchemeGroupVersion) + fs = afero.NewBasePathFs(fs, dataPath) + c.networkPolicyStore, err = newFileStore(fs, networkPoliciesDir, codec) + if err != nil { + return nil, fmt.Errorf("error creating file store for NetworkPolicy: %w", err) + } + c.appliedToGroupStore, err = newFileStore(fs, appliedToGroupsDir, codec) + if err != nil { + return nil, fmt.Errorf("error creating file store for AppliedToGroup: %w", err) + } + c.addressGroupStore, err = newFileStore(fs, addressGroupsDir, codec) + if err != nil { + return nil, fmt.Errorf("error creating file store for AddressGroup: %w", err) + } + if statusManagerEnabled { c.statusManager = newStatusController(antreaClientGetter, nodeName, c.ruleCache) } @@ -238,6 +282,11 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, "policyName", policy.SourceRef.ToString()) return nil } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. + if err := c.networkPolicyStore.save(policy); err != nil { + klog.ErrorS(err, "Failed to store the NetworkPolicy to file", "policyName", policy.SourceRef.ToString()) + } c.ruleCache.AddNetworkPolicy(policy) klog.InfoS("NetworkPolicy applied to Pods on this Node", "policyName", policy.SourceRef.ToString()) return nil @@ -252,6 +301,11 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, "policyName", policy.SourceRef.ToString()) return nil } + // Storing the object to file first because its GroupVersionKind can be updated in-place during + // serialization, which may incur data race if we add it to ruleCache first. 
+ if err := c.networkPolicyStore.save(policy); err != nil {
+ klog.ErrorS(err, "Failed to store the NetworkPolicy to file", "policyName", policy.SourceRef.ToString())
+ }
 updated := c.ruleCache.UpdateNetworkPolicy(policy)
 // If any rule or the generation changes, we ensure statusManager will resync the policy's status once, in
 // case the changes don't cause any actual rule update but the whole policy's generation is changed.
@@ -272,6 +326,9 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
 }
 c.ruleCache.DeleteNetworkPolicy(policy)
 klog.InfoS("NetworkPolicy no longer applied to Pods on this Node", "policyName", policy.SourceRef.ToString())
+ if err := c.networkPolicyStore.delete(policy); err != nil {
+ klog.ErrorS(err, "Failed to delete the NetworkPolicy from file", "policyName", policy.SourceRef.ToString())
+ }
 return nil
 },
 ReplaceFunc: func(objs []runtime.Object) error {
@@ -296,9 +353,15 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
 c.statusManager.Resync(policies[i].UID)
 }
 }
+ // Storing the object to file first because its GroupVersionKind can be updated in-place during
+ // serialization, which may incur data race if we add it to ruleCache first.
+ if err := c.networkPolicyStore.replaceAll(objs); err != nil {
+ klog.ErrorS(err, "Failed to store the NetworkPolicies to files")
+ }
 c.ruleCache.ReplaceNetworkPolicies(policies)
 return nil
 },
+ FallbackFunc: c.networkPolicyStore.loadAll,
 fullSyncWaitGroup: &c.fullSyncGroup,
 fullSynced: false,
 }
@@ -317,15 +380,28 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
 if !ok {
 return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroup: %v", obj)
 }
+ // Storing the object to file first because its GroupVersionKind can be updated in-place during
+ // serialization, which may incur data race if we add it to ruleCache first.
+ if err := c.appliedToGroupStore.save(group); err != nil {
+ klog.ErrorS(err, "Failed to store the AppliedToGroup to file", "groupName", group.Name)
+ }
 c.ruleCache.AddAppliedToGroup(group)
 return nil
 },
 UpdateFunc: func(obj runtime.Object) error {
- group, ok := obj.(*v1beta2.AppliedToGroupPatch)
+ patch, ok := obj.(*v1beta2.AppliedToGroupPatch)
 if !ok {
- return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroup: %v", obj)
+ return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroupPatch: %v", obj)
+ }
+ group, err := c.ruleCache.PatchAppliedToGroup(patch)
+ if err != nil {
+ return err
+ }
+ // It's fine to store the object to file after applying the patch to ruleCache because the returned object
+ // is newly created, and ruleCache itself doesn't use it.
+ if err := c.appliedToGroupStore.save(group); err != nil {
+ klog.ErrorS(err, "Failed to store the AppliedToGroup to file", "groupName", group.Name)
 }
- c.ruleCache.PatchAppliedToGroup(group)
 return nil
 },
 DeleteFunc: func(obj runtime.Object) error {
@@ -334,6 +410,9 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
 return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroup: %v", obj)
 }
 c.ruleCache.DeleteAppliedToGroup(group)
+ if err := c.appliedToGroupStore.delete(group); err != nil {
+ klog.ErrorS(err, "Failed to delete the AppliedToGroup from file", "groupName", group.Name)
+ }
 return nil
 },
 ReplaceFunc: func(objs []runtime.Object) error {
@@ -345,9 +424,15 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
 return fmt.Errorf("cannot convert to *v1beta1.AppliedToGroup: %v", objs[i])
 }
 }
+ // Storing the object to file first because its GroupVersionKind can be updated in-place during
+ // serialization, which may incur data race if we add it to ruleCache first.
+ if err := c.appliedToGroupStore.replaceAll(objs); err != nil {
+ klog.ErrorS(err, "Failed to store the AppliedToGroups to files")
+ }
 c.ruleCache.ReplaceAppliedToGroups(groups)
 return nil
 },
+ FallbackFunc: c.appliedToGroupStore.loadAll,
 fullSyncWaitGroup: &c.fullSyncGroup,
 fullSynced: false,
 }
@@ -366,15 +451,28 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
 if !ok {
 return fmt.Errorf("cannot convert to *v1beta1.AddressGroup: %v", obj)
 }
+ // Storing the object to file first because its GroupVersionKind can be updated in-place during
+ // serialization, which may incur data race if we add it to ruleCache first.
+ if err := c.addressGroupStore.save(group); err != nil {
+ klog.ErrorS(err, "Failed to store the AddressGroup to file", "groupName", group.Name)
+ }
 c.ruleCache.AddAddressGroup(group)
 return nil
 },
 UpdateFunc: func(obj runtime.Object) error {
- group, ok := obj.(*v1beta2.AddressGroupPatch)
+ patch, ok := obj.(*v1beta2.AddressGroupPatch)
 if !ok {
- return fmt.Errorf("cannot convert to *v1beta1.AddressGroup: %v", obj)
+ return fmt.Errorf("cannot convert to *v1beta1.AddressGroupPatch: %v", obj)
+ }
+ group, err := c.ruleCache.PatchAddressGroup(patch)
+ if err != nil {
+ return err
+ }
+ // It's fine to store the object to file after applying the patch to ruleCache because the returned object
+ // is newly created, and ruleCache itself doesn't use it.
+ if err := c.addressGroupStore.save(group); err != nil {
+ klog.ErrorS(err, "Failed to store the AddressGroup to file", "groupName", group.Name)
 }
- c.ruleCache.PatchAddressGroup(group)
 return nil
 },
 DeleteFunc: func(obj runtime.Object) error {
@@ -383,6 +481,9 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
 return fmt.Errorf("cannot convert to *v1beta1.AddressGroup: %v", obj)
 }
 c.ruleCache.DeleteAddressGroup(group)
+ if err := c.addressGroupStore.delete(group); err != nil {
+ klog.ErrorS(err, "Failed to delete the AddressGroup from file", "groupName", group.Name)
+ }
 return nil
 },
 ReplaceFunc: func(objs []runtime.Object) error {
@@ -394,9 +495,15 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider,
 return fmt.Errorf("cannot convert to *v1beta1.AddressGroup: %v", objs[i])
 }
 }
+ // Storing the object to file first because its GroupVersionKind can be updated in-place during
+ // serialization, which may incur data race if we add it to ruleCache first.
+ if err := c.addressGroupStore.replaceAll(objs); err != nil {
+ klog.ErrorS(err, "Failed to store the AddressGroups to files")
+ }
 c.ruleCache.ReplaceAddressGroups(groups)
 return nil
 },
+ FallbackFunc: c.addressGroupStore.loadAll,
 fullSyncWaitGroup: &c.fullSyncGroup,
 fullSynced: false,
 }
@@ -744,6 +851,8 @@ type watcher struct {
 DeleteFunc func(obj runtime.Object) error
 // ReplaceFunc is the function that handles init events.
 ReplaceFunc func(objs []runtime.Object) error
+ // FallbackFunc is the function that provides the data when it can't start the watch successfully.
+ FallbackFunc func() ([]runtime.Object, error)
 // connected represents whether the watch has connected to apiserver successfully.
 connected bool
 // lock protects connected.
@@ -766,17 +875,46 @@ func (w *watcher) setConnected(connected bool) {
 w.connected = connected
 }

+// fallback gets init events from the FallbackFunc if the watcher hasn't been synced once.
+func (w *watcher) fallback() {
+ // If the watcher has been synced once, the fallback data source doesn't have newer data, do nothing.
+ if w.fullSynced {
+ return
+ }
+ klog.InfoS("Getting init events from fallback", "objectType", w.objectType)
+ objects, err := w.FallbackFunc()
+ if err != nil {
+ klog.ErrorS(err, "Failed to get init events from fallback", "objectType", w.objectType)
+ return
+ }
+ if err := w.ReplaceFunc(objects); err != nil {
+ klog.ErrorS(err, "Failed to handle init events")
+ return
+ }
+ w.onFullSync()
+}
+
+func (w *watcher) onFullSync() {
+ if !w.fullSynced {
+ w.fullSynced = true
+ // Notify fullSyncWaitGroup that all events before bookmark are handled
+ w.fullSyncWaitGroup.Done()
+ }
+}
+
 func (w *watcher) watch() {
 klog.Infof("Starting watch for %s", w.objectType)
 watcher, err := w.watchFunc()
 if err != nil {
 klog.Warningf("Failed to start watch for %s: %v", w.objectType, err)
+ w.fallback()
 return
 }
 // Watch method doesn't return error but "emptyWatch" in case of some partial data errors,
 // e.g. timeout error. Make sure that watcher is not empty and log warning otherwise.
if reflect.TypeOf(watcher) == reflect.TypeOf(emptyWatch) { klog.Warningf("Failed to start watch for %s, please ensure antrea service is reachable for the agent", w.objectType) + w.fallback() return } @@ -817,11 +955,7 @@ loop: klog.Errorf("Failed to handle init events: %v", err) return } - if !w.fullSynced { - w.fullSynced = true - // Notify fullSyncWaitGroup that all events before bookmark is handled - w.fullSyncWaitGroup.Done() - } + w.onFullSync() for { select { diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go index 54c591c1e91..3dcec07d66b 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go @@ -15,17 +15,21 @@ package networkpolicy import ( + "encoding/base64" "fmt" "net" + "os" "strings" "sync" "testing" "time" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/watch" @@ -71,7 +75,8 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) { ch2 := make(chan string, 100) groupIDAllocator := openflow.NewGroupAllocator() groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)} - controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, true, false, true, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort, &config.NodeConfig{}) + fs := afero.NewMemMapFs() + controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, fs, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, true, false, true, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort, &config.NodeConfig{}) reconciler := newMockReconciler() controller.reconciler = reconciler controller.antreaPolicyLogger = nil @@ -146,14 +151,16 @@ var _ Reconciler = &mockReconciler{} func newAddressGroup(name string, addresses []v1beta2.GroupMember) *v1beta2.AddressGroup { return &v1beta2.AddressGroup{ - ObjectMeta: v1.ObjectMeta{Name: name}, + TypeMeta: v1.TypeMeta{Kind: "AddressGroup", APIVersion: "controlplane.antrea.io/v1beta2"}, + ObjectMeta: v1.ObjectMeta{Name: name, UID: types.UID(name)}, GroupMembers: addresses, } } func newAppliedToGroup(name string, pods []v1beta2.GroupMember) *v1beta2.AppliedToGroup { return &v1beta2.AppliedToGroup{ - ObjectMeta: v1.ObjectMeta{Name: name}, + TypeMeta: v1.TypeMeta{Kind: "AppliedToGroup", APIVersion: "controlplane.antrea.io/v1beta2"}, + ObjectMeta: v1.ObjectMeta{Name: name, UID: types.UID(name)}, GroupMembers: pods, } } @@ -165,6 +172,7 @@ func newNetworkPolicy(name string, uid types.UID, from, to, appliedTo []string, } networkPolicyRule1 := newPolicyRule(dir, from, to, services) return &v1beta2.NetworkPolicy{ + TypeMeta: v1.TypeMeta{Kind: "NetworkPolicy", APIVersion: "controlplane.antrea.io/v1beta2"}, ObjectMeta: v1.ObjectMeta{UID: uid, Name: string(uid)}, Rules: []v1beta2.NetworkPolicyRule{networkPolicyRule1}, AppliedToGroups: appliedTo, @@ -507,6 +515,176 @@ func 
TestAddNetworkPolicyWithMultipleRules(t *testing.T) {
 assert.Equal(t, 1, controller.GetAppliedToGroupNum())
 }

+func writeToFile(t *testing.T, fs afero.Fs, dir, file string, base64Str string) {
+ data, err := base64.StdEncoding.DecodeString(base64Str)
+ require.NoError(t, err)
+ f, err := fs.OpenFile(dir+"/"+file, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600)
+ require.NoError(t, err)
+ defer f.Close()
+ _, err = f.Write(data)
+ require.NoError(t, err)
+}
+
+func TestFallbackToFileStore(t *testing.T) {
+ prepareMockTables()
+ tests := []struct {
+ name string
+ initFileStore func(networkPolicyStore, appliedToGroupStore, addressGroupStore *fileStore)
+ expectedRule *CompletedRule
+ }{
+ {
+ name: "same storage version",
+ initFileStore: func(networkPolicyStore, appliedToGroupStore, addressGroupStore *fileStore) {
+ networkPolicyStore.save(newNetworkPolicy("policy1", "uid1", []string{"addressGroup1"}, nil, []string{"appliedToGroup1"}, nil))
+ appliedToGroupStore.save(newAppliedToGroup("appliedToGroup1", []v1beta2.GroupMember{*newAppliedToGroupMemberPod("pod1", "namespace")}))
+ addressGroupStore.save(newAddressGroup("addressGroup1", []v1beta2.GroupMember{*newAddressGroupPodMember("pod2", "namespace", "192.168.0.1")}))
+ },
+ expectedRule: &CompletedRule{
+ rule: &rule{
+ Direction: v1beta2.DirectionIn,
+ From: v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup1"}},
+ MaxPriority: -1,
+ AppliedToGroups: []string{"appliedToGroup1"},
+ PolicyUID: "uid1",
+ PolicyName: "uid1",
+ SourceRef: &v1beta2.NetworkPolicyReference{
+ Type: v1beta2.K8sNetworkPolicy,
+ Namespace: testNamespace,
+ Name: "policy1",
+ UID: "uid1",
+ },
+ },
+ FromAddresses: v1beta2.NewGroupMemberSet(newAddressGroupPodMember("pod2", "namespace", "192.168.0.1")),
+ TargetMembers: v1beta2.NewGroupMemberSet(newAppliedToGroupMemberPod("pod1", "namespace")),
+ },
+ },
+ {
+ // The test ensures compatibility with the v1beta2 storage version in case the used version is upgraded in the future.
+ name: "compatible with v1beta2",
+ initFileStore: func(networkPolicyStore, appliedToGroupStore, addressGroupStore *fileStore) {
+ // The bytes of v1beta2 objects serialized in protobuf.
+ // They are not supposed to be updated when bumping up the used version.
+ base64EncodedPolicy := "azhzAAovCh5jb250cm9scGxhbmUuYW50cmVhLmlvL3YxYmV0YTISDU5ldHdvcmtQb2xpY3kSdAoYCgR1aWQxEgAaACIAKgR1aWQxMgA4AEIAEh8KAkluEg8KDWFkZHJlc3NHcm91cDEaACgAOABKAFoAGg9hcHBsaWVkVG9Hcm91cDEyJgoQSzhzTmV0d29ya1BvbGljeRIDbnMxGgdwb2xpY3kxIgR1aWQxGgAiAA==" + base64EncodedAppliedToGroup := "azhzAAowCh5jb250cm9scGxhbmUuYW50cmVhLmlvL3YxYmV0YTISDkFwcGxpZWRUb0dyb3VwEkUKLgoPYXBwbGllZFRvR3JvdXAxEgAaACIAKg9hcHBsaWVkVG9Hcm91cDEyADgAQgASEwoRCgRwb2QxEgluYW1lc3BhY2UaACIA" + base64EncodedAddressGroup := "azhzAAouCh5jb250cm9scGxhbmUuYW50cmVhLmlvL3YxYmV0YTISDEFkZHJlc3NHcm91cBJTCioKDWFkZHJlc3NHcm91cDESABoAIgAqDWFkZHJlc3NHcm91cDEyADgAQgASJQoRCgRwb2QyEgluYW1lc3BhY2UaEAAAAAAAAAAAAAD//8CoAAEaACIA" + writeToFile(t, networkPolicyStore.fs, networkPoliciesDir, "uid1", base64EncodedPolicy) + writeToFile(t, appliedToGroupStore.fs, appliedToGroupsDir, "appliedToGroup1", base64EncodedAppliedToGroup) + writeToFile(t, addressGroupStore.fs, addressGroupsDir, "addressGroup1", base64EncodedAddressGroup) + }, + expectedRule: &CompletedRule{ + rule: &rule{ + Direction: v1beta2.DirectionIn, + From: v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup1"}}, + MaxPriority: -1, + AppliedToGroups: []string{"appliedToGroup1"}, + PolicyUID: "uid1", + PolicyName: "uid1", + SourceRef: &v1beta2.NetworkPolicyReference{ + Type: v1beta2.K8sNetworkPolicy, + Namespace: testNamespace, + Name: "policy1", + UID: "uid1", + }, + }, + FromAddresses: v1beta2.NewGroupMemberSet( + &v1beta2.GroupMember{ + Pod: &v1beta2.PodReference{Name: "pod2", Namespace: "namespace"}, + IPs: []v1beta2.IPAddress{v1beta2.IPAddress(net.ParseIP("192.168.0.1"))}, + }, + ), + TargetMembers: v1beta2.NewGroupMemberSet( + &v1beta2.GroupMember{ + Pod: &v1beta2.PodReference{Name: "pod1", Namespace: "namespace"}, + }, + ), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + controller, clientset, reconciler := newTestController() + addressGroupWatcher := watch.NewFake() + appliedToGroupWatcher := watch.NewFake() + networkPolicyWatcher := watch.NewFake() + clientset.AddWatchReactor("addressgroups", k8stesting.DefaultWatchReactor(addressGroupWatcher, fmt.Errorf("network unavailable"))) + clientset.AddWatchReactor("appliedtogroups", k8stesting.DefaultWatchReactor(appliedToGroupWatcher, fmt.Errorf("network unavailable"))) + clientset.AddWatchReactor("networkpolicies", k8stesting.DefaultWatchReactor(networkPolicyWatcher, fmt.Errorf("network unavailable"))) + + tt.initFileStore(controller.networkPolicyStore, controller.appliedToGroupStore, controller.addressGroupStore) + + stopCh := make(chan struct{}) + defer close(stopCh) + go controller.Run(stopCh) + + select { + case ruleID := <-reconciler.updated: + actualRule, _ := reconciler.getLastRealized(ruleID) + // Rule ID is a hash value, we don't care about its exact value. 
+ actualRule.ID = "" + assert.Equal(t, tt.expectedRule, actualRule) + case <-time.After(time.Second): + t.Fatal("Expected one rule update, got timeout") + } + }) + } +} + +func TestOverrideFileStore(t *testing.T) { + prepareMockTables() + controller, clientset, reconciler := newTestController() + addressGroupWatcher := watch.NewFake() + appliedToGroupWatcher := watch.NewFake() + networkPolicyWatcher := watch.NewFake() + clientset.AddWatchReactor("addressgroups", k8stesting.DefaultWatchReactor(addressGroupWatcher, nil)) + clientset.AddWatchReactor("appliedtogroups", k8stesting.DefaultWatchReactor(appliedToGroupWatcher, nil)) + clientset.AddWatchReactor("networkpolicies", k8stesting.DefaultWatchReactor(networkPolicyWatcher, nil)) + + policy1 := newNetworkPolicy("policy1", "uid1", []string{"addressGroup1"}, nil, []string{"appliedToGroup1"}, nil) + policy2 := newNetworkPolicy("policy2", "uid2", []string{"addressGroup2"}, nil, []string{"appliedToGroup2"}, nil) + atgMember1 := newAppliedToGroupMemberPod("pod1", "namespace") + atgMember2 := newAppliedToGroupMemberPod("pod2", "namespace") + agMember1 := newAddressGroupPodMember("pod3", "namespace", "192.168.0.1") + agMember2 := newAddressGroupPodMember("pod4", "namespace", "192.168.0.2") + atg1 := newAppliedToGroup("appliedToGroup1", []v1beta2.GroupMember{*atgMember1}) + atg2 := newAppliedToGroup("appliedToGroup2", []v1beta2.GroupMember{*atgMember2}) + ag1 := newAddressGroup("addressGroup1", []v1beta2.GroupMember{*agMember1}) + ag2 := newAddressGroup("addressGroup2", []v1beta2.GroupMember{*agMember2}) + controller.networkPolicyStore.save(policy1) + controller.appliedToGroupStore.save(atg1) + controller.addressGroupStore.save(ag1) + + stopCh := make(chan struct{}) + defer close(stopCh) + go controller.Run(stopCh) + + networkPolicyWatcher.Add(policy2) + networkPolicyWatcher.Action(watch.Bookmark, nil) + addressGroupWatcher.Add(ag2) + addressGroupWatcher.Action(watch.Bookmark, nil) + appliedToGroupWatcher.Add(atg2) + appliedToGroupWatcher.Action(watch.Bookmark, nil) + + select { + case ruleID := <-reconciler.updated: + actualRule, _ := reconciler.getLastRealized(ruleID) + assert.Equal(t, v1beta2.NewGroupMemberSet(atgMember2), actualRule.TargetMembers) + assert.Equal(t, v1beta2.NewGroupMemberSet(agMember2), actualRule.FromAddresses) + assert.Equal(t, policy2.SourceRef, actualRule.SourceRef) + case <-time.After(time.Second): + t.Fatal("Expected one rule update, got timeout") + } + + objects, err := controller.appliedToGroupStore.loadAll() + require.NoError(t, err) + assert.Equal(t, []runtime.Object{atg2}, objects) + objects, err = controller.addressGroupStore.loadAll() + require.NoError(t, err) + assert.Equal(t, []runtime.Object{ag2}, objects) + objects, err = controller.networkPolicyStore.loadAll() + require.NoError(t, err) + assert.Equal(t, []runtime.Object{policy2}, objects) +} + func TestNetworkPolicyMetrics(t *testing.T) { prepareMockTables() // Initialize NetworkPolicy metrics (prometheus) diff --git a/test/e2e/networkpolicy_test.go b/test/e2e/networkpolicy_test.go index 421be42bd58..3b3bce7d3b3 100644 --- a/test/e2e/networkpolicy_test.go +++ b/test/e2e/networkpolicy_test.go @@ -96,6 +96,10 @@ func TestNetworkPolicy(t *testing.T) { skipIfProxyDisabled(t) testAllowHairpinService(t, data) }) + t.Run("testNetworkPolicyAfterAgentRestart", func(t *testing.T) { + t.Cleanup(exportLogsForSubtest(t, data)) + testNetworkPolicyAfterAgentRestart(t, data) + }) } func testNetworkPolicyStats(t *testing.T, data *TestData) { @@ -704,6 +708,94 @@ func 
testNetworkPolicyResyncAfterRestart(t *testing.T, data *TestData) { } } +// The test validates that Pods can't bypass NetworkPolicy when antrea-agent restarts. +func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) { + workerNode := workerNodeName(1) + var isolatedPod, deniedPod, allowedPod string + var isolatedPodIPs, deniedPodIPs, allowedPodIPs *PodIPs + var wg sync.WaitGroup + createTestPod := func(prefix string) (string, *PodIPs) { + defer wg.Done() + podName, podIPs, cleanup := createAndWaitForPod(t, data, data.createNginxPodOnNode, prefix, workerNode, data.testNamespace, false) + t.Cleanup(cleanup) + return podName, podIPs + } + wg.Add(3) + go func() { + isolatedPod, isolatedPodIPs = createTestPod("test-isolated") + }() + go func() { + deniedPod, deniedPodIPs = createTestPod("test-denied") + }() + go func() { + allowedPod, allowedPodIPs = createTestPod("test-allowed") + }() + wg.Wait() + + allowedPeer := networkingv1.NetworkPolicyPeer{ + PodSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"antrea-e2e": allowedPod}}, + } + netpol, err := data.createNetworkPolicy("test-isolated", &networkingv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"antrea-e2e": isolatedPod}}, + Ingress: []networkingv1.NetworkPolicyIngressRule{{From: []networkingv1.NetworkPolicyPeer{allowedPeer}}}, + Egress: []networkingv1.NetworkPolicyEgressRule{{To: []networkingv1.NetworkPolicyPeer{allowedPeer}}}, + }) + require.NoError(t, err) + t.Cleanup(func() { data.deleteNetworkpolicy(netpol) }) + + checkFunc := func(testPod string, testPodIPs *PodIPs, expectErr bool) { + var wg sync.WaitGroup + checkOne := func(clientPod, serverPod string, serverIP *net.IP) { + defer wg.Done() + if serverIP != nil { + _, _, err := data.runWgetCommandFromTestPodWithRetry(clientPod, data.testNamespace, nginxContainerName, serverIP.String(), 1) + if expectErr && err == nil { + t.Errorf("Pod %s should not be able to connect %s, but was able to connect", clientPod, serverPod) + } else if !expectErr && err != nil { + t.Errorf("Pod %s should be able to connect %s, but was not able to connect, err: %v", clientPod, serverPod, err) + } + } + } + wg.Add(4) + go checkOne(isolatedPod, testPod, testPodIPs.ipv4) + go checkOne(isolatedPod, testPod, testPodIPs.ipv6) + go checkOne(testPod, isolatedPod, isolatedPodIPs.ipv4) + go checkOne(testPod, isolatedPod, isolatedPodIPs.ipv6) + wg.Wait() + } + + scaleFunc := func(replicas int32) { + scale, err := data.clientset.AppsV1().Deployments(antreaNamespace).GetScale(context.TODO(), antreaDeployment, metav1.GetOptions{}) + require.NoError(t, err) + scale.Spec.Replicas = replicas + _, err = data.clientset.AppsV1().Deployments(antreaNamespace).UpdateScale(context.TODO(), antreaDeployment, scale, metav1.UpdateOptions{}) + require.NoError(t, err) + } + + // Scale antrea-controller to 0 so antrea-agent will lose connection with antrea-controller. + scaleFunc(0) + t.Cleanup(func() { scaleFunc(1) }) + + // Restart the antrea-agent. + _, err = data.deleteAntreaAgentOnNode(workerNode, 30, defaultTimeout) + require.NoError(t, err) + antreaPod, err := data.getAntreaPodOnNode(workerNode) + require.NoError(t, err) + // Make sure the new antrea-agent disconnects from antrea-controller but connects to OVS. 
+ waitForAgentCondition(t, data, antreaPod, v1beta1.ControllerConnectionUp, corev1.ConditionFalse)
+ waitForAgentCondition(t, data, antreaPod, v1beta1.OpenflowConnectionUp, corev1.ConditionTrue)
+ // Even if the new antrea-agent can't connect to antrea-controller, the previously realized policy should continue working.
+ checkFunc(deniedPod, deniedPodIPs, true)
+ checkFunc(allowedPod, allowedPodIPs, false)
+
+ // Scale antrea-controller to 1 so antrea-agent will connect to antrea-controller.
+ scaleFunc(1)
+ // Make sure antrea-agent connects to antrea-controller.
+ waitForAgentCondition(t, data, antreaPod, v1beta1.ControllerConnectionUp, corev1.ConditionTrue)
+ checkFunc(deniedPod, deniedPodIPs, true)
+ checkFunc(allowedPod, allowedPodIPs, false)
+}
+
 func testIngressPolicyWithoutPortNumber(t *testing.T, data *TestData) {
 serverPort := int32(80)
 _, serverIPs, cleanupFunc := createAndWaitForPod(t, data, data.createNginxPodOnNode, "test-server-", "", data.testNamespace, false)
@@ -1039,8 +1131,9 @@ func waitForAgentCondition(t *testing.T, data *TestData, podName string, conditi
 t.Logf("cmds: %s", cmds)
 stdout, _, err := runAntctl(podName, cmds, data)
+ // The server may not be available yet.
 if err != nil {
- return true, err
+ return false, nil
 }
 var agentInfo agentinfo.AntreaAgentInfoResponse
 err = json.Unmarshal([]byte(stdout), &agentInfo)

From 095af9453f323cf86d5994f9b2d0bbee4ce7fbe0 Mon Sep 17 00:00:00 2001
From: Quan Tian
Date: Wed, 22 Nov 2023 18:50:53 +0800
Subject: [PATCH 2/5] Enable Pod network after realizing initial NetworkPolicies

Pod network should only be enabled after realizing initial NetworkPolicies;
otherwise, traffic from/to Pods could bypass NetworkPolicy when antrea-agent
restarts.

After commit f9fc979345bf ("Store NetworkPolicy in filesystem as fallback
data source"), antrea-agent can realize either the latest NetworkPolicies
received from antrea-controller or the ones loaded from the filesystem as a
fallback. Therefore, waiting for NetworkPolicies to be realized should not
add a noticeable delay or make antrea-controller a failure point of the Pod
network.

This commit adds an implementation of a wait group capable of waiting with a
timeout, and uses it to wait for common initialization and NetworkPolicy
realization before installing any flows for Pods. More preconditions can be
added via the wait group if needed in the future.

Signed-off-by: Quan Tian
---
 cmd/antrea-agent/agent.go                     |  15 +-
 pkg/agent/agent.go                            |  23 +--
 pkg/agent/cniserver/pod_configuration.go      |  46 +++---
 pkg/agent/cniserver/server.go                 |  17 ++-
 pkg/agent/cniserver/server_test.go            |   5 +-
 .../networkpolicy/networkpolicy_controller.go |  15 +-
 .../networkpolicy_controller_test.go          |   3 +-
 pkg/util/wait/wait.go                         |  85 +++++++++++
 pkg/util/wait/wait_test.go                    | 133 ++++++++++++++++++
 test/e2e/networkpolicy_test.go                |   9 +-
 test/integration/agent/cniserver_test.go      |  14 +-
 11 files changed, 302 insertions(+), 63 deletions(-)
 create mode 100644 pkg/util/wait/wait.go
 create mode 100644 pkg/util/wait/wait_test.go

diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go
index f431efd0b91..28f8d607528 100644
--- a/cmd/antrea-agent/agent.go
+++ b/cmd/antrea-agent/agent.go
@@ -79,6 +79,7 @@ import (
 "antrea.io/antrea/pkg/util/channel"
 "antrea.io/antrea/pkg/util/k8s"
 "antrea.io/antrea/pkg/util/podstore"
+ utilwait "antrea.io/antrea/pkg/util/wait"
 "antrea.io/antrea/pkg/version"
 )
@@ -220,9 +221,12 @@ func run(o *Options) error {
 // Create an ifaceStore that caches network interfaces managed by this node.
ifaceStore := interfacestore.NewInterfaceStore() - // networkReadyCh is used to notify that the Node's network is ready. - // Functions that rely on the Node's network should wait for the channel to close. - networkReadyCh := make(chan struct{}) + // podNetworkWait is used to wait and notify that preconditions for Pod network are ready. + // Processes that are supposed to finish before enabling Pod network should increment the wait group and decrement + // it when finished. + // Processes that enable Pod network should wait for it. + podNetworkWait := utilwait.NewGroup() + // set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will // cause the stopCh channel to be closed; if another signal is received before the program // exits, we will force exit. @@ -267,7 +271,7 @@ func run(o *Options) error { wireguardConfig, egressConfig, serviceConfig, - networkReadyCh, + podNetworkWait, stopCh, o.nodeType, o.config.ExternalNode.ExternalNodeNamespace, @@ -467,6 +471,7 @@ func run(o *Options) error { gwPort, tunPort, nodeConfig, + podNetworkWait, ) if err != nil { return fmt.Errorf("error creating new NetworkPolicy controller: %v", err) @@ -538,7 +543,7 @@ func run(o *Options) error { enableAntreaIPAM, o.config.DisableTXChecksumOffload, networkConfig, - networkReadyCh) + podNetworkWait) if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) { cniPodInfoStore = cnipodcache.NewCNIPodInfoStore() diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 6f5a453db93..8a368bcfc32 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -23,7 +23,6 @@ import ( "os" "strconv" "strings" - "sync" "time" "github.com/containernetworking/plugins/pkg/ip" @@ -58,6 +57,7 @@ import ( "antrea.io/antrea/pkg/util/env" utilip "antrea.io/antrea/pkg/util/ip" "antrea.io/antrea/pkg/util/k8s" + utilwait "antrea.io/antrea/pkg/util/wait" ) const ( @@ -119,9 +119,9 @@ type Initializer struct { l7NetworkPolicyConfig *config.L7NetworkPolicyConfig enableL7NetworkPolicy bool connectUplinkToBridge bool - // networkReadyCh should be closed once the Node's network is ready. + // podNetworkWait should be decremented once the Node's network is ready. // The CNI server will wait for it before handling any CNI Add requests. - networkReadyCh chan<- struct{} + podNetworkWait *utilwait.Group stopCh <-chan struct{} nodeType config.NodeType externalNodeNamespace string @@ -142,7 +142,7 @@ func NewInitializer( wireGuardConfig *config.WireGuardConfig, egressConfig *config.EgressConfig, serviceConfig *config.ServiceConfig, - networkReadyCh chan<- struct{}, + podNetworkWait *utilwait.Group, stopCh <-chan struct{}, nodeType config.NodeType, externalNodeNamespace string, @@ -165,7 +165,7 @@ func NewInitializer( egressConfig: egressConfig, serviceConfig: serviceConfig, l7NetworkPolicyConfig: &config.L7NetworkPolicyConfig{}, - networkReadyCh: networkReadyCh, + podNetworkWait: podNetworkWait, stopCh: stopCh, nodeType: nodeType, externalNodeNamespace: externalNodeNamespace, @@ -403,9 +403,6 @@ func (i *Initializer) restorePortConfigs() error { // Initialize sets up agent initial configurations. func (i *Initializer) Initialize() error { klog.Info("Setting up node network") - // wg is used to wait for the asynchronous initialization. 
- var wg sync.WaitGroup - if err := i.initNodeLocalConfig(); err != nil { return err } @@ -481,10 +478,10 @@ func (i *Initializer) Initialize() error { } if i.nodeType == config.K8sNode { - wg.Add(1) + i.podNetworkWait.Increment() // routeClient.Initialize() should be after i.setupOVSBridge() which // creates the host gateway interface. - if err := i.routeClient.Initialize(i.nodeConfig, wg.Done); err != nil { + if err := i.routeClient.Initialize(i.nodeConfig, i.podNetworkWait.Done); err != nil { return err } @@ -492,12 +489,6 @@ func (i *Initializer) Initialize() error { if err := i.initOpenFlowPipeline(); err != nil { return err } - - // The Node's network is ready only when both synchronous and asynchronous initialization are done. - go func() { - wg.Wait() - close(i.networkReadyCh) - }() } else { // Install OpenFlow entries on OVS bridge. if err := i.initOpenFlowPipeline(); err != nil { diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index 1e0782dfe88..855ff506dc1 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -37,6 +37,7 @@ import ( "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/util/channel" "antrea.io/antrea/pkg/util/k8s" + "antrea.io/antrea/pkg/util/wait" ) type vethPair struct { @@ -416,7 +417,7 @@ func parsePrevResult(conf *types.NetworkConfig) error { return nil } -func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator) error { +func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator, podNetworkWait *wait.Group) error { // desiredPods is the set of Pods that should be present, based on the // current list of Pods got from the Kubernetes API. desiredPods := sets.New[string]() @@ -441,21 +442,34 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain missingIfConfigs = append(missingIfConfigs, containerConfig) continue } - // This interface matches an existing Pod. - // We rely on the interface cache / store - which is initialized from the persistent - // OVSDB - to map the Pod to its interface configuration. The interface - // configuration includes the parameters we need to replay the flows. - klog.V(4).Infof("Syncing interface %s for Pod %s", containerConfig.InterfaceName, namespacedName) - if err := pc.ofClient.InstallPodFlows( - containerConfig.InterfaceName, - containerConfig.IPs, - containerConfig.MAC, - uint32(containerConfig.OFPort), - containerConfig.VLANID, - nil, - ); err != nil { - klog.Errorf("Error when re-installing flows for Pod %s", namespacedName) - } + go func(containerID, pod, namespace string) { + // Do not install Pod flows until all preconditions are met. + podNetworkWait.Wait() + // To avoid race condition with CNIServer CNI event handlers. + containerAccess.lockContainer(containerID) + defer containerAccess.unlockContainer(containerID) + + containerConfig, exists := pc.ifaceStore.GetContainerInterface(containerID) + if !exists { + klog.InfoS("The container interface had been deleted, skip installing flows for Pod", "Pod", klog.KRef(namespace, pod), "containerID", containerID) + return + } + // This interface matches an existing Pod. + // We rely on the interface cache / store - which is initialized from the persistent + // OVSDB - to map the Pod to its interface configuration. The interface + // configuration includes the parameters we need to replay the flows. 
+ klog.V(4).InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, pod), "iface", containerConfig.InterfaceName) + if err := pc.ofClient.InstallPodFlows( + containerConfig.InterfaceName, + containerConfig.IPs, + containerConfig.MAC, + uint32(containerConfig.OFPort), + containerConfig.VLANID, + nil, + ); err != nil { + klog.ErrorS(err, "Error when re-installing flows for Pod", "Pod", klog.KRef(namespace, pod)) + } + }(containerConfig.ContainerID, containerConfig.PodName, containerConfig.PodNamespace) } else { // clean-up and delete interface klog.V(4).Infof("Deleting interface %s", containerConfig.InterfaceName) diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index dc3135d5123..04f1ed1e018 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -45,6 +45,7 @@ import ( "antrea.io/antrea/pkg/cni" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/wait" ) const ( @@ -118,8 +119,8 @@ type CNIServer struct { disableTXChecksumOffload bool secondaryNetworkEnabled bool networkConfig *config.NetworkConfig - // networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it. - networkReadyCh <-chan struct{} + // podNetworkWait notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it. + podNetworkWait *wait.Group } var supportedCNIVersionSet map[string]bool @@ -441,11 +442,9 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (* return resp, err } - select { - case <-time.After(networkReadyTimeout): - klog.ErrorS(nil, "Cannot process CmdAdd request for container because network is not ready", "container", cniConfig.ContainerId, "timeout", networkReadyTimeout) + if err := s.podNetworkWait.WaitWithTimeout(networkReadyTimeout); err != nil { + klog.ErrorS(err, "Cannot process CmdAdd request for container because network is not ready", "container", cniConfig.ContainerId, "timeout", networkReadyTimeout) return s.tryAgainLaterResponse(), nil - case <-s.networkReadyCh: } result := &ipam.IPAMResult{Result: current.Result{CNIVersion: current.ImplementedSpecVersion}} @@ -634,7 +633,7 @@ func New( routeClient route.Interface, isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool, networkConfig *config.NetworkConfig, - networkReadyCh <-chan struct{}, + podNetworkWait *wait.Group, ) *CNIServer { return &CNIServer{ cniSocket: cniSocket, @@ -650,7 +649,7 @@ func New( disableTXChecksumOffload: disableTXChecksumOffload, enableSecondaryNetworkIPAM: enableSecondaryNetworkIPAM, networkConfig: networkConfig, - networkReadyCh: networkReadyCh, + podNetworkWait: podNetworkWait, } } @@ -773,7 +772,7 @@ func (s *CNIServer) reconcile() error { return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err) } - return s.podConfigurator.reconcile(pods.Items, s.containerAccess) + return s.podConfigurator.reconcile(pods.Items, s.containerAccess, s.podNetworkWait) } func init() { diff --git a/pkg/agent/cniserver/server_test.go b/pkg/agent/cniserver/server_test.go index 037ee24834e..478960d4e76 100644 --- a/pkg/agent/cniserver/server_test.go +++ b/pkg/agent/cniserver/server_test.go @@ -43,6 +43,7 @@ import ( "antrea.io/antrea/pkg/cni" "antrea.io/antrea/pkg/ovs/ovsconfig" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" + "antrea.io/antrea/pkg/util/wait" ) const ( @@ -664,15 +665,13 @@ func translateRawPrevResult(prevResult *current.Result, cniVersion 
string) (map[ } func newCNIServer(t *testing.T) *CNIServer { - networkReadyCh := make(chan struct{}) cniServer := &CNIServer{ cniSocket: testSocket, nodeConfig: testNodeConfig, serverVersion: cni.AntreaCNIVersion, containerAccess: newContainerAccessArbitrator(), - networkReadyCh: networkReadyCh, + podNetworkWait: wait.NewGroup(), } - close(networkReadyCh) cniServer.supportedCNIVersions = buildVersionSet() cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450} return cniServer diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go index 2e144bdb94b..71b6ecf9003 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller.go @@ -46,6 +46,7 @@ import ( "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/querier" "antrea.io/antrea/pkg/util/channel" + utilwait "antrea.io/antrea/pkg/util/wait" ) const ( @@ -145,10 +146,11 @@ type Controller struct { fullSyncGroup sync.WaitGroup ifaceStore interfacestore.InterfaceStore // denyConnStore is for storing deny connections for flow exporter. - denyConnStore *connections.DenyConnectionStore - gwPort uint32 - tunPort uint32 - nodeConfig *config.NodeConfig + denyConnStore *connections.DenyConnectionStore + gwPort uint32 + tunPort uint32 + nodeConfig *config.NodeConfig + podNetworkWait *utilwait.Group // The fileStores store runtime.Objects in files and use them as the fallback data source when agent can't connect // to antrea-controller on startup. @@ -183,7 +185,8 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, v4Enabled bool, v6Enabled bool, gwPort, tunPort uint32, - nodeConfig *config.NodeConfig) (*Controller, error) { + nodeConfig *config.NodeConfig, + podNetworkWait *utilwait.Group) (*Controller, error) { idAllocator := newIDAllocator(asyncRuleDeleteInterval, dnsInterceptRuleID) c := &Controller{ antreaClientProvider: antreaClientGetter, @@ -199,6 +202,7 @@ func NewNetworkPolicyController(antreaClientGetter agent.AntreaClientProvider, gwPort: gwPort, tunPort: tunPort, nodeConfig: nodeConfig, + podNetworkWait: podNetworkWait.Increment(), } if l7NetworkPolicyEnabled { @@ -613,6 +617,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { klog.Infof("All watchers have completed full sync, installing flows for init events") // Batch install all rules in queue after fullSync is finished. 
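+	// Decrementing podNetworkWait below signals that flows for the NetworkPolicies received during
+	// the initial full sync have been installed; the CNI server waits for this before provisioning
+	// Pod network.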
 	c.processAllItemsInQueue()
+	c.podNetworkWait.Done()
 
 	klog.Infof("Starting NetworkPolicy workers now")
 	defer c.queue.ShutDown()
diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go
index 3dcec07d66b..5eb33646735 100644
--- a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go
+++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go
@@ -47,6 +47,7 @@ import (
 	"antrea.io/antrea/pkg/client/clientset/versioned/fake"
 	"antrea.io/antrea/pkg/querier"
 	"antrea.io/antrea/pkg/util/channel"
+	"antrea.io/antrea/pkg/util/wait"
 )
 
 const testNamespace = "ns1"
@@ -76,7 +77,7 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) {
 	groupIDAllocator := openflow.NewGroupAllocator()
 	groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)}
 	fs := afero.NewMemMapFs()
-	controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, fs, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, true, false, true, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort, &config.NodeConfig{})
+	controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, fs, "node1", podUpdateChannel, nil, groupCounters, ch2, true, true, true, true, false, true, testAsyncDeleteInterval, "8.8.8.8:53", config.K8sNode, true, false, config.HostGatewayOFPort, config.DefaultTunOFPort, &config.NodeConfig{}, wait.NewGroup())
 	reconciler := newMockReconciler()
 	controller.reconciler = reconciler
 	controller.antreaPolicyLogger = nil
diff --git a/pkg/util/wait/wait.go b/pkg/util/wait/wait.go
new file mode 100644
index 00000000000..6897ec2fb24
--- /dev/null
+++ b/pkg/util/wait/wait.go
@@ -0,0 +1,85 @@
+// Copyright 2023 Antrea Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wait
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"k8s.io/utils/clock"
+)
+
+// Group allows waiting for a collection of goroutines to finish with a timeout or a stop channel.
+type Group struct { + wg *sync.WaitGroup + doneCh chan struct{} + once sync.Once + clock clock.Clock +} + +func NewGroup() *Group { + return newGroupWithClock(clock.RealClock{}) +} + +func newGroupWithClock(clock clock.Clock) *Group { + return &Group{ + wg: &sync.WaitGroup{}, + doneCh: make(chan struct{}), + clock: clock, + } +} + +func (g *Group) Increment() *Group { + g.wg.Add(1) + return g +} + +func (g *Group) Done() { + g.wg.Done() +} + +func (g *Group) wait() { + g.once.Do(func() { + go func() { + g.wg.Wait() + close(g.doneCh) + }() + }) +} + +func (g *Group) WaitWithTimeout(timeout time.Duration) error { + g.wait() + select { + case <-g.doneCh: + return nil + case <-g.clock.After(timeout): + return fmt.Errorf("timeout waiting for group") + } +} + +func (g *Group) WaitUntil(stopCh <-chan struct{}) error { + g.wait() + select { + case <-g.doneCh: + return nil + case <-stopCh: + return fmt.Errorf("stopCh closed, stop waiting") + } +} + +func (g *Group) Wait() { + g.wg.Wait() +} diff --git a/pkg/util/wait/wait_test.go b/pkg/util/wait/wait_test.go new file mode 100644 index 00000000000..f471685b1d9 --- /dev/null +++ b/pkg/util/wait/wait_test.go @@ -0,0 +1,133 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package wait + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + clock "k8s.io/utils/clock/testing" +) + +func TestGroupWaitWithTimeout(t *testing.T) { + const timeout = 100 * time.Millisecond + tests := []struct { + name string + add int + processFn func(group *Group, fakeClock *clock.FakeClock) + expectWaitErr bool + }{ + { + name: "add only", + add: 1, + processFn: func(group *Group, fakeClock *clock.FakeClock) { + fakeClock.Step(timeout) + }, + expectWaitErr: true, + }, + { + name: "add greater than done", + add: 2, + processFn: func(group *Group, fakeClock *clock.FakeClock) { + group.Done() + fakeClock.Step(timeout) + }, + expectWaitErr: true, + }, + { + name: "add equal to done", + add: 2, + processFn: func(group *Group, fakeClock *clock.FakeClock) { + group.Done() + fakeClock.Step(timeout / 2) + group.Done() + }, + expectWaitErr: false, + }, + { + name: "add with delay", + add: 2, + processFn: func(group *Group, fakeClock *clock.FakeClock) { + group.Done() + fakeClock.Step(timeout * 2) + group.Done() + }, + expectWaitErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeClock := clock.NewFakeClock(time.Now()) + g := newGroupWithClock(fakeClock) + for i := 0; i < tt.add; i++ { + g.Increment() + } + resCh := make(chan error, 1) + go func() { + resCh <- g.WaitWithTimeout(timeout) + }() + require.Eventually(t, func() bool { + return fakeClock.HasWaiters() + }, 1*time.Second, 10*time.Millisecond) + tt.processFn(g, fakeClock) + err := <-resCh + if tt.expectWaitErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestGroupWait(t *testing.T) { + g := NewGroup() + g.Increment() + returnedCh := make(chan struct{}) + go func() { + g.Wait() + close(returnedCh) + }() + select { + case <-time.After(100 * time.Millisecond): + case <-returnedCh: + t.Errorf("Wait should not return before it's done") + } + g.Done() + select { + case <-time.After(500 * time.Millisecond): + t.Errorf("Wait should return after it's done") + case <-returnedCh: + } +} + +func TestGroupWaitUntil(t *testing.T) { + g := NewGroup() + g.Increment() + stopCh := make(chan struct{}) + go func() { + time.Sleep(100 * time.Millisecond) + close(stopCh) + }() + err := g.WaitUntil(stopCh) + assert.Error(t, err) + + stopCh = make(chan struct{}) + g.Done() + err = g.WaitUntil(stopCh) + assert.NoError(t, err) +} diff --git a/test/e2e/networkpolicy_test.go b/test/e2e/networkpolicy_test.go index 3b3bce7d3b3..0310e5d7c58 100644 --- a/test/e2e/networkpolicy_test.go +++ b/test/e2e/networkpolicy_test.go @@ -748,7 +748,8 @@ func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) { checkOne := func(clientPod, serverPod string, serverIP *net.IP) { defer wg.Done() if serverIP != nil { - _, _, err := data.runWgetCommandFromTestPodWithRetry(clientPod, data.testNamespace, nginxContainerName, serverIP.String(), 1) + cmd := []string{"wget", "-O", "-", serverIP.String(), "-T", "1"} + _, _, err := data.RunCommandFromPod(data.testNamespace, clientPod, nginxContainerName, cmd) if expectErr && err == nil { t.Errorf("Pod %s should not be able to connect %s, but was able to connect", clientPod, serverPod) } else if !expectErr && err != nil { @@ -779,6 +780,12 @@ func testNetworkPolicyAfterAgentRestart(t *testing.T, data *TestData) { // Restart the antrea-agent. 
_, err = data.deleteAntreaAgentOnNode(workerNode, 30, defaultTimeout) require.NoError(t, err) + + // While the new antrea-agent starts, the denied Pod should never connect to the isolated Pod successfully. + for i := 0; i < 5; i++ { + checkFunc(deniedPod, deniedPodIPs, true) + } + antreaPod, err := data.getAntreaPodOnNode(workerNode) require.NoError(t, err) // Make sure the new antrea-agent disconnects from antrea-controller but connects to OVS. diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index 0681056a134..f0991828d15 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -57,6 +57,7 @@ import ( "antrea.io/antrea/pkg/ovs/ovsconfig" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/wait" ) const ( @@ -295,7 +296,7 @@ type cmdAddDelTester struct { targetNS ns.NetNS request *cnimsg.CniCmdRequest vethName string - networkReadyCh chan struct{} + podNetworkWait *wait.Group } func (tester *cmdAddDelTester) setNS(testNS ns.NetNS, targetNS ns.NetNS) { @@ -564,14 +565,14 @@ func (tester *cmdAddDelTester) cmdDelTest(tc testCase, dataDir string) { func newTester() *cmdAddDelTester { tester := &cmdAddDelTester{} ifaceStore := interfacestore.NewInterfaceStore() - tester.networkReadyCh = make(chan struct{}) + tester.podNetworkWait = wait.NewGroup() tester.server = cniserver.New(testSock, "", testNodeConfig, k8sFake.NewSimpleClientset(), routeMock, false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, - tester.networkReadyCh) + tester.podNetworkWait.Increment()) tester.server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100), nil) ctx := context.Background() tester.ctx = ctx @@ -607,7 +608,7 @@ func cmdAddDelCheckTest(testNS ns.NetNS, tc testCase, dataDir string) { ovsServiceMock.EXPECT().GetOFPort(ovsPortname, false).Return(int32(10), nil).AnyTimes() ofServiceMock.EXPECT().InstallPodFlows(ovsPortname, mock.Any(), mock.Any(), mock.Any(), uint16(0), nil).Return(nil) - close(tester.networkReadyCh) + tester.podNetworkWait.Done() // Test ips allocation prevResult, err := tester.cmdAddTest(tc, dataDir) testRequire.Nil(err) @@ -726,15 +727,14 @@ func setupChainTest( if newServer { routeMock = routetest.NewMockInterface(controller) - networkReadyCh := make(chan struct{}) - close(networkReadyCh) + podNetworkWait := wait.NewGroup() server = cniserver.New(testSock, "", testNodeConfig, k8sFake.NewSimpleClientset(), routeMock, true, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, - networkReadyCh) + podNetworkWait) } else { server = inServer } From ad6a998c860a96fe5815914d022ce807a549f470 Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Wed, 13 Dec 2023 20:40:02 +0800 Subject: [PATCH 3/5] Support Local ExternalTrafficPolicy for Services with ExternalIPs Since K8s 1.29, setting Local ExternalTrafficPolicy for ClusterIP Services with ExternalIPs is supported. 
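For illustration, a minimal sketch (not part of this patch; the Service name, IPs,
and port are hypothetical) of a Service that the new check starts honoring, built
with the same corev1 types used by the tests below:

// With this change, AntreaProxy is expected to honor the Local policy for the
// ExternalIP of a ClusterIP Service: only local Endpoints are selected and the
// client source IP is preserved.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	svc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"},
		Spec: corev1.ServiceSpec{
			Type:                  corev1.ServiceTypeClusterIP,
			ClusterIP:             "10.96.0.10",
			ExternalIPs:           []string{"192.168.77.100"},
			ExternalTrafficPolicy: corev1.ServiceExternalTrafficPolicyTypeLocal,
			Ports:                 []corev1.ServicePort{{Port: 80, Protocol: corev1.ProtocolTCP}},
		},
	}
	// The new third_party/proxy/util.ExternallyAccessible treats this Service as
	// externally accessible, so ExternalPolicyLocal now returns true for it.
	fmt.Println(svc.Spec.ExternalTrafficPolicy) // Local
}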
Signed-off-by: Quan Tian --- pkg/agent/proxy/proxier_test.go | 49 ++++++++++++++++++++++++++----- third_party/proxy/util/service.go | 9 ++++-- 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 4480575dace..69dcaede88a 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -155,6 +155,7 @@ func makeTestClusterIPService(svcPortName *k8sproxy.ServicePortName, nested bool, labels map[string]string) *corev1.Service { return makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { + svc.Spec.Type = corev1.ServiceTypeClusterIP svc.Spec.ClusterIP = clusterIP.String() svc.Spec.Ports = []corev1.ServicePort{{ Name: svcPortName.Port, @@ -2672,6 +2673,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, loadBalancerIP net.IP, + externalIP net.IP, ep1IP net.IP, ep2IP net.IP, svcType corev1.ServiceType, @@ -2683,12 +2685,17 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, var svc, updatedSvc *corev1.Service switch svcType { + case corev1.ServiceTypeClusterIP: + // ExternalTrafficPolicy defaults to Cluster. + svc = makeTestClusterIPService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil) + updatedSvc = svc.DeepCopy() + updatedSvc.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyTypeLocal case corev1.ServiceTypeNodePort: - svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeLocal) + svc = makeTestNodePortService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestNodePortService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeLocal) case corev1.ServiceTypeLoadBalancer: - svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeLocal) + svc = makeTestLoadBalancerService(&svcPortName, svcIP, []net.IP{externalIP}, []net.IP{loadBalancerIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, []net.IP{externalIP}, []net.IP{loadBalancerIP}, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeLocal) } makeServiceMap(fp, svc) @@ -2719,6 +2726,14 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, Protocol: bindingProtocol, ClusterGroupID: 1, }).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ + ServiceIP: externalIP, + ServicePort: 
uint16(svcPort), + Protocol: bindingProtocol, + ClusterGroupID: 1, + IsExternal: true, + }).Times(1) + mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ @@ -2750,6 +2765,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, gomock.InAnyOrder(expectedAllEps)).Times(1) mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(2), false, expectedLocalEps).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) + mockOFClient.EXPECT().UninstallServiceFlows(externalIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ ServiceIP: svcIP, ServicePort: uint16(svcPort), @@ -2757,6 +2773,17 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, LocalGroupID: 2, ClusterGroupID: 1, }).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ + ServiceIP: externalIP, + ServicePort: uint16(svcPort), + Protocol: bindingProtocol, + LocalGroupID: 2, + ClusterGroupID: 1, + TrafficPolicyLocal: true, + IsExternal: true, + }).Times(1) + mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP).Times(1) + mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { s1 := mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) @@ -2798,19 +2825,25 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, func TestServiceExternalTrafficPolicyUpdate(t *testing.T) { t.Run("IPv4", func(t *testing.T) { + t.Run("ClusterIP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, nil, svc1IPv4, nil, externalIPv4, ep1IPv4, ep2IPv4, corev1.ServiceTypeClusterIP, false) + }) t.Run("NodePort", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv4, svc1IPv4, nil, ep1IPv4, ep2IPv4, corev1.ServiceTypeNodePort, false) + testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv4, svc1IPv4, nil, externalIPv4, ep1IPv4, ep2IPv4, corev1.ServiceTypeNodePort, false) }) t.Run("LoadBalancer", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv4, svc1IPv4, loadBalancerIPv4, ep1IPv4, ep2IPv4, corev1.ServiceTypeLoadBalancer, false) + testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv4, svc1IPv4, loadBalancerIPv4, externalIPv4, ep1IPv4, ep2IPv4, corev1.ServiceTypeLoadBalancer, false) }) }) t.Run("IPv6", func(t *testing.T) { + t.Run("ClusterIP", func(t *testing.T) { + testServiceExternalTrafficPolicyUpdate(t, nil, svc1IPv6, nil, externalIPv6, ep1IPv6, ep2IPv6, corev1.ServiceTypeClusterIP, true) + }) t.Run("NodePort", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv6, svc1IPv6, nil, ep1IPv6, ep2IPv6, corev1.ServiceTypeNodePort, true) + testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv6, svc1IPv6, nil, externalIPv6, ep1IPv6, ep2IPv6, corev1.ServiceTypeNodePort, true) }) t.Run("LoadBalancer", func(t *testing.T) { - testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv6, svc1IPv6, loadBalancerIPv6, ep1IPv6, ep2IPv6, corev1.ServiceTypeLoadBalancer, true) + testServiceExternalTrafficPolicyUpdate(t, nodePortAddressesIPv6, svc1IPv6, loadBalancerIPv6, externalIPv6, ep1IPv6, 
ep2IPv6, corev1.ServiceTypeLoadBalancer, true)
 		})
 	})
 }
diff --git a/third_party/proxy/util/service.go b/third_party/proxy/util/service.go
index 883c373d47f..9056f7b4626 100644
--- a/third_party/proxy/util/service.go
+++ b/third_party/proxy/util/service.go
@@ -37,10 +37,15 @@ package util
 
 import v1 "k8s.io/api/core/v1"
 
+func ExternallyAccessible(service *v1.Service) bool {
+	return service.Spec.Type == v1.ServiceTypeLoadBalancer ||
+		service.Spec.Type == v1.ServiceTypeNodePort ||
+		(service.Spec.Type == v1.ServiceTypeClusterIP && len(service.Spec.ExternalIPs) > 0)
+}
+
 // ExternalPolicyLocal checks if service has ETP = Local.
 func ExternalPolicyLocal(service *v1.Service) bool {
-	if service.Spec.Type != v1.ServiceTypeLoadBalancer &&
-		service.Spec.Type != v1.ServiceTypeNodePort {
+	if !ExternallyAccessible(service) {
 		return false
 	}
 	return service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal

From a36454f2139722afd63a775809085359bac3cc24 Mon Sep 17 00:00:00 2001
From: Quan Tian
Date: Thu, 14 Dec 2023 11:54:03 +0800
Subject: [PATCH 4/5] Fix unit test TestReconcile

cniServer.reconcile() now installs flows asynchronously.

Signed-off-by: Quan Tian
---
 .../interface_configuration_linux_test.go     |   2 +-
 .../cniserver/pod_configuration_linux_test.go |  20 +--
 pkg/agent/cniserver/server_linux_test.go      | 105 +++----------
 pkg/agent/cniserver/server_test.go            |  86 +++++++++-
 pkg/agent/cniserver/server_windows_test.go    | 147 +++++------------
 pkg/util/ip/ip.go                             |   8 +
 6 files changed, 160 insertions(+), 208 deletions(-)

diff --git a/pkg/agent/cniserver/interface_configuration_linux_test.go b/pkg/agent/cniserver/interface_configuration_linux_test.go
index 1452dee54c3..e9672866a1b 100644
--- a/pkg/agent/cniserver/interface_configuration_linux_test.go
+++ b/pkg/agent/cniserver/interface_configuration_linux_test.go
@@ -127,7 +127,7 @@ func (ns *fakeNS) clear() {
 }
 
 func createNS(t *testing.T, waitForComplete bool) *fakeNS {
-	nsPath := generateUUID(t)
+	nsPath := generateUUID()
 	fakeNs := &fakeNS{path: nsPath, fd: uintptr(unsafe.Pointer(&nsPath)), waitCompleted: waitForComplete, stopCh: make(chan struct{})}
 	validNSs.Store(nsPath, fakeNs)
 	return fakeNs
diff --git a/pkg/agent/cniserver/pod_configuration_linux_test.go b/pkg/agent/cniserver/pod_configuration_linux_test.go
index 66b94f7f6e9..206b7cd7801 100644
--- a/pkg/agent/cniserver/pod_configuration_linux_test.go
+++ b/pkg/agent/cniserver/pod_configuration_linux_test.go
@@ -136,7 +136,7 @@ func TestConnectInterceptedInterface(t *testing.T) {
 	testPodName := "test-pod"
 	podNamespace := testPodNamespace
 	hostInterfaceName := util.GenerateContainerInterfaceName(testPodName, testPodNamespace, testPodInfraContainerID)
-	containerID := generateUUID(t)
+	containerID := generateUUID()
 	containerNetNS := "container-ns"
 	containerDev := "eth0"
 
@@ -210,7 +210,7 @@ func TestConnectInterceptedInterface(t *testing.T) {
 			if tc.migratedRoute {
 				mockRoute.EXPECT().MigrateRoutesToGw(hostInterfaceName).Return(tc.migrateRouteErr)
 			}
-			ovsPortID := generateUUID(t)
+			ovsPortID := generateUUID()
 			if tc.connectedOVS {
 				mockOVSBridgeClient.EXPECT().CreatePort(hostInterfaceName, gomock.Any(), gomock.Any()).Return(ovsPortID, tc.createOVSPortErr).Times(1)
 				if tc.createOVSPortErr == nil {
@@ -239,7 +239,7 @@ func TestConnectInterceptedInterface(t *testing.T) {
 
 func TestCreateOVSPort(t *testing.T) {
 	controller := gomock.NewController(t)
-	containerID := generateUUID(t)
+	containerID := generateUUID()
 	podName := "p0"
 	podNamespace := testPodNamespace
 
@@ -271,10 +271,10 @@ func
TestCreateOVSPort(t *testing.T) { containerConfig := buildContainerConfig(tc.portName, containerID, podName, podNamespace, ¤t.Interface{Mac: "01:02:03:04:05:06"}, ipamResult.IPs, tc.vlanID) attachInfo := BuildOVSPortExternalIDs(containerConfig) if tc.createOVSPort { - mockOVSBridgeClient.EXPECT().CreatePort(tc.portName, tc.portName, attachInfo).Times(1).Return(generateUUID(t), nil) + mockOVSBridgeClient.EXPECT().CreatePort(tc.portName, tc.portName, attachInfo).Times(1).Return(generateUUID(), nil) } if tc.createOVSAccessPort { - mockOVSBridgeClient.EXPECT().CreateAccessPort(tc.portName, tc.portName, attachInfo, tc.vlanID).Times(1).Return(generateUUID(t), nil) + mockOVSBridgeClient.EXPECT().CreateAccessPort(tc.portName, tc.portName, attachInfo, tc.vlanID).Times(1).Return(generateUUID(), nil) } _, err := podConfigurator.createOVSPort(tc.portName, attachInfo, tc.vlanID) assert.NoError(t, err) @@ -283,8 +283,8 @@ func TestCreateOVSPort(t *testing.T) { } func TestParseOVSPortInterfaceConfig(t *testing.T) { - containerID := generateUUID(t) - portUUID := generateUUID(t) + containerID := generateUUID() + portUUID := generateUUID() ofPort := int32(1) containerIPs := "1.1.1.2,aabb:1122::101:102" parsedIPs := []net.IP{net.ParseIP("1.1.1.2"), net.ParseIP("aabb:1122::101:102")} @@ -398,14 +398,14 @@ func TestParseOVSPortInterfaceConfig(t *testing.T) { func TestCheckHostInterface(t *testing.T) { controller := gomock.NewController(t) hostIfaceName := "port1" - containerID := generateUUID(t) + containerID := generateUUID() containerIntf := ¤t.Interface{Name: ifname, Sandbox: netns, Mac: "01:02:03:04:05:06"} interfaces := []*current.Interface{containerIntf, {Name: hostIfaceName}} containeIPs := ipamResult.IPs ifaceMAC, _ := net.ParseMAC("01:02:03:04:05:06") containerInterface := interfacestore.NewContainerInterface(hostIfaceName, containerID, "pod1", testPodNamespace, ifaceMAC, []net.IP{containerIP}, 1) containerInterface.OVSPortConfig = &interfacestore.OVSPortConfig{ - PortUUID: generateUUID(t), + PortUUID: generateUUID(), OFPort: int32(10), } @@ -454,7 +454,7 @@ func TestCheckHostInterface(t *testing.T) { func TestConfigureSriovSecondaryInterface(t *testing.T) { controller := gomock.NewController(t) - containerID := generateUUID(t) + containerID := generateUUID() containerNS := "containerNS" for _, tc := range []struct { diff --git a/pkg/agent/cniserver/server_linux_test.go b/pkg/agent/cniserver/server_linux_test.go index 9acabb19443..9eb2f6b808c 100644 --- a/pkg/agent/cniserver/server_linux_test.go +++ b/pkg/agent/cniserver/server_linux_test.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "testing" + "time" cnitypes "github.com/containernetworking/cni/pkg/types" current "github.com/containernetworking/cni/pkg/types/100" @@ -27,9 +28,6 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" fakeclientset "k8s.io/client-go/kubernetes/fake" "antrea.io/antrea/pkg/agent/cniserver/ipam" @@ -348,7 +346,7 @@ func TestCmdAdd(t *testing.T) { if tc.addLocalIPAMRoute { mockRoute.EXPECT().AddLocalAntreaFlexibleIPAMPodRule(gomock.Any()).Return(tc.addLocalIPAMRouteError).Times(1) } - ovsPortID := generateUUID(t) + ovsPortID := generateUUID() if tc.connectOVS { mockOVSBridgeClient.EXPECT().CreatePort(hostInterfaceName, gomock.Any(), gomock.Any()).Return(ovsPortID, nil).Times(1) mockOVSBridgeClient.EXPECT().GetOFPort(hostInterfaceName, false).Return(int32(100), 
nil).Times(1) @@ -394,7 +392,7 @@ func TestCmdAdd(t *testing.T) { func TestCmdDel(t *testing.T) { controller := gomock.NewController(t) ipamMock := ipamtest.NewMockIPAMDriver(controller) - ovsPortID := generateUUID(t) + ovsPortID := generateUUID() ovsPort := int32(100) ctx := context.TODO() @@ -543,7 +541,7 @@ func TestCmdDel(t *testing.T) { func TestCmdCheck(t *testing.T) { controller := gomock.NewController(t) ipamMock := ipamtest.NewMockIPAMDriver(controller) - ovsPortID := generateUUID(t) + ovsPortID := generateUUID() ovsPort := int32(100) ctx := context.TODO() @@ -635,98 +633,33 @@ func TestReconcile(t *testing.T) { mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) - nodeName := "node1" cniServer := newCNIServer(t) cniServer.routeClient = mockRoute - gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, channel.NewSubscribableChannel("PodUpdate", 100), nil, false) cniServer.podConfigurator.ifConfigurator = newTestInterfaceConfigurator() cniServer.nodeConfig = &config.NodeConfig{ Name: nodeName, } - pods := []runtime.Object{ - &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "p1", - Namespace: testPodNamespace, - }, - Spec: v1.PodSpec{ - NodeName: nodeName, - }, - }, - &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "p2", - Namespace: testPodNamespace, - }, - Spec: v1.PodSpec{ - NodeName: nodeName, - HostNetwork: true, - }, - }, - &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "p4", - Namespace: testPodNamespace, - }, - Spec: v1.PodSpec{ - NodeName: nodeName, - }, - }, - } - containerIfaces := map[string]*interfacestore.InterfaceConfig{ - "iface1": { - InterfaceName: "iface1", - Type: interfacestore.ContainerInterface, - OVSPortConfig: &interfacestore.OVSPortConfig{ - PortUUID: generateUUID(t), - OFPort: int32(3), - }, - ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ - PodName: "p1", - PodNamespace: testPodNamespace, - ContainerID: generateUUID(t), - }, - }, - "iface3": { - InterfaceName: "iface3", - Type: interfacestore.ContainerInterface, - OVSPortConfig: &interfacestore.OVSPortConfig{ - PortUUID: generateUUID(t), - OFPort: int32(4), - }, - ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ - PodName: "p3", - PodNamespace: testPodNamespace, - ContainerID: generateUUID(t), - }, - }, - "iface4": { - InterfaceName: "iface4", - Type: interfacestore.ContainerInterface, - OVSPortConfig: &interfacestore.OVSPortConfig{ - PortUUID: generateUUID(t), - OFPort: int32(-1), - }, - ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ - PodName: "p4", - PodNamespace: testPodNamespace, - ContainerID: generateUUID(t), - }, - }, - } - kubeClient := fakeclientset.NewSimpleClientset(pods...) 
+ kubeClient := fakeclientset.NewSimpleClientset(pod1, pod2, pod3) cniServer.kubeClient = kubeClient - for _, containerIface := range containerIfaces { + for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} { ifaceStore.AddInterface(containerIface) } - mockOFClient.EXPECT().InstallPodFlows("iface1", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1) - iface := containerIfaces["iface3"] - mockOFClient.EXPECT().UninstallPodFlows("iface3").Return(nil).Times(1) - mockOVSBridgeClient.EXPECT().DeletePort(iface.PortUUID).Return(nil).Times(1) + podFlowsInstalled := make(chan struct{}) + mockOFClient.EXPECT().InstallPodFlows(normalInterface.InterfaceName, normalInterface.IPs, normalInterface.MAC, uint32(normalInterface.OFPort), uint16(0), nil). + Do(func(_ string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) { + close(podFlowsInstalled) + }).Times(1) + mockOFClient.EXPECT().UninstallPodFlows(staleInterface.InterfaceName).Return(nil).Times(1) + mockOVSBridgeClient.EXPECT().DeletePort(staleInterface.PortUUID).Return(nil).Times(1) mockRoute.EXPECT().DeleteLocalAntreaFlexibleIPAMPodRule(gomock.Any()).Return(nil).Times(1) err := cniServer.reconcile() assert.NoError(t, err) - _, exists := ifaceStore.GetInterfaceByName("iface3") + _, exists := ifaceStore.GetInterfaceByName(staleInterface.InterfaceName) assert.False(t, exists) + select { + case <-podFlowsInstalled: + case <-time.After(500 * time.Millisecond): + t.Errorf("InstallPodFlows for %s should be called but was not", normalInterface.InterfaceName) + } } diff --git a/pkg/agent/cniserver/server_test.go b/pkg/agent/cniserver/server_test.go index 478960d4e76..6ec5edc4ce8 100644 --- a/pkg/agent/cniserver/server_test.go +++ b/pkg/agent/cniserver/server_test.go @@ -29,6 +29,8 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "antrea.io/antrea/pkg/agent/cniserver/ipam" ipamtest "antrea.io/antrea/pkg/agent/cniserver/ipam/testing" @@ -43,6 +45,7 @@ import ( "antrea.io/antrea/pkg/cni" "antrea.io/antrea/pkg/ovs/ovsconfig" ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" + utilip "antrea.io/antrea/pkg/util/ip" "antrea.io/antrea/pkg/util/wait" ) @@ -76,6 +79,80 @@ var ( ifaceStore interfacestore.InterfaceStore emptyResponse = &cnipb.CniCmdResponse{CniResult: []byte("")} + + nodeName = "node1" + gwMAC = utilip.MustParseMAC("00:00:11:11:11:11") + pod1 = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "p1", + Namespace: testPodNamespace, + }, + Spec: v1.PodSpec{ + NodeName: nodeName, + }, + } + pod2 = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "p2", + Namespace: testPodNamespace, + }, + Spec: v1.PodSpec{ + NodeName: nodeName, + HostNetwork: true, + }, + } + pod3 = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "p3", + Namespace: testPodNamespace, + }, + Spec: v1.PodSpec{ + NodeName: nodeName, + }, + } + normalInterface = &interfacestore.InterfaceConfig{ + InterfaceName: "iface1", + Type: interfacestore.ContainerInterface, + IPs: []net.IP{net.ParseIP("1.1.1.1")}, + MAC: utilip.MustParseMAC("00:11:22:33:44:01"), + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: generateUUID(), + OFPort: int32(3), + }, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodName: pod1.Name, + PodNamespace: testPodNamespace, + ContainerID: generateUUID(), + }, + } + staleInterface = 
&interfacestore.InterfaceConfig{ + InterfaceName: "iface3", + Type: interfacestore.ContainerInterface, + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: generateUUID(), + OFPort: int32(4), + }, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodName: "non-existing-pod", + PodNamespace: testPodNamespace, + ContainerID: generateUUID(), + }, + } + unconnectedInterface = &interfacestore.InterfaceConfig{ + InterfaceName: "iface4", + Type: interfacestore.ContainerInterface, + IPs: []net.IP{net.ParseIP("1.1.1.2")}, + MAC: utilip.MustParseMAC("00:11:22:33:44:02"), + OVSPortConfig: &interfacestore.OVSPortConfig{ + PortUUID: generateUUID(), + OFPort: int32(-1), + }, + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ + PodName: pod3.Name, + PodNamespace: testPodNamespace, + ContainerID: generateUUID(), + }, + } ) func TestLoadNetConfig(t *testing.T) { @@ -695,7 +772,7 @@ func generateNetworkConfiguration(name, cniVersion, cniType, ipamType string) *t } func newRequest(args string, netCfg *types.NetworkConfig, path string, t *testing.T) (*cnipb.CniCmdRequest, string) { - containerID := generateUUID(t) + containerID := generateUUID() networkConfig, err := json.Marshal(netCfg) if err != nil { t.Error("Failed to generate Network configuration") @@ -714,11 +791,8 @@ func newRequest(args string, netCfg *types.NetworkConfig, path string, t *testin return cmdRequest, containerID } -func generateUUID(t *testing.T) string { - newID, err := uuid.NewUUID() - if err != nil { - t.Fatal("Failed to generate UUID") - } +func generateUUID() string { + newID, _ := uuid.NewUUID() return newID.String() } diff --git a/pkg/agent/cniserver/server_windows_test.go b/pkg/agent/cniserver/server_windows_test.go index 27b3e68d180..2a7f38f4d72 100644 --- a/pkg/agent/cniserver/server_windows_test.go +++ b/pkg/agent/cniserver/server_windows_test.go @@ -28,9 +28,6 @@ import ( current "github.com/containernetworking/cni/pkg/types/100" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" fakeclientset "k8s.io/client-go/kubernetes/fake" @@ -326,8 +323,8 @@ func TestCmdAdd(t *testing.T) { dockerInfraContainer := "261a1970-5b6c-11ed-8caf-000c294e5d03" dockerWorkContainer := "261e579a-5b6c-11ed-8caf-000c294e5d03" - unknownInfraContainer := generateUUID(t) - containerdInfraContainer := generateUUID(t) + unknownInfraContainer := generateUUID() + containerdInfraContainer := generateUUID() defer mockHostInterfaceExists()() defer mockGetHnsNetworkByName()() @@ -450,7 +447,7 @@ func TestCmdAdd(t *testing.T) { podName: "pod8", containerID: containerdInfraContainer, infraContainerID: containerdInfraContainer, - netns: generateUUID(t), + netns: generateUUID(), ipamAdd: true, connectOVS: true, containerIfaceExist: true, @@ -459,7 +456,7 @@ func TestCmdAdd(t *testing.T) { podName: "pod9", containerID: containerdInfraContainer, infraContainerID: containerdInfraContainer, - netns: generateUUID(t), + netns: generateUUID(), oriIPAMResult: oriIPAMResult, connectOVS: true, containerIfaceExist: true, @@ -469,7 +466,7 @@ func TestCmdAdd(t *testing.T) { podName: "pod10", containerID: containerdInfraContainer, infraContainerID: containerdInfraContainer, - netns: generateUUID(t), + netns: generateUUID(), ipamDel: true, oriIPAMResult: oriIPAMResult, endpointAttachErr: fmt.Errorf("unable to attach HnsEndpoint"), @@ -483,14 +480,14 @@ func TestCmdAdd(t 
*testing.T) { } { t.Run(tc.name, func(t *testing.T) { isDocker := isDockerContainer(tc.netns) - testUtil := newHnsTestUtil(generateUUID(t), tc.existingHnsEndpoints, isDocker, tc.isAttached, tc.hnsEndpointCreateErr, tc.endpointAttachErr) + testUtil := newHnsTestUtil(generateUUID(), tc.existingHnsEndpoints, isDocker, tc.isAttached, tc.hnsEndpointCreateErr, tc.endpointAttachErr) testUtil.setFunctions() defer testUtil.restore() waiter := newAsyncWaiter(tc.podName, tc.infraContainerID) server := newMockCNIServer(t, controller, waiter.notifier) requestMsg, ovsPortName := prepareSetup(t, ipamType, tc.podName, tc.containerID, tc.infraContainerID, tc.netns, nil) if tc.endpointExists { - server.podConfigurator.ifConfigurator.(*ifConfigurator).addEndpoint(getHnsEndpoint(generateUUID(t), ovsPortName)) + server.podConfigurator.ifConfigurator.(*ifConfigurator).addEndpoint(getHnsEndpoint(generateUUID(), ovsPortName)) } if tc.oriIPAMResult != nil { ipam.AddIPAMResult(tc.infraContainerID, tc.oriIPAMResult) @@ -501,7 +498,7 @@ func TestCmdAdd(t *testing.T) { if tc.ipamDel { ipamMock.EXPECT().Del(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil).Times(1) } - ovsPortID := generateUUID(t) + ovsPortID := generateUUID() if tc.connectOVS { if isDocker { mockOVSBridgeClient.EXPECT().CreateInternalPort(ovsPortName, int32(0), gomock.Any(), gomock.Any()).Return(ovsPortID, nil).Times(1) @@ -615,7 +612,7 @@ func TestCmdDel(t *testing.T) { t.Run(tc.name, func(t *testing.T) { isDocker := isDockerContainer(tc.netns) requestMsg, ovsPortName := prepareSetup(t, ipamType, tc.podName, tc.containerID, tc.containerID, tc.netns, nil) - hnsEndpoint := getHnsEndpoint(generateUUID(t), ovsPortName) + hnsEndpoint := getHnsEndpoint(generateUUID(), ovsPortName) var existingHnsEndpoints []hcsshim.HNSEndpoint if tc.endpointExists { existingHnsEndpoints = append(existingHnsEndpoints, *hnsEndpoint) @@ -625,7 +622,7 @@ func TestCmdDel(t *testing.T) { defer testUtil.restore() waiter := newAsyncWaiter(tc.podName, tc.containerID) server := newMockCNIServer(t, controller, waiter.notifier) - ovsPortID := generateUUID(t) + ovsPortID := generateUUID() if tc.endpointExists { server.podConfigurator.ifConfigurator.(*ifConfigurator).addEndpoint(hnsEndpoint) } @@ -683,7 +680,7 @@ func TestCmdCheck(t *testing.T) { defer mockSetInterfaceMTU(nil)() defer mockListHnsEndpoint(nil, nil)() defer mockGetNetInterfaceAddrs(containerIPNet, nil)() - defer mockGetHnsEndpointByName(generateUUID(t), mac)() + defer mockGetHnsEndpointByName(generateUUID(), mac)() wrapperIPAMResult := func(ipamResult current.Result, interfaces []*current.Interface) *current.Result { result := ipamResult @@ -721,7 +718,7 @@ func TestCmdCheck(t *testing.T) { {Name: "pod0-6631b7", Mac: "11:22:33:44:33:22", Sandbox: ""}, {Name: "pod0-6631b7_eth0", Mac: "11:22:33:44:33:22", Sandbox: "none"}, }), - existingIface: wrapperContainerInterface("pod0-6631b7", containerID, "pod0", generateUUID(t), mac, containerIP), + existingIface: wrapperContainerInterface("pod0-6631b7", containerID, "pod0", generateUUID(), mac, containerIP), netInterface: &net.Interface{ Name: "vEthernet (pod0-6631b7)", HardwareAddr: mac, @@ -737,7 +734,7 @@ func TestCmdCheck(t *testing.T) { {Name: "pod1-6631b7", Mac: "11:22:33:44:33:22", Sandbox: ""}, {Name: "pod1-6631b7_eth0", Mac: "11:22:33:44:33:22", Sandbox: "invalid-namespace"}, }), - existingIface: wrapperContainerInterface("pod1-6631b7", containerID, "pod1", generateUUID(t), mac, containerIP), + existingIface: wrapperContainerInterface("pod1-6631b7", containerID, 
"pod1", generateUUID(), mac, containerIP), netInterface: &net.Interface{ Name: "vEthernet (pod1-6631b7)", HardwareAddr: mac, @@ -759,7 +756,7 @@ func TestCmdCheck(t *testing.T) { {Name: "pod2-6631b7", Mac: "11:22:33:44:33:22", Sandbox: ""}, {Name: "eth0", Mac: "11:22:33:44:33:22", Sandbox: "none"}, }), - existingIface: wrapperContainerInterface("pod2-6631b7", containerID, "pod2", generateUUID(t), mac, containerIP), + existingIface: wrapperContainerInterface("pod2-6631b7", containerID, "pod2", generateUUID(), mac, containerIP), netInterface: &net.Interface{ Name: "vEthernet (pod2-6631b7)", HardwareAddr: mac, @@ -781,7 +778,7 @@ func TestCmdCheck(t *testing.T) { {Name: "pod3-6631b7", Mac: "11:22:33:44:33:22", Sandbox: ""}, {Name: "pod3-6631b7_eth0", Mac: "11:22:33:44:33:33", Sandbox: "none"}, }), - existingIface: wrapperContainerInterface("pod3-6631b7", containerID, "pod3", generateUUID(t), mac, containerIP), + existingIface: wrapperContainerInterface("pod3-6631b7", containerID, "pod3", generateUUID(), mac, containerIP), netInterface: &net.Interface{ Name: "vEthernet (pod3-6631b7)", HardwareAddr: mac, @@ -856,10 +853,10 @@ func TestReconcile(t *testing.T) { mockOFClient = openflowtest.NewMockClient(controller) ifaceStore = interfacestore.NewInterfaceStore() mockRoute = routetest.NewMockInterface(controller) - nodeName := "node1" + defer mockHostInterfaceExists()() defer mockGetHnsNetworkByName()() - missingEndpoint := getHnsEndpoint(generateUUID(t), "iface4") + missingEndpoint := getHnsEndpoint(generateUUID(), "iface4") testUtil := newHnsTestUtil(missingEndpoint.Id, []hcsshim.HNSEndpoint{*missingEndpoint}, false, true, nil, nil) testUtil.createHnsEndpoint(missingEndpoint) testUtil.setFunctions() @@ -867,105 +864,45 @@ func TestReconcile(t *testing.T) { cniServer := newCNIServer(t) cniServer.routeClient = mockRoute - gwMAC, _ := net.ParseMAC("00:00:11:11:11:11") - pods := []runtime.Object{ - &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "p1", - Namespace: testPodNamespace, - }, - Spec: v1.PodSpec{ - NodeName: nodeName, - }, - }, - &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "p2", - Namespace: testPodNamespace, - }, - Spec: v1.PodSpec{ - NodeName: nodeName, - HostNetwork: true, - }, - }, - &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "p4", - Namespace: testPodNamespace, - }, - Spec: v1.PodSpec{ - NodeName: nodeName, - }, - }, - } - kubeClient := fakeclientset.NewSimpleClientset(pods...) 
+ kubeClient := fakeclientset.NewSimpleClientset(pod1, pod2, pod3) cniServer.kubeClient = kubeClient - containerIfaces := map[string]*interfacestore.InterfaceConfig{ - "iface1": { - InterfaceName: "iface1", - Type: interfacestore.ContainerInterface, - OVSPortConfig: &interfacestore.OVSPortConfig{ - PortUUID: generateUUID(t), - OFPort: int32(3), - }, - ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ - PodName: "p1", - PodNamespace: testPodNamespace, - ContainerID: generateUUID(t), - }, - }, - "iface3": { - InterfaceName: "iface3", - Type: interfacestore.ContainerInterface, - OVSPortConfig: &interfacestore.OVSPortConfig{ - PortUUID: generateUUID(t), - OFPort: int32(4), - }, - ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ - PodName: "p3", - PodNamespace: testPodNamespace, - ContainerID: generateUUID(t), - }, - }, - "iface4": { - InterfaceName: "iface4", - Type: interfacestore.ContainerInterface, - OVSPortConfig: &interfacestore.OVSPortConfig{ - PortUUID: generateUUID(t), - OFPort: int32(-1), - }, - ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{ - PodName: "p4", - PodNamespace: testPodNamespace, - ContainerID: generateUUID(t), - }, - }, - } - for _, containerIface := range containerIfaces { + for _, containerIface := range []*interfacestore.InterfaceConfig{normalInterface, staleInterface, unconnectedInterface} { ifaceStore.AddInterface(containerIface) } - pod4IfaceName := "iface4" - pod4Iface := containerIfaces["iface4"] - waiter := newAsyncWaiter(pod4Iface.PodName, pod4Iface.ContainerID) + waiter := newAsyncWaiter(unconnectedInterface.PodName, unconnectedInterface.ContainerID) cniServer.podConfigurator, _ = newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, waiter.notifier, nil, false) cniServer.nodeConfig = &config.NodeConfig{Name: nodeName} // Re-install Pod1 flows - mockOFClient.EXPECT().InstallPodFlows("iface1", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1) + podFlowsInstalled := make(chan string, 2) + mockOFClient.EXPECT().InstallPodFlows(normalInterface.InterfaceName, normalInterface.IPs, normalInterface.MAC, uint32(normalInterface.OFPort), uint16(0), nil). + Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) { + podFlowsInstalled <- interfaceName + }).Times(1) // Uninstall Pod3 flows which is deleted. 
-	iface := containerIfaces["iface3"]
-	mockOFClient.EXPECT().UninstallPodFlows("iface3").Return(nil).Times(1)
-	mockOVSBridgeClient.EXPECT().DeletePort(iface.PortUUID).Return(nil).Times(1)
+	mockOFClient.EXPECT().UninstallPodFlows(staleInterface.InterfaceName).Return(nil).Times(1)
+	mockOVSBridgeClient.EXPECT().DeletePort(staleInterface.PortUUID).Return(nil).Times(1)
 	mockRoute.EXPECT().DeleteLocalAntreaFlexibleIPAMPodRule(gomock.Any()).Return(nil).Times(1)
 	// Re-connect to Pod4
-	hostIfaces.Store(fmt.Sprintf("vEthernet (%s)", pod4IfaceName), true)
-	mockOVSBridgeClient.EXPECT().SetInterfaceType(pod4IfaceName, "internal").Return(nil).Times(1)
-	mockOVSBridgeClient.EXPECT().GetOFPort(pod4IfaceName, true).Return(int32(5), nil).Times(1)
-	mockOFClient.EXPECT().InstallPodFlows(pod4IfaceName, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1)
+	hostIfaces.Store(fmt.Sprintf("vEthernet (%s)", unconnectedInterface.InterfaceName), true)
+	mockOVSBridgeClient.EXPECT().SetInterfaceType(unconnectedInterface.InterfaceName, "internal").Return(nil).Times(1)
+	mockOVSBridgeClient.EXPECT().GetOFPort(unconnectedInterface.InterfaceName, true).Return(int32(5), nil).Times(1)
+	mockOFClient.EXPECT().InstallPodFlows(unconnectedInterface.InterfaceName, unconnectedInterface.IPs, unconnectedInterface.MAC, uint32(5), uint16(0), nil).
+		Do(func(interfaceName string, _ []net.IP, _ net.HardwareAddr, _ uint32, _ uint16, _ *uint32) {
+			podFlowsInstalled <- interfaceName
+		}).Times(1)
 	err := cniServer.reconcile()
 	assert.NoError(t, err)
 	_, exists := ifaceStore.GetInterfaceByName("iface3")
 	assert.False(t, exists)
+	for i := 0; i < 2; i++ {
+		select {
+		case <-podFlowsInstalled:
+		case <-time.After(500 * time.Millisecond):
+			t.Errorf("InstallPodFlows should be called 2 times but was only called %d times", i)
+			break
+		}
+	}
 	waiter.wait()
 	waiter.close()
 }
diff --git a/pkg/util/ip/ip.go b/pkg/util/ip/ip.go
index bd82ea18989..58563e2274c 100644
--- a/pkg/util/ip/ip.go
+++ b/pkg/util/ip/ip.go
@@ -195,6 +195,14 @@ func MustParseCIDR(cidr string) *net.IPNet {
 	return ipNet
 }
 
+func MustParseMAC(mac string) net.HardwareAddr {
+	addr, err := net.ParseMAC(mac)
+	if err != nil {
+		panic(fmt.Errorf("cannot parse '%v': %v", mac, err))
+	}
+	return addr
+}
+
 // IPNetEqual returns if the provided IPNets are the same subnet.
 func IPNetEqual(ipNet1, ipNet2 *net.IPNet) bool {
 	if ipNet1 == nil && ipNet2 == nil {

From f57e6f701da674da9f8aa4409e39c4c480ae4eca Mon Sep 17 00:00:00 2001
From: Quan Tian
Date: Tue, 2 Jan 2024 17:40:39 +0800
Subject: [PATCH 5/5] Enable IPv4/IPv6 forwarding on demand automatically

Although it has been documented as a prerequisite in [1], some platforms
do not enable IP forwarding by default. kube-proxy's ipvs mode and some
CNIs enable it themselves to ensure Pod networking works properly. As
Antrea needs IP forwarding to be enabled, there seems to be no reason
not to enable it itself, rather than expecting users or other
components to do it.
[1] https://kubernetes.io/docs/setup/production-environment/container-runtimes/#install-and-configure-prerequisites Signed-off-by: Quan Tian --- pkg/agent/route/route_linux.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index e4c6c4dfcec..584c777a466 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -182,15 +182,17 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error { return fmt.Errorf("failed to initialize ip routes: %v", err) } + // Ensure IPv4 forwarding is enabled if it is a dual-stack or IPv4-only cluster. + if c.nodeConfig.NodeIPv4Addr != nil { + if err := sysctl.EnsureSysctlNetValue("ipv4/ip_forward", 1); err != nil { + return fmt.Errorf("failed to enable IPv4 forwarding: %w", err) + } + } + // Ensure IPv6 forwarding is enabled if it is a dual-stack or IPv6-only cluster. if c.nodeConfig.NodeIPv6Addr != nil { - sysctlFilename := "ipv6/conf/all/forwarding" - v, err := sysctl.GetSysctlNet(sysctlFilename) - if err != nil { - return fmt.Errorf("failed to read value of sysctl file: %s", sysctlFilename) - } - if v != 1 { - return fmt.Errorf("IPv6 forwarding is not enabled") + if err := sysctl.EnsureSysctlNetValue("ipv6/conf/all/forwarding", 1); err != nil { + return fmt.Errorf("failed to enable IPv6 forwarding: %w", err) } }
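
For reference, a rough sketch of the behavior a helper like sysctl.EnsureSysctlNetValue
is assumed to provide (the actual implementation lives in Antrea's sysctl package and
may differ): read the current value under /proc/sys/net and write the desired one only
when it differs, so an already-correct setting is left untouched.

// Hypothetical stand-alone sketch; ensureSysctlNetValue is an illustrative helper,
// not the function shipped in this patch.
package main

import (
	"bytes"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
)

func ensureSysctlNetValue(name string, value int) error {
	path := filepath.Join("/proc/sys/net", name)
	raw, err := os.ReadFile(path)
	if err != nil {
		return fmt.Errorf("failed to read %s: %w", path, err)
	}
	current, err := strconv.Atoi(string(bytes.TrimSpace(raw)))
	if err != nil {
		return fmt.Errorf("unexpected content in %s: %w", path, err)
	}
	if current == value {
		// Already configured, e.g. by kube-proxy or the administrator.
		return nil
	}
	return os.WriteFile(path, []byte(strconv.Itoa(value)), 0644)
}

func main() {
	// Equivalent of "sysctl -w net.ipv4.ip_forward=1", which this patch makes
	// antrea-agent perform during route client initialization.
	if err := ensureSysctlNetValue("ipv4/ip_forward", 1); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}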