From d14c98f6ccba344277f4a43c762df3b884601b4e Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Fri, 14 Aug 2015 15:20:19 -0700 Subject: [PATCH 1/9] Add nodepoprt chain and link it in, add unused MASQ rule --- pkg/proxy/iptables/proxier.go | 49 ++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 4de9ae91905..9780fa59b78 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -48,12 +48,16 @@ import ( // features are backported in various distros and this could get pretty hairy. // However iptables-1.4.0 was released 2007-Dec-22 and appears to have every feature we use, // so this seems prefectly reasonable for now. -const ( - IPTABLES_MIN_VERSION string = "1.4.0" -) +const IPTABLES_MIN_VERSION string = "1.4.0" // the services chain -var iptablesServicesChain utiliptables.Chain = "KUBE-SERVICES" +const iptablesServicesChain utiliptables.Chain = "KUBE-SERVICES" + +// the nodeports chain +const iptablesNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS" + +// the mark we apply to traffic needing SNAT +const iptablesMasqueradeMark = "0x4d415351" // ShouldUseIptablesProxier returns true if we should use the iptables Proxier instead of // the userspace Proxier. @@ -411,16 +415,37 @@ func (proxier *Proxier) syncProxyRules() error { } glog.V(4).Infof("Syncing iptables rules.") - // ensure main chain and rule connecting to output - args := []string{"-j", string(iptablesServicesChain)} - if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, iptablesServicesChain); err != nil { - return err + // Ensure main chains and rules are installed. + inputChains := []utiliptables.Chain{utiliptables.ChainOutput, utiliptables.ChainPrerouting} + // Link the services chain. + for _, chain := range inputChains { + if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, iptablesServicesChain); err != nil { + return err + } + comment := "kubernetes service portals; must be before nodeports" + args := []string{"-m", "comment", "--comment", comment, "-j", string(iptablesServicesChain)} + if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, chain, args...); err != nil { + return err + } } - if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, utiliptables.ChainOutput, args...); err != nil { - return err + // Link the nodeports chain. + for _, chain := range inputChains { + if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, iptablesNodePortsChain); err != nil { + return err + } + comment := "kubernetes service nodeports; must be after portals" + args := []string{"-m", "comment", "--comment", comment, "-m", "addrtype", "--dst-type", "LOCAL", "-j", string(iptablesNodePortsChain)} + if _, err := proxier.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, chain, args...); err != nil { + return err + } } - if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, utiliptables.ChainPrerouting, args...); err != nil { - return err + // Link the output rules. 
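+ // For illustration: the mark value 0x4d415351 is just the ASCII bytes
+ // "MASQ". The block below appends a POSTROUTING rule that masquerades any
+ // packet carrying that mark, roughly:
+ //   -A POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark 0x4d415351 -j MASQUERADE
+ // Nothing sets the mark yet in this patch (hence "unused MASQ rule" in the
+ // subject); the MARK --set-xmark rules come in the next patch.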
+ { + comment := "kubernetes service traffic requiring SNAT" + args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "--mark", iptablesMasqueradeMark, "-j", "MASQUERADE"} + if _, err := proxier.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { + return err + } } // Get iptables-save output so we can check for existing chains and rules. From 731d5e519139da63b1d551d45708484bbde4a9b8 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Fri, 14 Aug 2015 20:50:29 -0700 Subject: [PATCH 2/9] Clean up iptables rules, add nodeport support --- pkg/proxy/iptables/proxier.go | 282 ++++++++++++++++++++++------------ 1 file changed, 183 insertions(+), 99 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 9780fa59b78..3231d6f2af7 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -82,18 +82,15 @@ func ShouldUseIptablesProxier() (bool, error) { return !version.LessThan(*minVersion), nil } -type portal struct { - ip net.IP - port int - protocol api.Protocol -} - // internal struct for string service information type serviceInfo struct { - portal portal + clusterIP net.IP + port int + protocol api.Protocol + nodePort int loadBalancerStatus api.LoadBalancerStatus sessionAffinityType api.ServiceAffinity - stickyMaxAgeMinutes int + stickyMaxAgeSeconds int endpoints []string // Deprecated, but required for back-compat (including e2e) deprecatedPublicIPs []string @@ -103,7 +100,7 @@ type serviceInfo struct { func newServiceInfo(service proxy.ServicePortName) *serviceInfo { return &serviceInfo{ sessionAffinityType: api.ServiceAffinityNone, // default - stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API. + stickyMaxAgeSeconds: 180, // TODO: paramaterize this in the API. 
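+ // Note: the 180 default carries over unchanged from the old
+ // stickyMaxAgeMinutes field, so the effective ClientIP affinity window is
+ // now 180 seconds rather than 180 minutes.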
} } @@ -181,10 +178,10 @@ func tearDownUserspaceIptables(ipt utiliptables.Interface) { } func (proxier *Proxier) sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool { - if info.portal.protocol != port.Protocol || info.portal.port != port.Port { + if info.protocol != port.Protocol || info.port != port.Port || info.nodePort != port.NodePort { return false } - if !info.portal.ip.Equal(net.ParseIP(service.Spec.ClusterIP)) { + if !info.clusterIP.Equal(net.ParseIP(service.Spec.ClusterIP)) { return false } if !ipsEqual(info.deprecatedPublicIPs, service.Spec.DeprecatedPublicIPs) { @@ -243,7 +240,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { // if ClusterIP is "None" or empty, skip proxying if !api.IsServiceIPSet(service) { - glog.V(3).Infof("Skipping service %s due to portal IP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP) + glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP) continue } @@ -266,9 +263,10 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { serviceIP := net.ParseIP(service.Spec.ClusterIP) glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) info = newServiceInfo(serviceName) - info.portal.ip = serviceIP - info.portal.port = servicePort.Port - info.portal.protocol = servicePort.Protocol + info.clusterIP = serviceIP + info.port = servicePort.Port + info.protocol = servicePort.Protocol + info.nodePort = servicePort.NodePort info.deprecatedPublicIPs = service.Spec.DeprecatedPublicIPs // Deep-copy in case the service instance changes info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) @@ -413,7 +411,7 @@ func (proxier *Proxier) syncProxyRules() error { glog.V(2).Info("not syncing iptables until Services and Endpoints have been received from master") return nil } - glog.V(4).Infof("Syncing iptables rules.") + glog.V(4).Infof("Syncing iptables rules") // Ensure main chains and rules are installed. inputChains := []utiliptables.Chain{utiliptables.ChainOutput, utiliptables.ChainPrerouting} @@ -451,7 +449,6 @@ func (proxier *Proxier) syncProxyRules() error { // Get iptables-save output so we can check for existing chains and rules. // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore existingChains := make(map[utiliptables.Chain]string) - // run iptables-save iptablesSaveRaw, err := proxier.iptables.Save(utiliptables.TableNAT) if err != nil { // if we failed to get any rules glog.Errorf("Failed to execute iptable-save, syncing all rules. %s", err.Error()) @@ -459,126 +456,213 @@ func (proxier *Proxier) syncProxyRules() error { existingChains = getChainLines(utiliptables.TableNAT, iptablesSaveRaw) } - // for first line and chains - var chainsLines bytes.Buffer - // for the actual rules (which should be after the list of chains) - var rulesLines bytes.Buffer + chainsLines := bytes.NewBuffer(nil) + rulesLines := bytes.NewBuffer(nil) - // write table header - chainsLines.WriteString("*nat\n") + // Write table header. + writeLine(chainsLines, "*nat") + // Make sure we keep stats for the top-level chains, if they existed + // (which they should have because we created them above). 
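+ // For illustration: existingChains maps a chain name to its iptables-save
+ // header line, e.g. ":KUBE-SERVICES - [123:45678]". Re-emitting that exact
+ // line, rather than a fresh ":CHAIN - [0:0]", is what lets the
+ // restore-with-counters call at the end keep the packet/byte counts.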
if chain, ok := existingChains[iptablesServicesChain]; ok { - chainsLines.WriteString(fmt.Sprintf("%s\n", chain)) + writeLine(chainsLines, chain) } else { - chainsLines.WriteString(makeChainLine(iptablesServicesChain)) + writeLine(chainsLines, makeChainLine(iptablesServicesChain)) + } + if chain, ok := existingChains[iptablesNodePortsChain]; ok { + writeLine(chainsLines, chain) + } else { + writeLine(chainsLines, makeChainLine(iptablesNodePortsChain)) } - newHostChains := []utiliptables.Chain{} - newServiceChains := []utiliptables.Chain{} + // Accumulate chains to keep. + activeChains := make(map[utiliptables.Chain]bool) // use a map as a set - //Build rules for services + // Build rules for each service. for name, info := range proxier.serviceMap { - protocol := strings.ToLower((string)(info.portal.protocol)) - // get chain name + protocol := strings.ToLower((string)(info.protocol)) + + // Create the per-service chain, retaining counters if possible. svcChain := servicePortToServiceChain(name) - // Create chain if chain, ok := existingChains[svcChain]; ok { - chainsLines.WriteString(fmt.Sprintf("%s\n", chain)) + writeLine(chainsLines, chain) } else { - chainsLines.WriteString(makeChainLine(svcChain)) - } - // get hosts and host-Chains - hosts := make([]string, 0) - hostChains := make([]utiliptables.Chain, 0) - for _, ep := range info.endpoints { - hosts = append(hosts, ep) - hostChains = append(hostChains, servicePortAndEndpointToServiceChain(name, ep)) + writeLine(chainsLines, makeChainLine(svcChain)) } + activeChains[svcChain] = true - // Ensure we know what chains to flush/remove next time we generate the rules - newHostChains = append(newHostChains, hostChains...) - newServiceChains = append(newServiceChains, svcChain) + // Capture the clusterIP. + writeLine(rulesLines, + "-A", string(iptablesServicesChain), + "-m", "comment", "--comment", fmt.Sprintf("\"%s cluster IP\"", name.String()), + "-m", protocol, "-p", protocol, + "-d", fmt.Sprintf("%s/32", info.clusterIP.String()), + "--dport", fmt.Sprintf("%d", info.port), + "-j", string(svcChain)) - // write chain and sticky session rule - for _, hostChain := range hostChains { - // Create chain - if chain, ok := existingChains[utiliptables.Chain(hostChain)]; ok { - chainsLines.WriteString(fmt.Sprintf("%s\n", chain)) - } else { - chainsLines.WriteString(makeChainLine(hostChain)) - } - - // Sticky session - if info.sessionAffinityType == api.ServiceAffinityClientIP { - rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -m recent --name %s --rcheck --seconds %d --reap -j %s\n", svcChain, name.String(), hostChain, info.stickyMaxAgeMinutes*60, hostChain)) + // Capture externalIPs. + for _, externalIP := range info.deprecatedPublicIPs { + args := []string{ + "-A", string(iptablesServicesChain), + "-m", "comment", "--comment", fmt.Sprintf("\"%s external IP\"", name.String()), + "-m", protocol, "-p", protocol, + "-d", fmt.Sprintf("%s/32", externalIP), + "--dport", fmt.Sprintf("%d", info.port), } + // We have to SNAT packets from external IPs. + writeLine(rulesLines, append(args, + "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...) + writeLine(rulesLines, append(args, + "-j", string(svcChain))...) 
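+ // For illustration, the two writeLine calls above emit restore input along
+ // these lines (example address and port; hashed chain name elided):
+ //   -A KUBE-SERVICES -m comment --comment "ns/svc:port external IP" -m tcp -p tcp -d 1.2.3.4/32 --dport 80 -j MARK --set-xmark 0x4d415351/0xffffffff
+ //   -A KUBE-SERVICES -m comment --comment "ns/svc:port external IP" -m tcp -p tcp -d 1.2.3.4/32 --dport 80 -j KUBE-SVC-...
+ // i.e. external traffic is first marked for SNAT and then handed to the
+ // per-service chain for DNAT to an endpoint.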
} - // write proxy/loadblanacing rules - n := len(hostChains) - for i, hostChain := range hostChains { - // Roughly round robin statistically if we have more than one host - if i < (n - 1) { - rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -m statistic --mode random --probability %f -j %s\n", svcChain, name.String(), 1.0/float64(n-i), hostChain)) - } else { - rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -j %s\n", svcChain, name.String(), hostChain)) - } - // proxy - if info.sessionAffinityType == api.ServiceAffinityClientIP { - rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -m recent --name %s --set -j DNAT -p %s --to-destination %s\n", hostChain, name.String(), hostChain, protocol, hosts[i])) - } else { - rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -j DNAT -p %s --to-destination %s\n", hostChain, name.String(), protocol, hosts[i])) - } - } - - // proxy - rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"portal for %s\" -d %s/32 -m state --state NEW -p %s -m %s --dport %d -j %s\n", iptablesServicesChain, name.String(), info.portal.ip.String(), protocol, protocol, info.portal.port, svcChain)) - - for _, publicIP := range info.deprecatedPublicIPs { - rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"deprecated-PublicIP portal for %s\" -d %s/32 -m state --state NEW -p %s -m %s --dport %d -j %s\n", iptablesServicesChain, name.String(), publicIP, protocol, protocol, info.portal.port, svcChain)) - } - + // Capture load-balancer ingress. for _, ingress := range info.loadBalancerStatus.Ingress { if ingress.IP != "" { - rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"load-balancer portal for %s\" -d %s/32 -m state --state NEW -p %s -m %s --dport %d -j %s\n", iptablesServicesChain, name.String(), ingress.IP, protocol, protocol, info.portal.port, svcChain)) + args := []string{ + "-A", string(iptablesServicesChain), + "-m", "comment", "--comment", fmt.Sprintf("\"%s loadbalancer IP\"", name.String()), + "-m", protocol, "-p", protocol, + "-d", fmt.Sprintf("%s/32", ingress.IP), + "--dport", fmt.Sprintf("%d", info.port), + } + // We have to SNAT packets from external IPs. + writeLine(rulesLines, append(args, + "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...) + writeLine(rulesLines, append(args, + "-j", string(svcChain))...) } } + + // Capture nodeports. If we had more than 2 rules it might be + // worthwhile to make a new per-service chain for nodeport rules, but + // with just 2 rules it ends up being a waste and a cognitive burden. + if info.nodePort != 0 { + // Nodeports need SNAT. + writeLine(rulesLines, + "-A", string(iptablesNodePortsChain), + "-m", "comment", "--comment", name.String(), + "-m", protocol, "-p", protocol, + "--dport", fmt.Sprintf("%d", info.nodePort), + "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark)) + // Jump to the service chain. + writeLine(rulesLines, + "-A", string(iptablesNodePortsChain), + "-m", "comment", "--comment", name.String(), + "-m", protocol, "-p", protocol, + "--dport", fmt.Sprintf("%d", info.nodePort), + "-j", string(svcChain)) + } + + // Generate the per-endpoint chains. We do this in multiple passes so we + // can group rules together. 
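+ // For illustration: with n endpoints, the balancing rules written further
+ // down give each endpoint an equal 1/n share. Rule i (0-based) matches with
+ // probability 1/(n-i), conditional on the earlier rules not matching: for
+ // n=3 that is 1/3, then 1/2 of the remaining 2/3, then a final catch-all,
+ // each 1/3 overall.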
+ endpoints := make([]string, 0) + endpointChains := make([]utiliptables.Chain, 0) + for _, ep := range info.endpoints { + endpoints = append(endpoints, ep) + endpointChain := servicePortAndEndpointToServiceChain(name, ep) + endpointChains = append(endpointChains, endpointChain) + + // Create the endpoint chain, retaining counters if possible. + if chain, ok := existingChains[utiliptables.Chain(endpointChain)]; ok { + writeLine(chainsLines, chain) + } else { + writeLine(chainsLines, makeChainLine(endpointChain)) + } + activeChains[endpointChain] = true + } + + // First write session affinity rules, if applicable. + if info.sessionAffinityType == api.ServiceAffinityClientIP { + for _, endpointChain := range endpointChains { + writeLine(rulesLines, + "-A", string(svcChain), + "-m", "comment", "--comment", name.String(), + "-m", "recent", "--name", string(endpointChain), + "--rcheck", "--seconds", fmt.Sprintf("%d", info.stickyMaxAgeSeconds), "--reap", + "-j", string(endpointChain)) + } + } + + // Now write loadbalancing & DNAT rules. + n := len(endpointChains) + for i, endpointChain := range endpointChains { + // Balancing rules in the per-service chain. + args := []string{ + "-A", string(svcChain), + "-m", "comment", "--comment", name.String(), + } + if i < (n - 1) { + // Each rule is a probabilistic match. + args = append(args, + "-m", "statistic", + "--mode", "random", + "--probability", fmt.Sprintf("%f", 1.0/float64(n-i))) + } + // The final (or only if n == 1) rule is a guaranteed match. + args = append(args, "-j", string(endpointChain)) + writeLine(rulesLines, args...) + + // Rules in the per-endpoint chain. + args = []string{ + "-A", string(endpointChain), + "-m", "comment", "--comment", name.String(), + } + // Handle traffic that loops back to the originator with SNAT. + // Technically we only need to do this if the endpoint is on this + // host, but we don't have that information, so we just do this for + // all endpoints. + // TODO: if we grow logic to get this node's pod CIDR, we can use it. + writeLine(rulesLines, append(args, + "-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i], ":")[0]), + "-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...) + + // Update client-affinity lists. + if info.sessionAffinityType == api.ServiceAffinityClientIP { + args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") + } + // DNAT to final destination. + args = append(args, + "-m", protocol, "-p", protocol, + "-j", "DNAT", "--to-destination", endpoints[i]) + writeLine(rulesLines, args...) + } } - // Delete chains no longer in use: - activeChains := make(map[utiliptables.Chain]bool) // use a map as a set - for _, chain := range newHostChains { - activeChains[chain] = true - } - for _, chain := range newServiceChains { - activeChains[chain] = true - } + // Delete chains no longer in use. for chain := range existingChains { if !activeChains[chain] { chainString := string(chain) - // Ignore chains that aren't ours. if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") { + // Ignore chains that aren't ours. continue } - rulesLines.WriteString(fmt.Sprintf("-F %s\n-X %s\n", chain, chain)) + // We must (as per iptables) write a chain-line for it, which has + // the nice effect of flushing the chain. Then we can remove the + // chain. 
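+ // For illustration: re-listing the chain's saved header line
+ // (":KUBE-SVC-... - [...]") in the chains section makes iptables-restore
+ // flush that chain, and the "-X" written below then deletes it.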
+ writeLine(chainsLines, existingChains[chain]) + writeLine(rulesLines, "-X", chainString) } } - // write end of table - rulesLines.WriteString("COMMIT\n") - // combine parts - lines := append(chainsLines.Bytes(), rulesLines.Bytes()...) + // Write the end-of-table marker. + writeLine(rulesLines, "COMMIT") - // sync rules and return error + // Sync rules. + // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table. + lines := append(chainsLines.Bytes(), rulesLines.Bytes()...) glog.V(3).Infof("Syncing rules: %s", lines) - // NOTE: flush=false is used so we don't flush non-kubernetes chains in the table. - err = proxier.iptables.Restore(utiliptables.TableNAT, lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) - return err + return proxier.iptables.Restore(utiliptables.TableNAT, lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) +} + +// Join all words with spaces, terminate with newline and write to buf. +func writeLine(buf *bytes.Buffer, words ...string) { + buf.WriteString(strings.Join(words, " ") + "\n") } // return an iptables-save/restore formatted chain line given a Chain func makeChainLine(chain utiliptables.Chain) string { - return fmt.Sprintf(":%s - [0:0]\n", chain) + return fmt.Sprintf(":%s - [0:0]", chain) } // getChainLines parses a table's iptables-save data to find chains in the table. From d72892d0b0745803ad1cfcb3831024f741670b3a Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Fri, 14 Aug 2015 21:02:53 -0700 Subject: [PATCH 3/9] Include protocol in the hash for chain names --- pkg/proxy/iptables/proxier.go | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 3231d6f2af7..ff67c530258 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -384,22 +384,23 @@ func flattenValidEndpoints(endpoints []hostPortPair) []string { return result } -// servicePortToServiceChain takes the ServicePortName for a -// service and returns the associated iptables chain -// this is computed by hashing (sha256) then encoding to base64 and -// truncating with the prefix "KUBE-SVC-" -// We do this because Iptables Chain Names must be <= 28 chars long -func servicePortToServiceChain(s proxy.ServicePortName) utiliptables.Chain { - hash := sha256.Sum256([]byte(s.String())) +// servicePortToServiceChain takes the ServicePortName for a service and +// returns the associated iptables chain. This is computed by hashing (sha256) +// then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do +// this because Iptables Chain Names must be <= 28 chars long, and the longer +// they are the harder they are to read. +func servicePortToServiceChain(s proxy.ServicePortName, protocol string) utiliptables.Chain { + hash := sha256.Sum256([]byte(s.String() + protocol)) encoded := base32.StdEncoding.EncodeToString(hash[:]) - return utiliptables.Chain("KUBE-SVC-" + encoded[:19]) + return utiliptables.Chain("KUBE-SVC-" + encoded[:16]) } -// this is the same as servicePortToServiceChain but with the endpoint included essentially -func servicePortAndEndpointToServiceChain(s proxy.ServicePortName, endpoint string) utiliptables.Chain { - hash := sha256.Sum256([]byte(s.String() + "_" + endpoint)) +// This is the same as servicePortToServiceChain but with the endpoint +// included. 
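+// Folding the protocol into the hash means a port whose protocol changes
+// gets a fresh chain, and with the truncation to 16 base32 characters the
+// names ("KUBE-SVC-"/"KUBE-SEP-" plus 16 characters) are 25 characters,
+// under the 28-character iptables limit noted above.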
+func servicePortAndEndpointToServiceChain(s proxy.ServicePortName, protocol string, endpoint string) utiliptables.Chain { + hash := sha256.Sum256([]byte(s.String() + protocol + endpoint)) encoded := base32.StdEncoding.EncodeToString(hash[:]) - return utiliptables.Chain("KUBE-SEP-" + encoded[:19]) + return utiliptables.Chain("KUBE-SEP-" + encoded[:16]) } // This is where all of the iptables-save/restore calls happen. @@ -480,10 +481,10 @@ func (proxier *Proxier) syncProxyRules() error { // Build rules for each service. for name, info := range proxier.serviceMap { - protocol := strings.ToLower((string)(info.protocol)) + protocol := strings.ToLower(string(info.protocol)) // Create the per-service chain, retaining counters if possible. - svcChain := servicePortToServiceChain(name) + svcChain := servicePortToServiceChain(name, protocol) if chain, ok := existingChains[svcChain]; ok { writeLine(chainsLines, chain) } else { @@ -560,7 +561,7 @@ func (proxier *Proxier) syncProxyRules() error { endpointChains := make([]utiliptables.Chain, 0) for _, ep := range info.endpoints { endpoints = append(endpoints, ep) - endpointChain := servicePortAndEndpointToServiceChain(name, ep) + endpointChain := servicePortAndEndpointToServiceChain(name, protocol, ep) endpointChains = append(endpointChains, endpointChain) // Create the endpoint chain, retaining counters if possible. From f1a48574a6420a9c6fe9cf799e5d85b21bdf7c8b Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Fri, 14 Aug 2015 21:53:52 -0700 Subject: [PATCH 4/9] Clean up logging, make initial sync faster --- pkg/proxy/config/config.go | 19 +++++++++++-------- pkg/proxy/iptables/proxier.go | 15 +++++++++------ 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 7c873f3243d..7e6b1858220 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -19,6 +19,7 @@ package config import ( "sync" + "github.com/davecgh/go-spew/spew" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/types" @@ -91,6 +92,7 @@ func NewEndpointsConfig() *EndpointsConfig { func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { + glog.V(3).Infof("Calling handler.OnEndpointsUpdate()") handler.OnEndpointsUpdate(instance.([]api.Endpoints)) })) } @@ -126,19 +128,19 @@ func (s *endpointsStore) Merge(source string, change interface{}) error { update := change.(EndpointsUpdate) switch update.Op { case ADD: - glog.V(4).Infof("Adding new endpoint from source %s : %+v", source, update.Endpoints) + glog.V(4).Infof("Adding new endpoint from source %s : %s", source, spew.Sdump(update.Endpoints)) for _, value := range update.Endpoints { name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} endpoints[name] = value } case REMOVE: - glog.V(4).Infof("Removing an endpoint %+v", update) + glog.V(4).Infof("Removing an endpoint %s", spew.Sdump(update)) for _, value := range update.Endpoints { name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} delete(endpoints, name) } case SET: - glog.V(4).Infof("Setting endpoints %+v", update) + glog.V(4).Infof("Setting endpoints %s", spew.Sdump(update)) // Clear the old map entries by just creating a new map endpoints = make(map[types.NamespacedName]api.Endpoints) for _, value := range update.Endpoints { @@ -146,7 +148,7 @@ func (s *endpointsStore) Merge(source string, change interface{}) error { endpoints[name] = value } default: 
- glog.V(4).Infof("Received invalid update type: %v", update) + glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update)) } s.endpoints[source] = endpoints s.endpointLock.Unlock() @@ -189,6 +191,7 @@ func NewServiceConfig() *ServiceConfig { func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { + glog.V(3).Infof("Calling handler.OnServiceUpdate()") handler.OnServiceUpdate(instance.([]api.Service)) })) } @@ -224,19 +227,19 @@ func (s *serviceStore) Merge(source string, change interface{}) error { update := change.(ServiceUpdate) switch update.Op { case ADD: - glog.V(4).Infof("Adding new service from source %s : %+v", source, update.Services) + glog.V(4).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Services)) for _, value := range update.Services { name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} services[name] = value } case REMOVE: - glog.V(4).Infof("Removing a service %+v", update) + glog.V(4).Infof("Removing a service %s", spew.Sdump(update)) for _, value := range update.Services { name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} delete(services, name) } case SET: - glog.V(4).Infof("Setting services %+v", update) + glog.V(4).Infof("Setting services %s", spew.Sdump(update)) // Clear the old map entries by just creating a new map services = make(map[types.NamespacedName]api.Service) for _, value := range update.Services { @@ -244,7 +247,7 @@ func (s *serviceStore) Merge(source string, change interface{}) error { services[name] = value } default: - glog.V(4).Infof("Received invalid update type: %v", update) + glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update)) } s.services[source] = services s.serviceLock.Unlock() diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index ff67c530258..dc53fb92060 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -33,6 +33,7 @@ import ( "time" "github.com/coreos/go-semver/semver" + "github.com/davecgh/go-spew/spew" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/proxy" @@ -232,7 +233,6 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { defer proxier.mu.Unlock() proxier.haveReceivedServiceUpdate = true - glog.V(4).Infof("Received service update notice: %+v", allServices) activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set for i := range allServices { @@ -256,7 +256,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { } if exists { //Something changed. - glog.V(4).Infof("Something changed for service %q: removing it", serviceName) + glog.V(3).Infof("Something changed for service %q: removing it", serviceName) delete(proxier.serviceMap, serviceName) } @@ -273,7 +273,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { info.sessionAffinityType = service.Spec.SessionAffinity proxier.serviceMap[serviceName] = info - glog.V(4).Infof("info: %+v", info) + glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info)) } } @@ -297,7 +297,6 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { defer proxier.mu.Unlock() proxier.haveReceivedEndpointsUpdate = true - glog.V(4).Infof("Received endpoints update notice: %+v", allEndpoints) registeredEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set // Update endpoints for services. 
@@ -349,6 +348,10 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { proxier.serviceMap[service].endpoints = nil } } + + if err := proxier.syncProxyRules(); err != nil { + glog.Errorf("Failed to sync iptables rules: %v", err) + } } // used in OnEndpointsUpdate @@ -409,10 +412,10 @@ func servicePortAndEndpointToServiceChain(s proxy.ServicePortName, protocol stri func (proxier *Proxier) syncProxyRules() error { // don't sync rules till we've received services and endpoints if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate { - glog.V(2).Info("not syncing iptables until Services and Endpoints have been received from master") + glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") return nil } - glog.V(4).Infof("Syncing iptables rules") + glog.V(3).Infof("Syncing iptables rules") // Ensure main chains and rules are installed. inputChains := []utiliptables.Chain{utiliptables.ChainOutput, utiliptables.ChainPrerouting} From 9cf33772b405d6a6a067a7da926b467042c93371 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Fri, 14 Aug 2015 22:15:12 -0700 Subject: [PATCH 5/9] test for and set route_localnet sysctl --- pkg/proxy/iptables/proxier.go | 50 +++++++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 5 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index dc53fb92060..a364cfc1e73 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -25,7 +25,9 @@ import ( "crypto/sha256" "encoding/base32" "fmt" + "io/ioutil" "net" + "path" "reflect" "strconv" "strings" @@ -60,10 +62,11 @@ const iptablesNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS" // the mark we apply to traffic needing SNAT const iptablesMasqueradeMark = "0x4d415351" -// ShouldUseIptablesProxier returns true if we should use the iptables Proxier instead of -// the userspace Proxier. -// This is determined by the iptables version. It may return an erorr if it fails to get the -// itpables version without error, in which case it will also return false. +// ShouldUseIptablesProxier returns true if we should use the iptables Proxier +// instead of the "classic" userspace Proxier. This is determined by checking +// the iptables version and for the existence of kernel features. It may return +// an error if it fails to get the itpables version without error, in which +// case it will also return false. func ShouldUseIptablesProxier() (bool, error) { exec := utilexec.New() minVersion, err := semver.NewVersion(IPTABLES_MIN_VERSION) @@ -80,7 +83,38 @@ func ShouldUseIptablesProxier() (bool, error) { if err != nil { return false, err } - return !version.LessThan(*minVersion), nil + if version.LessThan(*minVersion) { + return false, nil + } + + // Check for the required sysctls. We don't care about the value, just + // that it exists. If this Proxier is chosen, we'll iniialize it as we + // need. 
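+ // route_localnet (a relatively recent kernel addition) lets the kernel
+ // route packets addressed to 127.0.0.0/8 that arrive on non-loopback
+ // interfaces, which the nodeport DNAT rules rely on for access via
+ // localhost; if the sysctl file is absent, the kernel can't support this
+ // Proxier.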
+ _, err = getSysctl(sysctlRouteLocalnet) + if err != nil { + return false, err + } + + return true, nil +} + +const sysctlBase = "/proc/sys" +const sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet" + +func getSysctl(sysctl string) (int, error) { + data, err := ioutil.ReadFile(path.Join(sysctlBase, sysctl)) + if err != nil { + return -1, err + } + val, err := strconv.Atoi(strings.Trim(string(data), " \n")) + if err != nil { + return -1, err + } + return val, nil +} + +func setSysctl(sysctl string, newVal int) error { + return ioutil.WriteFile(path.Join(sysctlBase, sysctl), []byte(strconv.Itoa(newVal)), 0640) } // internal struct for string service information @@ -128,6 +162,12 @@ func NewProxier(ipt utiliptables.Interface, syncPeriod time.Duration) (*Proxier, glog.V(2).Info("Tearing down userspace rules. Errors here are acceptable.") // remove iptables rules/chains from the userspace Proxier tearDownUserspaceIptables(ipt) + + // Set the route_localnet sysctl we need for + if err := setSysctl(sysctlRouteLocalnet, 1); err != nil { + return nil, fmt.Errorf("can't set sysctl route_localnet: %v", err) + } + return &Proxier{ serviceMap: make(map[proxy.ServicePortName]*serviceInfo), syncPeriod: syncPeriod, From 3a5c23d727a0c766df2b897adb1e15dea2a6a9b4 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Fri, 14 Aug 2015 22:36:11 -0700 Subject: [PATCH 6/9] test for and set bridge-nf-call-iptables sysctl --- cmd/kube-proxy/app/server.go | 3 ++- pkg/proxy/iptables/proxier.go | 20 +++++++++++++++----- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index df7bdeb7b0a..a058773ebf6 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -159,7 +159,8 @@ func (s *ProxyServer) Run(_ []string) error { if !s.ForceUserspaceProxy && shouldUseIptables { glog.V(2).Info("Using iptables Proxier.") - proxierIptables, err := iptables.NewProxier(utiliptables.New(exec.New(), protocol), s.SyncPeriod) + execer := exec.New() + proxierIptables, err := iptables.NewProxier(utiliptables.New(execer, protocol), execer, s.SyncPeriod) if err != nil { glog.Fatalf("Unable to create proxier: %v", err) } diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index a364cfc1e73..91ca2b4996c 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -100,6 +100,7 @@ func ShouldUseIptablesProxier() (bool, error) { const sysctlBase = "/proc/sys" const sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet" +const sysctlBridgeCallIptables = "net/bridge/bridge-nf-call-iptables" func getSysctl(sysctl string) (int, error) { data, err := ioutil.ReadFile(path.Join(sysctlBase, sysctl)) @@ -158,16 +159,25 @@ var _ proxy.ProxyProvider = &Proxier{} // An error will be returned if iptables fails to update or acquire the initial lock. // Once a proxier is created, it will keep iptables up to date in the background and // will not terminate if a particular iptables call fails. -func NewProxier(ipt utiliptables.Interface, syncPeriod time.Duration) (*Proxier, error) { - glog.V(2).Info("Tearing down userspace rules. 
Errors here are acceptable.") - // remove iptables rules/chains from the userspace Proxier - tearDownUserspaceIptables(ipt) +func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod time.Duration) (*Proxier, error) { // Set the route_localnet sysctl we need for if err := setSysctl(sysctlRouteLocalnet, 1); err != nil { - return nil, fmt.Errorf("can't set sysctl route_localnet: %v", err) + return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err) } + // Load the module. It's OK if this fails (e.g. the module is not present) + // because we'll catch the error on the sysctl, which is what we actually + // care about. + exec.Command("modprobe", "br-netfilter").CombinedOutput() + if err := setSysctl(sysctlBridgeCallIptables, 1); err != nil { + return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlBridgeCallIptables, err) + } + + // No turning back. Remove artifacts that might still exist from the userspace Proxier. + glog.V(2).Info("Tearing down userspace rules. Errors here are acceptable.") + tearDownUserspaceIptables(ipt) + return &Proxier{ serviceMap: make(map[proxy.ServicePortName]*serviceInfo), syncPeriod: syncPeriod, From 3d309700d04572f8b81c48b7d4c2ea22065e5086 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Sat, 15 Aug 2015 21:10:12 -0700 Subject: [PATCH 7/9] Make iptables use semver lib --- pkg/util/iptables/iptables.go | 60 ++++++++---------------------- pkg/util/iptables/iptables_test.go | 59 ----------------------------- 2 files changed, 15 insertions(+), 104 deletions(-) diff --git a/pkg/util/iptables/iptables.go b/pkg/util/iptables/iptables.go index 5e686d71db3..35f4d842977 100644 --- a/pkg/util/iptables/iptables.go +++ b/pkg/util/iptables/iptables.go @@ -21,10 +21,10 @@ import ( "io/ioutil" "os" "regexp" - "strconv" "strings" "sync" + "github.com/coreos/go-semver/semver" "github.com/golang/glog" "k8s.io/kubernetes/pkg/util" utilexec "k8s.io/kubernetes/pkg/util/exec" @@ -106,6 +106,10 @@ type FlushFlag bool const FlushTables FlushFlag = true const NoFlushTables FlushFlag = false +// Versions of iptables less than this do not support the -C / --check flag +// (test whether a rule exists). +const MinCheckVersion = "1.4.11" + // runner implements Interface in terms of exec("iptables"). type runner struct { mu sync.Mutex @@ -399,44 +403,24 @@ func makeFullArgs(table Table, chain Chain, args ...string) []string { // Checks if iptables has the "-C" flag func getIptablesHasCheckCommand(exec utilexec.Interface) (bool, error) { + minVersion, err := semver.NewVersion(MinCheckVersion) + if err != nil { + return false, err + } + // Returns "vX.Y.Z". vstring, err := GetIptablesVersionString(exec) if err != nil { return false, err } - - v1, v2, v3, err := extractIptablesVersion(vstring) + // Make a semver of the part after the v in "vX.X.X". + version, err := semver.NewVersion(vstring[1:]) if err != nil { return false, err } - - return iptablesHasCheckCommand(v1, v2, v3), nil -} - -// extractIptablesVersion returns the first three components of the iptables version. -// e.g. 
"iptables v1.3.66" would return (1, 3, 66, nil) -func extractIptablesVersion(str string) (int, int, int, error) { - versionMatcher := regexp.MustCompile("v([0-9]+)\\.([0-9]+)\\.([0-9]+)") - result := versionMatcher.FindStringSubmatch(str) - if result == nil { - return 0, 0, 0, fmt.Errorf("no iptables version found in string: %s", str) + if version.LessThan(*minVersion) { + return false, nil } - - v1, err := strconv.Atoi(result[1]) - if err != nil { - return 0, 0, 0, err - } - - v2, err := strconv.Atoi(result[2]) - if err != nil { - return 0, 0, 0, err - } - - v3, err := strconv.Atoi(result[3]) - if err != nil { - return 0, 0, 0, err - } - - return v1, v2, v3, nil + return true, nil } // GetIptablesVersionString runs "iptables --version" to get the version string, @@ -455,17 +439,3 @@ func GetIptablesVersionString(exec utilexec.Interface) (string, error) { } return match[0], nil } - -// Checks if an iptables version is after 1.4.11, when --check was added -func iptablesHasCheckCommand(v1 int, v2 int, v3 int) bool { - if v1 > 1 { - return true - } - if v1 == 1 && v2 > 4 { - return true - } - if v1 == 1 && v2 == 4 && v3 >= 11 { - return true - } - return false -} diff --git a/pkg/util/iptables/iptables_test.go b/pkg/util/iptables/iptables_test.go index 8327481469e..b8a0e45a787 100644 --- a/pkg/util/iptables/iptables_test.go +++ b/pkg/util/iptables/iptables_test.go @@ -438,65 +438,6 @@ func TestGetIptablesHasCheckCommand(t *testing.T) { } } -func TestExtractIptablesVersion(t *testing.T) { - testCases := []struct { - Version string - V1 int - V2 int - V3 int - Err bool - }{ - {"iptables v1.4.7", 1, 4, 7, false}, - {"iptables v1.4.11", 1, 4, 11, false}, - {"iptables v0.2.5", 0, 2, 5, false}, - {"iptables v1.2.3.4.5.6", 1, 2, 3, false}, - {"iptables v1.4", 0, 0, 0, true}, - {"iptables v12345.12345.12345.12344", 12345, 12345, 12345, false}, - {"total junk", 0, 0, 0, true}, - } - - for _, testCase := range testCases { - v1, v2, v3, err := extractIptablesVersion(testCase.Version) - if (err != nil) != testCase.Err { - t.Errorf("Expected error: %v, Got error: %v", testCase.Err, err) - } - if err == nil { - if v1 != testCase.V1 { - t.Errorf("First version number incorrect for string \"%s\", got %d, expected %d", testCase.Version, v1, testCase.V1) - } - if v2 != testCase.V2 { - t.Errorf("Second version number incorrect for string \"%s\", got %d, expected %d", testCase.Version, v2, testCase.V2) - } - if v3 != testCase.V3 { - t.Errorf("Third version number incorrect for string \"%s\", got %d, expected %d", testCase.Version, v3, testCase.V3) - } - } - } -} - -func TestIptablesHasCheckCommand(t *testing.T) { - testCases := []struct { - V1 int - V2 int - V3 int - Result bool - }{ - {0, 55, 55, false}, - {1, 0, 55, false}, - {1, 4, 10, false}, - {1, 4, 11, true}, - {1, 4, 19, true}, - {1, 5, 0, true}, - {2, 0, 0, true}, - } - - for _, testCase := range testCases { - if result := iptablesHasCheckCommand(testCase.V1, testCase.V2, testCase.V3); result != testCase.Result { - t.Errorf("For %d.%d.%d expected %v got %v", testCase.V1, testCase.V2, testCase.V3, testCase.Result, result) - } - } -} - func TestCheckRuleWithoutCheckPresent(t *testing.T) { iptables_save_output := `# Generated by iptables-save v1.4.7 on Wed Oct 29 14:56:01 2014 *nat From 7e9c685ba600121aeea3687f3da5bd10e2b6b77a Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Sat, 15 Aug 2015 21:16:04 -0700 Subject: [PATCH 8/9] Require same min iptables version as -C --- pkg/proxy/iptables/proxier.go | 23 ++++++++++++----------- 1 file changed, 12 
insertions(+), 11 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 91ca2b4996c..e227492c4ca 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -16,9 +16,9 @@ limitations under the License. package iptables -/* -NOTE: this needs to be tested in e2e since it uses iptables for everything. -*/ +// +// NOTE: this needs to be tested in e2e since it uses iptables for everything. +// import ( "bytes" @@ -45,13 +45,14 @@ import ( "k8s.io/kubernetes/pkg/util/slice" ) -// NOTE: IPTABLES_MIN_VERSION is the minimum version of iptables for which we will use the Proxier -// from this package instead of the userspace Proxier. -// This is will not be enough, as the version number is somewhat unreliable, -// features are backported in various distros and this could get pretty hairy. -// However iptables-1.4.0 was released 2007-Dec-22 and appears to have every feature we use, -// so this seems prefectly reasonable for now. -const IPTABLES_MIN_VERSION string = "1.4.0" +// iptablesMinVersion is the minimum version of iptables for which we will use the Proxier +// from this package instead of the userspace Proxier. While most of the +// features we need were available earlier, the '-C' flag was added more +// recently. We use that indirectly in Ensure* functions, and if we don't +// have it, we have to be extra careful about the exact args we feed in being +// the same as the args we read back (iptables itself normalizes some args). +// This is the "new" Proxier, so we require "new" versions of tools. +const iptablesMinVersion = utiliptables.MinCheckVersion // the services chain const iptablesServicesChain utiliptables.Chain = "KUBE-SERVICES" @@ -69,7 +70,7 @@ const iptablesMasqueradeMark = "0x4d415351" // case it will also return false. func ShouldUseIptablesProxier() (bool, error) { exec := utilexec.New() - minVersion, err := semver.NewVersion(IPTABLES_MIN_VERSION) + minVersion, err := semver.NewVersion(iptablesMinVersion) if err != nil { return false, err } From 6f34be30a3ee4cf57bdadf262e4949e190163b6f Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Mon, 17 Aug 2015 20:54:05 -0700 Subject: [PATCH 9/9] Limit float precision to 5 points --- pkg/proxy/iptables/proxier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index e227492c4ca..d3d800b8d42 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -652,7 +652,7 @@ func (proxier *Proxier) syncProxyRules() error { args = append(args, "-m", "statistic", "--mode", "random", - "--probability", fmt.Sprintf("%f", 1.0/float64(n-i))) + "--probability", fmt.Sprintf("%0.5f", 1.0/float64(n-i))) } // The final (or only if n == 1) rule is a guaranteed match. args = append(args, "-j", string(endpointChain))