From 647e567d9a560721d08733025d184418b6f28341 Mon Sep 17 00:00:00 2001 From: Hongliang Liu Date: Tue, 2 Apr 2024 20:53:56 +0800 Subject: [PATCH] Add support for `loadBalancerSourceRanges` in LoadBalancer Service For #5493 This commit introduces support for loadBalancerSourceRanges for LoadBalancer Services. Here is an example of a LoadBalancer Service configuration allowing access from specific CIDRs: ```yaml apiVersion: v1 kind: Service metadata: name: sample-loadbalancer-source-ranges spec: selector: app: web ports: - protocol: TCP port: 80 targetPort: 80 type: LoadBalancer loadBalancerSourceRanges: - "192.168.77.0/24" - "192.168.78.0/24" status: loadBalancer: ingress: - ip: 192.168.77.152 ``` To implement `loadBalancerSourceRanges`, a new table LoadBalancerSourceRanges is introduced after table PreRoutingClassifier. Here are the corresponding flows: ```text 1. table=LoadBalancerSourceRanges, priority=200,tcp,nw_src=192.168.77.0/24,nw_dst=192.168.77.152,tp_dst=80 actions=goto_table:SessionAffinity 2. table=LoadBalancerSourceRanges, priority=200,tcp,nw_src=192.168.78.0/24,nw_dst=192.168.77.152,tp_dst=80 actions=goto_table:SessionAffinity 3. table=LoadBalancerSourceRanges, priority=190,tcp,nw_dst=192.168.77.152,tp_dst=80 actions=drop 4. table=LoadBalancerSourceRanges, priority=0 actions=goto_table:SessionAffinity ``` Flows 1-2 allow packets destined for the sample LoadBalancer Service from CIDRs specified in the `loadBalancerSourceRanges` of the Service. Flow 3, with lower priority, drops packets destined for the sample LoadBalancer Service that don't match any CIDRs within the `loadBalancerSourceRanges`. 
Signed-off-by: Hongliang Liu Signed-off-by: Hongliang Liu --- cmd/antrea-agent/agent.go | 1 + docs/design/ovs-pipeline.md | 36 +++- pkg/agent/openflow/client.go | 4 + pkg/agent/openflow/client_test.go | 107 ++++++---- pkg/agent/openflow/framework.go | 3 + pkg/agent/openflow/framework_test.go | 14 +- pkg/agent/openflow/pipeline.go | 46 ++++- pkg/agent/openflow/pipeline_test.go | 6 +- pkg/agent/openflow/service.go | 3 + pkg/agent/openflow/service_test.go | 8 +- pkg/agent/proxy/proxier.go | 41 ++-- pkg/agent/proxy/proxier_test.go | 257 +++++++++++++++++++----- pkg/agent/types/service.go | 3 +- test/e2e/proxy_test.go | 82 ++++++++ test/integration/agent/openflow_test.go | 20 +- 15 files changed, 497 insertions(+), 134 deletions(-) diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 24740ec0f67..20076c147c4 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -182,6 +182,7 @@ func run(o *Options) error { enableFlowExporter, o.config.AntreaProxy.ProxyAll, features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR), + *o.config.AntreaProxy.ProxyLoadBalancerIPs, connectUplinkToBridge, multicastEnabled, features.DefaultFeatureGate.Enabled(features.TrafficControl), diff --git a/docs/design/ovs-pipeline.md b/docs/design/ovs-pipeline.md index 5f3aecd2f07..7af04258c86 100644 --- a/docs/design/ovs-pipeline.md +++ b/docs/design/ovs-pipeline.md @@ -319,7 +319,8 @@ spec: ### LoadBalancer -A sample LoadBalancer Service with ingress IP `192.168.77.150` assigned by an ingress controller. +A sample LoadBalancer Service with ingress IP `192.168.77.150` assigned by an ingress controller, configured +`loadBalancerSourceRanges` to a CIDR list. ```yaml apiVersion: v1 @@ -334,6 +335,9 @@ spec: port: 80 targetPort: 80 type: LoadBalancer + loadBalancerSourceRanges: + - "192.168.77.0/24" + - "192.168.78.0/24" status: loadBalancer: ingress: @@ -919,7 +923,7 @@ If you dump the flows of this table, you may see the following: ```text 1. 
table=NodePortMark, priority=200,ip,nw_dst=192.168.77.102 actions=set_field:0x80000/0x80000->reg4 2. table=NodePortMark, priority=200,ip,nw_dst=169.254.0.252 actions=set_field:0x80000/0x80000->reg4 -3. table=NodePortMark, priority=0 actions=goto_table:SessionAffinity +3. table=NodePortMark, priority=0 actions=goto_table:LoadBalancerSourceRanges ``` Flow 1 matches packets destined for the local Node from local Pods. `NodePortRegMark` is loaded, indicating that the @@ -937,6 +941,28 @@ Note that packets of NodePort Services have not been identified in this table by identification of NodePort Services will be done finally in table [ServiceLB] by matching `NodePortRegMark` and the the specific destination port of a NodePort. +### LoadBalancerSourceRanges + +This table is designed to implement `loadBalancerSourceRanges` for LoadBalancer Services. + +If you dump the flows of this table, you may see the following: + +```text +1. table=LoadBalancerSourceRanges, priority=200,tcp,nw_src=192.168.77.0/24,nw_dst=192.168.77.152,tp_dst=80 actions=goto_table:SessionAffinity", +2. table=LoadBalancerSourceRanges, priority=200,tcp,nw_src=192.168.78.0/24,nw_dst=192.168.77.152,tp_dst=80 actions=goto_table:SessionAffinity", +3. table=LoadBalancerSourceRanges, priority=190,tcp,nw_dst=192.168.77.152,tp_dst=80 actions=drop", +4. table=LoadBalancerSourceRanges, priority=0 actions=goto_table:SessionAffinity +``` + +Flows 1-2 are used to match the packets destined for the sample [LoadBalancer], and these packets are also from the +CIDRs within the `loadBalancerSourceRanges` of the Services. + +Flow 3, having lower priority than that of flows 1-2, is also used to match the packets destined for the sample +[LoadBalancer], but these packets, which are not from any CIDRs within the `loadBalancerSourceRanges` of the Services, +will be dropped. + +Flow 4 is the table-miss flow. + ### SessionAffinity This table is designed to implement Service session affinity. 
The learned flows that cache the information of the @@ -978,7 +1004,7 @@ This table is used to implement Service Endpoint selection. It addresses specifi 3. LoadBalancer, as demonstrated in the example [LoadBalancer]. 4. Service configured with external IPs, as demonstrated in the example [Service with ExternalIP]. 5. Service configured with session affinity, as demonstrated in the example [Service with session affinity]. -6. Service configured with externalTrafficPolicy to `Local`, as demonstrated in the example [Service with +6. Service configured with `externalTrafficPolicy` to `Local`, as demonstrated in the example [Service with ExternalTrafficPolicy Local]. If you dump the flows of this table, you may see the following: @@ -1081,7 +1107,7 @@ If you dump the flows of this table, you may see the following:: ``` Flow 1 is designed for Services without Endpoints. It identifies the first packet of connections destined for such Service -by matching `SvcNoEpRegMark`. Subsequently, the packet is forwarded to the OpenFlow controller (Antrea Agent). For TCP +by matching `SvcRejectRegMark`. Subsequently, the packet is forwarded to the OpenFlow controller (Antrea Agent). For TCP Service traffic, the controller will send a TCP RST, and for all other cases the controller will send an ICMP Destination Unreachable message. @@ -1312,7 +1338,7 @@ the following cases when Antrea Proxy is not enabled: to complete the DNAT processes, e.g., kube-proxy. The destination MAC of the packets is rewritten in the table to avoid it is forwarded to the original client Pod by mistake. - When hairpin is involved, i.e. connections between 2 local Pods, for which NAT is performed. One example is a - Pod accessing a NodePort Service for which externalTrafficPolicy is set to `Local` using the local Node's IP address, + Pod accessing a NodePort Service for which `externalTrafficPolicy` is set to `Local` using the local Node's IP address, as there will be no SNAT for such traffic. 
Another example could be hostPort support, depending on how the feature is implemented. diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index e6d3ec5ef87..edff72aa7bc 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -799,6 +799,9 @@ func (c *client) InstallServiceFlows(config *types.ServiceConfig) error { if config.IsDSR { flows = append(flows, c.featureService.dsrServiceMarkFlow(config)) } + if c.proxyLoadBalancerIPs && len(config.LoadBalancerSourceRanges) != 0 { + flows = append(flows, c.featureService.loadBalancerSourceRangesValidateFlows(config)...) + } cacheKey := generateServicePortFlowCacheKey(config.ServiceIP, config.ServicePort, config.Protocol) return c.addFlows(c.featureService.cachedFlows, cacheKey, flows) } @@ -940,6 +943,7 @@ func (c *client) generatePipelines() { c.enableProxy, c.proxyAll, c.enableDSR, + c.proxyLoadBalancerIPs, c.connectUplinkToBridge) c.activatedFeatures = append(c.activatedFeatures, c.featureService) c.traceableFeatures = append(c.traceableFeatures, c.featureService) diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index 658193b1220..b912f0c0fa5 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -83,6 +83,7 @@ func skipTest(tb testing.TB, skipLinux, skipWindows bool) { type clientOptions struct { enableOVSMeters bool enableProxy bool + proxyLoadBalancerIPs bool enableAntreaPolicy bool enableEgress bool enableEgressTrafficShaping bool @@ -393,9 +394,10 @@ func newFakeClientWithBridge( ) *client { // default options o := &clientOptions{ - enableProxy: true, - enableAntreaPolicy: true, - enableEgress: true, + enableProxy: true, + enableAntreaPolicy: true, + enableEgress: true, + proxyLoadBalancerIPs: true, } for _, fn := range options { fn(o) @@ -412,6 +414,7 @@ func newFakeClientWithBridge( false, o.proxyAll, o.enableDSR, + o.proxyLoadBalancerIPs, o.connectUplinkToBridge, o.enableMulticast, 
o.enableTrafficControl, @@ -1017,8 +1020,8 @@ func Test_client_GetPodFlowKeys(t *testing.T) { "table=1,priority=200,arp,in_port=11,arp_spa=10.10.0.11,arp_sha=00:00:10:10:00:11", "table=3,priority=190,in_port=11", "table=4,priority=200,ip,in_port=11,dl_src=00:00:10:10:00:11,nw_src=10.10.0.11", - "table=17,priority=200,ip,reg0=0x200/0x200,nw_dst=10.10.0.11", - "table=22,priority=200,dl_dst=00:00:10:10:00:11", + "table=18,priority=200,ip,reg0=0x200/0x200,nw_dst=10.10.0.11", + "table=23,priority=200,dl_dst=00:00:10:10:00:11", } assert.ElementsMatch(t, expectedFlowKeys, flowKeys) } @@ -1254,17 +1257,18 @@ func Test_client_InstallServiceFlows(t *testing.T) { port := uint16(80) testCases := []struct { - name string - trafficPolicyLocal bool - protocol binding.Protocol - svcIP net.IP - affinityTimeout uint16 - isExternal bool - isNodePort bool - isNested bool - isDSR bool - enableMulticluster bool - expectedFlows []string + name string + trafficPolicyLocal bool + protocol binding.Protocol + svcIP net.IP + affinityTimeout uint16 + isExternal bool + isNodePort bool + isNested bool + isDSR bool + enableMulticluster bool + loadBalancerSourceRanges []string + expectedFlows []string }{ { name: "Service ClusterIP", @@ -1449,6 +1453,38 @@ func Test_client_InstallServiceFlows(t *testing.T) { "cookie=0x1030000000064, table=DSRServiceMark, priority=200,tcp6,reg4=0xc000000/0xe000000,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=learn(table=SessionAffinity,idle_timeout=160,fin_idle_timeout=5,priority=210,delete_learned,cookie=0x1030000000064,eth_type=0x86dd,nw_proto=0x6,OXM_OF_TCP_SRC[],OXM_OF_TCP_DST[],NXM_NX_IPV6_SRC[],NXM_NX_IPV6_DST[],load:NXM_NX_REG4[0..15]->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG4[25],load:NXM_NX_XXREG3[]->NXM_NX_XXREG3[]),set_field:0x2000000/0x2000000->reg4,goto_table:EndpointDNAT", }, }, + { + name: "Service LoadBalancer,LoadBalancerSourceRanges,SessionAffinity,Short-circuiting", + protocol: binding.ProtocolSCTP, + svcIP: svcIPv4, + 
affinityTimeout: uint16(100), + isExternal: true, + trafficPolicyLocal: true, + loadBalancerSourceRanges: []string{"192.168.1.0/24", "192.168.2.0/24"}, + expectedFlows: []string{ + "cookie=0x1030000000000, table=LoadBalancerSourceRanges, priority=200,sctp,nw_src=192.168.1.0/24,nw_dst=10.96.0.100,tp_dst=80 actions=goto_table:SessionAffinity", + "cookie=0x1030000000000, table=LoadBalancerSourceRanges, priority=200,sctp,nw_src=192.168.2.0/24,nw_dst=10.96.0.100,tp_dst=80 actions=goto_table:SessionAffinity", + "cookie=0x1030000000000, table=LoadBalancerSourceRanges, priority=190,sctp,nw_dst=10.96.0.100,tp_dst=80 actions=drop", + "cookie=0x1030000000000, table=ServiceLB, priority=210,sctp,reg4=0x10010000/0x10070000,nw_dst=10.96.0.100,tp_dst=80 actions=set_field:0x200/0x200->reg0,set_field:0x30000/0x70000->reg4,set_field:0x200000/0x200000->reg4,set_field:0x64->reg7,group:100", + "cookie=0x1030000000000, table=ServiceLB, priority=200,sctp,reg4=0x10000/0x70000,nw_dst=10.96.0.100,tp_dst=80 actions=set_field:0x200/0x200->reg0,set_field:0x30000/0x70000->reg4,set_field:0x200000/0x200000->reg4,set_field:0x65->reg7,group:101", + "cookie=0x1030000000065, table=ServiceLB, priority=190,sctp,reg4=0x30000/0x70000,nw_dst=10.96.0.100,tp_dst=80 actions=learn(table=SessionAffinity,hard_timeout=100,priority=200,delete_learned,cookie=0x1030000000065,eth_type=0x800,nw_proto=0x84,OXM_OF_SCTP_DST[],NXM_OF_IP_DST[],NXM_OF_IP_SRC[],load:NXM_NX_REG4[0..15]->NXM_NX_REG4[0..15],load:NXM_NX_REG4[26]->NXM_NX_REG4[26],load:NXM_NX_REG3[]->NXM_NX_REG3[],load:0x2->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[9],load:0x1->NXM_NX_REG4[21]),set_field:0x20000/0x70000->reg4,goto_table:EndpointDNAT", + }, + }, + { + name: "Service LoadBalancer,LoadBalancerSourceRanges,IPv6,SessionAffinity", + protocol: binding.ProtocolSCTPv6, + svcIP: svcIPv6, + affinityTimeout: uint16(100), + isExternal: true, + loadBalancerSourceRanges: []string{"fec0:192:168:1::/64", "fec0:192:168:2::/64"}, + expectedFlows: []string{ + 
"cookie=0x1030000000000, table=LoadBalancerSourceRanges, priority=200,sctp6,ipv6_src=fec0:192:168:1::/64,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=goto_table:SessionAffinity", + "cookie=0x1030000000000, table=LoadBalancerSourceRanges, priority=200,sctp6,ipv6_src=fec0:192:168:2::/64,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=goto_table:SessionAffinity", + "cookie=0x1030000000000, table=LoadBalancerSourceRanges, priority=190,sctp6,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=drop", + "cookie=0x1030000000000, table=ServiceLB, priority=200,sctp6,reg4=0x10000/0x70000,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=set_field:0x200/0x200->reg0,set_field:0x30000/0x70000->reg4,set_field:0x200000/0x200000->reg4,set_field:0x64->reg7,group:100", + "cookie=0x1030000000064, table=ServiceLB, priority=190,sctp6,reg4=0x30000/0x70000,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=learn(table=SessionAffinity,hard_timeout=100,priority=200,delete_learned,cookie=0x1030000000064,eth_type=0x86dd,nw_proto=0x84,OXM_OF_SCTP_DST[],NXM_NX_IPV6_DST[],NXM_NX_IPV6_SRC[],load:NXM_NX_REG4[0..15]->NXM_NX_REG4[0..15],load:NXM_NX_REG4[26]->NXM_NX_REG4[26],load:NXM_NX_XXREG3[]->NXM_NX_XXREG3[],load:0x2->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[9],load:0x1->NXM_NX_REG4[21]),set_field:0x20000/0x70000->reg4,goto_table:EndpointDNAT", + }, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -1471,17 +1507,18 @@ func Test_client_InstallServiceFlows(t *testing.T) { cacheKey := generateServicePortFlowCacheKey(tc.svcIP, port, tc.protocol) assert.NoError(t, fc.InstallServiceFlows(&types.ServiceConfig{ - ServiceIP: tc.svcIP, - ServicePort: port, - Protocol: tc.protocol, - TrafficPolicyLocal: tc.trafficPolicyLocal, - LocalGroupID: localGroupID, - ClusterGroupID: clusterGroupID, - AffinityTimeout: tc.affinityTimeout, - IsExternal: tc.isExternal, - IsNodePort: tc.isNodePort, - IsNested: tc.isNested, - IsDSR: tc.isDSR, + ServiceIP: tc.svcIP, + ServicePort: port, + Protocol: tc.protocol, + 
TrafficPolicyLocal: tc.trafficPolicyLocal, + LocalGroupID: localGroupID, + ClusterGroupID: clusterGroupID, + AffinityTimeout: tc.affinityTimeout, + IsExternal: tc.isExternal, + IsNodePort: tc.isNodePort, + IsNested: tc.isNested, + IsDSR: tc.isDSR, + LoadBalancerSourceRanges: tc.loadBalancerSourceRanges, })) fCacheI, ok := fc.featureService.cachedFlows.Load(cacheKey) require.True(t, ok) @@ -1527,11 +1564,11 @@ func Test_client_GetServiceFlowKeys(t *testing.T) { assert.NoError(t, fc.InstallEndpointFlows(bindingProtocol, endpoints)) flowKeys := fc.GetServiceFlowKeys(svcIP, svcPort, bindingProtocol, endpoints) expectedFlowKeys := []string{ - "table=11,priority=200,tcp,reg4=0x10000/0x70000,nw_dst=10.96.0.224,tp_dst=80", - "table=11,priority=190,tcp,reg4=0x30000/0x70000,nw_dst=10.96.0.224,tp_dst=80", - "table=12,priority=200,tcp,reg3=0xa0a000b,reg4=0x20050/0x7ffff", - "table=12,priority=200,tcp,reg3=0xa0a000c,reg4=0x20050/0x7ffff", - "table=20,priority=190,ct_state=+new+trk,ip,nw_src=10.10.0.12,nw_dst=10.10.0.12", + "table=12,priority=200,tcp,reg4=0x10000/0x70000,nw_dst=10.96.0.224,tp_dst=80", + "table=12,priority=190,tcp,reg4=0x30000/0x70000,nw_dst=10.96.0.224,tp_dst=80", + "table=13,priority=200,tcp,reg3=0xa0a000b,reg4=0x20050/0x7ffff", + "table=13,priority=200,tcp,reg3=0xa0a000c,reg4=0x20050/0x7ffff", + "table=21,priority=190,ct_state=+new+trk,ip,nw_src=10.10.0.12,nw_dst=10.10.0.12", } assert.ElementsMatch(t, expectedFlowKeys, flowKeys) } @@ -2031,7 +2068,7 @@ func Test_client_setBasePacketOutBuilder(t *testing.T) { } func prepareSetBasePacketOutBuilder(ctrl *gomock.Controller, success bool) *client { - ofClient := NewClient(bridgeName, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, false, false, false, false, false, nil, false, defaultPacketInRate) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, true, false, false, 
false, false, false, nil, false, defaultPacketInRate) m := ovsoftest.NewMockBridge(ctrl) ofClient.bridge = m bridge := binding.OFBridge{} @@ -2787,8 +2824,8 @@ func Test_client_ReplayFlows(t *testing.T) { "cookie=0x1020000000000, table=IngressMetric, priority=200,reg0=0x400/0x400,reg3=0xf actions=drop", ) replayedFlows = append(replayedFlows, - "cookie=0x1020000000000, table=IngressRule, priority=200,conj_id=15 actions=set_field:0xf->reg3,set_field:0x400/0x400->reg0,set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x1b/0xff->reg2,group:4", - "cookie=0x1020000000000, table=IngressDefaultRule, priority=200,reg1=0x64 actions=set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x400000/0x600000->reg0,set_field:0x1c/0xff->reg2,goto_table:Output", + "cookie=0x1020000000000, table=IngressRule, priority=200,conj_id=15 actions=set_field:0xf->reg3,set_field:0x400/0x400->reg0,set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x1c/0xff->reg2,group:4", + "cookie=0x1020000000000, table=IngressDefaultRule, priority=200,reg1=0x64 actions=set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x400000/0x600000->reg0,set_field:0x1d/0xff->reg2,goto_table:Output", ) // Feature Pod connectivity replays flows. 
diff --git a/pkg/agent/openflow/framework.go b/pkg/agent/openflow/framework.go index d8c353ee2a2..0f120161125 100644 --- a/pkg/agent/openflow/framework.go +++ b/pkg/agent/openflow/framework.go @@ -269,6 +269,9 @@ func (f *featureService) getRequiredTables() []*Table { if f.enableDSR { tables = append(tables, DSRServiceMarkTable) } + if f.proxyLoadBalancerIPs { + tables = append(tables, LoadBalancerSourceRangesTable) + } return tables } diff --git a/pkg/agent/openflow/framework_test.go b/pkg/agent/openflow/framework_test.go index 2c69c85ffa5..bc37b83852d 100644 --- a/pkg/agent/openflow/framework_test.go +++ b/pkg/agent/openflow/framework_test.go @@ -36,6 +36,7 @@ var ( defaultOptions = clientOptions{ enableProxy: true, enableAntreaPolicy: true, + proxyLoadBalancerIPs: true, proxyAll: false, connectUplinkToBridge: false, enableMulticast: false, @@ -78,9 +79,10 @@ func newTestFeatureService(options ...clientOptionsFn) *featureService { fn(&o) } return &featureService{ - enableAntreaPolicy: o.enableAntreaPolicy, - enableProxy: o.enableProxy, - proxyAll: o.proxyAll, + enableAntreaPolicy: o.enableAntreaPolicy, + enableProxy: o.enableProxy, + proxyAll: o.proxyAll, + proxyLoadBalancerIPs: o.proxyLoadBalancerIPs, } } @@ -129,6 +131,7 @@ func TestBuildPipeline(t *testing.T) { ConntrackTable, ConntrackStateTable, PreRoutingClassifierTable, + LoadBalancerSourceRangesTable, SessionAffinityTable, ServiceLBTable, EndpointDNATTable, @@ -260,6 +263,7 @@ func TestBuildPipeline(t *testing.T) { ConntrackTable, ConntrackStateTable, PreRoutingClassifierTable, + LoadBalancerSourceRangesTable, SessionAffinityTable, ServiceLBTable, EndpointDNATTable, @@ -304,6 +308,7 @@ func TestBuildPipeline(t *testing.T) { ConntrackTable, ConntrackStateTable, PreRoutingClassifierTable, + LoadBalancerSourceRangesTable, SessionAffinityTable, ServiceLBTable, EndpointDNATTable, @@ -347,6 +352,7 @@ func TestBuildPipeline(t *testing.T) { ConntrackTable, ConntrackStateTable, PreRoutingClassifierTable, + 
LoadBalancerSourceRangesTable, SessionAffinityTable, ServiceLBTable, EndpointDNATTable, @@ -427,6 +433,7 @@ func TestBuildPipeline(t *testing.T) { ConntrackStateTable, PreRoutingClassifierTable, NodePortMarkTable, + LoadBalancerSourceRangesTable, SessionAffinityTable, ServiceLBTable, EndpointDNATTable, @@ -474,6 +481,7 @@ func TestBuildPipeline(t *testing.T) { ConntrackTable, ConntrackStateTable, PreRoutingClassifierTable, + LoadBalancerSourceRangesTable, SessionAffinityTable, ServiceLBTable, EndpointDNATTable, diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 601e910f186..8d21aa2f1e9 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -138,12 +138,13 @@ var ( // Tables in stagePreRouting: // When proxy is enabled. - PreRoutingClassifierTable = newTable("PreRoutingClassifier", stagePreRouting, pipelineIP) - NodePortMarkTable = newTable("NodePortMark", stagePreRouting, pipelineIP) - SessionAffinityTable = newTable("SessionAffinity", stagePreRouting, pipelineIP) - ServiceLBTable = newTable("ServiceLB", stagePreRouting, pipelineIP) - DSRServiceMarkTable = newTable("DSRServiceMark", stagePreRouting, pipelineIP) - EndpointDNATTable = newTable("EndpointDNAT", stagePreRouting, pipelineIP) + PreRoutingClassifierTable = newTable("PreRoutingClassifier", stagePreRouting, pipelineIP) + NodePortMarkTable = newTable("NodePortMark", stagePreRouting, pipelineIP) + LoadBalancerSourceRangesTable = newTable("LoadBalancerSourceRanges", stagePreRouting, pipelineIP) + SessionAffinityTable = newTable("SessionAffinity", stagePreRouting, pipelineIP) + ServiceLBTable = newTable("ServiceLB", stagePreRouting, pipelineIP) + DSRServiceMarkTable = newTable("DSRServiceMark", stagePreRouting, pipelineIP) + EndpointDNATTable = newTable("EndpointDNAT", stagePreRouting, pipelineIP) // When proxy is disabled. 
DNATTable = newTable("DNAT", stagePreRouting, pipelineIP) @@ -402,6 +403,7 @@ type client struct { enableL7FlowExporter bool enableMulticluster bool enablePrometheusMetrics bool + proxyLoadBalancerIPs bool connectUplinkToBridge bool nodeType config.NodeType roundInfo types.RoundInfo @@ -2827,6 +2829,7 @@ func NewClient(bridgeName string, enableDenyTracking bool, proxyAll bool, enableDSR bool, + proxyLoadBalancerIPs bool, connectUplinkToBridge bool, enableMulticast bool, enableTrafficControl bool, @@ -2853,6 +2856,7 @@ func NewClient(bridgeName string, enableL7FlowExporter: enableL7FlowExporter, enableMulticluster: enableMulticluster, enablePrometheusMetrics: enablePrometheusMetrics, + proxyLoadBalancerIPs: proxyLoadBalancerIPs, connectUplinkToBridge: connectUplinkToBridge, pipelines: make(map[binding.PipelineID]binding.Pipeline), packetInHandlers: map[uint8]PacketInHandler{}, @@ -3011,7 +3015,7 @@ func (f *featureService) preRoutingClassifierFlows() []binding.Flow { cookieID := f.cookieAllocator.Request(f.category).Raw() var flows []binding.Flow - targetTables := []uint8{SessionAffinityTable.GetID(), ServiceLBTable.GetID()} + targetTables := []uint8{LoadBalancerSourceRangesTable.GetID(), SessionAffinityTable.GetID(), ServiceLBTable.GetID()} if f.proxyAll { targetTables = append([]uint8{NodePortMarkTable.GetID()}, targetTables...) } @@ -3106,6 +3110,34 @@ func (f *featureService) gatewaySNATFlows() []binding.Flow { return flows } +func (f *featureService) loadBalancerSourceRangesValidateFlows(config *types.ServiceConfig) []binding.Flow { + cookieID := f.cookieAllocator.Request(f.category).Raw() + protocol := config.Protocol + ingressIP := config.ServiceIP + port := config.ServicePort + var flows []binding.Flow + for _, srcRange := range config.LoadBalancerSourceRanges { + _, srcIPNet, _ := net.ParseCIDR(srcRange) + flows = append(flows, LoadBalancerSourceRangesTable.ofTable.BuildFlow(priorityNormal). + Cookie(cookieID). + MatchProtocol(protocol). 
+ MatchSrcIPNet(*srcIPNet). + MatchDstIP(ingressIP). + MatchDstPort(port, nil). + Action().NextTable(). + Done(), + ) + } + flows = append(flows, LoadBalancerSourceRangesTable.ofTable.BuildFlow(priorityLow). + Cookie(cookieID). + MatchProtocol(protocol). + MatchDstIP(ingressIP). + MatchDstPort(port, nil). + Action().Drop(). + Done()) + return flows +} + func getCachedFlowMessages(cache *flowCategoryCache) []*openflow15.FlowMod { var flows []*openflow15.FlowMod cache.Range(func(key, value interface{}) bool { diff --git a/pkg/agent/openflow/pipeline_test.go b/pkg/agent/openflow/pipeline_test.go index 451abe7ccc1..0c99cc04a59 100644 --- a/pkg/agent/openflow/pipeline_test.go +++ b/pkg/agent/openflow/pipeline_test.go @@ -67,7 +67,8 @@ func pipelineDefaultFlows(egressTrafficShapingEnabled, externalNodeEnabled, isEn "cookie=0x1000000000000, table=UnSNAT, priority=0 actions=goto_table:ConntrackZone", "cookie=0x1000000000000, table=ConntrackZone, priority=0 actions=goto_table:ConntrackState", "cookie=0x1000000000000, table=ConntrackState, priority=0 actions=goto_table:PreRoutingClassifier", - "cookie=0x1000000000000, table=PreRoutingClassifier, priority=0 actions=goto_table:SessionAffinity", + "cookie=0x1000000000000, table=PreRoutingClassifier, priority=0 actions=goto_table:LoadBalancerSourceRanges", + "cookie=0x1000000000000, table=LoadBalancerSourceRanges, priority=0 actions=goto_table:SessionAffinity", "cookie=0x1000000000000, table=SessionAffinity, priority=0 actions=goto_table:ServiceLB", "cookie=0x1000000000000, table=ServiceLB, priority=0 actions=goto_table:EndpointDNAT", "cookie=0x1000000000000, table=EndpointDNAT, priority=0 actions=goto_table:AntreaPolicyEgressRule", @@ -136,7 +137,8 @@ func pipelineDefaultFlows(egressTrafficShapingEnabled, externalNodeEnabled, isEn "cookie=0x1000000000000, table=UnSNAT, priority=0 actions=goto_table:ConntrackZone", "cookie=0x1000000000000, table=ConntrackZone, priority=0 actions=goto_table:ConntrackState", 
"cookie=0x1000000000000, table=ConntrackState, priority=0 actions=goto_table:PreRoutingClassifier", - "cookie=0x1000000000000, table=PreRoutingClassifier, priority=0 actions=goto_table:SessionAffinity", + "cookie=0x1000000000000, table=PreRoutingClassifier, priority=0 actions=goto_table:LoadBalancerSourceRanges", + "cookie=0x1000000000000, table=LoadBalancerSourceRanges, priority=0 actions=goto_table:SessionAffinity", "cookie=0x1000000000000, table=SessionAffinity, priority=0 actions=goto_table:ServiceLB", "cookie=0x1000000000000, table=ServiceLB, priority=0 actions=goto_table:EndpointDNAT", "cookie=0x1000000000000, table=EndpointDNAT, priority=0 actions=goto_table:AntreaPolicyEgressRule", diff --git a/pkg/agent/openflow/service.go b/pkg/agent/openflow/service.go index 2d8a4c0453b..54ba6ab1c05 100644 --- a/pkg/agent/openflow/service.go +++ b/pkg/agent/openflow/service.go @@ -51,6 +51,7 @@ type featureService struct { proxyAll bool enableDSR bool connectUplinkToBridge bool + proxyLoadBalancerIPs bool ctZoneSrcField *binding.RegField category cookie.Category @@ -72,6 +73,7 @@ func newFeatureService( enableProxy, proxyAll, enableDSR, + proxyLoadBalancerIPs, connectUplinkToBridge bool) *featureService { gatewayIPs := make(map[binding.Protocol]net.IP) virtualIPs := make(map[binding.Protocol]net.IP) @@ -125,6 +127,7 @@ func newFeatureService( enableProxy: enableProxy, proxyAll: proxyAll, enableDSR: enableDSR, + proxyLoadBalancerIPs: proxyLoadBalancerIPs, connectUplinkToBridge: connectUplinkToBridge, ctZoneSrcField: getZoneSrcField(connectUplinkToBridge), category: cookie.Service, diff --git a/pkg/agent/openflow/service_test.go b/pkg/agent/openflow/service_test.go index f8eb226db39..92962917dbf 100644 --- a/pkg/agent/openflow/service_test.go +++ b/pkg/agent/openflow/service_test.go @@ -49,13 +49,13 @@ func serviceInitFlows(proxyEnabled, isIPv4, proxyAllEnabled, dsrEnabled bool) [] } if proxyAllEnabled { flows = append(flows, - "cookie=0x1030000000000, 
table=PreRoutingClassifier, priority=200,ip actions=resubmit:NodePortMark,resubmit:SessionAffinity,resubmit:ServiceLB", + "cookie=0x1030000000000, table=PreRoutingClassifier, priority=200,ip actions=resubmit:NodePortMark,resubmit:LoadBalancerSourceRanges,resubmit:SessionAffinity,resubmit:ServiceLB", "cookie=0x1030000000000, table=NodePortMark, priority=200,ip,nw_dst=192.168.77.100 actions=set_field:0x80000/0x80000->reg4", "cookie=0x1030000000000, table=NodePortMark, priority=200,ip,nw_dst=169.254.0.252 actions=set_field:0x80000/0x80000->reg4", ) } else { flows = append(flows, - "cookie=0x1030000000000, table=PreRoutingClassifier, priority=200,ip actions=resubmit:SessionAffinity,resubmit:ServiceLB", + "cookie=0x1030000000000, table=PreRoutingClassifier, priority=200,ip actions=resubmit:LoadBalancerSourceRanges,resubmit:SessionAffinity,resubmit:ServiceLB", ) } if dsrEnabled { @@ -82,13 +82,13 @@ func serviceInitFlows(proxyEnabled, isIPv4, proxyAllEnabled, dsrEnabled bool) [] } if proxyAllEnabled { flows = append(flows, - "cookie=0x1030000000000, table=PreRoutingClassifier, priority=200,ipv6 actions=resubmit:NodePortMark,resubmit:SessionAffinity,resubmit:ServiceLB", + "cookie=0x1030000000000, table=PreRoutingClassifier, priority=200,ipv6 actions=resubmit:NodePortMark,resubmit:LoadBalancerSourceRanges,resubmit:SessionAffinity,resubmit:ServiceLB", "cookie=0x1030000000000, table=NodePortMark, priority=200,ipv6,ipv6_dst=fec0:192:168:77::100 actions=set_field:0x80000/0x80000->reg4", "cookie=0x1030000000000, table=NodePortMark, priority=200,ipv6,ipv6_dst=fc01::aabb:ccdd:eefe actions=set_field:0x80000/0x80000->reg4", ) } else { flows = append(flows, - "cookie=0x1030000000000, table=PreRoutingClassifier, priority=200,ipv6 actions=resubmit:SessionAffinity,resubmit:ServiceLB", + "cookie=0x1030000000000, table=PreRoutingClassifier, priority=200,ipv6 actions=resubmit:LoadBalancerSourceRanges,resubmit:SessionAffinity,resubmit:ServiceLB", ) } if dsrEnabled { diff --git 
a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index c2046d8a26e..d8beaec5231 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -501,7 +501,8 @@ func serviceIdentityChanged(svcInfo, pSvcInfo *types.ServiceInfo) bool { func serviceExternalAddressesChanged(svcInfo, pSvcInfo *types.ServiceInfo) bool { return svcInfo.NodePort() != pSvcInfo.NodePort() || !slices.Equal(svcInfo.LoadBalancerIPStrings(), pSvcInfo.LoadBalancerIPStrings()) || - !slices.Equal(svcInfo.ExternalIPStrings(), pSvcInfo.ExternalIPStrings()) + !slices.Equal(svcInfo.ExternalIPStrings(), pSvcInfo.ExternalIPStrings()) || + !slices.Equal(svcInfo.LoadBalancerSourceRanges(), pSvcInfo.LoadBalancerSourceRanges()) } // smallSliceDifference builds a slice which includes all the strings from s1 @@ -636,6 +637,7 @@ func (p *proxier) uninstallExternalIPService(svcInfoStr string, externalIPString func (p *proxier) installLoadBalancerService(svcInfoStr string, localGroupID, clusterGroupID binding.GroupIDType, + loadBalancerSourceRanges []string, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol, @@ -646,17 +648,18 @@ func (p *proxier) installLoadBalancerService(svcInfoStr string, if ingress != "" { ip := net.ParseIP(ingress) if err := p.ofClient.InstallServiceFlows(&agenttypes.ServiceConfig{ - ServiceIP: ip, - ServicePort: svcPort, - Protocol: protocol, - TrafficPolicyLocal: trafficPolicyLocal, - LocalGroupID: localGroupID, - ClusterGroupID: clusterGroupID, - AffinityTimeout: affinityTimeout, - IsExternal: true, - IsNodePort: false, - IsNested: false, // Unsupported for LoadBalancerIP - IsDSR: features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR, + ServiceIP: ip, + ServicePort: svcPort, + Protocol: protocol, + TrafficPolicyLocal: trafficPolicyLocal, + LocalGroupID: localGroupID, + ClusterGroupID: clusterGroupID, + AffinityTimeout: affinityTimeout, + IsExternal: true, + IsNodePort: false, + 
IsNested: false, // Unsupported for LoadBalancerIP + IsDSR: features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR, + LoadBalancerSourceRanges: loadBalancerSourceRanges, }); err != nil { return fmt.Errorf("failed to install LoadBalancerIP load balancing OVS flows: %w", err) } @@ -895,7 +898,7 @@ func (p *proxier) installServiceFlows(svcInfo *types.ServiceInfo, localGroupID, } // Install LoadBalancer flows and configurations. if p.proxyLoadBalancerIPs { - if err := p.installLoadBalancerService(svcInfoStr, localGroupID, clusterGroupID, svcInfo.LoadBalancerIPStrings(), svcPort, svcProto, svcInfo.ExternalPolicyLocal(), affinityTimeout, loadBalancerMode); err != nil { + if err := p.installLoadBalancerService(svcInfoStr, localGroupID, clusterGroupID, svcInfo.LoadBalancerSourceRanges(), svcInfo.LoadBalancerIPStrings(), svcPort, svcProto, svcInfo.ExternalPolicyLocal(), affinityTimeout, loadBalancerMode); err != nil { klog.ErrorS(err, "Error when installing LoadBalancer flows and configurations for Service", "ServiceInfo", svcInfoStr) return false } @@ -937,13 +940,19 @@ func (p *proxier) updateServiceExternalAddresses(pSvcInfo, svcInfo *types.Servic } } if p.proxyLoadBalancerIPs { - deletedLoadBalancerIPs := smallSliceDifference(pSvcInfo.LoadBalancerIPStrings(), svcInfo.LoadBalancerIPStrings()) - addedLoadBalancerIPs := smallSliceDifference(svcInfo.LoadBalancerIPStrings(), pSvcInfo.LoadBalancerIPStrings()) + var deletedLoadBalancerIPs, addedLoadBalancerIPs []string + if !slices.Equal(svcInfo.LoadBalancerSourceRanges(), pSvcInfo.LoadBalancerSourceRanges()) { + deletedLoadBalancerIPs = pSvcInfo.LoadBalancerIPStrings() + addedLoadBalancerIPs = svcInfo.LoadBalancerIPStrings() + } else { + deletedLoadBalancerIPs = smallSliceDifference(pSvcInfo.LoadBalancerIPStrings(), svcInfo.LoadBalancerIPStrings()) + addedLoadBalancerIPs = smallSliceDifference(svcInfo.LoadBalancerIPStrings(), pSvcInfo.LoadBalancerIPStrings()) + 
} if err := p.uninstallLoadBalancerService(pSvcInfoStr, deletedLoadBalancerIPs, pSvcPort, pSvcProto); err != nil { klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServiceInfo", pSvcInfoStr) return false } - if err := p.installLoadBalancerService(svcInfoStr, localGroupID, clusterGroupID, addedLoadBalancerIPs, svcPort, svcProto, svcInfo.ExternalPolicyLocal(), affinityTimeout, loadBalancerMode); err != nil { + if err := p.installLoadBalancerService(svcInfoStr, localGroupID, clusterGroupID, svcInfo.LoadBalancerSourceRanges(), addedLoadBalancerIPs, svcPort, svcProto, svcInfo.ExternalPolicyLocal(), affinityTimeout, loadBalancerMode); err != nil { klog.ErrorS(err, "Error when installing LoadBalancer flows and configurations for Service", "ServiceInfo", svcInfoStr) return false } diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 48235254447..81677a19e26 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -54,23 +54,25 @@ import ( ) var ( - svc1IPv4 = net.ParseIP("10.20.30.41") - svc2IPv4 = net.ParseIP("10.20.30.42") - svc1IPv6 = net.ParseIP("2001::10:20:30:41") - ep1IPv4 = net.ParseIP("10.180.0.1") - ep1IPv6 = net.ParseIP("2001::10:180:0:1") - ep2IPv4 = net.ParseIP("10.180.0.2") - ep2IPv6 = net.ParseIP("2001::10:180:0:2") - loadBalancerIPv4 = net.ParseIP("169.254.169.1") - loadBalancerIPv6 = net.ParseIP("fec0::169:254:169:1") - loadBalancerIPModeProxyIPv4 = net.ParseIP("169.254.169.2") - loadBalancerIPModeProxyIPv6 = net.ParseIP("fec0::169:254:169:2") - svcNodePortIPv4 = net.ParseIP("192.168.77.100") - svcNodePortIPv6 = net.ParseIP("2001::192:168:77:100") - externalIPv4 = net.ParseIP("192.168.77.101") - externalIPv6 = net.ParseIP("2001::192:168:77:101") - nodePortAddressesIPv4 = []net.IP{svcNodePortIPv4} - nodePortAddressesIPv6 = []net.IP{svcNodePortIPv6} + svc1IPv4 = net.ParseIP("10.20.30.41") + svc2IPv4 = net.ParseIP("10.20.30.42") + svc1IPv6 = 
net.ParseIP("2001::10:20:30:41") + ep1IPv4 = net.ParseIP("10.180.0.1") + ep1IPv6 = net.ParseIP("2001::10:180:0:1") + ep2IPv4 = net.ParseIP("10.180.0.2") + ep2IPv6 = net.ParseIP("2001::10:180:0:2") + loadBalancerIPv4 = net.ParseIP("169.254.169.1") + loadBalancerIPv6 = net.ParseIP("fec0::169:254:169:1") + loadBalancerIPModeProxyIPv4 = net.ParseIP("169.254.169.2") + loadBalancerIPModeProxyIPv6 = net.ParseIP("fec0::169:254:169:2") + loadBalancerSourceRangesIPv4 = []string{"192.168.1.0/24", "192.168.2.0/24"} + loadBalancerSourceRangesIPv6 = []string{"fec0:192:168:1::/64", "fec0:192:168:2::/64"} + svcNodePortIPv4 = net.ParseIP("192.168.77.100") + svcNodePortIPv6 = net.ParseIP("2001::192:168:77:100") + externalIPv4 = net.ParseIP("192.168.77.101") + externalIPv6 = net.ParseIP("2001::192:168:77:101") + nodePortAddressesIPv4 = []net.IP{svcNodePortIPv4} + nodePortAddressesIPv6 = []net.IP{svcNodePortIPv6} svcPort = 80 svcNodePort = 30008 @@ -105,6 +107,13 @@ func loadBalancerIPModeProxyIP(isIPv6 bool) net.IP { return loadBalancerIPModeProxyIPv4 } +func loadBalancerSourceRanges(isIPv6 bool) []string { + if isIPv6 { + return loadBalancerSourceRangesIPv6 + } + return loadBalancerSourceRangesIPv4 +} + func protocolTCP(isIPv6 bool) binding.Protocol { if isIPv6 { return binding.ProtocolTCPv6 @@ -304,6 +313,7 @@ func makeTestNodePortService(svcPortName *k8sproxy.ServicePortName, func makeTestLoadBalancerService(svcPortName *k8sproxy.ServicePortName, clusterIP net.IP, + loadBalancerSourceRanges []string, externalIPs, loadBalancerIPs []net.IP, loadBalancerIPModeProxyIPs []net.IP, @@ -316,6 +326,7 @@ func makeTestLoadBalancerService(svcPortName *k8sproxy.ServicePortName, return makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { svc.Spec.ClusterIP = clusterIP.String() svc.Spec.Type = corev1.ServiceTypeLoadBalancer + svc.Spec.LoadBalancerSourceRanges = loadBalancerSourceRanges var ingress []corev1.LoadBalancerIngress for _, ip := range loadBalancerIPs { 
if ip != nil { @@ -650,6 +661,7 @@ func testLoadBalancerAdd(t *testing.T, ep2IP := ep2IP(isIPv6) loadBalancerIP := loadBalancerIP(isIPv6) loadBalancerIPModeProxyIP := loadBalancerIPModeProxyIP(isIPv6) + loadBalancerSourceRanges := loadBalancerSourceRanges(isIPv6) fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, options...) externalTrafficPolicy := corev1.ServiceExternalTrafficPolicyTypeCluster @@ -662,6 +674,7 @@ func testLoadBalancerAdd(t *testing.T, } svc := makeTestLoadBalancerService(&svcPortName, svcIP, + loadBalancerSourceRanges, []net.IP{externalIP}, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, @@ -724,14 +737,15 @@ func testLoadBalancerAdd(t *testing.T, }) if proxyLoadBalancerIPs { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: loadBalancerIP, - ServicePort: uint16(svcPort), - Protocol: protocol, - TrafficPolicyLocal: nodeLocalExternal, - LocalGroupID: 1, - ClusterGroupID: 2, - IsExternal: true, - IsDSR: isDSR, + ServiceIP: loadBalancerIP, + ServicePort: uint16(svcPort), + Protocol: protocol, + TrafficPolicyLocal: nodeLocalExternal, + LocalGroupID: 1, + ClusterGroupID: 2, + IsExternal: true, + IsDSR: isDSR, + LoadBalancerSourceRanges: loadBalancerSourceRanges, }) } if externalIP != nil { @@ -783,14 +797,15 @@ func testLoadBalancerAdd(t *testing.T, }) if proxyLoadBalancerIPs { mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: loadBalancerIP, - ServicePort: uint16(svcPort), - Protocol: protocol, - TrafficPolicyLocal: nodeLocalExternal, - LocalGroupID: localGroupID, - ClusterGroupID: clusterGroupID, - IsExternal: true, - IsDSR: isDSR, + ServiceIP: loadBalancerIP, + ServicePort: uint16(svcPort), + Protocol: protocol, + TrafficPolicyLocal: nodeLocalExternal, + LocalGroupID: localGroupID, + ClusterGroupID: clusterGroupID, + IsExternal: true, + IsDSR: isDSR, + LoadBalancerSourceRanges: loadBalancerSourceRanges, }) } if externalIP != 
nil { @@ -1741,6 +1756,7 @@ func testLoadBalancerRemove(t *testing.T, protocol binding.Protocol, isIPv6 bool epIP := ep1IP(isIPv6) loadBalancerIP := loadBalancerIP(isIPv6) loadBalancerIPModeProxyIP := loadBalancerIPModeProxyIP(isIPv6) + loadBalancerSourceRanges := loadBalancerSourceRanges(isIPv6) fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, options...) externalTrafficPolicy := corev1.ServiceExternalTrafficPolicyTypeLocal @@ -1748,6 +1764,7 @@ func testLoadBalancerRemove(t *testing.T, protocol binding.Protocol, isIPv6 bool svc := makeTestLoadBalancerService(&svcPortName, svcIP, + loadBalancerSourceRanges, []net.IP{externalIP}, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, @@ -1793,13 +1810,14 @@ func testLoadBalancerRemove(t *testing.T, protocol binding.Protocol, isIPv6 bool IsNodePort: true, }) mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ - ServiceIP: loadBalancerIP, - ServicePort: uint16(svcPort), - Protocol: protocol, - TrafficPolicyLocal: true, - LocalGroupID: 1, - ClusterGroupID: 2, - IsExternal: true, + ServiceIP: loadBalancerIP, + ServicePort: uint16(svcPort), + Protocol: protocol, + TrafficPolicyLocal: true, + LocalGroupID: 1, + ClusterGroupID: 2, + IsExternal: true, + LoadBalancerSourceRanges: loadBalancerSourceRanges, }) mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol) mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, loadBalancerIP) @@ -2167,6 +2185,7 @@ func testLoadBalancerNoEndpoint(t *testing.T, protocol binding.Protocol, isIPv6 svc := makeTestLoadBalancerService(&svcPortName, svcIP, nil, + nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), @@ -2178,6 +2197,7 @@ func testLoadBalancerNoEndpoint(t *testing.T, protocol binding.Protocol, isIPv6 updatedSvc := makeTestLoadBalancerService(&svcPortName, svcIP, nil, + nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, 
int32(svcPort+1), @@ -2367,6 +2387,7 @@ func testLoadBalancerRemoveEndpoints(t *testing.T, protocol binding.Protocol, is svc := makeTestLoadBalancerService(&svcPortName, svcIP, + nil, []net.IP{externalIP}, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, @@ -2594,8 +2615,8 @@ func testServicePortUpdate(t *testing.T, protocol binding.Protocol, isIPv6 bool, svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), apiProtocol, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort+1), int32(svcNodePort), apiProtocol, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) case corev1.ServiceTypeLoadBalancer: - svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, nil, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, nil, int32(svcPort+1), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, nil, []net.IP{loadBalancerIP}, nil, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, nil, []net.IP{loadBalancerIP}, nil, int32(svcPort+1), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) } makeServiceMap(fp, svc) svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, apiProtocol) @@ -2751,8 +2772,8 @@ func testServiceNodePortUpdate(t *testing.T, protocol binding.Protocol, isIPv6 b svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), apiProtocol, nil, corev1.ServiceInternalTrafficPolicyCluster, 
corev1.ServiceExternalTrafficPolicyTypeCluster) updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort+1), apiProtocol, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) case corev1.ServiceTypeLoadBalancer: - svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort+1), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort+1), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) } makeServiceMap(fp, svc) svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, apiProtocol) @@ -2878,7 +2899,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, protocol binding.Proto case corev1.ServiceTypeNodePort: svc = makeTestNodePortService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), int32(svcNodePort), apiProtocol, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) case corev1.ServiceTypeLoadBalancer: - svc = makeTestLoadBalancerService(&svcPortName, svcIP, []net.IP{externalIP}, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + svc = 
makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{externalIP}, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) } updatedSvc = svc.DeepCopy() updatedSvc.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyTypeLocal @@ -3145,6 +3166,140 @@ func TestServiceInternalTrafficPolicyUpdate(t *testing.T) { }) } +func testServiceLoadBalancerSourceRangesUpdate(t *testing.T, + loadBalancerIPs []net.IP, + updatedLoadBalancerIPs []net.IP, + loadBalancerSourceRanges []string, + updatedLoadBalancerSourceRanges []string, + protocol binding.Protocol, + isIPv6 bool) { + ctrl := gomock.NewController(t) + vIP := virtualNodePortDNATIP(isIPv6) + apiProtocol := getAPIProtocol(protocol) + nodePortAddresses := nodePortAddresses(isIPv6) + svcIP := svc1IP(isIPv6) + epIP := ep1IP(isIPv6) + svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, apiProtocol) + + mockOFClient, mockRouteClient := getMockClients(ctrl) + groupAllocator := openflow.NewGroupAllocator() + fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll) + + svc := makeTestLoadBalancerService(&svcPortName, svcIP, loadBalancerSourceRanges, nil, loadBalancerIPs, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc := makeTestLoadBalancerService(&svcPortName, svcIP, updatedLoadBalancerSourceRanges, nil, updatedLoadBalancerIPs, nil, int32(svcPort), int32(svcNodePort), corev1.ProtocolTCP, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + makeServiceMap(fp, svc) + + ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), corev1.ProtocolTCP, false) + eps := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*ep}, []discovery.EndpointPort{*epPort}, isIPv6) + makeEndpointSliceMap(fp, eps) + + 
expectedEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(epIP.String(), "", "", svcPort, false, true, true, false, nil)} + mockOFClient.EXPECT().InstallEndpointFlows(protocol, gomock.InAnyOrder(expectedEps)).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(binding.GroupIDType(1), false, gomock.InAnyOrder(expectedEps)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ + ServiceIP: svcIP, + ServicePort: uint16(svcPort), + Protocol: protocol, + ClusterGroupID: 1, + }).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ + ServiceIP: vIP, + ServicePort: uint16(svcNodePort), + Protocol: protocol, + ClusterGroupID: 1, + IsExternal: true, + IsNodePort: true, + }).Times(1) + for _, ip := range loadBalancerIPs { + mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ + ServiceIP: ip, + ServicePort: uint16(svcPort), + Protocol: protocol, + ClusterGroupID: 1, + IsExternal: true, + LoadBalancerSourceRanges: loadBalancerSourceRanges, + }).Times(1) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, ip).Times(1) + } + mockRouteClient.EXPECT().AddNodePortConfigs(nodePortAddresses, uint16(svcNodePort), protocol).Times(1) + + for _, ip := range loadBalancerIPs { + mockOFClient.EXPECT().UninstallServiceFlows(ip, uint16(svcPort), protocol).Times(1) + mockRouteClient.EXPECT().DeleteExternalIPConfigs(svcInfoStr, ip).Times(1) + } + + for _, ip := range updatedLoadBalancerIPs { + mockOFClient.EXPECT().InstallServiceFlows(&antreatypes.ServiceConfig{ + ServiceIP: ip, + ServicePort: uint16(svcPort), + Protocol: protocol, + ClusterGroupID: 1, + IsExternal: true, + LoadBalancerSourceRanges: updatedLoadBalancerSourceRanges, + }).Times(1) + mockRouteClient.EXPECT().AddExternalIPConfigs(svcInfoStr, ip).Times(1) + } + + fp.syncProxyRules() + assert.Contains(t, fp.serviceInstalledMap, svcPortName) + assert.Contains(t, fp.endpointsInstalledMap, svcPortName) + fp.serviceChanges.OnServiceUpdate(svc, 
updatedSvc) + fp.syncProxyRules() + assert.Contains(t, fp.serviceInstalledMap, svcPortName) + assert.Contains(t, fp.endpointsInstalledMap, svcPortName) +} + +func TestServiceLoadBalancerSourceRangesUpdate(t *testing.T) { + t.Run("IPv4", func(t *testing.T) { + loadBalancerIPs := []net.IP{net.ParseIP("169.254.1.1"), net.ParseIP("169.254.1.2")} + updatedLoadBalancerIPs := []net.IP{net.ParseIP("169.254.1.2"), net.ParseIP("169.254.1.3")} + loadBalancerSourceRanges := []string{"192.168.1.0/24", "192.168.2.0/24"} + updatedLoadBalancerSourceRanges := []string{"192.168.3.0/24", "192.168.4.0/24"} + t.Run("LoadBalancer ingress IPs update", func(t *testing.T) { + testServiceLoadBalancerSourceRangesUpdate(t, + loadBalancerIPs, + updatedLoadBalancerIPs, + loadBalancerSourceRanges, + updatedLoadBalancerSourceRanges, + binding.ProtocolTCP, + false) + }) + t.Run("LoadBalancer ingress IPs remain", func(t *testing.T) { + testServiceLoadBalancerSourceRangesUpdate(t, + loadBalancerIPs, + loadBalancerIPs, + loadBalancerSourceRanges, + updatedLoadBalancerSourceRanges, + binding.ProtocolTCP, + false) + }) + }) + t.Run("IPv6", func(t *testing.T) { + loadBalancerIPs := []net.IP{net.ParseIP("fec0::169:254:1:1"), net.ParseIP("fec0::169:254:1:2")} + updatedLoadBalancerIPs := []net.IP{net.ParseIP("fec0::169:254:1:2"), net.ParseIP("fec0::169:254:1:3")} + loadBalancerSourceRanges := []string{"fec0:192:168:1::/64", "fec0:192:168:2::/64"} + updatedLoadBalancerSourceRanges := []string{"fec0:192:168:3::/64", "fec0:192:168:4::/64"} + t.Run("LoadBalancer ingress IPs update", func(t *testing.T) { + testServiceLoadBalancerSourceRangesUpdate(t, + loadBalancerIPs, + updatedLoadBalancerIPs, + loadBalancerSourceRanges, + updatedLoadBalancerSourceRanges, + binding.ProtocolTCPv6, + true) + }) + t.Run("LoadBalancer ingress IPs remain", func(t *testing.T) { + testServiceLoadBalancerSourceRangesUpdate(t, + loadBalancerIPs, + loadBalancerIPs, + loadBalancerSourceRanges, + updatedLoadBalancerSourceRanges, + 
binding.ProtocolTCPv6, + true) + }) + }) +} + func testServiceExternalIPsUpdate(t *testing.T, protocol binding.Protocol, isIPv6 bool) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) @@ -3183,8 +3338,8 @@ func testServiceExternalIPsUpdate(t *testing.T, protocol binding.Protocol, isIPv updatedExternalIPStrings = append(updatedExternalIPStrings, ip.String()) } - svc := makeTestLoadBalancerService(&svcPortName, svcIP, externalIPs, loadBalancerIPs, nil, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc := makeTestLoadBalancerService(&svcPortName, svcIP, updatedExternalIPs, updatedLoadBalancerIPs, nil, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + svc := makeTestLoadBalancerService(&svcPortName, svcIP, nil, externalIPs, loadBalancerIPs, nil, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc := makeTestLoadBalancerService(&svcPortName, svcIP, nil, updatedExternalIPs, updatedLoadBalancerIPs, nil, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) makeServiceMap(fp, svc) svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, apiProtocol) @@ -3329,8 +3484,8 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, protocol binding.Protoco svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), apiProtocol, &affinitySeconds, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), apiProtocol, &updatedAffinitySeconds, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) case corev1.ServiceTypeLoadBalancer: - svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, 
[]net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, &affinitySeconds, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, &updatedAffinitySeconds, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, &affinitySeconds, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, &updatedAffinitySeconds, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) } makeServiceMap(fp, svc) svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, apiProtocol) @@ -3483,8 +3638,8 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, protocol binding.Protoco svc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), apiProtocol, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) updatedSvc = makeTestNodePortService(&svcPortName, svcIP, nil, int32(svcPort), int32(svcNodePort), apiProtocol, &affinitySeconds, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeCluster) case corev1.ServiceTypeLoadBalancer: - svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) - updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), 
int32(svcNodePort), apiProtocol, &affinitySeconds, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + svc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, nil, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) + updatedSvc = makeTestLoadBalancerService(&svcPortName, svcIP, nil, nil, []net.IP{loadBalancerIP}, []net.IP{loadBalancerIPModeProxyIP}, int32(svcPort), int32(svcNodePort), apiProtocol, &affinitySeconds, nil, corev1.ServiceExternalTrafficPolicyTypeCluster) } makeServiceMap(fp, svc) svcInfoStr := fmt.Sprintf("%s:%d/%s", svcIP, svcPort, apiProtocol) diff --git a/pkg/agent/types/service.go b/pkg/agent/types/service.go index 3f76631475c..5c1433d80f3 100644 --- a/pkg/agent/types/service.go +++ b/pkg/agent/types/service.go @@ -36,7 +36,8 @@ type ServiceConfig struct { // IsNested indicates the whether Service's Endpoints are ClusterIPs of other Services. It's used in multi-cluster. IsNested bool // IsDSR indicates that whether the Service works in Direct Server Return mode. 
- IsDSR bool + IsDSR bool + LoadBalancerSourceRanges []string } func (c *ServiceConfig) TrafficPolicyGroupID() openflow.GroupIDType { diff --git a/test/e2e/proxy_test.go b/test/e2e/proxy_test.go index f5119a4e27c..57af28e0717 100644 --- a/test/e2e/proxy_test.go +++ b/test/e2e/proxy_test.go @@ -17,6 +17,7 @@ package e2e import ( "context" "encoding/hex" + "encoding/json" "fmt" "net" "strings" @@ -28,6 +29,8 @@ import ( "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "antrea.io/antrea/pkg/agent/config" @@ -218,6 +221,85 @@ func testProxyLoadBalancerService(t *testing.T, isIPv6 bool) { }) } +func testProxyLoadBalancerServiceSourceRanges(t *testing.T, isIPv6 bool) { + skipIfHasWindowsNodes(t) + skipIfNumNodesLessThan(t, 2) + + data, err := setupTest(t) + if err != nil { + t.Fatalf("Error when setting up test: %v", err) + } + defer teardownTest(t, data) + skipIfProxyDisabled(t, data) + + node := nodeName(0) + testNamespace := data.testNamespace + + // Create test client Pods. + allowedClient, allowedClientIPs, _ := createAndWaitForPod(t, data, data.createToolboxPodOnNode, "allowed-client", node, testNamespace, false) + notAllowedClient, _, _ := createAndWaitForPod(t, data, data.createToolboxPodOnNode, "not-allowed-client", node, testNamespace, false) + allowedPodIP := allowedClientIPs.IPv4.String() + "/32" + + ingressIP := "169.254.169.1" + ipFamily := corev1.IPv4Protocol + if isIPv6 { + allowedPodIP = allowedClientIPs.IPv6.String() + "/128" + ingressIP = "fd75::aabb:ccdd:ef00" + ipFamily = corev1.IPv6Protocol + } + + // Create test LoadBalancer Service. 
+ serviceName := "agnhost-lb" + service := corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: serviceName, + Namespace: testNamespace, + Labels: map[string]string{ + "antrea-e2e": serviceName, + "app": serviceName, + }, + }, + Spec: corev1.ServiceSpec{ + SessionAffinity: corev1.ServiceAffinityNone, + Ports: []corev1.ServicePort{{ + Port: 8080, + TargetPort: intstr.FromInt(8080), + Protocol: corev1.ProtocolTCP, + }}, + Type: corev1.ServiceTypeLoadBalancer, + Selector: map[string]string{"app": "agnhost"}, + IPFamilies: []corev1.IPFamily{ipFamily}, + LoadBalancerSourceRanges: []string{allowedPodIP}, + }, + } + svc, err := data.clientset.CoreV1().Services(testNamespace).Create(context.TODO(), &service, metav1.CreateOptions{}) + require.NoError(t, err) + updatedSvc := svc.DeepCopy() + updatedSvc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{{IP: ingressIP}} + patchData, err := json.Marshal(updatedSvc) + require.NoError(t, err) + _, err = data.clientset.CoreV1().Services(testNamespace).Patch(context.TODO(), svc.Name, k8stypes.MergePatchType, patchData, metav1.PatchOptions{}, "status") + require.NoError(t, err) + + // Create backend Pod. + createAgnhostPod(t, data, "agnhost-pod", node, false) + + // Connect the LoadBalancer ingress IP from test client Pods. 
+ url := "http://" + net.JoinHostPort(ingressIP, "8080") + require.NoError(t, probeFromPod(data, allowedClient, toolboxContainerName, url), "Service LoadBalancer should be able to be connected from the Pod") + require.Error(t, probeFromPod(data, notAllowedClient, toolboxContainerName, url), "Service LoadBalancer should not be able to be connected from the Pod") +} + +func TestProxyLoadBalancerServiceSourceRangesIPv4(t *testing.T) { + skipIfNotIPv4Cluster(t) + testProxyLoadBalancerServiceSourceRanges(t, false) +} + +func TestProxyLoadBalancerServiceSourceRangesIPv6(t *testing.T) { + skipIfNotIPv6Cluster(t) + testProxyLoadBalancerServiceSourceRanges(t, true) +} + func loadBalancerTestCases(t *testing.T, data *TestData, clusterUrl, localUrl, healthExpected string, nodes, healthUrls, pods []string) { t.Run("ExternalTrafficPolicy:Cluster/Client:Node", func(t *testing.T) { testLoadBalancerClusterFromNode(t, data, nodes, clusterUrl) diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index a6d7fe72582..7174a3ca17c 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -120,7 +120,7 @@ func TestConnectivityFlows(t *testing.T) { antrearuntime.WindowsOS = runtime.GOOS } - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, true, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) defer func() { @@ -176,7 +176,7 @@ func TestAntreaFlexibleIPAMConnectivityFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, 
bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, true, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, true, true, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) defer func() { @@ -239,7 +239,7 @@ func TestReplayFlowsConnectivityFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, true, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -281,7 +281,7 @@ func TestReplayFlowsNetworkPolicyFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, true, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -466,7 +466,7 @@ func TestNetworkPolicyFlows(t *testing.T) { legacyregistry.Reset() 
metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, true, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -580,7 +580,7 @@ func TestIPv6ConnectivityFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, true, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -621,7 +621,7 @@ func TestProxyServiceFlowsAntreaPolicyDisabled(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, true, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -711,7 +711,7 @@ func 
TestProxyServiceFlowsAntreaPoilcyEnabled(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, true, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -1793,7 +1793,7 @@ func testEgressMarkFlows(t *testing.T, trafficShaping bool) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), false, false, false, true, trafficShaping, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), false, false, false, true, trafficShaping, false, false, false, true, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -1850,7 +1850,7 @@ func TestTrafficControlFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), false, false, false, false, false, false, false, false, false, false, true, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), false, false, false, false, false, false, false, false, true, false, false, true, false, false, groupIDAllocator, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, 
fmt.Sprintf("Failed to prepare OVS bridge %s", br))