Skip to content

Commit

Permalink
Support Service loadBalancerSourceRanges in AntreaProxy
Browse files Browse the repository at this point in the history
For #5493

Signed-off-by: Hongliang Liu <[email protected]>
  • Loading branch information
hongliangl committed Apr 29, 2024
1 parent 89363ac commit 779e504
Show file tree
Hide file tree
Showing 13 changed files with 473 additions and 202 deletions.
220 changes: 140 additions & 80 deletions docs/design/ovs-pipeline.md

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,9 @@ func (c *client) InstallServiceFlows(config *types.ServiceConfig) error {
if config.IsDSR {
flows = append(flows, c.featureService.dsrServiceMarkFlow(config))
}
if len(config.LoadBalancerSourceRanges) != 0 {
flows = append(flows, c.featureService.loadBalancerSourceRangesMarkFlows(config)...)
}
cacheKey := generateServicePortFlowCacheKey(config.ServiceIP, config.ServicePort, config.Protocol)
return c.addFlows(c.featureService.cachedFlows, cacheKey, flows)
}
Expand Down
96 changes: 65 additions & 31 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,8 +1017,8 @@ func Test_client_GetPodFlowKeys(t *testing.T) {
"table=1,priority=200,arp,in_port=11,arp_spa=10.10.0.11,arp_sha=00:00:10:10:00:11",
"table=3,priority=190,in_port=11",
"table=4,priority=200,ip,in_port=11,dl_src=00:00:10:10:00:11,nw_src=10.10.0.11",
"table=17,priority=200,ip,reg0=0x200/0x200,nw_dst=10.10.0.11",
"table=22,priority=200,dl_dst=00:00:10:10:00:11",
"table=18,priority=200,ip,reg0=0x200/0x200,nw_dst=10.10.0.11",
"table=23,priority=200,dl_dst=00:00:10:10:00:11",
}
assert.ElementsMatch(t, expectedFlowKeys, flowKeys)
}
Expand Down Expand Up @@ -1254,17 +1254,18 @@ func Test_client_InstallServiceFlows(t *testing.T) {
port := uint16(80)

testCases := []struct {
name string
trafficPolicyLocal bool
protocol binding.Protocol
svcIP net.IP
affinityTimeout uint16
isExternal bool
isNodePort bool
isNested bool
isDSR bool
enableMulticluster bool
expectedFlows []string
name string
trafficPolicyLocal bool
protocol binding.Protocol
svcIP net.IP
affinityTimeout uint16
isExternal bool
isNodePort bool
isNested bool
isDSR bool
enableMulticluster bool
loadBalancerSourceRanges []string
expectedFlows []string
}{
{
name: "Service ClusterIP",
Expand Down Expand Up @@ -1449,6 +1450,38 @@ func Test_client_InstallServiceFlows(t *testing.T) {
"cookie=0x1030000000064, table=DSRServiceMark, priority=200,tcp6,reg4=0xc000000/0xe000000,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=learn(table=SessionAffinity,idle_timeout=160,fin_idle_timeout=5,priority=210,delete_learned,cookie=0x1030000000064,eth_type=0x86dd,nw_proto=0x6,OXM_OF_TCP_SRC[],OXM_OF_TCP_DST[],NXM_NX_IPV6_SRC[],NXM_NX_IPV6_DST[],load:NXM_NX_REG4[0..15]->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG4[25],load:NXM_NX_XXREG3[]->NXM_NX_XXREG3[]),set_field:0x2000000/0x2000000->reg4,goto_table:EndpointDNAT",
},
},
{
name: "Service LoadBalancer,LoadBalancerSourceRanges,SessionAffinity,Short-circuiting",
protocol: binding.ProtocolSCTP,
svcIP: svcIPv4,
affinityTimeout: uint16(100),
isExternal: true,
trafficPolicyLocal: true,
loadBalancerSourceRanges: []string{"192.168.1.0/24", "192.168.2.0/24"},
expectedFlows: []string{
"cookie=0x1030000000000, table=ServiceMark, priority=200,sctp,nw_src=192.168.1.0/24,nw_dst=10.96.0.100,tp_dst=80 actions=set_field:0x20000000/0x20000000->reg4",
"cookie=0x1030000000000, table=ServiceMark, priority=200,sctp,nw_src=192.168.2.0/24,nw_dst=10.96.0.100,tp_dst=80 actions=set_field:0x20000000/0x20000000->reg4",
"cookie=0x1030000000000, table=ServiceMark, priority=190,sctp,nw_dst=10.96.0.100,tp_dst=80 actions=set_field:0x4000/0x4000->reg0",
"cookie=0x1030000000000, table=ServiceLB, priority=210,sctp,reg4=0x10010000/0x10070000,nw_dst=10.96.0.100,tp_dst=80 actions=set_field:0x200/0x200->reg0,set_field:0x30000/0x70000->reg4,set_field:0x200000/0x200000->reg4,set_field:0x64->reg7,group:100",
"cookie=0x1030000000000, table=ServiceLB, priority=200,sctp,reg4=0x20010000/0x20070000,nw_dst=10.96.0.100,tp_dst=80 actions=set_field:0x200/0x200->reg0,set_field:0x30000/0x70000->reg4,set_field:0x200000/0x200000->reg4,set_field:0x65->reg7,group:101",
"cookie=0x1030000000065, table=ServiceLB, priority=190,sctp,reg4=0x30000/0x70000,nw_dst=10.96.0.100,tp_dst=80 actions=learn(table=SessionAffinity,hard_timeout=100,priority=200,delete_learned,cookie=0x1030000000065,eth_type=0x800,nw_proto=0x84,OXM_OF_SCTP_DST[],NXM_OF_IP_DST[],NXM_OF_IP_SRC[],load:NXM_NX_REG4[0..15]->NXM_NX_REG4[0..15],load:NXM_NX_REG4[26]->NXM_NX_REG4[26],load:NXM_NX_REG3[]->NXM_NX_REG3[],load:0x2->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[9],load:0x1->NXM_NX_REG4[21]),set_field:0x20000/0x70000->reg4,goto_table:EndpointDNAT",
},
},
{
name: "Service LoadBalancer,LoadBalancerSourceRanges,IPv6,SessionAffinity",
protocol: binding.ProtocolSCTPv6,
svcIP: svcIPv6,
affinityTimeout: uint16(100),
isExternal: true,
loadBalancerSourceRanges: []string{"fec0:192:168:1::/64", "fec0:192:168:2::/64"},
expectedFlows: []string{
"cookie=0x1030000000000, table=ServiceMark, priority=200,sctp6,ipv6_src=fec0:192:168:1::/64,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=set_field:0x20000000/0x20000000->reg4",
"cookie=0x1030000000000, table=ServiceMark, priority=200,sctp6,ipv6_src=fec0:192:168:2::/64,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=set_field:0x20000000/0x20000000->reg4",
"cookie=0x1030000000000, table=ServiceMark, priority=190,sctp6,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=set_field:0x4000/0x4000->reg0",
"cookie=0x1030000000000, table=ServiceLB, priority=200,sctp6,reg4=0x20010000/0x20070000,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=set_field:0x200/0x200->reg0,set_field:0x30000/0x70000->reg4,set_field:0x200000/0x200000->reg4,set_field:0x64->reg7,group:100",
"cookie=0x1030000000064, table=ServiceLB, priority=190,sctp6,reg4=0x30000/0x70000,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=learn(table=SessionAffinity,hard_timeout=100,priority=200,delete_learned,cookie=0x1030000000064,eth_type=0x86dd,nw_proto=0x84,OXM_OF_SCTP_DST[],NXM_NX_IPV6_DST[],NXM_NX_IPV6_SRC[],load:NXM_NX_REG4[0..15]->NXM_NX_REG4[0..15],load:NXM_NX_REG4[26]->NXM_NX_REG4[26],load:NXM_NX_XXREG3[]->NXM_NX_XXREG3[],load:0x2->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[9],load:0x1->NXM_NX_REG4[21]),set_field:0x20000/0x70000->reg4,goto_table:EndpointDNAT",
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand All @@ -1471,17 +1504,18 @@ func Test_client_InstallServiceFlows(t *testing.T) {
cacheKey := generateServicePortFlowCacheKey(tc.svcIP, port, tc.protocol)

assert.NoError(t, fc.InstallServiceFlows(&types.ServiceConfig{
ServiceIP: tc.svcIP,
ServicePort: port,
Protocol: tc.protocol,
TrafficPolicyLocal: tc.trafficPolicyLocal,
LocalGroupID: localGroupID,
ClusterGroupID: clusterGroupID,
AffinityTimeout: tc.affinityTimeout,
IsExternal: tc.isExternal,
IsNodePort: tc.isNodePort,
IsNested: tc.isNested,
IsDSR: tc.isDSR,
ServiceIP: tc.svcIP,
ServicePort: port,
Protocol: tc.protocol,
TrafficPolicyLocal: tc.trafficPolicyLocal,
LocalGroupID: localGroupID,
ClusterGroupID: clusterGroupID,
AffinityTimeout: tc.affinityTimeout,
IsExternal: tc.isExternal,
IsNodePort: tc.isNodePort,
IsNested: tc.isNested,
IsDSR: tc.isDSR,
LoadBalancerSourceRanges: tc.loadBalancerSourceRanges,
}))
fCacheI, ok := fc.featureService.cachedFlows.Load(cacheKey)
require.True(t, ok)
Expand Down Expand Up @@ -1527,11 +1561,11 @@ func Test_client_GetServiceFlowKeys(t *testing.T) {
assert.NoError(t, fc.InstallEndpointFlows(bindingProtocol, endpoints))
flowKeys := fc.GetServiceFlowKeys(svcIP, svcPort, bindingProtocol, endpoints)
expectedFlowKeys := []string{
"table=11,priority=200,tcp,reg4=0x10000/0x70000,nw_dst=10.96.0.224,tp_dst=80",
"table=11,priority=190,tcp,reg4=0x30000/0x70000,nw_dst=10.96.0.224,tp_dst=80",
"table=12,priority=200,tcp,reg3=0xa0a000b,reg4=0x20050/0x7ffff",
"table=12,priority=200,tcp,reg3=0xa0a000c,reg4=0x20050/0x7ffff",
"table=20,priority=190,ct_state=+new+trk,ip,nw_src=10.10.0.12,nw_dst=10.10.0.12",
"table=12,priority=200,tcp,reg4=0x10000/0x70000,nw_dst=10.96.0.224,tp_dst=80",
"table=12,priority=190,tcp,reg4=0x30000/0x70000,nw_dst=10.96.0.224,tp_dst=80",
"table=13,priority=200,tcp,reg3=0xa0a000b,reg4=0x20050/0x7ffff",
"table=13,priority=200,tcp,reg3=0xa0a000c,reg4=0x20050/0x7ffff",
"table=21,priority=190,ct_state=+new+trk,ip,nw_src=10.10.0.12,nw_dst=10.10.0.12",
}
assert.ElementsMatch(t, expectedFlowKeys, flowKeys)
}
Expand Down Expand Up @@ -2787,8 +2821,8 @@ func Test_client_ReplayFlows(t *testing.T) {
"cookie=0x1020000000000, table=IngressMetric, priority=200,reg0=0x400/0x400,reg3=0xf actions=drop",
)
replayedFlows = append(replayedFlows,
"cookie=0x1020000000000, table=IngressRule, priority=200,conj_id=15 actions=set_field:0xf->reg3,set_field:0x400/0x400->reg0,set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x1b/0xff->reg2,group:4",
"cookie=0x1020000000000, table=IngressDefaultRule, priority=200,reg1=0x64 actions=set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x400000/0x600000->reg0,set_field:0x1c/0xff->reg2,goto_table:Output",
"cookie=0x1020000000000, table=IngressRule, priority=200,conj_id=15 actions=set_field:0xf->reg3,set_field:0x400/0x400->reg0,set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x1c/0xff->reg2,group:4",
"cookie=0x1020000000000, table=IngressDefaultRule, priority=200,reg1=0x64 actions=set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x400000/0x600000->reg0,set_field:0x1d/0xff->reg2,goto_table:Output",
)

// Feature Pod connectivity replays flows.
Expand Down
6 changes: 4 additions & 2 deletions pkg/agent/openflow/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ var (
DispositionPassRegMark = binding.NewRegMark(APDispositionField, DispositionPass)
// reg0[13]: Mark to indicate the packet is a generated reject response packet-out.
GeneratedRejectPacketOutRegMark = binding.NewOneBitRegMark(0, 13)
// reg0[14]: Mark to indicate a Service without any Endpoints (used by Proxy)
SvcNoEpRegMark = binding.NewOneBitRegMark(0, 14)
// reg0[14]: Mark to indicate a Service connection should be rejected.
SvcRejectRegMark = binding.NewOneBitRegMark(0, 14)
// reg0[19]: Mark to indicate remote SNAT for Egress.
RemoteSNATRegMark = binding.NewOneBitRegMark(0, 19)
// reg0[20]: Field to indicate redirect action of layer 7 NetworkPolicy.
Expand Down Expand Up @@ -149,6 +149,8 @@ var (
FromExternalRegMark = binding.NewOneBitRegMark(4, 27)
// reg4[28]: Mark to indicate that whether the traffic's source is a local Pod or the Node.
FromLocalRegMark = binding.NewOneBitRegMark(4, 28)
// reg4[29]: Mark to indicate that whether the LoadBalancer Service traffic's source is included in the Service loadBalancerSourceRanges.
LoadBalancerSourceRangesRegMark = binding.NewOneBitRegMark(4, 29)

// reg5(NXM_NX_REG5)
// Field to cache the Egress conjunction ID hit by TraceFlow packet.
Expand Down
4 changes: 1 addition & 3 deletions pkg/agent/openflow/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func (f *featureService) getRequiredTables() []*Table {
tables := []*Table{
UnSNATTable,
PreRoutingClassifierTable,
ServiceMarkTable,
SessionAffinityTable,
ServiceLBTable,
EndpointDNATTable,
Expand All @@ -263,9 +264,6 @@ func (f *featureService) getRequiredTables() []*Table {
ConntrackCommitTable,
OutputTable,
}
if f.proxyAll {
tables = append(tables, NodePortMarkTable)
}
if f.enableDSR {
tables = append(tables, DSRServiceMarkTable)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/openflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func TestBuildPipeline(t *testing.T) {
},
},
{
name: "K8s Node, IPv4 only, with proxyAll enabled",
name: "K8s Node, IPv4 only, with AntreaProxy enabled",
ipStack: ipv4Only,
features: []feature{
newTestFeaturePodConnectivity(ipStackMap[ipv4Only]),
Expand All @@ -426,7 +426,7 @@ func TestBuildPipeline(t *testing.T) {
ConntrackTable,
ConntrackStateTable,
PreRoutingClassifierTable,
NodePortMarkTable,
ServiceMarkTable,
SessionAffinityTable,
ServiceLBTable,
EndpointDNATTable,
Expand Down
45 changes: 39 additions & 6 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ var (
// Tables in stagePreRouting:
// When proxy is enabled.
PreRoutingClassifierTable = newTable("PreRoutingClassifier", stagePreRouting, pipelineIP)
NodePortMarkTable = newTable("NodePortMark", stagePreRouting, pipelineIP)
ServiceMarkTable = newTable("ServiceMark", stagePreRouting, pipelineIP)
SessionAffinityTable = newTable("SessionAffinity", stagePreRouting, pipelineIP)
ServiceLBTable = newTable("ServiceLB", stagePreRouting, pipelineIP)
DSRServiceMarkTable = newTable("DSRServiceMark", stagePreRouting, pipelineIP)
Expand Down Expand Up @@ -2288,7 +2288,7 @@ func (f *featureService) nodePortMarkFlows() []binding.Flow {
continue
}
flows = append(flows,
NodePortMarkTable.ofTable.BuildFlow(priorityNormal).
ServiceMarkTable.ofTable.BuildFlow(priorityNormal).
Cookie(cookieID).
MatchProtocol(ipProtocol).
MatchDstIP(nodePortAddresses[i]).
Expand All @@ -2298,7 +2298,7 @@ func (f *featureService) nodePortMarkFlows() []binding.Flow {
// This generates the flow for the virtual NodePort DNAT IP. The flow is used to mark the first packet of NodePort
// connection sourced from the Antrea gateway (the connection is performed DNAT with the virtual IP in host netns).
flows = append(flows,
NodePortMarkTable.ofTable.BuildFlow(priorityNormal).
ServiceMarkTable.ofTable.BuildFlow(priorityNormal).
Cookie(cookieID).
MatchProtocol(ipProtocol).
MatchDstIP(f.virtualNodePortDNATIPs[ipProtocol]).
Expand Down Expand Up @@ -2408,7 +2408,12 @@ func (f *featureService) serviceLBFlows(config *types.ServiceConfig) []binding.F
Action().Group(groupID).Done()
}
flows := []binding.Flow{
buildFlow(priorityNormal, config.TrafficPolicyGroupID(), nil),
buildFlow(priorityNormal, config.TrafficPolicyGroupID(), func(b binding.FlowBuilder) binding.FlowBuilder {
if len(config.LoadBalancerSourceRanges) != 0 {
b = b.MatchRegMark(LoadBalancerSourceRangesRegMark)
}
return b
}),
}
if config.IsExternal && config.TrafficPolicyLocal {
// For short-circuiting flow, an extra match condition matching packet from a local Pod or the Node is added.
Expand Down Expand Up @@ -2551,7 +2556,7 @@ func (f *featureService) serviceEndpointGroup(groupID binding.GroupIDType, withS

if len(endpoints) == 0 {
return group.Bucket().Weight(100).
LoadRegMark(SvcNoEpRegMark).
LoadRegMark(SvcRejectRegMark).
ResubmitToTable(EndpointDNATTable.GetID()).
Done()
}
Expand Down Expand Up @@ -3014,7 +3019,7 @@ func (f *featureService) preRoutingClassifierFlows() []binding.Flow {

targetTables := []uint8{SessionAffinityTable.GetID(), ServiceLBTable.GetID()}
if f.proxyAll {
targetTables = append([]uint8{NodePortMarkTable.GetID()}, targetTables...)
targetTables = append([]uint8{ServiceMarkTable.GetID()}, targetTables...)
}
for _, ipProtocol := range f.ipProtocols {
flows = append(flows,
Expand Down Expand Up @@ -3107,6 +3112,34 @@ func (f *featureService) gatewaySNATFlows() []binding.Flow {
return flows
}

func (f *featureService) loadBalancerSourceRangesMarkFlows(config *types.ServiceConfig) []binding.Flow {
cookieID := f.cookieAllocator.Request(f.category).Raw()
protocol := config.Protocol
ingressIP := config.ServiceIP
port := config.ServicePort
var flows []binding.Flow
for _, srcRange := range config.LoadBalancerSourceRanges {
_, srcIPNet, _ := net.ParseCIDR(srcRange)
flows = append(flows, ServiceMarkTable.ofTable.BuildFlow(priorityNormal).
Cookie(cookieID).
MatchProtocol(protocol).
MatchSrcIPNet(*srcIPNet).
MatchDstIP(ingressIP).
MatchDstPort(port, nil).
Action().LoadRegMark(LoadBalancerSourceRangesRegMark).
Done(),
)
}
flows = append(flows, ServiceMarkTable.ofTable.BuildFlow(priorityLow).
Cookie(cookieID).
MatchProtocol(protocol).
MatchDstIP(ingressIP).
MatchDstPort(port, nil).
Action().LoadRegMark(SvcRejectRegMark).
Done())
return flows
}

func getCachedFlowMessages(cache *flowCategoryCache) []*openflow15.FlowMod {
var flows []*openflow15.FlowMod
cache.Range(func(key, value interface{}) bool {
Expand Down
6 changes: 4 additions & 2 deletions pkg/agent/openflow/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func pipelineDefaultFlows(egressTrafficShapingEnabled, externalNodeEnabled, isEn
"cookie=0x1000000000000, table=UnSNAT, priority=0 actions=goto_table:ConntrackZone",
"cookie=0x1000000000000, table=ConntrackZone, priority=0 actions=goto_table:ConntrackState",
"cookie=0x1000000000000, table=ConntrackState, priority=0 actions=goto_table:PreRoutingClassifier",
"cookie=0x1000000000000, table=PreRoutingClassifier, priority=0 actions=goto_table:SessionAffinity",
"cookie=0x1000000000000, table=PreRoutingClassifier, priority=0 actions=goto_table:ServiceMark",
"cookie=0x1000000000000, table=ServiceMark, priority=0 actions=goto_table:SessionAffinity",
"cookie=0x1000000000000, table=SessionAffinity, priority=0 actions=goto_table:ServiceLB",
"cookie=0x1000000000000, table=ServiceLB, priority=0 actions=goto_table:EndpointDNAT",
"cookie=0x1000000000000, table=EndpointDNAT, priority=0 actions=goto_table:AntreaPolicyEgressRule",
Expand Down Expand Up @@ -136,7 +137,8 @@ func pipelineDefaultFlows(egressTrafficShapingEnabled, externalNodeEnabled, isEn
"cookie=0x1000000000000, table=UnSNAT, priority=0 actions=goto_table:ConntrackZone",
"cookie=0x1000000000000, table=ConntrackZone, priority=0 actions=goto_table:ConntrackState",
"cookie=0x1000000000000, table=ConntrackState, priority=0 actions=goto_table:PreRoutingClassifier",
"cookie=0x1000000000000, table=PreRoutingClassifier, priority=0 actions=goto_table:SessionAffinity",
"cookie=0x1000000000000, table=PreRoutingClassifier, priority=0 actions=goto_table:ServiceMark",
"cookie=0x1000000000000, table=ServiceMark, priority=0 actions=goto_table:SessionAffinity",
"cookie=0x1000000000000, table=SessionAffinity, priority=0 actions=goto_table:ServiceLB",
"cookie=0x1000000000000, table=ServiceLB, priority=0 actions=goto_table:EndpointDNAT",
"cookie=0x1000000000000, table=EndpointDNAT, priority=0 actions=goto_table:AntreaPolicyEgressRule",
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/openflow/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func newFeatureService(
func (f *featureService) serviceNoEndpointFlow() binding.Flow {
return EndpointDNATTable.ofTable.BuildFlow(priorityNormal).
Cookie(f.cookieAllocator.Request(f.category).Raw()).
MatchRegMark(SvcNoEpRegMark).
MatchRegMark(SvcRejectRegMark).
Action().SendToController([]byte{uint8(PacketInCategorySvcReject)}, false).
Done()
}
Expand Down
Loading

0 comments on commit 779e504

Please sign in to comment.