From 51ed4b107cc31e2ce55ac56f042bf74dd28fb597 Mon Sep 17 00:00:00 2001 From: Brandon Willett <64801037+brandon-dd@users.noreply.github.com> Date: Fri, 5 Jun 2020 10:17:56 -0400 Subject: [PATCH] Factor out networking impl in latency and bandwidth limit disruptions (#133) --- .golangci.yml | 1 - api/v1beta1/network_limitation.go | 14 + api/v1beta1/zz_generated.deepcopy.go | 7 +- cli/injector/network_latency_inject.go | 3 + cli/injector/network_limitation.go | 1 + cli/injector/network_limitation_clean.go | 5 +- cli/injector/network_limitation_inject.go | 5 + .../chaos.datadoghq.com_disruptions.yaml | 8 + config/samples/chaos_v1beta1_disruption.yaml | 2 +- injector/network_config.go | 222 ++++++++++++++++ injector/network_config_test.go | 247 ++++++++++++++++++ injector/network_latency.go | 187 +------------ injector/network_latency_test.go | 225 +++------------- injector/network_limitation.go | 111 +------- injector/network_limitation_test.go | 100 ++----- 15 files changed, 586 insertions(+), 552 deletions(-) create mode 100644 injector/network_config.go create mode 100644 injector/network_config_test.go diff --git a/.golangci.yml b/.golangci.yml index 0bd074b5b..0acbaa544 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -22,7 +22,6 @@ linters: - bodyclose - depguard - dogsled - - dupl - goconst - godox - gofmt diff --git a/api/v1beta1/network_limitation.go b/api/v1beta1/network_limitation.go index 7fa3f0c41..89aaa1980 100644 --- a/api/v1beta1/network_limitation.go +++ b/api/v1beta1/network_limitation.go @@ -7,6 +7,7 @@ package v1beta1 import ( "strconv" + "strings" chaostypes "github.com/DataDog/chaos-controller/types" "k8s.io/apimachinery/pkg/types" @@ -15,6 +16,10 @@ import ( // NetworkLimitationSpec represents a network bandwidth limitation injection type NetworkLimitationSpec struct { BytesPerSec uint `json:"bytesPerSec"` + // +nullable + Hosts []string `json:"hosts,omitempty"` + // +nullable + Port int `json:"port,omitempty"` } // GenerateArgs generates 
injection or cleanup pod arguments for the given spec @@ -34,6 +39,13 @@ func (s *NetworkLimitationSpec) GenerateArgs(mode chaostypes.PodMode, uid types. containerID, "--bytes-per-sec", strconv.Itoa(int(s.BytesPerSec)), + "--hosts", + } + + args = append(args, strings.Split(strings.Join(s.Hosts, " --hosts "), " ")...) + + if s.Port != 0 { + args = append(args, "--port", strconv.Itoa(s.Port)) } case chaostypes.PodModeClean: args = []string{ @@ -45,7 +57,9 @@ func (s *NetworkLimitationSpec) GenerateArgs(mode chaostypes.PodMode, uid types. sink, "--container-id", containerID, + "--hosts", } + args = append(args, strings.Split(strings.Join(s.Hosts, " --hosts "), " ")...) } return args diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index 401ce3032..290f26fe9 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -135,7 +135,7 @@ func (in *DisruptionSpec) DeepCopyInto(out *DisruptionSpec) { if in.NetworkLimitation != nil { in, out := &in.NetworkLimitation, &out.NetworkLimitation *out = new(NetworkLimitationSpec) - **out = **in + (*in).DeepCopyInto(*out) } } @@ -212,6 +212,11 @@ func (in *NetworkLatencySpec) DeepCopy() *NetworkLatencySpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *NetworkLimitationSpec) DeepCopyInto(out *NetworkLimitationSpec) { *out = *in + if in.Hosts != nil { + in, out := &in.Hosts, &out.Hosts + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NetworkLimitationSpec. 
diff --git a/cli/injector/network_latency_inject.go b/cli/injector/network_latency_inject.go index eb849b88c..a2b276c87 100644 --- a/cli/injector/network_latency_inject.go +++ b/cli/injector/network_latency_inject.go @@ -20,6 +20,7 @@ var networkLatencyInjectCmd = &cobra.Command{ containerID, _ := cmd.Flags().GetString("container-id") delay, _ := cmd.Flags().GetUint("delay") hosts, _ := cmd.Flags().GetStringSlice("hosts") + port, _ := cmd.Flags().GetInt("port") // prepare container c, err := container.New(containerID) @@ -31,6 +32,7 @@ var networkLatencyInjectCmd = &cobra.Command{ spec := v1beta1.NetworkLatencySpec{ Delay: delay, Hosts: hosts, + Port: port, } // inject @@ -41,5 +43,6 @@ var networkLatencyInjectCmd = &cobra.Command{ func init() { networkLatencyInjectCmd.Flags().Uint("delay", 0, "Delay to add to the given container in ms") + networkLatencyInjectCmd.Flags().Int("port", 0, "Port to restrict disruption to (0 == all ports)") _ = networkLatencyInjectCmd.MarkFlagRequired("delay") } diff --git a/cli/injector/network_limitation.go b/cli/injector/network_limitation.go index 5cfc12148..02e7735a9 100644 --- a/cli/injector/network_limitation.go +++ b/cli/injector/network_limitation.go @@ -17,5 +17,6 @@ func init() { networkLimitationCmd.AddCommand(networkLimitationInjectCmd) networkLimitationCmd.AddCommand(networkLimitationCleanCmd) networkLimitationCmd.PersistentFlags().String("container-id", "", "ID of the container to inject") + networkLimitationCmd.PersistentFlags().StringSlice("hosts", []string{}, "List of hosts (hostname, single IP or IP block) to apply disruption to. 
If not specified, the disruption applies to all the outgoing traffic") _ = cobra.MarkFlagRequired(networkLimitationCmd.PersistentFlags(), "container-id") } diff --git a/cli/injector/network_limitation_clean.go b/cli/injector/network_limitation_clean.go index 1f67e4899..3d8f60baa 100644 --- a/cli/injector/network_limitation_clean.go +++ b/cli/injector/network_limitation_clean.go @@ -18,6 +18,7 @@ var networkLimitationCleanCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { uid, _ := cmd.Flags().GetString("uid") containerID, _ := cmd.Flags().GetString("container-id") + hosts, _ := cmd.Flags().GetStringSlice("hosts") // prepare container c, err := container.New(containerID) @@ -26,7 +27,9 @@ var networkLimitationCleanCmd = &cobra.Command{ } // prepare spec - spec := v1beta1.NetworkLimitationSpec{} + spec := v1beta1.NetworkLimitationSpec{ + Hosts: hosts, + } // clean i := injector.NewNetworkLimitationInjector(uid, spec, c, log, ms) diff --git a/cli/injector/network_limitation_inject.go b/cli/injector/network_limitation_inject.go index 3b66b4fe6..94afde5ab 100644 --- a/cli/injector/network_limitation_inject.go +++ b/cli/injector/network_limitation_inject.go @@ -19,6 +19,8 @@ var networkLimitationInjectCmd = &cobra.Command{ uid, _ := cmd.Flags().GetString("uid") containerID, _ := cmd.Flags().GetString("container-id") bytesPerSec, _ := cmd.Flags().GetUint("bytes-per-sec") + hosts, _ := cmd.Flags().GetStringSlice("hosts") + port, _ := cmd.Flags().GetInt("port") // prepare container c, err := container.New(containerID) @@ -29,6 +31,8 @@ var networkLimitationInjectCmd = &cobra.Command{ // prepare spec spec := v1beta1.NetworkLimitationSpec{ BytesPerSec: bytesPerSec, + Hosts: hosts, + Port: port, } // inject @@ -39,5 +43,6 @@ var networkLimitationInjectCmd = &cobra.Command{ func init() { networkLimitationInjectCmd.Flags().Uint("bytes-per-sec", 1000000000, "Bytes per second to limit bandwidth to") + networkLimitationInjectCmd.Flags().Int("port", 0, "Port to 
restrict disruption to (0 == all ports)") _ = networkLimitationInjectCmd.MarkFlagRequired("bytes-per-sec") } diff --git a/config/crd/bases/chaos.datadoghq.com_disruptions.yaml b/config/crd/bases/chaos.datadoghq.com_disruptions.yaml index f46c8e4cf..64ff19506 100644 --- a/config/crd/bases/chaos.datadoghq.com_disruptions.yaml +++ b/config/crd/bases/chaos.datadoghq.com_disruptions.yaml @@ -89,6 +89,14 @@ spec: properties: bytesPerSec: type: integer + hosts: + items: + type: string + nullable: true + type: array + port: + nullable: true + type: integer required: - bytesPerSec type: object diff --git a/config/samples/chaos_v1beta1_disruption.yaml b/config/samples/chaos_v1beta1_disruption.yaml index 0835dd5a6..cd788999c 100644 --- a/config/samples/chaos_v1beta1_disruption.yaml +++ b/config/samples/chaos_v1beta1_disruption.yaml @@ -27,5 +27,5 @@ spec: nodeFailure: # node kernel panic or shutdown shutdown: true # shutdown the host instead of triggering a stack dump cpuPressure: {} # cpu load generator - networkLimitation: + networkLimitation: # output bandwidth limit bytesPerSec: 100000 # 100kbps, visible but not backbreaking diff --git a/injector/network_config.go b/injector/network_config.go new file mode 100644 index 000000000..5840e5924 --- /dev/null +++ b/injector/network_config.go @@ -0,0 +1,222 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2020 Datadog, Inc. 
+ +package injector + +import ( + "fmt" + "net" + "time" + + "github.com/DataDog/chaos-controller/network" + "go.uber.org/zap" +) + +// linkOperation represents an operation on a single network interface, possibly a subset of traffic +// based on the 2nd param (parent) +type linkOperation func(network.NetlinkLink, string) error + +// NetworkDisruptionConfig provides an interface for using the network traffic controller for new disruptions +type NetworkDisruptionConfig interface { + AddLatency(hosts []string, port int, delay time.Duration) + AddOutputLimit(hosts []string, port int, bytesPerSec uint) + ClearAllQdiscs(hosts []string) +} + +// NetworkDisruptionConfigStruct contains all needed drivers to create a network disruption using `tc` +type NetworkDisruptionConfigStruct struct { + Log *zap.SugaredLogger + TrafficController network.TrafficController + NetlinkAdapter network.NetlinkAdapter + DNSClient network.DNSClient +} + +// NewNetworkDisruptionConfig creates a new network disruption object using default netlink, dns, etc +// if any non-default drivers are needed (for example in unit tests), just make a NetworkDisruptionConfigStruct +func NewNetworkDisruptionConfig(logger *zap.SugaredLogger) NetworkDisruptionConfig { + return NetworkDisruptionConfigStruct{ + Log: logger, + TrafficController: network.NewTrafficController(logger), + NetlinkAdapter: network.NewNetlinkAdapter(), + DNSClient: network.NewDNSClient(), + } +} + +func (c NetworkDisruptionConfigStruct) getInterfacesByIP(hosts []string) (map[string][]*net.IPNet, error) { + linkByIP := map[string][]*net.IPNet{} + + if len(hosts) > 0 { + c.Log.Info("auto-detecting interfaces to apply disruption to...") + // resolve hosts + ips, err := resolveHosts(c.DNSClient, hosts) + if err != nil { + return nil, fmt.Errorf("can't resolve given hosts: %w", err) + } + + // get the association between IP and interfaces to know + // which interfaces we have to inject disruption to + for _, ip := range ips { + // get routes 
for resolved destination IP + routes, err := c.NetlinkAdapter.RoutesForIP(ip) + if err != nil { + return nil, fmt.Errorf("can't get route for IP %s: %w", ip.String(), err) + } + + // for each route, get the related interface and add it to the association + // between interfaces and IPs + for _, route := range routes { + c.Log.Infof("IP %s belongs to interface %s", ip.String(), route.Link().Name()) + + // store association, initialize the map entry if not present yet + if _, ok := linkByIP[route.Link().Name()]; !ok { + linkByIP[route.Link().Name()] = []*net.IPNet{} + } + + linkByIP[route.Link().Name()] = append(linkByIP[route.Link().Name()], ip) + } + } + } else { + c.Log.Info("no hosts specified, all interfaces will be impacted") + + // prepare links/IP association by pre-creating links + links, err := c.NetlinkAdapter.LinkList() + if err != nil { + return nil, fmt.Errorf("can't list links: %w", err) + } + for _, link := range links { + c.Log.Infof("adding interface %s", link.Name()) + linkByIP[link.Name()] = []*net.IPNet{} + } + } + + return linkByIP, nil +} + +func (c NetworkDisruptionConfigStruct) addOperation(hosts []string, port int, operation linkOperation) { + c.Log.Info("auto-detecting interfaces to apply disruption to...") + + parent := "root" + + linkByIP, err := c.getInterfacesByIP(hosts) + if err != nil { + c.Log.Fatalf("can't get interfaces per IP listing: %v", err) + } + + // for each link/ip association, add disruption + for linkName, ips := range linkByIP { + clearTxQlen := false + + // retrieve link from name + link, err := c.NetlinkAdapter.LinkByName(linkName) + if err != nil { + c.Log.Fatalf("can't retrieve link %s: %v", linkName, err) + } + + // if at least one IP has been specified, we need to create a prio qdisc to be able to apply + // a filter and a delay only on traffic going to those IP + if len(ips) > 0 { + // set the tx qlen if not already set as it is required to create a prio qdisc without dropping + // all the outgoing traffic + // this qlen 
will be removed once the injection is done if it was not present before + if link.TxQLen() == 0 { + c.Log.Infof("setting tx qlen for interface %s", link.Name()) + + clearTxQlen = true + + if err := link.SetTxQLen(1000); err != nil { + c.Log.Fatalf("can't set tx queue length on interface %s: %v", link.Name(), err) + } + } + + // create a new qdisc for the given interface of type prio with 4 bands instead of 3 + // we keep the default priomap, the extra band will be used to filter traffic going to the specified IP + // we only create this qdisc if we want to target traffic going to some hosts only, it avoids to add delay to + // all the outgoing traffic + parent = "1:4" + priomap := [16]uint32{1, 2, 2, 2, 1, 2, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1} + + if err := c.TrafficController.AddPrio(link.Name(), "root", 1, 4, priomap); err != nil { + c.Log.Fatalf("can't create a new qdisc for interface %s: %v", link.Name(), err) + } + } + + // apply the requested disruption (delay, bandwidth limit, ...) on the selected parent + if err := operation(link, parent); err != nil { + c.Log.Fatalf("could not perform operation on newly created qdisc for interface %s: %v", link.Name(), err) + } + + // if only some hosts/ports are targeted, redirect the traffic to the extra band created earlier + // where the delay is applied + if len(ips) > 0 { + for _, ip := range ips { + if err := c.TrafficController.AddFilter(link.Name(), "1:0", 0, ip, port, "1:4"); err != nil { + c.Log.Fatalf("can't add a filter to interface %s: %v", link.Name(), err) + } + } + } + + // reset the interface transmission queue length once filters have been created + if clearTxQlen { + c.Log.Infof("clearing tx qlen for interface %s", link.Name()) + + if err := link.SetTxQLen(0); err != nil { + c.Log.Fatalf("can't clear %s link transmission queue length: %v", link.Name(), err) + } + } + } +} + +// AddLatency adds a network latency disruption using the drivers in the NetworkDisruptionConfigStruct +func (c NetworkDisruptionConfigStruct) AddLatency(hosts []string, port int, delay time.Duration) { + // 
closure which adds latency + operation := func(link network.NetlinkLink, parent string) error { + return c.TrafficController.AddDelay(link.Name(), parent, 0, delay) + } + + c.addOperation(hosts, port, operation) +} + +// AddOutputLimit adds a network bandwidth disruption using the drivers in the NetworkDisruptionConfigStruct +func (c NetworkDisruptionConfigStruct) AddOutputLimit(hosts []string, port int, bytesPerSec uint) { + // closure which adds a bandwidth limit + operation := func(link network.NetlinkLink, parent string) error { + return c.TrafficController.AddOutputLimit(link.Name(), parent, 0, bytesPerSec) + } + + c.addOperation(hosts, port, operation) +} + +// ClearAllQdiscs removes all disruptions by clearing all custom qdiscs created for the given config struct +func (c NetworkDisruptionConfigStruct) ClearAllQdiscs(hosts []string) { + linkByIP, err := c.getInterfacesByIP(hosts) + if err != nil { + c.Log.Fatalf("can't get interfaces per IP map: %v", err) + } + + for linkName := range linkByIP { + c.Log.Infof("clearing root qdisc for interface %s", linkName) + + // retrieve link from name + link, err := c.NetlinkAdapter.LinkByName(linkName) + if err != nil { + c.Log.Fatalf("can't retrieve link %s: %v", linkName, err) + } + + // ensure qdisc isn't cleared before clearing it to avoid any tc error + cleared, err := c.TrafficController.IsQdiscCleared(link.Name()) + if err != nil { + c.Log.Fatalf("can't ensure the %s link qdisc is cleared or not: %v", link.Name(), err) + } + + // clear link qdisc if needed + if !cleared { + if err := c.TrafficController.ClearQdisc(link.Name()); err != nil { + c.Log.Fatalf("can't delete the %s link qdisc: %v", link.Name(), err) + } + } else { + c.Log.Infof("%s link qdisc is already cleared, skipping", link.Name()) + } + } +} diff --git a/injector/network_config_test.go b/injector/network_config_test.go new file mode 100644 index 000000000..9028ae661 --- /dev/null +++ b/injector/network_config_test.go @@ -0,0 +1,247 @@ +// Unless 
explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2020 Datadog, Inc. + +package injector_test + +import ( + "net" + "time" + + . "github.com/onsi/ginkgo" + "github.com/stretchr/testify/mock" + + . "github.com/DataDog/chaos-controller/injector" + "github.com/DataDog/chaos-controller/network" +) + +// tc +type fakeTc struct { + mock.Mock +} + +func (f *fakeTc) AddDelay(iface string, parent string, handle uint32, delay time.Duration) error { + args := f.Called(iface, parent, handle, delay) + return args.Error(0) +} +func (f *fakeTc) AddPrio(iface string, parent string, handle uint32, bands uint32, priomap [16]uint32) error { + args := f.Called(iface, parent, handle, bands, priomap) + return args.Error(0) +} +func (f *fakeTc) AddFilter(iface string, parent string, handle uint32, ip *net.IPNet, port int, flowid string) error { + args := f.Called(iface, parent, handle, ip.String(), port, flowid) + return args.Error(0) +} +func (f *fakeTc) AddOutputLimit(iface string, parent string, handle uint32, bytesPerSec uint) error { + args := f.Called(iface, parent, handle, bytesPerSec) + return args.Error(0) +} +func (f *fakeTc) ClearQdisc(iface string) error { + args := f.Called(iface) + return args.Error(0) +} +func (f *fakeTc) IsQdiscCleared(iface string) (bool, error) { + args := f.Called(iface) + return args.Bool(0), args.Error(1) +} + +// netlink +type fakeNetlinkAdapter struct { + mock.Mock +} + +func (f *fakeNetlinkAdapter) LinkList() ([]network.NetlinkLink, error) { + args := f.Called() + return args.Get(0).([]network.NetlinkLink), args.Error(1) +} +func (f *fakeNetlinkAdapter) LinkByIndex(index int) (network.NetlinkLink, error) { + args := f.Called(index) + return args.Get(0).(network.NetlinkLink), args.Error(1) +} +func (f *fakeNetlinkAdapter) LinkByName(name string) (network.NetlinkLink, error) { + args := 
f.Called(name) + return args.Get(0).(network.NetlinkLink), args.Error(1) +} +func (f *fakeNetlinkAdapter) RoutesForIP(ip *net.IPNet) ([]network.NetlinkRoute, error) { + args := f.Called(ip.String()) + return args.Get(0).([]network.NetlinkRoute), args.Error(1) +} + +type fakeNetlinkLink struct { + mock.Mock +} + +func (f *fakeNetlinkLink) Name() string { + args := f.Called() + return args.String(0) +} +func (f *fakeNetlinkLink) SetTxQLen(qlen int) error { + args := f.Called(qlen) + return args.Error(0) +} +func (f *fakeNetlinkLink) TxQLen() int { + args := f.Called() + return args.Int(0) +} + +type fakeNetlinkRoute struct { + mock.Mock +} + +func (f *fakeNetlinkRoute) Link() network.NetlinkLink { + args := f.Called() + return args.Get(0).(network.NetlinkLink) +} + +var _ = Describe("Tc", func() { + var ( + config NetworkDisruptionConfig + tc fakeTc + tcIsQdiscClearedCall *mock.Call + nl fakeNetlinkAdapter + nllink1, nllink2 *fakeNetlinkLink + nllink1TxQlenCall, nllink2TxQlenCall *mock.Call + nlroute1, nlroute2 *fakeNetlinkRoute + ) + + BeforeEach(func() { + // tc + tc = fakeTc{} + tc.On("AddDelay", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + tc.On("AddPrio", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + tc.On("AddFilter", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + tc.On("AddOutputLimit", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + tc.On("ClearQdisc", mock.Anything).Return(nil) + tcIsQdiscClearedCall = tc.On("IsQdiscCleared", mock.Anything).Return(false, nil) + + // netlink + nllink1 = &fakeNetlinkLink{} + nllink1.On("Name").Return("lo") + nllink1.On("SetTxQLen", mock.Anything).Return(nil) + nllink1TxQlenCall = nllink1.On("TxQLen").Return(0) + nllink2 = &fakeNetlinkLink{} + nllink2.On("Name").Return("eth0") + nllink2.On("SetTxQLen", mock.Anything).Return(nil) + nllink2TxQlenCall = 
nllink2.On("TxQLen").Return(0) + + nlroute1 = &fakeNetlinkRoute{} + nlroute1.On("Link").Return(nllink1) + nlroute2 = &fakeNetlinkRoute{} + nlroute2.On("Link").Return(nllink2) + + nl = fakeNetlinkAdapter{} + nl.On("LinkList").Return([]network.NetlinkLink{nllink1, nllink2}, nil) + nl.On("LinkByIndex", 0).Return(nllink1, nil) + nl.On("LinkByIndex", 1).Return(nllink2, nil) + nl.On("LinkByName", "lo").Return(nllink1, nil) + nl.On("LinkByName", "eth0").Return(nllink2, nil) + nl.On("RoutesForIP", mock.Anything).Return([]network.NetlinkRoute{nlroute1, nlroute2}, nil) + }) + + JustBeforeEach(func() { + config = NetworkDisruptionConfigStruct{ + Log: log, + TrafficController: &tc, + NetlinkAdapter: &nl, + DNSClient: nil, + } + }) + + Describe("addOperation", func() { + + Context("with no host specified", func() { + JustBeforeEach(func() { + // + // NOTE: We're using `AddLatency` here just to make sure the chain is called + // since addOperation is private, but any would work + // + config.AddLatency([]string{}, 0, time.Second) + }) + It("should not set or clear the interface qlen", func() { + nllink1.AssertNumberOfCalls(GinkgoT(), "SetTxQLen", 0) + nllink2.AssertNumberOfCalls(GinkgoT(), "SetTxQLen", 0) + }) + }) + + Context("with multiple hosts specified and interface without qlen", func() { + BeforeEach(func() { + config.AddLatency([]string{"1.1.1.1", "2.2.2.2"}, 80, time.Second) + }) + It("should set and clear the interface qlen", func() { + nllink1.AssertCalled(GinkgoT(), "SetTxQLen", 1000) + nllink1.AssertCalled(GinkgoT(), "SetTxQLen", 0) + nllink2.AssertCalled(GinkgoT(), "SetTxQLen", 1000) + nllink2.AssertCalled(GinkgoT(), "SetTxQLen", 0) + }) + It("should create a prio qdisc on both interfaces", func() { + tc.AssertCalled(GinkgoT(), "AddPrio", "lo", "root", uint32(1), uint32(4), mock.Anything) + tc.AssertCalled(GinkgoT(), "AddPrio", "eth0", "root", uint32(1), uint32(4), mock.Anything) + }) + It("should add a filter to redirect traffic on delayed band", func() { + 
tc.AssertCalled(GinkgoT(), "AddFilter", "lo", "1:0", mock.Anything, "1.1.1.1/32", 80, "1:4") + tc.AssertCalled(GinkgoT(), "AddFilter", "lo", "1:0", mock.Anything, "2.2.2.2/32", 80, "1:4") + tc.AssertCalled(GinkgoT(), "AddFilter", "eth0", "1:0", mock.Anything, "1.1.1.1/32", 80, "1:4") + tc.AssertCalled(GinkgoT(), "AddFilter", "eth0", "1:0", mock.Anything, "2.2.2.2/32", 80, "1:4") + }) + }) + + Context("with multiple hosts specified and interfaces with qlen", func() { + BeforeEach(func() { + nllink1TxQlenCall.Return(1000) + nllink2TxQlenCall.Return(1000) + config.AddLatency([]string{"1.1.1.1", "2.2.2.2"}, 0, time.Second) + }) + It("should not set and clear the interface qlen", func() { + nllink1.AssertNumberOfCalls(GinkgoT(), "SetTxQLen", 0) + nllink2.AssertNumberOfCalls(GinkgoT(), "SetTxQLen", 0) + }) + }) + }) + + Describe("ClearAllQdiscs", func() { + JustBeforeEach(func() { + config.ClearAllQdiscs([]string{}) + }) + + Context("with a non-cleared qdisc", func() { + It("should clear the interfaces qdisc", func() { + tc.AssertCalled(GinkgoT(), "ClearQdisc", "lo") + tc.AssertCalled(GinkgoT(), "ClearQdisc", "eth0") + }) + }) + + Context("with an already cleared qdisc", func() { + BeforeEach(func() { + tcIsQdiscClearedCall.Return(true, nil) + }) + It("should not clear the interfaces qdisc", func() { + tc.AssertNotCalled(GinkgoT(), "ClearQdisc", "lo") + tc.AssertNotCalled(GinkgoT(), "ClearQdisc", "eth0") + }) + }) + }) + + Describe("AddLatency", func() { + BeforeEach(func() { + config.AddLatency([]string{}, 0, time.Second) + }) + + It("should add delay to the interfaces root qdisc", func() { + tc.AssertCalled(GinkgoT(), "AddDelay", "lo", "root", mock.Anything, time.Second) + tc.AssertCalled(GinkgoT(), "AddDelay", "eth0", "root", mock.Anything, time.Second) + }) + }) + + Describe("AddOutputLimit", func() { + BeforeEach(func() { + config.AddOutputLimit([]string{}, 0, uint(12345)) + }) + + It("should add delay to the interfaces root qdisc", func() { + 
tc.AssertCalled(GinkgoT(), "AddOutputLimit", "lo", "root", mock.Anything, uint(12345)) + tc.AssertCalled(GinkgoT(), "AddOutputLimit", "eth0", "root", mock.Anything, uint(12345)) + }) + }) + +}) diff --git a/injector/network_latency.go b/injector/network_latency.go index d7927bde8..394d2a98d 100644 --- a/injector/network_latency.go +++ b/injector/network_latency.go @@ -6,14 +6,11 @@ package injector import ( - "fmt" - "net" "time" "github.com/DataDog/chaos-controller/api/v1beta1" "github.com/DataDog/chaos-controller/container" "github.com/DataDog/chaos-controller/metrics" - "github.com/DataDog/chaos-controller/network" "github.com/DataDog/chaos-controller/types" "go.uber.org/zap" ) @@ -22,40 +19,17 @@ import ( type networkLatencyInjector struct { containerInjector spec v1beta1.NetworkLatencySpec - config NetworkLatencyInjectorConfig -} - -// NetworkLatencyInjectorConfig contains needed drivers to create -// a NetworkLatencyInjector -type NetworkLatencyInjectorConfig struct { - TrafficController network.TrafficController - NetlinkAdapter network.NetlinkAdapter - DNSClient network.DNSClient + config NetworkDisruptionConfig } // NewNetworkLatencyInjector creates a NetworkLatencyInjector object with the default drivers func NewNetworkLatencyInjector(uid string, spec v1beta1.NetworkLatencySpec, ctn container.Container, log *zap.SugaredLogger, ms metrics.Sink) Injector { - return NewNetworkLatencyInjectorWithConfig(uid, spec, ctn, log, ms, NetworkLatencyInjectorConfig{}) + return NewNetworkLatencyInjectorWithConfig(uid, spec, ctn, log, ms, NewNetworkDisruptionConfig(log)) } // NewNetworkLatencyInjectorWithConfig creates a NetworkLatencyInjector object with the given config, // missing fields being initialized with the defaults -func NewNetworkLatencyInjectorWithConfig(uid string, spec v1beta1.NetworkLatencySpec, ctn container.Container, log *zap.SugaredLogger, ms metrics.Sink, config NetworkLatencyInjectorConfig) Injector { - // traffic controller - if 
config.TrafficController == nil { - config.TrafficController = network.NewTrafficController(log) - } - - // netlink adapter - if config.NetlinkAdapter == nil { - config.NetlinkAdapter = network.NewNetlinkAdapter() - } - - // dns resolver - if config.DNSClient == nil { - config.DNSClient = network.NewDNSClient() - } - +func NewNetworkLatencyInjectorWithConfig(uid string, spec v1beta1.NetworkLatencySpec, ctn container.Container, log *zap.SugaredLogger, ms metrics.Sink, config NetworkDisruptionConfig) Injector { return networkLatencyInjector{ containerInjector: containerInjector{ injector: injector{ @@ -71,56 +45,6 @@ func NewNetworkLatencyInjectorWithConfig(uid string, spec v1beta1.NetworkLatency } } -func (i networkLatencyInjector) getInterfacesByIP() (map[string][]*net.IPNet, error) { - linkByIP := map[string][]*net.IPNet{} - - if len(i.spec.Hosts) > 0 { - i.log.Info("auto-detecting interfaces to apply latency to...") - // resolve hosts - ips, err := resolveHosts(i.config.DNSClient, i.spec.Hosts) - if err != nil { - return nil, fmt.Errorf("can't resolve given hosts: %w", err) - } - - // get the association between IP and interfaces to know - // which interfaces we have to inject latency to - for _, ip := range ips { - // get routes for resolved destination IP - routes, err := i.config.NetlinkAdapter.RoutesForIP(ip) - if err != nil { - return nil, fmt.Errorf("can't get route for IP %s: %w", ip.String(), err) - } - - // for each route, get the related interface and add it to the association - // between interfaces and IPs - for _, route := range routes { - i.log.Infof("IP %s belongs to interface %s", ip.String(), route.Link().Name()) - - // store association, initialize the map entry if not present yet - if _, ok := linkByIP[route.Link().Name()]; !ok { - linkByIP[route.Link().Name()] = []*net.IPNet{} - } - - linkByIP[route.Link().Name()] = append(linkByIP[route.Link().Name()], ip) - } - } - } else { - i.log.Info("no hosts specified, all interfaces will be impacted") 
- - // prepare links/IP association by pre-creating links - links, err := i.config.NetlinkAdapter.LinkList() - if err != nil { - i.log.Fatalf("can't list links: %w", err) - } - for _, link := range links { - i.log.Infof("adding interface %s", link.Name()) - linkByIP[link.Name()] = []*net.IPNet{} - } - } - - return linkByIP, nil -} - // Inject injects network latency according to the current spec func (i networkLatencyInjector) Inject() { var err error @@ -132,9 +56,6 @@ func (i networkLatencyInjector) Inject() { i.handleMetricSinkError(i.ms.MetricInjected(i.container.ID(), i.uid, err == nil, i.kind, []string{})) }() - delay := time.Duration(i.spec.Delay) * time.Millisecond - parent := "root" - // enter container network namespace err = i.container.EnterNetworkNamespace() if err != nil { @@ -149,77 +70,11 @@ func (i networkLatencyInjector) Inject() { } }() - i.log.Info("auto-detecting interfaces to apply latency to...") - - linkByIP, err := i.getInterfacesByIP() - if err != nil { - i.log.Fatalw("can't get interfaces per IP listing: %w", err) - } - - // for each link/ip association, add latency - for linkName, ips := range linkByIP { - clearTxQlen := false - - // retrieve link from name - link, err := i.config.NetlinkAdapter.LinkByName(linkName) - if err != nil { - i.log.Fatalf("can't retrieve link %s: %w", linkName, err) - } - - // if at least one IP has been specified, we need to create a prio qdisc to be able to apply - // a filter and a delay only on traffic going to those IP - if len(ips) > 0 { - // set the tx qlen if not already set as it is required to create a prio qdisc without dropping - // all the outgoing traffic - // this qlen will be removed once the injection is done if it was not present before - if link.TxQLen() == 0 { - i.log.Infof("setting tx qlen for interface %s", link.Name()) - - clearTxQlen = true - - if err := link.SetTxQLen(1000); err != nil { - i.log.Fatalf("can't set tx queue length on interface %s: %w", link.Name(), err) - } - } - - // 
create a new qdisc for the given interface of type prio with 4 bands instead of 3 - // we keep the default priomap, the extra band will be used to filter traffic going to the specified IP - // we only create this qdisc if we want to target traffic going to some hosts only, it avoids to add delay to - // all the outgoing traffic - parent = "1:4" - priomap := [16]uint32{1, 2, 2, 2, 1, 2, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1} - - if err := i.config.TrafficController.AddPrio(link.Name(), "root", 1, 4, priomap); err != nil { - i.log.Fatalf("can't create a new qdisc for interface %s: %w", link.Name(), err) - } - } - - // add delay - if err := i.config.TrafficController.AddDelay(link.Name(), parent, 0, delay); err != nil { - i.log.Fatalf("can't add delay to the newly created qdisc for interface %s: %w", link.Name(), err) - } - - // if only some hosts/ports are targeted, redirect the traffic to the extra band created earlier - // where the delay is applied - port := i.spec.Port - - if len(ips) > 0 { - for _, ip := range ips { - if err := i.config.TrafficController.AddFilter(link.Name(), "1:0", 0, ip, port, "1:4"); err != nil { - i.log.Fatalf("can't add a filter to interface %s: %w", link.Name(), err) - } - } - } + delay := time.Duration(i.spec.Delay) * time.Millisecond - // reset the interface transmission queue length once filters have been created - if clearTxQlen { - i.log.Infof("clearing tx qlen for interface %s", link.Name()) + i.config.AddLatency(i.spec.Hosts, i.spec.Port, delay) - if err := link.SetTxQLen(0); err != nil { - i.log.Fatalf("can't clear %s link transmission queue length: %w", link.Name(), err) - } - } - } + i.log.Infof("successfully injected latency of %s to pod", delay) } // Clean cleans the injected latency @@ -247,33 +102,7 @@ func (i networkLatencyInjector) Clean() { } }() - linkByIP, err := i.getInterfacesByIP() - if err != nil { - i.log.Fatalf("can't get interfaces per IP map: %w", err) - } - - for linkName := range linkByIP { - 
i.log.Infof("clearing root qdisc for interface %s", linkName) - - // retrieve link from name - link, err := i.config.NetlinkAdapter.LinkByName(linkName) - if err != nil { - i.log.Fatalf("can't retrieve link %s: %w", linkName, err) - } + i.config.ClearAllQdiscs(i.spec.Hosts) - // ensure qdisc isn't cleared before clearing it to avoid any tc error - cleared, err := i.config.TrafficController.IsQdiscCleared(link.Name()) - if err != nil { - i.log.Fatalf("can't ensure the %s link qdisc is cleared or not: %w", link.Name(), err) - } - - // clear link qdisc if needed - if !cleared { - if err := i.config.TrafficController.ClearQdisc(link.Name()); err != nil { - i.log.Fatalf("can't delete the %s link qdisc: %w", link.Name(), err) - } - } else { - i.log.Infof("%s link qdisc is already cleared, skipping", link.Name()) - } - } + i.log.Info("successfully cleared injected network latency") } diff --git a/injector/network_latency_test.go b/injector/network_latency_test.go index 75a244658..7625992ee 100644 --- a/injector/network_latency_test.go +++ b/injector/network_latency_test.go @@ -6,7 +6,6 @@ package injector_test import ( - "net" "time" . "github.com/onsi/ginkgo" @@ -15,99 +14,31 @@ import ( "github.com/DataDog/chaos-controller/api/v1beta1" . 
"github.com/DataDog/chaos-controller/injector" - "github.com/DataDog/chaos-controller/network" ) -// tc -type fakeTc struct { +type fakeNetworkConfig struct { mock.Mock } -func (f *fakeTc) AddDelay(iface string, parent string, handle uint32, delay time.Duration) error { - args := f.Called(iface, parent, handle, delay) - return args.Error(0) +func (f *fakeNetworkConfig) AddOutputLimit(hosts []string, port int, bytesPerSec uint) { + f.Called(hosts, port, bytesPerSec) + return } -func (f *fakeTc) AddPrio(iface string, parent string, handle uint32, bands uint32, priomap [16]uint32) error { - args := f.Called(iface, parent, handle, bands, priomap) - return args.Error(0) +func (f *fakeNetworkConfig) AddLatency(hosts []string, port int, delay time.Duration) { + f.Called(hosts, port, delay) + return } -func (f *fakeTc) AddFilter(iface string, parent string, handle uint32, ip *net.IPNet, port int, flowid string) error { - args := f.Called(iface, parent, handle, ip.String(), port, flowid) - return args.Error(0) -} -func (f *fakeTc) AddOutputLimit(iface string, parent string, handle uint32, bytesPerSec uint) error { - args := f.Called(iface, parent, handle, bytesPerSec) - return args.Error(0) -} -func (f *fakeTc) ClearQdisc(iface string) error { - args := f.Called(iface) - return args.Error(0) -} -func (f *fakeTc) IsQdiscCleared(iface string) (bool, error) { - args := f.Called(iface) - return args.Bool(0), args.Error(1) -} - -// netlink -type fakeNetlinkAdapter struct { - mock.Mock -} - -func (f *fakeNetlinkAdapter) LinkList() ([]network.NetlinkLink, error) { - args := f.Called() - return args.Get(0).([]network.NetlinkLink), args.Error(1) -} -func (f *fakeNetlinkAdapter) LinkByIndex(index int) (network.NetlinkLink, error) { - args := f.Called(index) - return args.Get(0).(network.NetlinkLink), args.Error(1) -} -func (f *fakeNetlinkAdapter) LinkByName(name string) (network.NetlinkLink, error) { - args := f.Called(name) - return args.Get(0).(network.NetlinkLink), args.Error(1) 
-} -func (f *fakeNetlinkAdapter) RoutesForIP(ip *net.IPNet) ([]network.NetlinkRoute, error) { - args := f.Called(ip.String()) - return args.Get(0).([]network.NetlinkRoute), args.Error(1) +func (f *fakeNetworkConfig) ClearAllQdiscs(hosts []string) { + f.Called(hosts) + return } -type fakeNetlinkLink struct { - mock.Mock -} - -func (f *fakeNetlinkLink) Name() string { - args := f.Called() - return args.String(0) -} -func (f *fakeNetlinkLink) SetTxQLen(qlen int) error { - args := f.Called(qlen) - return args.Error(0) -} -func (f *fakeNetlinkLink) TxQLen() int { - args := f.Called() - return args.Int(0) -} - -type fakeNetlinkRoute struct { - mock.Mock -} - -func (f *fakeNetlinkRoute) Link() network.NetlinkLink { - args := f.Called() - return args.Get(0).(network.NetlinkLink) -} - -var _ = Describe("Tc", func() { +var _ = Describe("Latency", func() { var ( - c fakeContainer - inj Injector - config NetworkLatencyInjectorConfig - spec v1beta1.NetworkLatencySpec - tc fakeTc - tcIsQdiscClearedCall *mock.Call - nl fakeNetlinkAdapter - nllink1, nllink2 *fakeNetlinkLink - nllink1TxQlenCall, nllink2TxQlenCall *mock.Call - nlroute1, nlroute2 *fakeNetlinkRoute + c fakeContainer + inj Injector + config fakeNetworkConfig + spec v1beta1.NetworkLatencySpec ) BeforeEach(func() { @@ -116,48 +47,21 @@ var _ = Describe("Tc", func() { c.On("EnterNetworkNamespace").Return(nil) c.On("ExitNetworkNamespace").Return(nil) - // tc - tc = fakeTc{} - tc.On("AddDelay", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - tc.On("AddPrio", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - tc.On("AddFilter", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - tc.On("ClearQdisc", mock.Anything).Return(nil) - tcIsQdiscClearedCall = tc.On("IsQdiscCleared", mock.Anything).Return(false, nil) - - // netlink - nllink1 = &fakeNetlinkLink{} - nllink1.On("Name").Return("lo") - nllink1.On("SetTxQLen", 
mock.Anything).Return(nil) - nllink1TxQlenCall = nllink1.On("TxQLen").Return(0) - nllink2 = &fakeNetlinkLink{} - nllink2.On("Name").Return("eth0") - nllink2.On("SetTxQLen", mock.Anything).Return(nil) - nllink2TxQlenCall = nllink2.On("TxQLen").Return(0) - - nlroute1 = &fakeNetlinkRoute{} - nlroute1.On("Link").Return(nllink1) - nlroute2 = &fakeNetlinkRoute{} - nlroute2.On("Link").Return(nllink2) - - nl = fakeNetlinkAdapter{} - nl.On("LinkList").Return([]network.NetlinkLink{nllink1, nllink2}, nil) - nl.On("LinkByIndex", 0).Return(nllink1, nil) - nl.On("LinkByIndex", 1).Return(nllink2, nil) - nl.On("LinkByName", "lo").Return(nllink1, nil) - nl.On("LinkByName", "eth0").Return(nllink2, nil) - nl.On("RoutesForIP", mock.Anything).Return([]network.NetlinkRoute{nlroute1, nlroute2}, nil) + // network disruption conf + config = fakeNetworkConfig{} + config.On("AddLatency", mock.Anything, mock.Anything, mock.Anything).Return(nil) + config.On("AddOutputLimit", mock.Anything, mock.Anything, mock.Anything).Return(nil) + config.On("ClearAllQdiscs", mock.Anything).Return(nil) spec = v1beta1.NetworkLatencySpec{ + Hosts: []string{"testhost"}, + Port: 22, Delay: 1000, } - config = NetworkLatencyInjectorConfig{ - TrafficController: &tc, - NetlinkAdapter: &nl, - } }) JustBeforeEach(func() { - inj = NewNetworkLatencyInjectorWithConfig("fake", spec, &c, log, ms, config) + inj = NewNetworkLatencyInjectorWithConfig("fake", spec, &c, log, ms, &config) }) Describe("inj.Inject", func() { @@ -165,59 +69,14 @@ var _ = Describe("Tc", func() { inj.Inject() }) - Context("with no host specified", func() { - It("should enter and exit the container network namespace", func() { - Expect(c.AssertCalled(GinkgoT(), "EnterNetworkNamespace")).To(BeTrue()) - Expect(c.AssertCalled(GinkgoT(), "ExitNetworkNamespace")).To(BeTrue()) - }) - It("should not set or clear the interface qlen", func() { - nllink1.AssertNumberOfCalls(GinkgoT(), "SetTxQLen", 0) - nllink2.AssertNumberOfCalls(GinkgoT(), "SetTxQLen", 0) - }) 
- It("should add delay to the interfaces root qdisc", func() { - tc.AssertCalled(GinkgoT(), "AddDelay", "lo", "root", mock.Anything, time.Second) - tc.AssertCalled(GinkgoT(), "AddDelay", "eth0", "root", mock.Anything, time.Second) - }) + It("should enter and exit the container network namespace", func() { + Expect(c.AssertCalled(GinkgoT(), "EnterNetworkNamespace")).To(BeTrue()) + Expect(c.AssertCalled(GinkgoT(), "ExitNetworkNamespace")).To(BeTrue()) }) - Context("with multiple hosts specified and interface without qlen", func() { - BeforeEach(func() { - spec.Hosts = []string{"1.1.1.1", "2.2.2.2"} - spec.Port = 80 - }) - - It("should set and clear the interface qlen", func() { - nllink1.AssertCalled(GinkgoT(), "SetTxQLen", 1000) - nllink1.AssertCalled(GinkgoT(), "SetTxQLen", 0) - nllink2.AssertCalled(GinkgoT(), "SetTxQLen", 1000) - nllink2.AssertCalled(GinkgoT(), "SetTxQLen", 0) - }) - It("should create a prio qdisc on both interfaces", func() { - tc.AssertCalled(GinkgoT(), "AddPrio", "lo", "root", uint32(1), uint32(4), mock.Anything) - tc.AssertCalled(GinkgoT(), "AddPrio", "eth0", "root", uint32(1), uint32(4), mock.Anything) - }) - It("should add latency on both interfaces prio qdisc", func() { - tc.AssertCalled(GinkgoT(), "AddDelay", "lo", "1:4", mock.Anything, time.Second) - tc.AssertCalled(GinkgoT(), "AddDelay", "eth0", "1:4", mock.Anything, time.Second) - }) - It("should add a filter to redirect traffic on delayed band", func() { - tc.AssertCalled(GinkgoT(), "AddFilter", "lo", "1:0", mock.Anything, "1.1.1.1/32", 80, "1:4") - tc.AssertCalled(GinkgoT(), "AddFilter", "lo", "1:0", mock.Anything, "2.2.2.2/32", 80, "1:4") - tc.AssertCalled(GinkgoT(), "AddFilter", "eth0", "1:0", mock.Anything, "1.1.1.1/32", 80, "1:4") - tc.AssertCalled(GinkgoT(), "AddFilter", "eth0", "1:0", mock.Anything, "2.2.2.2/32", 80, "1:4") - }) - }) - - Context("with multiple hosts specified and interfaces with qlen", func() { - BeforeEach(func() { - spec.Hosts = []string{"1.1.1.1", "2.2.2.2"} 
- nllink1TxQlenCall.Return(1000) - nllink2TxQlenCall.Return(1000) - }) - It("should not set and clear the interface qlen", func() { - nllink1.AssertNumberOfCalls(GinkgoT(), "SetTxQLen", 0) - nllink2.AssertNumberOfCalls(GinkgoT(), "SetTxQLen", 0) - }) + It("should call AddLatency on its network disruption config", func() { + delay_ms := time.Duration(spec.Delay) * time.Millisecond + Expect(config.AssertCalled(GinkgoT(), "AddLatency", spec.Hosts, spec.Port, delay_ms)).To(BeTrue()) }) Describe("inj.Clean", func() { @@ -225,29 +84,13 @@ var _ = Describe("Tc", func() { inj.Clean() }) - Context("with a non-cleared qdisc", func() { - It("should enter and exit the container network namespace", func() { - Expect(c.AssertCalled(GinkgoT(), "EnterNetworkNamespace")).To(BeTrue()) - Expect(c.AssertCalled(GinkgoT(), "ExitNetworkNamespace")).To(BeTrue()) - }) - It("should clear the interfaces qdisc", func() { - tc.AssertCalled(GinkgoT(), "ClearQdisc", "lo") - tc.AssertCalled(GinkgoT(), "ClearQdisc", "eth0") - }) + It("should enter and exit the container network namespace", func() { + Expect(c.AssertCalled(GinkgoT(), "EnterNetworkNamespace")).To(BeTrue()) + Expect(c.AssertCalled(GinkgoT(), "ExitNetworkNamespace")).To(BeTrue()) }) - Context("with an already cleared qdisc", func() { - BeforeEach(func() { - tcIsQdiscClearedCall.Return(true, nil) - }) - It("should enter and exit the container network namespace", func() { - Expect(c.AssertCalled(GinkgoT(), "EnterNetworkNamespace")).To(BeTrue()) - Expect(c.AssertCalled(GinkgoT(), "ExitNetworkNamespace")).To(BeTrue()) - }) - It("should not clear the interfaces qdisc", func() { - tc.AssertNotCalled(GinkgoT(), "ClearQdisc", "lo") - tc.AssertNotCalled(GinkgoT(), "ClearQdisc", "eth0") - }) + It("should call ClearAllQdiscs on its network disruption config", func() { + Expect(config.AssertCalled(GinkgoT(), "ClearAllQdiscs", spec.Hosts)).To(BeTrue()) }) }) }) diff --git a/injector/network_limitation.go b/injector/network_limitation.go index 
c387d39eb..ac761e778 100644 --- a/injector/network_limitation.go +++ b/injector/network_limitation.go @@ -6,12 +6,9 @@ package injector import ( - "net" - "github.com/DataDog/chaos-controller/api/v1beta1" "github.com/DataDog/chaos-controller/container" "github.com/DataDog/chaos-controller/metrics" - "github.com/DataDog/chaos-controller/network" "github.com/DataDog/chaos-controller/types" "go.uber.org/zap" ) @@ -20,40 +17,17 @@ import ( type networkLimitationInjector struct { containerInjector spec v1beta1.NetworkLimitationSpec - config NetworkLimitationInjectorConfig -} - -// NetworkLimitationInjectorConfig contains needed drivers to create -// a NetworkLimitationInjector -type NetworkLimitationInjectorConfig struct { - TrafficController network.TrafficController - NetlinkAdapter network.NetlinkAdapter - DNSClient network.DNSClient + config NetworkDisruptionConfig } // NewNetworkLimitationInjector creates a NetworkLimitationInjector object with the default drivers func NewNetworkLimitationInjector(uid string, spec v1beta1.NetworkLimitationSpec, ctn container.Container, log *zap.SugaredLogger, ms metrics.Sink) Injector { - return NewNetworkLimitationInjectorWithConfig(uid, spec, ctn, log, ms, NetworkLimitationInjectorConfig{}) + return NewNetworkLimitationInjectorWithConfig(uid, spec, ctn, log, ms, NewNetworkDisruptionConfig(log)) } // NewNetworkLimitationInjectorWithConfig creates a NetworkLimitationInjector object with the given config, // missing fields being initialized with the defaults -func NewNetworkLimitationInjectorWithConfig(uid string, spec v1beta1.NetworkLimitationSpec, ctn container.Container, log *zap.SugaredLogger, ms metrics.Sink, config NetworkLimitationInjectorConfig) Injector { - // traffic controller - if config.TrafficController == nil { - config.TrafficController = network.NewTrafficController(log) - } - - // netlink adapter - if config.NetlinkAdapter == nil { - config.NetlinkAdapter = network.NewNetlinkAdapter() - } - - // dns resolver - if 
config.DNSClient == nil { - config.DNSClient = network.NewDNSClient() - } - +func NewNetworkLimitationInjectorWithConfig(uid string, spec v1beta1.NetworkLimitationSpec, ctn container.Container, log *zap.SugaredLogger, ms metrics.Sink, config NetworkDisruptionConfig) Injector { return networkLimitationInjector{ containerInjector: containerInjector{ injector: injector{ @@ -69,37 +43,11 @@ func NewNetworkLimitationInjectorWithConfig(uid string, spec v1beta1.NetworkLimi } } -func (i networkLimitationInjector) getInterfacesByIP() map[string][]*net.IPNet { - linkByIP := map[string][]*net.IPNet{} - - // this is just copied from `network_limitation.go` - // currently omitted support for specific IPs, Hosts for bandwidth limit - // if we want to support that, probably should factor it out of `network_limitation.go` - - i.log.Info("no hosts specified, all interfaces will be impacted") - - // prepare links/IP association by pre-creating links - links, err := i.config.NetlinkAdapter.LinkList() - if err != nil { - i.log.Fatalf("can't list links: %w", err) - } - - for _, link := range links { - i.log.Infof("adding interface %s", link.Name()) - - linkByIP[link.Name()] = []*net.IPNet{} - } - - return linkByIP -} - // Inject injects network bandwidth limitation according to the current spec func (i networkLimitationInjector) Inject() { var err error - i.log.Info("injecting bandwidth limitation") - - parent := "root" + i.log.Info("injecting bandwidth limitation: %s", i.spec) // handle metrics defer func() { @@ -120,29 +68,9 @@ func (i networkLimitationInjector) Inject() { } }() - i.log.Info("auto-detecting interfaces to apply bandwidth limitation to...") - - linkByIP := i.getInterfacesByIP() - - // for each link/ip association, add bandwidth limitation - for linkName := range linkByIP { - // retrieve link from name - link, err := i.config.NetlinkAdapter.LinkByName(linkName) - if err != nil { - i.log.Fatalf("can't retrieve link %s: %w", linkName, err) - } - - // currently omitted 
support for specific IPs, Hosts for bandwidth limit - // if we want to support that, probably should factor it out of `network_limitation.go` - - i.log.Info("going to add bandwidth limit of %s bytes per sec now...", i.spec.BytesPerSec) + i.config.AddOutputLimit(i.spec.Hosts, i.spec.Port, i.spec.BytesPerSec) - // add limitation - err2 := i.config.TrafficController.AddOutputLimit(link.Name(), parent, 0, i.spec.BytesPerSec) - if err2 != nil { - i.log.Fatalf("can't add bandwidth limit to the newly created qdisc for interface %s: %w", link.Name(), err2) - } - } + i.log.Info("successfully injected output bandwidth limit of %s bytes/sec to pod", i.spec.BytesPerSec) } // Clean cleans the injected bandwidth limitation @@ -170,30 +98,7 @@ func (i networkLimitationInjector) Clean() { } }() - linkByIP := i.getInterfacesByIP() + i.config.ClearAllQdiscs(i.spec.Hosts) - for linkName := range linkByIP { - i.log.Infof("clearing root qdisc for interface %s", linkName) - - // retrieve link from name - link, err := i.config.NetlinkAdapter.LinkByName(linkName) - if err != nil { - i.log.Fatalf("can't retrieve link %s: %w", linkName, err) - } - - // ensure qdisc isn't cleared before clearing it to avoid any tc error - cleared, err := i.config.TrafficController.IsQdiscCleared(link.Name()) - if err != nil { - i.log.Fatalf("can't ensure the %s link qdisc is cleared or not: %w", link.Name(), err) - } - - // clear link qdisc if needed - if !cleared { - if err := i.config.TrafficController.ClearQdisc(link.Name()); err != nil { - i.log.Fatalf("can't delete the %s link qdisc: %w", link.Name(), err) - } - } else { - i.log.Infof("%s link qdisc is already cleared, skipping", link.Name()) - } - } + i.log.Info("successfully cleared injected bandwidth limit") } diff --git a/injector/network_limitation_test.go b/injector/network_limitation_test.go index 87cf366d7..02aebd71a 100644 --- a/injector/network_limitation_test.go +++ b/injector/network_limitation_test.go @@ -12,24 +12,16 @@ import ( 
"github.com/DataDog/chaos-controller/api/v1beta1" . "github.com/DataDog/chaos-controller/injector" - "github.com/DataDog/chaos-controller/network" ) -// -// no need to define `fakeTc`, `fakeNetlinkLink`, etc -- defined (same package) in `network_latency_test.go` -// +// fakeNetworkConfig mock implementation is already defined in `network_latency_test.go` -var _ = Describe("Tc", func() { +var _ = Describe("Limitation", func() { var ( - c fakeContainer - inj Injector - config NetworkLimitationInjectorConfig - spec v1beta1.NetworkLimitationSpec - tc fakeTc - tcIsQdiscClearedCall *mock.Call - nl fakeNetlinkAdapter - nllink1, nllink2 *fakeNetlinkLink - nlroute1, nlroute2 *fakeNetlinkRoute + c fakeContainer + inj Injector + config fakeNetworkConfig + spec v1beta1.NetworkLimitationSpec ) BeforeEach(func() { @@ -38,44 +30,20 @@ var _ = Describe("Tc", func() { c.On("EnterNetworkNamespace").Return(nil) c.On("ExitNetworkNamespace").Return(nil) - // tc - tc = fakeTc{} - tc.On("AddOutputLimit", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - tc.On("ClearQdisc", mock.Anything).Return(nil) - tcIsQdiscClearedCall = tc.On("IsQdiscCleared", mock.Anything).Return(false, nil) - - // netlink - nllink1 = &fakeNetlinkLink{} - nllink1.On("Name").Return("lo") - nllink1.On("SetTxQLen", mock.Anything).Return(nil) - nllink2 = &fakeNetlinkLink{} - nllink2.On("Name").Return("eth0") - nllink2.On("SetTxQLen", mock.Anything).Return(nil) - - nlroute1 = &fakeNetlinkRoute{} - nlroute1.On("Link").Return(nllink1) - nlroute2 = &fakeNetlinkRoute{} - nlroute2.On("Link").Return(nllink2) - - nl = fakeNetlinkAdapter{} - nl.On("LinkList").Return([]network.NetlinkLink{nllink1, nllink2}, nil) - nl.On("LinkByIndex", 0).Return(nllink1, nil) - nl.On("LinkByIndex", 1).Return(nllink2, nil) - nl.On("LinkByName", "lo").Return(nllink1, nil) - nl.On("LinkByName", "eth0").Return(nllink2, nil) - nl.On("RoutesForIP", mock.Anything).Return([]network.NetlinkRoute{nlroute1, nlroute2}, nil) + config = 
fakeNetworkConfig{} + config.On("AddLatency", mock.Anything, mock.Anything, mock.Anything).Return(nil) + config.On("AddOutputLimit", mock.Anything, mock.Anything, mock.Anything).Return(nil) + config.On("ClearAllQdiscs", mock.Anything).Return(nil) spec = v1beta1.NetworkLimitationSpec{ + Hosts: []string{"testhost"}, + Port: 22, BytesPerSec: 12345, } - config = NetworkLimitationInjectorConfig{ - TrafficController: &tc, - NetlinkAdapter: &nl, - } }) JustBeforeEach(func() { - inj = NewNetworkLimitationInjectorWithConfig("fake", spec, &c, log, ms, config) + inj = NewNetworkLimitationInjectorWithConfig("fake", spec, &c, log, ms, &config) }) Describe("inj.Inject", func() { @@ -83,15 +51,13 @@ var _ = Describe("Tc", func() { inj.Inject() }) - Context("with no host specified", func() { - It("should enter and exit the container network namespace", func() { - Expect(c.AssertCalled(GinkgoT(), "EnterNetworkNamespace")).To(BeTrue()) - Expect(c.AssertCalled(GinkgoT(), "ExitNetworkNamespace")).To(BeTrue()) - }) - It("should add output limit to the interfaces root qdisc", func() { - tc.AssertCalled(GinkgoT(), "AddOutputLimit", "lo", "root", mock.Anything, uint(12345)) - tc.AssertCalled(GinkgoT(), "AddOutputLimit", "eth0", "root", mock.Anything, uint(12345)) - }) + It("should enter and exit the container network namespace", func() { + Expect(c.AssertCalled(GinkgoT(), "EnterNetworkNamespace")).To(BeTrue()) + Expect(c.AssertCalled(GinkgoT(), "ExitNetworkNamespace")).To(BeTrue()) + }) + + It("should call AddOutputLimit on its network disruption config", func() { + Expect(config.AssertCalled(GinkgoT(), "AddOutputLimit", spec.Hosts, spec.Port, spec.BytesPerSec)).To(BeTrue()) }) Describe("inj.Clean", func() { @@ -99,29 +65,13 @@ var _ = Describe("Tc", func() { inj.Clean() }) - Context("with a non-cleared qdisc", func() { - It("should enter and exit the container network namespace", func() { - Expect(c.AssertCalled(GinkgoT(), "EnterNetworkNamespace")).To(BeTrue()) - 
Expect(c.AssertCalled(GinkgoT(), "ExitNetworkNamespace")).To(BeTrue()) - }) - It("should clear the interfaces qdisc", func() { - tc.AssertCalled(GinkgoT(), "ClearQdisc", "lo") - tc.AssertCalled(GinkgoT(), "ClearQdisc", "eth0") - }) + It("should enter and exit the container network namespace", func() { + Expect(c.AssertCalled(GinkgoT(), "EnterNetworkNamespace")).To(BeTrue()) + Expect(c.AssertCalled(GinkgoT(), "ExitNetworkNamespace")).To(BeTrue()) }) - Context("with an already cleared qdisc", func() { - BeforeEach(func() { - tcIsQdiscClearedCall.Return(true, nil) - }) - It("should enter and exit the container network namespace", func() { - Expect(c.AssertCalled(GinkgoT(), "EnterNetworkNamespace")).To(BeTrue()) - Expect(c.AssertCalled(GinkgoT(), "ExitNetworkNamespace")).To(BeTrue()) - }) - It("should not clear the interfaces qdisc", func() { - tc.AssertNotCalled(GinkgoT(), "ClearQdisc", "lo") - tc.AssertNotCalled(GinkgoT(), "ClearQdisc", "eth0") - }) + It("should call ClearAllQdiscs on its network disruption config", func() { + Expect(config.AssertCalled(GinkgoT(), "ClearAllQdiscs", spec.Hosts)).To(BeTrue()) }) }) })