diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 9780fa59b78..3231d6f2af7 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -82,18 +82,15 @@ func ShouldUseIptablesProxier() (bool, error) { return !version.LessThan(*minVersion), nil } -type portal struct { - ip net.IP - port int - protocol api.Protocol -} - // internal struct for string service information type serviceInfo struct { - portal portal + clusterIP net.IP + port int + protocol api.Protocol + nodePort int loadBalancerStatus api.LoadBalancerStatus sessionAffinityType api.ServiceAffinity - stickyMaxAgeMinutes int + stickyMaxAgeSeconds int endpoints []string // Deprecated, but required for back-compat (including e2e) deprecatedPublicIPs []string @@ -103,7 +100,7 @@ type serviceInfo struct { func newServiceInfo(service proxy.ServicePortName) *serviceInfo { return &serviceInfo{ sessionAffinityType: api.ServiceAffinityNone, // default - stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API. + stickyMaxAgeSeconds: 180 * 60, // 3 hours, i.e. the previous 180-minute default expressed in seconds. TODO: parameterize this in the API. 
} } @@ -181,10 +178,10 @@ func tearDownUserspaceIptables(ipt utiliptables.Interface) { } func (proxier *Proxier) sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool { - if info.portal.protocol != port.Protocol || info.portal.port != port.Port { + if info.protocol != port.Protocol || info.port != port.Port || info.nodePort != port.NodePort { return false } - if !info.portal.ip.Equal(net.ParseIP(service.Spec.ClusterIP)) { + if !info.clusterIP.Equal(net.ParseIP(service.Spec.ClusterIP)) { return false } if !ipsEqual(info.deprecatedPublicIPs, service.Spec.DeprecatedPublicIPs) { @@ -243,7 +240,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { // if ClusterIP is "None" or empty, skip proxying if !api.IsServiceIPSet(service) { - glog.V(3).Infof("Skipping service %s due to portal IP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP) + glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP) continue } @@ -266,9 +263,10 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { serviceIP := net.ParseIP(service.Spec.ClusterIP) glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) info = newServiceInfo(serviceName) - info.portal.ip = serviceIP - info.portal.port = servicePort.Port - info.portal.protocol = servicePort.Protocol + info.clusterIP = serviceIP + info.port = servicePort.Port + info.protocol = servicePort.Protocol + info.nodePort = servicePort.NodePort info.deprecatedPublicIPs = service.Spec.DeprecatedPublicIPs // Deep-copy in case the service instance changes info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) @@ -413,7 +411,7 @@ func (proxier *Proxier) syncProxyRules() error { glog.V(2).Info("not syncing iptables until Services and Endpoints have been 
received from master") return nil } - glog.V(4).Infof("Syncing iptables rules.") + glog.V(4).Infof("Syncing iptables rules") // Ensure main chains and rules are installed. inputChains := []utiliptables.Chain{utiliptables.ChainOutput, utiliptables.ChainPrerouting} @@ -451,7 +449,6 @@ func (proxier *Proxier) syncProxyRules() error { // Get iptables-save output so we can check for existing chains and rules. // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore existingChains := make(map[utiliptables.Chain]string) - // run iptables-save iptablesSaveRaw, err := proxier.iptables.Save(utiliptables.TableNAT) if err != nil { // if we failed to get any rules glog.Errorf("Failed to execute iptable-save, syncing all rules. %s", err.Error()) @@ -459,126 +456,213 @@ func (proxier *Proxier) syncProxyRules() error { existingChains = getChainLines(utiliptables.TableNAT, iptablesSaveRaw) } - // for first line and chains - var chainsLines bytes.Buffer - // for the actual rules (which should be after the list of chains) - var rulesLines bytes.Buffer + chainsLines := bytes.NewBuffer(nil) + rulesLines := bytes.NewBuffer(nil) - // write table header - chainsLines.WriteString("*nat\n") + // Write table header. + writeLine(chainsLines, "*nat") + // Make sure we keep stats for the top-level chains, if they existed + // (which they should have because we created them above). if chain, ok := existingChains[iptablesServicesChain]; ok { - chainsLines.WriteString(fmt.Sprintf("%s\n", chain)) + writeLine(chainsLines, chain) } else { - chainsLines.WriteString(makeChainLine(iptablesServicesChain)) + writeLine(chainsLines, makeChainLine(iptablesServicesChain)) + } + if chain, ok := existingChains[iptablesNodePortsChain]; ok { + writeLine(chainsLines, chain) + } else { + writeLine(chainsLines, makeChainLine(iptablesNodePortsChain)) } - newHostChains := []utiliptables.Chain{} - newServiceChains := []utiliptables.Chain{} + // Accumulate chains to keep. 
+ activeChains := make(map[utiliptables.Chain]bool) // use a map as a set - //Build rules for services + // Build rules for each service. for name, info := range proxier.serviceMap { - protocol := strings.ToLower((string)(info.portal.protocol)) - // get chain name + protocol := strings.ToLower((string)(info.protocol)) + + // Create the per-service chain, retaining counters if possible. svcChain := servicePortToServiceChain(name) - // Create chain if chain, ok := existingChains[svcChain]; ok { - chainsLines.WriteString(fmt.Sprintf("%s\n", chain)) + writeLine(chainsLines, chain) } else { - chainsLines.WriteString(makeChainLine(svcChain)) - } - // get hosts and host-Chains - hosts := make([]string, 0) - hostChains := make([]utiliptables.Chain, 0) - for _, ep := range info.endpoints { - hosts = append(hosts, ep) - hostChains = append(hostChains, servicePortAndEndpointToServiceChain(name, ep)) + writeLine(chainsLines, makeChainLine(svcChain)) } + activeChains[svcChain] = true - // Ensure we know what chains to flush/remove next time we generate the rules - newHostChains = append(newHostChains, hostChains...) - newServiceChains = append(newServiceChains, svcChain) + // Capture the clusterIP. 
+ writeLine(rulesLines, + "-A", string(iptablesServicesChain), + "-m", "comment", "--comment", fmt.Sprintf("\"%s cluster IP\"", name.String()), + "-m", protocol, "-p", protocol, + "-d", fmt.Sprintf("%s/32", info.clusterIP.String()), + "--dport", fmt.Sprintf("%d", info.port), + "-j", string(svcChain)) - // write chain and sticky session rule - for _, hostChain := range hostChains { - // Create chain - if chain, ok := existingChains[utiliptables.Chain(hostChain)]; ok { - chainsLines.WriteString(fmt.Sprintf("%s\n", chain)) - } else { - chainsLines.WriteString(makeChainLine(hostChain)) - } - - // Sticky session - if info.sessionAffinityType == api.ServiceAffinityClientIP { - rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -m recent --name %s --rcheck --seconds %d --reap -j %s\n", svcChain, name.String(), hostChain, info.stickyMaxAgeMinutes*60, hostChain)) + // Capture externalIPs. + for _, externalIP := range info.deprecatedPublicIPs { + args := []string{ + "-A", string(iptablesServicesChain), + "-m", "comment", "--comment", fmt.Sprintf("\"%s external IP\"", name.String()), + "-m", protocol, "-p", protocol, + "-d", fmt.Sprintf("%s/32", externalIP), + "--dport", fmt.Sprintf("%d", info.port), } + // We have to SNAT packets from external IPs. + writeLine(rulesLines, append(args, + "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...) + writeLine(rulesLines, append(args, + "-j", string(svcChain))...) 
} - // write proxy/loadblanacing rules - n := len(hostChains) - for i, hostChain := range hostChains { - // Roughly round robin statistically if we have more than one host - if i < (n - 1) { - rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -m statistic --mode random --probability %f -j %s\n", svcChain, name.String(), 1.0/float64(n-i), hostChain)) - } else { - rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -j %s\n", svcChain, name.String(), hostChain)) - } - // proxy - if info.sessionAffinityType == api.ServiceAffinityClientIP { - rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -m recent --name %s --set -j DNAT -p %s --to-destination %s\n", hostChain, name.String(), hostChain, protocol, hosts[i])) - } else { - rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -j DNAT -p %s --to-destination %s\n", hostChain, name.String(), protocol, hosts[i])) - } - } - - // proxy - rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"portal for %s\" -d %s/32 -m state --state NEW -p %s -m %s --dport %d -j %s\n", iptablesServicesChain, name.String(), info.portal.ip.String(), protocol, protocol, info.portal.port, svcChain)) - - for _, publicIP := range info.deprecatedPublicIPs { - rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"deprecated-PublicIP portal for %s\" -d %s/32 -m state --state NEW -p %s -m %s --dport %d -j %s\n", iptablesServicesChain, name.String(), publicIP, protocol, protocol, info.portal.port, svcChain)) - } - + // Capture load-balancer ingress. 
for _, ingress := range info.loadBalancerStatus.Ingress { if ingress.IP != "" { - rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"load-balancer portal for %s\" -d %s/32 -m state --state NEW -p %s -m %s --dport %d -j %s\n", iptablesServicesChain, name.String(), ingress.IP, protocol, protocol, info.portal.port, svcChain)) + args := []string{ + "-A", string(iptablesServicesChain), + "-m", "comment", "--comment", fmt.Sprintf("\"%s loadbalancer IP\"", name.String()), + "-m", protocol, "-p", protocol, + "-d", fmt.Sprintf("%s/32", ingress.IP), + "--dport", fmt.Sprintf("%d", info.port), + } + // We have to SNAT packets from external IPs. + writeLine(rulesLines, append(args, + "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...) + writeLine(rulesLines, append(args, + "-j", string(svcChain))...) } } + + // Capture nodeports. If we had more than 2 rules it might be + // worthwhile to make a new per-service chain for nodeport rules, but + // with just 2 rules it ends up being a waste and a cognitive burden. + if info.nodePort != 0 { + // Nodeports need SNAT. + writeLine(rulesLines, + "-A", string(iptablesNodePortsChain), + "-m", "comment", "--comment", name.String(), + "-m", protocol, "-p", protocol, + "--dport", fmt.Sprintf("%d", info.nodePort), + "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark)) + // Jump to the service chain. + writeLine(rulesLines, + "-A", string(iptablesNodePortsChain), + "-m", "comment", "--comment", name.String(), + "-m", protocol, "-p", protocol, + "--dport", fmt.Sprintf("%d", info.nodePort), + "-j", string(svcChain)) + } + + // Generate the per-endpoint chains. We do this in multiple passes so we + // can group rules together. 
+ endpoints := make([]string, 0) + endpointChains := make([]utiliptables.Chain, 0) + for _, ep := range info.endpoints { + endpoints = append(endpoints, ep) + endpointChain := servicePortAndEndpointToServiceChain(name, ep) + endpointChains = append(endpointChains, endpointChain) + + // Create the endpoint chain, retaining counters if possible. + if chain, ok := existingChains[utiliptables.Chain(endpointChain)]; ok { + writeLine(chainsLines, chain) + } else { + writeLine(chainsLines, makeChainLine(endpointChain)) + } + activeChains[endpointChain] = true + } + + // First write session affinity rules, if applicable. + if info.sessionAffinityType == api.ServiceAffinityClientIP { + for _, endpointChain := range endpointChains { + writeLine(rulesLines, + "-A", string(svcChain), + "-m", "comment", "--comment", name.String(), + "-m", "recent", "--name", string(endpointChain), + "--rcheck", "--seconds", fmt.Sprintf("%d", info.stickyMaxAgeSeconds), "--reap", + "-j", string(endpointChain)) + } + } + + // Now write loadbalancing & DNAT rules. + n := len(endpointChains) + for i, endpointChain := range endpointChains { + // Balancing rules in the per-service chain. + args := []string{ + "-A", string(svcChain), + "-m", "comment", "--comment", name.String(), + } + if i < (n - 1) { + // Each rule is a probabilistic match. + args = append(args, + "-m", "statistic", + "--mode", "random", + "--probability", fmt.Sprintf("%f", 1.0/float64(n-i))) + } + // The final (or only if n == 1) rule is a guaranteed match. + args = append(args, "-j", string(endpointChain)) + writeLine(rulesLines, args...) + + // Rules in the per-endpoint chain. + args = []string{ + "-A", string(endpointChain), + "-m", "comment", "--comment", name.String(), + } + // Handle traffic that loops back to the originator with SNAT. + // Technically we only need to do this if the endpoint is on this + // host, but we don't have that information, so we just do this for + // all endpoints. 
+ // TODO: if we grow logic to get this node's pod CIDR, we can use it. + writeLine(rulesLines, append(args, + "-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i], ":")[0]), + "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...) + + // Update client-affinity lists. + if info.sessionAffinityType == api.ServiceAffinityClientIP { + args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") + } + // DNAT to final destination. + args = append(args, + "-m", protocol, "-p", protocol, + "-j", "DNAT", "--to-destination", endpoints[i]) + writeLine(rulesLines, args...) + } } - // Delete chains no longer in use: - activeChains := make(map[utiliptables.Chain]bool) // use a map as a set - for _, chain := range newHostChains { - activeChains[chain] = true - } - for _, chain := range newServiceChains { - activeChains[chain] = true - } + // Delete chains no longer in use. for chain := range existingChains { if !activeChains[chain] { chainString := string(chain) - // Ignore chains that aren't ours. if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") { + // Ignore chains that aren't ours. continue } - rulesLines.WriteString(fmt.Sprintf("-F %s\n-X %s\n", chain, chain)) + // We must (as per iptables) write a chain-line for it, which has + // the nice effect of flushing the chain. Then we can remove the + // chain. + writeLine(chainsLines, existingChains[chain]) + writeLine(rulesLines, "-X", chainString) } } - // write end of table - rulesLines.WriteString("COMMIT\n") - // combine parts - lines := append(chainsLines.Bytes(), rulesLines.Bytes()...) + // Write the end-of-table marker. + writeLine(rulesLines, "COMMIT") - // sync rules and return error + // Sync rules. + // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table. + lines := append(chainsLines.Bytes(), rulesLines.Bytes()...) 
glog.V(3).Infof("Syncing rules: %s", lines) - // NOTE: flush=false is used so we don't flush non-kubernetes chains in the table. - err = proxier.iptables.Restore(utiliptables.TableNAT, lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) - return err + return proxier.iptables.Restore(utiliptables.TableNAT, lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) +} + +// Join all words with spaces, terminate with newline and write to buf. +func writeLine(buf *bytes.Buffer, words ...string) { + buf.WriteString(strings.Join(words, " ") + "\n") } // return an iptables-save/restore formatted chain line given a Chain func makeChainLine(chain utiliptables.Chain) string { - return fmt.Sprintf(":%s - [0:0]\n", chain) + return fmt.Sprintf(":%s - [0:0]", chain) } // getChainLines parses a table's iptables-save data to find chains in the table.