From 339f92c175932d46bd0ef64b419948dbb97b7feb Mon Sep 17 00:00:00 2001
From: Lion-Wei
Date: Tue, 8 May 2018 20:32:48 +0800
Subject: [PATCH] abstract duplicated code in ipvs proxier

---
 pkg/proxy/ipvs/ipset.go        |  15 +-
 pkg/proxy/ipvs/proxier.go      | 922 ++++++++++++++-------------
 pkg/proxy/ipvs/proxier_test.go |  68 ++-
 3 files changed, 424 insertions(+), 581 deletions(-)

diff --git a/pkg/proxy/ipvs/ipset.go b/pkg/proxy/ipvs/ipset.go
index 7f361d64039..5da72749b42 100644
--- a/pkg/proxy/ipvs/ipset.go
+++ b/pkg/proxy/ipvs/ipset.go
@@ -21,6 +21,7 @@ import (
 	utilipset "k8s.io/kubernetes/pkg/util/ipset"
 	utilversion "k8s.io/kubernetes/pkg/util/version"
 
+	"fmt"
 	"github.com/golang/glog"
 )
 
@@ -107,6 +108,10 @@ func (set *IPSet) isEmpty() bool {
 	return len(set.activeEntries.UnsortedList()) == 0
 }
 
+func (set *IPSet) getComment() string {
+	return fmt.Sprintf("\"%s\"", set.Comment)
+}
+
 func (set *IPSet) resetEntries() {
 	set.activeEntries = sets.NewString()
 }
 
@@ -146,12 +151,10 @@ func (set *IPSet) syncIPSetEntries() {
 		}
 	}
 }
 
-func ensureIPSets(ipSets ...*IPSet) error {
-	for _, set := range ipSets {
-		if err := set.handle.CreateSet(&set.IPSet, true); err != nil {
-			glog.Errorf("Failed to make sure ip set: %v exist, error: %v", set, err)
-			return err
-		}
+func ensureIPSet(set *IPSet) error {
+	if err := set.handle.CreateSet(&set.IPSet, true); err != nil {
+		glog.Errorf("Failed to ensure ip set %v exists, error: %v", set, err)
+		return err
 	}
 	return nil
 }
diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go
index 21b5048663a..ff0930c6fee 100644
--- a/pkg/proxy/ipvs/proxier.go
+++ b/pkg/proxy/ipvs/proxier.go
@@ -16,10 +16,6 @@ limitations under the License.
 
 package ipvs
 
-//
-// NOTE: this needs to be tested in e2e since it uses ipvs for everything.
-//
-
 import (
 	"bytes"
 	"fmt"
@@ -84,13 +80,73 @@ const (
 	DefaultDummyDevice = "kube-ipvs0"
 )
 
-// tableChainsWithJumpService is the iptables chains ipvs proxy mode used.
-var tableChainsWithJumpService = []struct {
+// iptablesJumpChain is the table of iptables jump chains that the ipvs proxier installs and that its cleanup path removes.
+// `to` is the iptables chain we want to operate on.
+// `from` is the source iptables chain where the jump rule is installed.
+var iptablesJumpChain = []struct {
+	table   utiliptables.Table
+	from    utiliptables.Chain
+	to      utiliptables.Chain
+	comment string
+}{
+	{utiliptables.TableNAT, utiliptables.ChainOutput, kubeServicesChain, "kubernetes service portals"},
+	{utiliptables.TableNAT, utiliptables.ChainPrerouting, kubeServicesChain, "kubernetes service portals"},
+	{utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, "kubernetes postrouting rules"},
+	{utiliptables.TableFilter, utiliptables.ChainForward, KubeForwardChain, "kubernetes forwarding rules"},
+}
+
+var iptablesChains = []struct {
 	table utiliptables.Table
 	chain utiliptables.Chain
 }{
-	{utiliptables.TableNAT, utiliptables.ChainOutput},
-	{utiliptables.TableNAT, utiliptables.ChainPrerouting},
+	{utiliptables.TableNAT, kubeServicesChain},
+	{utiliptables.TableNAT, kubePostroutingChain},
+	{utiliptables.TableNAT, KubeFireWallChain},
+	{utiliptables.TableNAT, KubeNodePortChain},
+	{utiliptables.TableNAT, KubeLoadBalancerChain},
+	{utiliptables.TableNAT, KubeMarkMasqChain},
+	{utiliptables.TableFilter, KubeForwardChain},
+}
+
+// ipsetInfo describes all the ipsets the ipvs proxier needs.
+var ipsetInfo = []struct {
+	name    string
+	setType utilipset.Type
+	isIPv6  bool
+	comment string
+}{
+	{kubeLoopBackIPSet, utilipset.HashIPPortIP, true, kubeLoopBackIPSetComment},
+	{kubeClusterIPSet, utilipset.HashIPPort, true, kubeClusterIPSetComment},
+	{kubeExternalIPSet, utilipset.HashIPPort, true, kubeExternalIPSetComment},
+	{kubeLoadBalancerSet, utilipset.HashIPPort, true, kubeLoadBalancerSetComment},
+	{kubeLoadbalancerFWSet, utilipset.HashIPPort, true, kubeLoadbalancerFWSetComment},
+	{kubeLoadBalancerLocalSet, utilipset.HashIPPort, true, kubeLoadBalancerLocalSetComment},
+	{kubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, true, kubeLoadBalancerSourceIPSetComment},
+	{kubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, true, kubeLoadBalancerSourceCIDRSetComment},
+	{kubeNodePortSetTCP, utilipset.BitmapPort, false, kubeNodePortSetTCPComment},
+	{kubeNodePortLocalSetTCP, utilipset.BitmapPort, false, kubeNodePortLocalSetTCPComment},
+	{kubeNodePortSetUDP, utilipset.BitmapPort, false, kubeNodePortSetUDPComment},
+	{kubeNodePortLocalSetUDP, utilipset.BitmapPort, false, kubeNodePortLocalSetUDPComment},
+}
+
+// ipsetWithIptablesChain lists the ipsets together with the iptables chain they are matched in and the chain to jump to:
+// `iptables -t nat -A <from> -m set --match-set <name> <matchType> -j <to>`
+// example: iptables -t nat -A KUBE-SERVICES -m set --match-set KUBE-NODE-PORT-TCP dst -j KUBE-NODE-PORT
+// ipsets with other match rules are created individually further below.
+var ipsetWithIptablesChain = []struct { + name string + from string + to string + matchType string +}{ + {kubeLoopBackIPSet, string(kubePostroutingChain), "MASQUERADE", "dst,dst,src"}, + {kubeLoadBalancerSet, string(kubeServicesChain), string(KubeLoadBalancerChain), "dst,dst"}, + {kubeLoadbalancerFWSet, string(KubeLoadBalancerChain), string(KubeFireWallChain), "dst,dst"}, + {kubeLoadBalancerSourceCIDRSet, string(KubeFireWallChain), "RETURN", "dst,dst,src"}, + {kubeLoadBalancerSourceIPSet, string(KubeFireWallChain), "RETURN", "dst,dst,src"}, + {kubeLoadBalancerLocalSet, string(KubeLoadBalancerChain), "RETURN", "dst,dst"}, + {kubeNodePortSetTCP, string(kubeServicesChain), string(KubeNodePortChain), "dst"}, + {kubeNodePortLocalSetTCP, string(KubeNodePortChain), "RETURN", "dst"}, } var ipvsModules = []string{ @@ -159,32 +215,8 @@ type Proxier struct { filterRules *bytes.Buffer // Added as a member to the struct to allow injection for testing. netlinkHandle NetLinkHandle - // loopbackSet is the ipset which stores all endpoints IP:Port,IP for solving hairpin mode purpose. - loopbackSet *IPSet - // clusterIPSet is the ipset which stores all service ClusterIP:Port - clusterIPSet *IPSet - // nodePortSetTCP is the bitmap:port type ipset which stores all TCP node port - nodePortSetTCP *IPSet - // nodePortSetTCP is the bitmap:port type ipset which stores all UDP node port - nodePortSetUDP *IPSet - // nodePortLocalSetTCP is the bitmap:port type ipset which stores all TCP nodeport's with externaltrafficPolicy=local - nodePortLocalSetTCP *IPSet - // nodePortLocalSetUDP is the bitmap:port type ipset which stores all UDP nodeport's with externaltrafficPolicy=local - nodePortLocalSetUDP *IPSet - // externalIPSet is the hash:ip,port type ipset which stores all service ExternalIP:Port - externalIPSet *IPSet - // lbSet is the hash:ip,port type ipset which stores all service load balancer IP:Port. - lbSet *IPSet - // lbLocalSet is the hash:ip type ipset which stores all service ip's with externaltrafficPolicy=local - lbLocalSet *IPSet - // lbFWSet is the hash:ip,port type ipset which stores all service load balancer ingress IP:Port for load balancer with sourceRange. - lbFWSet *IPSet - // lbWhiteListIPSet is the hash:ip,port,ip type ipset which stores all service load balancer ingress IP:Port,sourceIP pair, any packets - // with the source IP visit ingress IP:Port can pass through. - lbWhiteListIPSet *IPSet - // lbWhiteListIPSet is the hash:ip,port,net type ipset which stores all service load balancer ingress IP:Port,sourceCIDR pair, any packets - // from the source CIDR visit ingress IP:Port can pass through. - lbWhiteListCIDRSet *IPSet + // ipsetList is the list of ipsets that ipvs proxier used. + ipsetList map[string]*IPSet // Values are as a parameter to select the interfaces which nodeport works. nodePortAddresses []string // networkInterfacer defines an interface for several net library functions. 
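For reference, each `ipsetWithIptablesChain` row expands to exactly the rule pattern named in its doc comment. A minimal, self-contained sketch of that expansion — `jumpRule` and `renderRule` are illustrative helpers for this note, not part of the patch:

```go
package main

import (
	"fmt"
	"strings"
)

// jumpRule mirrors the fields of one ipsetWithIptablesChain entry.
type jumpRule struct {
	name, from, to, matchType string
}

// renderRule shows the single iptables rule a table row expands to; the
// real proxier writes the same words into an iptables-restore buffer.
func renderRule(r jumpRule) string {
	return strings.Join([]string{
		"-A", r.from,
		"-m", "set", "--match-set", r.name, r.matchType,
		"-j", r.to,
	}, " ")
}

func main() {
	r := jumpRule{name: "KUBE-NODE-PORT-TCP", from: "KUBE-SERVICES", to: "KUBE-NODE-PORT", matchType: "dst"}
	// Prints: -A KUBE-SERVICES -m set --match-set KUBE-NODE-PORT-TCP dst -j KUBE-NODE-PORT
	fmt.Println(renderRule(r))
}
```

Running it prints the very rule the comment above uses as its example, which is why the per-set iptables plumbing below can collapse into one loop.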
@@ -325,48 +357,44 @@ func NewProxier(ipt utiliptables.Interface, healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps proxier := &Proxier{ - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), - serviceMap: make(proxy.ServiceMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder), - endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder), - syncPeriod: syncPeriod, - minSyncPeriod: minSyncPeriod, - iptables: ipt, - masqueradeAll: masqueradeAll, - masqueradeMark: masqueradeMark, - exec: exec, - clusterCIDR: clusterCIDR, - hostname: hostname, - nodeIP: nodeIP, - portMapper: &listenPortOpener{}, - recorder: recorder, - healthChecker: healthChecker, - healthzServer: healthzServer, - ipvs: ipvs, - ipvsScheduler: scheduler, - ipGetter: &realIPGetter{nl: NewNetLinkHandle()}, - iptablesData: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), - filterChains: bytes.NewBuffer(nil), - filterRules: bytes.NewBuffer(nil), - netlinkHandle: NewNetLinkHandle(), - ipset: ipset, - loopbackSet: NewIPSet(ipset, kubeLoopBackIPSet, utilipset.HashIPPortIP, isIPv6, kubeLoopBackIPSetComment), - clusterIPSet: NewIPSet(ipset, kubeClusterIPSet, utilipset.HashIPPort, isIPv6, kubeClusterIPSetComment), - externalIPSet: NewIPSet(ipset, kubeExternalIPSet, utilipset.HashIPPort, isIPv6, kubeExternalIPSetComment), - lbSet: NewIPSet(ipset, kubeLoadBalancerSet, utilipset.HashIPPort, isIPv6, kubeLoadBalancerSetComment), - lbFWSet: NewIPSet(ipset, kubeLoadbalancerFWSet, utilipset.HashIPPort, isIPv6, kubeLoadbalancerFWSetComment), - lbLocalSet: NewIPSet(ipset, kubeLoadBalancerLocalSet, utilipset.HashIPPort, isIPv6, kubeLoadBalancerLocalSetComment), - lbWhiteListIPSet: NewIPSet(ipset, kubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, isIPv6, kubeLoadBalancerSourceIPSetComment), - lbWhiteListCIDRSet: NewIPSet(ipset, kubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, isIPv6, kubeLoadBalancerSourceCIDRSetComment), - nodePortSetTCP: NewIPSet(ipset, kubeNodePortSetTCP, utilipset.BitmapPort, false, kubeNodePortSetTCPComment), - nodePortLocalSetTCP: NewIPSet(ipset, kubeNodePortLocalSetTCP, utilipset.BitmapPort, false, kubeNodePortLocalSetTCPComment), - nodePortSetUDP: NewIPSet(ipset, kubeNodePortSetUDP, utilipset.BitmapPort, false, kubeNodePortSetUDPComment), - nodePortLocalSetUDP: NewIPSet(ipset, kubeNodePortLocalSetUDP, utilipset.BitmapPort, false, kubeNodePortLocalSetUDPComment), - nodePortAddresses: nodePortAddresses, - networkInterfacer: utilproxy.RealNetwork{}, + portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + serviceMap: make(proxy.ServiceMap), + serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder), + endpointsMap: make(proxy.EndpointsMap), + endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder), + syncPeriod: syncPeriod, + minSyncPeriod: minSyncPeriod, + iptables: ipt, + masqueradeAll: masqueradeAll, + masqueradeMark: masqueradeMark, + exec: exec, + clusterCIDR: clusterCIDR, + hostname: hostname, + nodeIP: nodeIP, + portMapper: &listenPortOpener{}, + recorder: recorder, + healthChecker: healthChecker, + healthzServer: healthzServer, + ipvs: ipvs, + ipvsScheduler: scheduler, + ipGetter: &realIPGetter{nl: NewNetLinkHandle()}, + iptablesData: bytes.NewBuffer(nil), + natChains: bytes.NewBuffer(nil), + natRules: bytes.NewBuffer(nil), + filterChains: 
bytes.NewBuffer(nil),
+		filterRules:       bytes.NewBuffer(nil),
+		netlinkHandle:     NewNetLinkHandle(),
+		ipset:             ipset,
+		nodePortAddresses: nodePortAddresses,
+		networkInterfacer: utilproxy.RealNetwork{},
+	}
+	// initialize ipsetList with all sets we need
+	proxier.ipsetList = make(map[string]*IPSet)
+	for _, is := range ipsetInfo {
+		if is.isIPv6 {
+			proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, isIPv6, is.comment)
+		} else {
+			proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, false, is.comment)
+		}
+	}
 	burstSyncs := 2
 	glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
@@ -463,13 +491,13 @@ func CanUseIPVSProxier(handle KernelHandler, ipsetver IPSetVersioner) (bool, err
 
 // CleanupIptablesLeftovers removes all iptables rules and chains created by the Proxier
 // It returns true if an error was encountered. Errors are logged.
 func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
-	// Unlink the services chain.
-	args := []string{
-		"-m", "comment", "--comment", "kubernetes service portals",
-		"-j", string(kubeServicesChain),
-	}
-	for _, tc := range tableChainsWithJumpService {
-		if err := ipt.DeleteRule(tc.table, tc.chain, args...); err != nil {
+	// Unlink the iptables chains created by the ipvs Proxier
+	for _, jc := range iptablesJumpChain {
+		args := []string{
+			"-m", "comment", "--comment", jc.comment,
+			"-j", string(jc.to),
+		}
+		if err := ipt.DeleteRule(jc.table, jc.from, args...); err != nil {
 			if !utiliptables.IsNotFoundError(err) {
 				glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
 				encounteredError = true
@@ -477,58 +505,21 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool
 		}
 	}
 
-	// Unlink the postrouting chain.
-	args = []string{
-		"-m", "comment", "--comment", "kubernetes postrouting rules",
-		"-j", string(kubePostroutingChain),
-	}
-	if err := ipt.DeleteRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
-		if !utiliptables.IsNotFoundError(err) {
-			glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
-			encounteredError = true
-		}
-	}
-
-	// Unlink the forwarding chain.
-	args = []string{
-		"-m", "comment", "--comment", "kubernetes forwarding rules",
-		"-j", string(KubeForwardChain),
-	}
-	if err := ipt.DeleteRule(utiliptables.TableFilter, utiliptables.ChainForward, args...); err != nil {
-		if !utiliptables.IsNotFoundError(err) {
-			glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
-			encounteredError = true
-		}
-	}
-
-	// Flush and remove all of our "-t nat" chains.
-	for _, chain := range []utiliptables.Chain{kubeServicesChain, kubePostroutingChain, KubeNodePortChain, KubeLoadBalancerChain, KubeFireWallChain} {
-		if err := ipt.FlushChain(utiliptables.TableNAT, chain); err != nil {
+	// Flush and remove all of our chains.
+	for _, ch := range iptablesChains {
+		if err := ipt.FlushChain(ch.table, ch.chain); err != nil {
 			if !utiliptables.IsNotFoundError(err) {
 				glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
 				encounteredError = true
 			}
 		}
-		if err := ipt.DeleteChain(utiliptables.TableNAT, chain); err != nil {
+		if err := ipt.DeleteChain(ch.table, ch.chain); err != nil {
 			if !utiliptables.IsNotFoundError(err) {
 				glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
 				encounteredError = true
 			}
 		}
 	}
-	// Flush and remove all of our "-t filter" chains.
- if err := ipt.FlushChain(utiliptables.TableFilter, KubeForwardChain); err != nil { - if !utiliptables.IsNotFoundError(err) { - glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err) - encounteredError = true - } - } - if err := ipt.DeleteChain(utiliptables.TableFilter, KubeForwardChain); err != nil { - if !utiliptables.IsNotFoundError(err) { - glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err) - encounteredError = true - } - } return encounteredError } @@ -558,11 +549,8 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset encounteredError = cleanupIptablesLeftovers(ipt) || encounteredError // Destroy ip sets created by ipvs Proxier. We should call it after cleaning up // iptables since we can NOT delete ip set which is still referenced by iptables. - ipSetsToDestroy := []string{kubeClusterIPSet, kubeClusterIPSet, kubeLoadBalancerSet, kubeNodePortSetTCP, kubeNodePortSetUDP, - kubeExternalIPSet, kubeLoadbalancerFWSet, kubeLoadBalancerSourceIPSet, kubeLoadBalancerSourceCIDRSet, - kubeLoadBalancerLocalSet, kubeNodePortLocalSetUDP, kubeNodePortLocalSetTCP} - for _, set := range ipSetsToDestroy { - err = ipset.DestroySet(set) + for _, set := range ipsetInfo { + err = ipset.DestroySet(set.name) if err != nil { if !utilipset.IsNotFoundError(err) { glog.Errorf("Error removing ipset %s, error: %v", set, err) @@ -693,25 +681,7 @@ func (proxier *Proxier) syncProxyRules() { glog.V(3).Infof("Syncing ipvs Proxier rules") // Begin install iptables - // Get iptables-save output so we can check for existing chains and rules. - // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore - existingFilterChains := make(map[utiliptables.Chain]string) - proxier.iptablesData.Reset() - err := proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.iptablesData) - if err != nil { // if we failed to get any rules - glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) - } else { // otherwise parse the output - existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.iptablesData.Bytes()) - } - existingNATChains := make(map[utiliptables.Chain]string) - proxier.iptablesData.Reset() - err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData) - if err != nil { // if we failed to get any rules - glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) - } else { // otherwise parse the output - existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes()) - } // Reset all buffers used later. // This is to avoid memory reallocations and thus improve performance. proxier.natChains.Reset() @@ -721,53 +691,21 @@ func (proxier *Proxier) syncProxyRules() { writeLine(proxier.filterChains, "*filter") writeLine(proxier.natChains, "*nat") - // Make sure we keep stats for the top-level chains, if they existed - // (which most should have because we created them above). - if chain, ok := existingNATChains[kubePostroutingChain]; ok { - writeLine(proxier.natChains, chain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain)) - } - // Install the kubernetes-specific postrouting rules. We use a whole chain for - // this so that it is easier to flush and change, for example if the mark - // value should ever change. 
- writeLine(proxier.natRules, []string{ - "-A", string(kubePostroutingChain), - "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`, - "-m", "mark", "--mark", proxier.masqueradeMark, - "-j", "MASQUERADE", - }...) - - if chain, ok := existingNATChains[KubeMarkMasqChain]; ok { - writeLine(proxier.natChains, chain) - } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(KubeMarkMasqChain)) - } - // Install the kubernetes-specific masquerade mark rule. We use a whole chain for - // this so that it is easier to flush and change, for example if the mark - // value should ever change. - writeLine(proxier.natRules, []string{ - "-A", string(KubeMarkMasqChain), - "-j", "MARK", "--set-xmark", proxier.masqueradeMark, - }...) - // End install iptables + proxier.createAndLinkeKubeChain() // make sure dummy interface exists in the system where ipvs Proxier will bind service address on it - _, err = proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice) + _, err := proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice) if err != nil { glog.Errorf("Failed to create dummy interface: %s, error: %v", DefaultDummyDevice, err) return } // make sure ip sets exists in the system. - ipSets := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.externalIPSet, proxier.nodePortSetUDP, proxier.nodePortSetTCP, - proxier.lbSet, proxier.lbFWSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet, proxier.lbLocalSet, - proxier.nodePortLocalSetTCP, proxier.nodePortLocalSetUDP} - if err := ensureIPSets(ipSets...); err != nil { - return - } - for i := range ipSets { - ipSets[i].resetEntries() + for _, set := range proxier.ipsetList { + if err := ensureIPSet(set); err != nil { + return + } + set.resetEntries() } // Accumulate the set of local ports that we will be holding open once this update is complete @@ -777,44 +715,6 @@ func (proxier *Proxier) syncProxyRules() { // currentIPVSServices represent IPVS services listed from the system currentIPVSServices := make(map[string]*utilipvs.VirtualServer) - // We are creating those slices ones here to avoid memory reallocations - // in every loop. Note that reuse the memory, instead of doing: - // slice = - // you should always do one of the below: - // slice = slice[:0] // and then append to it - // slice = append(slice[:0], ...) - // To avoid growing this slice, we arbitrarily set its size to 64, - // there is never more than that many arguments for a single line. - // Note that even if we go over 64, it will still be correct - it - // is just for efficiency, not correctness. 
- args := make([]string, 64) - - // Kube service portal - if err := proxier.linkKubeServiceChain(existingNATChains, proxier.natChains); err != nil { - glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err) - return - } - // `iptables -t nat -N KUBE-FIREWALL` - if err := proxier.createKubeChain(existingNATChains, KubeFireWallChain); err != nil { - glog.Errorf("Failed to create KUBE-FIREWALL chain: %v", err) - return - } - // `iptables -t nat -N KUBE-NODE-PORT` - if err := proxier.createKubeChain(existingNATChains, KubeNodePortChain); err != nil { - glog.Errorf("Failed to create KUBE-NODE-PORT chain: %v", err) - return - } - // `iptables -t nat -N KUBE-LOAD-BALANCER` - if err := proxier.createKubeChain(existingNATChains, KubeLoadBalancerChain); err != nil { - glog.Errorf("Failed to create KUBE-LOAD-BALANCER chain: %v", err) - return - } - // Kube forward - if err := proxier.linkKubeForwardChain(existingFilterChains, proxier.filterChains); err != nil { - glog.Errorf("Failed to create and link KUBE-FORWARD chain: %v", err) - return - } - // Build IPVS rules for each service. for svcName, svc := range proxier.serviceMap { svcInfo, ok := svc.(*serviceInfo) @@ -847,11 +747,11 @@ func (proxier *Proxier) syncProxyRules() { IP2: epIP, SetType: utilipset.HashIPPortIP, } - if valid := proxier.loopbackSet.validateEntry(entry); !valid { - glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.loopbackSet.Name)) + if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoopBackIPSet].Name)) continue } - proxier.loopbackSet.activeEntries.Insert(entry.String()) + proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String()) } // Capture the clusterIP. @@ -866,11 +766,11 @@ func (proxier *Proxier) syncProxyRules() { // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) // Install masquerade rules if 'masqueradeAll' or 'clusterCIDR' is specified. if proxier.masqueradeAll || len(proxier.clusterCIDR) > 0 { - if valid := proxier.clusterIPSet.validateEntry(entry); !valid { - glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.clusterIPSet.Name)) + if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name)) continue } - proxier.clusterIPSet.activeEntries.Insert(entry.String()) + proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String()) } // ipvs call serv := &utilipvs.VirtualServer{ @@ -937,11 +837,11 @@ func (proxier *Proxier) syncProxyRules() { SetType: utilipset.HashIPPort, } // We have to SNAT packets to external IPs. - if valid := proxier.externalIPSet.validateEntry(entry); !valid { - glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.externalIPSet.Name)) + if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPSet].Name)) continue } - proxier.externalIPSet.activeEntries.Insert(entry.String()) + proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String()) // ipvs call serv := &utilipvs.VirtualServer{ @@ -978,28 +878,28 @@ func (proxier *Proxier) syncProxyRules() { // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) // If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying only locally, we can retain the source IP. 
- if valid := proxier.lbSet.validateEntry(entry); !valid { - glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbSet.Name)) + if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSet].Name)) continue } - proxier.lbSet.activeEntries.Insert(entry.String()) + proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String()) // insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local if svcInfo.OnlyNodeLocalEndpoints { - if valid := proxier.lbLocalSet.validateEntry(entry); !valid { - glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbLocalSet.Name)) + if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerLocalSet].Name)) continue } - proxier.lbLocalSet.activeEntries.Insert(entry.String()) + proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String()) } if len(svcInfo.LoadBalancerSourceRanges) != 0 { // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. // This currently works for loadbalancers that preserves source ips. // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. - if valid := proxier.lbFWSet.validateEntry(entry); !valid { - glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbFWSet.Name)) + if valid := proxier.ipsetList[kubeLoadbalancerFWSet].validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadbalancerFWSet].Name)) continue } - proxier.lbFWSet.activeEntries.Insert(entry.String()) + proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String()) allowFromNode := false for _, src := range svcInfo.LoadBalancerSourceRanges { // ipset call @@ -1011,11 +911,11 @@ func (proxier *Proxier) syncProxyRules() { SetType: utilipset.HashIPPortNet, } // enumerate all white list source cidr - if valid := proxier.lbWhiteListCIDRSet.validateEntry(entry); !valid { - glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbWhiteListCIDRSet.Name)) + if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name)) continue } - proxier.lbWhiteListCIDRSet.activeEntries.Insert(entry.String()) + proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String()) // ignore error because it has been validated _, cidr, _ := net.ParseCIDR(src) @@ -1035,11 +935,11 @@ func (proxier *Proxier) syncProxyRules() { SetType: utilipset.HashIPPortIP, } // enumerate all white list source ip - if valid := proxier.lbWhiteListIPSet.validateEntry(entry); !valid { - glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbWhiteListIPSet.Name)) + if valid := proxier.ipsetList[kubeLoadBalancerSourceIPSet].validateEntry(entry); !valid { + glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name)) continue } - proxier.lbWhiteListIPSet.activeEntries.Insert(entry.String()) + proxier.ipsetList[kubeLoadBalancerSourceIPSet].activeEntries.Insert(entry.String()) } } @@ -1120,9 +1020,9 @@ func (proxier *Proxier) syncProxyRules() { var nodePortSet *IPSet switch protocol { case "tcp": - nodePortSet = proxier.nodePortSetTCP + nodePortSet = 
proxier.ipsetList[kubeNodePortSetTCP] case "udp": - nodePortSet = proxier.nodePortSetUDP + nodePortSet = proxier.ipsetList[kubeNodePortSetUDP] default: // It should never hit glog.Errorf("Unsupported protocol type: %s", protocol) @@ -1140,9 +1040,9 @@ func (proxier *Proxier) syncProxyRules() { var nodePortLocalSet *IPSet switch protocol { case "tcp": - nodePortLocalSet = proxier.nodePortLocalSetTCP + nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetTCP] case "udp": - nodePortLocalSet = proxier.nodePortLocalSetUDP + nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetUDP] default: // It should never hit glog.Errorf("Unsupported protocol type: %s", protocol) @@ -1195,225 +1095,13 @@ func (proxier *Proxier) syncProxyRules() { } // sync ipset entries - ipsetsToSync := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.lbSet, proxier.nodePortSetTCP, - proxier.lbFWSet, proxier.nodePortSetUDP, proxier.externalIPSet, proxier.lbWhiteListIPSet, - proxier.lbWhiteListCIDRSet, proxier.lbLocalSet, proxier.nodePortLocalSetTCP, proxier.nodePortLocalSetUDP} - for i := range ipsetsToSync { - ipsetsToSync[i].syncIPSetEntries() + for _, set := range proxier.ipsetList { + set.syncIPSetEntries() } // Tail call iptables rules for ipset, make sure only call iptables once // in a single loop per ip set. - if !proxier.loopbackSet.isEmpty() { - args = append(args[:0], - "-A", string(kubePostroutingChain), - "-m", "comment", "--comment", `"Kubernetes endpoints dst ip:port, source ip for solving hairpin purpose"`, - "-m", "set", "--match-set", proxier.loopbackSet.Name, - "dst,dst,src", - ) - writeLine(proxier.natRules, append(args, "-j", "MASQUERADE")...) - } - if !proxier.clusterIPSet.isEmpty() { - args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", `"Kubernetes service cluster ip + port for masquerade purpose"`, - "-m", "set", "--match-set", proxier.clusterIPSet.Name, - "dst,dst", - ) - if proxier.masqueradeAll { - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) - } else if len(proxier.clusterCIDR) > 0 { - // This masquerades off-cluster traffic to a service VIP. The idea - // is that you can establish a static route for your Service range, - // routing to any node, and that node will bridge into the Service - // for you. Since that might bounce off-node, we masquerade here. - // If/when we support "Local" policy for VIPs, we should update this. - writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...) - } - } - if !proxier.externalIPSet.isEmpty() { - // Build masquerade rules for packets to external IPs. - args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", `"Kubernetes service external ip + port for masquerade and filter purpose"`, - "-m", "set", "--match-set", proxier.externalIPSet.Name, - "dst,dst", - ) - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) - // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container) - // nor from a local process to be forwarded to the service. - // This rule roughly translates to "all traffic from off-machine". - // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later. - externalTrafficOnlyArgs := append(args, - "-m", "physdev", "!", "--physdev-is-in", - "-m", "addrtype", "!", "--src-type", "LOCAL") - writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", "ACCEPT")...) 
- dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL") - // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local. - // This covers cases like GCE load-balancers which get added to the local routing table. - writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...) - } - if !proxier.lbSet.isEmpty() { - // Build masquerade rules for packets which cross node visit load balancer ingress IPs. - args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", `"Kubernetes service lb portal"`, - "-m", "set", "--match-set", proxier.lbSet.Name, - "dst,dst", - ) - writeLine(proxier.natRules, append(args, "-j", string(KubeLoadBalancerChain))...) - // if have whitelist, accept or drop. - if !proxier.lbWhiteListCIDRSet.isEmpty() || !proxier.lbWhiteListIPSet.isEmpty() { - if !proxier.lbFWSet.isEmpty() { - args = append(args[:0], - "-A", string(KubeLoadBalancerChain), - "-m", "comment", "--comment", `"Kubernetes service load balancer ip + port for load balancer with sourceRange"`, - "-m", "set", "--match-set", proxier.lbFWSet.Name, - "dst,dst", - ) - writeLine(proxier.natRules, append(args, "-j", string(KubeFireWallChain))...) - } - if !proxier.lbWhiteListCIDRSet.isEmpty() { - args = append(args[:0], - "-A", string(KubeFireWallChain), - "-m", "comment", "--comment", `"Kubernetes service load balancer ip + port + source IP for packet filter purpose"`, - "-m", "set", "--match-set", proxier.lbWhiteListCIDRSet.Name, - "dst,dst,src", - ) - writeLine(proxier.natRules, append(args, "-j", "RETURN")...) - } - if !proxier.lbWhiteListIPSet.isEmpty() { - args = append(args[:0], - "-A", string(KubeFireWallChain), - "-m", "comment", "--comment", `"Kubernetes service load balancer ip + port + source cidr for packet filter purpose"`, - "-m", "set", "--match-set", proxier.lbWhiteListIPSet.Name, - "dst,dst,src", - ) - writeLine(proxier.natRules, append(args, "-j", "RETURN")...) - } - args = append(args[:0], - "-A", string(KubeFireWallChain), - ) - // If the packet was able to reach the end of firewall chain, then it did not get DNATed. - // It means the packet cannot go thru the firewall, then mark it for DROP - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) - } - // Don't masq for service with externaltrafficpolicy =local - if !proxier.lbLocalSet.isEmpty() { - args = append(args[:0], - "-A", string(KubeLoadBalancerChain), - "-m", "comment", "--comment", `"Kubernetes service load balancer ip + port with externalTrafficPolicy=local"`, - "-m", "set", "--match-set", proxier.lbLocalSet.Name, - "dst,dst", - ) - writeLine(proxier.natRules, append(args, "-j", "RETURN")...) - } - // mark masq for others - args = append(args[:0], - "-A", string(KubeLoadBalancerChain), - "-m", "comment", "--comment", - fmt.Sprintf(`"mark MASQ for external traffic policy not local"`), - ) - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) - } - if !proxier.nodePortSetTCP.isEmpty() { - // Build masquerade rules for packets which cross node visit nodeport. - args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "tcp", "-p", "tcp", - "-m", "comment", "--comment", `"Kubernetes nodeport TCP port for masquerade purpose"`, - "-m", "set", "--match-set", proxier.nodePortSetTCP.Name, - "dst", - ) - writeLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...) 
- // accept for nodeports w/ externaltrafficpolicy=local - if !proxier.nodePortLocalSetTCP.isEmpty() { - args = append(args[:0], - "-A", string(KubeNodePortChain), - "-m", "comment", "--comment", `"Kubernetes nodeport TCP port with externalTrafficPolicy=local"`, - "-m", "set", "--match-set", proxier.nodePortLocalSetTCP.Name, - "dst", - ) - writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) - } - // mark masq for others - args = append(args[:0], - "-A", string(KubeNodePortChain), - "-m", "comment", "--comment", - fmt.Sprintf(`"mark MASQ for externaltrafficpolicy=cluster"`), - ) - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) - } - if !proxier.nodePortSetUDP.isEmpty() { - // accept for nodeports w/ externaltrafficpolicy=local - args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "udp", "-p", "udp", - "-m", "comment", "--comment", `"Kubernetes nodeport UDP port for masquerade purpose"`, - "-m", "set", "--match-set", proxier.nodePortSetUDP.Name, - "dst", - ) - writeLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...) - if !proxier.nodePortLocalSetUDP.isEmpty() { - args = append(args[:0], - "-A", string(KubeNodePortChain), - "-m", "comment", "--comment", `"Kubernetes nodeport UDP port with externalTrafficPolicy=local"`, - "-m", "set", "--match-set", proxier.nodePortLocalSetUDP.Name, - "dst", - ) - writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) - } - // mark masq for others - args = append(args[:0], - "-A", string(KubeNodePortChain), - "-m", "comment", "--comment", - fmt.Sprintf(`"mark MASQ for externaltrafficpolicy=cluster"`), - ) - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) - } - - // Accept all traffic with destination of ipvs virtual service, in case other iptables rules - // block the traffic, that may result in ipvs rules invalid. - // Those rules must be in the end of KUBE-SERVICE chain - proxier.acceptIPVSTraffic() - - // If the masqueradeMark has been added then we want to forward that same - // traffic, this allows NodePort traffic to be forwarded even if the default - // FORWARD policy is not accept. - writeLine(proxier.filterRules, - "-A", string(KubeForwardChain), - "-m", "comment", "--comment", `"kubernetes forwarding rules"`, - "-m", "mark", "--mark", proxier.masqueradeMark, - "-j", "ACCEPT", - ) - - // The following rules can only be set if clusterCIDR has been defined. - if len(proxier.clusterCIDR) != 0 { - // The following two rules ensure the traffic after the initial packet - // accepted by the "kubernetes forwarding rules" rule above will be - // accepted, to be as specific as possible the traffic must be sourced - // or destined to the clusterCIDR (to/from a pod). - writeLine(proxier.filterRules, - "-A", string(KubeForwardChain), - "-s", proxier.clusterCIDR, - "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`, - "-m", "conntrack", - "--ctstate", "RELATED,ESTABLISHED", - "-j", "ACCEPT", - ) - writeLine(proxier.filterRules, - "-A", string(KubeForwardChain), - "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`, - "-d", proxier.clusterCIDR, - "-m", "conntrack", - "--ctstate", "RELATED,ESTABLISHED", - "-j", "ACCEPT", - ) - } - - // Write the end-of-table markers. - writeLine(proxier.filterRules, "COMMIT") - writeLine(proxier.natRules, "COMMIT") + proxier.writeIptablesRules() // Sync iptables rules. // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table. 
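The `natChains`/`natRules` buffers that the new `writeIptablesRules` (next hunk) fills are ultimately handed to iptables-restore with NoFlushTables, as the note above says. A standalone sketch of the payload shape — chain declarations first, rules after, one COMMIT per table. `writeLine` here only mirrors the proxier's helper, and `KUBE-CLUSTER-IP` is assumed to be the literal value of `kubeClusterIPSet`:

```go
package main

import (
	"bytes"
	"fmt"
	"strings"
)

// writeLine mirrors the proxier's helper: join words, terminate with \n.
func writeLine(buf *bytes.Buffer, words ...string) {
	buf.WriteString(strings.Join(words, " ") + "\n")
}

func main() {
	natChains := bytes.NewBuffer(nil)
	natRules := bytes.NewBuffer(nil)

	writeLine(natChains, "*nat")
	// Chain declarations (`:<chain> - [0:0]`) come first, rules after.
	writeLine(natChains, ":KUBE-SERVICES - [0:0]")
	writeLine(natRules,
		"-A", "KUBE-SERVICES",
		"-m", "set", "--match-set", "KUBE-CLUSTER-IP", "dst,dst",
		"-j", "ACCEPT")
	writeLine(natRules, "COMMIT")

	// The proxier concatenates the chain and rule buffers into one
	// iptables-restore payload; this prints the same shape.
	fmt.Print(natChains.String() + natRules.String())
}
```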
@@ -1474,12 +1162,172 @@ func (proxier *Proxier) syncProxyRules() {
 	proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
 }
 
+// writeIptablesRules writes the iptables rules the ipvs proxier needs into proxier.natRules and proxier.filterRules,
+// based on proxier.ipsetList and the ipset match relationships that `ipsetWithIptablesChain` specifies.
+// Some ipsets (kubeClusterIPSet, for example) have special match rules, so their jump rules are synced separately below.
+func (proxier *Proxier) writeIptablesRules() {
+	// We are creating those slices once here to avoid memory reallocations
+	// in every loop. Note that to reuse the memory, instead of doing:
+	//   slice = <some new slice>
+	// you should always do one of the below:
+	//   slice = slice[:0] // and then append to it
+	//   slice = append(slice[:0], ...)
+	// To avoid growing this slice, we arbitrarily set its size to 64,
+	// there is never more than that many arguments for a single line.
+	// Note that even if we go over 64, it will still be correct - it
+	// is just for efficiency, not correctness.
+	args := make([]string, 64)
+
+	for _, set := range ipsetWithIptablesChain {
+		if _, find := proxier.ipsetList[set.name]; find && !proxier.ipsetList[set.name].isEmpty() {
+			args = append(args[:0],
+				"-A", set.from,
+				"-m", "comment", "--comment", proxier.ipsetList[set.name].getComment(),
+				"-m", "set", "--match-set", set.name,
+				set.matchType,
+			)
+			writeLine(proxier.natRules, append(args, "-j", set.to)...)
+		}
+	}
+
+	if !proxier.ipsetList[kubeClusterIPSet].isEmpty() {
+		args = append(args[:0],
+			"-A", string(kubeServicesChain),
+			"-m", "comment", "--comment", proxier.ipsetList[kubeClusterIPSet].getComment(),
+			"-m", "set", "--match-set", kubeClusterIPSet,
+			"dst,dst",
+		)
+		if proxier.masqueradeAll {
+			writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
+		} else if len(proxier.clusterCIDR) > 0 {
+			// This masquerades off-cluster traffic to a service VIP. The idea
+			// is that you can establish a static route for your Service range,
+			// routing to any node, and that node will bridge into the Service
+			// for you. Since that might bounce off-node, we masquerade here.
+			// If/when we support "Local" policy for VIPs, we should update this.
+			writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
+		}
+	}
+
+	if !proxier.ipsetList[kubeExternalIPSet].isEmpty() {
+		// Build masquerade rules for packets to external IPs.
+		args = append(args[:0],
+			"-A", string(kubeServicesChain),
+			"-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPSet].getComment(),
+			"-m", "set", "--match-set", kubeExternalIPSet,
+			"dst,dst",
+		)
+		writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
+		// Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
+		// nor from a local process to be forwarded to the service.
+		// This rule roughly translates to "all traffic from off-machine".
+		// This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later.
+		externalTrafficOnlyArgs := append(args,
+			"-m", "physdev", "!", "--physdev-is-in",
+			"-m", "addrtype", "!", "--src-type", "LOCAL")
+		writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", "ACCEPT")...)
+		dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
+		// Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
+		// This covers cases like GCE load-balancers which get added to the local routing table.
+		writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...)
+	}
+
+	if !proxier.ipsetList[kubeNodePortSetUDP].isEmpty() {
+		// Jump UDP nodeport traffic to KUBE-NODE-PORT; nodeports with externalTrafficPolicy=local are accepted below.
+		args = append(args[:0],
+			"-A", string(kubeServicesChain),
+			"-m", "udp", "-p", "udp",
+			"-m", "comment", "--comment", proxier.ipsetList[kubeNodePortSetUDP].getComment(),
+			"-m", "set", "--match-set", kubeNodePortSetUDP,
+			"dst",
+		)
+		writeLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...)
+		if !proxier.ipsetList[kubeNodePortLocalSetUDP].isEmpty() {
+			args = append(args[:0],
+				"-A", string(KubeNodePortChain),
+				"-m", "comment", "--comment", proxier.ipsetList[kubeNodePortLocalSetUDP].getComment(),
+				"-m", "set", "--match-set", kubeNodePortLocalSetUDP,
+				"dst",
+			)
+			writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
+		}
+		// mark masq for others
+		args = append(args[:0],
+			"-A", string(KubeNodePortChain),
+			"-m", "comment", "--comment",
+			fmt.Sprintf(`"mark MASQ for externaltrafficpolicy=cluster"`),
+		)
+		writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
+	}
+
+	// mark masq for KUBE-NODE-PORT
+	writeLine(proxier.natRules, []string{
+		"-A", string(KubeNodePortChain),
+		"-j", string(KubeMarkMasqChain),
+	}...)
+
+	// mark masq for KUBE-LOAD-BALANCER
+	writeLine(proxier.natRules, []string{
+		"-A", string(KubeLoadBalancerChain),
+		"-j", string(KubeMarkMasqChain),
+	}...)
+
+	// mark drop for KUBE-FIRE-WALL
+	writeLine(proxier.natRules, []string{
+		"-A", string(KubeFireWallChain),
+		"-j", string(KubeMarkDropChain),
+	}...)
+
+	// Accept all traffic destined for an ipvs virtual service, in case other iptables
+	// rules would otherwise block it and render the ipvs rules ineffective.
+	// These rules must be at the end of the KUBE-SERVICES chain.
+	proxier.acceptIPVSTraffic()
+
+	// If the masqueradeMark has been added then we want to forward that same
+	// traffic, this allows NodePort traffic to be forwarded even if the default
+	// FORWARD policy is not accept.
+	writeLine(proxier.filterRules,
+		"-A", string(KubeForwardChain),
+		"-m", "comment", "--comment", `"kubernetes forwarding rules"`,
+		"-m", "mark", "--mark", proxier.masqueradeMark,
+		"-j", "ACCEPT",
+	)
+
+	// The following rules can only be set if clusterCIDR has been defined.
+	if len(proxier.clusterCIDR) != 0 {
+		// The following two rules ensure the traffic after the initial packet
+		// accepted by the "kubernetes forwarding rules" rule above will be
+		// accepted, to be as specific as possible the traffic must be sourced
+		// or destined to the clusterCIDR (to/from a pod).
+		writeLine(proxier.filterRules,
+			"-A", string(KubeForwardChain),
+			"-s", proxier.clusterCIDR,
+			"-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`,
+			"-m", "conntrack",
+			"--ctstate", "RELATED,ESTABLISHED",
+			"-j", "ACCEPT",
+		)
+		writeLine(proxier.filterRules,
+			"-A", string(KubeForwardChain),
+			"-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`,
+			"-d", proxier.clusterCIDR,
+			"-m", "conntrack",
+			"--ctstate", "RELATED,ESTABLISHED",
+			"-j", "ACCEPT",
+		)
+	}
+
+	// Write the end-of-table markers.
+	writeLine(proxier.filterRules, "COMMIT")
+	writeLine(proxier.natRules, "COMMIT")
+}
+
 func (proxier *Proxier) acceptIPVSTraffic() {
-	sets := []*IPSet{proxier.clusterIPSet, proxier.externalIPSet, proxier.lbSet}
+	sets := []string{kubeClusterIPSet, kubeLoadBalancerSet}
 	for _, set := range sets {
 		var matchType string
-		if !set.isEmpty() {
-			switch set.SetType {
+		if !proxier.ipsetList[set].isEmpty() {
+			switch proxier.ipsetList[set].SetType {
 			case utilipset.BitmapPort:
 				matchType = "dst"
 			default:
@@ -1487,13 +1335,78 @@ func (proxier *Proxier) acceptIPVSTraffic() {
 			}
 			writeLine(proxier.natRules, []string{
 				"-A", string(kubeServicesChain),
-				"-m", "set", "--match-set", set.Name, matchType,
+				"-m", "set", "--match-set", set, matchType,
 				"-j", "ACCEPT",
 			}...)
 		}
 	}
 }
 
+// createAndLinkeKubeChain creates all the kube chains the ipvs proxier needs and writes their basic jump links.
+func (proxier *Proxier) createAndLinkeKubeChain() {
+	existingFilterChains := proxier.getExistingChains(utiliptables.TableFilter)
+	existingNATChains := proxier.getExistingChains(utiliptables.TableNAT)
+
+	// Make sure we keep stats for the top-level chains
+	for _, ch := range iptablesChains {
+		if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
+			glog.Errorf("Failed to ensure that %s chain %s exists: %v", ch.table, ch.chain, err)
+			return
+		}
+		if ch.table == utiliptables.TableNAT {
+			if chain, ok := existingNATChains[ch.chain]; ok {
+				writeLine(proxier.natChains, chain)
+			} else {
+				writeLine(proxier.natChains, utiliptables.MakeChainLine(ch.chain))
+			}
+		} else {
+			if chain, ok := existingFilterChains[ch.chain]; ok {
+				writeLine(proxier.filterChains, chain)
+			} else {
+				writeLine(proxier.filterChains, utiliptables.MakeChainLine(ch.chain))
+			}
+		}
+	}
+
+	for _, jc := range iptablesJumpChain {
+		args := []string{"-m", "comment", "--comment", jc.comment, "-j", string(jc.to)}
+		if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jc.table, jc.from, args...); err != nil {
+			glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", jc.table, jc.from, jc.to, err)
+		}
+	}
+
+	// Install the kubernetes-specific postrouting rules. We use a whole chain for
+	// this so that it is easier to flush and change, for example if the mark
+	// value should ever change.
+	writeLine(proxier.natRules, []string{
+		"-A", string(kubePostroutingChain),
+		"-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
+		"-m", "mark", "--mark", proxier.masqueradeMark,
+		"-j", "MASQUERADE",
+	}...)
+
+	// Install the kubernetes-specific masquerade mark rule. We use a whole chain for
+	// this so that it is easier to flush and change, for example if the mark
+	// value should ever change.
+	writeLine(proxier.natRules, []string{
+		"-A", string(KubeMarkMasqChain),
+		"-j", "MARK", "--set-xmark", proxier.masqueradeMark,
+	}...)
+}
+
+// getExistingChains gets the iptables-save output so we can check for existing chains and rules.
+// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
+func (proxier *Proxier) getExistingChains(table utiliptables.Table) map[utiliptables.Chain]string {
+	proxier.iptablesData.Reset()
+	err := proxier.iptables.SaveInto(table, proxier.iptablesData)
+	if err != nil { // if we failed to get any rules
+		glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
+	} else { // otherwise parse the output
+		return utiliptables.GetChainLines(table, proxier.iptablesData.Bytes())
+	}
+	return nil
+}
+
 // After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
 // risk sending more traffic to it, all of which will be lost (because UDP).
 // This assumes the proxier mutex is held
@@ -1653,73 +1566,6 @@ func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, curre
 	}
 }
 
-// linkKubeServiceChain will Create chain KUBE-SERVICES and link the chin in PREROUTING and OUTPUT
-
-// Chain PREROUTING (policy ACCEPT)
-// target     prot opt source       destination
-// KUBE-SERVICES  all  --  0.0.0.0/0    0.0.0.0/0
-
-// Chain OUTPUT (policy ACCEPT)
-// target     prot opt source       destination
-// KUBE-SERVICES  all  --  0.0.0.0/0    0.0.0.0/0
-
-// Chain KUBE-SERVICES (2 references)
-func (proxier *Proxier) linkKubeServiceChain(existingNATChains map[utiliptables.Chain]string, natChains *bytes.Buffer) error {
-	if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, kubeServicesChain); err != nil {
-		return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubeServicesChain, err)
-	}
-	comment := "kubernetes service portals"
-	args := []string{"-m", "comment", "--comment", comment, "-j", string(kubeServicesChain)}
-	for _, tc := range tableChainsWithJumpService {
-		if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil {
-			return fmt.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeServicesChain, err)
-		}
-	}
-
-	// equal to `iptables -t nat -N KUBE-SERVICES`
-	// write `:KUBE-SERVICES - [0:0]` in nat table
-	if chain, ok := existingNATChains[kubeServicesChain]; ok {
-		writeLine(natChains, chain)
-	} else {
-		writeLine(natChains, utiliptables.MakeChainLine(kubeServicesChain))
-	}
-	return nil
-}
-
-// `iptables -t nat -N <chainName>`
-func (proxier *Proxier) createKubeChain(existingNATChains map[utiliptables.Chain]string, chainName utiliptables.Chain) error {
-	if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, chainName); err != nil {
-		return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, chainName, err)
-	}
-
-	// write `:<chainName> - [0:0]` in nat table
-	if chain, ok := existingNATChains[chainName]; ok {
-		writeLine(proxier.natChains, chain)
-	} else {
-		writeLine(proxier.natChains, utiliptables.MakeChainLine(chainName))
-	}
-	return nil
-}
-
-func (proxier *Proxier) linkKubeForwardChain(existingFilterChains map[utiliptables.Chain]string, filterChains *bytes.Buffer) error {
-	if _, err := proxier.iptables.EnsureChain(utiliptables.TableFilter, KubeForwardChain); err != nil {
-		return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableFilter, KubeForwardChain, err)
-	}
-
-	comment := "kubernetes forward rules"
-	args := []string{"-m", "comment", "--comment", comment, "-j", string(KubeForwardChain)}
-	if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableFilter, utiliptables.ChainForward, args...); err != nil {
-		
return fmt.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", utiliptables.TableFilter, utiliptables.ChainForward, KubeForwardChain, err) - } - - if chain, ok := existingFilterChains[KubeForwardChain]; ok { - writeLine(filterChains, chain) - } else { - writeLine(filterChains, utiliptables.MakeChainLine(KubeForwardChain)) - } - return nil -} - // Join all words with spaces, terminate with newline and write to buff. func writeLine(buf *bytes.Buffer, words ...string) { // We avoid strings.Join for performance reasons. diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 611d30ae008..4a0543fa294 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -119,43 +119,37 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u }, LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } + // initialize ipsetList with all sets we needed + ipsetList := make(map[string]*IPSet) + for _, is := range ipsetInfo { + ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, false, is.comment) + } return &Proxier{ - exec: fexec, - serviceMap: make(proxy.ServiceMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil), - endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil), - excludeCIDRs: make([]string, 0), - iptables: ipt, - ipvs: ipvs, - ipset: ipset, - clusterCIDR: "10.0.0.0/24", - hostname: testHostname, - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), - portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, - healthChecker: newFakeHealthChecker(), - ipvsScheduler: DefaultScheduler, - ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, - iptablesData: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), - filterChains: bytes.NewBuffer(nil), - filterRules: bytes.NewBuffer(nil), - netlinkHandle: netlinktest.NewFakeNetlinkHandle(), - loopbackSet: NewIPSet(ipset, kubeLoopBackIPSet, utilipset.HashIPPortIP, false, kubeLoopBackIPSetComment), - clusterIPSet: NewIPSet(ipset, kubeClusterIPSet, utilipset.HashIPPort, false, kubeClusterIPSetComment), - externalIPSet: NewIPSet(ipset, kubeExternalIPSet, utilipset.HashIPPort, false, kubeExternalIPSetComment), - lbSet: NewIPSet(ipset, kubeLoadBalancerSet, utilipset.HashIPPort, false, kubeLoadBalancerSetComment), - lbFWSet: NewIPSet(ipset, kubeLoadbalancerFWSet, utilipset.HashIPPort, false, kubeLoadbalancerFWSetComment), - lbLocalSet: NewIPSet(ipset, kubeLoadBalancerLocalSet, utilipset.HashIPPort, false, kubeLoadBalancerLocalSetComment), - lbWhiteListIPSet: NewIPSet(ipset, kubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, false, kubeLoadBalancerSourceIPSetComment), - lbWhiteListCIDRSet: NewIPSet(ipset, kubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, false, kubeLoadBalancerSourceCIDRSetComment), - nodePortSetTCP: NewIPSet(ipset, kubeNodePortSetTCP, utilipset.BitmapPort, false, kubeNodePortSetTCPComment), - nodePortLocalSetTCP: NewIPSet(ipset, kubeNodePortLocalSetTCP, utilipset.BitmapPort, false, kubeNodePortSetTCPComment), - nodePortLocalSetUDP: NewIPSet(ipset, kubeNodePortLocalSetUDP, utilipset.BitmapPort, false, kubeNodePortLocalSetUDPComment), - nodePortSetUDP: NewIPSet(ipset, kubeNodePortSetUDP, utilipset.BitmapPort, false, kubeNodePortSetUDPComment), - nodePortAddresses: make([]string, 0), - networkInterfacer: proxyutiltest.NewFakeNetwork(), + exec: fexec, + serviceMap: make(proxy.ServiceMap), + serviceChanges: 
proxy.NewServiceChangeTracker(newServiceInfo, nil, nil), + endpointsMap: make(proxy.EndpointsMap), + endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil), + excludeCIDRs: make([]string, 0), + iptables: ipt, + ipvs: ipvs, + ipset: ipset, + clusterCIDR: "10.0.0.0/24", + hostname: testHostname, + portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}}, + healthChecker: newFakeHealthChecker(), + ipvsScheduler: DefaultScheduler, + ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, + iptablesData: bytes.NewBuffer(nil), + natChains: bytes.NewBuffer(nil), + natRules: bytes.NewBuffer(nil), + filterChains: bytes.NewBuffer(nil), + filterRules: bytes.NewBuffer(nil), + netlinkHandle: netlinktest.NewFakeNetlinkHandle(), + ipsetList: ipsetList, + nodePortAddresses: make([]string, 0), + networkInterfacer: proxyutiltest.NewFakeNetwork(), } } @@ -887,7 +881,7 @@ func TestOnlyLocalNodePorts(t *testing.T) { JumpChain: string(KubeNodePortChain), MatchSet: kubeNodePortSetTCP, }}, string(KubeNodePortChain): {{ - JumpChain: "ACCEPT", MatchSet: kubeNodePortLocalSetTCP, + JumpChain: "RETURN", MatchSet: kubeNodePortLocalSetTCP, }, { JumpChain: string(KubeMarkMasqChain), MatchSet: "", }},
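One subtlety in the `ipsetList` construction shared by `NewProxier` and `NewFakeProxier`: per `ipsetInfo`, the hash:* sets follow the proxier's IP family, while the bitmap:port nodeport sets carry no addresses and are always created with `isIPv6=false`. A tiny standalone sketch of that decision — `setInfo` and `buildList` are illustrative stand-ins, with booleans in place of `*IPSet`:

```go
package main

import "fmt"

// setInfo mirrors one ipsetInfo row: bitmap:port sets can never be IPv6.
type setInfo struct {
	name      string
	canBeIPv6 bool
}

// buildList mirrors the table-driven construction used by NewProxier and
// NewFakeProxier: one loop instead of twelve hand-written fields. Only the
// IPv6 decision is modeled here.
func buildList(infos []setInfo, isIPv6 bool) map[string]bool {
	list := make(map[string]bool, len(infos))
	for _, is := range infos {
		if is.canBeIPv6 {
			list[is.name] = isIPv6 // hash:* sets follow the proxier's family
		} else {
			list[is.name] = false // bitmap:port sets hold bare ports only
		}
	}
	return list
}

func main() {
	infos := []setInfo{{"KUBE-CLUSTER-IP", true}, {"KUBE-NODE-PORT-TCP", false}}
	// Prints: map[KUBE-CLUSTER-IP:true KUBE-NODE-PORT-TCP:false]
	fmt.Println(buildList(infos, true))
}
```

The two branches must stay exclusive — an unconditional second assignment would silently force every set back to IPv4, which is exactly what the if/else in `NewProxier` guards against.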