From 79d1c078bb1f93798647f744f306300e4f1b8260 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 29 Apr 2024 16:32:22 -0400 Subject: [PATCH] Make change trackers just ignore the "wrong" IP family Dual-stack clusters exist; ServiceChangeTracker does not need to log messages (even at V(4)) when it sees dual-stack Services, and EndpointsChangeTracker does not need to emit Events(!) when it sees EndpointSlices of the wrong AddressType. (Though in most cases the EndpointsChangeTracker Events would not get emitted anyway, since the MetaProxier would ensure that only the v4 tracker saw v4 slices, and only the v6 tracker saw v6 slices.) Also remove a nil check labeled "This should never happen" which, in fact, we know *didn't* happen, since the function has already dereferenced the value before it checking it against nil. --- pkg/proxy/endpointschangetracker.go | 28 ++++++++--------- pkg/proxy/endpointslicecache.go | 18 +---------- pkg/proxy/endpointslicecache_test.go | 18 +++++------ pkg/proxy/serviceport.go | 47 ++++++++-------------------- pkg/proxy/util/utils.go | 17 ---------- 5 files changed, 36 insertions(+), 92 deletions(-) diff --git a/pkg/proxy/endpointschangetracker.go b/pkg/proxy/endpointschangetracker.go index 3918aae24c8..1d3b13e1f00 100644 --- a/pkg/proxy/endpointschangetracker.go +++ b/pkg/proxy/endpointschangetracker.go @@ -29,11 +29,6 @@ import ( "k8s.io/kubernetes/pkg/proxy/metrics" ) -var supportedEndpointSliceAddressTypes = sets.New[discovery.AddressType]( - discovery.AddressTypeIPv4, - discovery.AddressTypeIPv6, -) - // EndpointsChangeTracker carries state about uncommitted changes to an arbitrary number of // Endpoints, keyed by their namespace and name. type EndpointsChangeTracker struct { @@ -45,6 +40,9 @@ type EndpointsChangeTracker struct { // any Proxier-specific cleanup. processEndpointsMapChange processEndpointsMapChangeFunc + // addressType is the type of EndpointSlice this proxy tracks + addressType discovery.AddressType + // endpointSliceCache holds a simplified version of endpoint slices. endpointSliceCache *EndpointSliceCache @@ -62,12 +60,18 @@ type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap) // NewEndpointsChangeTracker initializes an EndpointsChangeTracker -func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker { +func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, _ events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker { + addressType := discovery.AddressTypeIPv4 + if ipFamily == v1.IPv6Protocol { + addressType = discovery.AddressTypeIPv6 + } + return &EndpointsChangeTracker{ + addressType: addressType, lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), trackerStartTime: time.Now(), processEndpointsMapChange: processEndpointsMapChange, - endpointSliceCache: NewEndpointSliceCache(hostname, ipFamily, recorder, makeEndpointInfo), + endpointSliceCache: NewEndpointSliceCache(hostname, makeEndpointInfo), } } @@ -76,14 +80,8 @@ func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFun // change that needs to be synced; note that this is different from the return value of // ServiceChangeTracker.Update(). func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool { - if !supportedEndpointSliceAddressTypes.Has(endpointSlice.AddressType) { - klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType) - return false - } - - // This should never happen - if endpointSlice == nil { - klog.ErrorS(nil, "Nil endpointSlice passed to EndpointSliceUpdate") + if endpointSlice.AddressType != ect.addressType { + klog.V(4).InfoS("Ignoring unsupported EndpointSlice", "endpointSlice", klog.KObj(endpointSlice), "type", endpointSlice.AddressType, "expected", ect.addressType) return false } diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index 5671404fcd3..85d62c74eed 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -22,15 +22,12 @@ import ( "sort" "sync" - v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/tools/events" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/features" - proxyutil "k8s.io/kubernetes/pkg/proxy/util" utilnet "k8s.io/utils/net" ) @@ -49,8 +46,6 @@ type EndpointSliceCache struct { makeEndpointInfo makeEndpointFunc hostname string - ipFamily v1.IPFamily - recorder events.EventRecorder } // endpointSliceTracker keeps track of EndpointSlices as they have been applied @@ -72,16 +67,14 @@ type endpointSliceData struct { } // NewEndpointSliceCache initializes an EndpointSliceCache. -func NewEndpointSliceCache(hostname string, ipFamily v1.IPFamily, recorder events.EventRecorder, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache { +func NewEndpointSliceCache(hostname string, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache { if makeEndpointInfo == nil { makeEndpointInfo = standardEndpointInfo } return &EndpointSliceCache{ trackerByServiceMap: map[types.NamespacedName]*endpointSliceTracker{}, hostname: hostname, - ipFamily: ipFamily, makeEndpointInfo: makeEndpointInfo, - recorder: recorder, } } @@ -213,15 +206,6 @@ func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, port continue } - // Filter out the incorrect IP version case. Any endpoint port that - // contains incorrect IP version will be ignored. - if (cache.ipFamily == v1.IPv6Protocol) != utilnet.IsIPv6String(endpoint.Addresses[0]) { - // Emit event on the corresponding service which had a different IP - // version than the endpoint. - proxyutil.LogAndEmitIncorrectIPVersionEvent(cache.recorder, "endpointslice", endpoint.Addresses[0], svcPortName.NamespacedName.Namespace, svcPortName.NamespacedName.Name, "") - continue - } - isLocal := endpoint.NodeName != nil && cache.isLocal(*endpoint.NodeName) ready := endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready diff --git a/pkg/proxy/endpointslicecache_test.go b/pkg/proxy/endpointslicecache_test.go index 3f8317a1860..610bb271c16 100644 --- a/pkg/proxy/endpointslicecache_test.go +++ b/pkg/proxy/endpointslicecache_test.go @@ -205,7 +205,7 @@ func TestEndpointsMapFromESC(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - esCache := NewEndpointSliceCache(tc.hostname, v1.IPv4Protocol, nil, nil) + esCache := NewEndpointSliceCache(tc.hostname, nil) cmc := newCacheMutationCheck(tc.endpointSlices) for _, endpointSlice := range tc.endpointSlices { @@ -315,7 +315,7 @@ func TestEndpointInfoByServicePort(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - esCache := NewEndpointSliceCache(tc.hostname, v1.IPv4Protocol, nil, nil) + esCache := NewEndpointSliceCache(tc.hostname, nil) for _, endpointSlice := range tc.endpointSlices { esCache.updatePending(endpointSlice, false) @@ -350,7 +350,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged bool }{ "identical slices, ports only": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port80}, @@ -362,7 +362,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: false, }, "identical slices, ports out of order": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443, port80}, @@ -374,7 +374,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: true, }, "port removed": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443, port80}, @@ -386,7 +386,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: true, }, "port added": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443}, @@ -398,7 +398,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: true, }, "identical with endpoints": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443}, @@ -412,7 +412,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: false, }, "identical with endpoints out of order": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443}, @@ -426,7 +426,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: true, }, "identical with endpoint added": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443}, diff --git a/pkg/proxy/serviceport.go b/pkg/proxy/serviceport.go index fa3f21758cc..cafda3b8af3 100644 --- a/pkg/proxy/serviceport.go +++ b/pkg/proxy/serviceport.go @@ -19,6 +19,7 @@ package proxy import ( "fmt" "net" + "strings" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" @@ -207,56 +208,34 @@ func newBaseServiceInfo(service *v1.Service, ipFamily v1.IPFamily, port *v1.Serv info.hintsAnnotation = service.Annotations[v1.AnnotationTopologyMode] } - // filter external ips, source ranges and ingress ips - // prior to dual stack services, this was considered an error, but with dual stack - // services, this is actually expected. Hence we downgraded from reporting by events - // to just log lines with high verbosity + // Filter ExternalIPs to correct IP family ipFamilyMap := proxyutil.MapIPsByIPFamily(service.Spec.ExternalIPs) info.externalIPs = ipFamilyMap[ipFamily] - // Log the IPs not matching the ipFamily - if ips, ok := ipFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 { - klog.V(4).InfoS("Service change tracker ignored the following external IPs for given service as they don't match IP Family", - "ipFamily", ipFamily, "externalIPs", ips, "service", klog.KObj(service)) + // Filter source ranges to correct IP family. Also deal with the fact that + // LoadBalancerSourceRanges validation mistakenly allows whitespace padding + loadBalancerSourceRanges := make([]string, len(service.Spec.LoadBalancerSourceRanges)) + for i, sourceRange := range service.Spec.LoadBalancerSourceRanges { + loadBalancerSourceRanges[i] = strings.TrimSpace(sourceRange) } - cidrFamilyMap := proxyutil.MapCIDRsByIPFamily(service.Spec.LoadBalancerSourceRanges) + cidrFamilyMap := proxyutil.MapCIDRsByIPFamily(loadBalancerSourceRanges) info.loadBalancerSourceRanges = cidrFamilyMap[ipFamily] - // Log the CIDRs not matching the ipFamily - if cidrs, ok := cidrFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(cidrs) > 0 { - klog.V(4).InfoS("Service change tracker ignored the following load balancer source ranges for given Service as they don't match IP Family", - "ipFamily", ipFamily, "loadBalancerSourceRanges", cidrs, "service", klog.KObj(service)) - } - // Obtain Load Balancer Ingress - var invalidIPs []net.IP + // Filter Load Balancer Ingress IPs to correct IP family. While proxying load + // balancers might choose to proxy connections from an LB IP of one family to a + // service IP of another family, that's irrelevant to kube-proxy, which only + // creates rules for VIP-style load balancers. for _, ing := range service.Status.LoadBalancer.Ingress { - if ing.IP == "" { + if ing.IP == "" || !proxyutil.IsVIPMode(ing) { continue } - // proxy mode load balancers do not need to track the IPs in the service cache - // and they can also implement IP family translation, so no need to check if - // the status ingress.IP and the ClusterIP belong to the same family. - if !proxyutil.IsVIPMode(ing) { - klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IP for given Service as it using Proxy mode", - "ipFamily", ipFamily, "loadBalancerIngressIP", ing.IP, "service", klog.KObj(service)) - continue - } - - // kube-proxy does not implement IP family translation, skip addresses with - // different IP family ip := netutils.ParseIPSloppy(ing.IP) // (already verified as an IP-address) if ingFamily := proxyutil.GetIPFamilyFromIP(ip); ingFamily == ipFamily { info.loadBalancerVIPs = append(info.loadBalancerVIPs, ip) - } else { - invalidIPs = append(invalidIPs, ip) } } - if len(invalidIPs) > 0 { - klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IPs for given Service as they don't match the IP Family", - "ipFamily", ipFamily, "loadBalancerIngressIPs", invalidIPs, "service", klog.KObj(service)) - } if apiservice.NeedsHealthCheck(service) { p := service.Spec.HealthCheckNodePort diff --git a/pkg/proxy/util/utils.go b/pkg/proxy/util/utils.go index 320d19bc49b..d9b13f9a18e 100644 --- a/pkg/proxy/util/utils.go +++ b/pkg/proxy/util/utils.go @@ -22,10 +22,8 @@ import ( "strings" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/tools/events" utilsysctl "k8s.io/component-helpers/node/util/sysctl" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/apis/core/v1/helper" @@ -86,21 +84,6 @@ func AddressSet(isValid func(ip net.IP) bool, addrs []net.Addr) sets.Set[string] return ips } -// LogAndEmitIncorrectIPVersionEvent logs and emits incorrect IP version event. -func LogAndEmitIncorrectIPVersionEvent(recorder events.EventRecorder, fieldName, fieldValue, svcNamespace, svcName string, svcUID types.UID) { - errMsg := fmt.Sprintf("%s in %s has incorrect IP version", fieldValue, fieldName) - klog.ErrorS(nil, "Incorrect IP version", "service", klog.KRef(svcNamespace, svcName), "field", fieldName, "value", fieldValue) - if recorder != nil { - recorder.Eventf( - &v1.ObjectReference{ - Kind: "Service", - Name: svcName, - Namespace: svcNamespace, - UID: svcUID, - }, nil, v1.EventTypeWarning, "KubeProxyIncorrectIPVersion", "GatherEndpoints", errMsg) - } -} - // MapIPsByIPFamily maps a slice of IPs to their respective IP families (v4 or v6) func MapIPsByIPFamily(ipStrings []string) map[v1.IPFamily][]net.IP { ipFamilyMap := map[v1.IPFamily][]net.IP{}