diff --git a/cmd/kube-proxy/app/BUILD b/cmd/kube-proxy/app/BUILD index cb74ef887ab..4a4cb061112 100644 --- a/cmd/kube-proxy/app/BUILD +++ b/cmd/kube-proxy/app/BUILD @@ -41,6 +41,7 @@ go_library( "//pkg/proxy/userspace:go_default_library", "//pkg/util/configz:go_default_library", "//pkg/util/dbus:go_default_library", + "//pkg/util/ipset:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/ipvs:go_default_library", "//pkg/util/mount:go_default_library", diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 8db7af9d57b..ec050123703 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -63,6 +63,7 @@ import ( "k8s.io/kubernetes/pkg/proxy/ipvs" "k8s.io/kubernetes/pkg/proxy/userspace" "k8s.io/kubernetes/pkg/util/configz" + utilipset "k8s.io/kubernetes/pkg/util/ipset" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilipvs "k8s.io/kubernetes/pkg/util/ipvs" utilnode "k8s.io/kubernetes/pkg/util/node" @@ -356,6 +357,7 @@ type ProxyServer struct { EventClient v1core.EventsGetter IptInterface utiliptables.Interface IpvsInterface utilipvs.Interface + IpsetInterface utilipset.Interface execer exec.Interface Proxier proxy.ProxyProvider Broadcaster record.EventBroadcaster @@ -422,7 +424,7 @@ func (s *ProxyServer) Run() error { if s.CleanupAndExit { encounteredError := userspace.CleanupLeftovers(s.IptInterface) encounteredError = iptables.CleanupLeftovers(s.IptInterface) || encounteredError - encounteredError = ipvs.CleanupLeftovers(s.IpvsInterface, s.IptInterface) || encounteredError + encounteredError = ipvs.CleanupLeftovers(s.IpvsInterface, s.IptInterface, s.IpsetInterface) || encounteredError if encounteredError { return errors.New("encountered an error while tearing down rules.") } diff --git a/cmd/kube-proxy/app/server_others.go b/cmd/kube-proxy/app/server_others.go index 035d0593fa3..ee68a48540d 100644 --- a/cmd/kube-proxy/app/server_others.go +++ b/cmd/kube-proxy/app/server_others.go @@ -43,6 +43,7 @@ import ( "k8s.io/kubernetes/pkg/proxy/userspace" "k8s.io/kubernetes/pkg/util/configz" utildbus "k8s.io/kubernetes/pkg/util/dbus" + utilipset "k8s.io/kubernetes/pkg/util/ipset" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilipvs "k8s.io/kubernetes/pkg/util/ipvs" utilnode "k8s.io/kubernetes/pkg/util/node" @@ -72,6 +73,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi var iptInterface utiliptables.Interface var ipvsInterface utilipvs.Interface + var ipsetInterface utilipset.Interface var dbus utildbus.Interface // Create a iptables utils. 
@@ -80,6 +82,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi dbus = utildbus.New() iptInterface = utiliptables.New(execer, dbus, protocol) ipvsInterface = utilipvs.New(execer) + ipsetInterface = utilipset.New(execer) // We omit creation of pretty much everything if we run in cleanup mode if cleanupAndExit { @@ -87,6 +90,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi execer: execer, IptInterface: iptInterface, IpvsInterface: ipvsInterface, + IpsetInterface: ipsetInterface, CleanupAndExit: cleanupAndExit, }, nil } @@ -119,7 +123,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi var serviceEventHandler proxyconfig.ServiceHandler var endpointsEventHandler proxyconfig.EndpointsHandler - proxyMode := getProxyMode(string(config.Mode), iptInterface, iptables.LinuxKernelCompatTester{}) + proxyMode := getProxyMode(string(config.Mode), iptInterface, ipsetInterface, iptables.LinuxKernelCompatTester{}) if proxyMode == proxyModeIPTables { glog.V(0).Info("Using iptables Proxier.") nodeIP := net.ParseIP(config.BindAddress) @@ -159,12 +163,13 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi userspace.CleanupLeftovers(iptInterface) // IPVS Proxier will generate some iptables rules, // need to clean them before switching to other proxy mode. - ipvs.CleanupLeftovers(ipvsInterface, iptInterface) + ipvs.CleanupLeftovers(ipvsInterface, iptInterface, ipsetInterface) } else if proxyMode == proxyModeIPVS { glog.V(0).Info("Using ipvs Proxier.") proxierIPVS, err := ipvs.NewProxier( iptInterface, ipvsInterface, + ipsetInterface, utilsysctl.New(), execer, config.IPVS.SyncPeriod.Duration, @@ -220,7 +225,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi iptables.CleanupLeftovers(iptInterface) // IPVS Proxier will generate some iptables rules, // need to clean them before switching to other proxy mode. 
- ipvs.CleanupLeftovers(ipvsInterface, iptInterface) + ipvs.CleanupLeftovers(ipvsInterface, iptInterface, ipsetInterface) } iptInterface.AddReloadFunc(proxier.Sync) @@ -230,6 +235,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi EventClient: eventClient, IptInterface: iptInterface, IpvsInterface: ipvsInterface, + IpsetInterface: ipsetInterface, execer: execer, Proxier: proxier, Broadcaster: eventBroadcaster, @@ -249,7 +255,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi }, nil } -func getProxyMode(proxyMode string, iptver iptables.IPTablesVersioner, kcompat iptables.KernelCompatTester) string { +func getProxyMode(proxyMode string, iptver iptables.IPTablesVersioner, ipsetver ipvs.IPSetVersioner, kcompat iptables.KernelCompatTester) string { if proxyMode == proxyModeUserspace { return proxyModeUserspace } @@ -260,7 +266,7 @@ func getProxyMode(proxyMode string, iptver iptables.IPTablesVersioner, kcompat i if utilfeature.DefaultFeatureGate.Enabled(features.SupportIPVSProxyMode) { if proxyMode == proxyModeIPVS { - return tryIPVSProxy(iptver, kcompat) + return tryIPVSProxy(iptver, ipsetver, kcompat) } else { glog.Warningf("Can't use ipvs proxier, trying iptables proxier") return tryIPTablesProxy(iptver, kcompat) @@ -270,10 +276,10 @@ func getProxyMode(proxyMode string, iptver iptables.IPTablesVersioner, kcompat i return tryIPTablesProxy(iptver, kcompat) } -func tryIPVSProxy(iptver iptables.IPTablesVersioner, kcompat iptables.KernelCompatTester) string { +func tryIPVSProxy(iptver iptables.IPTablesVersioner, ipsetver ipvs.IPSetVersioner, kcompat iptables.KernelCompatTester) string { // guaranteed false on error, error only necessary for debugging - // IPVS Proxier relies on iptables - useIPVSProxy, err := ipvs.CanUseIPVSProxier() + // IPVS Proxier relies on ipset + useIPVSProxy, err := ipvs.CanUseIPVSProxier(ipsetver) if err != nil { // Try to fallback to iptables before falling back to userspace utilruntime.HandleError(fmt.Errorf("can't determine whether to use ipvs proxy, error: %v", err)) @@ -282,8 +288,6 @@ func tryIPVSProxy(iptver iptables.IPTablesVersioner, kcompat iptables.KernelComp return proxyModeIPVS } - // TODO: Check ipvs version - // Try to fallback to iptables before falling back to userspace glog.V(1).Infof("Can't use ipvs proxier, trying iptables proxier") return tryIPTablesProxy(iptver, kcompat) diff --git a/pkg/proxy/ipvs/BUILD b/pkg/proxy/ipvs/BUILD index 232c71ee3eb..30945e44334 100644 --- a/pkg/proxy/ipvs/BUILD +++ b/pkg/proxy/ipvs/BUILD @@ -9,6 +9,7 @@ load( go_test( name = "go_default_test", srcs = [ + "ipset_test.go", "proxier_test.go", ], importpath = "k8s.io/kubernetes/pkg/proxy/ipvs", @@ -18,6 +19,8 @@ go_test( "//pkg/proxy:go_default_library", "//pkg/proxy/ipvs/testing:go_default_library", "//pkg/proxy/util:go_default_library", + "//pkg/util/ipset:go_default_library", + "//pkg/util/ipset/testing:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/iptables/testing:go_default_library", "//pkg/util/ipvs:go_default_library", @@ -35,6 +38,7 @@ go_test( go_library( name = "go_default_library", srcs = [ + "ipset.go", "netlink.go", "netlink_unsupported.go", "proxier.go", @@ -55,9 +59,11 @@ go_library( "//pkg/proxy/metrics:go_default_library", "//pkg/proxy/util:go_default_library", "//pkg/util/async:go_default_library", + "//pkg/util/ipset:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/ipvs:go_default_library", "//pkg/util/sysctl:go_default_library", + 
"//pkg/util/version:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 8066f61003a..64fa6945f0f 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -49,6 +49,7 @@ import ( "k8s.io/kubernetes/pkg/proxy/metrics" utilproxy "k8s.io/kubernetes/pkg/proxy/util" "k8s.io/kubernetes/pkg/util/async" + utilipset "k8s.io/kubernetes/pkg/util/ipset" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilipvs "k8s.io/kubernetes/pkg/util/ipvs" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" @@ -59,6 +60,12 @@ const ( // kubeServicesChain is the services portal chain kubeServicesChain utiliptables.Chain = "KUBE-SERVICES" + // KubeServiceIPSetsChain is the services access IP chain + KubeServiceIPSetsChain utiliptables.Chain = "KUBE-SVC-IPSETS" + + // KubeFireWallChain is the kubernetes firewall chain. + KubeFireWallChain utiliptables.Chain = "KUBE-FIRE-WALL" + // kubePostroutingChain is the kubernetes postrouting chain kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING" @@ -67,11 +74,10 @@ const ( // KubeMarkDropChain is the mark-for-drop chain KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP" -) -const ( // DefaultScheduler is the default ipvs scheduler algorithm - round robin. DefaultScheduler = "rr" + // DefaultDummyDevice is the default dummy interface where ipvs service address will bind to it. DefaultDummyDevice = "kube-ipvs0" ) @@ -117,6 +123,7 @@ type Proxier struct { minSyncPeriod time.Duration iptables utiliptables.Interface ipvs utilipvs.Interface + ipset utilipset.Interface exec utilexec.Interface masqueradeAll bool masqueradeMark string @@ -137,6 +144,26 @@ type Proxier struct { natRules *bytes.Buffer // Added as a member to the struct to allow injection for testing. netlinkHandle NetLinkHandle + // loopbackSet is the ipset where stores all endpoints IP:Port,IP for solving hairpin mode purpose. + loopbackSet *IPSet + // clusterIPSet is the ipset where stores all service ClusterIP:Port + clusterIPSet *IPSet + // nodePortSetTCP is the bitmap:port type ipset where stores all TCP node port + nodePortSetTCP *IPSet + // nodePortSetTCP is the bitmap:port type ipset where stores all UDP node port + nodePortSetUDP *IPSet + // externalIPSet is the hash:ip,port type ipset where stores all service ExternalIP:Port + externalIPSet *IPSet + // lbIngressSet is the hash:ip,port type ipset where stores all service load balancer ingress IP:Port. + lbIngressSet *IPSet + // lbMasqSet is the hash:ip,port type ipset where stores all service load balancer ingress IP:Port which needs masquerade. + lbMasqSet *IPSet + // lbWhiteListIPSet is the hash:ip,port,ip type ipset where stores all service load balancer ingress IP:Port,sourceIP pair, any packets + // with the source IP visit ingress IP:Port can pass through. + lbWhiteListIPSet *IPSet + // lbWhiteListIPSet is the hash:ip,port,net type ipset where stores all service load balancer ingress IP:Port,sourceCIDR pair, any packets + // from the source CIDR visit ingress IP:Port can pass through. + lbWhiteListCIDRSet *IPSet } // IPGetter helps get node network interface IP @@ -184,7 +211,9 @@ var _ proxy.ProxyProvider = &Proxier{} // An error will be returned if it fails to update or acquire the initial lock. 
// Once a proxier is created, it will keep iptables and ipvs rules up to date in the background and // will not terminate if a particular iptables or ipvs call fails. -func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, +func NewProxier(ipt utiliptables.Interface, + ipvs utilipvs.Interface, + ipset utilipset.Interface, sysctl utilsysctl.Interface, exec utilexec.Interface, syncPeriod time.Duration, @@ -248,32 +277,46 @@ func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps + isIPv6 := utilproxy.IsIPv6(nodeIP) + + glog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6) + proxier := &Proxier{ - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), - serviceMap: make(proxyServiceMap), - serviceChanges: newServiceChangeMap(), - endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(hostname), - syncPeriod: syncPeriod, - minSyncPeriod: minSyncPeriod, - iptables: ipt, - masqueradeAll: masqueradeAll, - masqueradeMark: masqueradeMark, - exec: exec, - clusterCIDR: clusterCIDR, - hostname: hostname, - nodeIP: nodeIP, - portMapper: &listenPortOpener{}, - recorder: recorder, - healthChecker: healthChecker, - healthzServer: healthzServer, - ipvs: ipvs, - ipvsScheduler: scheduler, - ipGetter: &realIPGetter{}, - iptablesData: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), - netlinkHandle: NewNetLinkHandle(), + portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + serviceMap: make(proxyServiceMap), + serviceChanges: newServiceChangeMap(), + endpointsMap: make(proxyEndpointsMap), + endpointsChanges: newEndpointsChangeMap(hostname), + syncPeriod: syncPeriod, + minSyncPeriod: minSyncPeriod, + iptables: ipt, + masqueradeAll: masqueradeAll, + masqueradeMark: masqueradeMark, + exec: exec, + clusterCIDR: clusterCIDR, + hostname: hostname, + nodeIP: nodeIP, + portMapper: &listenPortOpener{}, + recorder: recorder, + healthChecker: healthChecker, + healthzServer: healthzServer, + ipvs: ipvs, + ipvsScheduler: scheduler, + ipGetter: &realIPGetter{}, + iptablesData: bytes.NewBuffer(nil), + natChains: bytes.NewBuffer(nil), + natRules: bytes.NewBuffer(nil), + netlinkHandle: NewNetLinkHandle(), + ipset: ipset, + loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, isIPv6), + clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, isIPv6), + externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, isIPv6), + lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, isIPv6), + lbMasqSet: NewIPSet(ipset, KubeLoadBalancerMasqSet, utilipset.HashIPPort, isIPv6), + lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, isIPv6), + lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, isIPv6), + nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false), + nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false), } burstSyncs := 2 glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) @@ -485,6 +528,11 @@ func (e *endpointsInfo) IPPart() string { return utilproxy.IPPart(e.endpoint) } +// PortPart returns just the Port part of the endpoint. 
+func (e *endpointsInfo) PortPart() (int, error) { + return utilproxy.PortPart(e.endpoint) +} + type endpointServicePair struct { endpoint string servicePortName proxy.ServicePortName @@ -652,7 +700,7 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) { // This is determined by checking if all the required kernel modules can be loaded. It may // return an error if it fails to get the kernel modules information without error, in which // case it will also return false. -func CanUseIPVSProxier() (bool, error) { +func CanUseIPVSProxier(ipsetver IPSetVersioner) (bool, error) { // Try to load IPVS required kernel modules using modprobe for _, kmod := range ipvsModules { err := utilexec.New().Command("modprobe", "--", kmod).Run() @@ -677,6 +725,15 @@ func CanUseIPVSProxier() (bool, error) { if len(modules) != 0 { return false, fmt.Errorf("IPVS proxier will not be used because the following required kernel modules are not loaded: %v", modules) } + + // Check ipset version + versionString, err := ipsetver.GetVersion() + if err != nil { + return false, fmt.Errorf("error getting ipset version, error: %v", err) + } + if !checkMinVersion(versionString) { + return false, fmt.Errorf("ipset version: %s is less than min required version: %s", versionString, MinIPSetCheckVersion) + } return true, nil } @@ -728,7 +785,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool natRules := bytes.NewBuffer(nil) writeLine(natChains, "*nat") // Start with chains we know we need to remove. - for _, chain := range []utiliptables.Chain{kubeServicesChain, kubePostroutingChain, KubeMarkMasqChain} { + for _, chain := range []utiliptables.Chain{kubeServicesChain, kubePostroutingChain, KubeMarkMasqChain, KubeServiceIPSetsChain} { if _, found := existingNATChains[chain]; found { chainString := string(chain) writeLine(natChains, existingNATChains[chain]) // flush @@ -748,7 +805,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool } // CleanupLeftovers clean up all ipvs and iptables rules created by ipvs Proxier. -func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface) (encounteredError bool) { +func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset utilipset.Interface) (encounteredError bool) { // Return immediately when ipvs interface is nil - Probably initialization failed in somewhere. if ipvs == nil { return true @@ -768,6 +825,16 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface) (enco } // Clear iptables created by ipvs Proxier. encounteredError = cleanupIptablesLeftovers(ipt) || encounteredError + // Destroy ip sets created by ipvs Proxier. We should call it after cleaning up + // iptables since we can NOT delete ip set which is still referenced by iptables. + ipSetsToDestroy := []string{KubeLoopBackIPSet, KubeClusterIPSet, KubeLoadBalancerSet, KubeNodePortSetTCP, KubeNodePortSetUDP, + KubeExternalIPSet, KubeLoadBalancerSourceIPSet, KubeLoadBalancerSourceCIDRSet, KubeLoadBalancerMasqSet} + for _, set := range ipSetsToDestroy { + err = ipset.DestroySet(set) + if err != nil { + encounteredError = true + } + } return encounteredError } @@ -957,6 +1024,16 @@ func (proxier *Proxier) syncProxyRules() { return } + // make sure ip sets exists in the system. 
+ ipSets := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.externalIPSet, proxier.nodePortSetUDP, proxier.nodePortSetTCP, + proxier.lbIngressSet, proxier.lbMasqSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet} + if err := ensureIPSets(ipSets...); err != nil { + return + } + for i := range ipSets { + ipSets[i].resetEntries() + } + // Accumulate the set of local ports that we will be holding open once this update is complete replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{} // activeIPVSServices represents IPVS service successfully created in this round of sync @@ -976,6 +1053,17 @@ func (proxier *Proxier) syncProxyRules() { // is just for efficiency, not correctness. args := make([]string, 64) + // Kube service portal + if err := proxier.linkKubeServiceChain(existingNATChains, proxier.natChains); err != nil { + glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err) + return + } + // Kube service ipset + if err := proxier.createKubeFireWallChain(existingNATChains, proxier.natChains); err != nil { + glog.Errorf("Failed to create KUBE-FIRE-WALL chain: %v", err) + return + } + // Build IPVS rules for each service. for svcName, svcInfo := range proxier.serviceMap { protocol := strings.ToLower(string(svcInfo.protocol)) @@ -983,7 +1071,41 @@ func (proxier *Proxier) syncProxyRules() { // to ServicePortName.String() show up in CPU profiles. svcNameString := svcName.String() + // Handle traffic that loops back to the originator with SNAT. + for _, ep := range proxier.endpointsMap[svcName] { + epIP := ep.IPPart() + epPort, err := ep.PortPart() + // Error parsing this endpoint has been logged. Skip to next endpoint. + if epIP == "" || err != nil { + continue + } + entry := &utilipset.Entry{ + IP: epIP, + Port: epPort, + Protocol: protocol, + IP2: epIP, + SetType: utilipset.HashIPPortIP, + } + proxier.loopbackSet.activeEntries.Insert(entry.String()) + } + // Capture the clusterIP. + // ipset call + entry := &utilipset.Entry{ + IP: svcInfo.clusterIP.String(), + Port: svcInfo.port, + Protocol: protocol, + SetType: utilipset.HashIPPort, + } + // add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin. + // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) + // Install masquerade rules if 'masqueradeAll' or 'clusterCIDR' is specified. + if proxier.masqueradeAll { + proxier.clusterIPSet.activeEntries.Insert(entry.String()) + } else if len(proxier.clusterCIDR) > 0 { + proxier.clusterIPSet.activeEntries.Insert(entry.String()) + } + // ipvs call serv := &utilipvs.VirtualServer{ Address: svcInfo.clusterIP, Port: uint16(svcInfo.port), @@ -1004,32 +1126,6 @@ func (proxier *Proxier) syncProxyRules() { } else { glog.Errorf("Failed to sync service: %v, err: %v", serv, err) } - // Install masquerade rules if 'masqueradeAll' or 'clusterCIDR' is specified. - args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString), - "-m", protocol, "-p", protocol, - "-d", utilproxy.ToCIDR(svcInfo.clusterIP), - "--dport", strconv.Itoa(svcInfo.port), - ) - if proxier.masqueradeAll { - err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains) - if err != nil { - glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err) - } - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) - } else if len(proxier.clusterCIDR) > 0 { - // This masquerades off-cluster traffic to a service VIP. 
The idea - // is that you can establish a static route for your Service range, - // routing to any node, and that node will bridge into the Service - // for you. Since that might bounce off-node, we masquerade here. - // If/when we support "Local" policy for VIPs, we should update this. - err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains) - if err != nil { - glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err) - } - writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...) - } // Capture externalIPs. for _, externalIP := range svcInfo.externalIPs { @@ -1064,6 +1160,17 @@ func (proxier *Proxier) syncProxyRules() { } } // We're holding the port, so it's OK to install IPVS rules. + // ipset call + entry := &utilipset.Entry{ + IP: externalIP, + Port: svcInfo.port, + Protocol: protocol, + SetType: utilipset.HashIPPort, + } + // We have to SNAT packets to external IPs. + proxier.externalIPSet.activeEntries.Insert(entry.String()) + + // ipvs call serv := &utilipvs.VirtualServer{ Address: net.ParseIP(externalIP), Port: uint16(svcInfo.port), @@ -1088,25 +1195,39 @@ func (proxier *Proxier) syncProxyRules() { // Capture load-balancer ingress. for _, ingress := range svcInfo.loadBalancerStatus.Ingress { if ingress.IP != "" { + // ipset call + entry = &utilipset.Entry{ + IP: ingress.IP, + Port: svcInfo.port, + Protocol: protocol, + SetType: utilipset.HashIPPort, + } + // add service load balancer ingressIP:Port to kubeServiceAccess ip set for the purpose of solving hairpin. + // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) + // If we are proxying globally, we need to masquerade in case we cross nodes. + // If we are proxying only locally, we can retain the source IP. + if !svcInfo.onlyNodeLocalEndpoints { + proxier.lbMasqSet.activeEntries.Insert(entry.String()) + } if len(svcInfo.loadBalancerSourceRanges) != 0 { - err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains) - if err != nil { - glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err) - } // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. // This currently works for loadbalancers that preserves source ips. // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. - args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString), - "-m", string(svcInfo.protocol), "-p", string(svcInfo.protocol), - "-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), - "--dport", fmt.Sprintf("%d", svcInfo.port), - ) + proxier.lbIngressSet.activeEntries.Insert(entry.String()) allowFromNode := false for _, src := range svcInfo.loadBalancerSourceRanges { - writeLine(proxier.natRules, append(args, "-s", src, "-j", "ACCEPT")...) + // ipset call + entry = &utilipset.Entry{ + IP: ingress.IP, + Port: svcInfo.port, + Protocol: protocol, + Net: src, + SetType: utilipset.HashIPPortNet, + } + // enumerate all white list source cidr + proxier.lbWhiteListCIDRSet.activeEntries.Insert(entry.String()) + // ignore error because it has been validated _, cidr, _ := net.ParseCIDR(src) if cidr.Contains(proxier.nodeIP) { @@ -1117,14 +1238,19 @@ func (proxier *Proxier) syncProxyRules() { // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. // Need to add the following rule to allow request on host. 
if allowFromNode { - writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), "-j", "ACCEPT")...) + entry = &utilipset.Entry{ + IP: ingress.IP, + Port: svcInfo.port, + Protocol: protocol, + IP2: ingress.IP, + SetType: utilipset.HashIPPortIP, + } + // enumerate all white list source ip + proxier.lbWhiteListIPSet.activeEntries.Insert(entry.String()) } - - // If the packet was able to reach the end of firewall chain, then it did not get DNATed. - // It means the packet cannot go through the firewall, then DROP it. - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) } + // ipvs call serv := &utilipvs.VirtualServer{ Address: net.ParseIP(ingress.IP), Port: uint16(svcInfo.port), @@ -1170,12 +1296,33 @@ func (proxier *Proxier) syncProxyRules() { replacementPortsMap[lp] = socket } // We're holding the port, so it's OK to install ipvs rules. + // Nodeports need SNAT, unless they're local. + // ipset call + if !svcInfo.onlyNodeLocalEndpoints { + entry = &utilipset.Entry{ + // No need to provide ip info + Port: svcInfo.nodePort, + Protocol: protocol, + SetType: utilipset.BitmapPort, + } + switch protocol { + case "tcp": + proxier.nodePortSetTCP.activeEntries.Insert(entry.String()) + case "udp": + proxier.nodePortSetUDP.activeEntries.Insert(entry.String()) + default: + // It should never hit + glog.Errorf("Unsupported protocol type: %s", protocol) + } + } + // Build ipvs kernel routes for each node ip address nodeIPs, err := proxier.ipGetter.NodeIPs() if err != nil { glog.Errorf("Failed to get node IP, err: %v", err) } else { for _, nodeIP := range nodeIPs { + // ipvs call serv := &utilipvs.VirtualServer{ Address: nodeIP, Port: uint16(svcInfo.nodePort), @@ -1200,6 +1347,119 @@ func (proxier *Proxier) syncProxyRules() { } } + // sync ipset entries + ipsetsToSync := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.lbIngressSet, proxier.lbMasqSet, proxier.nodePortSetTCP, + proxier.nodePortSetUDP, proxier.externalIPSet, proxier.lbWhiteListIPSet, proxier.lbWhiteListCIDRSet} + for i := range ipsetsToSync { + ipsetsToSync[i].syncIPSetEntries() + } + + // Tail call iptables rules for ipset, make sure only call iptables once + // in a single loop per ip set. + if !proxier.loopbackSet.isEmpty() { + args = append(args[:0], + "-A", string(kubePostroutingChain), + "-m", "set", "--match-set", proxier.loopbackSet.Name, + "dst,dst,src", + ) + writeLine(proxier.natRules, append(args, "-j", "MASQUERADE")...) + } + if !proxier.clusterIPSet.isEmpty() { + args = append(args[:0], + "-A", string(kubeServicesChain), + "-m", "set", "--match-set", proxier.clusterIPSet.Name, + "dst,dst", + ) + if proxier.masqueradeAll { + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + } else if len(proxier.clusterCIDR) > 0 { + // This masquerades off-cluster traffic to a service VIP. The idea + // is that you can establish a static route for your Service range, + // routing to any node, and that node will bridge into the Service + // for you. Since that might bounce off-node, we masquerade here. + // If/when we support "Local" policy for VIPs, we should update this. + writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...) + } + } + if !proxier.externalIPSet.isEmpty() { + // Build masquerade rules for packets to external IPs. 
+ args = append(args[:0], + "-A", string(kubeServicesChain), + "-m", "set", "--match-set", proxier.externalIPSet.Name, + "dst,dst", + ) + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container) + // nor from a local process to be forwarded to the service. + // This rule roughly translates to "all traffic from off-machine". + // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later. + externalTrafficOnlyArgs := append(args, + "-m", "physdev", "!", "--physdev-is-in", + "-m", "addrtype", "!", "--src-type", "LOCAL") + writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", "ACCEPT")...) + dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL") + // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local. + // This covers cases like GCE load-balancers which get added to the local routing table. + writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...) + } + if !proxier.lbMasqSet.isEmpty() { + // Build masquerade rules for packets which cross node visit load balancer ingress IPs. + args = append(args[:0], + "-A", string(kubeServicesChain), + "-m", "set", "--match-set", proxier.lbMasqSet.Name, + "dst,dst", + ) + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + } + if !proxier.lbWhiteListCIDRSet.isEmpty() || !proxier.lbWhiteListIPSet.isEmpty() { + // link kube-services chain -> kube-fire-wall chain + args := []string{"-m", "set", "--match-set", proxier.lbIngressSet.Name, "dst,dst", "-j", string(KubeFireWallChain)} + if _, err := proxier.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, kubeServicesChain, args...); err != nil { + glog.Errorf("Failed to ensure that ipset %s chain %s jumps to %s: %v", proxier.lbIngressSet.Name, kubeServicesChain, KubeFireWallChain, err) + } + if !proxier.lbWhiteListCIDRSet.isEmpty() { + args = append(args[:0], + "-A", string(KubeFireWallChain), + "-m", "set", "--match-set", proxier.lbWhiteListCIDRSet.Name, + "dst,dst,src", + ) + writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) + } + if !proxier.lbWhiteListIPSet.isEmpty() { + args = append(args[:0], + "-A", string(KubeFireWallChain), + "-m", "set", "--match-set", proxier.lbWhiteListIPSet.Name, + "dst,dst,src", + ) + writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) + } + args = append(args[:0], + "-A", string(KubeFireWallChain), + ) + // If the packet was able to reach the end of firewall chain, then it did not get DNATed. + // It means the packet cannot go thru the firewall, then mark it for DROP + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) + } + if !proxier.nodePortSetTCP.isEmpty() { + // Build masquerade rules for packets which cross node visit nodeport. + args = append(args[:0], + "-A", string(kubeServicesChain), + "-m", "tcp", "-p", "tcp", + "-m", "set", "--match-set", proxier.nodePortSetTCP.Name, + "dst", + ) + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + } + if !proxier.nodePortSetUDP.isEmpty() { + args = append(args[:0], + "-A", string(kubeServicesChain), + "-m", "udp", "-p", "udp", + "-m", "set", "--match-set", proxier.nodePortSetUDP.Name, + "dst", + ) + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + } + // Write the end-of-table markers. 
writeLine(proxier.natRules, "COMMIT") @@ -1411,7 +1671,6 @@ func (proxier *Proxier) cleanLegacyService(atciveServices map[string]bool, curre } // linkKubeServiceChain will Create chain KUBE-SERVICES and link the chin in PREROUTING and OUTPUT -// If not specify masqueradeAll or clusterCIDR or LB source range, won't create them. // Chain PREROUTING (policy ACCEPT) // target prot opt source destination @@ -1451,6 +1710,55 @@ func (proxier *Proxier) linkKubeServiceChain(existingNATChains map[utiliptables. return nil } +//// linkKubeIPSetsChain will Create chain KUBE-SVC-IPSETS and link the chin in KUBE-SERVICES +// +//// Chain KUBE-SERVICES (policy ACCEPT) +//// target prot opt source destination +//// KUBE-SVC-IPSETS all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-SERVICE-ACCESS dst,dst +// +//// Chain KUBE-SVC-IPSETS (1 references) +//// target prot opt source destination +//// KUBE-MARK-MASQ all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-EXTERNAL-IP dst,dst +//// ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-EXTERNAL-IP dst,dst PHYSDEV match ! --physdev-is-in ADDRTYPE match src-type !LOCAL +//// ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-EXTERNAL-IP dst,dst ADDRTYPE match dst-type LOCAL +//// ... +//func (proxier *Proxier) linkKubeIPSetsChain(existingNATChains map[utiliptables.Chain]string, natChains *bytes.Buffer) error { +// if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, KubeServiceIPSetsChain); err != nil { +// return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, KubeServiceIPSetsChain, err) +// } +// +// // TODO: iptables comment message for ipset? +// // The hash:ip,port type of sets require two src/dst parameters of the set match and SET target kernel modules. +// args := []string{"-m", "set", "--match-set", proxier.kubeServiceAccessSet.Name, "dst,dst", "-j", string(KubeServiceIPSetsChain)} +// if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, kubeServicesChain, args...); err != nil { +// return fmt.Errorf("Failed to ensure that ipset %s chain %s jumps to %s: %v", proxier.kubeServiceAccessSet.Name, kubeServicesChain, KubeServiceIPSetsChain, err) +// } +// +// // equal to `iptables -t nat -N KUBE-SVC-IPSETS` +// // write `:KUBE-SERVICES - [0:0]` in nat table +// if chain, ok := existingNATChains[KubeServiceIPSetsChain]; ok { +// writeLine(natChains, chain) +// } else { +// writeLine(natChains, utiliptables.MakeChainLine(KubeServiceIPSetsChain)) +// } +// return nil +//} + +func (proxier *Proxier) createKubeFireWallChain(existingNATChains map[utiliptables.Chain]string, natChains *bytes.Buffer) error { + // `iptables -t nat -N KUBE-FIRE-WALL` + if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, KubeFireWallChain); err != nil { + return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, KubeFireWallChain, err) + } + + // write `:KUBE-FIRE-WALL - [0:0]` in nat table + if chain, ok := existingNATChains[KubeFireWallChain]; ok { + writeLine(natChains, chain) + } else { + writeLine(natChains, utiliptables.MakeChainLine(KubeFireWallChain)) + } + return nil +} + // Join all words with spaces, terminate with newline and write to buff. func writeLine(buf *bytes.Buffer, words ...string) { // We avoid strings.Join for performance reasons. 
diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index b4147353d42..05701cff8fc 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -33,6 +33,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing" proxyutil "k8s.io/kubernetes/pkg/proxy/util" + utilipset "k8s.io/kubernetes/pkg/util/ipset" + ipsettest "k8s.io/kubernetes/pkg/util/ipset/testing" utiliptables "k8s.io/kubernetes/pkg/util/iptables" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" utilipvs "k8s.io/kubernetes/pkg/util/ipvs" @@ -85,7 +87,7 @@ func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedNa return nil } -func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, nodeIPs []net.IP) *Proxier { +func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []net.IP) *Proxier { fcmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ func() ([]byte, error) { return []byte("dummy device have been created"), nil }, @@ -98,24 +100,34 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, nodeIPs LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } return &Proxier{ - exec: fexec, - serviceMap: make(proxyServiceMap), - serviceChanges: newServiceChangeMap(), - endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(testHostname), - iptables: ipt, - ipvs: ipvs, - clusterCIDR: "10.0.0.0/24", - hostname: testHostname, - portsMap: make(map[proxyutil.LocalPort]proxyutil.Closeable), - portMapper: &fakePortOpener{[]*proxyutil.LocalPort{}}, - healthChecker: newFakeHealthChecker(), - ipvsScheduler: DefaultScheduler, - ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, - iptablesData: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), - netlinkHandle: netlinktest.NewFakeNetlinkHandle(), + exec: fexec, + serviceMap: make(proxyServiceMap), + serviceChanges: newServiceChangeMap(), + endpointsMap: make(proxyEndpointsMap), + endpointsChanges: newEndpointsChangeMap(testHostname), + iptables: ipt, + ipvs: ipvs, + ipset: ipset, + clusterCIDR: "10.0.0.0/24", + hostname: testHostname, + portsMap: make(map[proxyutil.LocalPort]proxyutil.Closeable), + portMapper: &fakePortOpener{[]*proxyutil.LocalPort{}}, + healthChecker: newFakeHealthChecker(), + ipvsScheduler: DefaultScheduler, + ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, + iptablesData: bytes.NewBuffer(nil), + natChains: bytes.NewBuffer(nil), + natRules: bytes.NewBuffer(nil), + netlinkHandle: netlinktest.NewFakeNetlinkHandle(), + loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, false), + clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, false), + externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, false), + lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, false), + lbMasqSet: NewIPSet(ipset, KubeLoadBalancerMasqSet, utilipset.HashIPPort, false), + lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, false), + lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, false), + nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false), + nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false), } } @@ -171,10 +183,11 @@ func makeTestEndpoints(namespace, name string, eptFunc 
func(*api.Endpoints)) *ap func TestNodePort(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake() nodeIPv4 := net.ParseIP("100.101.102.103") nodeIPv6 := net.ParseIP("2001:db8::1:1") nodeIPs := sets.NewString(nodeIPv4.String(), nodeIPv6.String()) - fp := NewFakeProxier(ipt, ipvs, []net.IP{nodeIPv4, nodeIPv6}) + fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIPv4, nodeIPv6}) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -248,8 +261,9 @@ func TestNodePort(t *testing.T) { func TestNodePortNoEndpoint(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake() nodeIP := net.ParseIP("100.101.102.103") - fp := NewFakeProxier(ipt, ipvs, []net.IP{nodeIP}) + fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP}) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -301,7 +315,8 @@ func TestNodePortNoEndpoint(t *testing.T) { func TestClusterIPNoEndpoint(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) svcIP := "10.20.30.41" svcPort := 80 svcPortName := proxy.ServicePortName{ @@ -344,7 +359,8 @@ func TestClusterIPNoEndpoint(t *testing.T) { func TestClusterIP(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) svcIPv4 := "10.20.30.41" svcPortV4 := 80 @@ -450,7 +466,8 @@ func TestClusterIP(t *testing.T) { func TestExternalIPsNoEndpoint(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) svcIP := "10.20.30.41" svcPort := 80 svcExternalIPs := "50.60.70.81" @@ -504,7 +521,8 @@ func TestExternalIPsNoEndpoint(t *testing.T) { func TestExternalIPs(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) svcIP := "10.20.30.41" svcPort := 80 svcExternalIPs := sets.NewString("50.60.70.81", "2012::51") @@ -573,7 +591,8 @@ func TestExternalIPs(t *testing.T) { func TestLoadBalancer(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -624,8 +643,9 @@ func strPtr(s string) *string { func TestOnlyLocalNodePorts(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake() nodeIP := net.ParseIP("100.101.102.103") - fp := NewFakeProxier(ipt, ipvs, []net.IP{nodeIP}) + fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP}) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -705,7 +725,8 @@ func TestOnlyLocalNodePorts(t *testing.T) { func TestOnlyLocalLoadBalancing(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -769,7 +790,8 @@ func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, po func TestBuildServiceMapAddRemove(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := 
NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) services := []*api.Service{ makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) { @@ -874,7 +896,8 @@ func TestBuildServiceMapAddRemove(t *testing.T) { func TestBuildServiceMapServiceHeadless(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) makeServiceMap(fp, makeTestService("somewhere-else", "headless", func(svc *api.Service) { @@ -907,7 +930,8 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) makeServiceMap(fp, makeTestService("somewhere-else", "external-name", func(svc *api.Service) { @@ -934,7 +958,8 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { func TestBuildServiceMapServiceUpdate(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) servicev1 := makeTestService("somewhere", "some-service", func(svc *api.Service) { svc.Spec.Type = api.ServiceTypeClusterIP @@ -1016,8 +1041,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { func TestSessionAffinity(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake() nodeIP := net.ParseIP("100.101.102.103") - fp := NewFakeProxier(ipt, ipvs, []net.IP{nodeIP}) + fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP}) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -1879,7 +1905,8 @@ func Test_updateEndpointsMap(t *testing.T) { for tci, tc := range testCases { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) fp.hostname = nodeName // First check that after adding all previous versions of endpoints, diff --git a/pkg/proxy/util/endpoints.go b/pkg/proxy/util/endpoints.go index 32e770d4f94..a0ae306634f 100644 --- a/pkg/proxy/util/endpoints.go +++ b/pkg/proxy/util/endpoints.go @@ -19,6 +19,7 @@ package util import ( "fmt" "net" + "strconv" "github.com/golang/glog" ) @@ -40,6 +41,21 @@ func IPPart(s string) string { return ip } +func PortPart(s string) (int, error) { + // Must be IP:port + _, port, err := net.SplitHostPort(s) + if err != nil { + glog.Errorf("Error parsing '%s': %v", s, err) + return -1, err + } + portNumber, err := strconv.Atoi(port) + if err != nil { + glog.Errorf("Error parsing '%s': %v", port, err) + return -1, err + } + return portNumber, nil +} + // ToCIDR returns a host address of the form /32 for // IPv4 and /128 for IPv6 func ToCIDR(ip net.IP) string {
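
For reference, the `PortPart` helper added to pkg/proxy/util/endpoints.go is plain `net.SplitHostPort` plus `strconv.Atoi`, so it handles both IPv4 and bracketed IPv6 endpoints. A minimal usage sketch, using a local copy of the same logic rather than importing the package:

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// portPart mirrors the PortPart helper above: the endpoint must be in
// "IP:port" (or "[IPv6]:port") form, and the port is returned as an int.
func portPart(s string) (int, error) {
	_, port, err := net.SplitHostPort(s)
	if err != nil {
		return -1, err
	}
	return strconv.Atoi(port)
}

func main() {
	for _, ep := range []string{"10.0.0.1:8080", "[2001:db8::1]:8080"} {
		p, err := portPart(ep)
		fmt.Println(ep, "->", p, err)
	}
}
```
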