From 056ae4421cacb29c3da7bed4a3aa960a9faf1c1b Mon Sep 17 00:00:00 2001 From: Rohit Ramkumar Date: Tue, 3 Apr 2018 10:32:56 -0700 Subject: [PATCH] Add --ipvs-exclude-cidrs flag to kube-proxy. --- cmd/kube-proxy/app/server.go | 1 + cmd/kube-proxy/app/server_others.go | 1 + cmd/kube-proxy/app/server_test.go | 4 + pkg/proxy/apis/kubeproxyconfig/types.go | 3 + .../apis/kubeproxyconfig/v1alpha1/types.go | 3 + .../v1alpha1/zz_generated.conversion.go | 2 + .../v1alpha1/zz_generated.deepcopy.go | 7 +- .../kubeproxyconfig/validation/validation.go | 12 +++ .../validation/validation_test.go | 72 ++++++++++++++++ .../kubeproxyconfig/zz_generated.deepcopy.go | 7 +- pkg/proxy/ipvs/proxier.go | 35 +++++--- pkg/proxy/ipvs/proxier_test.go | 82 +++++++++++++++++++ 12 files changed, 217 insertions(+), 12 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 93bafe23666..60468d6990c 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -149,6 +149,7 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&o.config.IPTables.MinSyncPeriod.Duration, "iptables-min-sync-period", o.config.IPTables.MinSyncPeriod.Duration, "The minimum interval of how often the iptables rules can be refreshed as endpoints and services change (e.g. '5s', '1m', '2h22m').") fs.DurationVar(&o.config.IPVS.SyncPeriod.Duration, "ipvs-sync-period", o.config.IPVS.SyncPeriod.Duration, "The maximum interval of how often ipvs rules are refreshed (e.g. '5s', '1m', '2h22m'). Must be greater than 0.") fs.DurationVar(&o.config.IPVS.MinSyncPeriod.Duration, "ipvs-min-sync-period", o.config.IPVS.MinSyncPeriod.Duration, "The minimum interval of how often the ipvs rules can be refreshed as endpoints and services change (e.g. '5s', '1m', '2h22m').") + fs.StringSliceVar(&o.config.IPVS.ExcludeCIDRs, "ipvs-exclude-cidrs", o.config.IPVS.ExcludeCIDRs, "A comma-separated list of CIDR's which the ipvs proxier should not touch when cleaning up IPVS rules.") fs.DurationVar(&o.config.ConfigSyncPeriod.Duration, "config-sync-period", o.config.ConfigSyncPeriod.Duration, "How often configuration from the apiserver is refreshed. Must be greater than 0.") fs.BoolVar(&o.config.IPTables.MasqueradeAll, "masquerade-all", o.config.IPTables.MasqueradeAll, "If using the pure iptables proxy, SNAT all traffic sent via Service cluster IPs (this not commonly needed)") fs.StringVar(&o.config.ClusterCIDR, "cluster-cidr", o.config.ClusterCIDR, "The CIDR range of pods in the cluster. When configured, traffic sent to a Service cluster IP from outside this range will be masqueraded and traffic sent from pods to an external LoadBalancer IP will be directed to the respective cluster IP instead") diff --git a/cmd/kube-proxy/app/server_others.go b/cmd/kube-proxy/app/server_others.go index 46b1acbe286..1ab70b1ab52 100644 --- a/cmd/kube-proxy/app/server_others.go +++ b/cmd/kube-proxy/app/server_others.go @@ -189,6 +189,7 @@ func newProxyServer( execer, config.IPVS.SyncPeriod.Duration, config.IPVS.MinSyncPeriod.Duration, + config.IPVS.ExcludeCIDRs, config.IPTables.MasqueradeAll, int(*config.IPTables.MasqueradeBit), config.ClusterCIDR, diff --git a/cmd/kube-proxy/app/server_test.go b/cmd/kube-proxy/app/server_test.go index 8a451e988be..86dd84ba94d 100644 --- a/cmd/kube-proxy/app/server_test.go +++ b/cmd/kube-proxy/app/server_test.go @@ -201,6 +201,9 @@ iptables: ipvs: minSyncPeriod: 10s syncPeriod: 60s + excludeCIDRs: + - "10.20.30.40/16" + - "fd00:1::0/64" kind: KubeProxyConfiguration metricsBindAddress: "%s" mode: "%s" @@ -316,6 +319,7 @@ nodePortAddresses: IPVS: kubeproxyconfig.KubeProxyIPVSConfiguration{ MinSyncPeriod: metav1.Duration{Duration: 10 * time.Second}, SyncPeriod: metav1.Duration{Duration: 60 * time.Second}, + ExcludeCIDRs: []string{"10.20.30.40/16", "fd00:1::0/64"}, }, MetricsBindAddress: tc.metricsBindAddress, Mode: kubeproxyconfig.ProxyMode(tc.mode), diff --git a/pkg/proxy/apis/kubeproxyconfig/types.go b/pkg/proxy/apis/kubeproxyconfig/types.go index b9e662826ae..bb44675267d 100644 --- a/pkg/proxy/apis/kubeproxyconfig/types.go +++ b/pkg/proxy/apis/kubeproxyconfig/types.go @@ -67,6 +67,9 @@ type KubeProxyIPVSConfiguration struct { MinSyncPeriod metav1.Duration // ipvs scheduler Scheduler string + // excludeCIDRs is a list of CIDR's which the ipvs proxier should not touch + // when cleaning up ipvs services. + ExcludeCIDRs []string } // KubeProxyConntrackConfiguration contains conntrack settings for diff --git a/pkg/proxy/apis/kubeproxyconfig/v1alpha1/types.go b/pkg/proxy/apis/kubeproxyconfig/v1alpha1/types.go index b0e44c1fe12..c9936e701f7 100644 --- a/pkg/proxy/apis/kubeproxyconfig/v1alpha1/types.go +++ b/pkg/proxy/apis/kubeproxyconfig/v1alpha1/types.go @@ -63,6 +63,9 @@ type KubeProxyIPVSConfiguration struct { MinSyncPeriod metav1.Duration `json:"minSyncPeriod"` // ipvs scheduler Scheduler string `json:"scheduler"` + // excludeCIDRs is a list of CIDR's which the ipvs proxier should not touch + // when cleaning up ipvs services. + ExcludeCIDRs []string } // KubeProxyConntrackConfiguration contains conntrack settings for diff --git a/pkg/proxy/apis/kubeproxyconfig/v1alpha1/zz_generated.conversion.go b/pkg/proxy/apis/kubeproxyconfig/v1alpha1/zz_generated.conversion.go index 8c8b93549e6..72549ce4239 100644 --- a/pkg/proxy/apis/kubeproxyconfig/v1alpha1/zz_generated.conversion.go +++ b/pkg/proxy/apis/kubeproxyconfig/v1alpha1/zz_generated.conversion.go @@ -206,6 +206,7 @@ func autoConvert_v1alpha1_KubeProxyIPVSConfiguration_To_kubeproxyconfig_KubeProx out.SyncPeriod = in.SyncPeriod out.MinSyncPeriod = in.MinSyncPeriod out.Scheduler = in.Scheduler + out.ExcludeCIDRs = *(*[]string)(unsafe.Pointer(&in.ExcludeCIDRs)) return nil } @@ -218,6 +219,7 @@ func autoConvert_kubeproxyconfig_KubeProxyIPVSConfiguration_To_v1alpha1_KubeProx out.SyncPeriod = in.SyncPeriod out.MinSyncPeriod = in.MinSyncPeriod out.Scheduler = in.Scheduler + out.ExcludeCIDRs = *(*[]string)(unsafe.Pointer(&in.ExcludeCIDRs)) return nil } diff --git a/pkg/proxy/apis/kubeproxyconfig/v1alpha1/zz_generated.deepcopy.go b/pkg/proxy/apis/kubeproxyconfig/v1alpha1/zz_generated.deepcopy.go index 9382b6d2bd4..e6839b8673c 100644 --- a/pkg/proxy/apis/kubeproxyconfig/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/proxy/apis/kubeproxyconfig/v1alpha1/zz_generated.deepcopy.go @@ -54,7 +54,7 @@ func (in *KubeProxyConfiguration) DeepCopyInto(out *KubeProxyConfiguration) { } out.ClientConnection = in.ClientConnection in.IPTables.DeepCopyInto(&out.IPTables) - out.IPVS = in.IPVS + in.IPVS.DeepCopyInto(&out.IPVS) if in.OOMScoreAdj != nil { in, out := &in.OOMScoreAdj, &out.OOMScoreAdj if *in == nil { @@ -186,6 +186,11 @@ func (in *KubeProxyIPVSConfiguration) DeepCopyInto(out *KubeProxyIPVSConfigurati *out = *in out.SyncPeriod = in.SyncPeriod out.MinSyncPeriod = in.MinSyncPeriod + if in.ExcludeCIDRs != nil { + in, out := &in.ExcludeCIDRs, &out.ExcludeCIDRs + *out = make([]string, len(*in)) + copy(*out, *in) + } return } diff --git a/pkg/proxy/apis/kubeproxyconfig/validation/validation.go b/pkg/proxy/apis/kubeproxyconfig/validation/validation.go index d28453e2bb1..bd193c060a8 100644 --- a/pkg/proxy/apis/kubeproxyconfig/validation/validation.go +++ b/pkg/proxy/apis/kubeproxyconfig/validation/validation.go @@ -116,6 +116,7 @@ func validateKubeProxyIPVSConfiguration(config kubeproxyconfig.KubeProxyIPVSConf } allErrs = append(allErrs, validateIPVSSchedulerMethod(kubeproxyconfig.IPVSSchedulerMethod(config.Scheduler), fldPath.Child("Scheduler"))...) + allErrs = append(allErrs, validateIPVSExcludeCIDRs(config.ExcludeCIDRs, fldPath.Child("ExcludeCidrs"))...) return allErrs } @@ -253,3 +254,14 @@ func validateKubeProxyNodePortAddress(nodePortAddresses []string, fldPath *field return allErrs } + +func validateIPVSExcludeCIDRs(excludeCIDRs []string, fldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + + for i := range excludeCIDRs { + if _, _, err := net.ParseCIDR(excludeCIDRs[i]); err != nil { + allErrs = append(allErrs, field.Invalid(fldPath, excludeCIDRs, "must be a valid IP block")) + } + } + return allErrs +} diff --git a/pkg/proxy/apis/kubeproxyconfig/validation/validation_test.go b/pkg/proxy/apis/kubeproxyconfig/validation/validation_test.go index aeb53a3e5a2..fbdd8c4078a 100644 --- a/pkg/proxy/apis/kubeproxyconfig/validation/validation_test.go +++ b/pkg/proxy/apis/kubeproxyconfig/validation/validation_test.go @@ -749,3 +749,75 @@ func TestValidateKubeProxyNodePortAddress(t *testing.T) { } } } + +func TestValidateKubeProxyExcludeCIDRs(t *testing.T) { + // TODO(rramkumar): This test is a copy of TestValidateKubeProxyNodePortAddress. + // Maybe some code can be shared? + newPath := field.NewPath("KubeProxyConfiguration") + + successCases := []struct { + addresses []string + }{ + {[]string{}}, + {[]string{"127.0.0.0/8"}}, + {[]string{"0.0.0.0/0"}}, + {[]string{"::/0"}}, + {[]string{"127.0.0.1/32", "1.2.3.0/24"}}, + {[]string{"127.0.0.0/8"}}, + {[]string{"127.0.0.1/32"}}, + {[]string{"::1/128"}}, + {[]string{"1.2.3.4/32"}}, + {[]string{"10.20.30.0/24"}}, + {[]string{"10.20.0.0/16", "100.200.0.0/16"}}, + {[]string{"10.0.0.0/8"}}, + {[]string{"2001:db8::/32"}}, + } + + for _, successCase := range successCases { + if errs := validateIPVSExcludeCIDRs(successCase.addresses, newPath.Child("ExcludeCIDRs")); len(errs) != 0 { + t.Errorf("expected success: %v", errs) + } + } + + errorCases := []struct { + addresses []string + msg string + }{ + { + addresses: []string{"foo"}, + msg: "must be a valid IP block", + }, + { + addresses: []string{"1.2.3"}, + msg: "must be a valid IP block", + }, + { + addresses: []string{""}, + msg: "must be a valid IP block", + }, + { + addresses: []string{"10.20.30.40"}, + msg: "must be a valid IP block", + }, + { + addresses: []string{"::1"}, + msg: "must be a valid IP block", + }, + { + addresses: []string{"2001:db8:1"}, + msg: "must be a valid IP block", + }, + { + addresses: []string{"2001:db8:xyz/64"}, + msg: "must be a valid IP block", + }, + } + + for _, errorCase := range errorCases { + if errs := validateIPVSExcludeCIDRs(errorCase.addresses, newPath.Child("ExcludeCIDRs")); len(errs) == 0 { + t.Errorf("expected failure for %s", errorCase.msg) + } else if !strings.Contains(errs[0].Error(), errorCase.msg) { + t.Errorf("unexpected error: %v, expected: %s", errs[0], errorCase.msg) + } + } +} diff --git a/pkg/proxy/apis/kubeproxyconfig/zz_generated.deepcopy.go b/pkg/proxy/apis/kubeproxyconfig/zz_generated.deepcopy.go index f8d5636ddd4..db491d40c00 100644 --- a/pkg/proxy/apis/kubeproxyconfig/zz_generated.deepcopy.go +++ b/pkg/proxy/apis/kubeproxyconfig/zz_generated.deepcopy.go @@ -76,7 +76,7 @@ func (in *KubeProxyConfiguration) DeepCopyInto(out *KubeProxyConfiguration) { } out.ClientConnection = in.ClientConnection in.IPTables.DeepCopyInto(&out.IPTables) - out.IPVS = in.IPVS + in.IPVS.DeepCopyInto(&out.IPVS) if in.OOMScoreAdj != nil { in, out := &in.OOMScoreAdj, &out.OOMScoreAdj if *in == nil { @@ -208,6 +208,11 @@ func (in *KubeProxyIPVSConfiguration) DeepCopyInto(out *KubeProxyIPVSConfigurati *out = *in out.SyncPeriod = in.SyncPeriod out.MinSyncPeriod = in.MinSyncPeriod + if in.ExcludeCIDRs != nil { + in, out := &in.ExcludeCIDRs, &out.ExcludeCIDRs + *out = make([]string, len(*in)) + copy(*out, *in) + } return } diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 43cf636313c..4ed3c29ee95 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -127,8 +127,10 @@ type Proxier struct { syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules // These are effectively const and do not need the mutex to be held. - syncPeriod time.Duration - minSyncPeriod time.Duration + syncPeriod time.Duration + minSyncPeriod time.Duration + // Values are CIDR's to exclude when cleaning up IPVS rules. + excludeCIDRs []string iptables utiliptables.Interface ipvs utilipvs.Interface ipset utilipset.Interface @@ -256,6 +258,7 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod time.Duration, minSyncPeriod time.Duration, + excludeCIDRs []string, masqueradeAll bool, masqueradeBit int, clusterCIDR string, @@ -1459,16 +1462,28 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode return nil } -func (proxier *Proxier) cleanLegacyService(atciveServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer) { +func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer) { unbindIPAddr := sets.NewString() - for cS := range currentServices { - if !atciveServices[cS] { - svc := currentServices[cS] - err := proxier.ipvs.DeleteVirtualServer(svc) - if err != nil { - glog.Errorf("Failed to delete service, error: %v", err) + for cs := range currentServices { + svc := currentServices[cs] + if _, ok := activeServices[cs]; !ok { + // This service was not processed in the latest sync loop so before deleting it, + // make sure it does not fall within an excluded CIDR range. + okayToDelete := true + for _, excludedCIDR := range proxier.excludeCIDRs { + // Any validation of this CIDR already should have occurred. + _, n, _ := net.ParseCIDR(excludedCIDR) + if n.Contains(svc.Address) { + okayToDelete = false + break + } + } + if okayToDelete { + if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil { + glog.Errorf("Failed to delete service, error: %v", err) + } + unbindIPAddr.Insert(svc.Address.String()) } - unbindIPAddr.Insert(svc.Address.String()) } } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index f5d062b8d5a..43f337a3484 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -125,6 +125,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil), endpointsMap: make(proxy.EndpointsMap), endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil), + excludeCIDRs: make([]string, 0), iptables: ipt, ipvs: ipvs, ipset: ipset, @@ -2393,6 +2394,87 @@ func Test_syncService(t *testing.T) { } } +func Test_cleanLegacyService(t *testing.T) { + // All ipvs services that were processed in the latest sync loop. + activeServices := map[string]bool{"ipvs0": true, "ipvs1": true} + // All ipvs services in the system. + currentServices := map[string]*utilipvs.VirtualServer{ + // Created by kube-proxy. + "ipvs0": { + Address: net.ParseIP("1.1.1.1"), + Protocol: string(api.ProtocolUDP), + Port: 53, + Scheduler: "rr", + Flags: utilipvs.FlagHashed, + }, + // Created by kube-proxy. + "ipvs1": { + Address: net.ParseIP("2.2.2.2"), + Protocol: string(api.ProtocolUDP), + Port: 54, + Scheduler: "rr", + Flags: utilipvs.FlagHashed, + }, + // Created by an external party. + "ipvs2": { + Address: net.ParseIP("3.3.3.3"), + Protocol: string(api.ProtocolUDP), + Port: 55, + Scheduler: "rr", + Flags: utilipvs.FlagHashed, + }, + // Created by an external party. + "ipvs3": { + Address: net.ParseIP("4.4.4.4"), + Protocol: string(api.ProtocolUDP), + Port: 56, + Scheduler: "rr", + Flags: utilipvs.FlagHashed, + }, + // Created by an external party. + "ipvs4": { + Address: net.ParseIP("5.5.5.5"), + Protocol: string(api.ProtocolUDP), + Port: 57, + Scheduler: "rr", + Flags: utilipvs.FlagHashed, + }, + // Created by kube-proxy, but now stale. + "ipvs5": { + Address: net.ParseIP("6.6.6.6"), + Protocol: string(api.ProtocolUDP), + Port: 58, + Scheduler: "rr", + Flags: utilipvs.FlagHashed, + }, + } + + ipt := iptablestest.NewFake() + ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake(testIPSetVersion) + proxier := NewFakeProxier(ipt, ipvs, ipset, nil) + // These CIDRs cover only ipvs2 and ipvs3. + proxier.excludeCIDRs = []string{"3.3.3.0/24", "4.4.4.0/24"} + for v := range currentServices { + proxier.ipvs.AddVirtualServer(currentServices[v]) + } + proxier.cleanLegacyService(activeServices, currentServices) + // ipvs4 and ipvs5 should have been cleaned. + remainingVirtualServers, _ := proxier.ipvs.GetVirtualServers() + if len(remainingVirtualServers) != 4 { + t.Errorf("Expected number of remaining IPVS services after cleanup to be %v. Got %v", 4, len(remainingVirtualServers)) + } + for _, vs := range remainingVirtualServers { + // Checking that ipvs4 and ipvs5 were removed. + if vs.Port == 57 { + t.Errorf("Expected ipvs4 to be removed after cleanup. It still remains") + } + if vs.Port == 58 { + t.Errorf("Expected ipvs5 to be removed after cleanup. It still remains") + } + } +} + func buildFakeProxier(nodeIP []net.IP) (*iptablestest.FakeIPTables, *Proxier) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake()