Skip to content

Commit

Permalink
Clean up unit tests for Openflow client in Agent (#5622)
Browse files Browse the repository at this point in the history
* There is no reason for unit tests to depend on OVSMetersAreSupported()
  (which is platform-dependent), we can just set the client's
  ovsMetersAreSupported field to the desired value.
* Run some tests in both configurations (meters supported / unsupported)
  regardless of platform support. Tests that require Egress traffic shaping are
  always run with ovsMetersAreSupported set to true.
* Add ability to provide a mock OF bridge when creating the test client.

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas authored Nov 2, 2023
1 parent e8f5d93 commit ab94f29
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 249 deletions.
234 changes: 105 additions & 129 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func skipTest(tb testing.TB, skipLinux, skipWindows bool) {
}

type clientOptions struct {
enableOVSMeters bool
enableProxy bool
enableAntreaPolicy bool
enableEgress bool
Expand All @@ -96,6 +97,12 @@ type clientOptions struct {

type clientOptionsFn func(*clientOptions)

func setEnableOVSMeters(v bool) clientOptionsFn {
return func(o *clientOptions) {
o.enableOVSMeters = v
}
}

func enableProxyAll(o *clientOptions) {
o.enableProxy = true
o.proxyAll = true
Expand All @@ -121,9 +128,20 @@ func disableEgress(o *clientOptions) {
}

func enableEgressTrafficShaping(o *clientOptions) {
// traffic shaping requires meter support
o.enableOVSMeters = true
o.enableEgressTrafficShaping = true
}

func setEnableEgressTrafficShaping(v bool) clientOptionsFn {
return func(o *clientOptions) {
if v {
o.enableOVSMeters = true
}
o.enableEgressTrafficShaping = v
}
}

func enableConnectUplinkToBridge(o *clientOptions) {
o.connectUplinkToBridge = true
}
Expand Down Expand Up @@ -345,25 +363,31 @@ func TestConcurrentFlowInstallation(t *testing.T) {

}

func newFakeClient(mockOFEntryOperations *oftest.MockOFEntryOperations,
func newFakeClient(
mockOFEntryOperations *oftest.MockOFEntryOperations,
enableIPv4,
enableIPv6 bool,
nodeType config.NodeType,
trafficEncapMode config.TrafficEncapModeType,
options ...clientOptionsFn) *client {
options ...clientOptionsFn,
) *client {
return newFakeClientWithBridge(mockOFEntryOperations, enableIPv4, enableIPv6, nodeType, trafficEncapMode, nil, options...)
}

func newFakeClientWithBridge(
mockOFEntryOperations *oftest.MockOFEntryOperations,
enableIPv4,
enableIPv6 bool,
nodeType config.NodeType,
trafficEncapMode config.TrafficEncapModeType,
bridge binding.Bridge, // use nil to use the default bridge created with binding.NewOFBridge
options ...clientOptionsFn,
) *client {
// default options
o := &clientOptions{
enableProxy: true,
enableAntreaPolicy: true,
enableEgress: true,
enableEgressTrafficShaping: false,
proxyAll: false,
enableDSR: false,
connectUplinkToBridge: false,
enableMulticast: false,
enableTrafficControl: false,
enableMulticluster: false,
enableL7NetworkPolicy: false,
enableProxy: true,
enableAntreaPolicy: true,
enableEgress: true,
}
for _, fn := range options {
fn(o)
Expand All @@ -388,6 +412,11 @@ func newFakeClient(mockOFEntryOperations *oftest.MockOFEntryOperations,
false,
defaultPacketInRate)

// Meters must be supported to enable Egress traffic shaping.
// For unit tests, we can "force" client.ovsMetersAreSupported to true, even if the platform
// running the unit tests don't actually support meters.
client.ovsMetersAreSupported = o.enableOVSMeters

var egressExceptCIDRs []net.IPNet
var serviceIPv4CIDR, serviceIPv6CIDR *net.IPNet
var nodePortAddressesIPv4, nodePortAddressesIPv6 []net.IP
Expand Down Expand Up @@ -477,8 +506,14 @@ func newFakeClient(mockOFEntryOperations *oftest.MockOFEntryOperations,
client.generatePipelines()
client.realizePipelines()
binding.TableNameCache = getTableNameCache()

// This is needed even when bridge != nil for proper table initialization.
client.bridge.(*binding.OFBridge).SetOFSwitch(ofctrl.NewSwitch(&util.MessageStream{}, GlobalVirtualMAC, client.bridge.(ofctrl.AppInterface), make(chan int), 32776))
client.bridge.(*binding.OFBridge).Initialize()
if bridge != nil {
client.bridge = bridge
}

return client
}

Expand Down Expand Up @@ -1614,16 +1649,7 @@ func Test_client_InstallSNATMarkFlows(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
m := oftest.NewMockOFEntryOperations(ctrl)
var fc *client
if tc.trafficShapingEnabled {
if !OVSMetersAreSupported() {
t.Skipf("Skip test because OVS meters are not supported")
}
fc = newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap, enableEgressTrafficShaping)
fc.featureEgress.enableEgressTrafficShaping = tc.trafficShapingEnabled
} else {
fc = newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap)
}
fc := newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap, setEnableEgressTrafficShaping(tc.trafficShapingEnabled))
defer resetPipelines()

m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1)
Expand Down Expand Up @@ -1681,16 +1707,7 @@ func Test_client_InstallPodSNATFlows(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
m := oftest.NewMockOFEntryOperations(ctrl)
var fc *client
if tc.trafficShapingEnabled {
if !OVSMetersAreSupported() {
t.Skipf("Skip test because OVS meters are not supported")
}
fc = newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap, enableEgressTrafficShaping)
fc.featureEgress.enableEgressTrafficShaping = tc.trafficShapingEnabled
} else {
fc = newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap)
}
fc := newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap, setEnableEgressTrafficShaping(tc.trafficShapingEnabled))
defer resetPipelines()

m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1)
Expand All @@ -1710,9 +1727,6 @@ func Test_client_InstallPodSNATFlows(t *testing.T) {
}

func Test_client_InstallEgressQoS(t *testing.T) {
if !OVSMetersAreSupported() {
t.Skipf("Skip test because OVS meters are not supported")
}
meterID := uint32(100)
meterRate := uint32(100)
meterBurst := uint32(200)
Expand All @@ -1721,10 +1735,7 @@ func Test_client_InstallEgressQoS(t *testing.T) {
ctrl := gomock.NewController(t)
m := oftest.NewMockOFEntryOperations(ctrl)
bridge := ovsoftest.NewMockBridge(ctrl)
fc := newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap, enableEgressTrafficShaping)
fc.bridge = bridge
fc.featureEgress.enableEgressTrafficShaping = true
fc.ovsMetersAreSupported = true
fc := newFakeClientWithBridge(m, true, true, config.K8sNode, config.TrafficEncapModeEncap, bridge, enableEgressTrafficShaping)
defer resetPipelines()

m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1)
Expand Down Expand Up @@ -1895,9 +1906,8 @@ func Test_client_SendTraceflowPacket(t *testing.T) {

func prepareTraceflowFlow(ctrl *gomock.Controller) *client {
m := oftest.NewMockOFEntryOperations(ctrl)
fc := newFakeClient(m, true, false, config.K8sNode, config.TrafficEncapModeEncap)
fc := newFakeClientWithBridge(m, true, false, config.K8sNode, config.TrafficEncapModeEncap, ovsoftest.NewMockBridge(ctrl))
defer resetPipelines()
fc.bridge = ovsoftest.NewMockBridge(ctrl)

m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1)
_, ipCIDR, _ := net.ParseCIDR("192.168.2.30/32")
Expand All @@ -1921,11 +1931,10 @@ func prepareTraceflowFlow(ctrl *gomock.Controller) *client {
}

func prepareSendTraceflowPacket(ctrl *gomock.Controller, success bool) *client {
fc := newFakeClient(nil, true, false, config.K8sNode, config.TrafficEncapModeEncap)
m := ovsoftest.NewMockBridge(ctrl)
fc := newFakeClientWithBridge(nil, true, false, config.K8sNode, config.TrafficEncapModeEncap, m)
defer resetPipelines()

m := ovsoftest.NewMockBridge(ctrl)
fc.bridge = m
bridge := binding.OFBridge{}
m.EXPECT().BuildPacketOut().Return(bridge.BuildPacketOut()).Times(1)
if success {
Expand Down Expand Up @@ -2123,9 +2132,8 @@ func Test_client_SendPacketOut(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockBridge := ovsoftest.NewMockBridge(ctrl)
fc := newFakeClient(nil, true, true, config.K8sNode, config.TrafficEncapModeEncap)
fc := newFakeClientWithBridge(nil, true, true, config.K8sNode, config.TrafficEncapModeEncap, mockBridge)
defer resetPipelines()
fc.bridge = mockBridge

srcMAC := fc.nodeConfig.GatewayConfig.MAC
dstMAC, _ := net.ParseMAC("00:00:10:10:00:66")
Expand Down Expand Up @@ -2296,9 +2304,8 @@ func Test_client_InstallMulticastRemoteReportFlows(t *testing.T) {
func Test_client_SendIGMPQueryPacketOut(t *testing.T) {
ctrl := gomock.NewController(t)
mockBridge := ovsoftest.NewMockBridge(ctrl)
fc := newFakeClient(nil, true, false, config.K8sNode, config.TrafficEncapModeEncap)
fc := newFakeClientWithBridge(nil, true, false, config.K8sNode, config.TrafficEncapModeEncap, mockBridge)
defer resetPipelines()
fc.bridge = mockBridge

srcMAC := fc.nodeConfig.GatewayConfig.MAC
srcIP := fc.nodeConfig.GatewayConfig.IPv4
Expand Down Expand Up @@ -2657,31 +2664,26 @@ func Test_client_InstallMulticlusterClassifierFlows(t *testing.T) {
}

func Test_client_RegisterPacketInHandler(t *testing.T) {
fc := newFakeClient(nil, true, false, config.K8sNode, config.TrafficEncapModeEncap)
defer resetPipelines()
ctrl := gomock.NewController(t)
bridge := ovsoftest.NewMockBridge(ctrl)
fc := newFakeClientWithBridge(nil, true, false, config.K8sNode, config.TrafficEncapModeEncap, bridge)
defer resetPipelines()
bridge.EXPECT().ResumePacket(gomock.Any()).Times(1)
fc.bridge = bridge
fc.ResumePausePacket(nil)
}

func Test_client_ReplayFlows(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
egressTrafficShaping := false
clientOptions := []clientOptionsFn{enableTrafficControl, enableMulticast, enableMulticluster}
if OVSMetersAreSupported() {
egressTrafficShaping = true
clientOptions = append(clientOptions, enableEgressTrafficShaping)
}
fc := newFakeClient(m, true, false, config.K8sNode, config.TrafficEncapModeEncap, clientOptions...)
bridge := ovsoftest.NewMockBridge(ctrl)
clientOptions := []clientOptionsFn{enableTrafficControl, enableMulticast, enableMulticluster, enableEgressTrafficShaping}
fc := newFakeClientWithBridge(m, true, false, config.K8sNode, config.TrafficEncapModeEncap, bridge, clientOptions...)
defer resetPipelines()

expectedFlows := append(pipelineDefaultFlows(egressTrafficShaping, false, true, true), egressInitFlows(true)...)
expectedFlows := append(pipelineDefaultFlows(true /* egressTrafficShapingEnabled */, false /* externalNodeEnabled */, true /* isEncap */, true /* isIPv4 */), egressInitFlows(true)...)
expectedFlows = append(expectedFlows, multicastInitFlows(true)...)
expectedFlows = append(expectedFlows, networkPolicyInitFlows(fc.ovsMetersAreSupported, false, false)...)
expectedFlows = append(expectedFlows, networkPolicyInitFlows(true, false, false)...)
expectedFlows = append(expectedFlows, podConnectivityInitFlows(config.TrafficEncapModeEncap, false, true, true, true)...)
expectedFlows = append(expectedFlows, serviceInitFlows(true, true, false, false)...)

Expand All @@ -2698,16 +2700,10 @@ func Test_client_ReplayFlows(t *testing.T) {
// Feature Egress replays flows.
snatIP := net.ParseIP("192.168.77.100")
addFlowInCache(fc.featureEgress.cachedFlows, "egressFlows", []binding.Flow{fc.featureEgress.snatIPFromTunnelFlow(snatIP, uint32(100))})
if egressTrafficShaping {
replayedFlows = append(replayedFlows,
"cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+trk,ip,tun_dst=192.168.77.100 actions=set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:EgressQoS",
"cookie=0x1040000000000, table=EgressQoS, priority=190 actions=goto_table:L2ForwardingCalc",
)
} else {
replayedFlows = append(replayedFlows,
"cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+new+trk,ip,tun_dst=192.168.77.100 actions=set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
)
}
replayedFlows = append(replayedFlows,
"cookie=0x1040000000000, table=EgressMark, priority=200,ct_state=+trk,ip,tun_dst=192.168.77.100 actions=set_field:0x64/0xff->pkt_mark,set_field:0x20/0xf0->reg0,goto_table:EgressQoS",
"cookie=0x1040000000000, table=EgressQoS, priority=190 actions=goto_table:L2ForwardingCalc",
)
// Feature Multicast replays flows.
podIP := net.ParseIP("10.10.0.66")
podOfPort := uint32(100)
Expand Down Expand Up @@ -2754,21 +2750,10 @@ func Test_client_ReplayFlows(t *testing.T) {
"cookie=0x1020000000000, table=IngressRule, priority=200,reg1=0x64 actions=conjunction(15,2/2)",
"cookie=0x1020000000000, table=IngressMetric, priority=200,reg0=0x400/0x400,reg3=0xf actions=drop",
)
if egressTrafficShaping {
// When egressTrafficShaping is enabled, EgressQoSTable will be initialized, which
// will cause IDs of tables after EgressQoSTable shifted.
// The tableID stored in PacketInTableField need to be added 1:
// set_field:0x1a/0xff->reg2 => set_field:0x1b/0xff->reg2
replayedFlows = append(replayedFlows,
"cookie=0x1020000000000, table=IngressRule, priority=200,conj_id=15 actions=set_field:0xf->reg3,set_field:0x400/0x400->reg0,set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x1b/0xff->reg2,group:4",
"cookie=0x1020000000000, table=IngressDefaultRule, priority=200,reg1=0x64 actions=set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x400000/0x600000->reg0,set_field:0x1c/0xff->reg2,goto_table:Output",
)
} else {
replayedFlows = append(replayedFlows,
"cookie=0x1020000000000, table=IngressRule, priority=200,conj_id=15 actions=set_field:0xf->reg3,set_field:0x400/0x400->reg0,set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x1a/0xff->reg2,group:4",
"cookie=0x1020000000000, table=IngressDefaultRule, priority=200,reg1=0x64 actions=set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x400000/0x600000->reg0,set_field:0x1b/0xff->reg2,goto_table:Output",
)
}
replayedFlows = append(replayedFlows,
"cookie=0x1020000000000, table=IngressRule, priority=200,conj_id=15 actions=set_field:0xf->reg3,set_field:0x400/0x400->reg0,set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x1b/0xff->reg2,group:4",
"cookie=0x1020000000000, table=IngressDefaultRule, priority=200,reg1=0x64 actions=set_field:0x800/0x1800->reg0,set_field:0x2000000/0xfe000000->reg0,set_field:0x400000/0x600000->reg0,set_field:0x1c/0xff->reg2,goto_table:Output",
)

// Feature Pod connectivity replays flows.
podMAC, _ := net.ParseMAC("00:00:10:10:00:66")
Expand Down Expand Up @@ -2816,43 +2801,41 @@ func Test_client_ReplayFlows(t *testing.T) {
actualFlows = append(actualFlows, flowStrings...)
}).Return(nil).AnyTimes()

if fc.ovsMetersAreSupported {
// Use mock for the unit test on meter is because the current implementation in ofnet does not support
// sending Meter modification message in a bundle message. The "Add" meter actions is dependent on a valid
// connection to OVS, hence it may return timeout error without a mock which is possibly block the comparation
// on the flows and groups.
bridge := ovsoftest.NewMockBridge(ctrl)
fc.bridge = bridge
egressMeterID := uint32(1)
egressMeterRate := uint32(100)
egressMeterBurst := uint32(200)
expectNewMeter := func(id, rate, burst uint32, unit ofctrl.MeterFlag, isCached bool) {
meter := ovsoftest.NewMockMeter(ctrl)
meterBuilder := ovsoftest.NewMockMeterBandBuilder(ctrl)
bridge.EXPECT().NewMeter(binding.MeterIDType(id), ofctrl.MeterBurst|unit).Return(meter).Times(1)
meter.EXPECT().MeterBand().Return(meterBuilder).Times(1)
meterBuilder.EXPECT().MeterType(ofctrl.MeterDrop).Return(meterBuilder).Times(1)
meterBuilder.EXPECT().Rate(rate).Return(meterBuilder).Times(1)
meterBuilder.EXPECT().Burst(burst).Return(meterBuilder).Times(1)
meterBuilder.EXPECT().Done().Return(meter).Times(1)
if isCached {
meter.EXPECT().Reset().Times(1)
}
meter.EXPECT().Add().Return(nil).Times(1)
}
expectNewMeter(egressMeterID, egressMeterRate, egressMeterBurst, ofctrl.MeterKbps, true)
egressMeter := fc.genOFMeter(binding.MeterIDType(egressMeterID), ofctrl.MeterBurst|ofctrl.MeterKbps, egressMeterRate, egressMeterBurst)
fc.featureEgress.cachedMeter.Store(egressMeterID, egressMeter)
for _, meterCfg := range []struct {
id binding.MeterIDType
rate uint32
}{
{id: PacketInMeterIDNP, rate: uint32(defaultPacketInRate)},
{id: PacketInMeterIDTF, rate: uint32(defaultPacketInRate)},
{id: PacketInMeterIDDNS, rate: uint32(defaultPacketInRate)},
} {
expectNewMeter(uint32(meterCfg.id), meterCfg.rate, meterCfg.rate*2, ofctrl.MeterPktps, false)
// Use mock for the unit test on meter is because the current implementation in ofnet does not support
// sending Meter modification message in a bundle message. The "Add" meter actions is dependent on a valid
// connection to OVS, hence it may return timeout error without a mock which is possibly block the comparation
// on the flows and groups.
egressMeterID := uint32(1)
egressMeterRate := uint32(100)
egressMeterBurst := uint32(200)
expectNewMeter := func(id, rate, burst uint32, unit ofctrl.MeterFlag, isCached bool) {
meter := ovsoftest.NewMockMeter(ctrl)
meterBuilder := ovsoftest.NewMockMeterBandBuilder(ctrl)
bridge.EXPECT().NewMeter(binding.MeterIDType(id), ofctrl.MeterBurst|unit).Return(meter).Times(1)
meter.EXPECT().MeterBand().Return(meterBuilder).Times(1)
meterBuilder.EXPECT().MeterType(ofctrl.MeterDrop).Return(meterBuilder).Times(1)
meterBuilder.EXPECT().Rate(rate).Return(meterBuilder).Times(1)
meterBuilder.EXPECT().Burst(burst).Return(meterBuilder).Times(1)
meterBuilder.EXPECT().Done().Return(meter).Times(1)
if isCached {
meter.EXPECT().Reset().Times(1)
}
meter.EXPECT().Add().Return(nil).Times(1)
}
// egress QOS meter:
expectNewMeter(egressMeterID, egressMeterRate, egressMeterBurst, ofctrl.MeterKbps, true)
egressMeter := fc.genOFMeter(binding.MeterIDType(egressMeterID), ofctrl.MeterBurst|ofctrl.MeterKbps, egressMeterRate, egressMeterBurst)
fc.featureEgress.cachedMeter.Store(egressMeterID, egressMeter)
// "static" PacketIn meters:
for _, meterCfg := range []struct {
id binding.MeterIDType
rate uint32
}{
{id: PacketInMeterIDNP, rate: uint32(defaultPacketInRate)},
{id: PacketInMeterIDTF, rate: uint32(defaultPacketInRate)},
{id: PacketInMeterIDDNS, rate: uint32(defaultPacketInRate)},
} {
expectNewMeter(uint32(meterCfg.id), meterCfg.rate, meterCfg.rate*2, ofctrl.MeterPktps, false)
}

fc.ReplayFlows()
Expand All @@ -2861,13 +2844,6 @@ func Test_client_ReplayFlows(t *testing.T) {
}

func TestCachedFlowIsDrop(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
fc := newFakeClient(m, true, false, config.K8sNode, config.TrafficEncapModeEncap)
defer resetPipelines()
fc.bridge = ovsoftest.NewMockBridge(ctrl)

_, ipCIDR, _ := net.ParseCIDR("192.168.2.30/32")
flows, err := EgressDefaultTable.ofTable.
BuildFlow(priority100).
Expand Down
Loading

0 comments on commit ab94f29

Please sign in to comment.