Add --ipvs-exclude-cidrs flag to kube-proxy.

This commit is contained in:
Rohit Ramkumar
2018-04-03 10:32:56 -07:00
parent d4ded15f8a
commit 056ae4421c
12 changed files with 217 additions and 12 deletions

View File

@@ -67,6 +67,9 @@ type KubeProxyIPVSConfiguration struct {
MinSyncPeriod metav1.Duration
// ipvs scheduler
Scheduler string
// excludeCIDRs is a list of CIDR's which the ipvs proxier should not touch
// when cleaning up ipvs services.
ExcludeCIDRs []string
}
// KubeProxyConntrackConfiguration contains conntrack settings for

View File

@@ -63,6 +63,9 @@ type KubeProxyIPVSConfiguration struct {
MinSyncPeriod metav1.Duration `json:"minSyncPeriod"`
// ipvs scheduler
Scheduler string `json:"scheduler"`
// excludeCIDRs is a list of CIDR's which the ipvs proxier should not touch
// when cleaning up ipvs services.
ExcludeCIDRs []string
}
// KubeProxyConntrackConfiguration contains conntrack settings for

View File

@@ -206,6 +206,7 @@ func autoConvert_v1alpha1_KubeProxyIPVSConfiguration_To_kubeproxyconfig_KubeProx
out.SyncPeriod = in.SyncPeriod
out.MinSyncPeriod = in.MinSyncPeriod
out.Scheduler = in.Scheduler
out.ExcludeCIDRs = *(*[]string)(unsafe.Pointer(&in.ExcludeCIDRs))
return nil
}
@@ -218,6 +219,7 @@ func autoConvert_kubeproxyconfig_KubeProxyIPVSConfiguration_To_v1alpha1_KubeProx
out.SyncPeriod = in.SyncPeriod
out.MinSyncPeriod = in.MinSyncPeriod
out.Scheduler = in.Scheduler
out.ExcludeCIDRs = *(*[]string)(unsafe.Pointer(&in.ExcludeCIDRs))
return nil
}

View File

@@ -54,7 +54,7 @@ func (in *KubeProxyConfiguration) DeepCopyInto(out *KubeProxyConfiguration) {
}
out.ClientConnection = in.ClientConnection
in.IPTables.DeepCopyInto(&out.IPTables)
out.IPVS = in.IPVS
in.IPVS.DeepCopyInto(&out.IPVS)
if in.OOMScoreAdj != nil {
in, out := &in.OOMScoreAdj, &out.OOMScoreAdj
if *in == nil {
@@ -186,6 +186,11 @@ func (in *KubeProxyIPVSConfiguration) DeepCopyInto(out *KubeProxyIPVSConfigurati
*out = *in
out.SyncPeriod = in.SyncPeriod
out.MinSyncPeriod = in.MinSyncPeriod
if in.ExcludeCIDRs != nil {
in, out := &in.ExcludeCIDRs, &out.ExcludeCIDRs
*out = make([]string, len(*in))
copy(*out, *in)
}
return
}

View File

@@ -116,6 +116,7 @@ func validateKubeProxyIPVSConfiguration(config kubeproxyconfig.KubeProxyIPVSConf
}
allErrs = append(allErrs, validateIPVSSchedulerMethod(kubeproxyconfig.IPVSSchedulerMethod(config.Scheduler), fldPath.Child("Scheduler"))...)
allErrs = append(allErrs, validateIPVSExcludeCIDRs(config.ExcludeCIDRs, fldPath.Child("ExcludeCidrs"))...)
return allErrs
}
@@ -253,3 +254,14 @@ func validateKubeProxyNodePortAddress(nodePortAddresses []string, fldPath *field
return allErrs
}
func validateIPVSExcludeCIDRs(excludeCIDRs []string, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
for i := range excludeCIDRs {
if _, _, err := net.ParseCIDR(excludeCIDRs[i]); err != nil {
allErrs = append(allErrs, field.Invalid(fldPath, excludeCIDRs, "must be a valid IP block"))
}
}
return allErrs
}

View File

@@ -749,3 +749,75 @@ func TestValidateKubeProxyNodePortAddress(t *testing.T) {
}
}
}
func TestValidateKubeProxyExcludeCIDRs(t *testing.T) {
// TODO(rramkumar): This test is a copy of TestValidateKubeProxyNodePortAddress.
// Maybe some code can be shared?
newPath := field.NewPath("KubeProxyConfiguration")
successCases := []struct {
addresses []string
}{
{[]string{}},
{[]string{"127.0.0.0/8"}},
{[]string{"0.0.0.0/0"}},
{[]string{"::/0"}},
{[]string{"127.0.0.1/32", "1.2.3.0/24"}},
{[]string{"127.0.0.0/8"}},
{[]string{"127.0.0.1/32"}},
{[]string{"::1/128"}},
{[]string{"1.2.3.4/32"}},
{[]string{"10.20.30.0/24"}},
{[]string{"10.20.0.0/16", "100.200.0.0/16"}},
{[]string{"10.0.0.0/8"}},
{[]string{"2001:db8::/32"}},
}
for _, successCase := range successCases {
if errs := validateIPVSExcludeCIDRs(successCase.addresses, newPath.Child("ExcludeCIDRs")); len(errs) != 0 {
t.Errorf("expected success: %v", errs)
}
}
errorCases := []struct {
addresses []string
msg string
}{
{
addresses: []string{"foo"},
msg: "must be a valid IP block",
},
{
addresses: []string{"1.2.3"},
msg: "must be a valid IP block",
},
{
addresses: []string{""},
msg: "must be a valid IP block",
},
{
addresses: []string{"10.20.30.40"},
msg: "must be a valid IP block",
},
{
addresses: []string{"::1"},
msg: "must be a valid IP block",
},
{
addresses: []string{"2001:db8:1"},
msg: "must be a valid IP block",
},
{
addresses: []string{"2001:db8:xyz/64"},
msg: "must be a valid IP block",
},
}
for _, errorCase := range errorCases {
if errs := validateIPVSExcludeCIDRs(errorCase.addresses, newPath.Child("ExcludeCIDRs")); len(errs) == 0 {
t.Errorf("expected failure for %s", errorCase.msg)
} else if !strings.Contains(errs[0].Error(), errorCase.msg) {
t.Errorf("unexpected error: %v, expected: %s", errs[0], errorCase.msg)
}
}
}

View File

@@ -76,7 +76,7 @@ func (in *KubeProxyConfiguration) DeepCopyInto(out *KubeProxyConfiguration) {
}
out.ClientConnection = in.ClientConnection
in.IPTables.DeepCopyInto(&out.IPTables)
out.IPVS = in.IPVS
in.IPVS.DeepCopyInto(&out.IPVS)
if in.OOMScoreAdj != nil {
in, out := &in.OOMScoreAdj, &out.OOMScoreAdj
if *in == nil {
@@ -208,6 +208,11 @@ func (in *KubeProxyIPVSConfiguration) DeepCopyInto(out *KubeProxyIPVSConfigurati
*out = *in
out.SyncPeriod = in.SyncPeriod
out.MinSyncPeriod = in.MinSyncPeriod
if in.ExcludeCIDRs != nil {
in, out := &in.ExcludeCIDRs, &out.ExcludeCIDRs
*out = make([]string, len(*in))
copy(*out, *in)
}
return
}

View File

@@ -127,8 +127,10 @@ type Proxier struct {
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
// These are effectively const and do not need the mutex to be held.
syncPeriod time.Duration
minSyncPeriod time.Duration
syncPeriod time.Duration
minSyncPeriod time.Duration
// Values are CIDR's to exclude when cleaning up IPVS rules.
excludeCIDRs []string
iptables utiliptables.Interface
ipvs utilipvs.Interface
ipset utilipset.Interface
@@ -256,6 +258,7 @@ func NewProxier(ipt utiliptables.Interface,
exec utilexec.Interface,
syncPeriod time.Duration,
minSyncPeriod time.Duration,
excludeCIDRs []string,
masqueradeAll bool,
masqueradeBit int,
clusterCIDR string,
@@ -1459,16 +1462,28 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
return nil
}
func (proxier *Proxier) cleanLegacyService(atciveServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer) {
func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer) {
unbindIPAddr := sets.NewString()
for cS := range currentServices {
if !atciveServices[cS] {
svc := currentServices[cS]
err := proxier.ipvs.DeleteVirtualServer(svc)
if err != nil {
glog.Errorf("Failed to delete service, error: %v", err)
for cs := range currentServices {
svc := currentServices[cs]
if _, ok := activeServices[cs]; !ok {
// This service was not processed in the latest sync loop so before deleting it,
// make sure it does not fall within an excluded CIDR range.
okayToDelete := true
for _, excludedCIDR := range proxier.excludeCIDRs {
// Any validation of this CIDR already should have occurred.
_, n, _ := net.ParseCIDR(excludedCIDR)
if n.Contains(svc.Address) {
okayToDelete = false
break
}
}
if okayToDelete {
if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
glog.Errorf("Failed to delete service, error: %v", err)
}
unbindIPAddr.Insert(svc.Address.String())
}
unbindIPAddr.Insert(svc.Address.String())
}
}

View File

@@ -125,6 +125,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil),
excludeCIDRs: make([]string, 0),
iptables: ipt,
ipvs: ipvs,
ipset: ipset,
@@ -2393,6 +2394,87 @@ func Test_syncService(t *testing.T) {
}
}
func Test_cleanLegacyService(t *testing.T) {
// All ipvs services that were processed in the latest sync loop.
activeServices := map[string]bool{"ipvs0": true, "ipvs1": true}
// All ipvs services in the system.
currentServices := map[string]*utilipvs.VirtualServer{
// Created by kube-proxy.
"ipvs0": {
Address: net.ParseIP("1.1.1.1"),
Protocol: string(api.ProtocolUDP),
Port: 53,
Scheduler: "rr",
Flags: utilipvs.FlagHashed,
},
// Created by kube-proxy.
"ipvs1": {
Address: net.ParseIP("2.2.2.2"),
Protocol: string(api.ProtocolUDP),
Port: 54,
Scheduler: "rr",
Flags: utilipvs.FlagHashed,
},
// Created by an external party.
"ipvs2": {
Address: net.ParseIP("3.3.3.3"),
Protocol: string(api.ProtocolUDP),
Port: 55,
Scheduler: "rr",
Flags: utilipvs.FlagHashed,
},
// Created by an external party.
"ipvs3": {
Address: net.ParseIP("4.4.4.4"),
Protocol: string(api.ProtocolUDP),
Port: 56,
Scheduler: "rr",
Flags: utilipvs.FlagHashed,
},
// Created by an external party.
"ipvs4": {
Address: net.ParseIP("5.5.5.5"),
Protocol: string(api.ProtocolUDP),
Port: 57,
Scheduler: "rr",
Flags: utilipvs.FlagHashed,
},
// Created by kube-proxy, but now stale.
"ipvs5": {
Address: net.ParseIP("6.6.6.6"),
Protocol: string(api.ProtocolUDP),
Port: 58,
Scheduler: "rr",
Flags: utilipvs.FlagHashed,
},
}
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
proxier := NewFakeProxier(ipt, ipvs, ipset, nil)
// These CIDRs cover only ipvs2 and ipvs3.
proxier.excludeCIDRs = []string{"3.3.3.0/24", "4.4.4.0/24"}
for v := range currentServices {
proxier.ipvs.AddVirtualServer(currentServices[v])
}
proxier.cleanLegacyService(activeServices, currentServices)
// ipvs4 and ipvs5 should have been cleaned.
remainingVirtualServers, _ := proxier.ipvs.GetVirtualServers()
if len(remainingVirtualServers) != 4 {
t.Errorf("Expected number of remaining IPVS services after cleanup to be %v. Got %v", 4, len(remainingVirtualServers))
}
for _, vs := range remainingVirtualServers {
// Checking that ipvs4 and ipvs5 were removed.
if vs.Port == 57 {
t.Errorf("Expected ipvs4 to be removed after cleanup. It still remains")
}
if vs.Port == 58 {
t.Errorf("Expected ipvs5 to be removed after cleanup. It still remains")
}
}
}
func buildFakeProxier(nodeIP []net.IP) (*iptablestest.FakeIPTables, *Proxier) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()