diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 609cc26cb39..d10d38905b4 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -204,8 +204,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err // IPTablesMasqueradeBit must be specified or defaulted. return nil, fmt.Errorf("Unable to read IPTablesMasqueradeBit from config") } - - proxierIptables, err := iptables.NewProxier(iptInterface, execer, config.IPTablesSyncPeriod.Duration, config.MasqueradeAll, int(*config.IPTablesMasqueradeBit), config.ClusterCIDR, hostname) + proxierIptables, err := iptables.NewProxier(iptInterface, execer, config.IPTablesSyncPeriod.Duration, config.MasqueradeAll, int(*config.IPTablesMasqueradeBit), config.ClusterCIDR, hostname, getNodeIP(client, hostname)) if err != nil { glog.Fatalf("Unable to create proxier: %v", err) } @@ -409,3 +408,18 @@ func tryIptablesProxy(iptver iptables.IptablesVersioner, kcompat iptables.Kernel func (s *ProxyServer) birthCry() { s.Recorder.Eventf(s.Config.NodeRef, api.EventTypeNormal, "Starting", "Starting kube-proxy.") } + +func getNodeIP(client *kubeclient.Client, hostname string) net.IP { + var nodeIP net.IP + node, err := client.Nodes().Get(hostname) + if err != nil { + glog.Warningf("Failed to retrieve node info: %v", err) + return nil + } + nodeIP, err = nodeutil.GetNodeHostIP(node) + if err != nil { + glog.Warningf("Failed to retrieve node IP: %v", err) + return nil + } + return nodeIP +} diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 197d0779d85..1405d492bb9 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -45,31 +45,35 @@ import ( utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" ) -// iptablesMinVersion is the minimum version of iptables for which we will use the Proxier -// from this package instead of the userspace Proxier. 
While most of the -// features we need were available earlier, the '-C' flag was added more -// recently. We use that indirectly in Ensure* functions, and if we don't -// have it, we have to be extra careful about the exact args we feed in being -// the same as the args we read back (iptables itself normalizes some args). -// This is the "new" Proxier, so we require "new" versions of tools. -const iptablesMinVersion = utiliptables.MinCheckVersion +const ( + // iptablesMinVersion is the minimum version of iptables for which we will use the Proxier + // from this package instead of the userspace Proxier. While most of the + // features we need were available earlier, the '-C' flag was added more + // recently. We use that indirectly in Ensure* functions, and if we don't + // have it, we have to be extra careful about the exact args we feed in being + // the same as the args we read back (iptables itself normalizes some args). + // This is the "new" Proxier, so we require "new" versions of tools. + iptablesMinVersion = utiliptables.MinCheckVersion -// the services chain -const kubeServicesChain utiliptables.Chain = "KUBE-SERVICES" + // the services chain + kubeServicesChain utiliptables.Chain = "KUBE-SERVICES" -// the nodeports chain -const kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS" + // the nodeports chain + kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS" -// the kubernetes postrouting chain -const kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING" + // the kubernetes postrouting chain + kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING" -// the mark-for-masquerade chain -// TODO: let kubelet manage this chain. Other component should just assume it exists and use it. -const KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ" + // the mark-for-masquerade chain + KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ" -// the mark we apply to traffic needing SNAT -// TODO(thockin): Remove this for v1.3 or v1.4. 
-const oldIptablesMasqueradeMark = "0x4d415351" + // the mark we apply to traffic needing SNAT + // TODO(thockin): Remove this for v1.3 or v1.4. + oldIptablesMasqueradeMark = "0x4d415351" + + // the mark-for-drop chain + KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP" +) // IptablesVersioner can query the current iptables version. type IptablesVersioner interface { @@ -128,14 +132,15 @@ const sysctlBridgeCallIptables = "net/bridge/bridge-nf-call-iptables" // internal struct for string service information type serviceInfo struct { - clusterIP net.IP - port int - protocol api.Protocol - nodePort int - loadBalancerStatus api.LoadBalancerStatus - sessionAffinityType api.ServiceAffinity - stickyMaxAgeSeconds int - externalIPs []string + clusterIP net.IP + port int + protocol api.Protocol + nodePort int + loadBalancerStatus api.LoadBalancerStatus + sessionAffinityType api.ServiceAffinity + stickyMaxAgeSeconds int + externalIPs []string + loadBalancerSourceRanges []string } // returns a new serviceInfo struct @@ -164,6 +169,7 @@ type Proxier struct { exec utilexec.Interface clusterCIDR string hostname string + nodeIP net.IP } type localPort struct { @@ -189,7 +195,7 @@ var _ proxy.ProxyProvider = &Proxier{} // An error will be returned if iptables fails to update or acquire the initial lock. // Once a proxier is created, it will keep iptables up to date in the background and // will not terminate if a particular iptables call fails. 
-func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod time.Duration, masqueradeAll bool, masqueradeBit int, clusterCIDR string, hostname string) (*Proxier, error) { +func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod time.Duration, masqueradeAll bool, masqueradeBit int, clusterCIDR string, hostname string, nodeIP net.IP) (*Proxier, error) { // Set the route_localnet sysctl we need for if err := utilsysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil { return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err) @@ -209,6 +215,10 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod masqueradeValue := 1 << uint(masqueradeBit) masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue) + if nodeIP == nil { + glog.Warningf("invalid nodeIP, initialize kube-proxy with 127.0.0.1 as nodeIP") + nodeIP = net.ParseIP("127.0.0.1") + } return &Proxier{ serviceMap: make(map[proxy.ServicePortName]*serviceInfo), endpointsMap: make(map[proxy.ServicePortName][]string), @@ -220,6 +230,7 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod exec: exec, clusterCIDR: clusterCIDR, hostname: hostname, + nodeIP: nodeIP, }, nil } @@ -428,6 +439,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { // Deep-copy in case the service instance changes info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) info.sessionAffinityType = service.Spec.SessionAffinity + info.loadBalancerSourceRanges = service.Spec.LoadBalancerSourceRanges proxier.serviceMap[serviceName] = info glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info)) @@ -552,15 +564,28 @@ func flattenValidEndpoints(endpoints []hostPortPair) []string { return result } -// servicePortChainName takes the ServicePortName for a service and -// returns the associated iptables chain. 
This is computed by hashing (sha256) -// then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do -// this because Iptables Chain Names must be <= 28 chars long, and the longer -// they are the harder they are to read. -func servicePortChainName(s proxy.ServicePortName, protocol string) utiliptables.Chain { +// portProtoHash takes the ServicePortName and protocol for a service and +// returns the associated 16 character hash. This is computed by hashing (sha256) +// then encoding to base32 and truncating to 16 chars. We do this because Iptables +// Chain Names must be <= 28 chars long, and the longer they are the harder they are to read. +func portProtoHash(s proxy.ServicePortName, protocol string) string { hash := sha256.Sum256([]byte(s.String() + protocol)) encoded := base32.StdEncoding.EncodeToString(hash[:]) - return utiliptables.Chain("KUBE-SVC-" + encoded[:16]) + return encoded[:16] +} + +// servicePortChainName takes the ServicePortName for a service and +// returns the associated iptables chain. This is computed by hashing (sha256) +// then encoding to base32 and truncating with the prefix "KUBE-SVC-". +func servicePortChainName(s proxy.ServicePortName, protocol string) utiliptables.Chain { + return utiliptables.Chain("KUBE-SVC-" + portProtoHash(s, protocol)) +} + +// serviceFirewallChainName takes the ServicePortName for a service and +// returns the associated iptables firewall chain. This is computed by hashing (sha256) +// then encoding to base32 and truncating with the prefix "KUBE-FW-". +func serviceFirewallChainName(s proxy.ServicePortName, protocol string) utiliptables.Chain { + return utiliptables.Chain("KUBE-FW-" + portProtoHash(s, protocol)) } // This is the same as servicePortChainName but with the endpoint included. 
@@ -854,9 +879,54 @@ func (proxier *Proxier) syncProxyRules() { "-d", fmt.Sprintf("%s/32", ingress.IP), "--dport", fmt.Sprintf("%d", svcInfo.port), } + // create service firewall chain + fwChain := serviceFirewallChainName(svcName, protocol) + if chain, ok := existingNATChains[fwChain]; ok { + writeLine(natChains, chain) + } else { + writeLine(natChains, utiliptables.MakeChainLine(fwChain)) + } + // jump to service firewall chain + // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. + // This currently works for loadbalancers that preserve source IPs. + // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. + writeLine(natRules, append(args, "-j", string(fwChain))...) + + args = []string{ + "-A", string(fwChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcName.String()), + "-m", protocol, "-p", protocol, + "-d", fmt.Sprintf("%s/32", ingress.IP), + "--dport", fmt.Sprintf("%d", svcInfo.port), + } // We have to SNAT packets from external IPs. writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) - writeLine(natRules, append(args, "-j", string(svcChain))...) + + if len(svcInfo.loadBalancerSourceRanges) == 0 { + // allow all sources, so jump directly to KUBE-SVC chain + writeLine(natRules, append(args, "-j", string(svcChain))...) + } else { + // firewall filter based on each source range + allowFromNode := false + for _, src := range svcInfo.loadBalancerSourceRanges { + writeLine(natRules, append(args, "-s", src, "-j", string(svcChain))...) + // ignore error because it has been validated. NOTE(review): if parsing ever failed, cidr would be nil and cidr.Contains below would likely panic; confirm every range is validated upstream + _, cidr, _ := net.ParseCIDR(src) + if cidr.Contains(proxier.nodeIP) { + allowFromNode = true + } + } + // Generally, an ip route rule is added to intercept requests to the loadbalancer VIP from the + // loadbalancer's backend hosts. In this case, the request will not hit the loadbalancer but loops back directly. 
+ // The following rule is needed to allow such requests from the host. + if allowFromNode { + writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(svcChain))...) + } + } + + // If the packet was able to reach the end of firewall chain, then it did not get DNATed. + // It means the packet cannot get through the firewall, so mark it for DROP. + writeLine(natRules, append(args, "-j", string(KubeMarkDropChain))...) } }