Skip to content

Commit

Permalink
Factor out networking impl in latency and bandwidth limit disruptions (
Browse files Browse the repository at this point in the history
  • Loading branch information
brandon-dd authored Jun 5, 2020
1 parent e8aeb21 commit 51ed4b1
Show file tree
Hide file tree
Showing 15 changed files with 586 additions and 552 deletions.
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ linters:
- bodyclose
- depguard
- dogsled
- dupl
- goconst
- godox
- gofmt
Expand Down
14 changes: 14 additions & 0 deletions api/v1beta1/network_limitation.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package v1beta1

import (
"strconv"
"strings"

chaostypes "github.com/DataDog/chaos-controller/types"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -15,6 +16,10 @@ import (
// NetworkLimitationSpec represents a network bandwidth limitation injection
type NetworkLimitationSpec struct {
BytesPerSec uint `json:"bytesPerSec"`
// +nullable
Hosts []string `json:"hosts,omitempty"`
// +nullable
Port int `json:"port,omitempty"`
}

// GenerateArgs generates injection or cleanup pod arguments for the given spec
Expand All @@ -34,6 +39,13 @@ func (s *NetworkLimitationSpec) GenerateArgs(mode chaostypes.PodMode, uid types.
containerID,
"--bytes-per-sec",
strconv.Itoa(int(s.BytesPerSec)),
"--hosts",
}

args = append(args, strings.Split(strings.Join(s.Hosts, " --hosts "), " ")...)

if s.Port != 0 {
args = append(args, "--port", strconv.Itoa(s.Port))
}
case chaostypes.PodModeClean:
args = []string{
Expand All @@ -45,7 +57,9 @@ func (s *NetworkLimitationSpec) GenerateArgs(mode chaostypes.PodMode, uid types.
sink,
"--container-id",
containerID,
"--hosts",
}
args = append(args, strings.Split(strings.Join(s.Hosts, " --hosts "), " ")...)
}

return args
Expand Down
7 changes: 6 additions & 1 deletion api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions cli/injector/network_latency_inject.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var networkLatencyInjectCmd = &cobra.Command{
containerID, _ := cmd.Flags().GetString("container-id")
delay, _ := cmd.Flags().GetUint("delay")
hosts, _ := cmd.Flags().GetStringSlice("hosts")
port, _ := cmd.Flags().GetInt("port")

// prepare container
c, err := container.New(containerID)
Expand All @@ -31,6 +32,7 @@ var networkLatencyInjectCmd = &cobra.Command{
spec := v1beta1.NetworkLatencySpec{
Delay: delay,
Hosts: hosts,
Port: port,
}

// inject
Expand All @@ -41,5 +43,6 @@ var networkLatencyInjectCmd = &cobra.Command{

func init() {
networkLatencyInjectCmd.Flags().Uint("delay", 0, "Delay to add to the given container in ms")
networkLatencyInjectCmd.Flags().Uint("port", 0, "Port to restrict disruption to (0 == all ports)")
_ = networkLatencyInjectCmd.MarkFlagRequired("delay")
}
1 change: 1 addition & 0 deletions cli/injector/network_limitation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ func init() {
networkLimitationCmd.AddCommand(networkLimitationInjectCmd)
networkLimitationCmd.AddCommand(networkLimitationCleanCmd)
networkLimitationCmd.PersistentFlags().String("container-id", "", "ID of the container to inject")
networkLimitationCmd.PersistentFlags().StringSlice("hosts", []string{}, "List of hosts (hostname, single IP or IP block) to apply disruption to. If not specified, the disruption applies to all the outgoing traffic")
_ = cobra.MarkFlagRequired(networkLimitationCmd.PersistentFlags(), "container-id")
}
5 changes: 4 additions & 1 deletion cli/injector/network_limitation_clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var networkLimitationCleanCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
uid, _ := cmd.Flags().GetString("uid")
containerID, _ := cmd.Flags().GetString("container-id")
hosts, _ := cmd.Flags().GetStringSlice("hosts")

// prepare container
c, err := container.New(containerID)
Expand All @@ -26,7 +27,9 @@ var networkLimitationCleanCmd = &cobra.Command{
}

// prepare spec
spec := v1beta1.NetworkLimitationSpec{}
spec := v1beta1.NetworkLimitationSpec{
Hosts: hosts,
}

// clean
i := injector.NewNetworkLimitationInjector(uid, spec, c, log, ms)
Expand Down
5 changes: 5 additions & 0 deletions cli/injector/network_limitation_inject.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ var networkLimitationInjectCmd = &cobra.Command{
uid, _ := cmd.Flags().GetString("uid")
containerID, _ := cmd.Flags().GetString("container-id")
bytesPerSec, _ := cmd.Flags().GetUint("bytes-per-sec")
hosts, _ := cmd.Flags().GetStringSlice("hosts")
port, _ := cmd.Flags().GetInt("port")

// prepare container
c, err := container.New(containerID)
Expand All @@ -29,6 +31,8 @@ var networkLimitationInjectCmd = &cobra.Command{
// prepare spec
spec := v1beta1.NetworkLimitationSpec{
BytesPerSec: bytesPerSec,
Hosts: hosts,
Port: port,
}

// inject
Expand All @@ -39,5 +43,6 @@ var networkLimitationInjectCmd = &cobra.Command{

func init() {
networkLimitationInjectCmd.Flags().Uint("bytes-per-sec", 1000000000, "Bytes per second to limit bandwidth to")
networkLimitationInjectCmd.Flags().Uint("port", 0, "Port to restrict disruption to (0 == all ports)")
_ = networkLimitationInjectCmd.MarkFlagRequired("bytes-per-sec")
}
8 changes: 8 additions & 0 deletions config/crd/bases/chaos.datadoghq.com_disruptions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ spec:
properties:
bytesPerSec:
type: integer
hosts:
items:
type: string
nullable: true
type: array
port:
nullable: true
type: integer
required:
- bytesPerSec
type: object
Expand Down
2 changes: 1 addition & 1 deletion config/samples/chaos_v1beta1_disruption.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ spec:
nodeFailure: # node kernel panic or shutdown
shutdown: true # shutdown the host instead of triggering a stack dump
cpuPressure: {} # cpu load generator
networkLimitation:
networkLimitation: # output bandwidth limit
bytesPerSec: 100000 # 100kbps, visible but not backbreaking
222 changes: 222 additions & 0 deletions injector/network_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2020 Datadog, Inc.

package injector

import (
"fmt"
"net"
"time"

"github.com/DataDog/chaos-controller/network"
"go.uber.org/zap"
)

// linkOperation represents an operation on a single network interface, possibly a subset of traffic
// based on the 2nd param (parent)
type linkOperation func(network.NetlinkLink, string) error

// NetworkDisruptionConfig provides an interface for using the network traffic controller for new disruptions
type NetworkDisruptionConfig interface {
AddLatency(hosts []string, port int, delay time.Duration)
AddOutputLimit(hosts []string, port int, bytesPerSec uint)
ClearAllQdiscs(hosts []string)
}

// NetworkDisruptionConfigStruct contains all needed drivers to create a network disruption using `tc`
type NetworkDisruptionConfigStruct struct {
Log *zap.SugaredLogger
TrafficController network.TrafficController
NetlinkAdapter network.NetlinkAdapter
DNSClient network.DNSClient
}

// NewNetworkDisruptionConfig creates a new network disruption object using default netlink, dns, etc
// if any non-default drivers are needed (for example in unit tests), just make a NetworkDisruptionConfigStruct
func NewNetworkDisruptionConfig(logger *zap.SugaredLogger) NetworkDisruptionConfig {
return NetworkDisruptionConfigStruct{
Log: logger,
TrafficController: network.NewTrafficController(logger),
NetlinkAdapter: network.NewNetlinkAdapter(),
DNSClient: network.NewDNSClient(),
}
}

func (c NetworkDisruptionConfigStruct) getInterfacesByIP(hosts []string) (map[string][]*net.IPNet, error) {
linkByIP := map[string][]*net.IPNet{}

if len(hosts) > 0 {
c.Log.Info("auto-detecting interfaces to apply disruption to...")
// resolve hosts
ips, err := resolveHosts(c.DNSClient, hosts)
if err != nil {
return nil, fmt.Errorf("can't resolve given hosts: %w", err)
}

// get the association between IP and interfaces to know
// which interfaces we have to inject disruption to
for _, ip := range ips {
// get routes for resolved destination IP
routes, err := c.NetlinkAdapter.RoutesForIP(ip)
if err != nil {
return nil, fmt.Errorf("can't get route for IP %s: %w", ip.String(), err)
}

// for each route, get the related interface and add it to the association
// between interfaces and IPs
for _, route := range routes {
c.Log.Infof("IP %s belongs to interface %s", ip.String(), route.Link().Name())

// store association, initialize the map entry if not present yet
if _, ok := linkByIP[route.Link().Name()]; !ok {
linkByIP[route.Link().Name()] = []*net.IPNet{}
}

linkByIP[route.Link().Name()] = append(linkByIP[route.Link().Name()], ip)
}
}
} else {
c.Log.Info("no hosts specified, all interfaces will be impacted")

// prepare links/IP association by pre-creating links
links, err := c.NetlinkAdapter.LinkList()
if err != nil {
c.Log.Fatalf("can't list links: %w", err)
}
for _, link := range links {
c.Log.Infof("adding interface %s", link.Name())
linkByIP[link.Name()] = []*net.IPNet{}
}
}

return linkByIP, nil
}

func (c NetworkDisruptionConfigStruct) addOperation(hosts []string, port int, operation linkOperation) {
c.Log.Info("auto-detecting interfaces to apply disruption to...")

parent := "root"

linkByIP, err := c.getInterfacesByIP(hosts)
if err != nil {
c.Log.Fatalw("can't get interfaces per IP listing: %w", err)
}

// for each link/ip association, add disruption
for linkName, ips := range linkByIP {
clearTxQlen := false

// retrieve link from name
link, err := c.NetlinkAdapter.LinkByName(linkName)
if err != nil {
c.Log.Fatalf("can't retrieve link %s: %w", linkName, err)
}

// if at least one IP has been specified, we need to create a prio qdisc to be able to apply
// a filter and a delay only on traffic going to those IP
if len(ips) > 0 {
// set the tx qlen if not already set as it is required to create a prio qdisc without dropping
// all the outgoing traffic
// this qlen will be removed once the injection is done if it was not present before
if link.TxQLen() == 0 {
c.Log.Infof("setting tx qlen for interface %s", link.Name())

clearTxQlen = true

if err := link.SetTxQLen(1000); err != nil {
c.Log.Fatalf("can't set tx queue length on interface %s: %w", link.Name(), err)
}
}

// create a new qdisc for the given interface of type prio with 4 bands instead of 3
// we keep the default priomap, the extra band will be used to filter traffic going to the specified IP
// we only create this qdisc if we want to target traffic going to some hosts only, it avoids to add delay to
// all the outgoing traffic
parent = "1:4"
priomap := [16]uint32{1, 2, 2, 2, 1, 2, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1}

if err := c.TrafficController.AddPrio(link.Name(), "root", 1, 4, priomap); err != nil {
c.Log.Fatalf("can't create a new qdisc for interface %s: %w", link.Name(), err)
}
}

// add delay
if err := operation(link, parent); err != nil {
c.Log.Fatalf("could not perform operation on newly created qdisc for interface %s: %w", link.Name(), err)
}

// if only some hosts/ports are targeted, redirect the traffic to the extra band created earlier
// where the delay is applied
if len(ips) > 0 {
for _, ip := range ips {
if err := c.TrafficController.AddFilter(link.Name(), "1:0", 0, ip, port, "1:4"); err != nil {
c.Log.Fatalf("can't add a filter to interface %s: %w", link.Name(), err)
}
}
}

// reset the interface transmission queue length once filters have been created
if clearTxQlen {
c.Log.Infof("clearing tx qlen for interface %s", link.Name())

if err := link.SetTxQLen(0); err != nil {
c.Log.Fatalf("can't clear %s link transmission queue length: %w", link.Name(), err)
}
}
}
}

// AddLatency adds a network latency disruption using the drivers in the NetworkDisruptionConfigStruct
func (c NetworkDisruptionConfigStruct) AddLatency(hosts []string, port int, delay time.Duration) {
// closure which adds latency
operation := func(link network.NetlinkLink, parent string) error {
return c.TrafficController.AddDelay(link.Name(), parent, 0, delay)
}

c.addOperation(hosts, port, operation)
}

// AddOutputLimit adds a network bandwidth disruption using the drivers in the NetworkDisruptionConfigStruct
func (c NetworkDisruptionConfigStruct) AddOutputLimit(hosts []string, port int, bytesPerSec uint) {
// closure which adds a bandwidth limit
operation := func(link network.NetlinkLink, parent string) error {
return c.TrafficController.AddOutputLimit(link.Name(), parent, 0, bytesPerSec)
}

c.addOperation(hosts, port, operation)
}

// ClearAllQdiscs removes all disruptions by clearing all custom qdiscs created for the given config struct
func (c NetworkDisruptionConfigStruct) ClearAllQdiscs(hosts []string) {
linkByIP, err := c.getInterfacesByIP(hosts)
if err != nil {
c.Log.Fatalf("can't get interfaces per IP map: %w", err)
}

for linkName := range linkByIP {
c.Log.Infof("clearing root qdisc for interface %s", linkName)

// retrieve link from name
link, err := c.NetlinkAdapter.LinkByName(linkName)
if err != nil {
c.Log.Fatalf("can't retrieve link %s: %w", linkName, err)
}

// ensure qdisc isn't cleared before clearing it to avoid any tc error
cleared, err := c.TrafficController.IsQdiscCleared(link.Name())
if err != nil {
c.Log.Fatalf("can't ensure the %s link qdisc is cleared or not: %w", link.Name(), err)
}

// clear link qdisc if needed
if !cleared {
if err := c.TrafficController.ClearQdisc(link.Name()); err != nil {
c.Log.Fatalf("can't delete the %s link qdisc: %w", link.Name(), err)
}
} else {
c.Log.Infof("%s link qdisc is already cleared, skipping", link.Name())
}
}
}
Loading

0 comments on commit 51ed4b1

Please sign in to comment.