Skip to content

Commit

Permalink
Apply tc rules and filters to all interfaces instead of trying to gue…
Browse files Browse the repository at this point in the history
…ss the one to choose (#286)
  • Loading branch information
Devatoria authored Apr 13, 2021
1 parent 6fa70cb commit 5fcdae2
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 153 deletions.
12 changes: 1 addition & 11 deletions docs/network_disruption_hosts.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,7 @@ When a node is targeted, all interfaces with route table entries to IP addresses
<img src="../docs/img/network_hosts/nodes_label_small.png" height=330 width=600/>
</kbd></p>

The diagrams thus far seem to imply that all network interfaces have a routing table entry for any pod we wish to disrupt. This is a realistic representation for pods where you are unlikely to disrupt pods which do not communicate with the `hosts`. However, for nodes with multiple interfaces, it is conceviable and likely that not all interfaces have routing table entries to the specified `hosts`.

<p align="center"><kbd>
<img src="../docs/img/network_hosts/nodes_limited_routes.png" height=330 width=600/>
</kbd></p>

If no matching route table entries are found across all interfaces on a node, the disruption is applied to a default interface instead.

<p align="center"><kbd>
<img src="../docs/img/network_hosts/nodes_limited_simpler.png" height=330 width=600/>
</kbd></p>
The diagrams thus far seem to imply that all network interfaces have a routing table entry for any pod we wish to disrupt. For nodes with multiple interfaces, it is conceivable and likely that not all interfaces have routing table entries to the specified `hosts`. The `chaos-controller` applies tc rules to all interfaces which it discovers by traversing all routing tables.

### Case 2: Disrupting an entire AZ
Given a label which encompasses all nodes in an Availability Zone, `chaos-controller` can simulate zonal failures for one or more cloud services.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
go.etcd.io/bbolt v1.3.5 // indirect
go.uber.org/zap v1.10.0
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd
gotest.tools/v3 v3.0.2 // indirect
k8s.io/api v0.18.6
k8s.io/apimachinery v0.18.6
Expand Down
161 changes: 69 additions & 92 deletions injector/network_disruption.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,58 +145,45 @@ func (i networkDisruptionInjector) Clean() error {
// getInterfacesByIP returns the interfaces used to reach the given hosts
// if hosts is empty, all interfaces are returned
func (i *networkDisruptionInjector) getInterfacesByIP(hosts []string) (map[string][]*net.IPNet, error) {
var err error

ips := []*net.IPNet{}
linkByIP := map[string][]*net.IPNet{}

// define the null IP matching all hosts
_, nullIP, _ := net.ParseCIDR("0.0.0.0/0")

if len(hosts) > 0 {
i.config.Log.Infow("auto-detecting used interfaces to reach the given hosts", "hosts", hosts)
i.config.Log.Infow("resolving the given hosts", "hosts", hosts)

// resolve hosts
ips, err := resolveHosts(i.config.DNSClient, hosts)
ips, err = resolveHosts(i.config.DNSClient, hosts)
if err != nil {
return nil, fmt.Errorf("can't resolve given hosts: %w", err)
}

// get the association between IP and interfaces to know
// which interfaces we have to inject disruption to
for _, ip := range ips {
// get routes for resolved destination IP
routes, err := i.config.NetlinkAdapter.RoutesForIP(ip)
if err != nil {
return nil, fmt.Errorf("can't get route for IP %s: %w", ip.String(), err)
}

// for each route, get the related interface and add it to the association
// between interfaces and IPs
for _, route := range routes {
i.config.Log.Infof("IP %s belongs to interface %s", ip.String(), route.Link().Name())

// store association, initialize the map entry if not present yet
if _, ok := linkByIP[route.Link().Name()]; !ok {
linkByIP[route.Link().Name()] = []*net.IPNet{}
}

linkByIP[route.Link().Name()] = append(linkByIP[route.Link().Name()], ip)
}
}
} else {
i.config.Log.Info("no hosts specified, all interfaces will be impacted")
// by default, add the null IP macthing all hosts
ips = append(ips, nullIP)
}

// prepare links/IP association by pre-creating links
links, err := i.config.NetlinkAdapter.LinkList()
if err != nil {
return nil, fmt.Errorf("can't list links: %w", err)
}
// retrieve links used in the routing table
links, err := i.config.NetlinkAdapter.LinkList()
if err != nil {
return nil, fmt.Errorf("can't list links: %w", err)
}

for _, link := range links {
i.config.Log.Infof("adding interface %s", link.Name())
linkByIP[link.Name()] = []*net.IPNet{}
}
// create the interfaces -> IPs association
for _, link := range links {
i.config.Log.Infof("adding interface %s", link.Name())

// explicitly add loopback interface
i.config.Log.Infof("adding loopback interface")
linkByIP["lo"] = []*net.IPNet{}
linkByIP[link.Name()] = ips
}

// explicitly add loopback interface
i.config.Log.Infof("adding loopback interface")

linkByIP["lo"] = ips

return linkByIP, nil
}

Expand Down Expand Up @@ -234,46 +221,56 @@ func (i *networkDisruptionInjector) applyOperations() error {
i.config.Log.Infof("detected default gateway IP %s on interface %s", defaultRoute.Gateway().String(), defaultRoute.Link().Name())

// get the targeted pod node IP from the environment variable
hostIP, ok := os.LookupEnv(env.InjectorTargetPodHostIP)
nodeIP, ok := os.LookupEnv(env.InjectorTargetPodHostIP)
if !ok {
return fmt.Errorf("%s environment variable must be set with the target pod node IP", env.InjectorTargetPodHostIP)
}

i.config.Log.Infof("target pod node IP is %s", hostIP)
i.config.Log.Infof("target pod node IP is %s", nodeIP)

hostIPNet := &net.IPNet{
IP: net.ParseIP(hostIP),
IP: net.ParseIP(nodeIP),
Mask: net.CIDRMask(32, 32),
}

// get routes going to this node IP to add a filter excluding this IP from the disruptions later
// it is used to allow the node to reach the pod even with disruptions applied
hostIPRoutes, err := i.config.NetlinkAdapter.RoutesForIP(hostIPNet)
// get all interfaces with the wildcard/zero route (0.0.0.0/0) which
// can be used in filters to match all destination or source hosts
// "eth0" => [0.0.0.0/0]
// "eth1" => [0.0.0.0/0]
wildcardIPInterfaces, err := i.getInterfacesByIP(nil)
if err != nil {
return fmt.Errorf("error getting network interfaces for wildcard IP: %w", err)
}

// get all interfaces with the node IP
// "eth0" => [192.168.0.1/32]
// "eth1" => [192.168.0.1/32]
nodeIPInterfaces, err := i.getInterfacesByIP([]string{hostIPNet.String()})
if err != nil {
return fmt.Errorf("error getting target pod node IP routes: %w", err)
return fmt.Errorf("error getting target pod node interfaces/IP binding: %w", err)
}

// get the interfaces per IP map looking like:
// get all interfaces with specified hosts in the disruption
// "eth0" => [10.0.0.0/8, 192.168.0.1/32]
// "eth1" => [172.16.0.0/16, ...]
linkByIP, err := i.getInterfacesByIP(i.spec.Hosts)
// "eth1" => [10.0.0.0/8, 192.168.0.1/32]
hostsIPInterfaces, err := i.getInterfacesByIP(i.spec.Hosts)
if err != nil {
return fmt.Errorf("can't get interfaces per IP listing: %w", err)
return fmt.Errorf("can't get interfaces per hosts listing: %w", err)
}

// allow kubelet -> apiserver communications
// resolve the kubernetes.default service created at cluster bootstrap and owning the apiserver cluster IP
apiservers, err := i.getInterfacesByIP([]string{"kubernetes.default"})
apiserverIPInterfaces, err := i.getInterfacesByIP([]string{"kubernetes.default"})
if err != nil {
return fmt.Errorf("error resolving apiservers service IP: %w", err)
}

if len(apiservers) == 0 {
if len(apiserverIPInterfaces) == 0 {
return fmt.Errorf("could not resolve kubernetes.default service IP")
}

// for each link/ip association, add disruption
for linkName, ips := range linkByIP {
for linkName, ips := range hostsIPInterfaces {
// retrieve link from name
link, err := i.config.NetlinkAdapter.LinkByName(linkName)
if err != nil {
Expand Down Expand Up @@ -362,15 +359,8 @@ func (i *networkDisruptionInjector) applyOperations() error {

// if some hosts are targeted, create one filter per host to redirect the traffic to the disrupted band
// otherwise, create a filter redirecting all the traffic (0.0.0.0/0) using the given port and protocol to the disrupted band
if len(ips) > 0 {
for _, ip := range ips {
if err := i.config.TrafficController.AddFilter(link.Name(), "1:0", 0, nil, ip, srcPort, dstPort, i.spec.Protocol, "1:4"); err != nil {
return fmt.Errorf("can't add a filter to interface %s: %w", link.Name(), err)
}
}
} else {
_, nullIP, _ := net.ParseCIDR("0.0.0.0/0")
if err := i.config.TrafficController.AddFilter(link.Name(), "1:0", 0, nil, nullIP, srcPort, dstPort, i.spec.Protocol, "1:4"); err != nil {
for _, ip := range ips {
if err := i.config.TrafficController.AddFilter(link.Name(), "1:0", 0, nil, ip, srcPort, dstPort, i.spec.Protocol, "1:4"); err != nil {
return fmt.Errorf("can't add a filter to interface %s: %w", link.Name(), err)
}
}
Expand All @@ -387,45 +377,36 @@ func (i *networkDisruptionInjector) applyOperations() error {
Mask: net.CIDRMask(32, 32),
}

if _, found := linkByIP[defaultRoute.Link().Name()]; found {
if err := i.config.TrafficController.AddFilter(defaultRoute.Link().Name(), "1:0", 0, nil, gatewayIP, 0, 0, "", "1:1"); err != nil {
return fmt.Errorf("can't add the default route gateway IP filter: %w", err)
}
if err := i.config.TrafficController.AddFilter(defaultRoute.Link().Name(), "1:0", 0, nil, gatewayIP, 0, 0, "", "1:1"); err != nil {
return fmt.Errorf("can't add the default route gateway IP filter: %w", err)
}

// this filter allows the pod to communicate with the node IP
for _, hostIPRoute := range hostIPRoutes {
if _, found := linkByIP[hostIPRoute.Link().Name()]; found {
if err := i.config.TrafficController.AddFilter(hostIPRoute.Link().Name(), "1:0", 0, nil, hostIPNet, 0, 0, "", "1:1"); err != nil {
for linkName, nodeIPs := range nodeIPInterfaces {
for _, hostIP := range nodeIPs {
if err := i.config.TrafficController.AddFilter(linkName, "1:0", 0, nil, hostIP, 0, 0, "", "1:1"); err != nil {
return fmt.Errorf("can't add the target pod node IP filter: %w", err)
}
}
}
} else if i.config.Level == chaostypes.DisruptionLevelNode {
if _, found := linkByIP[defaultRoute.Link().Name()]; found {
for linkName := range wildcardIPInterfaces {
// allow SSH connections (port 22/tcp)
if err := i.config.TrafficController.AddFilter(defaultRoute.Link().Name(), "1:0", 0, nil, nil, 22, 0, "tcp", "1:1"); err != nil {
if err := i.config.TrafficController.AddFilter(linkName, "1:0", 0, nil, nil, 22, 0, "tcp", "1:1"); err != nil {
return fmt.Errorf("error adding filter allowing SSH connections: %w", err)
}

// allow cloud provider health checks (arp)
if err := i.config.TrafficController.AddFilter(defaultRoute.Link().Name(), "1:0", 0, nil, nil, 0, 0, "arp", "1:1"); err != nil {
if err := i.config.TrafficController.AddFilter(linkName, "1:0", 0, nil, nil, 0, 0, "arp", "1:1"); err != nil {
return fmt.Errorf("error adding filter allowing cloud providers health checks (ARP packets): %w", err)
}
}

// allow all communications to this (eventually these) IP
for linkName, apiserverIPs := range apiservers {
if _, found := linkByIP[linkName]; found {
link, err := i.config.NetlinkAdapter.LinkByName(linkName)
if err != nil {
return fmt.Errorf("error getting %s link: %w", linkName, err)
}

for _, ip := range apiserverIPs {
if err := i.config.TrafficController.AddFilter(link.Name(), "1:0", 0, nil, ip, 0, 0, "", "1:1"); err != nil {
return fmt.Errorf("error adding filter allowing apiserver communications: %w", err)
}
for linkName, apiserverIPs := range apiserverIPInterfaces {
for _, apiserverIP := range apiserverIPs {
if err := i.config.TrafficController.AddFilter(linkName, "1:0", 0, nil, apiserverIP, 0, 0, "", "1:1"); err != nil {
return fmt.Errorf("error adding filter allowing apiserver communications: %w", err)
}
}
}
Expand Down Expand Up @@ -456,23 +437,19 @@ func (i *networkDisruptionInjector) addOutputLimitOperation(bytesPerSec uint) {

// clearOperations removes all disruptions by clearing all custom qdiscs created for the given config struct (filters will be deleted as well)
func (i *networkDisruptionInjector) clearOperations() error {
linkByIP, err := i.getInterfacesByIP(i.spec.Hosts)
// get all interfaces
links, err := i.getInterfacesByIP(nil)
if err != nil {
return fmt.Errorf("can't get interfaces per IP map: %w", err)
}

for linkName := range linkByIP {
// clear all interfaces root qdisc so it gets back to default
for linkName := range links {
i.config.Log.Infof("clearing root qdisc for interface %s", linkName)

// retrieve link from name
link, err := i.config.NetlinkAdapter.LinkByName(linkName)
if err != nil {
return fmt.Errorf("can't retrieve link %s: %w", linkName, err)
}

// clear link qdisc if needed
if err := i.config.TrafficController.ClearQdisc(link.Name()); err != nil {
return fmt.Errorf("can't delete the %s link qdisc: %w", link.Name(), err)
if err := i.config.TrafficController.ClearQdisc(linkName); err != nil {
return fmt.Errorf("can't delete the %s link qdisc: %w", linkName, err)
}
}

Expand Down
15 changes: 10 additions & 5 deletions injector/network_disruption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,6 @@ var _ = Describe("Failure", func() {
nl.On("LinkByName", "lo").Return(nllink1, nil)
nl.On("LinkByName", "eth0").Return(nllink2, nil)
nl.On("LinkByName", "eth1").Return(nllink3, nil)
nl.On("RoutesForIP", "10.0.0.2/32").Return([]network.NetlinkRoute{nlroute2}, nil) // node IP route going through eth0
nl.On("RoutesForIP", "1.1.1.1/32").Return([]network.NetlinkRoute{nlroute2}, nil) // random external route going through eth0
nl.On("RoutesForIP", "2.2.2.2/32").Return([]network.NetlinkRoute{nlroute3}, nil) // random external route going through eth1
nl.On("RoutesForIP", "192.168.0.254/32").Return([]network.NetlinkRoute{nlroute3}, nil) // apiserver route going through eth1
nl.On("DefaultRoute").Return(nlroute2, nil)

// dns
Expand Down Expand Up @@ -208,6 +204,7 @@ var _ = Describe("Failure", func() {
// hosts filtering cases
Context("with no hosts specified", func() {
It("should add a filter to redirect all traffic on main interfaces on the disrupted band", func() {
tc.AssertCalled(GinkgoT(), "AddFilter", "lo", "1:0", mock.Anything, "nil", "0.0.0.0/0", 0, spec.Port, spec.Protocol, "1:4")
tc.AssertCalled(GinkgoT(), "AddFilter", "eth0", "1:0", mock.Anything, "nil", "0.0.0.0/0", 0, spec.Port, spec.Protocol, "1:4")
tc.AssertCalled(GinkgoT(), "AddFilter", "eth1", "1:0", mock.Anything, "nil", "0.0.0.0/0", 0, spec.Port, spec.Protocol, "1:4")
})
Expand All @@ -218,8 +215,12 @@ var _ = Describe("Failure", func() {
spec.Hosts = []string{"1.1.1.1", "2.2.2.2"}
})

It("should add a filter to redirect targeted traffic on related interfaces on the disrupted band filter on given hosts as destination IP", func() {
It("should add a filter to redirect targeted traffic on all interfaces on the disrupted band filter on given hosts as destination IP", func() {
tc.AssertCalled(GinkgoT(), "AddFilter", "lo", "1:0", mock.Anything, "nil", "1.1.1.1/32", 0, spec.Port, spec.Protocol, "1:4")
tc.AssertCalled(GinkgoT(), "AddFilter", "eth0", "1:0", mock.Anything, "nil", "1.1.1.1/32", 0, spec.Port, spec.Protocol, "1:4")
tc.AssertCalled(GinkgoT(), "AddFilter", "eth1", "1:0", mock.Anything, "nil", "1.1.1.1/32", 0, spec.Port, spec.Protocol, "1:4")
tc.AssertCalled(GinkgoT(), "AddFilter", "lo", "1:0", mock.Anything, "nil", "2.2.2.2/32", 0, spec.Port, spec.Protocol, "1:4")
tc.AssertCalled(GinkgoT(), "AddFilter", "eth0", "1:0", mock.Anything, "nil", "2.2.2.2/32", 0, spec.Port, spec.Protocol, "1:4")
tc.AssertCalled(GinkgoT(), "AddFilter", "eth1", "1:0", mock.Anything, "nil", "2.2.2.2/32", 0, spec.Port, spec.Protocol, "1:4")
})
})
Expand All @@ -232,6 +233,7 @@ var _ = Describe("Failure", func() {

It("should add a filter to redirect node IP traffic on a non-disrupted band", func() {
tc.AssertCalled(GinkgoT(), "AddFilter", "eth0", "1:0", mock.Anything, "nil", "10.0.0.2/32", 0, 0, "", "1:1")
tc.AssertCalled(GinkgoT(), "AddFilter", "eth1", "1:0", mock.Anything, "nil", "10.0.0.2/32", 0, 0, "", "1:1")
})
})

Expand All @@ -242,13 +244,16 @@ var _ = Describe("Failure", func() {

It("should add a filter to redirect SSH traffic on a non-disrupted band", func() {
tc.AssertCalled(GinkgoT(), "AddFilter", "eth0", "1:0", mock.Anything, "nil", "nil", 22, 0, "tcp", "1:1")
tc.AssertCalled(GinkgoT(), "AddFilter", "eth1", "1:0", mock.Anything, "nil", "nil", 22, 0, "tcp", "1:1")
})

It("should add a filter to redirect ARP traffic on a non-disrupted band", func() {
tc.AssertCalled(GinkgoT(), "AddFilter", "eth0", "1:0", mock.Anything, "nil", "nil", 0, 0, "arp", "1:1")
tc.AssertCalled(GinkgoT(), "AddFilter", "eth1", "1:0", mock.Anything, "nil", "nil", 0, 0, "arp", "1:1")
})

It("should add a filter to apiserver traffic on a non-disrupted band", func() {
tc.AssertCalled(GinkgoT(), "AddFilter", "eth0", "1:0", mock.Anything, "nil", "192.168.0.254/32", 0, 0, "", "1:1")
tc.AssertCalled(GinkgoT(), "AddFilter", "eth1", "1:0", mock.Anything, "nil", "192.168.0.254/32", 0, 0, "", "1:1")
})
})
Expand Down
Loading

0 comments on commit 5fcdae2

Please sign in to comment.