Put all masquerade mark logic into new chains

This allows us to use the MARK-MASQ chain as a subroutine, rather than encoding
the mark in many places.  Having a KUBE-POSTROUTING chain means we can flush
and rebuild it atomically.  This makes followon work to change the mark
significantly easier.
This commit is contained in:
Tim Hockin 2016-02-02 17:34:48 -08:00 committed by Matt Dupre
parent 41ba8ced6d
commit 107c5f7813

View File

@ -54,10 +54,16 @@ import (
const iptablesMinVersion = utiliptables.MinCheckVersion const iptablesMinVersion = utiliptables.MinCheckVersion
// the services chain // the services chain
const iptablesServicesChain utiliptables.Chain = "KUBE-SERVICES" const kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
// the nodeports chain // the nodeports chain
const iptablesNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS" const kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS"
// the kubernetes postrouting chain
const kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
// the mark-for-masquerade chain
const kubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
// the mark we apply to traffic needing SNAT // the mark we apply to traffic needing SNAT
const iptablesMasqueradeMark = "0x4d415351" const iptablesMasqueradeMark = "0x4d415351"
@ -193,7 +199,10 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod
// It returns true if an error was encountered. Errors are logged. // It returns true if an error was encountered. Errors are logged.
func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
//TODO: actually tear down all rules and chains. //TODO: actually tear down all rules and chains.
args := []string{"-m", "comment", "--comment", "kubernetes service portals", "-j", string(iptablesServicesChain)} args := []string{
"-m", "comment", "--comment", "kubernetes service portals",
"-j", string(kubeServicesChain),
}
if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainOutput, args...); err != nil { if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainOutput, args...); err != nil {
if !utiliptables.IsNotFoundError(err) { if !utiliptables.IsNotFoundError(err) {
glog.Errorf("Error removing pure-iptables proxy rule: %v", err) glog.Errorf("Error removing pure-iptables proxy rule: %v", err)
@ -207,7 +216,10 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
} }
} }
args = []string{"-m", "comment", "--comment", "kubernetes service traffic requiring SNAT", "-m", "mark", "--mark", iptablesMasqueradeMark, "-j", "MASQUERADE"} args = []string{
"-m", "comment", "--comment", "kubernetes postrouting rules",
"-j", string(kubePostroutingChain),
}
if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
if !utiliptables.IsNotFoundError(err) { if !utiliptables.IsNotFoundError(err) {
glog.Errorf("Error removing pure-iptables proxy rule: %v", err) glog.Errorf("Error removing pure-iptables proxy rule: %v", err)
@ -216,7 +228,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
} }
// flush and delete chains. // flush and delete chains.
chains := []utiliptables.Chain{iptablesServicesChain, iptablesNodePortsChain} chains := []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, kubeMarkMasqChain}
for _, c := range chains { for _, c := range chains {
// flush chain, then if sucessful delete, delete will fail if flush fails. // flush chain, then if sucessful delete, delete will fail if flush fails.
if err := ipt.FlushChain(utiliptables.TableNAT, c); err != nil { if err := ipt.FlushChain(utiliptables.TableNAT, c); err != nil {
@ -233,6 +245,21 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
} }
} }
} }
// Clean up the older SNAT rule which was directly in POSTROUTING.
// TODO(thockin): Remove this for v1.3 or v1.4.
args = []string{
"-m", "comment", "--comment", "kubernetes service traffic requiring SNAT",
"-m", "mark", "--mark", iptablesMasqueradeMark,
"-j", "MASQUERADE",
}
if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
if !utiliptables.IsNotFoundError(err) {
glog.Errorf("Error removing old-style SNAT rule: %v", err)
encounteredError = true
}
}
return encounteredError return encounteredError
} }
@ -479,37 +506,45 @@ func (proxier *Proxier) syncProxyRules() {
} }
glog.V(3).Infof("Syncing iptables rules") glog.V(3).Infof("Syncing iptables rules")
// Ensure main chains and rules are installed. // Create and link the kube services chain.
tablesNeedServicesChain := []utiliptables.Table{utiliptables.TableFilter, utiliptables.TableNAT}
for _, table := range tablesNeedServicesChain {
if _, err := proxier.iptables.EnsureChain(table, iptablesServicesChain); err != nil {
glog.Errorf("Failed to ensure that %s chain %s exists: %v", table, iptablesServicesChain, err)
return
}
}
// Link the services chain.
tableChainsNeedJumpServices := []struct {
table utiliptables.Table
chain utiliptables.Chain
}{
{utiliptables.TableFilter, utiliptables.ChainOutput},
{utiliptables.TableNAT, utiliptables.ChainOutput},
{utiliptables.TableNAT, utiliptables.ChainPrerouting},
}
comment := "kubernetes service portals"
args := []string{"-m", "comment", "--comment", comment, "-j", string(iptablesServicesChain)}
for _, tc := range tableChainsNeedJumpServices {
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil {
glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, iptablesServicesChain, err)
return
}
}
// Link the output rules.
{ {
comment := "kubernetes service traffic requiring SNAT" tablesNeedServicesChain := []utiliptables.Table{utiliptables.TableFilter, utiliptables.TableNAT}
args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "--mark", iptablesMasqueradeMark, "-j", "MASQUERADE"} for _, table := range tablesNeedServicesChain {
if _, err := proxier.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { if _, err := proxier.iptables.EnsureChain(table, kubeServicesChain); err != nil {
glog.Errorf("Failed to ensure that chain %s obeys MASQUERADE mark: %v", utiliptables.ChainPostrouting, err) glog.Errorf("Failed to ensure that %s chain %s exists: %v", table, kubeServicesChain, err)
return
}
}
tableChainsNeedJumpServices := []struct {
table utiliptables.Table
chain utiliptables.Chain
}{
{utiliptables.TableFilter, utiliptables.ChainOutput},
{utiliptables.TableNAT, utiliptables.ChainOutput},
{utiliptables.TableNAT, utiliptables.ChainPrerouting},
}
comment := "kubernetes service portals"
args := []string{"-m", "comment", "--comment", comment, "-j", string(kubeServicesChain)}
for _, tc := range tableChainsNeedJumpServices {
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil {
glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeServicesChain, err)
return
}
}
}
// Create and link the kube postrouting chain.
{
if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, kubePostroutingChain); err != nil {
glog.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubePostroutingChain, err)
return
}
comment := "kubernetes postrouting rules"
args := []string{"-m", "comment", "--comment", comment, "-j", string(kubePostroutingChain)}
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, kubeServicesChain, err)
return return
} }
} }
@ -542,24 +577,52 @@ func (proxier *Proxier) syncProxyRules() {
writeLine(natChains, "*nat") writeLine(natChains, "*nat")
// Make sure we keep stats for the top-level chains, if they existed // Make sure we keep stats for the top-level chains, if they existed
// (which they should have because we created them above). // (which most should have because we created them above).
if chain, ok := existingFilterChains[iptablesServicesChain]; ok { if chain, ok := existingFilterChains[kubeServicesChain]; ok {
writeLine(filterChains, chain) writeLine(filterChains, chain)
} else { } else {
writeLine(filterChains, makeChainLine(iptablesServicesChain)) writeLine(filterChains, makeChainLine(kubeServicesChain))
} }
if chain, ok := existingNATChains[iptablesServicesChain]; ok { if chain, ok := existingNATChains[kubeServicesChain]; ok {
writeLine(natChains, chain) writeLine(natChains, chain)
} else { } else {
writeLine(natChains, makeChainLine(iptablesServicesChain)) writeLine(natChains, makeChainLine(kubeServicesChain))
} }
if chain, ok := existingNATChains[iptablesNodePortsChain]; ok { if chain, ok := existingNATChains[kubeNodePortsChain]; ok {
writeLine(natChains, chain) writeLine(natChains, chain)
} else { } else {
writeLine(natChains, makeChainLine(iptablesNodePortsChain)) writeLine(natChains, makeChainLine(kubeNodePortsChain))
}
if chain, ok := existingNATChains[kubePostroutingChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(kubePostroutingChain))
}
if chain, ok := existingNATChains[kubeMarkMasqChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(kubeMarkMasqChain))
} }
// Accumulate nat chains to keep. // Install the kubernetes-specific postrouting rules. We use a whole chain for
// this so that it is easier to flush and change, for example if the mark
// value should ever change.
writeLine(natRules, []string{
"-A", string(kubePostroutingChain),
"-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
"-m", "mark", "--mark", iptablesMasqueradeMark,
"-j", "MASQUERADE",
}...)
// Install the kubernetes-specific masquerade mark rule. We use a whole chain for
// this so that it is easier to flush and change, for example if the mark
// value should ever change.
writeLine(natRules, []string{
"-A", string(kubeMarkMasqChain),
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark),
}...)
// Accumulate NAT chains to keep.
activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
// Accumulate new local ports that we have opened. // Accumulate new local ports that we have opened.
@ -580,18 +643,16 @@ func (proxier *Proxier) syncProxyRules() {
// Capture the clusterIP. // Capture the clusterIP.
args := []string{ args := []string{
"-A", string(iptablesServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf("\"%s cluster IP\"", svcName.String()), "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcName.String()),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
"--dport", fmt.Sprintf("%d", svcInfo.port), "--dport", fmt.Sprintf("%d", svcInfo.port),
} }
if proxier.masqueradeAll { if proxier.masqueradeAll {
writeLine(natRules, append(args, writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...)
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
} }
writeLine(natRules, append(args, writeLine(natRules, append(args, "-j", string(svcChain))...)
"-j", string(svcChain))...)
// Capture externalIPs. // Capture externalIPs.
for _, externalIP := range svcInfo.externalIPs { for _, externalIP := range svcInfo.externalIPs {
@ -619,15 +680,14 @@ func (proxier *Proxier) syncProxyRules() {
} }
} // We're holding the port, so it's OK to install iptables rules. } // We're holding the port, so it's OK to install iptables rules.
args := []string{ args := []string{
"-A", string(iptablesServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf("\"%s external IP\"", svcName.String()), "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcName.String()),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", externalIP), "-d", fmt.Sprintf("%s/32", externalIP),
"--dport", fmt.Sprintf("%d", svcInfo.port), "--dport", fmt.Sprintf("%d", svcInfo.port),
} }
// We have to SNAT packets to external IPs. // We have to SNAT packets to external IPs.
writeLine(natRules, append(args, writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...)
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
// Allow traffic for external IPs that does not come from a bridge (i.e. not from a container) // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
// nor from a local process to be forwarded to the service. // nor from a local process to be forwarded to the service.
@ -636,30 +696,26 @@ func (proxier *Proxier) syncProxyRules() {
externalTrafficOnlyArgs := append(args, externalTrafficOnlyArgs := append(args,
"-m", "physdev", "!", "--physdev-is-in", "-m", "physdev", "!", "--physdev-is-in",
"-m", "addrtype", "!", "--src-type", "LOCAL") "-m", "addrtype", "!", "--src-type", "LOCAL")
writeLine(natRules, append(externalTrafficOnlyArgs, writeLine(natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...)
"-j", string(svcChain))...)
dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL") dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
// Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local. // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
// This covers cases like GCE load-balancers which get added to the local routing table. // This covers cases like GCE load-balancers which get added to the local routing table.
writeLine(natRules, append(dstLocalOnlyArgs, writeLine(natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...)
"-j", string(svcChain))...)
} }
// Capture load-balancer ingress. // Capture load-balancer ingress.
for _, ingress := range svcInfo.loadBalancerStatus.Ingress { for _, ingress := range svcInfo.loadBalancerStatus.Ingress {
if ingress.IP != "" { if ingress.IP != "" {
args := []string{ args := []string{
"-A", string(iptablesServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf("\"%s loadbalancer IP\"", svcName.String()), "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcName.String()),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", ingress.IP), "-d", fmt.Sprintf("%s/32", ingress.IP),
"--dport", fmt.Sprintf("%d", svcInfo.port), "--dport", fmt.Sprintf("%d", svcInfo.port),
} }
// We have to SNAT packets from external IPs. // We have to SNAT packets from external IPs.
writeLine(natRules, append(args, writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...)
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...) writeLine(natRules, append(args, "-j", string(svcChain))...)
writeLine(natRules, append(args,
"-j", string(svcChain))...)
} }
} }
@ -685,27 +741,24 @@ func (proxier *Proxier) syncProxyRules() {
} }
newLocalPorts[lp] = socket newLocalPorts[lp] = socket
} // We're holding the port, so it's OK to install iptables rules. } // We're holding the port, so it's OK to install iptables rules.
args := []string{
"-A", string(kubeNodePortsChain),
"-m", "comment", "--comment", svcName.String(),
"-m", protocol, "-p", protocol,
"--dport", fmt.Sprintf("%d", svcInfo.nodePort),
}
// Nodeports need SNAT. // Nodeports need SNAT.
writeLine(natRules, writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...)
"-A", string(iptablesNodePortsChain),
"-m", "comment", "--comment", svcName.String(),
"-m", protocol, "-p", protocol,
"--dport", fmt.Sprintf("%d", svcInfo.nodePort),
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))
// Jump to the service chain. // Jump to the service chain.
writeLine(natRules, writeLine(natRules, append(args, "-j", string(svcChain))...)
"-A", string(iptablesNodePortsChain),
"-m", "comment", "--comment", svcName.String(),
"-m", protocol, "-p", protocol,
"--dport", fmt.Sprintf("%d", svcInfo.nodePort),
"-j", string(svcChain))
} }
// If the service has no endpoints then reject packets. // If the service has no endpoints then reject packets.
if len(proxier.endpointsMap[svcName]) == 0 { if len(proxier.endpointsMap[svcName]) == 0 {
writeLine(filterRules, writeLine(filterRules,
"-A", string(iptablesServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf("\"%s has no endpoints\"", svcName.String()), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcName.String()),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
"--dport", fmt.Sprintf("%d", svcInfo.port), "--dport", fmt.Sprintf("%d", svcInfo.port),
@ -775,16 +828,14 @@ func (proxier *Proxier) syncProxyRules() {
// TODO: if we grow logic to get this node's pod CIDR, we can use it. // TODO: if we grow logic to get this node's pod CIDR, we can use it.
writeLine(natRules, append(args, writeLine(natRules, append(args,
"-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i], ":")[0]), "-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i], ":")[0]),
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...) "-j", string(kubeMarkMasqChain))...)
// Update client-affinity lists. // Update client-affinity lists.
if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
} }
// DNAT to final destination. // DNAT to final destination.
args = append(args, args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i])
"-m", protocol, "-p", protocol,
"-j", "DNAT", "--to-destination", endpoints[i])
writeLine(natRules, args...) writeLine(natRules, args...)
} }
} }
@ -808,10 +859,10 @@ func (proxier *Proxier) syncProxyRules() {
// Finally, tail-call to the nodeports chain. This needs to be after all // Finally, tail-call to the nodeports chain. This needs to be after all
// other service portal rules. // other service portal rules.
writeLine(natRules, writeLine(natRules,
"-A", string(iptablesServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", "\"kubernetes service nodeports; NOTE: this must be the last rule in this chain\"", "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
"-m", "addrtype", "--dst-type", "LOCAL", "-m", "addrtype", "--dst-type", "LOCAL",
"-j", string(iptablesNodePortsChain)) "-j", string(kubeNodePortsChain))
// Write the end-of-table markers. // Write the end-of-table markers.
writeLine(filterRules, "COMMIT") writeLine(filterRules, "COMMIT")
@ -832,14 +883,28 @@ func (proxier *Proxier) syncProxyRules() {
glog.Errorf("Closing local port %s", k.String()) glog.Errorf("Closing local port %s", k.String())
v.Close() v.Close()
} }
} else { return
// Close old local ports and save new ones. }
for k, v := range proxier.portsMap {
if newLocalPorts[k] == nil { // Close old local ports and save new ones.
v.Close() for k, v := range proxier.portsMap {
} if newLocalPorts[k] == nil {
v.Close()
}
}
proxier.portsMap = newLocalPorts
// Clean up the older SNAT rule which was directly in POSTROUTING.
// TODO(thockin): Remove this for v1.3 or v1.4.
args := []string{
"-m", "comment", "--comment", "kubernetes service traffic requiring SNAT",
"-m", "mark", "--mark", iptablesMasqueradeMark,
"-j", "MASQUERADE",
}
if err := proxier.iptables.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
if !utiliptables.IsNotFoundError(err) {
glog.Errorf("Error removing old-style SNAT rule: %v", err)
} }
proxier.portsMap = newLocalPorts
} }
} }