From e90de22a5e4c47188414df6d038fa47823476a31 Mon Sep 17 00:00:00 2001 From: Lion-Wei Date: Tue, 17 Apr 2018 19:32:32 +0800 Subject: [PATCH] fix ipvs fw --- pkg/proxy/ipvs/ipset.go | 7 +- pkg/proxy/ipvs/proxier.go | 123 +++++++++++++++++++-------------- pkg/proxy/ipvs/proxier_test.go | 30 +++++--- 3 files changed, 99 insertions(+), 61 deletions(-) diff --git a/pkg/proxy/ipvs/ipset.go b/pkg/proxy/ipvs/ipset.go index 037df0ada85..54634e342d9 100644 --- a/pkg/proxy/ipvs/ipset.go +++ b/pkg/proxy/ipvs/ipset.go @@ -40,8 +40,11 @@ const ( // KubeLoadBalancerSet is used to store service load balancer ingress ip + port, it is the service lb portal. KubeLoadBalancerSet = "KUBE-LOAD-BALANCER" - // KubeLoadBalancerIngressLocalSet is used to store service load balancer ingress ip + port with externalTrafficPolicy=local. - KubeLoadBalancerIngressLocalSet = "KUBE-LB-INGRESS-LOCAL" + // KubeLoadBalancerLocalSet is used to store service load balancer ingress ip + port with externalTrafficPolicy=local. + KubeLoadBalancerLocalSet = "KUBE-LOAD-BALANCER-LOCAL" + + // KubeLoadbalancerFWSet is used to store service load balancer ingress ip + port for load balancer with sourceRange. + KubeLoadbalancerFWSet = "KUBE-LOAD-BALANCER-FW" // KubeLoadBalancerSourceIPSet is used to store service load balancer ingress ip + port + source IP for packet filter purpose. KubeLoadBalancerSourceIPSet = "KUBE-LOAD-BALANCER-SOURCE-IP" diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 9de9aa7204a..38df2bb4d22 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -57,7 +57,7 @@ const ( kubeServicesChain utiliptables.Chain = "KUBE-SERVICES" // KubeFireWallChain is the kubernetes firewall chain. - KubeFireWallChain utiliptables.Chain = "KUBE-FIRE-WALL" + KubeFireWallChain utiliptables.Chain = "KUBE-FIREWALL" // kubePostroutingChain is the kubernetes postrouting chain kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING" @@ -74,6 +74,9 @@ const ( // KubeForwardChain is the kubernetes forward chain KubeForwardChain utiliptables.Chain = "KUBE-FORWARD" + // KubeLoadBalancerChain is the kubernetes chain for loadbalancer type service + KubeLoadBalancerChain utiliptables.Chain = "KUBE-LOAD-BALANCER" + // DefaultScheduler is the default ipvs scheduler algorithm - round robin. DefaultScheduler = "rr" @@ -164,16 +167,18 @@ type Proxier struct { nodePortSetTCP *IPSet // nodePortSetTCP is the bitmap:port type ipset which stores all UDP node port nodePortSetUDP *IPSet - // lbIngressLocalSet is the hash:ip type ipset which stores all service ip's with externaltrafficPolicy=local - lbIngressLocalSet *IPSet // nodePortLocalSetTCP is the bitmap:port type ipset which stores all TCP nodeport's with externaltrafficPolicy=local nodePortLocalSetTCP *IPSet // nodePortLocalSetUDP is the bitmap:port type ipset which stores all UDP nodeport's with externaltrafficPolicy=local nodePortLocalSetUDP *IPSet // externalIPSet is the hash:ip,port type ipset which stores all service ExternalIP:Port externalIPSet *IPSet - // lbIngressSet is the hash:ip,port type ipset which stores all service load balancer ingress IP:Port. - lbIngressSet *IPSet + // lbSet is the hash:ip,port type ipset which stores all service load balancer IP:Port. + lbSet *IPSet + // lbLocalSet is the hash:ip type ipset which stores all service ip's with externaltrafficPolicy=local + lbLocalSet *IPSet + // lbFWSet is the hash:ip,port type ipset which stores all service load balancer ingress IP:Port for load balancer with sourceRange. + lbFWSet *IPSet // lbWhiteListIPSet is the hash:ip,port,ip type ipset which stores all service load balancer ingress IP:Port,sourceIP pair, any packets // with the source IP visit ingress IP:Port can pass through. lbWhiteListIPSet *IPSet @@ -351,8 +356,9 @@ func NewProxier(ipt utiliptables.Interface, loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, isIPv6), clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, isIPv6), externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, isIPv6), - lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, isIPv6), - lbIngressLocalSet: NewIPSet(ipset, KubeLoadBalancerIngressLocalSet, utilipset.HashIPPort, isIPv6), + lbSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, isIPv6), + lbFWSet: NewIPSet(ipset, KubeLoadbalancerFWSet, utilipset.HashIPPort, isIPv6), + lbLocalSet: NewIPSet(ipset, KubeLoadBalancerLocalSet, utilipset.HashIPPort, isIPv6), lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, isIPv6), lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, isIPv6), nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false), @@ -496,7 +502,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool } // Flush and remove all of our "-t nat" chains. - for _, chain := range []utiliptables.Chain{kubeServicesChain, kubePostroutingChain} { + for _, chain := range []utiliptables.Chain{kubeServicesChain, kubePostroutingChain, KubeNodePortChain, KubeLoadBalancerChain, KubeFireWallChain} { if err := ipt.FlushChain(utiliptables.TableNAT, chain); err != nil { if !utiliptables.IsNotFoundError(err) { glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err) @@ -553,8 +559,8 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset // Destroy ip sets created by ipvs Proxier. We should call it after cleaning up // iptables since we can NOT delete ip set which is still referenced by iptables. ipSetsToDestroy := []string{KubeLoopBackIPSet, KubeClusterIPSet, KubeLoadBalancerSet, KubeNodePortSetTCP, KubeNodePortSetUDP, - KubeExternalIPSet, KubeLoadBalancerSourceIPSet, KubeLoadBalancerSourceCIDRSet, - KubeLoadBalancerIngressLocalSet, KubeNodePortLocalSetUDP, KubeNodePortLocalSetTCP} + KubeExternalIPSet, KubeLoadbalancerFWSet, KubeLoadBalancerSourceIPSet, KubeLoadBalancerSourceCIDRSet, + KubeLoadBalancerLocalSet, KubeNodePortLocalSetUDP, KubeNodePortLocalSetTCP} for _, set := range ipSetsToDestroy { err = ipset.DestroySet(set) if err != nil { @@ -755,7 +761,7 @@ func (proxier *Proxier) syncProxyRules() { // make sure ip sets exists in the system. ipSets := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.externalIPSet, proxier.nodePortSetUDP, proxier.nodePortSetTCP, - proxier.lbIngressSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet, proxier.lbIngressLocalSet, + proxier.lbSet, proxier.lbFWSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet, proxier.lbLocalSet, proxier.nodePortLocalSetTCP, proxier.nodePortLocalSetUDP} if err := ensureIPSets(ipSets...); err != nil { return @@ -788,9 +794,19 @@ func (proxier *Proxier) syncProxyRules() { glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err) return } - // `iptables -t nat -N KUBE-FIRE-WALL` + // `iptables -t nat -N KUBE-FIREWALL` if err := proxier.createKubeChain(existingNATChains, KubeFireWallChain); err != nil { - glog.Errorf("Failed to create KUBE-FIRE-WALL chain: %v", err) + glog.Errorf("Failed to create KUBE-FIREWALL chain: %v", err) + return + } + // `iptables -t nat -N KUBE-NODE-PORT` + if err := proxier.createKubeChain(existingNATChains, KubeNodePortChain); err != nil { + glog.Errorf("Failed to create KUBE-NODE-PORT chain: %v", err) + return + } + // `iptables -t nat -N KUBE-LOAD-BALANCER` + if err := proxier.createKubeChain(existingNATChains, KubeLoadBalancerChain); err != nil { + glog.Errorf("Failed to create KUBE-LOAD-BALANCER chain: %v", err) return } // Kube forward @@ -799,12 +815,6 @@ func (proxier *Proxier) syncProxyRules() { return } - // `iptables -t nat -N KUBE-NODE-PORT` - if err := proxier.createKubeChain(existingNATChains, KubeNodePortChain); err != nil { - glog.Errorf("Failed to create KUBE-NODE-PORT chain: %v", err) - return - } - // Build IPVS rules for each service. for svcName, svc := range proxier.serviceMap { svcInfo, ok := svc.(*serviceInfo) @@ -968,23 +978,28 @@ func (proxier *Proxier) syncProxyRules() { // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) // If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying only locally, we can retain the source IP. - if valid := proxier.lbIngressSet.validateEntry(entry); !valid { - glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbIngressSet.Name)) + if valid := proxier.lbSet.validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbSet.Name)) continue } - proxier.lbIngressSet.activeEntries.Insert(entry.String()) + proxier.lbSet.activeEntries.Insert(entry.String()) // insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local if svcInfo.OnlyNodeLocalEndpoints { - if valid := proxier.lbIngressLocalSet.validateEntry(entry); !valid { - glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbIngressSet.Name)) + if valid := proxier.lbLocalSet.validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbLocalSet.Name)) continue } - proxier.lbIngressLocalSet.activeEntries.Insert(entry.String()) + proxier.lbLocalSet.activeEntries.Insert(entry.String()) } if len(svcInfo.LoadBalancerSourceRanges) != 0 { // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. // This currently works for loadbalancers that preserves source ips. // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. + if valid := proxier.lbFWSet.validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbFWSet.Name)) + continue + } + proxier.lbFWSet.activeEntries.Insert(entry.String()) allowFromNode := false for _, src := range svcInfo.LoadBalancerSourceRanges { // ipset call @@ -1164,9 +1179,9 @@ func (proxier *Proxier) syncProxyRules() { } // sync ipset entries - ipsetsToSync := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.lbIngressSet, proxier.nodePortSetTCP, - proxier.nodePortSetUDP, proxier.externalIPSet, proxier.lbWhiteListIPSet, proxier.lbWhiteListCIDRSet, proxier.lbIngressLocalSet, - proxier.nodePortLocalSetTCP, proxier.nodePortLocalSetUDP} + ipsetsToSync := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.lbSet, proxier.nodePortSetTCP, + proxier.lbFWSet, proxier.nodePortSetUDP, proxier.externalIPSet, proxier.lbWhiteListIPSet, + proxier.lbWhiteListCIDRSet, proxier.lbLocalSet, proxier.nodePortLocalSetTCP, proxier.nodePortLocalSetUDP} for i := range ipsetsToSync { ipsetsToSync[i].syncIPSetEntries() } @@ -1219,39 +1234,31 @@ func (proxier *Proxier) syncProxyRules() { // This covers cases like GCE load-balancers which get added to the local routing table. writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...) } - if !proxier.lbIngressSet.isEmpty() { + if !proxier.lbSet.isEmpty() { // Build masquerade rules for packets which cross node visit load balancer ingress IPs. args = append(args[:0], "-A", string(kubeServicesChain), - "-m", "set", "--match-set", proxier.lbIngressSet.Name, + "-m", "set", "--match-set", proxier.lbSet.Name, "dst,dst", ) - writeLine(proxier.natRules, append(args, "-j", string(KubeFireWallChain))...) - // Don't masq for service with externaltrafficpolicy =local - if !proxier.lbIngressLocalSet.isEmpty() { - args = append(args[:0], - "-A", string(KubeFireWallChain), - "-m", "set", "--match-set", proxier.lbIngressLocalSet.Name, - "dst,dst", - ) - writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) - } - // mark masq for others - args = append(args[:0], - "-A", string(KubeFireWallChain), - "-m", "comment", "--comment", - fmt.Sprintf(`"mark MASQ for external traffic policy not local"`), - ) - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + writeLine(proxier.natRules, append(args, "-j", string(KubeLoadBalancerChain))...) // if have whitelist, accept or drop. if !proxier.lbWhiteListCIDRSet.isEmpty() || !proxier.lbWhiteListIPSet.isEmpty() { + if !proxier.lbFWSet.isEmpty() { + args = append(args[:0], + "-A", string(KubeLoadBalancerChain), + "-m", "set", "--match-set", proxier.lbFWSet.Name, + "dst,dst", + ) + writeLine(proxier.natRules, append(args, "-j", string(KubeFireWallChain))...) + } if !proxier.lbWhiteListCIDRSet.isEmpty() { args = append(args[:0], "-A", string(KubeFireWallChain), "-m", "set", "--match-set", proxier.lbWhiteListCIDRSet.Name, "dst,dst,src", ) - writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) + writeLine(proxier.natRules, append(args, "-j", "RETURN")...) } if !proxier.lbWhiteListIPSet.isEmpty() { args = append(args[:0], @@ -1259,7 +1266,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "set", "--match-set", proxier.lbWhiteListIPSet.Name, "dst,dst,src", ) - writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) + writeLine(proxier.natRules, append(args, "-j", "RETURN")...) } args = append(args[:0], "-A", string(KubeFireWallChain), @@ -1268,6 +1275,22 @@ func (proxier *Proxier) syncProxyRules() { // It means the packet cannot go thru the firewall, then mark it for DROP writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) } + // Don't masq for service with externaltrafficpolicy =local + if !proxier.lbLocalSet.isEmpty() { + args = append(args[:0], + "-A", string(KubeLoadBalancerChain), + "-m", "set", "--match-set", proxier.lbLocalSet.Name, + "dst,dst", + ) + writeLine(proxier.natRules, append(args, "-j", "RETURN")...) + } + // mark masq for others + args = append(args[:0], + "-A", string(KubeLoadBalancerChain), + "-m", "comment", "--comment", + fmt.Sprintf(`"mark MASQ for external traffic policy not local"`), + ) + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) } if !proxier.nodePortSetTCP.isEmpty() { // Build masquerade rules for packets which cross node visit nodeport. @@ -1424,7 +1447,7 @@ func (proxier *Proxier) syncProxyRules() { } func (proxier *Proxier) acceptIPVSTraffic() { - sets := []*IPSet{proxier.clusterIPSet, proxier.externalIPSet, proxier.lbIngressSet} + sets := []*IPSet{proxier.clusterIPSet, proxier.externalIPSet, proxier.lbSet} for _, set := range sets { var matchType string if !set.isEmpty() { diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index e540c68df68..bcc506ddb2a 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -145,8 +145,9 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, false), clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, false), externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, false), - lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, false), - lbIngressLocalSet: NewIPSet(ipset, KubeLoadBalancerIngressLocalSet, utilipset.HashIPPort, false), + lbSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, false), + lbFWSet: NewIPSet(ipset, KubeLoadbalancerFWSet, utilipset.HashIPPort, false), + lbLocalSet: NewIPSet(ipset, KubeLoadBalancerLocalSet, utilipset.HashIPPort, false), lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, false), lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, false), nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false), @@ -957,6 +958,12 @@ func TestLoadBalanceSourceRanges(t *testing.T) { Protocol: strings.ToLower(string(api.ProtocolTCP)), SetType: utilipset.HashIPPort, }}, + KubeLoadbalancerFWSet: {{ + IP: svcLBIP, + Port: svcPort, + Protocol: strings.ToLower(string(api.ProtocolTCP)), + SetType: utilipset.HashIPPort, + }}, KubeLoadBalancerSourceCIDRSet: {{ IP: svcLBIP, Port: svcPort, @@ -970,10 +977,15 @@ func TestLoadBalanceSourceRanges(t *testing.T) { // Check iptables chain and rules epIpt := netlinktest.ExpectedIptablesChain{ string(kubeServicesChain): {{ - JumpChain: string(KubeFireWallChain), MatchSet: KubeLoadBalancerSet, + JumpChain: string(KubeLoadBalancerChain), MatchSet: KubeLoadBalancerSet, + }}, + string(KubeLoadBalancerChain): {{ + JumpChain: string(KubeFireWallChain), MatchSet: KubeLoadbalancerFWSet, + }, { + JumpChain: string(KubeMarkMasqChain), MatchSet: "", }}, string(KubeFireWallChain): {{ - JumpChain: "ACCEPT", MatchSet: KubeLoadBalancerSourceCIDRSet, + JumpChain: "RETURN", MatchSet: KubeLoadBalancerSourceCIDRSet, }, { JumpChain: string(KubeMarkDropChain), MatchSet: "", }}, @@ -1109,7 +1121,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { Protocol: strings.ToLower(string(api.ProtocolTCP)), SetType: utilipset.HashIPPort, }}, - KubeLoadBalancerIngressLocalSet: {{ + KubeLoadBalancerLocalSet: {{ IP: svcLBIP, Port: svcPort, Protocol: strings.ToLower(string(api.ProtocolTCP)), @@ -1121,10 +1133,10 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { // Check iptables chain and rules epIpt := netlinktest.ExpectedIptablesChain{ string(kubeServicesChain): {{ - JumpChain: string(KubeFireWallChain), MatchSet: KubeLoadBalancerSet, + JumpChain: string(KubeLoadBalancerChain), MatchSet: KubeLoadBalancerSet, }}, - string(KubeFireWallChain): {{ - JumpChain: "ACCEPT", MatchSet: KubeLoadBalancerIngressLocalSet, + string(KubeLoadBalancerChain): {{ + JumpChain: "RETURN", MatchSet: KubeLoadBalancerLocalSet, }, { JumpChain: string(KubeMarkMasqChain), MatchSet: "", }}, @@ -2580,7 +2592,7 @@ func checkIPSet(t *testing.T, fp *Proxier, ipSet netlinktest.ExpectedIPSet) { for set, entries := range ipSet { ents, err := fp.ipset.ListEntries(set) if err != nil || len(ents) != len(entries) { - t.Errorf("Check ipset entries failed for ipset: %q", set) + t.Errorf("Check ipset entries failed for ipset: %q, expect %d, got %d", set, len(entries), len(ents)) continue } if len(entries) == 1 {