diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 71463d2ac69..ae8c7842f4d 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -54,10 +54,16 @@ import ( const iptablesMinVersion = utiliptables.MinCheckVersion // the services chain -const iptablesServicesChain utiliptables.Chain = "KUBE-SERVICES" +const kubeServicesChain utiliptables.Chain = "KUBE-SERVICES" // the nodeports chain -const iptablesNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS" +const kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS" + +// the kubernetes postrouting chain +const kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING" + +// the mark-for-masquerade chain +const kubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ" // the mark we apply to traffic needing SNAT const iptablesMasqueradeMark = "0x4d415351" @@ -193,7 +199,10 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod // It returns true if an error was encountered. Errors are logged. func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { //TODO: actually tear down all rules and chains. - args := []string{"-m", "comment", "--comment", "kubernetes service portals", "-j", string(iptablesServicesChain)} + args := []string{ + "-m", "comment", "--comment", "kubernetes service portals", + "-j", string(kubeServicesChain), + } if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainOutput, args...); err != nil { if !utiliptables.IsNotFoundError(err) { glog.Errorf("Error removing pure-iptables proxy rule: %v", err) @@ -207,7 +216,10 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { } } - args = []string{"-m", "comment", "--comment", "kubernetes service traffic requiring SNAT", "-m", "mark", "--mark", iptablesMasqueradeMark, "-j", "MASQUERADE"} + args = []string{ + "-m", "comment", "--comment", "kubernetes postrouting rules", + "-j", string(kubePostroutingChain), + } if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { if !utiliptables.IsNotFoundError(err) { glog.Errorf("Error removing pure-iptables proxy rule: %v", err) @@ -216,7 +228,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { } // flush and delete chains. - chains := []utiliptables.Chain{iptablesServicesChain, iptablesNodePortsChain} + chains := []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, kubeMarkMasqChain} for _, c := range chains { // flush chain, then if sucessful delete, delete will fail if flush fails. if err := ipt.FlushChain(utiliptables.TableNAT, c); err != nil { @@ -233,6 +245,21 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { } } } + + // Clean up the older SNAT rule which was directly in POSTROUTING. + // TODO(thockin): Remove this for v1.3 or v1.4. + args = []string{ + "-m", "comment", "--comment", "kubernetes service traffic requiring SNAT", + "-m", "mark", "--mark", iptablesMasqueradeMark, + "-j", "MASQUERADE", + } + if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { + if !utiliptables.IsNotFoundError(err) { + glog.Errorf("Error removing old-style SNAT rule: %v", err) + encounteredError = true + } + } + return encounteredError } @@ -479,37 +506,45 @@ func (proxier *Proxier) syncProxyRules() { } glog.V(3).Infof("Syncing iptables rules") - // Ensure main chains and rules are installed. - tablesNeedServicesChain := []utiliptables.Table{utiliptables.TableFilter, utiliptables.TableNAT} - for _, table := range tablesNeedServicesChain { - if _, err := proxier.iptables.EnsureChain(table, iptablesServicesChain); err != nil { - glog.Errorf("Failed to ensure that %s chain %s exists: %v", table, iptablesServicesChain, err) - return - } - } - // Link the services chain. - tableChainsNeedJumpServices := []struct { - table utiliptables.Table - chain utiliptables.Chain - }{ - {utiliptables.TableFilter, utiliptables.ChainOutput}, - {utiliptables.TableNAT, utiliptables.ChainOutput}, - {utiliptables.TableNAT, utiliptables.ChainPrerouting}, - } - comment := "kubernetes service portals" - args := []string{"-m", "comment", "--comment", comment, "-j", string(iptablesServicesChain)} - for _, tc := range tableChainsNeedJumpServices { - if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil { - glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, iptablesServicesChain, err) - return - } - } - // Link the output rules. + // Create and link the kube services chain. { - comment := "kubernetes service traffic requiring SNAT" - args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "--mark", iptablesMasqueradeMark, "-j", "MASQUERADE"} - if _, err := proxier.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { - glog.Errorf("Failed to ensure that chain %s obeys MASQUERADE mark: %v", utiliptables.ChainPostrouting, err) + tablesNeedServicesChain := []utiliptables.Table{utiliptables.TableFilter, utiliptables.TableNAT} + for _, table := range tablesNeedServicesChain { + if _, err := proxier.iptables.EnsureChain(table, kubeServicesChain); err != nil { + glog.Errorf("Failed to ensure that %s chain %s exists: %v", table, kubeServicesChain, err) + return + } + } + + tableChainsNeedJumpServices := []struct { + table utiliptables.Table + chain utiliptables.Chain + }{ + {utiliptables.TableFilter, utiliptables.ChainOutput}, + {utiliptables.TableNAT, utiliptables.ChainOutput}, + {utiliptables.TableNAT, utiliptables.ChainPrerouting}, + } + comment := "kubernetes service portals" + args := []string{"-m", "comment", "--comment", comment, "-j", string(kubeServicesChain)} + for _, tc := range tableChainsNeedJumpServices { + if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil { + glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeServicesChain, err) + return + } + } + } + + // Create and link the kube postrouting chain. + { + if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, kubePostroutingChain); err != nil { + glog.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubePostroutingChain, err) + return + } + + comment := "kubernetes postrouting rules" + args := []string{"-m", "comment", "--comment", comment, "-j", string(kubePostroutingChain)} + if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { + glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, kubeServicesChain, err) return } } @@ -542,24 +577,52 @@ func (proxier *Proxier) syncProxyRules() { writeLine(natChains, "*nat") // Make sure we keep stats for the top-level chains, if they existed - // (which they should have because we created them above). - if chain, ok := existingFilterChains[iptablesServicesChain]; ok { + // (which most should have because we created them above). + if chain, ok := existingFilterChains[kubeServicesChain]; ok { writeLine(filterChains, chain) } else { - writeLine(filterChains, makeChainLine(iptablesServicesChain)) + writeLine(filterChains, makeChainLine(kubeServicesChain)) } - if chain, ok := existingNATChains[iptablesServicesChain]; ok { + if chain, ok := existingNATChains[kubeServicesChain]; ok { writeLine(natChains, chain) } else { - writeLine(natChains, makeChainLine(iptablesServicesChain)) + writeLine(natChains, makeChainLine(kubeServicesChain)) } - if chain, ok := existingNATChains[iptablesNodePortsChain]; ok { + if chain, ok := existingNATChains[kubeNodePortsChain]; ok { writeLine(natChains, chain) } else { - writeLine(natChains, makeChainLine(iptablesNodePortsChain)) + writeLine(natChains, makeChainLine(kubeNodePortsChain)) + } + if chain, ok := existingNATChains[kubePostroutingChain]; ok { + writeLine(natChains, chain) + } else { + writeLine(natChains, makeChainLine(kubePostroutingChain)) + } + if chain, ok := existingNATChains[kubeMarkMasqChain]; ok { + writeLine(natChains, chain) + } else { + writeLine(natChains, makeChainLine(kubeMarkMasqChain)) } - // Accumulate nat chains to keep. + // Install the kubernetes-specific postrouting rules. We use a whole chain for + // this so that it is easier to flush and change, for example if the mark + // value should ever change. + writeLine(natRules, []string{ + "-A", string(kubePostroutingChain), + "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`, + "-m", "mark", "--mark", iptablesMasqueradeMark, + "-j", "MASQUERADE", + }...) + + // Install the kubernetes-specific masquerade mark rule. We use a whole chain for + // this so that it is easier to flush and change, for example if the mark + // value should ever change. + writeLine(natRules, []string{ + "-A", string(kubeMarkMasqChain), + "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark), + }...) + + // Accumulate NAT chains to keep. activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set // Accumulate new local ports that we have opened. @@ -580,18 +643,16 @@ func (proxier *Proxier) syncProxyRules() { // Capture the clusterIP. args := []string{ - "-A", string(iptablesServicesChain), - "-m", "comment", "--comment", fmt.Sprintf("\"%s cluster IP\"", svcName.String()), + "-A", string(kubeServicesChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcName.String()), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), "--dport", fmt.Sprintf("%d", svcInfo.port), } if proxier.masqueradeAll { - writeLine(natRules, append(args, - "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...) + writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...) } - writeLine(natRules, append(args, - "-j", string(svcChain))...) + writeLine(natRules, append(args, "-j", string(svcChain))...) // Capture externalIPs. for _, externalIP := range svcInfo.externalIPs { @@ -619,15 +680,14 @@ func (proxier *Proxier) syncProxyRules() { } } // We're holding the port, so it's OK to install iptables rules. args := []string{ - "-A", string(iptablesServicesChain), - "-m", "comment", "--comment", fmt.Sprintf("\"%s external IP\"", svcName.String()), + "-A", string(kubeServicesChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcName.String()), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", externalIP), "--dport", fmt.Sprintf("%d", svcInfo.port), } // We have to SNAT packets to external IPs. - writeLine(natRules, append(args, - "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...) + writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...) // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container) // nor from a local process to be forwarded to the service. @@ -636,30 +696,26 @@ func (proxier *Proxier) syncProxyRules() { externalTrafficOnlyArgs := append(args, "-m", "physdev", "!", "--physdev-is-in", "-m", "addrtype", "!", "--src-type", "LOCAL") - writeLine(natRules, append(externalTrafficOnlyArgs, - "-j", string(svcChain))...) + writeLine(natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...) dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL") // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local. // This covers cases like GCE load-balancers which get added to the local routing table. - writeLine(natRules, append(dstLocalOnlyArgs, - "-j", string(svcChain))...) + writeLine(natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...) } // Capture load-balancer ingress. for _, ingress := range svcInfo.loadBalancerStatus.Ingress { if ingress.IP != "" { args := []string{ - "-A", string(iptablesServicesChain), - "-m", "comment", "--comment", fmt.Sprintf("\"%s loadbalancer IP\"", svcName.String()), + "-A", string(kubeServicesChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcName.String()), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", ingress.IP), "--dport", fmt.Sprintf("%d", svcInfo.port), } // We have to SNAT packets from external IPs. - writeLine(natRules, append(args, - "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...) - writeLine(natRules, append(args, - "-j", string(svcChain))...) + writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...) + writeLine(natRules, append(args, "-j", string(svcChain))...) } } @@ -685,27 +741,24 @@ func (proxier *Proxier) syncProxyRules() { } newLocalPorts[lp] = socket } // We're holding the port, so it's OK to install iptables rules. + + args := []string{ + "-A", string(kubeNodePortsChain), + "-m", "comment", "--comment", svcName.String(), + "-m", protocol, "-p", protocol, + "--dport", fmt.Sprintf("%d", svcInfo.nodePort), + } // Nodeports need SNAT. - writeLine(natRules, - "-A", string(iptablesNodePortsChain), - "-m", "comment", "--comment", svcName.String(), - "-m", protocol, "-p", protocol, - "--dport", fmt.Sprintf("%d", svcInfo.nodePort), - "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark)) + writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...) // Jump to the service chain. - writeLine(natRules, - "-A", string(iptablesNodePortsChain), - "-m", "comment", "--comment", svcName.String(), - "-m", protocol, "-p", protocol, - "--dport", fmt.Sprintf("%d", svcInfo.nodePort), - "-j", string(svcChain)) + writeLine(natRules, append(args, "-j", string(svcChain))...) } // If the service has no endpoints then reject packets. if len(proxier.endpointsMap[svcName]) == 0 { writeLine(filterRules, - "-A", string(iptablesServicesChain), - "-m", "comment", "--comment", fmt.Sprintf("\"%s has no endpoints\"", svcName.String()), + "-A", string(kubeServicesChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcName.String()), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), "--dport", fmt.Sprintf("%d", svcInfo.port), @@ -775,16 +828,14 @@ func (proxier *Proxier) syncProxyRules() { // TODO: if we grow logic to get this node's pod CIDR, we can use it. writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i], ":")[0]), - "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...) + "-j", string(kubeMarkMasqChain))...) // Update client-affinity lists. if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") } // DNAT to final destination. - args = append(args, - "-m", protocol, "-p", protocol, - "-j", "DNAT", "--to-destination", endpoints[i]) + args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i]) writeLine(natRules, args...) } } @@ -808,10 +859,10 @@ func (proxier *Proxier) syncProxyRules() { // Finally, tail-call to the nodeports chain. This needs to be after all // other service portal rules. writeLine(natRules, - "-A", string(iptablesServicesChain), - "-m", "comment", "--comment", "\"kubernetes service nodeports; NOTE: this must be the last rule in this chain\"", + "-A", string(kubeServicesChain), + "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`, "-m", "addrtype", "--dst-type", "LOCAL", - "-j", string(iptablesNodePortsChain)) + "-j", string(kubeNodePortsChain)) // Write the end-of-table markers. writeLine(filterRules, "COMMIT") @@ -832,14 +883,28 @@ func (proxier *Proxier) syncProxyRules() { glog.Errorf("Closing local port %s", k.String()) v.Close() } - } else { - // Close old local ports and save new ones. - for k, v := range proxier.portsMap { - if newLocalPorts[k] == nil { - v.Close() - } + return + } + + // Close old local ports and save new ones. + for k, v := range proxier.portsMap { + if newLocalPorts[k] == nil { + v.Close() + } + } + proxier.portsMap = newLocalPorts + + // Clean up the older SNAT rule which was directly in POSTROUTING. + // TODO(thockin): Remove this for v1.3 or v1.4. + args := []string{ + "-m", "comment", "--comment", "kubernetes service traffic requiring SNAT", + "-m", "mark", "--mark", iptablesMasqueradeMark, + "-j", "MASQUERADE", + } + if err := proxier.iptables.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { + if !utiliptables.IsNotFoundError(err) { + glog.Errorf("Error removing old-style SNAT rule: %v", err) } - proxier.portsMap = newLocalPorts } }