Skip to content

Commit

Permalink
Add iptables cleanup logic for deleted service when controller is down
Browse files Browse the repository at this point in the history
  • Loading branch information
ssup2 committed Nov 14, 2020
1 parent 9cc2cdf commit fb69d9a
Show file tree
Hide file tree
Showing 5 changed files with 399 additions and 29 deletions.
207 changes: 192 additions & 15 deletions controllers/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (r *ServiceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
}
logger.WithValues("IPv4", configIPv4Enabled).WithValues("IPv6", configIPv6Enabled).Info("config network stack")

// Get Nodes's pod CIDR
// Get nodes's pod CIDR
node := &corev1.Node{}
if err := r.Client.Get(ctx, types.NamespacedName{Name: configNodeName}, node); err != nil {
logger.Error(err, "failed to get the pod's node info from API server")
Expand All @@ -108,7 +108,23 @@ func (r *ServiceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
logger.WithValues("pod CIDR IPV4", podCIDRIPv4).WithValues("pod CIDR IPv6", podCIDRIPv6).Info("pod CIDR")

// Init iptables
initIptables(logger)
if err := initIptables(logger); err != nil {
logger.Error(err, "failed to init iptables")
os.Exit(1)
}

// Get all services
svcs := &corev1.ServiceList{}
if err := r.Client.List(ctx, svcs, client.InNamespace("")); err != nil {
logger.Error(err, "failed to get all services from API server")
os.Exit(1)
}

// Cleanup iptables for deleted services
if err := cleanupIptables(logger, svcs, podCIDRIPv4, podCIDRIPv6); err != nil {
logger.Error(err, "failed to cleanup iptables")
os.Exit(1)
}
}

// Get service info
Expand All @@ -132,7 +148,7 @@ func (r *ServiceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {

// Delete iptables rules
logger.WithValues("externalIP", oldExternalIP).Info("delete iptables rules")
if err := deleteIptablesRules(logger, req, oldClusterIP, oldExternalIP, podCIDRIPv4, podCIDRIPv6); err != nil {
if err := deleteIptablesRules(logger, &req, oldClusterIP, oldExternalIP, podCIDRIPv4, podCIDRIPv6); err != nil {
return ctrl.Result{}, err
}
}
Expand All @@ -148,7 +164,7 @@ func (r *ServiceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, nil
}

// Create or Delete iptables rules
// Create iptables rules
for _, ingress := range svc.Status.LoadBalancer.Ingress {
clusterIP := svc.Spec.ClusterIP
externalIP := ingress.IP
Expand All @@ -158,7 +174,7 @@ func (r *ServiceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {

// Create iptables rules
logger.WithValues("externalIP", externalIP).Info("create iptables rules")
if err := createIptablesRules(logger, req, clusterIP, externalIP, podCIDRIPv4, podCIDRIPv6); err != nil {
if err := createIptablesRules(logger, &req, clusterIP, externalIP, podCIDRIPv4, podCIDRIPv6); err != nil {
return ctrl.Result{}, err
}
}
Expand All @@ -174,6 +190,8 @@ func (r *ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

func initIptables(logger logr.Logger) error {
logger.Info("create iptables chains")

// IPv4
if configIPv4Enabled {
// Create chain in nat table
Expand Down Expand Up @@ -238,10 +256,160 @@ func initIptables(logger logr.Logger) error {
return nil
}

func createIptablesRules(logger logr.Logger, req ctrl.Request, clusterIP, externalIP, podCIDRIPv4, podCIDRIPv6 string) error {
// cleanupIptables removes rules from the IPVS prerouting and output chains of
// the nat table that no longer correspond to a LoadBalancer service in svcs.
// It is meant to run once at startup so that rules left behind by services
// that were deleted or changed while the controller was down get removed.
//
// A rule is kept only when the service named in its comment still exists and
// the rule matches at least one of that service's current load balancer
// ingress IPs. Rules of a service that has no ingress at all are removed.
//
// Only failures to list the rules are returned; failures to delete a single
// rule are logged and the cleanup continues with the next rule.
func cleanupIptables(logger logr.Logger, svcs *corev1.ServiceList, podCIDRIPv4, podCIDRIPv6 string) error {
	logger.Info("cleanup iptables for deleted services")

	// IPv4
	if configIPv4Enabled {
		svcMap := loadBalancerSvcMap(svcs, func(addr string) bool { return ip.IsIPv4Addr(addr) })

		// Cleanup prerouting chain
		preRules, err := iptables.GetRulesIPv4(iptables.TableNAT, ChainNATIPVSPrerouting)
		if err != nil {
			return err
		}
		cleanupChainRules(logger, preRules, svcMap, "prerouting chain IPv4", "/32", podCIDRIPv4, true, iptables.DeleteRuleRawIPv4)

		// Cleanup output chain
		outRules, err := iptables.GetRulesIPv4(iptables.TableNAT, ChainNATIPVSOutput)
		if err != nil {
			return err
		}
		cleanupChainRules(logger, outRules, svcMap, "output chain IPv4", "/32", podCIDRIPv4, false, iptables.DeleteRuleRawIPv4)
	}

	// IPv6
	if configIPv6Enabled {
		svcMap := loadBalancerSvcMap(svcs, func(addr string) bool { return ip.IsIPv6Addr(addr) })

		// Cleanup prerouting chain
		preRules, err := iptables.GetRulesIPv6(iptables.TableNAT, ChainNATIPVSPrerouting)
		if err != nil {
			return err
		}
		cleanupChainRules(logger, preRules, svcMap, "prerouting chain IPv6", "/128", podCIDRIPv6, true, iptables.DeleteRuleRawIPv6)

		// Cleanup output chain
		outRules, err := iptables.GetRulesIPv6(iptables.TableNAT, ChainNATIPVSOutput)
		if err != nil {
			return err
		}
		cleanupChainRules(logger, outRules, svcMap, "output chain IPv6", "/128", podCIDRIPv6, false, iptables.DeleteRuleRawIPv6)
	}

	return nil
}

// loadBalancerSvcMap indexes the LoadBalancer services of one address family
// by "namespace/name", the key that cleanupChainRules looks up using the
// comment of each rule.
func loadBalancerSvcMap(svcs *corev1.ServiceList, isFamilyAddr func(string) bool) map[string]*corev1.Service {
	svcMap := make(map[string]*corev1.Service)
	for i := range svcs.Items {
		// The map is only read, so pointing into the list avoids a copy.
		svc := &svcs.Items[i]
		if isFamilyAddr(svc.Spec.ClusterIP) && svc.Spec.Type == corev1.ServiceTypeLoadBalancer {
			svcMap[svc.Namespace+"/"+svc.Name] = svc
		}
	}
	return svcMap
}

// cleanupChainRules deletes every rule in rules whose service is missing from
// svcMap or whose contents no longer match the service. chainDesc is only used
// in log messages; hostMask, podCIDR and matchSrc are passed to
// ruleMatchesService; deleteRule is the address-family specific delete function.
func cleanupChainRules(logger logr.Logger, rules []string, svcMap map[string]*corev1.Service,
	chainDesc, hostMask, podCIDR string, matchSrc bool,
	deleteRule func(table iptables.Table, rule ...string) (string, error)) {
	for _, rule := range rules {
		nsName, src, dest, jump, dnatDest := getSvcInfoFromRule(rule)

		var reason string
		if svc, ok := svcMap[nsName]; !ok {
			reason = "there is no service info in k8s."
		} else if !ruleMatchesService(svc, hostMask, podCIDR, matchSrc, src, dest, jump, dnatDest) {
			reason = "service info is diff."
		}
		if reason == "" {
			continue
		}

		logger.WithValues("rule", rule).Info(reason + " cleanup " + chainDesc + " rule")
		if out, err := deleteRule(iptables.TableNAT, iptables.ChangeRuleToDelete(rule)...); err != nil {
			// Best effort: keep cleaning the remaining rules.
			logger.Error(err, out)
		}
	}
}

// ruleMatchesService reports whether a rule described by src, dest, jump and
// dnatDest is one of the rules expected for svc: a masquerade or DNAT rule
// whose destination is one of the service's ingress IPs (with hostMask
// appended), whose DNAT target is the cluster IP and, when matchSrc is set,
// whose source is podCIDR.
func ruleMatchesService(svc *corev1.Service, hostMask, podCIDR string, matchSrc bool, src, dest, jump, dnatDest string) bool {
	if matchSrc && src != podCIDR {
		return false
	}

	isMasq := jump == ChainNATKubeMasquerade
	isDNAT := jump == "DNAT" && dnatDest == svc.Spec.ClusterIP
	if !isMasq && !isDNAT {
		return false
	}

	// The rule is valid as soon as it targets any of the current ingress IPs.
	for _, ingress := range svc.Status.LoadBalancer.Ingress {
		if dest == ingress.IP+hostMask {
			return true
		}
	}
	return false
}

func createIptablesRules(logger logr.Logger, req *ctrl.Request, clusterIP, externalIP, podCIDRIPv4, podCIDRIPv6 string) error {
// Don't use spec.ipFamily to distingush between IPv4 and IPv6 Address
// for kubernetes version that dosen't support IPv6 dualstack
if configIPv4Enabled && ip.IsIPv4Addr(externalIP) {
if configIPv4Enabled && ip.IsIPv4Addr(clusterIP) {
// IPv4
// Set prerouting
rulePreMasq := []string{"-s", podCIDRIPv4, "-d", externalIP, "-j", ChainNATKubeMasquerade}
Expand Down Expand Up @@ -270,7 +438,7 @@ func createIptablesRules(logger logr.Logger, req ctrl.Request, clusterIP, extern
logger.Error(err, out)
return err
}
} else if configIPv6Enabled && ip.IsIPv6Addr(externalIP) {
} else if configIPv6Enabled && ip.IsIPv6Addr(clusterIP) {
// IPv6
// Set prerouting
rulePreMasq := []string{"-s", podCIDRIPv6, "-d", externalIP, "-j", ChainNATKubeMasquerade}
Expand Down Expand Up @@ -300,18 +468,18 @@ func createIptablesRules(logger logr.Logger, req ctrl.Request, clusterIP, extern
return err
}
} else {
if ip.IsVaildIP(externalIP) {
logger.WithValues("externalIP", externalIP).Error(errors.New("invalid IP"), "invaild IP")
if ip.IsVaildIP(clusterIP) {
logger.WithValues("clusterIP", clusterIP).Error(errors.New("invalid IP"), "invaild IP")
}
}

return nil
}

func deleteIptablesRules(logger logr.Logger, req ctrl.Request, clusterIP, externalIP, podCIDRIPv4, podCIDRIPv6 string) error {
func deleteIptablesRules(logger logr.Logger, req *ctrl.Request, clusterIP, externalIP, podCIDRIPv4, podCIDRIPv6 string) error {
// Don't use spec.ipFamily to distingush between IPv4 and IPv6 Address
// for kubernetes version that dosen't support IPv6 dualstack
if configIPv4Enabled && ip.IsIPv4Addr(externalIP) {
if configIPv4Enabled && ip.IsIPv4Addr(clusterIP) {
// IPv4
// Unset prerouting
rulePreMasq := []string{"-s", podCIDRIPv4, "-d", externalIP, "-j", ChainNATKubeMasquerade}
Expand Down Expand Up @@ -340,7 +508,7 @@ func deleteIptablesRules(logger logr.Logger, req ctrl.Request, clusterIP, extern
logger.Error(err, out)
return err
}
} else if configIPv6Enabled && ip.IsIPv6Addr(externalIP) {
} else if configIPv6Enabled && ip.IsIPv6Addr(clusterIP) {
// IPv6
// Unset prerouting
rulePreMasq := []string{"-s", podCIDRIPv6, "-d", externalIP, "-j", ChainNATKubeMasquerade}
Expand Down Expand Up @@ -370,8 +538,8 @@ func deleteIptablesRules(logger logr.Logger, req ctrl.Request, clusterIP, extern
return err
}
} else {
if ip.IsVaildIP(externalIP) {
logger.WithValues("externalIP", externalIP).Error(errors.New("invalid IP"), "invaild IP")
if ip.IsVaildIP(clusterIP) {
logger.WithValues("clusterIP", clusterIP).Error(errors.New("invalid IP"), "invaild IP")
}
}
return nil
Expand All @@ -388,3 +556,12 @@ func getPodCIDR(cidrs []string) (ipv4CIDR string, ipv6CIDR string) {
}
return
}

// getSvcInfoFromRule extracts the fields of an iptables rule that are needed
// to compare it with a service: the rule comment (looked up by the caller as
// the service's "namespace/name"), the source and destination, the jump
// target and the DNAT destination.
func getSvcInfoFromRule(rule string) (nsName, src, dest, jump, dnatDest string) {
	return iptables.GetRuleComment(rule),
		iptables.GetRuleSrc(rule),
		iptables.GetRuleDest(rule),
		iptables.GetRuleJump(rule),
		iptables.GetRuleDNATDest(rule)
}
80 changes: 77 additions & 3 deletions pkg/iptables/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ type Table string

// Const
const (
iptablesCmdIPv4 = "iptables"
iptablesCmdIPv6 = "ip6tables"
iptablesErrNoRule = "No chain/target/match by that name"
iptablesCmdIPv4 = "iptables"
iptablesCmdIPv6 = "ip6tables"
iptablesSaveCmdIPv4 = "iptables-save"
iptablesSaveCmdIPv6 = "ip6tables-save"
iptablesErrNoRule = "No chain/target/match by that name"

TableNAT Table = "nat"
TableFilter Table = "filter"
Expand Down Expand Up @@ -150,6 +152,40 @@ func isExistRule(iptablesCmd string, table Table, chain string, comment string,
return true
}

// GetRulesIPv4 returns the rules of the given chain in the given table,
// as reported by iptables-save.
func GetRulesIPv4(table Table, chain string) ([]string, error) {
	rules, err := getRules(iptablesSaveCmdIPv4, table, chain)
	return rules, err
}

// GetRulesIPv6 returns the rules of the given chain in the given table,
// as reported by ip6tables-save.
func GetRulesIPv6(table Table, chain string) ([]string, error) {
	rules, err := getRules(iptablesSaveCmdIPv6, table, chain)
	return rules, err
}

// getRules runs iptablesSaveCmd for the given table and returns the
// "-A <chain> ..." lines that belong to exactly the given chain. The returned
// error is the one from running the command; no rules are returned in that
// case.
func getRules(iptablesSaveCmd string, table Table, chain string) ([]string, error) {
	// Lock
	lock.Lock()
	defer lock.Unlock()

	// Dump all rules of the table
	out, err := exec.Command(iptablesSaveCmd, "-t", string(table)).CombinedOutput()
	if err != nil {
		return nil, err
	}

	// The chain name must be followed by a space (or end the line). A bare
	// prefix match would also return rules of other chains whose name merely
	// starts with chain.
	appendChain := "-A " + chain
	var result []string
	for _, line := range strings.Split(string(out), "\n") {
		if line == appendChain || strings.HasPrefix(line, appendChain+" ") {
			result = append(result, line)
		}
	}

	return result, nil
}

// CreateRuleFirst
func CreateRuleFirstIPv4(table Table, chain string, comment string, rule ...string) (string, error) {
return createRuleFirst(iptablesCmdIPv4, table, chain, comment, rule...)
Expand Down Expand Up @@ -265,3 +301,41 @@ func deleteRule(iptablesCmd string, table Table, chain string, comment string, r

return string(out), nil
}

// DeleteRuleRawIPv4 deletes a rule from the given table using iptables.
// rule holds the rule's arguments as passed after the -D flag.
func DeleteRuleRawIPv4(table Table, rule ...string) (string, error) {
	out, err := deleteRuleRaw(iptablesCmdIPv4, table, rule...)
	return out, err
}

// DeleteRuleRawIPv6 deletes a rule from the given table using ip6tables.
// rule holds the rule's arguments as passed after the -D flag.
func DeleteRuleRawIPv6(table Table, rule ...string) (string, error) {
	out, err := deleteRuleRaw(iptablesCmdIPv6, table, rule...)
	return out, err
}

// deleteRuleRaw deletes a rule from table using iptablesCmd. rule holds the
// arguments that follow the -C / -D flag (chain name and rule specification).
// The rule is first checked with -C; when it does not exist the function
// returns success without deleting anything. The returned string is the
// combined output of the last command that was run.
func deleteRuleRaw(iptablesCmd string, table Table, rule ...string) (string, error) {
	// Message printed by "iptables -C" when the chain exists but the rule
	// does not; iptablesErrNoRule only covers a missing chain/target/match.
	const errBadRule = "does a matching rule exist in that chain"

	// Lock
	lock.Lock()
	defer lock.Unlock()

	// run executes "<iptablesCmd> -t <table> <op> <rule...>".
	run := func(op string) (string, error) {
		args := make([]string, 0, len(rule)+3)
		args = append(args, "-t", string(table), op)
		args = append(args, rule...)
		out, err := exec.Command(iptablesCmd, args...).CombinedOutput()
		return string(out), err
	}

	// Check rule
	out, err := run("-C")
	if err != nil {
		if strings.Contains(out, iptablesErrNoRule) || strings.Contains(out, errBadRule) {
			// If rule isn't exist, return success
			return out, nil
		}
		return out, err
	}

	// Delete rule
	return run("-D")
}
Loading

0 comments on commit fb69d9a

Please sign in to comment.