Skip to content

Commit

Permalink
Add iptables cleanup logic for deleted service when controller is down
Browse files Browse the repository at this point in the history
  • Loading branch information
ssup2 authored and ssup.2 committed Nov 16, 2020
1 parent 9cc2cdf commit 126442c
Show file tree
Hide file tree
Showing 6 changed files with 425 additions and 38 deletions.
224 changes: 209 additions & 15 deletions controllers/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (r *ServiceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
}
logger.WithValues("IPv4", configIPv4Enabled).WithValues("IPv6", configIPv6Enabled).Info("config network stack")

// Get Nodes's pod CIDR
// Get the node's pod CIDR
node := &corev1.Node{}
if err := r.Client.Get(ctx, types.NamespacedName{Name: configNodeName}, node); err != nil {
logger.Error(err, "failed to get the pod's node info from API server")
Expand All @@ -108,7 +108,23 @@ func (r *ServiceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
logger.WithValues("pod CIDR IPV4", podCIDRIPv4).WithValues("pod CIDR IPv6", podCIDRIPv6).Info("pod CIDR")

// Init iptables
initIptables(logger)
if err := initIptables(logger); err != nil {
logger.Error(err, "failed to init iptables")
os.Exit(1)
}

// Get all services
svcs := &corev1.ServiceList{}
if err := r.Client.List(ctx, svcs, client.InNamespace("")); err != nil {
logger.Error(err, "failed to get all services from API server")
os.Exit(1)
}

// Cleanup iptables for deleted services
if err := cleanupIptables(logger, svcs, podCIDRIPv4, podCIDRIPv6); err != nil {
logger.Error(err, "failed to cleanup iptables")
os.Exit(1)
}
}

// Get service info
Expand All @@ -132,7 +148,7 @@ func (r *ServiceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {

// Delete iptables rules
logger.WithValues("externalIP", oldExternalIP).Info("delete iptables rules")
if err := deleteIptablesRules(logger, req, oldClusterIP, oldExternalIP, podCIDRIPv4, podCIDRIPv6); err != nil {
if err := deleteIptablesRules(logger, &req, oldClusterIP, oldExternalIP, podCIDRIPv4, podCIDRIPv6); err != nil {
return ctrl.Result{}, err
}
}
Expand All @@ -148,7 +164,7 @@ func (r *ServiceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, nil
}

// Create or Delete iptables rules
// Create iptables rules
for _, ingress := range svc.Status.LoadBalancer.Ingress {
clusterIP := svc.Spec.ClusterIP
externalIP := ingress.IP
Expand All @@ -158,7 +174,7 @@ func (r *ServiceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {

// Create iptables rules
logger.WithValues("externalIP", externalIP).Info("create iptables rules")
if err := createIptablesRules(logger, req, clusterIP, externalIP, podCIDRIPv4, podCIDRIPv6); err != nil {
if err := createIptablesRules(logger, &req, clusterIP, externalIP, podCIDRIPv4, podCIDRIPv6); err != nil {
return ctrl.Result{}, err
}
}
Expand All @@ -174,6 +190,8 @@ func (r *ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

func initIptables(logger logr.Logger) error {
logger.Info("create iptables chains")

// IPv4
if configIPv4Enabled {
// Create chain in nat table
Expand Down Expand Up @@ -238,10 +256,177 @@ func initIptables(logger logr.Logger) error {
return nil
}

func createIptablesRules(logger logr.Logger, req ctrl.Request, clusterIP, externalIP, podCIDRIPv4, podCIDRIPv6 string) error {
// cleanupIptables deletes stale rules from the IPVS prerouting and output
// chains of the NAT table, for every enabled IP family.
//
// svcs is the list of services currently known to the API server; only
// LoadBalancer services whose cluster IP belongs to the family being processed
// are considered. A rule is stale, and is deleted, when either:
//   - its comment does not name one of those services, or
//   - it matches none of the service's load-balancer ingress IPs (this
//     includes a service that currently has no ingress at all).
//
// podCIDRIPv4 and podCIDRIPv6 are compared against the source of prerouting
// rules; output rules are matched on destination only.
//
// The first error returned by the iptables helpers aborts the cleanup and is
// returned to the caller.
func cleanupIptables(logger logr.Logger, svcs *corev1.ServiceList, podCIDRIPv4, podCIDRIPv6 string) error {
	logger.Info("cleanup iptables for deleted services")

	// IPv4
	if configIPv4Enabled {
		svcMap := loadBalancerSvcMap(svcs, ip.IsIPv4Addr)
		deleteRule := func(rule string) (string, error) {
			return iptables.DeleteRuleRawIPv4(iptables.TableNAT, iptables.ChangeRuleToDelete(rule)...)
		}

		// Cleanup prerouting chain
		preRules, err := iptables.GetRulesIPv4(iptables.TableNAT, ChainNATIPVSPrerouting)
		if err != nil {
			return err
		}
		if err := cleanupIptablesChainRules(logger, "prerouting chain IPv4", preRules, svcMap, podCIDRIPv4, "/32", true, deleteRule); err != nil {
			return err
		}

		// Cleanup output chain
		outRules, err := iptables.GetRulesIPv4(iptables.TableNAT, ChainNATIPVSOutput)
		if err != nil {
			return err
		}
		if err := cleanupIptablesChainRules(logger, "output chain IPv4", outRules, svcMap, podCIDRIPv4, "/32", false, deleteRule); err != nil {
			return err
		}
	}

	// IPv6
	if configIPv6Enabled {
		svcMap := loadBalancerSvcMap(svcs, ip.IsIPv6Addr)
		deleteRule := func(rule string) (string, error) {
			return iptables.DeleteRuleRawIPv6(iptables.TableNAT, iptables.ChangeRuleToDelete(rule)...)
		}

		// Cleanup prerouting chain
		preRules, err := iptables.GetRulesIPv6(iptables.TableNAT, ChainNATIPVSPrerouting)
		if err != nil {
			return err
		}
		if err := cleanupIptablesChainRules(logger, "prerouting chain IPv6", preRules, svcMap, podCIDRIPv6, "/128", true, deleteRule); err != nil {
			return err
		}

		// Cleanup output chain
		outRules, err := iptables.GetRulesIPv6(iptables.TableNAT, ChainNATIPVSOutput)
		if err != nil {
			return err
		}
		if err := cleanupIptablesChainRules(logger, "output chain IPv6", outRules, svcMap, podCIDRIPv6, "/128", false, deleteRule); err != nil {
			return err
		}
	}

	return nil
}

// loadBalancerSvcMap indexes, by "namespace/name", the LoadBalancer services in
// svcs whose cluster IP satisfies isFamilyAddr. The returned pointers refer to
// the elements of svcs.Items and must be treated as read-only.
func loadBalancerSvcMap(svcs *corev1.ServiceList, isFamilyAddr func(string) bool) map[string]*corev1.Service {
	svcMap := make(map[string]*corev1.Service, len(svcs.Items))
	for i := range svcs.Items {
		svc := &svcs.Items[i]
		if isFamilyAddr(svc.Spec.ClusterIP) && svc.Spec.Type == corev1.ServiceTypeLoadBalancer {
			svcMap[svc.Namespace+"/"+svc.Name] = svc
		}
	}
	return svcMap
}

// cleanupIptablesChainRules walks the rules of one chain and deletes, exactly
// once each, those that no longer correspond to a service in svcMap.
// chainDesc (e.g. "prerouting chain IPv4") is only used in log messages;
// matchSrc selects whether the rule source must equal podCIDR.
func cleanupIptablesChainRules(
	logger logr.Logger,
	chainDesc string,
	rules []string,
	svcMap map[string]*corev1.Service,
	podCIDR, hostMask string,
	matchSrc bool,
	deleteRule func(rule string) (string, error),
) error {
	for _, rule := range rules {
		nsName, src, dest, jump, dnatDest := getSvcInfoFromRule(rule)

		reason := ""
		if svc, ok := svcMap[nsName]; !ok {
			reason = "there is no service info in k8s. cleanup "
		} else if !iptablesRuleMatchesSvc(svc, src, dest, jump, dnatDest, podCIDR, hostMask, matchSrc) {
			reason = "service info is diff. cleanup "
		}
		if reason == "" {
			// The rule still describes a live service; keep it.
			continue
		}

		logger.WithValues("rule", rule).Info(reason + chainDesc + " rule")
		if out, err := deleteRule(rule); err != nil {
			logger.Error(err, out)
			return err
		}
	}
	return nil
}

// iptablesRuleMatchesSvc reports whether a parsed rule is one that svc should
// currently own: a masquerade or DNAT rule (the latter targeting the service's
// cluster IP) whose destination is any one of the service's ingress IPs with
// hostMask appended. When matchSrc is set the rule source must equal podCIDR.
func iptablesRuleMatchesSvc(svc *corev1.Service, src, dest, jump, dnatDest, podCIDR, hostMask string, matchSrc bool) bool {
	if matchSrc && src != podCIDR {
		return false
	}
	if jump != ChainNATKubeMasquerade && !(jump == "DNAT" && dnatDest == svc.Spec.ClusterIP) {
		return false
	}
	// Matching a single ingress is enough; a rule must not be judged stale
	// just because it belongs to a different ingress of the same service.
	for _, ingress := range svc.Status.LoadBalancer.Ingress {
		if dest == ingress.IP+hostMask {
			return true
		}
	}
	return false
}

func createIptablesRules(logger logr.Logger, req *ctrl.Request, clusterIP, externalIP, podCIDRIPv4, podCIDRIPv6 string) error {
// Don't use spec.ipFamily to distinguish between IPv4 and IPv6 addresses,
// for Kubernetes versions that don't support IPv6 dual-stack
if configIPv4Enabled && ip.IsIPv4Addr(externalIP) {
if configIPv4Enabled && ip.IsIPv4Addr(clusterIP) {
// IPv4
// Set prerouting
rulePreMasq := []string{"-s", podCIDRIPv4, "-d", externalIP, "-j", ChainNATKubeMasquerade}
Expand Down Expand Up @@ -270,7 +455,7 @@ func createIptablesRules(logger logr.Logger, req ctrl.Request, clusterIP, extern
logger.Error(err, out)
return err
}
} else if configIPv6Enabled && ip.IsIPv6Addr(externalIP) {
} else if configIPv6Enabled && ip.IsIPv6Addr(clusterIP) {
// IPv6
// Set prerouting
rulePreMasq := []string{"-s", podCIDRIPv6, "-d", externalIP, "-j", ChainNATKubeMasquerade}
Expand Down Expand Up @@ -300,18 +485,18 @@ func createIptablesRules(logger logr.Logger, req ctrl.Request, clusterIP, extern
return err
}
} else {
if ip.IsVaildIP(externalIP) {
logger.WithValues("externalIP", externalIP).Error(errors.New("invalid IP"), "invaild IP")
if ip.IsVaildIP(clusterIP) {
logger.WithValues("clusterIP", clusterIP).Error(errors.New("invalid IP"), "invaild IP")
}
}

return nil
}

func deleteIptablesRules(logger logr.Logger, req ctrl.Request, clusterIP, externalIP, podCIDRIPv4, podCIDRIPv6 string) error {
func deleteIptablesRules(logger logr.Logger, req *ctrl.Request, clusterIP, externalIP, podCIDRIPv4, podCIDRIPv6 string) error {
// Don't use spec.ipFamily to distinguish between IPv4 and IPv6 addresses,
// for Kubernetes versions that don't support IPv6 dual-stack
if configIPv4Enabled && ip.IsIPv4Addr(externalIP) {
if configIPv4Enabled && ip.IsIPv4Addr(clusterIP) {
// IPv4
// Unset prerouting
rulePreMasq := []string{"-s", podCIDRIPv4, "-d", externalIP, "-j", ChainNATKubeMasquerade}
Expand Down Expand Up @@ -340,7 +525,7 @@ func deleteIptablesRules(logger logr.Logger, req ctrl.Request, clusterIP, extern
logger.Error(err, out)
return err
}
} else if configIPv6Enabled && ip.IsIPv6Addr(externalIP) {
} else if configIPv6Enabled && ip.IsIPv6Addr(clusterIP) {
// IPv6
// Unset prerouting
rulePreMasq := []string{"-s", podCIDRIPv6, "-d", externalIP, "-j", ChainNATKubeMasquerade}
Expand Down Expand Up @@ -370,8 +555,8 @@ func deleteIptablesRules(logger logr.Logger, req ctrl.Request, clusterIP, extern
return err
}
} else {
if ip.IsVaildIP(externalIP) {
logger.WithValues("externalIP", externalIP).Error(errors.New("invalid IP"), "invaild IP")
if ip.IsVaildIP(clusterIP) {
logger.WithValues("clusterIP", clusterIP).Error(errors.New("invalid IP"), "invaild IP")
}
}
return nil
Expand All @@ -388,3 +573,12 @@ func getPodCIDR(cidrs []string) (ipv4CIDR string, ipv6CIDR string) {
}
return
}

// getSvcInfoFromRule pulls out of a single iptables rule string the fields
// used to match the rule against a service: the rule comment (nsName, which
// callers in this file look up as "namespace/name"), the source, the
// destination, the jump target and the DNAT destination. Each field is
// obtained from the corresponding iptables.GetRule* helper.
func getSvcInfoFromRule(rule string) (nsName, src, dest, jump, dnatDest string) {
	return iptables.GetRuleComment(rule),
		iptables.GetRuleSrc(rule),
		iptables.GetRuleDest(rule),
		iptables.GetRuleJump(rule),
		iptables.GetRuleDNATDest(rule)
}
18 changes: 9 additions & 9 deletions pkg/configs/configs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,57 +6,57 @@ import (
)

// TestGetConfigNodeName checks that GetConfigNodeName returns the value stored
// in the EnvNodeName environment variable.
func TestGetConfigNodeName(t *testing.T) {
	// A failed Setenv would make the assertion below meaningless, so stop early.
	if err := os.Setenv(EnvNodeName, "node"); err != nil {
		t.Fatalf("failed to set %s: %v", EnvNodeName, err)
	}

	nodeName, _ := GetConfigNodeName()
	if nodeName != "node" {
		// Report the observed value, not just the expected one.
		t.Errorf("wrong result - got %q, want %q", nodeName, "node")
	}
}

func TestGetConfigNetStack(t *testing.T) {
os.Setenv(EnvNetStack, "ipv4")
_ = os.Setenv(EnvNetStack, "ipv4")
ipv4, ipv6, _ := GetConfigNetStack()
if ipv4 != true || ipv6 != false {
t.Errorf("wrong result - %s", "ipv4")
}

os.Setenv(EnvNetStack, "IPV4")
_ = os.Setenv(EnvNetStack, "IPV4")
ipv4, ipv6, _ = GetConfigNetStack()
if ipv4 != true || ipv6 != false {
t.Errorf("wrong result - %s", "IPV4")
}

os.Setenv(EnvNetStack, "ipv6")
_ = os.Setenv(EnvNetStack, "ipv6")
ipv4, ipv6, _ = GetConfigNetStack()
if ipv4 != false || ipv6 != true {
t.Errorf("wrong result - %s", "ipv6")
}

os.Setenv(EnvNetStack, "IPV6")
_ = os.Setenv(EnvNetStack, "IPV6")
ipv4, ipv6, _ = GetConfigNetStack()
if ipv4 != false || ipv6 != true {
t.Errorf("wrong result - %s", "IPV6")
}

os.Setenv(EnvNetStack, "ipv5")
_ = os.Setenv(EnvNetStack, "ipv5")
_, _, err := GetConfigNetStack()
if err == nil {
t.Errorf("wrong result - %s", "ipv5")
}

os.Setenv(EnvNetStack, "ipv4,ipv6")
_ = os.Setenv(EnvNetStack, "ipv4,ipv6")
ipv4, ipv6, _ = GetConfigNetStack()
if ipv4 != true || ipv6 != true {
t.Errorf("wrong result - %s", "ipv4,ipv6")
}

os.Setenv(EnvNetStack, "ipv4, ipv6")
_ = os.Setenv(EnvNetStack, "ipv4, ipv6")
ipv4, ipv6, _ = GetConfigNetStack()
if ipv4 != true || ipv6 != true {
t.Errorf("wrong result - %s", "ipv4, ipv6")
}

os.Setenv(EnvNetStack, "ipv6, ipv4")
_ = os.Setenv(EnvNetStack, "ipv6, ipv4")
ipv4, ipv6, _ = GetConfigNetStack()
if ipv4 != true || ipv6 != true {
t.Errorf("wrong result - %s", "ipv6,ipv4")
Expand Down
Loading

0 comments on commit 126442c

Please sign in to comment.