diff --git a/pkg/proxy/ipvs/ipset.go b/pkg/proxy/ipvs/ipset.go index 16637455b7b..037df0ada85 100644 --- a/pkg/proxy/ipvs/ipset.go +++ b/pkg/proxy/ipvs/ipset.go @@ -40,8 +40,8 @@ const ( // KubeLoadBalancerSet is used to store service load balancer ingress ip + port, it is the service lb portal. KubeLoadBalancerSet = "KUBE-LOAD-BALANCER" - // KubeLoadBalancerMasqSet is used to store service load balancer ingress ip + port for masquerade purpose. - KubeLoadBalancerMasqSet = "KUBE-LOAD-BALANCER-MASQ" + // KubeLoadBalancerIngressLocalSet is used to store service load balancer ingress ip + port with externalTrafficPolicy=local. + KubeLoadBalancerIngressLocalSet = "KUBE-LB-INGRESS-LOCAL" // KubeLoadBalancerSourceIPSet is used to store service load balancer ingress ip + port + source IP for packet filter purpose. KubeLoadBalancerSourceIPSet = "KUBE-LOAD-BALANCER-SOURCE-IP" @@ -49,11 +49,17 @@ const ( // KubeLoadBalancerSourceCIDRSet is used to store service load balancer ingress ip + port + source cidr for packet filter purpose. KubeLoadBalancerSourceCIDRSet = "KUBE-LOAD-BALANCER-SOURCE-CIDR" - // KubeNodePortSetTCP is used to store nodeport TCP port for masquerade purpose. + // KubeNodePortSetTCP is used to store the nodeport TCP port for masquerade purpose. KubeNodePortSetTCP = "KUBE-NODE-PORT-TCP" - // KubeNodePortSetUDP is used to store nodeport UDP port for masquerade purpose. + // KubeNodePortLocalSetTCP is used to store the nodeport TCP port with externalTrafficPolicy=local. + KubeNodePortLocalSetTCP = "KUBE-NODE-PORT-LOCAL-TCP" + + // KubeNodePortSetUDP is used to store the nodeport UDP port for masquerade purpose. KubeNodePortSetUDP = "KUBE-NODE-PORT-UDP" + + // KubeNodePortLocalSetUDP is used to store the nodeport UDP port with externalTrafficPolicy=local. + KubeNodePortLocalSetUDP = "KUBE-NODE-PORT-LOCAL-UDP" ) // IPSetVersioner can query the current ipset version. diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index f5ae1da3bd8..2b8cce281fe 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -68,6 +68,9 @@ const ( // KubeMarkMasqChain is the mark-for-masquerade chain KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ" + // KubeNodePortChain is the kubernetes node port chain + KubeNodePortChain utiliptables.Chain = "KUBE-NODE-PORT" + // KubeMarkDropChain is the mark-for-drop chain KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP" @@ -157,12 +160,16 @@ type Proxier struct { nodePortSetTCP *IPSet // nodePortSetTCP is the bitmap:port type ipset where stores all UDP node port nodePortSetUDP *IPSet + // lbIngressLocalSet is the hash:ip type ipset where stores all service ip's with externaltrafficPolicy=local + lbIngressLocalSet *IPSet + // nodePortLocalSetTCP is the bitmap:port type ipset where stores all TCP nodeport's with externaltrafficPolicy=local + nodePortLocalSetTCP *IPSet + // nodePortLocalSetUDP is the bitmap:port type ipset where stores all UDP nodeport's with externaltrafficPolicy=local + nodePortLocalSetUDP *IPSet // externalIPSet is the hash:ip,port type ipset where stores all service ExternalIP:Port externalIPSet *IPSet // lbIngressSet is the hash:ip,port type ipset where stores all service load balancer ingress IP:Port. lbIngressSet *IPSet - // lbMasqSet is the hash:ip,port type ipset where stores all service load balancer ingress IP:Port which needs masquerade. - lbMasqSet *IPSet // lbWhiteListIPSet is the hash:ip,port,ip type ipset where stores all service load balancer ingress IP:Port,sourceIP pair, any packets // with the source IP visit ingress IP:Port can pass through. lbWhiteListIPSet *IPSet @@ -308,43 +315,45 @@ func NewProxier(ipt utiliptables.Interface, healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps proxier := &Proxier{ - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), - serviceMap: make(proxy.ServiceMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder), - endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder), - syncPeriod: syncPeriod, - minSyncPeriod: minSyncPeriod, - iptables: ipt, - masqueradeAll: masqueradeAll, - masqueradeMark: masqueradeMark, - exec: exec, - clusterCIDR: clusterCIDR, - hostname: hostname, - nodeIP: nodeIP, - portMapper: &listenPortOpener{}, - recorder: recorder, - healthChecker: healthChecker, - healthzServer: healthzServer, - ipvs: ipvs, - ipvsScheduler: scheduler, - ipGetter: &realIPGetter{nl: NewNetLinkHandle()}, - iptablesData: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), - netlinkHandle: NewNetLinkHandle(), - ipset: ipset, - loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, isIPv6), - clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, isIPv6), - externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, isIPv6), - lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, isIPv6), - lbMasqSet: NewIPSet(ipset, KubeLoadBalancerMasqSet, utilipset.HashIPPort, isIPv6), - lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, isIPv6), - lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, isIPv6), - nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false), - nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false), - nodePortAddresses: nodePortAddresses, - networkInterfacer: utilproxy.RealNetwork{}, + portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + serviceMap: make(proxy.ServiceMap), + serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder), + endpointsMap: make(proxy.EndpointsMap), + endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder), + syncPeriod: syncPeriod, + minSyncPeriod: minSyncPeriod, + iptables: ipt, + masqueradeAll: masqueradeAll, + masqueradeMark: masqueradeMark, + exec: exec, + clusterCIDR: clusterCIDR, + hostname: hostname, + nodeIP: nodeIP, + portMapper: &listenPortOpener{}, + recorder: recorder, + healthChecker: healthChecker, + healthzServer: healthzServer, + ipvs: ipvs, + ipvsScheduler: scheduler, + ipGetter: &realIPGetter{nl: NewNetLinkHandle()}, + iptablesData: bytes.NewBuffer(nil), + natChains: bytes.NewBuffer(nil), + natRules: bytes.NewBuffer(nil), + netlinkHandle: NewNetLinkHandle(), + ipset: ipset, + loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, isIPv6), + clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, isIPv6), + externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, isIPv6), + lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, isIPv6), + lbIngressLocalSet: NewIPSet(ipset, KubeLoadBalancerIngressLocalSet, utilipset.HashIPPort, isIPv6), + lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, isIPv6), + lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, isIPv6), + nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false), + nodePortLocalSetTCP: NewIPSet(ipset, KubeNodePortLocalSetTCP, utilipset.BitmapPort, false), + nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false), + nodePortLocalSetUDP: NewIPSet(ipset, KubeNodePortLocalSetUDP, utilipset.BitmapPort, false), + nodePortAddresses: nodePortAddresses, + networkInterfacer: utilproxy.RealNetwork{}, } burstSyncs := 2 glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) @@ -511,7 +520,8 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset // Destroy ip sets created by ipvs Proxier. We should call it after cleaning up // iptables since we can NOT delete ip set which is still referenced by iptables. ipSetsToDestroy := []string{KubeLoopBackIPSet, KubeClusterIPSet, KubeLoadBalancerSet, KubeNodePortSetTCP, KubeNodePortSetUDP, - KubeExternalIPSet, KubeLoadBalancerSourceIPSet, KubeLoadBalancerSourceCIDRSet, KubeLoadBalancerMasqSet} + KubeExternalIPSet, KubeLoadBalancerSourceIPSet, KubeLoadBalancerSourceCIDRSet, + KubeLoadBalancerIngressLocalSet, KubeNodePortLocalSetUDP, KubeNodePortLocalSetTCP} for _, set := range ipSetsToDestroy { err = ipset.DestroySet(set) if err != nil { @@ -699,7 +709,8 @@ func (proxier *Proxier) syncProxyRules() { // make sure ip sets exists in the system. ipSets := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.externalIPSet, proxier.nodePortSetUDP, proxier.nodePortSetTCP, - proxier.lbIngressSet, proxier.lbMasqSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet} + proxier.lbIngressSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet, proxier.lbIngressLocalSet, + proxier.nodePortLocalSetTCP, proxier.nodePortLocalSetUDP} if err := ensureIPSets(ipSets...); err != nil { return } @@ -731,12 +742,18 @@ func (proxier *Proxier) syncProxyRules() { glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err) return } - // Kube service ipset - if err := proxier.createKubeFireWallChain(existingNATChains, proxier.natChains); err != nil { + // `iptables -t nat -N KUBE-FIRE-WALL` + if err := proxier.createKubeChain(existingNATChains, KubeFireWallChain); err != nil { glog.Errorf("Failed to create KUBE-FIRE-WALL chain: %v", err) return } + // `iptables -t nat -N KUBE-NODE-PORT` + if err := proxier.createKubeChain(existingNATChains, KubeNodePortChain); err != nil { + glog.Errorf("Failed to create KUBE-NODE-PORT chain: %v", err) + return + } + // Build IPVS rules for each service. for svcName, svc := range proxier.serviceMap { svcInfo, ok := svc.(*serviceInfo) @@ -901,23 +918,23 @@ func (proxier *Proxier) syncProxyRules() { // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) // If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying only locally, we can retain the source IP. - if !svcInfo.OnlyNodeLocalEndpoints { - if valid := proxier.lbMasqSet.validateEntry(entry); !valid { - glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbMasqSet.Name)) + if valid := proxier.lbIngressSet.validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbIngressSet.Name)) + continue + } + proxier.lbIngressSet.activeEntries.Insert(entry.String()) + // insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local + if svcInfo.OnlyNodeLocalEndpoints { + if valid := proxier.lbIngressLocalSet.validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbIngressSet.Name)) continue } - proxier.lbMasqSet.activeEntries.Insert(entry.String()) + proxier.lbIngressLocalSet.activeEntries.Insert(entry.String()) } if len(svcInfo.LoadBalancerSourceRanges) != 0 { // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. // This currently works for loadbalancers that preserves source ips. // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. - if valid := proxier.lbIngressSet.validateEntry(entry); !valid { - glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbIngressSet.Name)) - continue - } - proxier.lbIngressSet.activeEntries.Insert(entry.String()) - allowFromNode := false for _, src := range svcInfo.LoadBalancerSourceRanges { // ipset call @@ -1009,30 +1026,48 @@ func (proxier *Proxier) syncProxyRules() { // Nodeports need SNAT, unless they're local. // ipset call - if !svcInfo.OnlyNodeLocalEndpoints { - entry = &utilipset.Entry{ - // No need to provide ip info - Port: svcInfo.NodePort, - Protocol: protocol, - SetType: utilipset.BitmapPort, + entry = &utilipset.Entry{ + // No need to provide ip info + Port: svcInfo.NodePort, + Protocol: protocol, + SetType: utilipset.BitmapPort, + } + var nodePortSet *IPSet + switch protocol { + case "tcp": + nodePortSet = proxier.nodePortSetTCP + case "udp": + nodePortSet = proxier.nodePortSetUDP + default: + // It should never hit + glog.Errorf("Unsupported protocol type: %s", protocol) + } + if nodePortSet != nil { + if valid := nodePortSet.validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name)) + continue } - var nodePortSet *IPSet - switch protocol { + nodePortSet.activeEntries.Insert(entry.String()) + } + // Add externaltrafficpolicy=local type nodeport entry + if svcInfo.OnlyNodeLocalEndpoints { + var nodePortLocalSet *IPSet + switch protocol { case "tcp": - nodePortSet = proxier.nodePortSetTCP + nodePortLocalSet = proxier.nodePortLocalSetTCP case "udp": - nodePortSet = proxier.nodePortSetUDP + nodePortLocalSet = proxier.nodePortLocalSetUDP default: // It should never hit glog.Errorf("Unsupported protocol type: %s", protocol) } - if nodePortSet != nil { - if valid := nodePortSet.validateEntry(entry); !valid { - glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name)) + if nodePortLocalSet != nil { + if valid := nodePortLocalSet.validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name)) continue } - nodePortSet.activeEntries.Insert(entry.String()) + nodePortLocalSet.activeEntries.Insert(entry.String()) } } @@ -1080,8 +1115,9 @@ func (proxier *Proxier) syncProxyRules() { } // sync ipset entries - ipsetsToSync := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.lbIngressSet, proxier.lbMasqSet, proxier.nodePortSetTCP, - proxier.nodePortSetUDP, proxier.externalIPSet, proxier.lbWhiteListIPSet, proxier.lbWhiteListCIDRSet} + ipsetsToSync := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.lbIngressSet, proxier.nodePortSetTCP, + proxier.nodePortSetUDP, proxier.externalIPSet, proxier.lbWhiteListIPSet, proxier.lbWhiteListCIDRSet, proxier.lbIngressLocalSet, + proxier.nodePortLocalSetTCP, proxier.nodePortLocalSetUDP} for i := range ipsetsToSync { ipsetsToSync[i].syncIPSetEntries() } @@ -1134,46 +1170,55 @@ func (proxier *Proxier) syncProxyRules() { // This covers cases like GCE load-balancers which get added to the local routing table. writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...) } - if !proxier.lbMasqSet.isEmpty() { + if !proxier.lbIngressSet.isEmpty() { // Build masquerade rules for packets which cross node visit load balancer ingress IPs. args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "set", "--match-set", proxier.lbMasqSet.Name, - "dst,dst", - ) - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) - } - if !proxier.lbWhiteListCIDRSet.isEmpty() || !proxier.lbWhiteListIPSet.isEmpty() { - // link kube-services chain -> kube-fire-wall chain - args := []string{ "-A", string(kubeServicesChain), "-m", "set", "--match-set", proxier.lbIngressSet.Name, "dst,dst", - "-j", string(KubeFireWallChain), - } - writeLine(proxier.natRules, args...) - if !proxier.lbWhiteListCIDRSet.isEmpty() { + ) + writeLine(proxier.natRules, append(args, "-j", string(KubeFireWallChain))...) + // Don't masq for service with externaltrafficpolicy =local + if !proxier.lbIngressLocalSet.isEmpty() { args = append(args[:0], "-A", string(KubeFireWallChain), - "-m", "set", "--match-set", proxier.lbWhiteListCIDRSet.Name, - "dst,dst,src", - ) - writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) - } - if !proxier.lbWhiteListIPSet.isEmpty() { - args = append(args[:0], - "-A", string(KubeFireWallChain), - "-m", "set", "--match-set", proxier.lbWhiteListIPSet.Name, - "dst,dst,src", + "-m", "set", "--match-set", proxier.lbIngressLocalSet.Name, + "dst,dst", ) writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) } + // mark masq for others args = append(args[:0], "-A", string(KubeFireWallChain), + "-m", "comment", "--comment", + fmt.Sprintf(`"mark MASQ for external traffic policy not local"`), ) - // If the packet was able to reach the end of firewall chain, then it did not get DNATed. - // It means the packet cannot go thru the firewall, then mark it for DROP - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + // if have whitelist, accept or drop. + if !proxier.lbWhiteListCIDRSet.isEmpty() || !proxier.lbWhiteListIPSet.isEmpty() { + if !proxier.lbWhiteListCIDRSet.isEmpty() { + args = append(args[:0], + "-A", string(KubeFireWallChain), + "-m", "set", "--match-set", proxier.lbWhiteListCIDRSet.Name, + "dst,dst,src", + ) + writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) + } + if !proxier.lbWhiteListIPSet.isEmpty() { + args = append(args[:0], + "-A", string(KubeFireWallChain), + "-m", "set", "--match-set", proxier.lbWhiteListIPSet.Name, + "dst,dst,src", + ) + writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) + } + args = append(args[:0], + "-A", string(KubeFireWallChain), + ) + // If the packet was able to reach the end of firewall chain, then it did not get DNATed. + // It means the packet cannot go thru the firewall, then mark it for DROP + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) + } } if !proxier.nodePortSetTCP.isEmpty() { // Build masquerade rules for packets which cross node visit nodeport. @@ -1183,15 +1228,47 @@ func (proxier *Proxier) syncProxyRules() { "-m", "set", "--match-set", proxier.nodePortSetTCP.Name, "dst", ) + writeLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...) + // accept for nodeports w/ externaltrafficpolicy=local + if !proxier.nodePortLocalSetTCP.isEmpty() { + args = append(args[:0], + "-A", string(KubeNodePortChain), + "-m", "set", "--match-set", proxier.nodePortLocalSetTCP.Name, + "dst", + ) + writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) + } + // mark masq for others + args = append(args[:0], + "-A", string(KubeNodePortChain), + "-m", "comment", "--comment", + fmt.Sprintf(`"mark MASQ for externaltrafficpolicy=cluster"`), + ) writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) } if !proxier.nodePortSetUDP.isEmpty() { + // accept for nodeports w/ externaltrafficpolicy=local args = append(args[:0], "-A", string(kubeServicesChain), "-m", "udp", "-p", "udp", "-m", "set", "--match-set", proxier.nodePortSetUDP.Name, "dst", ) + writeLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...) + if !proxier.nodePortLocalSetUDP.isEmpty() { + args = append(args[:0], + "-A", string(KubeNodePortChain), + "-m", "set", "--match-set", proxier.nodePortLocalSetUDP.Name, + "dst", + ) + writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) + } + // mark masq for others + args = append(args[:0], + "-A", string(KubeNodePortChain), + "-m", "comment", "--comment", + fmt.Sprintf(`"mark MASQ for externaltrafficpolicy=cluster"`), + ) writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) } @@ -1327,9 +1404,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode } for _, epInfo := range proxier.endpointsMap[svcPortName] { - if !onlyNodeLocalEndpoints || onlyNodeLocalEndpoints && epInfo.GetIsLocal() { - newEndpoints.Insert(epInfo.String()) - } + newEndpoints.Insert(epInfo.String()) } if !curEndpoints.Equal(newEndpoints) { @@ -1439,17 +1514,17 @@ func (proxier *Proxier) linkKubeServiceChain(existingNATChains map[utiliptables. return nil } -func (proxier *Proxier) createKubeFireWallChain(existingNATChains map[utiliptables.Chain]string, natChains *bytes.Buffer) error { - // `iptables -t nat -N KUBE-FIRE-WALL` - if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, KubeFireWallChain); err != nil { - return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, KubeFireWallChain, err) +// `iptables -t nat -N ` +func (proxier *Proxier) createKubeChain(existingNATChains map[utiliptables.Chain]string, chainName utiliptables.Chain) error { + if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, chainName); err != nil { + return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, chainName, err) } - // write `:KUBE-FIRE-WALL - [0:0]` in nat table - if chain, ok := existingNATChains[KubeFireWallChain]; ok { - writeLine(natChains, chain) + // write `: - [0:0]` in nat table + if chain, ok := existingNATChains[chainName]; ok { + writeLine(proxier.natChains, chain) } else { - writeLine(natChains, utiliptables.MakeChainLine(KubeFireWallChain)) + writeLine(proxier.natChains, utiliptables.MakeChainLine(chainName)) } return nil } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 4a6c294c35b..149252c851d 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -120,36 +120,38 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } return &Proxier{ - exec: fexec, - serviceMap: make(proxy.ServiceMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil), - endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil), - iptables: ipt, - ipvs: ipvs, - ipset: ipset, - clusterCIDR: "10.0.0.0/24", - hostname: testHostname, - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), - portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, - healthChecker: newFakeHealthChecker(), - ipvsScheduler: DefaultScheduler, - ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, - iptablesData: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), - netlinkHandle: netlinktest.NewFakeNetlinkHandle(), - loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, false), - clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, false), - externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, false), - lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, false), - lbMasqSet: NewIPSet(ipset, KubeLoadBalancerMasqSet, utilipset.HashIPPort, false), - lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, false), - lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, false), - nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false), - nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false), - nodePortAddresses: make([]string, 0), - networkInterfacer: proxyutiltest.NewFakeNetwork(), + exec: fexec, + serviceMap: make(proxy.ServiceMap), + serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil), + endpointsMap: make(proxy.EndpointsMap), + endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil), + iptables: ipt, + ipvs: ipvs, + ipset: ipset, + clusterCIDR: "10.0.0.0/24", + hostname: testHostname, + portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, + healthChecker: newFakeHealthChecker(), + ipvsScheduler: DefaultScheduler, + ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, + iptablesData: bytes.NewBuffer(nil), + natChains: bytes.NewBuffer(nil), + natRules: bytes.NewBuffer(nil), + netlinkHandle: netlinktest.NewFakeNetlinkHandle(), + loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, false), + clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, false), + externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, false), + lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, false), + lbIngressLocalSet: NewIPSet(ipset, KubeLoadBalancerIngressLocalSet, utilipset.HashIPPort, false), + lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, false), + lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, false), + nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false), + nodePortLocalSetTCP: NewIPSet(ipset, KubeNodePortLocalSetTCP, utilipset.BitmapPort, false), + nodePortLocalSetUDP: NewIPSet(ipset, KubeNodePortLocalSetUDP, utilipset.BitmapPort, false), + nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false), + nodePortAddresses: make([]string, 0), + networkInterfacer: proxyutiltest.NewFakeNetwork(), } } @@ -976,12 +978,6 @@ func TestLoadBalanceSourceRanges(t *testing.T) { Protocol: strings.ToLower(string(api.ProtocolTCP)), SetType: utilipset.HashIPPort, }, - KubeLoadBalancerMasqSet: { - IP: svcLBIP, - Port: svcPort, - Protocol: strings.ToLower(string(api.ProtocolTCP)), - SetType: utilipset.HashIPPort, - }, KubeLoadBalancerSourceCIDRSet: { IP: svcLBIP, Port: svcPort, @@ -1004,9 +1000,6 @@ func TestLoadBalanceSourceRanges(t *testing.T) { // Check iptables chain and rules kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) kubeFWRules := ipt.GetRules(string(KubeFireWallChain)) - if !hasJump(kubeSvcRules, string(KubeMarkMasqChain), KubeLoadBalancerMasqSet) { - t.Errorf("Didn't find jump from chain %v match set %v to MASQUERADE", kubeServicesChain, KubeLoadBalancerMasqSet) - } if !hasJump(kubeSvcRules, string(KubeFireWallChain), KubeLoadBalancerSet) { t.Errorf("Didn't find jump from chain %v match set %v to %v", kubeServicesChain, KubeLoadBalancerSet, KubeFireWallChain)