diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 162e4458237..bbecfb445de 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -52,6 +52,7 @@ import ( "antrea.io/antrea/pkg/agent/flowexporter" "antrea.io/antrea/pkg/agent/flowexporter/exporter" "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/ipassigner/linkmonitor" "antrea.io/antrea/pkg/agent/memberlist" "antrea.io/antrea/pkg/agent/metrics" "antrea.io/antrea/pkg/agent/monitortool" @@ -546,6 +547,7 @@ func run(o *Options) error { var externalIPPoolController *externalippool.ExternalIPPoolController var externalIPController *serviceexternalip.ServiceExternalIPController var memberlistCluster *memberlist.Cluster + var linkMonitor linkmonitor.Interface if o.enableEgress || features.DefaultFeatureGate.Enabled(features.ServiceExternalIP) { externalIPPoolController = externalippool.NewExternalIPPoolController( @@ -565,6 +567,7 @@ func run(o *Options) error { if err != nil { return fmt.Errorf("error creating new memberlist cluster: %v", err) } + linkMonitor = linkmonitor.NewLinkMonitor() } if o.enableEgress { egressController, err = egress.NewEgressController( @@ -572,6 +575,7 @@ func run(o *Options) error { memberlistCluster, egressInformer, externalIPPoolInformer, nodeInformer, podUpdateChannel, serviceCIDRProvider, o.config.Egress.MaxEgressIPsPerNode, features.DefaultFeatureGate.Enabled(features.EgressTrafficShaping), features.DefaultFeatureGate.Enabled(features.EgressSeparateSubnet), + linkMonitor, ) if err != nil { return fmt.Errorf("error creating new Egress controller: %v", err) @@ -584,12 +588,17 @@ func run(o *Options) error { memberlistCluster, serviceInformer, endpointsInformer, + linkMonitor, ) if err != nil { return fmt.Errorf("error creating new ServiceExternalIP controller: %v", err) } } + if linkMonitor != nil { + go linkMonitor.Run(stopCh) + } + var cniServer *cniserver.CNIServer var externalNodeController *externalnode.ExternalNodeController var localExternalNodeInformer cache.SharedIndexInformer diff --git a/go.mod b/go.mod index 55be4d9f210..fcfa1165ab9 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( github.com/lithammer/dedent v1.1.0 github.com/mdlayher/arp v0.0.0-20220221190821-c37aaafac7f9 github.com/mdlayher/ethernet v0.0.0-20220221185849-529eae5b6118 - github.com/mdlayher/ndp v0.8.0 + github.com/mdlayher/ndp v1.1.0 github.com/mdlayher/packet v1.1.2 github.com/miekg/dns v1.1.62 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 @@ -213,7 +213,6 @@ require ( github.com/vishvananda/netns v0.0.4 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xlab/treeprint v1.2.0 // indirect - gitlab.com/golang-commonmark/puny v0.0.0-20191124015043-9f83538fa04f // indirect go.etcd.io/etcd/api/v3 v3.5.14 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.14 // indirect go.etcd.io/etcd/client/v3 v3.5.14 // indirect @@ -250,3 +249,6 @@ require ( sigs.k8s.io/kustomize/kyaml v0.17.1 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect ) + +// remove this when https://github.com/mdlayher/ndp/pull/32 gets merged +replace github.com/mdlayher/ndp => github.com/antrea-io/ndp v0.0.0-20241107040829-6f35f2e50f4c diff --git a/go.sum b/go.sum index 9a639da28e4..231aff4047b 100644 --- a/go.sum +++ b/go.sum @@ -59,6 +59,8 @@ github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= +github.com/antrea-io/ndp v0.0.0-20241107040829-6f35f2e50f4c h1:9BpiAOFYkjvKFzSCoQHRcOFmXBHpgq74wpSu0CkM6F8= +github.com/antrea-io/ndp v0.0.0-20241107040829-6f35f2e50f4c/go.mod h1:FmgESgemgjl38vuOIyAHWUUL6vQKA/pQNkvXdWsdQFM= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.4.0 h1:yCQqn7dwca4ITXb+CbubHmedzaQYHhNhrEXLYUeEe8Q= github.com/armon/go-metrics v0.4.0/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4= @@ -347,7 +349,6 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -527,8 +528,6 @@ github.com/mdlayher/ethernet v0.0.0-20220221185849-529eae5b6118/go.mod h1:ZFUnHI github.com/mdlayher/ethtool v0.0.0-20210210192532-2b88debcdd43/go.mod h1:+t7E0lkKfbBsebllff1xdTmyJt8lH37niI6kwFk9OTo= github.com/mdlayher/genetlink v1.0.0 h1:OoHN1OdyEIkScEmRgxLEe2M9U8ClMytqA5niynLtfj0= github.com/mdlayher/genetlink v1.0.0/go.mod h1:0rJ0h4itni50A86M2kHcgS85ttZazNt7a8H2a2cw0Gc= -github.com/mdlayher/ndp v0.8.0 h1:oVCl5JZSzT/YJE6cJd7EnNDWmX1fl4hJV0S/UCBNoHE= -github.com/mdlayher/ndp v0.8.0/go.mod h1:32w/5dDZWVSEOxyniAgKK4d7dHTuO6TCxWmUznQe3f8= github.com/mdlayher/netlink v0.0.0-20190409211403-11939a169225/go.mod h1:eQB3mZE4aiYnlUsyGGCOpPETfdQq4Jhsgf1fk3cwQaA= github.com/mdlayher/netlink v1.0.0/go.mod h1:KxeJAFOFLG6AjpyDkQ/iIhxygIUKD+vcwqcnu43w/+M= github.com/mdlayher/netlink v1.1.0/go.mod h1:H4WCitaheIsdF9yOYu8CFmCgQthAPIWZmcKp9uZHgmY= @@ -770,8 +769,6 @@ github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1: github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -gitlab.com/golang-commonmark/puny v0.0.0-20191124015043-9f83538fa04f h1:Wku8eEdeJqIOFHtrfkYUByc4bCaTeA6fL0UJgfEiFMI= -gitlab.com/golang-commonmark/puny v0.0.0-20191124015043-9f83538fa04f/go.mod h1:Tiuhl+njh/JIg0uS/sOJVYi0x2HEa5rc1OAaVsb5tAs= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI= @@ -886,7 +883,6 @@ golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -913,7 +909,6 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -950,7 +945,6 @@ golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200217220822-9197077df867/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200602100848-8d3cce7afc34/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200728102440-3e129f6d46b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201009025420-dfb3f7c4e634/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/agent/controller/egress/egress_controller.go b/pkg/agent/controller/egress/egress_controller.go index c50345738b4..bdc91c8b414 100644 --- a/pkg/agent/controller/egress/egress_controller.go +++ b/pkg/agent/controller/egress/egress_controller.go @@ -44,6 +44,7 @@ import ( "antrea.io/antrea/pkg/agent/client" "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/ipassigner" + "antrea.io/antrea/pkg/agent/ipassigner/linkmonitor" "antrea.io/antrea/pkg/agent/memberlist" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/route" @@ -205,6 +206,8 @@ type EgressController struct { tableAllocator *idAllocator // Each subnet has its own route table. egressRouteTables map[crdv1b1.SubnetInfo]*egressRouteTable + + linkMonitor linkmonitor.Interface } func NewEgressController( @@ -225,6 +228,7 @@ func NewEgressController( maxEgressIPsPerNode int, trafficShapingEnabled bool, supportSeparateSubnet bool, + linkMonitor linkmonitor.Interface, ) (*EgressController, error) { if trafficShapingEnabled && !openflow.OVSMetersAreSupported() { klog.Info("EgressTrafficShaping feature gate is enabled, but it is ignored because OVS meters are not supported.") @@ -273,6 +277,7 @@ func NewEgressController( externalIPPoolLister: externalIPPoolInformer.Lister(), externalIPPoolListerSynced: externalIPPoolInformer.Informer().HasSynced, supportSeparateSubnet: supportSeparateSubnet, + linkMonitor: linkMonitor, } if supportSeparateSubnet { c.egressRouteTables = map[crdv1b1.SubnetInfo]*egressRouteTable{} @@ -285,7 +290,7 @@ func NewEgressController( resyncPeriod, ) } - ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice) + ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice, linkMonitor) if err != nil { return nil, fmt.Errorf("initializing egressIP assigner failed: %v", err) } @@ -505,7 +510,7 @@ func (c *EgressController) Run(stopCh <-chan struct{}) { go c.localIPDetector.Run(stopCh) go c.egressIPScheduler.Run(stopCh) go c.ipAssigner.Run(stopCh) - if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.egressListerSynced, c.externalIPPoolListerSynced, c.localIPDetector.HasSynced, c.egressIPScheduler.HasScheduled) { + if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.egressListerSynced, c.externalIPPoolListerSynced, c.localIPDetector.HasSynced, c.egressIPScheduler.HasScheduled, c.linkMonitor.HasSynced) { return } diff --git a/pkg/agent/controller/egress/egress_controller_test.go b/pkg/agent/controller/egress/egress_controller_test.go index a3d75ffc487..1f712351913 100644 --- a/pkg/agent/controller/egress/egress_controller_test.go +++ b/pkg/agent/controller/egress/egress_controller_test.go @@ -38,6 +38,7 @@ import ( "antrea.io/antrea/pkg/agent/interfacestore" "antrea.io/antrea/pkg/agent/ipassigner" + "antrea.io/antrea/pkg/agent/ipassigner/linkmonitor" ipassignertest "antrea.io/antrea/pkg/agent/ipassigner/testing" "antrea.io/antrea/pkg/agent/memberlist" openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" @@ -137,7 +138,7 @@ func (c *fakeSingleNodeCluster) AddClusterEventHandler(handler memberlist.Cluste func mockNewIPAssigner(ipAssigner ipassigner.IPAssigner) func() { originalNewIPAssigner := newIPAssigner - newIPAssigner = func(_, _ string) (ipassigner.IPAssigner, error) { + newIPAssigner = func(_, _ string, _ linkmonitor.Interface) (ipassigner.IPAssigner, error) { return ipAssigner, nil } return func() { @@ -203,6 +204,7 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll 255, true, true, + nil, ) egressController.localIPDetector = localIPDetector return &fakeController{ diff --git a/pkg/agent/controller/serviceexternalip/controller.go b/pkg/agent/controller/serviceexternalip/controller.go index e2d2522f2e7..c7c349e07f3 100644 --- a/pkg/agent/controller/serviceexternalip/controller.go +++ b/pkg/agent/controller/serviceexternalip/controller.go @@ -32,6 +32,7 @@ import ( "antrea.io/antrea/pkg/agent/apis" "antrea.io/antrea/pkg/agent/ipassigner" + "antrea.io/antrea/pkg/agent/ipassigner/linkmonitor" "antrea.io/antrea/pkg/agent/memberlist" "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/querier" @@ -77,6 +78,8 @@ type ServiceExternalIPController struct { assignedIPs map[string]sets.Set[string] assignedIPsMutex sync.Mutex + + linkMonitor linkmonitor.Interface } var _ querier.ServiceExternalIPStatusQuerier = (*ServiceExternalIPController)(nil) @@ -87,6 +90,7 @@ func NewServiceExternalIPController( cluster memberlist.Interface, serviceInformer coreinformers.ServiceInformer, endpointsInformer coreinformers.EndpointsInformer, + linkMonitor linkmonitor.Interface, ) (*ServiceExternalIPController, error) { c := &ServiceExternalIPController{ nodeName: nodeName, @@ -105,8 +109,9 @@ func NewServiceExternalIPController( endpointsListerSynced: endpointsInformer.Informer().HasSynced, externalIPStates: make(map[apimachinerytypes.NamespacedName]externalIPState), assignedIPs: make(map[string]sets.Set[string]), + linkMonitor: linkMonitor, } - ipAssigner, err := ipassigner.NewIPAssigner(nodeTransportInterface, "") + ipAssigner, err := ipassigner.NewIPAssigner(nodeTransportInterface, "", linkMonitor) if err != nil { return nil, fmt.Errorf("initializing service external IP assigner failed: %v", err) } @@ -240,7 +245,7 @@ func (c *ServiceExternalIPController) Run(stopCh <-chan struct{}) { klog.Infof("Starting %s", controllerName) defer klog.Infof("Shutting down %s", controllerName) - if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.serviceListerSynced, c.endpointsListerSynced) { + if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.serviceListerSynced, c.endpointsListerSynced, c.linkMonitor.HasSynced) { return } diff --git a/pkg/agent/ipassigner/ip_assigner_linux.go b/pkg/agent/ipassigner/ip_assigner_linux.go index 6256bae61b2..defa0199f5e 100644 --- a/pkg/agent/ipassigner/ip_assigner_linux.go +++ b/pkg/agent/ipassigner/ip_assigner_linux.go @@ -27,6 +27,7 @@ import ( "k8s.io/klog/v2" utilnet "k8s.io/utils/net" + "antrea.io/antrea/pkg/agent/ipassigner/linkmonitor" "antrea.io/antrea/pkg/agent/ipassigner/responder" "antrea.io/antrea/pkg/agent/util" "antrea.io/antrea/pkg/agent/util/arping" @@ -217,7 +218,7 @@ type ipAssigner struct { } // NewIPAssigner returns an *ipAssigner. -func NewIPAssigner(nodeTransportInterface string, dummyDeviceName string) (IPAssigner, error) { +func NewIPAssigner(nodeTransportInterface string, dummyDeviceName string, linkMonitor linkmonitor.Interface) (IPAssigner, error) { ipv4, ipv6, externalInterface, err := util.GetIPNetDeviceByName(nodeTransportInterface) if err != nil { return nil, fmt.Errorf("get IPNetDevice from name %s error: %+v", nodeTransportInterface, err) @@ -242,17 +243,11 @@ func NewIPAssigner(nodeTransportInterface string, dummyDeviceName string) (IPAss return nil, err } if dummyDeviceName == "" || arpIgnore > 0 { - a.defaultAssignee.arpResponder, err = responder.NewARPResponder(externalInterface) - if err != nil { - return nil, fmt.Errorf("failed to create ARP responder for link %s: %v", externalInterface.Name, err) - } + a.defaultAssignee.arpResponder = responder.NewARPResponder(externalInterface.Name, linkMonitor) } } if ipv6 != nil { - a.defaultAssignee.ndpResponder, err = responder.NewNDPResponder(externalInterface) - if err != nil { - return nil, fmt.Errorf("failed to create NDP responder for link %s: %v", externalInterface.Name, err) - } + a.defaultAssignee.ndpResponder = responder.NewNDPResponder(externalInterface.Name, linkMonitor) } if dummyDeviceName != "" { a.defaultAssignee.link, err = ensureDummyDevice(dummyDeviceName) diff --git a/pkg/agent/ipassigner/ip_assigner_windows.go b/pkg/agent/ipassigner/ip_assigner_windows.go index 252974febc9..f5a11518a20 100644 --- a/pkg/agent/ipassigner/ip_assigner_windows.go +++ b/pkg/agent/ipassigner/ip_assigner_windows.go @@ -14,8 +14,12 @@ package ipassigner -import "errors" +import ( + "errors" -func NewIPAssigner(nodeTransportInterface string, dummyDeviceName string) (IPAssigner, error) { + "antrea.io/antrea/pkg/agent/ipassigner/linkmonitor" +) + +func NewIPAssigner(nodeTransportInterface string, dummyDeviceName string, linkMonitor linkmonitor.Interface) (IPAssigner, error) { return nil, errors.New("IPAssigner is not implemented on Windows") } diff --git a/pkg/agent/ipassigner/linkmonitor/link_monitor.go b/pkg/agent/ipassigner/linkmonitor/link_monitor.go new file mode 100644 index 00000000000..b45cbb063bb --- /dev/null +++ b/pkg/agent/ipassigner/linkmonitor/link_monitor.go @@ -0,0 +1,31 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linkmonitor + +type LinkEventHandler func(linkName string) + +type Interface interface { + LinkExists(linkName string) bool + + // Run starts the detector. + Run(stopCh <-chan struct{}) + + // AddEventHandler registers an eventHandler of link updates. It's not thread-safe and should be called before + // starting the detector. If no link name is provided, the handler will be called for all link updates. + AddEventHandler(handler LinkEventHandler, linkNames ...string) + + // HasSynced returns true if the cache has been initialized with the existing links. + HasSynced() bool +} diff --git a/pkg/agent/ipassigner/linkmonitor/link_monitor_linux.go b/pkg/agent/ipassigner/linkmonitor/link_monitor_linux.go new file mode 100644 index 00000000000..98a2cccf829 --- /dev/null +++ b/pkg/agent/ipassigner/linkmonitor/link_monitor_linux.go @@ -0,0 +1,161 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linkmonitor + +import ( + "sync" + "time" + + "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + "k8s.io/utils/set" + + utilnetlink "antrea.io/antrea/pkg/agent/util/netlink" +) + +const ( + linkAny = "" +) + +type linkMonitor struct { + mutex sync.RWMutex + cacheSynced bool + linkSubscribeFunc func(ch chan<- netlink.LinkUpdate, done <-chan struct{}, options netlink.LinkSubscribeOptions) error + eventHandlers map[string][]LinkEventHandler + linkNames set.Set[string] // known link names + linkIndexMap map[int32]string // map from link index to link name + netlink utilnetlink.Interface +} + +func NewLinkMonitor() *linkMonitor { + return &linkMonitor{ + linkSubscribeFunc: netlink.LinkSubscribeWithOptions, + eventHandlers: make(map[string][]LinkEventHandler), + linkNames: set.New[string](), + linkIndexMap: make(map[int32]string), + netlink: &netlink.Handle{}, + } +} + +func (d *linkMonitor) HasSynced() bool { + d.mutex.RLock() + defer d.mutex.RUnlock() + return d.cacheSynced +} + +func (d *linkMonitor) AddEventHandler(handler LinkEventHandler, linkNames ...string) { + if len(linkNames) == 0 { + d.eventHandlers[linkAny] = append(d.eventHandlers[linkAny], handler) + return + } + for _, name := range linkNames { + d.eventHandlers[name] = append(d.eventHandlers[name], handler) + } +} + +func (d *linkMonitor) Run(stopCh <-chan struct{}) { + klog.InfoS("Starting LinkMonitor") + + wait.NonSlidingUntil(func() { d.listAndWatchLinks(stopCh) }, 5*time.Second, stopCh) + + <-stopCh +} + +func (d *linkMonitor) listAndWatchLinks(stopCh <-chan struct{}) { + ch := make(chan netlink.LinkUpdate, 100) + if err := d.linkSubscribeFunc(ch, stopCh, netlink.LinkSubscribeOptions{ + ErrorCallback: func(err error) { + klog.ErrorS(err, "Received error from link update subscription") + }, + }); err != nil { + klog.ErrorS(err, "Failed to subscribe link update") + return + } + + links, err := d.netlink.LinkList() + if err != nil { + klog.ErrorS(err, "failed to list links on the Node") + return + } + + d.mutex.Lock() + for _, l := range links { + d.linkIndexMap[int32(l.Attrs().Index)] = l.Attrs().Name + d.linkNames.Insert(l.Attrs().Name) + } + d.cacheSynced = true + d.mutex.Unlock() + + for _, l := range links { + d.notifyHandlers(l.Attrs().Name) + } + + for { + select { + case <-stopCh: + return + case event := <-ch: + eventLinkName := event.Attrs().Name + index := event.Index + previousName, exists := d.linkIndexMap[index] + + isDelete := event.Header.Type == unix.RTM_DELLINK + if isDelete { + delete(d.linkIndexMap, index) + d.deleteLinkName(eventLinkName) + } else { + d.linkIndexMap[index] = eventLinkName + d.addLinkName(eventLinkName) + } + + // For link rename events, notify handlers watching the original name + if exists && previousName != eventLinkName { + d.deleteLinkName(previousName) + d.notifyHandlers(previousName) + } + d.notifyHandlers(eventLinkName) + } + } +} + +func (d *linkMonitor) notifyHandlers(linkName string) { + for _, h := range d.eventHandlers[linkName] { + h(linkName) + } + for _, h := range d.eventHandlers[linkAny] { + h(linkName) + } +} + +// LinkExists checks if the provided interface is configured on the Node. +func (d *linkMonitor) LinkExists(name string) bool { + d.mutex.RLock() + defer d.mutex.RUnlock() + return d.linkNames.Has(name) +} + +func (d *linkMonitor) addLinkName(name string) { + d.mutex.Lock() + defer d.mutex.Unlock() + d.linkNames.Insert(name) +} + +func (d *linkMonitor) deleteLinkName(name string) { + d.mutex.Lock() + defer d.mutex.Unlock() + d.linkNames.Delete(name) +} diff --git a/pkg/agent/ipassigner/linkmonitor/link_monitor_linux_test.go b/pkg/agent/ipassigner/linkmonitor/link_monitor_linux_test.go new file mode 100644 index 00000000000..5d60bfa4723 --- /dev/null +++ b/pkg/agent/ipassigner/linkmonitor/link_monitor_linux_test.go @@ -0,0 +1,286 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linkmonitor + +import ( + "slices" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/vishvananda/netlink" + "github.com/vishvananda/netlink/nl" + "go.uber.org/mock/gomock" + "golang.org/x/sys/unix" + "k8s.io/utils/set" + + netlinktesting "antrea.io/antrea/pkg/agent/util/netlink/testing" +) + +type linkEventHandler struct { + watchLinkNames []string + receivedEvents []string + mutex sync.Mutex + expectedLinkEvents []string +} + +func (l *linkEventHandler) onLinkEvent(linkName string) { + l.mutex.Lock() + defer l.mutex.Unlock() + l.receivedEvents = append(l.receivedEvents, linkName) +} + +func (l *linkEventHandler) getReceivedLinkEvents() []string { + l.mutex.Lock() + defer l.mutex.Unlock() + return slices.Clone(l.receivedEvents) +} + +func newLinkEventHandler(expectedLinkEvents []string, watchLinkNames ...string) *linkEventHandler { + return &linkEventHandler{ + expectedLinkEvents: expectedLinkEvents, + watchLinkNames: watchLinkNames, + } +} + +func newLinkEvent(remove bool, linkName string, index int) netlink.LinkUpdate { + et := unix.RTM_NEWLINK + if remove { + et = unix.RTM_DELLINK + } + e := netlink.LinkUpdate{ + IfInfomsg: nl.IfInfomsg{ + IfInfomsg: unix.IfInfomsg{ + Index: int32(index), + }, + }, + Link: &netlink.Device{ + LinkAttrs: netlink.LinkAttrs{ + Name: linkName, + }, + }, + Header: unix.NlMsghdr{ + Type: uint16(et), + }, + } + return e +} + +func newLink(name string, index int) netlink.Link { + return &netlink.Device{ + LinkAttrs: netlink.LinkAttrs{ + Index: index, + Name: name, + }, + } +} + +func Test_linkMonitor_listAndWatchLinks(t *testing.T) { + tests := []struct { + name string + eventHandlers []*linkEventHandler + initialLinkList []netlink.Link + linkEvents []netlink.LinkUpdate + expectedExistingLinks []string + }{ + { + name: "initial notification", + eventHandlers: []*linkEventHandler{ + newLinkEventHandler([]string{ + "lo", + "eth0", + }), + }, + initialLinkList: []netlink.Link{ + newLink("lo", 1), + newLink("eth0", 2), + }, + expectedExistingLinks: []string{"lo", "eth0"}, + }, + { + name: "watch all links", + eventHandlers: []*linkEventHandler{ + newLinkEventHandler([]string{ + "lo", + "eth0", + "eth1", + }), + }, + linkEvents: []netlink.LinkUpdate{ + newLinkEvent(false, "eth1", 3), + }, + initialLinkList: []netlink.Link{ + newLink("lo", 1), + newLink("eth0", 2), + }, + expectedExistingLinks: []string{"lo", "eth0", "eth1"}, + }, + { + name: "watch eth1", + eventHandlers: []*linkEventHandler{ + newLinkEventHandler([]string{ + "eth1", + }, "eth1"), + }, + linkEvents: []netlink.LinkUpdate{ + newLinkEvent(false, "eth1", 3), + }, + initialLinkList: []netlink.Link{ + newLink("lo", 1), + newLink("eth0", 2), + }, + expectedExistingLinks: []string{"lo", "eth0", "eth1"}, + }, + { + name: "watch eth1 and eth1 deleted", + eventHandlers: []*linkEventHandler{ + newLinkEventHandler([]string{ + "eth1", // initial notification + "eth1", // delete notification + }, "eth1"), + }, + linkEvents: []netlink.LinkUpdate{ + newLinkEvent(true, "eth1", 3), + }, + initialLinkList: []netlink.Link{ + newLink("lo", 1), + newLink("eth0", 2), + newLink("eth1", 3), + }, + expectedExistingLinks: []string{"lo", "eth0"}, + }, + { + name: "watch eth1 and eth1 renamed", + eventHandlers: []*linkEventHandler{ + newLinkEventHandler([]string{ + "eth1", // initial notification + "eth1", // rename notification + }, "eth1"), + }, + linkEvents: []netlink.LinkUpdate{ + newLinkEvent(false, "eth1~", 3), + }, + initialLinkList: []netlink.Link{ + newLink("lo", 1), + newLink("eth0", 2), + newLink("eth1", 3), + }, + expectedExistingLinks: []string{"lo", "eth0", "eth1~"}, + }, + { + name: "watch eth1, eth1 renamed and created with new index", + eventHandlers: []*linkEventHandler{ + newLinkEventHandler([]string{ + "eth1", // initial notification + "eth1", // rename notification + "eth1", // new link notification + }, "eth1"), + }, + linkEvents: []netlink.LinkUpdate{ + newLinkEvent(false, "eth1~", 3), + newLinkEvent(false, "eth1", 4), + }, + initialLinkList: []netlink.Link{ + newLink("lo", 1), + newLink("eth0", 2), + newLink("eth1", 3), + }, + expectedExistingLinks: []string{"lo", "eth0", "eth1", "eth1~"}, + }, + { + name: "two different handlers watching the same link name", + eventHandlers: []*linkEventHandler{ + newLinkEventHandler([]string{ + "eth1", // add notification + }, "eth1"), + newLinkEventHandler([]string{ + "eth1", // add notification + }, "eth1"), + }, + linkEvents: []netlink.LinkUpdate{ + newLinkEvent(false, "eth1", 3), + }, + initialLinkList: []netlink.Link{ + newLink("lo", 1), + newLink("eth0", 2), + }, + expectedExistingLinks: []string{"lo", "eth0", "eth1"}, + }, + { + name: "two different handlers watching different interface link names", + eventHandlers: []*linkEventHandler{ + newLinkEventHandler([]string{ + "eth0", // initial notification + "eth0", // delete notification + }, "eth0"), + newLinkEventHandler([]string{ + "eth1", // add notification + }, "eth1"), + }, + linkEvents: []netlink.LinkUpdate{ + newLinkEvent(false, "eth1", 3), + newLinkEvent(true, "eth0", 2), + }, + initialLinkList: []netlink.Link{ + newLink("lo", 1), + newLink("eth0", 2), + }, + expectedExistingLinks: []string{"lo", "eth1"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + + mockLinkSubscribeFunc := func(ch chan<- netlink.LinkUpdate, done <-chan struct{}, options netlink.LinkSubscribeOptions) error { + go func() { + for _, e := range tt.linkEvents { + ch <- e + } + }() + return nil + } + + netlink := netlinktesting.NewMockInterface(ctrl) + netlink.EXPECT().LinkList().Return(tt.initialLinkList, nil) + + d := &linkMonitor{ + linkSubscribeFunc: mockLinkSubscribeFunc, + eventHandlers: map[string][]LinkEventHandler{}, + linkNames: set.New[string](), + linkIndexMap: map[int32]string{}, + netlink: netlink, + } + for _, h := range tt.eventHandlers { + d.AddEventHandler(h.onLinkEvent, h.watchLinkNames...) + } + + stopCh := make(chan struct{}) + defer close(stopCh) + go d.listAndWatchLinks(stopCh) + assert.EventuallyWithT( + t, + func(t *assert.CollectT) { + for _, l := range tt.eventHandlers { + assert.Equal(t, l.expectedLinkEvents, l.getReceivedLinkEvents()) + } + assert.ElementsMatch(t, tt.expectedExistingLinks, d.linkNames.UnsortedList()) + }, + 1*time.Second, 100*time.Millisecond, "timeout waiting for link events", + ) + }) + } +} diff --git a/pkg/agent/ipassigner/linkmonitor/link_monitor_windows.go b/pkg/agent/ipassigner/linkmonitor/link_monitor_windows.go new file mode 100644 index 00000000000..2091ce10071 --- /dev/null +++ b/pkg/agent/ipassigner/linkmonitor/link_monitor_windows.go @@ -0,0 +1,36 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package linkmonitor + +type linkMonitor struct { +} + +func NewLinkMonitor() *linkMonitor { + return &linkMonitor{} +} + +func (d *linkMonitor) HasSynced() bool { + return false +} + +func (d *linkMonitor) AddEventHandler(handler LinkEventHandler, linkNames ...string) { +} + +func (d *linkMonitor) Run(stopCh <-chan struct{}) { +} + +func (d *linkMonitor) LinkExists(name string) bool { + return false +} diff --git a/pkg/agent/ipassigner/responder/arp_responder.go b/pkg/agent/ipassigner/responder/arp_responder.go index a2cef75a9bf..6c502cf0a37 100644 --- a/pkg/agent/ipassigner/responder/arp_responder.go +++ b/pkg/agent/ipassigner/responder/arp_responder.go @@ -18,36 +18,27 @@ import ( "fmt" "net" "sync" + "time" "github.com/mdlayher/arp" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" utilnet "k8s.io/utils/net" ) type arpResponder struct { - iface *net.Interface - conn *arp.Client + once sync.Once + linkName string assignedIPs sets.Set[string] mutex sync.Mutex + linkEventCh chan struct{} } var _ Responder = (*arpResponder)(nil) -func NewARPResponder(iface *net.Interface) (*arpResponder, error) { - conn, err := arp.Dial(iface) - if err != nil { - return nil, fmt.Errorf("creating ARP responder for %q: %s", iface.Name, err) - } - return &arpResponder{ - iface: iface, - conn: conn, - assignedIPs: sets.New[string](), - }, nil -} - func (r *arpResponder) InterfaceName() string { - return r.iface.Name + return r.linkName } func (r *arpResponder) AddIP(ip net.IP) error { @@ -55,7 +46,7 @@ func (r *arpResponder) AddIP(ip net.IP) error { return fmt.Errorf("only IPv4 is supported") } if r.addIP(ip) { - klog.InfoS("Assigned IP to ARP responder", "ip", ip, "interface", r.iface.Name) + klog.InfoS("Assigned IP to ARP responder", "ip", ip, "interface", r.linkName) } return nil } @@ -65,13 +56,13 @@ func (r *arpResponder) RemoveIP(ip net.IP) error { return fmt.Errorf("only IPv4 is supported") } if r.deleteIP(ip) { - klog.InfoS("Removed IP from ARP responder", "ip", ip, "interface", r.iface.Name) + klog.InfoS("Removed IP from ARP responder", "ip", ip, "interface", r.linkName) } return nil } -func (r *arpResponder) handleARPRequest() error { - pkt, _, err := r.conn.Read() +func (r *arpResponder) handleARPRequest(client *arp.Client, iface *net.Interface) error { + pkt, _, err := client.Read() if err != nil { return err } @@ -79,26 +70,74 @@ func (r *arpResponder) handleARPRequest() error { return nil } if !r.isIPAssigned(pkt.TargetIP) { - klog.V(4).InfoS("Ignored ARP request", "ip", pkt.TargetIP, "interface", r.iface.Name) + klog.V(4).InfoS("Ignored ARP request", "ip", pkt.TargetIP, "interface", r.linkName) return nil } - if err := r.conn.Reply(pkt, r.iface.HardwareAddr, pkt.TargetIP); err != nil { + if err := client.Reply(pkt, iface.HardwareAddr, pkt.TargetIP); err != nil { return fmt.Errorf("failed to reply ARP packet for IP %s: %v", pkt.TargetIP, err) } - klog.V(4).InfoS("Sent ARP response", "ip", pkt.TargetIP, "interface", r.iface.Name) + klog.V(4).InfoS("Sent ARP response", "ip", pkt.TargetIP, "interface", r.linkName) return nil } func (r *arpResponder) Run(stopCh <-chan struct{}) { + // The responder instance is created by the factory and can be shared by multiple callers. + // Using once.Do here ensures it is started only once. + r.once.Do(func() { + go wait.NonSlidingUntil(func() { + r.dialAndHandleRequests(stopCh) + }, time.Second, stopCh) + }) + <-stopCh +} + +func (r *arpResponder) dialAndHandleRequests(stopCh <-chan struct{}) { + transportInterface, err := net.InterfaceByName(r.linkName) + if err != nil { + klog.ErrorS(err, "Failed to get interface by name", "deviceName", r.linkName) + return + } + client, err := arp.Dial(transportInterface) + if err != nil { + klog.ErrorS(err, "Failed to dial ARP client", "deviceName", r.linkName) + return + } + reloadCh := make(chan struct{}) + + klog.InfoS("ARP responder started", "interface", transportInterface.Name, "index", transportInterface.Index) + defer klog.InfoS("ARP responder stopped", "interface", transportInterface.Name, "index", transportInterface.Index) + + go func() { + defer client.Close() + defer close(reloadCh) + + for { + select { + case <-stopCh: + return + case <-r.linkEventCh: + newTransportInterface, err := net.InterfaceByName(r.linkName) + if err != nil { + klog.ErrorS(err, "Failed to get interface by name", "name", r.linkName) + continue + } + if transportInterface.Index != newTransportInterface.Index { + klog.InfoS("Transport interface index changed, restarting ARP responder", "name", transportInterface.Name, "oldIndex", transportInterface.Index, "newIndex", newTransportInterface.Index) + return + } + klog.V(4).InfoS("Transport interface not changed") + } + } + }() + for { select { - case <-stopCh: - r.conn.Close() + case <-reloadCh: return default: - err := r.handleARPRequest() + err := r.handleARPRequest(client, transportInterface) if err != nil { - klog.ErrorS(err, "Failed to handle ARP request", "deviceName", r.iface.Name) + klog.ErrorS(err, "Failed to handle ARP request", "deviceName", r.linkName) } } } @@ -129,3 +168,12 @@ func (r *arpResponder) addIP(ip net.IP) bool { } return !exist } + +func (r *arpResponder) onLinkUpdate(linkName string) { + klog.V(4).InfoS("Received link update event", "name", linkName) + select { + // if an event is already present in the channel, we can drop this new one as we only monitor one link + case r.linkEventCh <- struct{}{}: + default: + } +} diff --git a/pkg/agent/ipassigner/responder/arp_responder_test.go b/pkg/agent/ipassigner/responder/arp_responder_test.go index fcb05a8bb9a..176f6c56288 100644 --- a/pkg/agent/ipassigner/responder/arp_responder_test.go +++ b/pkg/agent/ipassigner/responder/arp_responder_test.go @@ -103,14 +103,13 @@ func TestARPResponder_HandleARPRequest(t *testing.T) { assignedIPs.Insert(ip.String()) } r := arpResponder{ - iface: localIface, - conn: localARPClient, + linkName: localIface.Name, assignedIPs: sets.New[string](), } for _, ip := range tt.assignedIPs { r.AddIP(ip) } - err = r.handleARPRequest() + err = r.handleARPRequest(localARPClient, localIface) require.NoError(t, err) // We cannot use remoteARPClient.ReadFrom as it is blocking. replyB, addr, err := remoteConn.Receive() @@ -159,7 +158,7 @@ func Test_arpResponder_addIP(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := &arpResponder{ - iface: iface, + linkName: iface.Name, assignedIPs: tt.assignedIPs, } err := r.AddIP(tt.ip) @@ -207,7 +206,7 @@ func Test_arpResponder_removeIP(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := &arpResponder{ - iface: iface, + linkName: iface.Name, assignedIPs: tt.assignedIPs, } err := r.RemoveIP(tt.ip) diff --git a/pkg/agent/ipassigner/responder/factory.go b/pkg/agent/ipassigner/responder/factory.go new file mode 100644 index 00000000000..7cf3f747753 --- /dev/null +++ b/pkg/agent/ipassigner/responder/factory.go @@ -0,0 +1,72 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package responder + +import ( + "net/netip" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/ipassigner/linkmonitor" +) + +var ( + // map of transportInterfaceName to ARP responder + arpResponders = make(map[string]*arpResponder) + // map of transportInterfaceName to NDP responder + ndpResponders = make(map[string]*ndpResponder) +) + +// NewARPResponder creates a new ARP responder if it does not exist for the given transportInterfaceName. +// This function is not thread-safe. +func NewARPResponder(transportInterfaceName string, linkMonitor linkmonitor.Interface) *arpResponder { + if responder, ok := arpResponders[transportInterfaceName]; ok { + klog.InfoS("ARP responder already exists", "interface", transportInterfaceName) + return responder + } + a := &arpResponder{ + linkName: transportInterfaceName, + assignedIPs: sets.New[string](), + linkEventCh: make(chan struct{}, 1), + } + if linkMonitor != nil { + linkMonitor.AddEventHandler(a.onLinkUpdate, transportInterfaceName) + } + klog.InfoS("Created new ARP responder", "interface", transportInterfaceName) + arpResponders[transportInterfaceName] = a + return a +} + +// NewNDPResponder creates a new NDP responder if it does not exist for the given transportInterfaceName. +// This function is not thread-safe. +func NewNDPResponder(transportInterfaceName string, linkMonitor linkmonitor.Interface) *ndpResponder { + if responder, ok := ndpResponders[transportInterfaceName]; ok { + klog.InfoS("NDP responder already exists", "interface", transportInterfaceName) + return responder + } + n := &ndpResponder{ + linkName: transportInterfaceName, + multicastGroups: make(map[netip.Addr]int), + assignedIPs: sets.New[netip.Addr](), + linkEventCh: make(chan struct{}, 1), + } + if linkMonitor != nil { + linkMonitor.AddEventHandler(n.onLinkUpdate, transportInterfaceName) + } + klog.InfoS("Created new NDP responder", "interface", transportInterfaceName) + ndpResponders[transportInterfaceName] = n + return n +} diff --git a/pkg/agent/ipassigner/responder/ndp_responder.go b/pkg/agent/ipassigner/responder/ndp_responder.go index 103843fecdf..931d3e59cec 100644 --- a/pkg/agent/ipassigner/responder/ndp_responder.go +++ b/pkg/agent/ipassigner/responder/ndp_responder.go @@ -17,64 +17,56 @@ package responder import ( "fmt" "net" + "net/netip" "sync" + "time" "github.com/mdlayher/ndp" "golang.org/x/net/ipv6" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" utilnet "k8s.io/utils/net" ) -const ( - solicitedNodeMulticastAddressPrefix = "ff02::1:ff00:0" +var ( + solicitedNodeMulticastAddressPrefix = netip.MustParseAddr("ff02::1:ff00:0") ) type ndpConn interface { - WriteTo(message ndp.Message, cm *ipv6.ControlMessage, dstIP net.IP) error - ReadFrom() (ndp.Message, *ipv6.ControlMessage, net.IP, error) - JoinGroup(net.IP) error - LeaveGroup(net.IP) error + WriteTo(message ndp.Message, cm *ipv6.ControlMessage, dstIP netip.Addr) error + ReadFrom() (ndp.Message, *ipv6.ControlMessage, netip.Addr, error) + JoinGroup(netip.Addr) error + LeaveGroup(netip.Addr) error Close() error } type ndpResponder struct { - iface *net.Interface + once sync.Once + linkName string conn ndpConn - assignedIPs sets.Set[string] - multicastGroups map[int]int + linkEventCh chan struct{} + assignedIPs sets.Set[netip.Addr] + multicastGroups map[netip.Addr]int mutex sync.Mutex } var _ Responder = (*ndpResponder)(nil) -func parseIPv6SolicitedNodeMulticastAddress(ip net.IP) (net.IP, int) { - group := net.ParseIP(solicitedNodeMulticastAddressPrefix) +func parseIPv6SolicitedNodeMulticastAddress(ip netip.Addr) netip.Addr { + target := ip.As16() + prefix := solicitedNodeMulticastAddressPrefix.As16() // copy lower 24 bits - copy(group[13:], ip[13:]) - key := int(group[13])<<16 | int(group[14])<<8 | int(group[15]) - return group, key -} - -func NewNDPResponder(iface *net.Interface) (*ndpResponder, error) { - conn, _, err := ndp.Listen(iface, ndp.LinkLocal) - if err != nil { - return nil, err - } - return &ndpResponder{ - iface: iface, - conn: conn, - multicastGroups: make(map[int]int), - assignedIPs: sets.New[string](), - }, nil + copy(prefix[13:], target[13:]) + return netip.AddrFrom16(prefix) } func (r *ndpResponder) InterfaceName() string { - return r.iface.Name + return r.linkName } -func (r *ndpResponder) handleNeighborSolicitation() error { - pkt, _, srcIP, err := r.conn.ReadFrom() +func (r *ndpResponder) handleNeighborSolicitation(conn ndpConn, link *net.Interface) error { + pkt, _, srcIP, err := conn.ReadFrom() if err != nil { return err } @@ -98,7 +90,7 @@ func (r *ndpResponder) handleNeighborSolicitation() error { return nil } if !r.isIPAssigned(ns.TargetAddress) { - klog.V(4).InfoS("Ignored Neighbor Solicitation", "ip", ns.TargetAddress.String(), "interface", r.iface.Name) + klog.V(4).InfoS("Ignored Neighbor Solicitation", "ip", ns.TargetAddress, "interface", r.linkName) return nil } na := &ndp.NeighborAdvertisement{ @@ -108,27 +100,90 @@ func (r *ndpResponder) handleNeighborSolicitation() error { Options: []ndp.Option{ &ndp.LinkLayerAddress{ Direction: ndp.Target, - Addr: r.iface.HardwareAddr, + Addr: link.HardwareAddr, }, }, } - if err := r.conn.WriteTo(na, nil, srcIP); err != nil { + if err := conn.WriteTo(na, nil, srcIP); err != nil { return err } - klog.V(4).InfoS("Sent Neighbor Advertisement", "ip", ns.TargetAddress.String(), "interface", r.iface.Name) + klog.V(4).InfoS("Sent Neighbor Advertisement", "ip", ns.TargetAddress.String(), "interface", r.linkName) return nil } func (r *ndpResponder) Run(stopCh <-chan struct{}) { + // The responder instance is created by the factory and can be shared by multiple callers. + // Using once.Do here ensures it is started only once. + r.once.Do(func() { + go wait.NonSlidingUntil(func() { + r.dialAndHandleRequests(stopCh) + }, time.Second, stopCh) + }) + <-stopCh +} + +func (r *ndpResponder) dialAndHandleRequests(stopCh <-chan struct{}) { + transportInterface, err := net.InterfaceByName(r.linkName) + if err != nil { + klog.ErrorS(err, "Failed to get interface", "interface", r.linkName) + return + } + + // It may take time for the interface to be ready for socket binding. For example, IPv6 introduces Duplicate Address Detection, + // which may take time to allow the address to be used for socket binding. EADDRNOTAVAIL (bind: cannot assign requested address) + // may be returned for such cases. + klog.InfoS("Binding NDP responder on interface", "interface", r.linkName) + conn, _, err := ndp.Listen(transportInterface, ndp.LinkLocal) + if err != nil { + klog.ErrorS(err, "Failed to create NDP responder", "interface", r.linkName) + return + } + + r.mutex.Lock() + r.conn = conn + for ip := range r.assignedIPs { + if err := r.joinMulticastGroup(ip); err != nil { + klog.ErrorS(err, "Failed to join multicast group", "ip", ip, "interface", r.linkName) + } + } + r.mutex.Unlock() + + reloadCh := make(chan struct{}) + + klog.InfoS("NDP responder started", "interface", transportInterface.Name, "index", transportInterface.Index) + defer klog.InfoS("NDP responder stopped", "interface", transportInterface.Name, "index", transportInterface.Index) + + go func() { + defer conn.Close() + defer close(reloadCh) + + for { + select { + case <-stopCh: + return + case <-r.linkEventCh: + newTransportInterface, err := net.InterfaceByName(r.linkName) + if err != nil { + klog.ErrorS(err, "Failed to get interface by name", "name", r.linkName) + continue + } + if transportInterface.Index != newTransportInterface.Index { + klog.InfoS("Transport interface index changed, restarting NDP responder", "name", transportInterface.Name, "oldIndex", transportInterface.Index, "newIndex", newTransportInterface.Index) + return + } + klog.V(4).InfoS("Transport interface not changed") + } + } + }() + for { select { - case <-stopCh: - r.conn.Close() + case <-reloadCh: return default: - err := r.handleNeighborSolicitation() + err := r.handleNeighborSolicitation(conn, transportInterface) if err != nil { - klog.ErrorS(err, "Failed to handle Neighbor Solicitation", "deviceName", r.iface.Name) + klog.ErrorS(err, "Failed to handle Neighbor Solicitation", "deviceName", r.linkName) } } } @@ -138,26 +193,55 @@ func (r *ndpResponder) AddIP(ip net.IP) error { if !utilnet.IsIPv6(ip) { return fmt.Errorf("only IPv6 is supported") } - if r.isIPAssigned(ip) { + + target, _ := netip.AddrFromSlice(ip) + r.mutex.Lock() + defer r.mutex.Unlock() + + if r.assignedIPs.Has(target) { return nil } - group, key := parseIPv6SolicitedNodeMulticastAddress(ip) - if err := func() error { - r.mutex.Lock() - defer r.mutex.Unlock() - if r.multicastGroups[key] == 0 { - if err := r.conn.JoinGroup(group); err != nil { - return fmt.Errorf("joining solicited-node multicast group %s for %q failed: %v", group, ip, err) - } - klog.InfoS("Joined solicited-node multicast group", "group", group, "interface", r.iface.Name) - } - klog.InfoS("Assigned IP to NDP responder", "ip", ip, "interface", r.iface.Name) - r.multicastGroups[key]++ - r.assignedIPs.Insert(ip.String()) - return nil - }(); err != nil { + if err := r.joinMulticastGroup(target); err != nil { return err } + r.assignedIPs.Insert(target) + + return nil +} + +func (r *ndpResponder) joinMulticastGroup(ip netip.Addr) error { + if r.conn == nil { + klog.InfoS("NDP responder is not initialized") + return nil + } + group := parseIPv6SolicitedNodeMulticastAddress(ip) + if r.multicastGroups[group] > 0 { + r.multicastGroups[group]++ + return nil + } + if err := r.conn.JoinGroup(group); err != nil { + return fmt.Errorf("joining multicast group %s failed: %v", group, err) + } + klog.InfoS("Joined multicast group", "group", group, "interface", r.linkName) + r.multicastGroups[group]++ + return nil +} + +func (r *ndpResponder) leaveMulticastGroup(ip netip.Addr) error { + if r.conn == nil { + klog.InfoS("NDP responder is not initialized") + return nil + } + group := parseIPv6SolicitedNodeMulticastAddress(ip) + if r.multicastGroups[group] > 1 { + r.multicastGroups[group]-- + return nil + } + if err := r.conn.LeaveGroup(group); err != nil { + return fmt.Errorf("leaving multicast group %s failed: %v", group, err) + } + klog.InfoS("Left multicast group", "group", group, "interface", r.linkName) + delete(r.multicastGroups, group) return nil } @@ -165,28 +249,33 @@ func (r *ndpResponder) RemoveIP(ip net.IP) error { if !utilnet.IsIPv6(ip) { return fmt.Errorf("only IPv6 is supported") } + target, _ := netip.AddrFromSlice(ip) r.mutex.Lock() defer r.mutex.Unlock() - if !r.assignedIPs.Has(ip.String()) { + + if !r.assignedIPs.Has(target) { return nil } - group, key := parseIPv6SolicitedNodeMulticastAddress(ip) - if r.multicastGroups[key] == 1 { - if err := r.conn.LeaveGroup(group); err != nil { - return fmt.Errorf("leaving solicited-node multicast group %s for %q failed: %v", group, ip, err) - } - klog.InfoS("Left solicited-node multicast group", "group", group, "interface", r.iface.Name) - delete(r.multicastGroups, key) - } else { - r.multicastGroups[key]-- + if err := r.leaveMulticastGroup(target); err != nil { + return err } - r.assignedIPs.Delete(ip.String()) - klog.InfoS("Removed IP from NDP responder", "ip", ip, "interface", r.iface.Name) + r.assignedIPs.Delete(target) + + klog.InfoS("Removed IP from NDP responder", "ip", ip, "interface", r.linkName) return nil } -func (r *ndpResponder) isIPAssigned(ip net.IP) bool { +func (r *ndpResponder) isIPAssigned(ip netip.Addr) bool { r.mutex.Lock() defer r.mutex.Unlock() - return r.assignedIPs.Has(ip.String()) + return r.assignedIPs.Has(ip) +} + +func (r *ndpResponder) onLinkUpdate(linkName string) { + klog.V(4).InfoS("Received link update event", "name", linkName) + select { + // if an event is already present in the channel, we can drop this new one as we only monitor one link + case r.linkEventCh <- struct{}{}: + default: + } } diff --git a/pkg/agent/ipassigner/responder/ndp_responder_test.go b/pkg/agent/ipassigner/responder/ndp_responder_test.go index 5d2f66d4c14..2ec9be4dca8 100644 --- a/pkg/agent/ipassigner/responder/ndp_responder_test.go +++ b/pkg/agent/ipassigner/responder/ndp_responder_test.go @@ -16,7 +16,7 @@ package responder import ( "bytes" - "net" + "net/netip" "testing" "github.com/mdlayher/ndp" @@ -26,17 +26,17 @@ import ( ) type fakeNDPConn struct { - readFrom func() (ndp.Message, *ipv6.ControlMessage, net.IP, error) - writeTo func(ndp.Message, *ipv6.ControlMessage, net.IP) error - joinGroup func(ip net.IP) error - leaveGroup func(ip net.IP) error + readFrom func() (ndp.Message, *ipv6.ControlMessage, netip.Addr, error) + writeTo func(ndp.Message, *ipv6.ControlMessage, netip.Addr) error + joinGroup func(ip netip.Addr) error + leaveGroup func(ip netip.Addr) error } -func (c *fakeNDPConn) ReadFrom() (ndp.Message, *ipv6.ControlMessage, net.IP, error) { +func (c *fakeNDPConn) ReadFrom() (ndp.Message, *ipv6.ControlMessage, netip.Addr, error) { return c.readFrom() } -func (c *fakeNDPConn) WriteTo(message ndp.Message, cm *ipv6.ControlMessage, dstIP net.IP) error { +func (c *fakeNDPConn) WriteTo(message ndp.Message, cm *ipv6.ControlMessage, dstIP netip.Addr) error { return c.writeTo(message, cm, dstIP) } @@ -44,11 +44,11 @@ func (c *fakeNDPConn) Close() error { return nil } -func (c *fakeNDPConn) JoinGroup(ip net.IP) error { +func (c *fakeNDPConn) JoinGroup(ip netip.Addr) error { return c.joinGroup(ip) } -func (c *fakeNDPConn) LeaveGroup(ip net.IP) error { +func (c *fakeNDPConn) LeaveGroup(ip netip.Addr) error { return c.leaveGroup(ip) } @@ -59,8 +59,8 @@ func TestNDPResponder_handleNeighborSolicitation(t *testing.T) { tests := []struct { name string requestMessage []byte - requestIP net.IP - assignedIPs []net.IP + requestIP netip.Addr + assignedIPs []netip.Addr expectError bool expectedReply []byte }{ @@ -76,10 +76,10 @@ func TestNDPResponder_handleNeighborSolicitation(t *testing.T) { 0x01, // length (units of 8 octets including type and length fields) 0x00, 0x11, 0x22, 0x33, 0x44, 0x66, // hardware address }, - requestIP: net.ParseIP("fe80::c1"), - assignedIPs: []net.IP{ - net.ParseIP("fe80::a1"), - net.ParseIP("fe80::a2"), + requestIP: netip.MustParseAddr("fe80::c1"), + assignedIPs: []netip.Addr{ + netip.MustParseAddr("fe80::a1"), + netip.MustParseAddr("fe80::a2"), }, expectError: false, expectedReply: []byte{ @@ -105,10 +105,10 @@ func TestNDPResponder_handleNeighborSolicitation(t *testing.T) { 0x01, // length (units of 8 octets including type and length fields) 0x00, 0x11, 0x22, 0x33, 0x44, 0x66, // hardware address }, - requestIP: net.ParseIP("fe80::c1"), - assignedIPs: []net.IP{ - net.ParseIP("fe80::a1"), - net.ParseIP("fe80::a2"), + requestIP: netip.MustParseAddr("fe80::c1"), + assignedIPs: []netip.Addr{ + netip.MustParseAddr("fe80::a1"), + netip.MustParseAddr("fe80::a2"), }, expectError: false, expectedReply: nil, @@ -118,13 +118,13 @@ func TestNDPResponder_handleNeighborSolicitation(t *testing.T) { t.Run(tt.name, func(t *testing.T) { buffer := bytes.NewBuffer(nil) fakeConn := &fakeNDPConn{ - writeTo: func(msg ndp.Message, _ *ipv6.ControlMessage, _ net.IP) error { + writeTo: func(msg ndp.Message, _ *ipv6.ControlMessage, _ netip.Addr) error { bs, err := ndp.MarshalMessage(msg) assert.NoError(t, err) buffer.Write(bs) return nil }, - readFrom: func() (ndp.Message, *ipv6.ControlMessage, net.IP, error) { + readFrom: func() (ndp.Message, *ipv6.ControlMessage, netip.Addr, error) { msg, err := ndp.ParseMessage(tt.requestMessage) return msg, nil, tt.requestIP, err }, @@ -134,14 +134,14 @@ func TestNDPResponder_handleNeighborSolicitation(t *testing.T) { assignedIPs.Insert(ip.String()) } responder := &ndpResponder{ - iface: iface, + linkName: iface.Name, conn: fakeConn, - assignedIPs: sets.New[string](), + assignedIPs: sets.New[netip.Addr](), } for _, ip := range tt.assignedIPs { - responder.assignedIPs[ip.String()] = struct{}{} + responder.assignedIPs[ip] = struct{}{} } - err := responder.handleNeighborSolicitation() + err := responder.handleNeighborSolicitation(fakeConn, iface) if tt.expectError { assert.Error(t, err) } else { @@ -155,34 +155,30 @@ func TestNDPResponder_handleNeighborSolicitation(t *testing.T) { func Test_parseIPv6SolicitedNodeMulticastAddress(t *testing.T) { tests := []struct { name string - ip net.IP - expectedGroup net.IP + ip netip.Addr + expectedGroup netip.Addr expectedKey int }{ { name: "global unicast IPv6 address 1", - ip: net.ParseIP("2022:abcd::11:1111"), - expectedGroup: net.ParseIP("ff02::1:ff11:1111"), - expectedKey: 0x111111, + ip: netip.MustParseAddr("2022:abcd::11:1111"), + expectedGroup: netip.MustParseAddr("ff02::1:ff11:1111"), }, { name: "global unicast IPv6 address 2", - ip: net.ParseIP("2022:ffff::1234:5678"), - expectedGroup: net.ParseIP("ff02::1:ff34:5678"), - expectedKey: 0x345678, + ip: netip.MustParseAddr("2022:ffff::1234:5678"), + expectedGroup: netip.MustParseAddr("ff02::1:ff34:5678"), }, { name: "link-local unicast IPv6 address", - ip: net.ParseIP("fe80::1122:3344"), - expectedGroup: net.ParseIP("ff02::1:ff22:3344"), - expectedKey: 0x223344, + ip: netip.MustParseAddr("fe80::1122:3344"), + expectedGroup: netip.MustParseAddr("ff02::1:ff22:3344"), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - group, key := parseIPv6SolicitedNodeMulticastAddress(tt.ip) + group := parseIPv6SolicitedNodeMulticastAddress(tt.ip) assert.Equal(t, tt.expectedGroup, group) - assert.Equal(t, tt.expectedKey, key) }) } } @@ -193,86 +189,86 @@ func Test_ndpResponder_addIP(t *testing.T) { tests := []struct { name string - ip net.IP + ip netip.Addr conn ndpConn - assignedIPs sets.Set[string] - multicastGroups map[int]int - expectedJoinedGroups []net.IP - expectedLeftGroups []net.IP - expectedMulticastGroups map[int]int - expectedAssigndIPs sets.Set[string] + assignedIPs sets.Set[netip.Addr] + multicastGroups map[netip.Addr]int + expectedJoinedGroups []netip.Addr + expectedLeftGroups []netip.Addr + expectedMulticastGroups map[netip.Addr]int + expectedAssigndIPs sets.Set[netip.Addr] expectedError bool }{ { name: "Add new IP from a new multicast group 1", - ip: net.ParseIP("2022::beaf"), - assignedIPs: sets.New[string](), - multicastGroups: map[int]int{}, - expectedJoinedGroups: []net.IP{net.ParseIP("ff02::1:ff00:beaf")}, + ip: netip.MustParseAddr("2022::beaf"), + assignedIPs: sets.New[netip.Addr](), + multicastGroups: map[netip.Addr]int{}, + expectedJoinedGroups: []netip.Addr{netip.MustParseAddr("ff02::1:ff00:beaf")}, expectedLeftGroups: nil, - expectedMulticastGroups: map[int]int{ - 0xbeaf: 1, + expectedMulticastGroups: map[netip.Addr]int{ + netip.MustParseAddr("ff02::1:ff00:beaf"): 1, }, - expectedAssigndIPs: sets.New[string]("2022::beaf"), + expectedAssigndIPs: sets.New[netip.Addr](netip.MustParseAddr("2022::beaf")), }, { name: "Add new IP from a new multicast group 2", - ip: net.ParseIP("2022::beaf:beaf"), - assignedIPs: sets.New[string](), - multicastGroups: map[int]int{}, - expectedJoinedGroups: []net.IP{net.ParseIP("ff02::1:ffaf:beaf")}, + ip: netip.MustParseAddr("2022::beaf:beaf"), + assignedIPs: sets.New[netip.Addr](), + multicastGroups: map[netip.Addr]int{}, + expectedJoinedGroups: []netip.Addr{netip.MustParseAddr("ff02::1:ffaf:beaf")}, expectedLeftGroups: nil, - expectedMulticastGroups: map[int]int{ - 0xafbeaf: 1, + expectedMulticastGroups: map[netip.Addr]int{ + netip.MustParseAddr("ff02::1:ffaf:beaf"): 1, }, - expectedAssigndIPs: sets.New[string]("2022::beaf:beaf"), + expectedAssigndIPs: sets.New[netip.Addr](netip.MustParseAddr("2022::beaf:beaf")), }, { name: "Add new IP from an existing multicast group", - ip: net.ParseIP("2021::beaf"), - assignedIPs: sets.New[string]("2022::beaf"), - multicastGroups: map[int]int{ - 0xbeaf: 1, + ip: netip.MustParseAddr("2021::beaf"), + assignedIPs: sets.New[netip.Addr](netip.MustParseAddr("2022::beaf")), + multicastGroups: map[netip.Addr]int{ + netip.MustParseAddr("ff02::1:ff00:beaf"): 1, }, expectedJoinedGroups: nil, expectedLeftGroups: nil, - expectedMulticastGroups: map[int]int{ - 0xbeaf: 2, + expectedMulticastGroups: map[netip.Addr]int{ + netip.MustParseAddr("ff02::1:ff00:beaf"): 2, }, - expectedAssigndIPs: sets.New[string]("2021::beaf", "2022::beaf"), + expectedAssigndIPs: sets.New[netip.Addr](netip.MustParseAddr("2021::beaf"), netip.MustParseAddr("2022::beaf")), }, { name: "Add invalid IP", - ip: net.ParseIP("1.2.3.4"), - assignedIPs: sets.New[string](), - multicastGroups: map[int]int{}, + ip: netip.MustParseAddr("1.2.3.4"), + assignedIPs: sets.New[netip.Addr](), + multicastGroups: map[netip.Addr]int{}, expectedError: true, - expectedMulticastGroups: map[int]int{}, - expectedAssigndIPs: sets.New[string](), + expectedMulticastGroups: map[netip.Addr]int{}, + expectedAssigndIPs: sets.New[netip.Addr](), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var joinedGroup, leftGroup []net.IP + var joinedGroup, leftGroup []netip.Addr r := &ndpResponder{ - iface: iface, + linkName: iface.Name, conn: &fakeNDPConn{ - joinGroup: func(ip net.IP) error { + joinGroup: func(ip netip.Addr) error { joinedGroup = append(joinedGroup, ip) return nil }, - leaveGroup: func(ip net.IP) error { + leaveGroup: func(ip netip.Addr) error { leftGroup = append(leftGroup, ip) return nil }, - writeTo: func(_ ndp.Message, _ *ipv6.ControlMessage, _ net.IP) error { + writeTo: func(_ ndp.Message, _ *ipv6.ControlMessage, _ netip.Addr) error { return nil }, }, assignedIPs: tt.assignedIPs, multicastGroups: tt.multicastGroups, } - err := r.AddIP(tt.ip) + err := r.AddIP(tt.ip.AsSlice()) if tt.expectedError { assert.Error(t, err) } else { @@ -292,82 +288,85 @@ func Test_ndpResponder_removeIP(t *testing.T) { tests := []struct { name string - ip net.IP + ip netip.Addr conn ndpConn - assignedIPs sets.Set[string] - multicastGroups map[int]int - expectedJoinedGroups []net.IP - expectedLeftGroups []net.IP - expectedMulticastGroups map[int]int - expectedAssigndIPs sets.Set[string] + assignedIPs sets.Set[netip.Addr] + multicastGroups map[netip.Addr]int + expectedJoinedGroups []netip.Addr + expectedLeftGroups []netip.Addr + expectedMulticastGroups map[netip.Addr]int + expectedAssigndIPs sets.Set[netip.Addr] expectedError bool }{ { name: "Remove IP and leave multicast group", - ip: net.ParseIP("2022::beaf"), - assignedIPs: sets.New[string]("2022::beaf"), - multicastGroups: map[int]int{ - 0xbeaf: 1, + ip: netip.MustParseAddr("2022::beaf"), + assignedIPs: sets.New[netip.Addr](netip.MustParseAddr("2022::beaf")), + multicastGroups: map[netip.Addr]int{ + netip.MustParseAddr("ff02::1:ff00:beaf"): 1, }, expectedJoinedGroups: nil, - expectedLeftGroups: []net.IP{net.ParseIP("ff02::1:ff00:beaf")}, - expectedMulticastGroups: map[int]int{}, - expectedAssigndIPs: sets.New[string](), + expectedLeftGroups: []netip.Addr{netip.MustParseAddr("ff02::1:ff00:beaf")}, + expectedMulticastGroups: map[netip.Addr]int{}, + expectedAssigndIPs: sets.New[netip.Addr](), }, { - name: "Remove IP and should not leave multicast group", - ip: net.ParseIP("2022::beaf"), - assignedIPs: sets.New[string]("2022::beaf", "2021::beaf"), - multicastGroups: map[int]int{ - 0xbeaf: 2, + name: "Remove IP and should not leave multicast group", + ip: netip.MustParseAddr("2022::beaf"), + assignedIPs: sets.New[netip.Addr]( + netip.MustParseAddr("2022::beaf"), + netip.MustParseAddr("2021::beaf"), + ), + multicastGroups: map[netip.Addr]int{ + netip.MustParseAddr("ff02::1:ff00:beaf"): 2, }, expectedJoinedGroups: nil, expectedLeftGroups: nil, - expectedMulticastGroups: map[int]int{ - 0xbeaf: 1, + expectedMulticastGroups: map[netip.Addr]int{ + netip.MustParseAddr("ff02::1:ff00:beaf"): 1, }, - expectedAssigndIPs: sets.New[string]("2021::beaf"), + expectedAssigndIPs: sets.New[netip.Addr](netip.MustParseAddr("2021::beaf")), }, { name: "Remove non-existent IP", - ip: net.ParseIP("2022::beaf"), - assignedIPs: sets.New[string]("2021::beaf"), - multicastGroups: map[int]int{ - 0xbeaf: 1, + ip: netip.MustParseAddr("2022::beaf"), + assignedIPs: sets.New[netip.Addr](netip.MustParseAddr("2021::beaf")), + multicastGroups: map[netip.Addr]int{ + netip.MustParseAddr("ff02::1:ff00:beaf"): 1, }, expectedJoinedGroups: nil, expectedLeftGroups: nil, - expectedMulticastGroups: map[int]int{ - 0xbeaf: 1, + expectedMulticastGroups: map[netip.Addr]int{ + netip.MustParseAddr("ff02::1:ff00:beaf"): 1, }, - expectedAssigndIPs: sets.New[string]("2021::beaf"), + expectedAssigndIPs: sets.New[netip.Addr](netip.MustParseAddr("2021::beaf")), }, { name: "Remove invalid IP", - ip: net.ParseIP("1.2.3.4"), - assignedIPs: sets.New[string]("2021::beaf"), - multicastGroups: map[int]int{ - 0xbeaf: 1, + ip: netip.MustParseAddr("1.2.3.4"), + assignedIPs: sets.New[netip.Addr](netip.MustParseAddr("2021::beaf")), + multicastGroups: map[netip.Addr]int{ + netip.MustParseAddr("ff02::1:ff00:beaf"): 1, }, expectedJoinedGroups: nil, expectedLeftGroups: nil, - expectedMulticastGroups: map[int]int{ - 0xbeaf: 1, + expectedMulticastGroups: map[netip.Addr]int{ + netip.MustParseAddr("ff02::1:ff00:beaf"): 1, }, - expectedAssigndIPs: sets.New[string]("2021::beaf"), + expectedAssigndIPs: sets.New[netip.Addr](netip.MustParseAddr("2021::beaf")), expectedError: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var joinedGroup, leftGroup []net.IP + var joinedGroup, leftGroup []netip.Addr r := &ndpResponder{ - iface: iface, + linkName: iface.Name, conn: &fakeNDPConn{ - joinGroup: func(ip net.IP) error { + joinGroup: func(ip netip.Addr) error { joinedGroup = append(joinedGroup, ip) return nil }, - leaveGroup: func(ip net.IP) error { + leaveGroup: func(ip netip.Addr) error { leftGroup = append(leftGroup, ip) return nil }, @@ -375,7 +374,7 @@ func Test_ndpResponder_removeIP(t *testing.T) { assignedIPs: tt.assignedIPs, multicastGroups: tt.multicastGroups, } - err := r.RemoveIP(tt.ip) + err := r.RemoveIP(tt.ip.AsSlice()) if tt.expectedError { assert.Error(t, err) } else { diff --git a/pkg/agent/util/ndp/ndp.go b/pkg/agent/util/ndp/ndp.go index e4723ab113b..147a1c9e91e 100644 --- a/pkg/agent/util/ndp/ndp.go +++ b/pkg/agent/util/ndp/ndp.go @@ -17,6 +17,7 @@ package ndp import ( "fmt" "net" + "net/netip" "github.com/mdlayher/ndp" ) @@ -32,10 +33,10 @@ func GratuitousNDPOverIface(srcIP net.IP, iface *net.Interface) error { return fmt.Errorf("failed to create NDP responder for %q: %s", iface.Name, err) } defer conn.Close() - + target, _ := netip.AddrFromSlice(srcIP) na := &ndp.NeighborAdvertisement{ Override: true, - TargetAddress: srcIP, + TargetAddress: target, Options: []ndp.Option{ &ndp.LinkLayerAddress{ Direction: ndp.Target, @@ -43,5 +44,5 @@ func GratuitousNDPOverIface(srcIP net.IP, iface *net.Interface) error { }, }, } - return conn.WriteTo(na, nil, net.IPv6linklocalallnodes) + return conn.WriteTo(na, nil, netip.IPv6LinkLocalAllNodes()) } diff --git a/pkg/agent/util/netlink/netlink_linux.go b/pkg/agent/util/netlink/netlink_linux.go index ffe56ca02fb..b0b98413bde 100644 --- a/pkg/agent/util/netlink/netlink_linux.go +++ b/pkg/agent/util/netlink/netlink_linux.go @@ -70,5 +70,7 @@ type Interface interface { LinkSetUp(link netlink.Link) error + LinkList() ([]netlink.Link, error) + ConntrackDeleteFilter(table netlink.ConntrackTableType, family netlink.InetFamily, filter netlink.CustomConntrackFilter) (uint, error) } diff --git a/pkg/agent/util/netlink/testing/mock_netlink_linux.go b/pkg/agent/util/netlink/testing/mock_netlink_linux.go index 38c78110d4d..a67d082f73d 100644 --- a/pkg/agent/util/netlink/testing/mock_netlink_linux.go +++ b/pkg/agent/util/netlink/testing/mock_netlink_linux.go @@ -186,6 +186,21 @@ func (mr *MockInterfaceMockRecorder) LinkDelAltName(link, name any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LinkDelAltName", reflect.TypeOf((*MockInterface)(nil).LinkDelAltName), link, name) } +// LinkList mocks base method. +func (m *MockInterface) LinkList() ([]netlink.Link, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LinkList") + ret0, _ := ret[0].([]netlink.Link) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LinkList indicates an expected call of LinkList. +func (mr *MockInterfaceMockRecorder) LinkList() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LinkList", reflect.TypeOf((*MockInterface)(nil).LinkList)) +} + // LinkSetDown mocks base method. func (m *MockInterface) LinkSetDown(link netlink.Link) error { m.ctrl.T.Helper() diff --git a/test/integration/agent/ip_assigner_linux_test.go b/test/integration/agent/ip_assigner_linux_test.go index 1016502753e..ae4571bc0fc 100644 --- a/test/integration/agent/ip_assigner_linux_test.go +++ b/test/integration/agent/ip_assigner_linux_test.go @@ -34,7 +34,7 @@ func TestIPAssigner(t *testing.T) { nodeLinkName := nodeIntf.Name require.NotNil(t, nodeLinkName, "Get Node link failed") - ipAssigner, err := ipassigner.NewIPAssigner(nodeLinkName, dummyDeviceName) + ipAssigner, err := ipassigner.NewIPAssigner(nodeLinkName, dummyDeviceName, nil) require.NoError(t, err, "Initializing IP assigner failed") dummyDevice, err := netlink.LinkByName(dummyDeviceName) @@ -83,7 +83,7 @@ func TestIPAssigner(t *testing.T) { require.NoError(t, err, "Failed to list IP addresses") assert.Equal(t, sets.New[string](fmt.Sprintf("%s/%d", ip1VLAN30, subnet30.PrefixLength)), actualIPs, "Actual IPs don't match") - newIPAssigner, err := ipassigner.NewIPAssigner(nodeLinkName, dummyDeviceName) + newIPAssigner, err := ipassigner.NewIPAssigner(nodeLinkName, dummyDeviceName, nil) require.NoError(t, err, "Initializing new IP assigner failed") assert.Equal(t, map[string]*crdv1b1.SubnetInfo{}, newIPAssigner.AssignedIPs(), "Assigned IPs don't match")