diff --git a/pkg/proxy/ipvs/ipset.go b/pkg/proxy/ipvs/ipset.go index 16637455b7b..037df0ada85 100644 --- a/pkg/proxy/ipvs/ipset.go +++ b/pkg/proxy/ipvs/ipset.go @@ -40,8 +40,8 @@ const ( // KubeLoadBalancerSet is used to store service load balancer ingress ip + port, it is the service lb portal. KubeLoadBalancerSet = "KUBE-LOAD-BALANCER" - // KubeLoadBalancerMasqSet is used to store service load balancer ingress ip + port for masquerade purpose. - KubeLoadBalancerMasqSet = "KUBE-LOAD-BALANCER-MASQ" + // KubeLoadBalancerIngressLocalSet is used to store service load balancer ingress ip + port with externalTrafficPolicy=local. + KubeLoadBalancerIngressLocalSet = "KUBE-LB-INGRESS-LOCAL" // KubeLoadBalancerSourceIPSet is used to store service load balancer ingress ip + port + source IP for packet filter purpose. KubeLoadBalancerSourceIPSet = "KUBE-LOAD-BALANCER-SOURCE-IP" @@ -49,11 +49,17 @@ const ( // KubeLoadBalancerSourceCIDRSet is used to store service load balancer ingress ip + port + source cidr for packet filter purpose. KubeLoadBalancerSourceCIDRSet = "KUBE-LOAD-BALANCER-SOURCE-CIDR" - // KubeNodePortSetTCP is used to store nodeport TCP port for masquerade purpose. + // KubeNodePortSetTCP is used to store the nodeport TCP port for masquerade purpose. KubeNodePortSetTCP = "KUBE-NODE-PORT-TCP" - // KubeNodePortSetUDP is used to store nodeport UDP port for masquerade purpose. + // KubeNodePortLocalSetTCP is used to store the nodeport TCP port with externalTrafficPolicy=local. + KubeNodePortLocalSetTCP = "KUBE-NODE-PORT-LOCAL-TCP" + + // KubeNodePortSetUDP is used to store the nodeport UDP port for masquerade purpose. KubeNodePortSetUDP = "KUBE-NODE-PORT-UDP" + + // KubeNodePortLocalSetUDP is used to store the nodeport UDP port with externalTrafficPolicy=local. + KubeNodePortLocalSetUDP = "KUBE-NODE-PORT-LOCAL-UDP" ) // IPSetVersioner can query the current ipset version. diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index f5ae1da3bd8..43cf636313c 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -68,13 +68,16 @@ const ( // KubeMarkMasqChain is the mark-for-masquerade chain KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ" + // KubeNodePortChain is the kubernetes node port chain + KubeNodePortChain utiliptables.Chain = "KUBE-NODE-PORT" + // KubeMarkDropChain is the mark-for-drop chain KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP" // DefaultScheduler is the default ipvs scheduler algorithm - round robin. DefaultScheduler = "rr" - // DefaultDummyDevice is the default dummy interface where ipvs service address will bind to it. + // DefaultDummyDevice is the default dummy interface which ipvs service address will bind to it. DefaultDummyDevice = "kube-ipvs0" ) @@ -149,27 +152,31 @@ type Proxier struct { natRules *bytes.Buffer // Added as a member to the struct to allow injection for testing. netlinkHandle NetLinkHandle - // loopbackSet is the ipset where stores all endpoints IP:Port,IP for solving hairpin mode purpose. + // loopbackSet is the ipset which stores all endpoints IP:Port,IP for solving hairpin mode purpose. 
 	loopbackSet *IPSet
-	// clusterIPSet is the ipset where stores all service ClusterIP:Port
+	// clusterIPSet is the ipset which stores all service ClusterIP:Port
 	clusterIPSet *IPSet
-	// nodePortSetTCP is the bitmap:port type ipset where stores all TCP node port
+	// nodePortSetTCP is the bitmap:port type ipset which stores all TCP node ports
 	nodePortSetTCP *IPSet
-	// nodePortSetTCP is the bitmap:port type ipset where stores all UDP node port
+	// nodePortSetUDP is the bitmap:port type ipset which stores all UDP node ports
 	nodePortSetUDP *IPSet
-	// externalIPSet is the hash:ip,port type ipset where stores all service ExternalIP:Port
+	// lbIngressLocalSet is the hash:ip,port type ipset which stores all service load balancer ingress IP:Port with externalTrafficPolicy=local
+	lbIngressLocalSet *IPSet
+	// nodePortLocalSetTCP is the bitmap:port type ipset which stores all TCP nodeports with externalTrafficPolicy=local
+	nodePortLocalSetTCP *IPSet
+	// nodePortLocalSetUDP is the bitmap:port type ipset which stores all UDP nodeports with externalTrafficPolicy=local
+	nodePortLocalSetUDP *IPSet
+	// externalIPSet is the hash:ip,port type ipset which stores all service ExternalIP:Port
 	externalIPSet *IPSet
-	// lbIngressSet is the hash:ip,port type ipset where stores all service load balancer ingress IP:Port.
+	// lbIngressSet is the hash:ip,port type ipset which stores all service load balancer ingress IP:Port.
 	lbIngressSet *IPSet
-	// lbMasqSet is the hash:ip,port type ipset where stores all service load balancer ingress IP:Port which needs masquerade.
-	lbMasqSet *IPSet
-	// lbWhiteListIPSet is the hash:ip,port,ip type ipset where stores all service load balancer ingress IP:Port,sourceIP pair, any packets
+	// lbWhiteListIPSet is the hash:ip,port,ip type ipset which stores all service load balancer ingress IP:Port,sourceIP pair, any packets
 	// with the source IP visit ingress IP:Port can pass through.
 	lbWhiteListIPSet *IPSet
-	// lbWhiteListIPSet is the hash:ip,port,net type ipset where stores all service load balancer ingress IP:Port,sourceCIDR pair, any packets
+	// lbWhiteListCIDRSet is the hash:ip,port,net type ipset which stores all service load balancer ingress IP:Port,sourceCIDR pair, any packets
 	// from the source CIDR visit ingress IP:Port can pass through.
 	lbWhiteListCIDRSet *IPSet
-	// Values are as a parameter to select the interfaces where nodeport works.
+	// Values are as a parameter to select the interfaces on which nodeport works.
 	nodePortAddresses []string
 	// networkInterfacer defines an interface for several net library functions.
 	// Inject for test purpose.
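How these sets are consumed is spelled out in the syncProxyRules changes further down. As a quick orientation, the following standalone Go sketch (illustrative only; package main and the writeRule helper are not part of this patch) mirrors the proxier's writeLine pattern and prints the approximate nat rules rendered for load balancer ingress traffic:

package main

import (
	"fmt"
	"strings"
)

// writeRule mimics the proxier's writeLine helper: join the words of one
// iptables-restore rule and terminate it with a newline.
func writeRule(buf *strings.Builder, words ...string) {
	buf.WriteString(strings.Join(words, " "))
	buf.WriteByte('\n')
}

func main() {
	// Illustrative rule layout only; the real rules are built in syncProxyRules.
	var natRules strings.Builder

	// Ingress traffic for any load balancer IP:Port is diverted from
	// KUBE-SERVICES into KUBE-FIRE-WALL.
	writeRule(&natRules, "-A", "KUBE-SERVICES",
		"-m", "set", "--match-set", "KUBE-LOAD-BALANCER", "dst,dst",
		"-j", "KUBE-FIRE-WALL")

	// Services with externalTrafficPolicy=local are accepted first, so the
	// client source IP is preserved (no masquerade).
	writeRule(&natRules, "-A", "KUBE-FIRE-WALL",
		"-m", "set", "--match-set", "KUBE-LB-INGRESS-LOCAL", "dst,dst",
		"-j", "ACCEPT")

	// Everything else falls through and is marked for masquerade.
	writeRule(&natRules, "-A", "KUBE-FIRE-WALL", "-j", "KUBE-MARK-MASQ")

	fmt.Print(natRules.String())
}

The nodeport path is analogous: KUBE-SERVICES jumps to KUBE-NODE-PORT, which ACCEPTs ports found in the KUBE-NODE-PORT-LOCAL-TCP/UDP sets and marks everything else for masquerade.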
@@ -308,43 +315,45 @@ func NewProxier(ipt utiliptables.Interface, healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps proxier := &Proxier{ - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), - serviceMap: make(proxy.ServiceMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder), - endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder), - syncPeriod: syncPeriod, - minSyncPeriod: minSyncPeriod, - iptables: ipt, - masqueradeAll: masqueradeAll, - masqueradeMark: masqueradeMark, - exec: exec, - clusterCIDR: clusterCIDR, - hostname: hostname, - nodeIP: nodeIP, - portMapper: &listenPortOpener{}, - recorder: recorder, - healthChecker: healthChecker, - healthzServer: healthzServer, - ipvs: ipvs, - ipvsScheduler: scheduler, - ipGetter: &realIPGetter{nl: NewNetLinkHandle()}, - iptablesData: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), - netlinkHandle: NewNetLinkHandle(), - ipset: ipset, - loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, isIPv6), - clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, isIPv6), - externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, isIPv6), - lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, isIPv6), - lbMasqSet: NewIPSet(ipset, KubeLoadBalancerMasqSet, utilipset.HashIPPort, isIPv6), - lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, isIPv6), - lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, isIPv6), - nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false), - nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false), - nodePortAddresses: nodePortAddresses, - networkInterfacer: utilproxy.RealNetwork{}, + portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + serviceMap: make(proxy.ServiceMap), + serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder), + endpointsMap: make(proxy.EndpointsMap), + endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder), + syncPeriod: syncPeriod, + minSyncPeriod: minSyncPeriod, + iptables: ipt, + masqueradeAll: masqueradeAll, + masqueradeMark: masqueradeMark, + exec: exec, + clusterCIDR: clusterCIDR, + hostname: hostname, + nodeIP: nodeIP, + portMapper: &listenPortOpener{}, + recorder: recorder, + healthChecker: healthChecker, + healthzServer: healthzServer, + ipvs: ipvs, + ipvsScheduler: scheduler, + ipGetter: &realIPGetter{nl: NewNetLinkHandle()}, + iptablesData: bytes.NewBuffer(nil), + natChains: bytes.NewBuffer(nil), + natRules: bytes.NewBuffer(nil), + netlinkHandle: NewNetLinkHandle(), + ipset: ipset, + loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, isIPv6), + clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, isIPv6), + externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, isIPv6), + lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, isIPv6), + lbIngressLocalSet: NewIPSet(ipset, KubeLoadBalancerIngressLocalSet, utilipset.HashIPPort, isIPv6), + lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, isIPv6), + lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, isIPv6), + nodePortSetTCP: NewIPSet(ipset, 
KubeNodePortSetTCP, utilipset.BitmapPort, false), + nodePortLocalSetTCP: NewIPSet(ipset, KubeNodePortLocalSetTCP, utilipset.BitmapPort, false), + nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false), + nodePortLocalSetUDP: NewIPSet(ipset, KubeNodePortLocalSetUDP, utilipset.BitmapPort, false), + nodePortAddresses: nodePortAddresses, + networkInterfacer: utilproxy.RealNetwork{}, } burstSyncs := 2 glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) @@ -511,7 +520,8 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset // Destroy ip sets created by ipvs Proxier. We should call it after cleaning up // iptables since we can NOT delete ip set which is still referenced by iptables. ipSetsToDestroy := []string{KubeLoopBackIPSet, KubeClusterIPSet, KubeLoadBalancerSet, KubeNodePortSetTCP, KubeNodePortSetUDP, - KubeExternalIPSet, KubeLoadBalancerSourceIPSet, KubeLoadBalancerSourceCIDRSet, KubeLoadBalancerMasqSet} + KubeExternalIPSet, KubeLoadBalancerSourceIPSet, KubeLoadBalancerSourceCIDRSet, + KubeLoadBalancerIngressLocalSet, KubeNodePortLocalSetUDP, KubeNodePortLocalSetTCP} for _, set := range ipSetsToDestroy { err = ipset.DestroySet(set) if err != nil { @@ -699,7 +709,8 @@ func (proxier *Proxier) syncProxyRules() { // make sure ip sets exists in the system. ipSets := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.externalIPSet, proxier.nodePortSetUDP, proxier.nodePortSetTCP, - proxier.lbIngressSet, proxier.lbMasqSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet} + proxier.lbIngressSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet, proxier.lbIngressLocalSet, + proxier.nodePortLocalSetTCP, proxier.nodePortLocalSetUDP} if err := ensureIPSets(ipSets...); err != nil { return } @@ -731,12 +742,18 @@ func (proxier *Proxier) syncProxyRules() { glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err) return } - // Kube service ipset - if err := proxier.createKubeFireWallChain(existingNATChains, proxier.natChains); err != nil { + // `iptables -t nat -N KUBE-FIRE-WALL` + if err := proxier.createKubeChain(existingNATChains, KubeFireWallChain); err != nil { glog.Errorf("Failed to create KUBE-FIRE-WALL chain: %v", err) return } + // `iptables -t nat -N KUBE-NODE-PORT` + if err := proxier.createKubeChain(existingNATChains, KubeNodePortChain); err != nil { + glog.Errorf("Failed to create KUBE-NODE-PORT chain: %v", err) + return + } + // Build IPVS rules for each service. for svcName, svc := range proxier.serviceMap { svcInfo, ok := svc.(*serviceInfo) @@ -901,23 +918,23 @@ func (proxier *Proxier) syncProxyRules() { // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) // If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying only locally, we can retain the source IP. 
- if !svcInfo.OnlyNodeLocalEndpoints { - if valid := proxier.lbMasqSet.validateEntry(entry); !valid { - glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbMasqSet.Name)) + if valid := proxier.lbIngressSet.validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbIngressSet.Name)) + continue + } + proxier.lbIngressSet.activeEntries.Insert(entry.String()) + // insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local + if svcInfo.OnlyNodeLocalEndpoints { + if valid := proxier.lbIngressLocalSet.validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbIngressSet.Name)) continue } - proxier.lbMasqSet.activeEntries.Insert(entry.String()) + proxier.lbIngressLocalSet.activeEntries.Insert(entry.String()) } if len(svcInfo.LoadBalancerSourceRanges) != 0 { // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. // This currently works for loadbalancers that preserves source ips. // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. - if valid := proxier.lbIngressSet.validateEntry(entry); !valid { - glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbIngressSet.Name)) - continue - } - proxier.lbIngressSet.activeEntries.Insert(entry.String()) - allowFromNode := false for _, src := range svcInfo.LoadBalancerSourceRanges { // ipset call @@ -1009,30 +1026,48 @@ func (proxier *Proxier) syncProxyRules() { // Nodeports need SNAT, unless they're local. // ipset call - if !svcInfo.OnlyNodeLocalEndpoints { - entry = &utilipset.Entry{ - // No need to provide ip info - Port: svcInfo.NodePort, - Protocol: protocol, - SetType: utilipset.BitmapPort, + entry = &utilipset.Entry{ + // No need to provide ip info + Port: svcInfo.NodePort, + Protocol: protocol, + SetType: utilipset.BitmapPort, + } + var nodePortSet *IPSet + switch protocol { + case "tcp": + nodePortSet = proxier.nodePortSetTCP + case "udp": + nodePortSet = proxier.nodePortSetUDP + default: + // It should never hit + glog.Errorf("Unsupported protocol type: %s", protocol) + } + if nodePortSet != nil { + if valid := nodePortSet.validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name)) + continue } - var nodePortSet *IPSet - switch protocol { + nodePortSet.activeEntries.Insert(entry.String()) + } + // Add externaltrafficpolicy=local type nodeport entry + if svcInfo.OnlyNodeLocalEndpoints { + var nodePortLocalSet *IPSet + switch protocol { case "tcp": - nodePortSet = proxier.nodePortSetTCP + nodePortLocalSet = proxier.nodePortLocalSetTCP case "udp": - nodePortSet = proxier.nodePortSetUDP + nodePortLocalSet = proxier.nodePortLocalSetUDP default: // It should never hit glog.Errorf("Unsupported protocol type: %s", protocol) } - if nodePortSet != nil { - if valid := nodePortSet.validateEntry(entry); !valid { - glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name)) + if nodePortLocalSet != nil { + if valid := nodePortLocalSet.validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name)) continue } - nodePortSet.activeEntries.Insert(entry.String()) + nodePortLocalSet.activeEntries.Insert(entry.String()) } } @@ -1080,8 +1115,9 @@ func (proxier *Proxier) syncProxyRules() { } // sync ipset entries - ipsetsToSync := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.lbIngressSet, proxier.lbMasqSet, proxier.nodePortSetTCP, - 
proxier.nodePortSetUDP, proxier.externalIPSet, proxier.lbWhiteListIPSet, proxier.lbWhiteListCIDRSet} + ipsetsToSync := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.lbIngressSet, proxier.nodePortSetTCP, + proxier.nodePortSetUDP, proxier.externalIPSet, proxier.lbWhiteListIPSet, proxier.lbWhiteListCIDRSet, proxier.lbIngressLocalSet, + proxier.nodePortLocalSetTCP, proxier.nodePortLocalSetUDP} for i := range ipsetsToSync { ipsetsToSync[i].syncIPSetEntries() } @@ -1134,46 +1170,55 @@ func (proxier *Proxier) syncProxyRules() { // This covers cases like GCE load-balancers which get added to the local routing table. writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...) } - if !proxier.lbMasqSet.isEmpty() { + if !proxier.lbIngressSet.isEmpty() { // Build masquerade rules for packets which cross node visit load balancer ingress IPs. args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "set", "--match-set", proxier.lbMasqSet.Name, - "dst,dst", - ) - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) - } - if !proxier.lbWhiteListCIDRSet.isEmpty() || !proxier.lbWhiteListIPSet.isEmpty() { - // link kube-services chain -> kube-fire-wall chain - args := []string{ "-A", string(kubeServicesChain), "-m", "set", "--match-set", proxier.lbIngressSet.Name, "dst,dst", - "-j", string(KubeFireWallChain), - } - writeLine(proxier.natRules, args...) - if !proxier.lbWhiteListCIDRSet.isEmpty() { + ) + writeLine(proxier.natRules, append(args, "-j", string(KubeFireWallChain))...) + // Don't masq for service with externaltrafficpolicy =local + if !proxier.lbIngressLocalSet.isEmpty() { args = append(args[:0], "-A", string(KubeFireWallChain), - "-m", "set", "--match-set", proxier.lbWhiteListCIDRSet.Name, - "dst,dst,src", - ) - writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) - } - if !proxier.lbWhiteListIPSet.isEmpty() { - args = append(args[:0], - "-A", string(KubeFireWallChain), - "-m", "set", "--match-set", proxier.lbWhiteListIPSet.Name, - "dst,dst,src", + "-m", "set", "--match-set", proxier.lbIngressLocalSet.Name, + "dst,dst", ) writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) } + // mark masq for others args = append(args[:0], "-A", string(KubeFireWallChain), + "-m", "comment", "--comment", + fmt.Sprintf(`"mark MASQ for external traffic policy not local"`), ) - // If the packet was able to reach the end of firewall chain, then it did not get DNATed. - // It means the packet cannot go thru the firewall, then mark it for DROP - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + // if have whitelist, accept or drop. + if !proxier.lbWhiteListCIDRSet.isEmpty() || !proxier.lbWhiteListIPSet.isEmpty() { + if !proxier.lbWhiteListCIDRSet.isEmpty() { + args = append(args[:0], + "-A", string(KubeFireWallChain), + "-m", "set", "--match-set", proxier.lbWhiteListCIDRSet.Name, + "dst,dst,src", + ) + writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) + } + if !proxier.lbWhiteListIPSet.isEmpty() { + args = append(args[:0], + "-A", string(KubeFireWallChain), + "-m", "set", "--match-set", proxier.lbWhiteListIPSet.Name, + "dst,dst,src", + ) + writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) + } + args = append(args[:0], + "-A", string(KubeFireWallChain), + ) + // If the packet was able to reach the end of firewall chain, then it did not get DNATed. 
+ // It means the packet cannot go thru the firewall, then mark it for DROP + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) + } } if !proxier.nodePortSetTCP.isEmpty() { // Build masquerade rules for packets which cross node visit nodeport. @@ -1183,15 +1228,47 @@ func (proxier *Proxier) syncProxyRules() { "-m", "set", "--match-set", proxier.nodePortSetTCP.Name, "dst", ) + writeLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...) + // accept for nodeports w/ externaltrafficpolicy=local + if !proxier.nodePortLocalSetTCP.isEmpty() { + args = append(args[:0], + "-A", string(KubeNodePortChain), + "-m", "set", "--match-set", proxier.nodePortLocalSetTCP.Name, + "dst", + ) + writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) + } + // mark masq for others + args = append(args[:0], + "-A", string(KubeNodePortChain), + "-m", "comment", "--comment", + fmt.Sprintf(`"mark MASQ for externaltrafficpolicy=cluster"`), + ) writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) } if !proxier.nodePortSetUDP.isEmpty() { + // accept for nodeports w/ externaltrafficpolicy=local args = append(args[:0], "-A", string(kubeServicesChain), "-m", "udp", "-p", "udp", "-m", "set", "--match-set", proxier.nodePortSetUDP.Name, "dst", ) + writeLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...) + if !proxier.nodePortLocalSetUDP.isEmpty() { + args = append(args[:0], + "-A", string(KubeNodePortChain), + "-m", "set", "--match-set", proxier.nodePortLocalSetUDP.Name, + "dst", + ) + writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) + } + // mark masq for others + args = append(args[:0], + "-A", string(KubeNodePortChain), + "-m", "comment", "--comment", + fmt.Sprintf(`"mark MASQ for externaltrafficpolicy=cluster"`), + ) writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) } @@ -1327,9 +1404,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode } for _, epInfo := range proxier.endpointsMap[svcPortName] { - if !onlyNodeLocalEndpoints || onlyNodeLocalEndpoints && epInfo.GetIsLocal() { - newEndpoints.Insert(epInfo.String()) - } + newEndpoints.Insert(epInfo.String()) } if !curEndpoints.Equal(newEndpoints) { @@ -1439,17 +1514,17 @@ func (proxier *Proxier) linkKubeServiceChain(existingNATChains map[utiliptables. 
return nil } -func (proxier *Proxier) createKubeFireWallChain(existingNATChains map[utiliptables.Chain]string, natChains *bytes.Buffer) error { - // `iptables -t nat -N KUBE-FIRE-WALL` - if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, KubeFireWallChain); err != nil { - return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, KubeFireWallChain, err) +// `iptables -t nat -N ` +func (proxier *Proxier) createKubeChain(existingNATChains map[utiliptables.Chain]string, chainName utiliptables.Chain) error { + if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, chainName); err != nil { + return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, chainName, err) } - // write `:KUBE-FIRE-WALL - [0:0]` in nat table - if chain, ok := existingNATChains[KubeFireWallChain]; ok { - writeLine(natChains, chain) + // write `: - [0:0]` in nat table + if chain, ok := existingNATChains[chainName]; ok { + writeLine(proxier.natChains, chain) } else { - writeLine(natChains, utiliptables.MakeChainLine(KubeFireWallChain)) + writeLine(proxier.natChains, utiliptables.MakeChainLine(chainName)) } return nil } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 4a6c294c35b..f5d062b8d5a 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -120,36 +120,38 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } return &Proxier{ - exec: fexec, - serviceMap: make(proxy.ServiceMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil), - endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil), - iptables: ipt, - ipvs: ipvs, - ipset: ipset, - clusterCIDR: "10.0.0.0/24", - hostname: testHostname, - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), - portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, - healthChecker: newFakeHealthChecker(), - ipvsScheduler: DefaultScheduler, - ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, - iptablesData: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), - netlinkHandle: netlinktest.NewFakeNetlinkHandle(), - loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, false), - clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, false), - externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, false), - lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, false), - lbMasqSet: NewIPSet(ipset, KubeLoadBalancerMasqSet, utilipset.HashIPPort, false), - lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, false), - lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, false), - nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false), - nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false), - nodePortAddresses: make([]string, 0), - networkInterfacer: proxyutiltest.NewFakeNetwork(), + exec: fexec, + serviceMap: make(proxy.ServiceMap), + serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil), + endpointsMap: make(proxy.EndpointsMap), + endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil), + iptables: ipt, + ipvs: ipvs, + ipset: ipset, + clusterCIDR: "10.0.0.0/24", + hostname: testHostname, + 
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, + healthChecker: newFakeHealthChecker(), + ipvsScheduler: DefaultScheduler, + ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, + iptablesData: bytes.NewBuffer(nil), + natChains: bytes.NewBuffer(nil), + natRules: bytes.NewBuffer(nil), + netlinkHandle: netlinktest.NewFakeNetlinkHandle(), + loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, false), + clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, false), + externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, false), + lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, false), + lbIngressLocalSet: NewIPSet(ipset, KubeLoadBalancerIngressLocalSet, utilipset.HashIPPort, false), + lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, false), + lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, false), + nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false), + nodePortLocalSetTCP: NewIPSet(ipset, KubeNodePortLocalSetTCP, utilipset.BitmapPort, false), + nodePortLocalSetUDP: NewIPSet(ipset, KubeNodePortLocalSetUDP, utilipset.BitmapPort, false), + nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false), + nodePortAddresses: make([]string, 0), + networkInterfacer: proxyutiltest.NewFakeNetwork(), } } @@ -803,16 +805,10 @@ func TestLoadBalancer(t *testing.T) { fp.syncProxyRules() } -func strPtr(s string) *string { - return &s -} - func TestOnlyLocalNodePorts(t *testing.T) { - ipt := iptablestest.NewFake() - ipvs := ipvstest.NewFake() - ipset := ipsettest.NewFake(testIPSetVersion) nodeIP := net.ParseIP("100.101.102.103") - fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP}) + ipt, fp := buildFakeProxier([]net.IP{nodeIP}) + svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -835,17 +831,13 @@ func TestOnlyLocalNodePorts(t *testing.T) { }), ) - epIP1 := "10.180.0.1" - epIP2 := "10.180.2.1" + epIP := "10.180.0.1" makeEndpointsMap(fp, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ - IP: epIP1, + IP: epIP, NodeName: nil, - }, { - IP: epIP2, - NodeName: strPtr(testHostname), }}, Ports: []api.EndpointPort{{ Name: svcPortName.Port, @@ -866,40 +858,42 @@ func TestOnlyLocalNodePorts(t *testing.T) { fp.syncProxyRules() // Expect 3 services and 1 destination - services, err := ipvs.GetVirtualServers() - if err != nil { - t.Errorf("Failed to get ipvs services, err: %v", err) + epVS := &netlinktest.ExpectedVirtualServer{ + VSNum: 3, IP: nodeIP.String(), Port: uint16(svcNodePort), Protocol: string(api.ProtocolTCP), + RS: []netlinktest.ExpectedRealServer{{ + IP: epIP, Port: uint16(svcPort), + }}} + checkIPVS(t, fp, epVS) + + // check ipSet rules + epEntry := &utilipset.Entry{ + Port: svcNodePort, + Protocol: strings.ToLower(string(api.ProtocolTCP)), + SetType: utilipset.BitmapPort, } - if len(services) != 3 { - t.Errorf("Expect 3 ipvs services, got %d", len(services)) + epIPSet := netlinktest.ExpectedIPSet{ + KubeNodePortSetTCP: {epEntry}, + KubeNodePortLocalSetTCP: {epEntry}, } - found := false - for _, svc := range services { - if svc.Address.Equal(nodeIP) && svc.Port == uint16(svcNodePort) && svc.Protocol == string(api.ProtocolTCP) { - found = true - destinations, err := ipvs.GetRealServers(svc) - if err != nil { - 
t.Errorf("Failed to get ipvs destinations, err: %v", err) - } - if len(destinations) != 1 { - t.Errorf("Expect 1 ipvs destination, got %d", len(destinations)) - } else { - if destinations[0].Address.String() != epIP2 || destinations[0].Port != uint16(svcPort) { - t.Errorf("service Endpoint mismatch ipvs service destination") - } - } - break - } - } - if !found { - t.Errorf("Expect node port type service, got none") + checkIPSet(t, fp, epIPSet) + + // Check iptables chain and rules + epIpt := netlinktest.ExpectedIptablesChain{ + string(kubeServicesChain): {{ + JumpChain: string(KubeNodePortChain), MatchSet: KubeNodePortSetTCP, + }}, + string(KubeNodePortChain): {{ + JumpChain: "ACCEPT", MatchSet: KubeNodePortLocalSetTCP, + }, { + JumpChain: string(KubeMarkMasqChain), MatchSet: "", + }}, } + checkIptables(t, ipt, epIpt) } func TestLoadBalanceSourceRanges(t *testing.T) { - ipt := iptablestest.NewFake() - ipvs := ipvstest.NewFake() - ipset := ipsettest.NewFake(testIPSetVersion) - fp := NewFakeProxier(ipt, ipvs, ipset, nil) + nodeIP := net.ParseIP("100.101.102.103") + ipt, fp := buildFakeProxier([]net.IP{nodeIP}) + svcIP := "10.20.30.41" svcPort := 80 svcLBIP := "1.2.3.4" @@ -932,7 +926,7 @@ func TestLoadBalanceSourceRanges(t *testing.T) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ IP: epIP, - NodeName: strPtr(testHostname), + NodeName: nil, }}, Ports: []api.EndpointPort{{ Name: svcPortName.Port, @@ -945,82 +939,48 @@ func TestLoadBalanceSourceRanges(t *testing.T) { fp.syncProxyRules() // Check ipvs service and destinations - services, err := ipvs.GetVirtualServers() - if err != nil { - t.Errorf("Failed to get ipvs services, err: %v", err) - } - found := false - for _, svc := range services { - fmt.Printf("address: %s:%d, %s", svc.Address.String(), svc.Port, svc.Protocol) - if svc.Address.Equal(net.ParseIP(svcLBIP)) && svc.Port == uint16(svcPort) && svc.Protocol == string(api.ProtocolTCP) { - destinations, _ := ipvs.GetRealServers(svc) - if len(destinations) != 1 { - t.Errorf("Unexpected %d destinations, expect 0 destinations", len(destinations)) - } - for _, ep := range destinations { - if ep.Address.String() == epIP && ep.Port == uint16(svcPort) { - found = true - } - } - } - } - if !found { - t.Errorf("Did not got expected loadbalance service") - } + epVS := &netlinktest.ExpectedVirtualServer{ + VSNum: 2, IP: svcLBIP, Port: uint16(svcPort), Protocol: string(api.ProtocolTCP), + RS: []netlinktest.ExpectedRealServer{{ + IP: epIP, Port: uint16(svcPort), + }}} + checkIPVS(t, fp, epVS) // Check ipset entry - expectIPSet := map[string]*utilipset.Entry{ - KubeLoadBalancerSet: { + epIPSet := netlinktest.ExpectedIPSet{ + KubeLoadBalancerSet: {{ IP: svcLBIP, Port: svcPort, Protocol: strings.ToLower(string(api.ProtocolTCP)), SetType: utilipset.HashIPPort, - }, - KubeLoadBalancerMasqSet: { - IP: svcLBIP, - Port: svcPort, - Protocol: strings.ToLower(string(api.ProtocolTCP)), - SetType: utilipset.HashIPPort, - }, - KubeLoadBalancerSourceCIDRSet: { + }}, + KubeLoadBalancerSourceCIDRSet: {{ IP: svcLBIP, Port: svcPort, Protocol: strings.ToLower(string(api.ProtocolTCP)), Net: svcLBSource, SetType: utilipset.HashIPPortNet, - }, - } - for set, entry := range expectIPSet { - ents, err := ipset.ListEntries(set) - if err != nil || len(ents) != 1 { - t.Errorf("Check ipset entries failed for ipset: %q", set) - continue - } - if ents[0] != entry.String() { - t.Errorf("Check ipset entries failed for ipset: %q", set) - } + }}, } + checkIPSet(t, fp, epIPSet) // Check iptables chain and rules 
- kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) - kubeFWRules := ipt.GetRules(string(KubeFireWallChain)) - if !hasJump(kubeSvcRules, string(KubeMarkMasqChain), KubeLoadBalancerMasqSet) { - t.Errorf("Didn't find jump from chain %v match set %v to MASQUERADE", kubeServicesChain, KubeLoadBalancerMasqSet) - } - if !hasJump(kubeSvcRules, string(KubeFireWallChain), KubeLoadBalancerSet) { - t.Errorf("Didn't find jump from chain %v match set %v to %v", kubeServicesChain, - KubeLoadBalancerSet, KubeFireWallChain) - } - if !hasJump(kubeFWRules, "ACCEPT", KubeLoadBalancerSourceCIDRSet) { - t.Errorf("Didn't find jump from chain %v match set %v to ACCEPT", kubeServicesChain, KubeLoadBalancerSourceCIDRSet) + epIpt := netlinktest.ExpectedIptablesChain{ + string(kubeServicesChain): {{ + JumpChain: string(KubeFireWallChain), MatchSet: KubeLoadBalancerSet, + }}, + string(KubeFireWallChain): {{ + JumpChain: "ACCEPT", MatchSet: KubeLoadBalancerSourceCIDRSet, + }, { + JumpChain: string(KubeMarkDropChain), MatchSet: "", + }}, } + checkIptables(t, ipt, epIpt) } func TestOnlyLocalLoadBalancing(t *testing.T) { - ipt := iptablestest.NewFake() - ipvs := ipvstest.NewFake() - ipset := ipsettest.NewFake(testIPSetVersion) - fp := NewFakeProxier(ipt, ipvs, ipset, nil) + ipt, fp := buildFakeProxier(nil) + svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -1047,17 +1007,13 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { }), ) - epIP1 := "10.180.0.1" - epIP2 := "10.180.2.1" + epIP := "10.180.0.1" makeEndpointsMap(fp, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ - IP: epIP1, + IP: epIP, NodeName: nil, - }, { - IP: epIP2, - NodeName: strPtr(testHostname), }}, Ports: []api.EndpointPort{{ Name: svcPortName.Port, @@ -1068,6 +1024,44 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { ) fp.syncProxyRules() + + // Expect 2 services and 1 destination + epVS := &netlinktest.ExpectedVirtualServer{ + VSNum: 2, IP: svcLBIP, Port: uint16(svcPort), Protocol: string(api.ProtocolTCP), + RS: []netlinktest.ExpectedRealServer{{ + IP: epIP, Port: uint16(svcPort), + }}} + checkIPVS(t, fp, epVS) + + // check ipSet rules + epIPSet := netlinktest.ExpectedIPSet{ + KubeLoadBalancerSet: {{ + IP: svcLBIP, + Port: svcPort, + Protocol: strings.ToLower(string(api.ProtocolTCP)), + SetType: utilipset.HashIPPort, + }}, + KubeLoadBalancerIngressLocalSet: {{ + IP: svcLBIP, + Port: svcPort, + Protocol: strings.ToLower(string(api.ProtocolTCP)), + SetType: utilipset.HashIPPort, + }}, + } + checkIPSet(t, fp, epIPSet) + + // Check iptables chain and rules + epIpt := netlinktest.ExpectedIptablesChain{ + string(kubeServicesChain): {{ + JumpChain: string(KubeFireWallChain), MatchSet: KubeLoadBalancerSet, + }}, + string(KubeFireWallChain): {{ + JumpChain: "ACCEPT", MatchSet: KubeLoadBalancerIngressLocalSet, + }, { + JumpChain: string(KubeMarkMasqChain), MatchSet: "", + }}, + } + checkIptables(t, ipt, epIpt) } func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, port, nodeport int32, targetPort int) []api.ServicePort { @@ -2399,18 +2393,75 @@ func Test_syncService(t *testing.T) { } } +func buildFakeProxier(nodeIP []net.IP) (*iptablestest.FakeIPTables, *Proxier) { + ipt := iptablestest.NewFake() + ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake(testIPSetVersion) + return ipt, NewFakeProxier(ipt, ipvs, ipset, nil) +} + func hasJump(rules []iptablestest.Rule, destChain, ipSet string) bool { - match := false 
 	for _, r := range rules {
 		if r[iptablestest.Jump] == destChain {
-			match = true
-			if ipSet != "" {
-				if strings.Contains(r[iptablestest.MatchSet], ipSet) {
-					return true
-				}
-				match = false
+			if ipSet == "" {
+				return true
+			}
+			if strings.Contains(r[iptablestest.MatchSet], ipSet) {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+// checkIptables checks the expected iptables chains and rules
+func checkIptables(t *testing.T, ipt *iptablestest.FakeIPTables, epIpt netlinktest.ExpectedIptablesChain) {
+	for epChain, epRules := range epIpt {
+		rules := ipt.GetRules(epChain)
+		for _, epRule := range epRules {
+			if !hasJump(rules, epRule.JumpChain, epRule.MatchSet) {
+				t.Errorf("Didn't find jump from chain %v match set %v to %v", epChain, epRule.MatchSet, epRule.JumpChain)
+			}
+		}
+	}
+}
+
+// checkIPSet checks the expected ipsets and entries
+func checkIPSet(t *testing.T, fp *Proxier, ipSet netlinktest.ExpectedIPSet) {
+	for set, entries := range ipSet {
+		ents, err := fp.ipset.ListEntries(set)
+		if err != nil || len(ents) != len(entries) {
+			t.Errorf("Check ipset entries failed for ipset: %q", set)
+			continue
+		}
+		if len(entries) == 1 {
+			if ents[0] != entries[0].String() {
+				t.Errorf("Check ipset entries failed for ipset: %q", set)
+			}
+		}
+	}
+}
+
+// checkIPVS checks the expected ipvs service and destinations
+func checkIPVS(t *testing.T, fp *Proxier, vs *netlinktest.ExpectedVirtualServer) {
+	services, err := fp.ipvs.GetVirtualServers()
+	if err != nil {
+		t.Errorf("Failed to get ipvs services, err: %v", err)
+	}
+	if len(services) != vs.VSNum {
+		t.Errorf("Expect %d ipvs services, got %d", vs.VSNum, len(services))
+	}
+	for _, svc := range services {
+		if svc.Address.String() == vs.IP && svc.Port == vs.Port && svc.Protocol == vs.Protocol {
+			destinations, _ := fp.ipvs.GetRealServers(svc)
+			if len(destinations) != len(vs.RS) {
+				t.Errorf("Expected %d destinations, got %d destinations", len(vs.RS), len(destinations))
+			}
+			if len(vs.RS) == 1 {
+				if destinations[0].Address.String() != vs.RS[0].IP || destinations[0].Port != vs.RS[0].Port {
+					t.Errorf("Unexpected mismatch destinations")
+				}
 			}
 		}
 	}
-	return match
 }
diff --git a/pkg/proxy/ipvs/testing/BUILD b/pkg/proxy/ipvs/testing/BUILD
index 576024cee6a..09065034194 100644
--- a/pkg/proxy/ipvs/testing/BUILD
+++ b/pkg/proxy/ipvs/testing/BUILD
@@ -10,10 +10,16 @@ load(
 
 go_library(
     name = "go_default_library",
-    srcs = ["fake.go"],
+    srcs = [
+        "fake.go",
+        "util.go",
+    ],
     importpath = "k8s.io/kubernetes/pkg/proxy/ipvs/testing",
     tags = ["automanaged"],
-    deps = ["//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library"],
+    deps = [
+        "//pkg/util/ipset:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+    ],
 )
 
 filegroup(
diff --git a/pkg/proxy/ipvs/testing/util.go b/pkg/proxy/ipvs/testing/util.go
new file mode 100644
index 00000000000..2dbff14d171
--- /dev/null
+++ b/pkg/proxy/ipvs/testing/util.go
@@ -0,0 +1,51 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package testing
+
+import (
+	utilipset "k8s.io/kubernetes/pkg/util/ipset"
+)
+
+// ExpectedVirtualServer is the expected ipvs virtual server and its real servers.
+// VSNum is the expected total number of ipvs virtual servers.
+// IP, Port and Protocol identify the expected virtual server.
+// RS lists the RealServers of this expected VirtualServer.
+type ExpectedVirtualServer struct {
+	VSNum    int
+	IP       string
+	Port     uint16
+	Protocol string
+	RS       []ExpectedRealServer
+}
+
+// ExpectedRealServer is the expected ipvs RealServer
+type ExpectedRealServer struct {
+	IP   string
+	Port uint16
+}
+
+// ExpectedIptablesChain is a map from an iptables chain name to its expected jump rules
+type ExpectedIptablesChain map[string][]ExpectedIptablesRule
+
+// ExpectedIptablesRule is an expected iptables rule: the chain it jumps to and the ipset name it matches
+type ExpectedIptablesRule struct {
+	JumpChain string
+	MatchSet  string
+}
+
+// ExpectedIPSet is a map from an ipset name to its expected entries
+type ExpectedIPSet map[string][]*utilipset.Entry
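Taken together with the checkIPVS, checkIPSet and checkIptables helpers added to proxier_test.go above, these types let a test declare its expected state as data. A hypothetical helper of that shape, assumed to live in proxier_test.go and reuse its imports, with fixture values borrowed from TestOnlyLocalLoadBalancing, could look like:

// assertLocalLBProgramming is an illustrative helper (not part of this patch):
// it bundles the three expectation checks for a LoadBalancer service with
// externalTrafficPolicy=local, using the fixture values from the tests above.
func assertLocalLBProgramming(t *testing.T, ipt *iptablestest.FakeIPTables, fp *Proxier) {
	// Two virtual servers (cluster IP and LB ingress IP), one local endpoint.
	checkIPVS(t, fp, &netlinktest.ExpectedVirtualServer{
		VSNum: 2, IP: "1.2.3.4", Port: uint16(80), Protocol: string(api.ProtocolTCP),
		RS: []netlinktest.ExpectedRealServer{{IP: "10.180.0.1", Port: uint16(80)}},
	})
	// The ingress IP:Port must land in the externalTrafficPolicy=local set.
	checkIPSet(t, fp, netlinktest.ExpectedIPSet{
		KubeLoadBalancerIngressLocalSet: {{
			IP:       "1.2.3.4",
			Port:     80,
			Protocol: strings.ToLower(string(api.ProtocolTCP)),
			SetType:  utilipset.HashIPPort,
		}},
	})
	// And KUBE-FIRE-WALL must ACCEPT traffic that matches that set.
	checkIptables(t, ipt, netlinktest.ExpectedIptablesChain{
		string(KubeFireWallChain): {{
			JumpChain: "ACCEPT", MatchSet: KubeLoadBalancerIngressLocalSet,
		}},
	})
}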