diff --git a/pkg/proxy/endpointschangetracker.go b/pkg/proxy/endpointschangetracker.go index 3918aae24c8..1d3b13e1f00 100644 --- a/pkg/proxy/endpointschangetracker.go +++ b/pkg/proxy/endpointschangetracker.go @@ -29,11 +29,6 @@ import ( "k8s.io/kubernetes/pkg/proxy/metrics" ) -var supportedEndpointSliceAddressTypes = sets.New[discovery.AddressType]( - discovery.AddressTypeIPv4, - discovery.AddressTypeIPv6, -) - // EndpointsChangeTracker carries state about uncommitted changes to an arbitrary number of // Endpoints, keyed by their namespace and name. type EndpointsChangeTracker struct { @@ -45,6 +40,9 @@ type EndpointsChangeTracker struct { // any Proxier-specific cleanup. processEndpointsMapChange processEndpointsMapChangeFunc + // addressType is the type of EndpointSlice this proxy tracks + addressType discovery.AddressType + // endpointSliceCache holds a simplified version of endpoint slices. endpointSliceCache *EndpointSliceCache @@ -62,12 +60,18 @@ type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap) // NewEndpointsChangeTracker initializes an EndpointsChangeTracker -func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker { +func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, _ events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker { + addressType := discovery.AddressTypeIPv4 + if ipFamily == v1.IPv6Protocol { + addressType = discovery.AddressTypeIPv6 + } + return &EndpointsChangeTracker{ + addressType: addressType, lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), trackerStartTime: time.Now(), processEndpointsMapChange: processEndpointsMapChange, - endpointSliceCache: NewEndpointSliceCache(hostname, ipFamily, recorder, makeEndpointInfo), + endpointSliceCache: NewEndpointSliceCache(hostname, makeEndpointInfo), } } @@ -76,14 +80,8 @@ func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFun // change that needs to be synced; note that this is different from the return value of // ServiceChangeTracker.Update(). func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool { - if !supportedEndpointSliceAddressTypes.Has(endpointSlice.AddressType) { - klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType) - return false - } - - // This should never happen - if endpointSlice == nil { - klog.ErrorS(nil, "Nil endpointSlice passed to EndpointSliceUpdate") + if endpointSlice.AddressType != ect.addressType { + klog.V(4).InfoS("Ignoring unsupported EndpointSlice", "endpointSlice", klog.KObj(endpointSlice), "type", endpointSlice.AddressType, "expected", ect.addressType) return false } diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index 5671404fcd3..85d62c74eed 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -22,15 +22,12 @@ import ( "sort" "sync" - v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/tools/events" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/features" - proxyutil "k8s.io/kubernetes/pkg/proxy/util" utilnet "k8s.io/utils/net" ) @@ -49,8 +46,6 @@ type EndpointSliceCache struct { makeEndpointInfo makeEndpointFunc hostname string - ipFamily v1.IPFamily - recorder events.EventRecorder } // endpointSliceTracker keeps track of EndpointSlices as they have been applied @@ -72,16 +67,14 @@ type endpointSliceData struct { } // NewEndpointSliceCache initializes an EndpointSliceCache. -func NewEndpointSliceCache(hostname string, ipFamily v1.IPFamily, recorder events.EventRecorder, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache { +func NewEndpointSliceCache(hostname string, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache { if makeEndpointInfo == nil { makeEndpointInfo = standardEndpointInfo } return &EndpointSliceCache{ trackerByServiceMap: map[types.NamespacedName]*endpointSliceTracker{}, hostname: hostname, - ipFamily: ipFamily, makeEndpointInfo: makeEndpointInfo, - recorder: recorder, } } @@ -213,15 +206,6 @@ func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, port continue } - // Filter out the incorrect IP version case. Any endpoint port that - // contains incorrect IP version will be ignored. - if (cache.ipFamily == v1.IPv6Protocol) != utilnet.IsIPv6String(endpoint.Addresses[0]) { - // Emit event on the corresponding service which had a different IP - // version than the endpoint. - proxyutil.LogAndEmitIncorrectIPVersionEvent(cache.recorder, "endpointslice", endpoint.Addresses[0], svcPortName.NamespacedName.Namespace, svcPortName.NamespacedName.Name, "") - continue - } - isLocal := endpoint.NodeName != nil && cache.isLocal(*endpoint.NodeName) ready := endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready diff --git a/pkg/proxy/endpointslicecache_test.go b/pkg/proxy/endpointslicecache_test.go index 3f8317a1860..610bb271c16 100644 --- a/pkg/proxy/endpointslicecache_test.go +++ b/pkg/proxy/endpointslicecache_test.go @@ -205,7 +205,7 @@ func TestEndpointsMapFromESC(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - esCache := NewEndpointSliceCache(tc.hostname, v1.IPv4Protocol, nil, nil) + esCache := NewEndpointSliceCache(tc.hostname, nil) cmc := newCacheMutationCheck(tc.endpointSlices) for _, endpointSlice := range tc.endpointSlices { @@ -315,7 +315,7 @@ func TestEndpointInfoByServicePort(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - esCache := NewEndpointSliceCache(tc.hostname, v1.IPv4Protocol, nil, nil) + esCache := NewEndpointSliceCache(tc.hostname, nil) for _, endpointSlice := range tc.endpointSlices { esCache.updatePending(endpointSlice, false) @@ -350,7 +350,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged bool }{ "identical slices, ports only": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port80}, @@ -362,7 +362,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: false, }, "identical slices, ports out of order": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443, port80}, @@ -374,7 +374,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: true, }, "port removed": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443, port80}, @@ -386,7 +386,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: true, }, "port added": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443}, @@ -398,7 +398,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: true, }, "identical with endpoints": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443}, @@ -412,7 +412,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: false, }, "identical with endpoints out of order": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443}, @@ -426,7 +426,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: true, }, "identical with endpoint added": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443}, diff --git a/pkg/proxy/serviceport.go b/pkg/proxy/serviceport.go index fa3f21758cc..cafda3b8af3 100644 --- a/pkg/proxy/serviceport.go +++ b/pkg/proxy/serviceport.go @@ -19,6 +19,7 @@ package proxy import ( "fmt" "net" + "strings" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" @@ -207,56 +208,34 @@ func newBaseServiceInfo(service *v1.Service, ipFamily v1.IPFamily, port *v1.Serv info.hintsAnnotation = service.Annotations[v1.AnnotationTopologyMode] } - // filter external ips, source ranges and ingress ips - // prior to dual stack services, this was considered an error, but with dual stack - // services, this is actually expected. Hence we downgraded from reporting by events - // to just log lines with high verbosity + // Filter ExternalIPs to correct IP family ipFamilyMap := proxyutil.MapIPsByIPFamily(service.Spec.ExternalIPs) info.externalIPs = ipFamilyMap[ipFamily] - // Log the IPs not matching the ipFamily - if ips, ok := ipFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 { - klog.V(4).InfoS("Service change tracker ignored the following external IPs for given service as they don't match IP Family", - "ipFamily", ipFamily, "externalIPs", ips, "service", klog.KObj(service)) + // Filter source ranges to correct IP family. Also deal with the fact that + // LoadBalancerSourceRanges validation mistakenly allows whitespace padding + loadBalancerSourceRanges := make([]string, len(service.Spec.LoadBalancerSourceRanges)) + for i, sourceRange := range service.Spec.LoadBalancerSourceRanges { + loadBalancerSourceRanges[i] = strings.TrimSpace(sourceRange) } - cidrFamilyMap := proxyutil.MapCIDRsByIPFamily(service.Spec.LoadBalancerSourceRanges) + cidrFamilyMap := proxyutil.MapCIDRsByIPFamily(loadBalancerSourceRanges) info.loadBalancerSourceRanges = cidrFamilyMap[ipFamily] - // Log the CIDRs not matching the ipFamily - if cidrs, ok := cidrFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(cidrs) > 0 { - klog.V(4).InfoS("Service change tracker ignored the following load balancer source ranges for given Service as they don't match IP Family", - "ipFamily", ipFamily, "loadBalancerSourceRanges", cidrs, "service", klog.KObj(service)) - } - // Obtain Load Balancer Ingress - var invalidIPs []net.IP + // Filter Load Balancer Ingress IPs to correct IP family. While proxying load + // balancers might choose to proxy connections from an LB IP of one family to a + // service IP of another family, that's irrelevant to kube-proxy, which only + // creates rules for VIP-style load balancers. for _, ing := range service.Status.LoadBalancer.Ingress { - if ing.IP == "" { + if ing.IP == "" || !proxyutil.IsVIPMode(ing) { continue } - // proxy mode load balancers do not need to track the IPs in the service cache - // and they can also implement IP family translation, so no need to check if - // the status ingress.IP and the ClusterIP belong to the same family. - if !proxyutil.IsVIPMode(ing) { - klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IP for given Service as it using Proxy mode", - "ipFamily", ipFamily, "loadBalancerIngressIP", ing.IP, "service", klog.KObj(service)) - continue - } - - // kube-proxy does not implement IP family translation, skip addresses with - // different IP family ip := netutils.ParseIPSloppy(ing.IP) // (already verified as an IP-address) if ingFamily := proxyutil.GetIPFamilyFromIP(ip); ingFamily == ipFamily { info.loadBalancerVIPs = append(info.loadBalancerVIPs, ip) - } else { - invalidIPs = append(invalidIPs, ip) } } - if len(invalidIPs) > 0 { - klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IPs for given Service as they don't match the IP Family", - "ipFamily", ipFamily, "loadBalancerIngressIPs", invalidIPs, "service", klog.KObj(service)) - } if apiservice.NeedsHealthCheck(service) { p := service.Spec.HealthCheckNodePort diff --git a/pkg/proxy/util/utils.go b/pkg/proxy/util/utils.go index 320d19bc49b..d9b13f9a18e 100644 --- a/pkg/proxy/util/utils.go +++ b/pkg/proxy/util/utils.go @@ -22,10 +22,8 @@ import ( "strings" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/tools/events" utilsysctl "k8s.io/component-helpers/node/util/sysctl" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/apis/core/v1/helper" @@ -86,21 +84,6 @@ func AddressSet(isValid func(ip net.IP) bool, addrs []net.Addr) sets.Set[string] return ips } -// LogAndEmitIncorrectIPVersionEvent logs and emits incorrect IP version event. -func LogAndEmitIncorrectIPVersionEvent(recorder events.EventRecorder, fieldName, fieldValue, svcNamespace, svcName string, svcUID types.UID) { - errMsg := fmt.Sprintf("%s in %s has incorrect IP version", fieldValue, fieldName) - klog.ErrorS(nil, "Incorrect IP version", "service", klog.KRef(svcNamespace, svcName), "field", fieldName, "value", fieldValue) - if recorder != nil { - recorder.Eventf( - &v1.ObjectReference{ - Kind: "Service", - Name: svcName, - Namespace: svcNamespace, - UID: svcUID, - }, nil, v1.EventTypeWarning, "KubeProxyIncorrectIPVersion", "GatherEndpoints", errMsg) - } -} - // MapIPsByIPFamily maps a slice of IPs to their respective IP families (v4 or v6) func MapIPsByIPFamily(ipStrings []string) map[v1.IPFamily][]net.IP { ipFamilyMap := map[v1.IPFamily][]net.IP{}