From 452fcc5fd6e325432990f29ba0145a97884b777c Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Sun, 26 Nov 2023 16:48:28 -0500 Subject: [PATCH 1/8] Remove some dead code in service.go ServicePortMap.merge had a giant comment explaining its return value, but nothing ever used that return value. ServicePort had an InternalTrafficPolicy() method, but nothing used it (because it was redundant with InternalPolicyLocal().) --- pkg/proxy/service.go | 48 +++++++++----------------------------------- pkg/proxy/types.go | 2 -- 2 files changed, 9 insertions(+), 41 deletions(-) diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index 5c2cbff472c..5b591db26ae 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -52,7 +52,6 @@ type BaseServicePortInfo struct { healthCheckNodePort int externalPolicyLocal bool internalPolicyLocal bool - internalTrafficPolicy *v1.ServiceInternalTrafficPolicy hintsAnnotation string } @@ -123,11 +122,6 @@ func (bsvcPortInfo *BaseServicePortInfo) InternalPolicyLocal() bool { return bsvcPortInfo.internalPolicyLocal } -// InternalTrafficPolicy is part of ServicePort interface -func (bsvcPortInfo *BaseServicePortInfo) InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicy { - return bsvcPortInfo.internalTrafficPolicy -} - // HintsAnnotation is part of ServicePort interface. func (bsvcPortInfo *BaseServicePortInfo) HintsAnnotation() string { return bsvcPortInfo.hintsAnnotation @@ -163,15 +157,14 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic clusterIP := proxyutil.GetClusterIPByFamily(sct.ipFamily, service) info := &BaseServicePortInfo{ - clusterIP: netutils.ParseIPSloppy(clusterIP), - port: int(port.Port), - protocol: port.Protocol, - nodePort: int(port.NodePort), - sessionAffinityType: service.Spec.SessionAffinity, - stickyMaxAgeSeconds: stickyMaxAgeSeconds, - externalPolicyLocal: externalPolicyLocal, - internalPolicyLocal: internalPolicyLocal, - internalTrafficPolicy: service.Spec.InternalTrafficPolicy, + clusterIP: netutils.ParseIPSloppy(clusterIP), + port: int(port.Port), + protocol: port.Protocol, + nodePort: int(port.NodePort), + sessionAffinityType: service.Spec.SessionAffinity, + stickyMaxAgeSeconds: stickyMaxAgeSeconds, + externalPolicyLocal: externalPolicyLocal, + internalPolicyLocal: internalPolicyLocal, } // v1.DeprecatedAnnotationTopologyAwareHints has precedence over v1.AnnotationTopologyMode. @@ -426,30 +419,8 @@ func (sm ServicePortMap) Update(sct *ServiceChangeTracker) UpdateServiceMapResul // merge adds other ServicePortMap's elements to current ServicePortMap. // If collision, other ALWAYS win. Otherwise add the other to current. // In other words, if some elements in current collisions with other, update the current by other. -// It returns a string type set which stores all the newly merged services' identifier, ServicePortName.String(), to help users -// tell if a service is deleted or updated. -// The returned value is one of the arguments of ServicePortMap.unmerge(). -// ServicePortMap A Merge ServicePortMap B will do following 2 things: -// - update ServicePortMap A. -// - produce a string set which stores all other ServicePortMap's ServicePortName.String(). -// -// For example, -// -// A{} -// B{{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}} -// A updated to be {{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}} -// produce string set {"ns/cluster-ip:http"} -// -// A{{"ns", "cluster-ip", "http"}: {"172.16.55.10", 345, "UDP"}} -// B{{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}} -// A updated to be {{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}} -// produce string set {"ns/cluster-ip:http"} -func (sm *ServicePortMap) merge(other ServicePortMap) sets.Set[string] { - // existingPorts is going to store all identifiers of all services in `other` ServicePortMap. - existingPorts := sets.New[string]() +func (sm *ServicePortMap) merge(other ServicePortMap) { for svcPortName, info := range other { - // Take ServicePortName.String() as the newly merged service's identifier and put it into existingPorts. - existingPorts.Insert(svcPortName.String()) _, exists := (*sm)[svcPortName] if !exists { klog.V(4).InfoS("Adding new service port", "portName", svcPortName, "servicePort", info) @@ -458,7 +429,6 @@ func (sm *ServicePortMap) merge(other ServicePortMap) sets.Set[string] { } (*sm)[svcPortName] = info } - return existingPorts } // filter filters out elements from ServicePortMap base on given ports string sets. diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index fbb46cc959b..f1c89d353f5 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -87,8 +87,6 @@ type ServicePort interface { ExternalPolicyLocal() bool // InternalPolicyLocal returns if a service has only node local endpoints for internal traffic. InternalPolicyLocal() bool - // InternalTrafficPolicy returns service InternalTrafficPolicy - InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicy // HintsAnnotation returns the value of the v1.DeprecatedAnnotationTopologyAwareHints annotation. HintsAnnotation() string // ExternallyAccessible returns true if the service port is reachable via something From 0779042a6f1c749da8db11d1ab8c108340011188 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 27 Nov 2023 11:30:53 -0500 Subject: [PATCH 2/8] Remove a useless "_" assignment to appease the linter (This would become an error rather than a warning once we try to move this code to another file.) Also rename an "ok" variable to "exists" since that what it really means. --- pkg/proxy/service.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index 5b591db26ae..e175fd85577 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -168,10 +168,10 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic } // v1.DeprecatedAnnotationTopologyAwareHints has precedence over v1.AnnotationTopologyMode. - var ok bool - info.hintsAnnotation, ok = service.Annotations[v1.DeprecatedAnnotationTopologyAwareHints] - if !ok { - info.hintsAnnotation, _ = service.Annotations[v1.AnnotationTopologyMode] + var exists bool + info.hintsAnnotation, exists = service.Annotations[v1.DeprecatedAnnotationTopologyAwareHints] + if !exists { + info.hintsAnnotation = service.Annotations[v1.AnnotationTopologyMode] } loadBalancerSourceRanges := make([]string, len(service.Spec.LoadBalancerSourceRanges)) From ede0dc1d079410690a2c9422a4ea46d80d60b602 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 15 Nov 2023 11:21:16 -0500 Subject: [PATCH 3/8] Make newBaseServiceInfo a function rather than a method (in preparation for moving it) --- pkg/proxy/service.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index e175fd85577..a28ec8fcb3b 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -145,7 +145,7 @@ func (bsvcPortInfo *BaseServicePortInfo) UsesLocalEndpoints() bool { return bsvcPortInfo.internalPolicyLocal || (bsvcPortInfo.externalPolicyLocal && bsvcPortInfo.ExternallyAccessible()) } -func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServicePortInfo { +func newBaseServiceInfo(service *v1.Service, ipFamily v1.IPFamily, port *v1.ServicePort) *BaseServicePortInfo { externalPolicyLocal := apiservice.ExternalPolicyLocal(service) internalPolicyLocal := apiservice.InternalPolicyLocal(service) @@ -155,7 +155,7 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds) } - clusterIP := proxyutil.GetClusterIPByFamily(sct.ipFamily, service) + clusterIP := proxyutil.GetClusterIPByFamily(ipFamily, service) info := &BaseServicePortInfo{ clusterIP: netutils.ParseIPSloppy(clusterIP), port: int(port.Port), @@ -184,20 +184,20 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic // to just log lines with high verbosity ipFamilyMap := proxyutil.MapIPsByIPFamily(service.Spec.ExternalIPs) - info.externalIPs = ipFamilyMap[sct.ipFamily] + info.externalIPs = ipFamilyMap[ipFamily] // Log the IPs not matching the ipFamily - if ips, ok := ipFamilyMap[proxyutil.OtherIPFamily(sct.ipFamily)]; ok && len(ips) > 0 { + if ips, ok := ipFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 { klog.V(4).InfoS("Service change tracker ignored the following external IPs for given service as they don't match IP Family", - "ipFamily", sct.ipFamily, "externalIPs", strings.Join(ips, ", "), "service", klog.KObj(service)) + "ipFamily", ipFamily, "externalIPs", strings.Join(ips, ", "), "service", klog.KObj(service)) } ipFamilyMap = proxyutil.MapCIDRsByIPFamily(loadBalancerSourceRanges) - info.loadBalancerSourceRanges = ipFamilyMap[sct.ipFamily] + info.loadBalancerSourceRanges = ipFamilyMap[ipFamily] // Log the CIDRs not matching the ipFamily - if cidrs, ok := ipFamilyMap[proxyutil.OtherIPFamily(sct.ipFamily)]; ok && len(cidrs) > 0 { + if cidrs, ok := ipFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(cidrs) > 0 { klog.V(4).InfoS("Service change tracker ignored the following load balancer source ranges for given Service as they don't match IP Family", - "ipFamily", sct.ipFamily, "loadBalancerSourceRanges", strings.Join(cidrs, ", "), "service", klog.KObj(service)) + "ipFamily", ipFamily, "loadBalancerSourceRanges", strings.Join(cidrs, ", "), "service", klog.KObj(service)) } // Obtain Load Balancer Ingress @@ -212,13 +212,13 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic // the status ingress.IP and the ClusterIP belong to the same family. if !proxyutil.IsVIPMode(ing) { klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IP for given Service as it using Proxy mode", - "ipFamily", sct.ipFamily, "loadBalancerIngressIP", ing.IP, "service", klog.KObj(service)) + "ipFamily", ipFamily, "loadBalancerIngressIP", ing.IP, "service", klog.KObj(service)) continue } // kube-proxy does not implement IP family translation, skip addresses with // different IP family - if ipFamily := proxyutil.GetIPFamilyFromIP(ing.IP); ipFamily == sct.ipFamily { + if ingFamily := proxyutil.GetIPFamilyFromIP(ing.IP); ingFamily == ipFamily { info.loadBalancerVIPs = append(info.loadBalancerVIPs, ing.IP) } else { invalidIPs = append(invalidIPs, ing.IP) @@ -226,7 +226,7 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic } if len(invalidIPs) > 0 { klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IPs for given Service as they don't match the IP Family", - "ipFamily", sct.ipFamily, "loadBalancerIngressIPs", strings.Join(invalidIPs, ", "), "service", klog.KObj(service)) + "ipFamily", ipFamily, "loadBalancerIngressIPs", strings.Join(invalidIPs, ", "), "service", klog.KObj(service)) } if apiservice.NeedsHealthCheck(service) { @@ -375,7 +375,7 @@ func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) Servic for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name, Protocol: servicePort.Protocol} - baseSvcInfo := sct.newBaseServiceInfo(servicePort, service) + baseSvcInfo := newBaseServiceInfo(service, sct.ipFamily, servicePort) if sct.makeServiceInfo != nil { svcPortMap[svcPortName] = sct.makeServiceInfo(servicePort, service, baseSvcInfo) } else { From a73b275031a0063936ed1d236d9e6d53a2999b96 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 15 Nov 2023 11:33:24 -0500 Subject: [PATCH 4/8] Split ServicePort/Endpoint from ServiceChangeTracker/EndpointsChangeTracker Move the ServicePort/BaseServicePortInfo types to serviceport.go. Move the Endpoint/BaseEndpointInfo types to endpoint.go. To avoid confusion with the new filenames, rename service.go to servicechangetracker.go and endpoints.go to endpointschangetracker.go. (No code changes; this just moves some code from types.go and services.go to serviceport.go, and some code from types.go and endpoints.go to endpoint.go.) --- pkg/proxy/endpoint.go | 138 +++++++++ ...endpoints.go => endpointschangetracker.go} | 94 +----- ...test.go => endpointschangetracker_test.go} | 0 pkg/proxy/servicechangetracker.go | 247 ++++++++++++++++ ...e_test.go => servicechangetracker_test.go} | 0 pkg/proxy/{service.go => serviceport.go} | 269 +++--------------- pkg/proxy/types.go | 71 ----- 7 files changed, 430 insertions(+), 389 deletions(-) create mode 100644 pkg/proxy/endpoint.go rename pkg/proxy/{endpoints.go => endpointschangetracker.go} (82%) rename pkg/proxy/{endpoints_test.go => endpointschangetracker_test.go} (100%) create mode 100644 pkg/proxy/servicechangetracker.go rename pkg/proxy/{service_test.go => servicechangetracker_test.go} (100%) rename pkg/proxy/{service.go => serviceport.go} (52%) diff --git a/pkg/proxy/endpoint.go b/pkg/proxy/endpoint.go new file mode 100644 index 00000000000..474edf36a91 --- /dev/null +++ b/pkg/proxy/endpoint.go @@ -0,0 +1,138 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "net" + "strconv" + + "k8s.io/apimachinery/pkg/util/sets" +) + +// Endpoint in an interface which abstracts information about an endpoint. +type Endpoint interface { + // String returns endpoint string. An example format can be: `IP:Port`. + // We take the returned value as ServiceEndpoint.Endpoint. + String() string + // IP returns IP part of the endpoint. + IP() string + // Port returns the Port part of the endpoint. + Port() int + + // IsLocal returns true if the endpoint is running on the same host as kube-proxy. + IsLocal() bool + // IsReady returns true if an endpoint is ready and not terminating, or + // if PublishNotReadyAddresses is set on the service. + IsReady() bool + // IsServing returns true if an endpoint is ready. It does not account + // for terminating state. + IsServing() bool + // IsTerminating returns true if an endpoint is terminating. For pods, + // that is any pod with a deletion timestamp. + IsTerminating() bool + + // ZoneHints returns the zone hint for the endpoint. This is based on + // endpoint.hints.forZones[0].name in the EndpointSlice API. + ZoneHints() sets.Set[string] +} + +// BaseEndpointInfo contains base information that defines an endpoint. +// This could be used directly by proxier while processing endpoints, +// or can be used for constructing a more specific EndpointInfo struct +// defined by the proxier if needed. +type BaseEndpointInfo struct { + // Cache this values to improve performance + ip string + port int + // endpoint is the same as net.JoinHostPort(ip,port) + endpoint string + + // isLocal indicates whether the endpoint is running on same host as kube-proxy. + isLocal bool + + // ready indicates whether this endpoint is ready and NOT terminating, unless + // PublishNotReadyAddresses is set on the service, in which case it will just + // always be true. + ready bool + // serving indicates whether this endpoint is ready regardless of its terminating state. + // For pods this is true if it has a ready status regardless of its deletion timestamp. + serving bool + // terminating indicates whether this endpoint is terminating. + // For pods this is true if it has a non-nil deletion timestamp. + terminating bool + + // zoneHints represent the zone hints for the endpoint. This is based on + // endpoint.hints.forZones[*].name in the EndpointSlice API. + zoneHints sets.Set[string] +} + +var _ Endpoint = &BaseEndpointInfo{} + +// String is part of proxy.Endpoint interface. +func (info *BaseEndpointInfo) String() string { + return info.endpoint +} + +// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface. +func (info *BaseEndpointInfo) IP() string { + return info.ip +} + +// Port returns just the Port part of the endpoint. +func (info *BaseEndpointInfo) Port() int { + return info.port +} + +// IsLocal is part of proxy.Endpoint interface. +func (info *BaseEndpointInfo) IsLocal() bool { + return info.isLocal +} + +// IsReady returns true if an endpoint is ready and not terminating. +func (info *BaseEndpointInfo) IsReady() bool { + return info.ready +} + +// IsServing returns true if an endpoint is ready, regardless of if the +// endpoint is terminating. +func (info *BaseEndpointInfo) IsServing() bool { + return info.serving +} + +// IsTerminating retruns true if an endpoint is terminating. For pods, +// that is any pod with a deletion timestamp. +func (info *BaseEndpointInfo) IsTerminating() bool { + return info.terminating +} + +// ZoneHints returns the zone hint for the endpoint. +func (info *BaseEndpointInfo) ZoneHints() sets.Set[string] { + return info.zoneHints +} + +func newBaseEndpointInfo(ip string, port int, isLocal, ready, serving, terminating bool, zoneHints sets.Set[string]) *BaseEndpointInfo { + return &BaseEndpointInfo{ + ip: ip, + port: port, + endpoint: net.JoinHostPort(ip, strconv.Itoa(port)), + isLocal: isLocal, + ready: ready, + serving: serving, + terminating: terminating, + zoneHints: zoneHints, + } +} diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpointschangetracker.go similarity index 82% rename from pkg/proxy/endpoints.go rename to pkg/proxy/endpointschangetracker.go index 56348643473..f75a65bfc2e 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpointschangetracker.go @@ -17,18 +17,15 @@ limitations under the License. package proxy import ( - "net" - "strconv" "sync" "time" - "k8s.io/client-go/tools/events" - "k8s.io/klog/v2" - v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/events" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/proxy/metrics" ) @@ -37,93 +34,6 @@ var supportedEndpointSliceAddressTypes = sets.New[string]( string(discovery.AddressTypeIPv6), ) -// BaseEndpointInfo contains base information that defines an endpoint. -// This could be used directly by proxier while processing endpoints, -// or can be used for constructing a more specific EndpointInfo struct -// defined by the proxier if needed. -type BaseEndpointInfo struct { - // Cache this values to improve performance - ip string - port int - // endpoint is the same as net.JoinHostPort(ip,port) - endpoint string - - // isLocal indicates whether the endpoint is running on same host as kube-proxy. - isLocal bool - - // ready indicates whether this endpoint is ready and NOT terminating, unless - // PublishNotReadyAddresses is set on the service, in which case it will just - // always be true. - ready bool - // serving indicates whether this endpoint is ready regardless of its terminating state. - // For pods this is true if it has a ready status regardless of its deletion timestamp. - serving bool - // terminating indicates whether this endpoint is terminating. - // For pods this is true if it has a non-nil deletion timestamp. - terminating bool - - // zoneHints represent the zone hints for the endpoint. This is based on - // endpoint.hints.forZones[*].name in the EndpointSlice API. - zoneHints sets.Set[string] -} - -var _ Endpoint = &BaseEndpointInfo{} - -// String is part of proxy.Endpoint interface. -func (info *BaseEndpointInfo) String() string { - return info.endpoint -} - -// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface. -func (info *BaseEndpointInfo) IP() string { - return info.ip -} - -// Port returns just the Port part of the endpoint. -func (info *BaseEndpointInfo) Port() int { - return info.port -} - -// IsLocal is part of proxy.Endpoint interface. -func (info *BaseEndpointInfo) IsLocal() bool { - return info.isLocal -} - -// IsReady returns true if an endpoint is ready and not terminating. -func (info *BaseEndpointInfo) IsReady() bool { - return info.ready -} - -// IsServing returns true if an endpoint is ready, regardless of if the -// endpoint is terminating. -func (info *BaseEndpointInfo) IsServing() bool { - return info.serving -} - -// IsTerminating retruns true if an endpoint is terminating. For pods, -// that is any pod with a deletion timestamp. -func (info *BaseEndpointInfo) IsTerminating() bool { - return info.terminating -} - -// ZoneHints returns the zone hint for the endpoint. -func (info *BaseEndpointInfo) ZoneHints() sets.Set[string] { - return info.zoneHints -} - -func newBaseEndpointInfo(ip string, port int, isLocal, ready, serving, terminating bool, zoneHints sets.Set[string]) *BaseEndpointInfo { - return &BaseEndpointInfo{ - ip: ip, - port: port, - endpoint: net.JoinHostPort(ip, strconv.Itoa(port)), - isLocal: isLocal, - ready: ready, - serving: serving, - terminating: terminating, - zoneHints: zoneHints, - } -} - type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) Endpoint // This handler is invoked by the apply function on every change. This function should not modify the diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpointschangetracker_test.go similarity index 100% rename from pkg/proxy/endpoints_test.go rename to pkg/proxy/endpointschangetracker_test.go diff --git a/pkg/proxy/servicechangetracker.go b/pkg/proxy/servicechangetracker.go new file mode 100644 index 00000000000..1d72719b9f3 --- /dev/null +++ b/pkg/proxy/servicechangetracker.go @@ -0,0 +1,247 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "reflect" + "sync" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/events" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/proxy/metrics" + proxyutil "k8s.io/kubernetes/pkg/proxy/util" +) + +type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort + +// This handler is invoked by the apply function on every change. This function should not modify the +// ServicePortMap's but just use the changes for any Proxier specific cleanup. +type processServiceMapChangeFunc func(previous, current ServicePortMap) + +// serviceChange contains all changes to services that happened since proxy rules were synced. For a single object, +// changes are accumulated, i.e. previous is state from before applying the changes, +// current is state after applying all of the changes. +type serviceChange struct { + previous ServicePortMap + current ServicePortMap +} + +// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of +// Services, keyed by their namespace and name. +type ServiceChangeTracker struct { + // lock protects items. + lock sync.Mutex + // items maps a service to its serviceChange. + items map[types.NamespacedName]*serviceChange + // makeServiceInfo allows proxier to inject customized information when processing service. + makeServiceInfo makeServicePortFunc + processServiceMapChange processServiceMapChangeFunc + ipFamily v1.IPFamily + + recorder events.EventRecorder +} + +// NewServiceChangeTracker initializes a ServiceChangeTracker +func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker { + return &ServiceChangeTracker{ + items: make(map[types.NamespacedName]*serviceChange), + makeServiceInfo: makeServiceInfo, + recorder: recorder, + ipFamily: ipFamily, + processServiceMapChange: processServiceMapChange, + } +} + +// Update updates given service's change map based on the service pair. It returns true if items changed, +// otherwise return false. Update can be used to add/update/delete items of ServiceChangeMap. For example, +// Add item +// - pass as the pair. +// +// Update item +// - pass as the pair. +// +// Delete item +// - pass as the pair. +func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool { + // This is unexpected, we should return false directly. + if previous == nil && current == nil { + return false + } + + svc := current + if svc == nil { + svc = previous + } + metrics.ServiceChangesTotal.Inc() + namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name} + + sct.lock.Lock() + defer sct.lock.Unlock() + + change, exists := sct.items[namespacedName] + if !exists { + change = &serviceChange{} + change.previous = sct.serviceToServiceMap(previous) + sct.items[namespacedName] = change + } + change.current = sct.serviceToServiceMap(current) + // if change.previous equal to change.current, it means no change + if reflect.DeepEqual(change.previous, change.current) { + delete(sct.items, namespacedName) + } else { + klog.V(4).InfoS("Service updated ports", "service", klog.KObj(svc), "portCount", len(change.current)) + } + metrics.ServiceChangesPending.Set(float64(len(sct.items))) + return len(sct.items) > 0 +} + +// UpdateServiceMapResult is the updated results after applying service changes. +type UpdateServiceMapResult struct { + // UpdatedServices lists the names of all services added/updated/deleted since the + // last Update. + UpdatedServices sets.Set[types.NamespacedName] + + // DeletedUDPClusterIPs holds stale (no longer assigned to a Service) Service IPs + // that had UDP ports. Callers can use this to abort timeout-waits or clear + // connection-tracking information. + DeletedUDPClusterIPs sets.Set[string] +} + +// HealthCheckNodePorts returns a map of Service names to HealthCheckNodePort values +// for all Services in sm with non-zero HealthCheckNodePort. +func (sm ServicePortMap) HealthCheckNodePorts() map[types.NamespacedName]uint16 { + // TODO: If this will appear to be computationally expensive, consider + // computing this incrementally similarly to svcPortMap. + ports := make(map[types.NamespacedName]uint16) + for svcPortName, info := range sm { + if info.HealthCheckNodePort() != 0 { + ports[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort()) + } + } + return ports +} + +// ServicePortMap maps a service to its ServicePort. +type ServicePortMap map[ServicePortName]ServicePort + +// serviceToServiceMap translates a single Service object to a ServicePortMap. +// +// NOTE: service object should NOT be modified. +func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) ServicePortMap { + if service == nil { + return nil + } + + if proxyutil.ShouldSkipService(service) { + return nil + } + + clusterIP := proxyutil.GetClusterIPByFamily(sct.ipFamily, service) + if clusterIP == "" { + return nil + } + + svcPortMap := make(ServicePortMap) + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name, Protocol: servicePort.Protocol} + baseSvcInfo := newBaseServiceInfo(service, sct.ipFamily, servicePort) + if sct.makeServiceInfo != nil { + svcPortMap[svcPortName] = sct.makeServiceInfo(servicePort, service, baseSvcInfo) + } else { + svcPortMap[svcPortName] = baseSvcInfo + } + } + return svcPortMap +} + +// Update updates ServicePortMap base on the given changes, returns information about the +// diff since the last Update, triggers processServiceMapChange on every change, and +// clears the changes map. +func (sm ServicePortMap) Update(sct *ServiceChangeTracker) UpdateServiceMapResult { + sct.lock.Lock() + defer sct.lock.Unlock() + + result := UpdateServiceMapResult{ + UpdatedServices: sets.New[types.NamespacedName](), + DeletedUDPClusterIPs: sets.New[string](), + } + + for nn, change := range sct.items { + if sct.processServiceMapChange != nil { + sct.processServiceMapChange(change.previous, change.current) + } + result.UpdatedServices.Insert(nn) + + sm.merge(change.current) + // filter out the Update event of current changes from previous changes + // before calling unmerge() so that can skip deleting the Update events. + change.previous.filter(change.current) + sm.unmerge(change.previous, result.DeletedUDPClusterIPs) + } + // clear changes after applying them to ServicePortMap. + sct.items = make(map[types.NamespacedName]*serviceChange) + metrics.ServiceChangesPending.Set(0) + + return result +} + +// merge adds other ServicePortMap's elements to current ServicePortMap. +// If collision, other ALWAYS win. Otherwise add the other to current. +// In other words, if some elements in current collisions with other, update the current by other. +func (sm *ServicePortMap) merge(other ServicePortMap) { + for svcPortName, info := range other { + _, exists := (*sm)[svcPortName] + if !exists { + klog.V(4).InfoS("Adding new service port", "portName", svcPortName, "servicePort", info) + } else { + klog.V(4).InfoS("Updating existing service port", "portName", svcPortName, "servicePort", info) + } + (*sm)[svcPortName] = info + } +} + +// filter filters out elements from ServicePortMap base on given ports string sets. +func (sm *ServicePortMap) filter(other ServicePortMap) { + for svcPortName := range *sm { + // skip the delete for Update event. + if _, ok := other[svcPortName]; ok { + delete(*sm, svcPortName) + } + } +} + +// unmerge deletes all other ServicePortMap's elements from current ServicePortMap and +// updates deletedUDPClusterIPs with all of the newly-deleted UDP cluster IPs. +func (sm *ServicePortMap) unmerge(other ServicePortMap, deletedUDPClusterIPs sets.Set[string]) { + for svcPortName := range other { + info, exists := (*sm)[svcPortName] + if exists { + klog.V(4).InfoS("Removing service port", "portName", svcPortName) + if info.Protocol() == v1.ProtocolUDP { + deletedUDPClusterIPs.Insert(info.ClusterIP().String()) + } + delete(*sm, svcPortName) + } else { + klog.ErrorS(nil, "Service port does not exists", "portName", svcPortName) + } + } +} diff --git a/pkg/proxy/service_test.go b/pkg/proxy/servicechangetracker_test.go similarity index 100% rename from pkg/proxy/service_test.go rename to pkg/proxy/servicechangetracker_test.go diff --git a/pkg/proxy/service.go b/pkg/proxy/serviceport.go similarity index 52% rename from pkg/proxy/service.go rename to pkg/proxy/serviceport.go index a28ec8fcb3b..5ed0b5d79b3 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/serviceport.go @@ -19,22 +19,56 @@ package proxy import ( "fmt" "net" - "reflect" "strings" - "sync" - - "k8s.io/client-go/tools/events" - "k8s.io/klog/v2" - netutils "k8s.io/utils/net" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" apiservice "k8s.io/kubernetes/pkg/api/v1/service" - "k8s.io/kubernetes/pkg/proxy/metrics" proxyutil "k8s.io/kubernetes/pkg/proxy/util" + netutils "k8s.io/utils/net" ) +// ServicePort is an interface which abstracts information about a service. +type ServicePort interface { + // String returns service string. An example format can be: `IP:Port/Protocol`. + String() string + // ClusterIP returns service cluster IP in net.IP format. + ClusterIP() net.IP + // Port returns service port if present. If return 0 means not present. + Port() int + // SessionAffinityType returns service session affinity type + SessionAffinityType() v1.ServiceAffinity + // StickyMaxAgeSeconds returns service max connection age + StickyMaxAgeSeconds() int + // ExternalIPStrings returns service ExternalIPs as a string array. + ExternalIPStrings() []string + // LoadBalancerVIPStrings returns service LoadBalancerIPs which are VIP mode as a string array. + LoadBalancerVIPStrings() []string + // Protocol returns service protocol. + Protocol() v1.Protocol + // LoadBalancerSourceRanges returns service LoadBalancerSourceRanges if present empty array if not + LoadBalancerSourceRanges() []string + // HealthCheckNodePort returns service health check node port if present. If return 0, it means not present. + HealthCheckNodePort() int + // NodePort returns a service Node port if present. If return 0, it means not present. + NodePort() int + // ExternalPolicyLocal returns if a service has only node local endpoints for external traffic. + ExternalPolicyLocal() bool + // InternalPolicyLocal returns if a service has only node local endpoints for internal traffic. + InternalPolicyLocal() bool + // HintsAnnotation returns the value of the v1.DeprecatedAnnotationTopologyAwareHints annotation. + HintsAnnotation() string + // ExternallyAccessible returns true if the service port is reachable via something + // other than ClusterIP (NodePort/ExternalIP/LoadBalancer) + ExternallyAccessible() bool + // UsesClusterEndpoints returns true if the service port ever sends traffic to + // endpoints based on "Cluster" traffic policy + UsesClusterEndpoints() bool + // UsesLocalEndpoints returns true if the service port ever sends traffic to + // endpoints based on "Local" traffic policy + UsesLocalEndpoints() bool +} + // BaseServicePortInfo contains base information that defines a service. // This could be used directly by proxier while processing services, // or can be used for constructing a more specific ServiceInfo struct @@ -240,220 +274,3 @@ func newBaseServiceInfo(service *v1.Service, ipFamily v1.IPFamily, port *v1.Serv return info } - -type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort - -// This handler is invoked by the apply function on every change. This function should not modify the -// ServicePortMap's but just use the changes for any Proxier specific cleanup. -type processServiceMapChangeFunc func(previous, current ServicePortMap) - -// serviceChange contains all changes to services that happened since proxy rules were synced. For a single object, -// changes are accumulated, i.e. previous is state from before applying the changes, -// current is state after applying all of the changes. -type serviceChange struct { - previous ServicePortMap - current ServicePortMap -} - -// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of -// Services, keyed by their namespace and name. -type ServiceChangeTracker struct { - // lock protects items. - lock sync.Mutex - // items maps a service to its serviceChange. - items map[types.NamespacedName]*serviceChange - // makeServiceInfo allows proxier to inject customized information when processing service. - makeServiceInfo makeServicePortFunc - processServiceMapChange processServiceMapChangeFunc - ipFamily v1.IPFamily - - recorder events.EventRecorder -} - -// NewServiceChangeTracker initializes a ServiceChangeTracker -func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker { - return &ServiceChangeTracker{ - items: make(map[types.NamespacedName]*serviceChange), - makeServiceInfo: makeServiceInfo, - recorder: recorder, - ipFamily: ipFamily, - processServiceMapChange: processServiceMapChange, - } -} - -// Update updates given service's change map based on the service pair. It returns true if items changed, -// otherwise return false. Update can be used to add/update/delete items of ServiceChangeMap. For example, -// Add item -// - pass as the pair. -// -// Update item -// - pass as the pair. -// -// Delete item -// - pass as the pair. -func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool { - // This is unexpected, we should return false directly. - if previous == nil && current == nil { - return false - } - - svc := current - if svc == nil { - svc = previous - } - metrics.ServiceChangesTotal.Inc() - namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name} - - sct.lock.Lock() - defer sct.lock.Unlock() - - change, exists := sct.items[namespacedName] - if !exists { - change = &serviceChange{} - change.previous = sct.serviceToServiceMap(previous) - sct.items[namespacedName] = change - } - change.current = sct.serviceToServiceMap(current) - // if change.previous equal to change.current, it means no change - if reflect.DeepEqual(change.previous, change.current) { - delete(sct.items, namespacedName) - } else { - klog.V(4).InfoS("Service updated ports", "service", klog.KObj(svc), "portCount", len(change.current)) - } - metrics.ServiceChangesPending.Set(float64(len(sct.items))) - return len(sct.items) > 0 -} - -// UpdateServiceMapResult is the updated results after applying service changes. -type UpdateServiceMapResult struct { - // UpdatedServices lists the names of all services added/updated/deleted since the - // last Update. - UpdatedServices sets.Set[types.NamespacedName] - - // DeletedUDPClusterIPs holds stale (no longer assigned to a Service) Service IPs - // that had UDP ports. Callers can use this to abort timeout-waits or clear - // connection-tracking information. - DeletedUDPClusterIPs sets.Set[string] -} - -// HealthCheckNodePorts returns a map of Service names to HealthCheckNodePort values -// for all Services in sm with non-zero HealthCheckNodePort. -func (sm ServicePortMap) HealthCheckNodePorts() map[types.NamespacedName]uint16 { - // TODO: If this will appear to be computationally expensive, consider - // computing this incrementally similarly to svcPortMap. - ports := make(map[types.NamespacedName]uint16) - for svcPortName, info := range sm { - if info.HealthCheckNodePort() != 0 { - ports[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort()) - } - } - return ports -} - -// ServicePortMap maps a service to its ServicePort. -type ServicePortMap map[ServicePortName]ServicePort - -// serviceToServiceMap translates a single Service object to a ServicePortMap. -// -// NOTE: service object should NOT be modified. -func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) ServicePortMap { - if service == nil { - return nil - } - - if proxyutil.ShouldSkipService(service) { - return nil - } - - clusterIP := proxyutil.GetClusterIPByFamily(sct.ipFamily, service) - if clusterIP == "" { - return nil - } - - svcPortMap := make(ServicePortMap) - svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name, Protocol: servicePort.Protocol} - baseSvcInfo := newBaseServiceInfo(service, sct.ipFamily, servicePort) - if sct.makeServiceInfo != nil { - svcPortMap[svcPortName] = sct.makeServiceInfo(servicePort, service, baseSvcInfo) - } else { - svcPortMap[svcPortName] = baseSvcInfo - } - } - return svcPortMap -} - -// Update updates ServicePortMap base on the given changes, returns information about the -// diff since the last Update, triggers processServiceMapChange on every change, and -// clears the changes map. -func (sm ServicePortMap) Update(sct *ServiceChangeTracker) UpdateServiceMapResult { - sct.lock.Lock() - defer sct.lock.Unlock() - - result := UpdateServiceMapResult{ - UpdatedServices: sets.New[types.NamespacedName](), - DeletedUDPClusterIPs: sets.New[string](), - } - - for nn, change := range sct.items { - if sct.processServiceMapChange != nil { - sct.processServiceMapChange(change.previous, change.current) - } - result.UpdatedServices.Insert(nn) - - sm.merge(change.current) - // filter out the Update event of current changes from previous changes - // before calling unmerge() so that can skip deleting the Update events. - change.previous.filter(change.current) - sm.unmerge(change.previous, result.DeletedUDPClusterIPs) - } - // clear changes after applying them to ServicePortMap. - sct.items = make(map[types.NamespacedName]*serviceChange) - metrics.ServiceChangesPending.Set(0) - - return result -} - -// merge adds other ServicePortMap's elements to current ServicePortMap. -// If collision, other ALWAYS win. Otherwise add the other to current. -// In other words, if some elements in current collisions with other, update the current by other. -func (sm *ServicePortMap) merge(other ServicePortMap) { - for svcPortName, info := range other { - _, exists := (*sm)[svcPortName] - if !exists { - klog.V(4).InfoS("Adding new service port", "portName", svcPortName, "servicePort", info) - } else { - klog.V(4).InfoS("Updating existing service port", "portName", svcPortName, "servicePort", info) - } - (*sm)[svcPortName] = info - } -} - -// filter filters out elements from ServicePortMap base on given ports string sets. -func (sm *ServicePortMap) filter(other ServicePortMap) { - for svcPortName := range *sm { - // skip the delete for Update event. - if _, ok := other[svcPortName]; ok { - delete(*sm, svcPortName) - } - } -} - -// unmerge deletes all other ServicePortMap's elements from current ServicePortMap and -// updates deletedUDPClusterIPs with all of the newly-deleted UDP cluster IPs. -func (sm *ServicePortMap) unmerge(other ServicePortMap, deletedUDPClusterIPs sets.Set[string]) { - for svcPortName := range other { - info, exists := (*sm)[svcPortName] - if exists { - klog.V(4).InfoS("Removing service port", "portName", svcPortName) - if info.Protocol() == v1.ProtocolUDP { - deletedUDPClusterIPs.Insert(info.ClusterIP().String()) - } - delete(*sm, svcPortName) - } else { - klog.ErrorS(nil, "Service port does not exists", "portName", svcPortName) - } - } -} diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index f1c89d353f5..1cc0c732650 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -18,11 +18,9 @@ package proxy import ( "fmt" - "net" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/proxy/config" ) @@ -59,75 +57,6 @@ func fmtPortName(in string) string { return fmt.Sprintf(":%s", in) } -// ServicePort is an interface which abstracts information about a service. -type ServicePort interface { - // String returns service string. An example format can be: `IP:Port/Protocol`. - String() string - // ClusterIP returns service cluster IP in net.IP format. - ClusterIP() net.IP - // Port returns service port if present. If return 0 means not present. - Port() int - // SessionAffinityType returns service session affinity type - SessionAffinityType() v1.ServiceAffinity - // StickyMaxAgeSeconds returns service max connection age - StickyMaxAgeSeconds() int - // ExternalIPStrings returns service ExternalIPs as a string array. - ExternalIPStrings() []string - // LoadBalancerVIPStrings returns service LoadBalancerIPs which are VIP mode as a string array. - LoadBalancerVIPStrings() []string - // Protocol returns service protocol. - Protocol() v1.Protocol - // LoadBalancerSourceRanges returns service LoadBalancerSourceRanges if present empty array if not - LoadBalancerSourceRanges() []string - // HealthCheckNodePort returns service health check node port if present. If return 0, it means not present. - HealthCheckNodePort() int - // NodePort returns a service Node port if present. If return 0, it means not present. - NodePort() int - // ExternalPolicyLocal returns if a service has only node local endpoints for external traffic. - ExternalPolicyLocal() bool - // InternalPolicyLocal returns if a service has only node local endpoints for internal traffic. - InternalPolicyLocal() bool - // HintsAnnotation returns the value of the v1.DeprecatedAnnotationTopologyAwareHints annotation. - HintsAnnotation() string - // ExternallyAccessible returns true if the service port is reachable via something - // other than ClusterIP (NodePort/ExternalIP/LoadBalancer) - ExternallyAccessible() bool - // UsesClusterEndpoints returns true if the service port ever sends traffic to - // endpoints based on "Cluster" traffic policy - UsesClusterEndpoints() bool - // UsesLocalEndpoints returns true if the service port ever sends traffic to - // endpoints based on "Local" traffic policy - UsesLocalEndpoints() bool -} - -// Endpoint in an interface which abstracts information about an endpoint. -// TODO: Rename functions to be consistent with ServicePort. -type Endpoint interface { - // String returns endpoint string. An example format can be: `IP:Port`. - // We take the returned value as ServiceEndpoint.Endpoint. - String() string - // IP returns IP part of the endpoint. - IP() string - // Port returns the Port part of the endpoint. - Port() int - - // IsLocal returns true if the endpoint is running on the same host as kube-proxy. - IsLocal() bool - // IsReady returns true if an endpoint is ready and not terminating, or - // if PublishNotReadyAddresses is set on the service. - IsReady() bool - // IsServing returns true if an endpoint is ready. It does not account - // for terminating state. - IsServing() bool - // IsTerminating returns true if an endpoint is terminating. For pods, - // that is any pod with a deletion timestamp. - IsTerminating() bool - - // ZoneHints returns the zone hint for the endpoint. This is based on - // endpoint.hints.forZones[0].name in the EndpointSlice API. - ZoneHints() sets.Set[string] -} - // ServiceEndpoint is used to identify a service and one of its endpoint pair. type ServiceEndpoint struct { Endpoint string From 764cb0457f5cbd1d2cee8476c59fc1daaadcb6ce Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 20 Nov 2023 09:41:55 -0500 Subject: [PATCH 5/8] Move some code around in servicechangetracker.go/endpointschangetracker.go Put the ServiceChangeTracker and EndpointsChangeTracker definitions at the top of the files, and put the ServicePortMap and EndpointsMap definitions before their methods. (No code changes.) --- pkg/proxy/endpointschangetracker.go | 12 +++++----- pkg/proxy/endpointslicecache.go | 8 +++---- pkg/proxy/servicechangetracker.go | 34 ++++++++++++++--------------- 3 files changed, 27 insertions(+), 27 deletions(-) diff --git a/pkg/proxy/endpointschangetracker.go b/pkg/proxy/endpointschangetracker.go index f75a65bfc2e..3347e55709a 100644 --- a/pkg/proxy/endpointschangetracker.go +++ b/pkg/proxy/endpointschangetracker.go @@ -34,12 +34,6 @@ var supportedEndpointSliceAddressTypes = sets.New[string]( string(discovery.AddressTypeIPv6), ) -type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) Endpoint - -// This handler is invoked by the apply function on every change. This function should not modify the -// EndpointsMap's but just use the changes for any Proxier specific cleanup. -type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap) - // EndpointsChangeTracker carries state about uncommitted changes to an arbitrary number of // Endpoints, keyed by their namespace and name. type EndpointsChangeTracker struct { @@ -59,6 +53,12 @@ type EndpointsChangeTracker struct { trackerStartTime time.Time } +type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) Endpoint + +// This handler is invoked by the apply function on every change. This function should not modify the +// EndpointsMap's but just use the changes for any Proxier specific cleanup. +type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap) + // NewEndpointsChangeTracker initializes an EndpointsChangeTracker func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker { return &EndpointsChangeTracker{ diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index 125d625cad2..49254cdf7ba 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -88,10 +88,6 @@ type endpointInfo struct { Terminating bool } -// spToEndpointMap stores groups Endpoint objects by ServicePortName and -// endpoint string (returned by Endpoint.String()). -type spToEndpointMap map[ServicePortName]map[string]Endpoint - // NewEndpointSliceCache initializes an EndpointSliceCache. func NewEndpointSliceCache(hostname string, ipFamily v1.IPFamily, recorder events.EventRecorder, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache { if makeEndpointInfo == nil { @@ -222,6 +218,10 @@ func (cache *EndpointSliceCache) checkoutChanges() map[types.NamespacedName]*end return changes } +// spToEndpointMap stores groups Endpoint objects by ServicePortName and +// endpoint string (returned by Endpoint.String()). +type spToEndpointMap map[ServicePortName]map[string]Endpoint + // getEndpointsMap computes an EndpointsMap for a given set of EndpointSlices. func (cache *EndpointSliceCache) getEndpointsMap(serviceNN types.NamespacedName, sliceInfoByName endpointSliceInfoByName) EndpointsMap { endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN, sliceInfoByName) diff --git a/pkg/proxy/servicechangetracker.go b/pkg/proxy/servicechangetracker.go index 1d72719b9f3..37d22357262 100644 --- a/pkg/proxy/servicechangetracker.go +++ b/pkg/proxy/servicechangetracker.go @@ -29,20 +29,6 @@ import ( proxyutil "k8s.io/kubernetes/pkg/proxy/util" ) -type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort - -// This handler is invoked by the apply function on every change. This function should not modify the -// ServicePortMap's but just use the changes for any Proxier specific cleanup. -type processServiceMapChangeFunc func(previous, current ServicePortMap) - -// serviceChange contains all changes to services that happened since proxy rules were synced. For a single object, -// changes are accumulated, i.e. previous is state from before applying the changes, -// current is state after applying all of the changes. -type serviceChange struct { - previous ServicePortMap - current ServicePortMap -} - // ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of // Services, keyed by their namespace and name. type ServiceChangeTracker struct { @@ -58,6 +44,20 @@ type ServiceChangeTracker struct { recorder events.EventRecorder } +type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort + +// This handler is invoked by the apply function on every change. This function should not modify the +// ServicePortMap's but just use the changes for any Proxier specific cleanup. +type processServiceMapChangeFunc func(previous, current ServicePortMap) + +// serviceChange contains all changes to services that happened since proxy rules were synced. For a single object, +// changes are accumulated, i.e. previous is state from before applying the changes, +// current is state after applying all of the changes. +type serviceChange struct { + previous ServicePortMap + current ServicePortMap +} + // NewServiceChangeTracker initializes a ServiceChangeTracker func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker { return &ServiceChangeTracker{ @@ -112,6 +112,9 @@ func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool { return len(sct.items) > 0 } +// ServicePortMap maps a service to its ServicePort. +type ServicePortMap map[ServicePortName]ServicePort + // UpdateServiceMapResult is the updated results after applying service changes. type UpdateServiceMapResult struct { // UpdatedServices lists the names of all services added/updated/deleted since the @@ -138,9 +141,6 @@ func (sm ServicePortMap) HealthCheckNodePorts() map[types.NamespacedName]uint16 return ports } -// ServicePortMap maps a service to its ServicePort. -type ServicePortMap map[ServicePortName]ServicePort - // serviceToServiceMap translates a single Service object to a ServicePortMap. // // NOTE: service object should NOT be modified. From a8a12be3d30ac6a2d1a73750308b94d061dcc920 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 20 Nov 2023 10:53:41 -0500 Subject: [PATCH 6/8] Rename cache's endpointSliceInfo/endpointInfo to endpointSliceData/endpointData EndpointSliceCache was using the name "endpointInfo" to refer to two different data types (most egregiously in addEndpoints(), which had a variable named `endpoint` of type `*endpointInfo` and a variable named `endpointInfo` of type `Endpoint`). Continue using "endpointInfo" in places that refer to proxy.Endpoint / BaseEndpointInfo, since that's consistent with other code, but rename the local "cache of the Endpoints field of an EndpointSlice" type from "endpointInfo" to "endpointData". Likewise, rename endpointSliceInfo to endpointSliceData, for consistency. --- pkg/proxy/endpointschangetracker_test.go | 2 +- pkg/proxy/endpointslicecache.go | 90 ++++++++++++------------ pkg/proxy/endpointslicecache_test.go | 8 +-- 3 files changed, 50 insertions(+), 50 deletions(-) diff --git a/pkg/proxy/endpointschangetracker_test.go b/pkg/proxy/endpointschangetracker_test.go index a6cdcbbda5b..dd3ddb0fd68 100644 --- a/pkg/proxy/endpointschangetracker_test.go +++ b/pkg/proxy/endpointschangetracker_test.go @@ -1745,6 +1745,6 @@ func initializeCache(endpointSliceCache *EndpointSliceCache, endpointSlices []*d for _, tracker := range endpointSliceCache.trackerByServiceMap { tracker.applied = tracker.pending - tracker.pending = endpointSliceInfoByName{} + tracker.pending = endpointSliceDataByName{} } } diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index 49254cdf7ba..fa98836284c 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -58,26 +58,26 @@ type EndpointSliceCache struct { // by a proxier along with any pending EndpointSlices that have been updated // in this cache but not yet applied by a proxier. type endpointSliceTracker struct { - applied endpointSliceInfoByName - pending endpointSliceInfoByName + applied endpointSliceDataByName + pending endpointSliceDataByName } -// endpointSliceInfoByName groups endpointSliceInfo by the names of the +// endpointSliceDataByName groups endpointSliceData by the names of the // corresponding EndpointSlices. -type endpointSliceInfoByName map[string]*endpointSliceInfo +type endpointSliceDataByName map[string]*endpointSliceData -// endpointSliceInfo contains just the attributes kube-proxy cares about. +// endpointSliceData contains just the attributes kube-proxy cares about. // Used for caching. Intentionally small to limit memory util. -type endpointSliceInfo struct { +type endpointSliceData struct { Ports []discovery.EndpointPort - Endpoints []*endpointInfo + Endpoints []*endpointData Remove bool } -// endpointInfo contains just the attributes kube-proxy cares about. +// endpointData contains just the attributes kube-proxy cares about. // Used for caching. Intentionally small to limit memory util. // Addresses, NodeName, and Zone are copied from EndpointSlice Endpoints. -type endpointInfo struct { +type endpointData struct { Addresses []string NodeName *string Zone *string @@ -105,26 +105,26 @@ func NewEndpointSliceCache(hostname string, ipFamily v1.IPFamily, recorder event // newEndpointSliceTracker initializes an endpointSliceTracker. func newEndpointSliceTracker() *endpointSliceTracker { return &endpointSliceTracker{ - applied: endpointSliceInfoByName{}, - pending: endpointSliceInfoByName{}, + applied: endpointSliceDataByName{}, + pending: endpointSliceDataByName{}, } } -// newEndpointSliceInfo generates endpointSliceInfo from an EndpointSlice. -func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceInfo { - esInfo := &endpointSliceInfo{ +// newEndpointSliceData generates endpointSliceData from an EndpointSlice. +func newEndpointSliceData(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceData { + esData := &endpointSliceData{ Ports: make([]discovery.EndpointPort, len(endpointSlice.Ports)), - Endpoints: []*endpointInfo{}, + Endpoints: []*endpointData{}, Remove: remove, } // copy here to avoid mutating shared EndpointSlice object. - copy(esInfo.Ports, endpointSlice.Ports) - sort.Sort(byPort(esInfo.Ports)) + copy(esData.Ports, endpointSlice.Ports) + sort.Sort(byPort(esData.Ports)) if !remove { for _, endpoint := range endpointSlice.Endpoints { - epInfo := &endpointInfo{ + epData := &endpointData{ Addresses: endpoint.Addresses, Zone: endpoint.Zone, NodeName: endpoint.NodeName, @@ -137,20 +137,20 @@ func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) * if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 { - epInfo.ZoneHints = sets.New[string]() + epData.ZoneHints = sets.New[string]() for _, zone := range endpoint.Hints.ForZones { - epInfo.ZoneHints.Insert(zone.Name) + epData.ZoneHints.Insert(zone.Name) } } } - esInfo.Endpoints = append(esInfo.Endpoints, epInfo) + esData.Endpoints = append(esData.Endpoints, epData) } - sort.Sort(byAddress(esInfo.Endpoints)) + sort.Sort(byAddress(esData.Endpoints)) } - return esInfo + return esData } // standardEndpointInfo is the default makeEndpointFunc. @@ -166,7 +166,7 @@ func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.Endpoint return false } - esInfo := newEndpointSliceInfo(endpointSlice, remove) + esData := newEndpointSliceData(endpointSlice, remove) cache.lock.Lock() defer cache.lock.Unlock() @@ -175,10 +175,10 @@ func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.Endpoint cache.trackerByServiceMap[serviceKey] = newEndpointSliceTracker() } - changed := cache.esInfoChanged(serviceKey, sliceKey, esInfo) + changed := cache.esDataChanged(serviceKey, sliceKey, esData) if changed { - cache.trackerByServiceMap[serviceKey].pending[sliceKey] = esInfo + cache.trackerByServiceMap[serviceKey].pending[sliceKey] = esData } return changed @@ -201,11 +201,11 @@ func (cache *EndpointSliceCache) checkoutChanges() map[types.NamespacedName]*end change.previous = cache.getEndpointsMap(serviceNN, esTracker.applied) - for name, sliceInfo := range esTracker.pending { - if sliceInfo.Remove { + for name, sliceData := range esTracker.pending { + if sliceData.Remove { delete(esTracker.applied, name) } else { - esTracker.applied[name] = sliceInfo + esTracker.applied[name] = sliceData } delete(esTracker.pending, name) @@ -223,17 +223,17 @@ func (cache *EndpointSliceCache) checkoutChanges() map[types.NamespacedName]*end type spToEndpointMap map[ServicePortName]map[string]Endpoint // getEndpointsMap computes an EndpointsMap for a given set of EndpointSlices. -func (cache *EndpointSliceCache) getEndpointsMap(serviceNN types.NamespacedName, sliceInfoByName endpointSliceInfoByName) EndpointsMap { - endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN, sliceInfoByName) +func (cache *EndpointSliceCache) getEndpointsMap(serviceNN types.NamespacedName, sliceDataByName endpointSliceDataByName) EndpointsMap { + endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN, sliceDataByName) return endpointsMapFromEndpointInfo(endpointInfoBySP) } // endpointInfoByServicePort groups endpoint info by service port name and address. -func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.NamespacedName, sliceInfoByName endpointSliceInfoByName) spToEndpointMap { +func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.NamespacedName, sliceDataByName endpointSliceDataByName) spToEndpointMap { endpointInfoBySP := spToEndpointMap{} - for _, sliceInfo := range sliceInfoByName { - for _, port := range sliceInfo.Ports { + for _, sliceData := range sliceDataByName { + for _, port := range sliceData.Ports { if port.Name == nil { klog.ErrorS(nil, "Ignoring port with nil name", "portName", port.Name) continue @@ -250,7 +250,7 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names Protocol: *port.Protocol, } - endpointInfoBySP[svcPortName] = cache.addEndpoints(&svcPortName, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints) + endpointInfoBySP[svcPortName] = cache.addEndpoints(&svcPortName, int(*port.Port), endpointInfoBySP[svcPortName], sliceData.Endpoints) } } @@ -258,7 +258,7 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names } // addEndpoints adds endpointInfo for each unique endpoint. -func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, portNum int, endpointSet map[string]Endpoint, endpoints []*endpointInfo) map[string]Endpoint { +func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, portNum int, endpointSet map[string]Endpoint, endpoints []*endpointData) map[string]Endpoint { if endpointSet == nil { endpointSet = map[string]Endpoint{} } @@ -299,29 +299,29 @@ func (cache *EndpointSliceCache) isLocal(hostname string) bool { return len(cache.hostname) > 0 && hostname == cache.hostname } -// esInfoChanged returns true if the esInfo parameter should be set as a new +// esDataChanged returns true if the esData parameter should be set as a new // pending value in the cache. -func (cache *EndpointSliceCache) esInfoChanged(serviceKey types.NamespacedName, sliceKey string, esInfo *endpointSliceInfo) bool { +func (cache *EndpointSliceCache) esDataChanged(serviceKey types.NamespacedName, sliceKey string, esData *endpointSliceData) bool { if _, ok := cache.trackerByServiceMap[serviceKey]; ok { - appliedInfo, appliedOk := cache.trackerByServiceMap[serviceKey].applied[sliceKey] - pendingInfo, pendingOk := cache.trackerByServiceMap[serviceKey].pending[sliceKey] + appliedData, appliedOk := cache.trackerByServiceMap[serviceKey].applied[sliceKey] + pendingData, pendingOk := cache.trackerByServiceMap[serviceKey].pending[sliceKey] // If there's already a pending value, return whether or not this would // change that. if pendingOk { - return !reflect.DeepEqual(esInfo, pendingInfo) + return !reflect.DeepEqual(esData, pendingData) } // If there's already an applied value, return whether or not this would // change that. if appliedOk { - return !reflect.DeepEqual(esInfo, appliedInfo) + return !reflect.DeepEqual(esData, appliedData) } } // If this is marked for removal and does not exist in the cache, no changes // are necessary. - if esInfo.Remove { + if esData.Remove { return false } @@ -373,8 +373,8 @@ func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.Names return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}, endpointSlice.Name, err } -// byAddress helps sort endpointInfo -type byAddress []*endpointInfo +// byAddress helps sort endpointData +type byAddress []*endpointData func (e byAddress) Len() int { return len(e) diff --git a/pkg/proxy/endpointslicecache_test.go b/pkg/proxy/endpointslicecache_test.go index c93ddd6474c..8c36811a8e2 100644 --- a/pkg/proxy/endpointslicecache_test.go +++ b/pkg/proxy/endpointslicecache_test.go @@ -328,7 +328,7 @@ func TestEndpointInfoByServicePort(t *testing.T) { } } -func TestEsInfoChanged(t *testing.T) { +func TestEsDataChanged(t *testing.T) { p80 := int32(80) p443 := int32(443) port80 := discovery.EndpointPort{Port: &p80, Name: ptr.To("http"), Protocol: ptr.To(v1.ProtocolTCP)} @@ -454,11 +454,11 @@ func TestEsInfoChanged(t *testing.T) { t.Fatalf("Expected no error calling endpointSliceCacheKeys(): %v", err) } - esInfo := newEndpointSliceInfo(tc.updatedSlice, false) - changed := tc.cache.esInfoChanged(serviceKey, sliceKey, esInfo) + esData := newEndpointSliceData(tc.updatedSlice, false) + changed := tc.cache.esDataChanged(serviceKey, sliceKey, esData) if tc.expectChanged != changed { - t.Errorf("Expected esInfoChanged() to return %t, got %t", tc.expectChanged, changed) + t.Errorf("Expected esDataChanged() to return %t, got %t", tc.expectChanged, changed) } cmc.Check(t) From 147114e64852319f2408a89afc2e68ed0fac6c25 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Sun, 26 Nov 2023 16:25:21 -0500 Subject: [PATCH 7/8] Update some change tracker doc comments In particular, fix the description of ServiceChangeTracker.Update's return value, and point out that it's different from EndpointsChangeTracker.EndpointSliceUpdate's (though fortunately, in a way that doesn't matter for any existing code). --- pkg/proxy/endpointschangetracker.go | 27 ++++++++++++++----------- pkg/proxy/servicechangetracker.go | 31 +++++++++++++---------------- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/pkg/proxy/endpointschangetracker.go b/pkg/proxy/endpointschangetracker.go index 3347e55709a..8b532759754 100644 --- a/pkg/proxy/endpointschangetracker.go +++ b/pkg/proxy/endpointschangetracker.go @@ -40,23 +40,25 @@ type EndpointsChangeTracker struct { // lock protects lastChangeTriggerTimes lock sync.Mutex + // processEndpointsMapChange is invoked by the apply function on every change. + // This function should not modify the EndpointsMaps, but just use the changes for + // any Proxier-specific cleanup. processEndpointsMapChange processEndpointsMapChangeFunc + // endpointSliceCache holds a simplified version of endpoint slices. endpointSliceCache *EndpointSliceCache - // Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints - // object to change. Used to calculate the network-programming-latency. + + // lastChangeTriggerTimes maps from the Service's NamespacedName to the times of + // the triggers that caused its EndpointSlice objects to change. Used to calculate + // the network-programming-latency metric. lastChangeTriggerTimes map[types.NamespacedName][]time.Time - // record the time when the endpointsChangeTracker was created so we can ignore the endpoints - // that were generated before, because we can't estimate the network-programming-latency on those. - // This is specially problematic on restarts, because we process all the endpoints that may have been - // created hours or days before. + // trackerStartTime is the time when the EndpointsChangeTracker was created, so + // we can avoid generating network-programming-latency metrics for changes that + // occurred before that. trackerStartTime time.Time } type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) Endpoint - -// This handler is invoked by the apply function on every change. This function should not modify the -// EndpointsMap's but just use the changes for any Proxier specific cleanup. type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap) // NewEndpointsChangeTracker initializes an EndpointsChangeTracker @@ -69,9 +71,10 @@ func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFun } } -// EndpointSliceUpdate updates given service's endpoints change map based on the endpoints pair. -// It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeTracker. -// If removeSlice is true, slice will be removed, otherwise it will be added or updated. +// EndpointSliceUpdate updates the EndpointsChangeTracker by adding/updating or removing +// endpointSlice (depending on removeSlice). It returns true if this update contained a +// change that needs to be synced; note that this is different from the return value of +// ServiceChangeTracker.Update(). func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool { if !supportedEndpointSliceAddressTypes.Has(string(endpointSlice.AddressType)) { klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType) diff --git a/pkg/proxy/servicechangetracker.go b/pkg/proxy/servicechangetracker.go index 37d22357262..fb5fd4de406 100644 --- a/pkg/proxy/servicechangetracker.go +++ b/pkg/proxy/servicechangetracker.go @@ -36,18 +36,20 @@ type ServiceChangeTracker struct { lock sync.Mutex // items maps a service to its serviceChange. items map[types.NamespacedName]*serviceChange - // makeServiceInfo allows proxier to inject customized information when processing service. - makeServiceInfo makeServicePortFunc - processServiceMapChange processServiceMapChangeFunc - ipFamily v1.IPFamily + // makeServiceInfo allows the proxier to inject customized information when + // processing services. + makeServiceInfo makeServicePortFunc + // processServiceMapChange is invoked by the apply function on every change. This + // function should not modify the ServicePortMaps, but just use the changes for + // any Proxier-specific cleanup. + processServiceMapChange processServiceMapChangeFunc + + ipFamily v1.IPFamily recorder events.EventRecorder } type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort - -// This handler is invoked by the apply function on every change. This function should not modify the -// ServicePortMap's but just use the changes for any Proxier specific cleanup. type processServiceMapChangeFunc func(previous, current ServicePortMap) // serviceChange contains all changes to services that happened since proxy rules were synced. For a single object, @@ -69,16 +71,11 @@ func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IP } } -// Update updates given service's change map based on the service pair. It returns true if items changed, -// otherwise return false. Update can be used to add/update/delete items of ServiceChangeMap. For example, -// Add item -// - pass as the pair. -// -// Update item -// - pass as the pair. -// -// Delete item -// - pass as the pair. +// Update updates the ServiceChangeTracker based on the service pair +// (where either previous or current, but not both, can be nil). It returns true if sct +// contains changes that need to be synced (whether or not those changes were caused by +// this update); note that this is different from the return value of +// EndpointChangeTracker.EndpointSliceUpdate(). func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool { // This is unexpected, we should return false directly. if previous == nil && current == nil { From 1c089afcf37f4c7026eafdfa83189f7306558efe Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 22 Nov 2023 10:10:06 -0500 Subject: [PATCH 8/8] Fix a set type --- pkg/proxy/endpointschangetracker.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/proxy/endpointschangetracker.go b/pkg/proxy/endpointschangetracker.go index 8b532759754..418aecd728c 100644 --- a/pkg/proxy/endpointschangetracker.go +++ b/pkg/proxy/endpointschangetracker.go @@ -29,9 +29,9 @@ import ( "k8s.io/kubernetes/pkg/proxy/metrics" ) -var supportedEndpointSliceAddressTypes = sets.New[string]( - string(discovery.AddressTypeIPv4), - string(discovery.AddressTypeIPv6), +var supportedEndpointSliceAddressTypes = sets.New[discovery.AddressType]( + discovery.AddressTypeIPv4, + discovery.AddressTypeIPv6, ) // EndpointsChangeTracker carries state about uncommitted changes to an arbitrary number of @@ -76,7 +76,7 @@ func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFun // change that needs to be synced; note that this is different from the return value of // ServiceChangeTracker.Update(). func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool { - if !supportedEndpointSliceAddressTypes.Has(string(endpointSlice.AddressType)) { + if !supportedEndpointSliceAddressTypes.Has(endpointSlice.AddressType) { klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType) return false }