diff --git a/pkg/proxy/endpoint.go b/pkg/proxy/endpoint.go new file mode 100644 index 00000000000..474edf36a91 --- /dev/null +++ b/pkg/proxy/endpoint.go @@ -0,0 +1,138 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "net" + "strconv" + + "k8s.io/apimachinery/pkg/util/sets" +) + +// Endpoint in an interface which abstracts information about an endpoint. +type Endpoint interface { + // String returns endpoint string. An example format can be: `IP:Port`. + // We take the returned value as ServiceEndpoint.Endpoint. + String() string + // IP returns IP part of the endpoint. + IP() string + // Port returns the Port part of the endpoint. + Port() int + + // IsLocal returns true if the endpoint is running on the same host as kube-proxy. + IsLocal() bool + // IsReady returns true if an endpoint is ready and not terminating, or + // if PublishNotReadyAddresses is set on the service. + IsReady() bool + // IsServing returns true if an endpoint is ready. It does not account + // for terminating state. + IsServing() bool + // IsTerminating returns true if an endpoint is terminating. For pods, + // that is any pod with a deletion timestamp. + IsTerminating() bool + + // ZoneHints returns the zone hint for the endpoint. This is based on + // endpoint.hints.forZones[0].name in the EndpointSlice API. + ZoneHints() sets.Set[string] +} + +// BaseEndpointInfo contains base information that defines an endpoint. +// This could be used directly by proxier while processing endpoints, +// or can be used for constructing a more specific EndpointInfo struct +// defined by the proxier if needed. +type BaseEndpointInfo struct { + // Cache this values to improve performance + ip string + port int + // endpoint is the same as net.JoinHostPort(ip,port) + endpoint string + + // isLocal indicates whether the endpoint is running on same host as kube-proxy. + isLocal bool + + // ready indicates whether this endpoint is ready and NOT terminating, unless + // PublishNotReadyAddresses is set on the service, in which case it will just + // always be true. + ready bool + // serving indicates whether this endpoint is ready regardless of its terminating state. + // For pods this is true if it has a ready status regardless of its deletion timestamp. + serving bool + // terminating indicates whether this endpoint is terminating. + // For pods this is true if it has a non-nil deletion timestamp. + terminating bool + + // zoneHints represent the zone hints for the endpoint. This is based on + // endpoint.hints.forZones[*].name in the EndpointSlice API. + zoneHints sets.Set[string] +} + +var _ Endpoint = &BaseEndpointInfo{} + +// String is part of proxy.Endpoint interface. +func (info *BaseEndpointInfo) String() string { + return info.endpoint +} + +// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface. +func (info *BaseEndpointInfo) IP() string { + return info.ip +} + +// Port returns just the Port part of the endpoint. +func (info *BaseEndpointInfo) Port() int { + return info.port +} + +// IsLocal is part of proxy.Endpoint interface. +func (info *BaseEndpointInfo) IsLocal() bool { + return info.isLocal +} + +// IsReady returns true if an endpoint is ready and not terminating. +func (info *BaseEndpointInfo) IsReady() bool { + return info.ready +} + +// IsServing returns true if an endpoint is ready, regardless of if the +// endpoint is terminating. +func (info *BaseEndpointInfo) IsServing() bool { + return info.serving +} + +// IsTerminating retruns true if an endpoint is terminating. For pods, +// that is any pod with a deletion timestamp. +func (info *BaseEndpointInfo) IsTerminating() bool { + return info.terminating +} + +// ZoneHints returns the zone hint for the endpoint. +func (info *BaseEndpointInfo) ZoneHints() sets.Set[string] { + return info.zoneHints +} + +func newBaseEndpointInfo(ip string, port int, isLocal, ready, serving, terminating bool, zoneHints sets.Set[string]) *BaseEndpointInfo { + return &BaseEndpointInfo{ + ip: ip, + port: port, + endpoint: net.JoinHostPort(ip, strconv.Itoa(port)), + isLocal: isLocal, + ready: ready, + serving: serving, + terminating: terminating, + zoneHints: zoneHints, + } +} diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpointschangetracker.go similarity index 82% rename from pkg/proxy/endpoints.go rename to pkg/proxy/endpointschangetracker.go index 56348643473..f75a65bfc2e 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpointschangetracker.go @@ -17,18 +17,15 @@ limitations under the License. package proxy import ( - "net" - "strconv" "sync" "time" - "k8s.io/client-go/tools/events" - "k8s.io/klog/v2" - v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/events" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/proxy/metrics" ) @@ -37,93 +34,6 @@ var supportedEndpointSliceAddressTypes = sets.New[string]( string(discovery.AddressTypeIPv6), ) -// BaseEndpointInfo contains base information that defines an endpoint. -// This could be used directly by proxier while processing endpoints, -// or can be used for constructing a more specific EndpointInfo struct -// defined by the proxier if needed. -type BaseEndpointInfo struct { - // Cache this values to improve performance - ip string - port int - // endpoint is the same as net.JoinHostPort(ip,port) - endpoint string - - // isLocal indicates whether the endpoint is running on same host as kube-proxy. - isLocal bool - - // ready indicates whether this endpoint is ready and NOT terminating, unless - // PublishNotReadyAddresses is set on the service, in which case it will just - // always be true. - ready bool - // serving indicates whether this endpoint is ready regardless of its terminating state. - // For pods this is true if it has a ready status regardless of its deletion timestamp. - serving bool - // terminating indicates whether this endpoint is terminating. - // For pods this is true if it has a non-nil deletion timestamp. - terminating bool - - // zoneHints represent the zone hints for the endpoint. This is based on - // endpoint.hints.forZones[*].name in the EndpointSlice API. - zoneHints sets.Set[string] -} - -var _ Endpoint = &BaseEndpointInfo{} - -// String is part of proxy.Endpoint interface. -func (info *BaseEndpointInfo) String() string { - return info.endpoint -} - -// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface. -func (info *BaseEndpointInfo) IP() string { - return info.ip -} - -// Port returns just the Port part of the endpoint. -func (info *BaseEndpointInfo) Port() int { - return info.port -} - -// IsLocal is part of proxy.Endpoint interface. -func (info *BaseEndpointInfo) IsLocal() bool { - return info.isLocal -} - -// IsReady returns true if an endpoint is ready and not terminating. -func (info *BaseEndpointInfo) IsReady() bool { - return info.ready -} - -// IsServing returns true if an endpoint is ready, regardless of if the -// endpoint is terminating. -func (info *BaseEndpointInfo) IsServing() bool { - return info.serving -} - -// IsTerminating retruns true if an endpoint is terminating. For pods, -// that is any pod with a deletion timestamp. -func (info *BaseEndpointInfo) IsTerminating() bool { - return info.terminating -} - -// ZoneHints returns the zone hint for the endpoint. -func (info *BaseEndpointInfo) ZoneHints() sets.Set[string] { - return info.zoneHints -} - -func newBaseEndpointInfo(ip string, port int, isLocal, ready, serving, terminating bool, zoneHints sets.Set[string]) *BaseEndpointInfo { - return &BaseEndpointInfo{ - ip: ip, - port: port, - endpoint: net.JoinHostPort(ip, strconv.Itoa(port)), - isLocal: isLocal, - ready: ready, - serving: serving, - terminating: terminating, - zoneHints: zoneHints, - } -} - type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) Endpoint // This handler is invoked by the apply function on every change. This function should not modify the diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpointschangetracker_test.go similarity index 100% rename from pkg/proxy/endpoints_test.go rename to pkg/proxy/endpointschangetracker_test.go diff --git a/pkg/proxy/servicechangetracker.go b/pkg/proxy/servicechangetracker.go new file mode 100644 index 00000000000..1d72719b9f3 --- /dev/null +++ b/pkg/proxy/servicechangetracker.go @@ -0,0 +1,247 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "reflect" + "sync" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/events" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/proxy/metrics" + proxyutil "k8s.io/kubernetes/pkg/proxy/util" +) + +type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort + +// This handler is invoked by the apply function on every change. This function should not modify the +// ServicePortMap's but just use the changes for any Proxier specific cleanup. +type processServiceMapChangeFunc func(previous, current ServicePortMap) + +// serviceChange contains all changes to services that happened since proxy rules were synced. For a single object, +// changes are accumulated, i.e. previous is state from before applying the changes, +// current is state after applying all of the changes. +type serviceChange struct { + previous ServicePortMap + current ServicePortMap +} + +// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of +// Services, keyed by their namespace and name. +type ServiceChangeTracker struct { + // lock protects items. + lock sync.Mutex + // items maps a service to its serviceChange. + items map[types.NamespacedName]*serviceChange + // makeServiceInfo allows proxier to inject customized information when processing service. + makeServiceInfo makeServicePortFunc + processServiceMapChange processServiceMapChangeFunc + ipFamily v1.IPFamily + + recorder events.EventRecorder +} + +// NewServiceChangeTracker initializes a ServiceChangeTracker +func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker { + return &ServiceChangeTracker{ + items: make(map[types.NamespacedName]*serviceChange), + makeServiceInfo: makeServiceInfo, + recorder: recorder, + ipFamily: ipFamily, + processServiceMapChange: processServiceMapChange, + } +} + +// Update updates given service's change map based on the service pair. It returns true if items changed, +// otherwise return false. Update can be used to add/update/delete items of ServiceChangeMap. For example, +// Add item +// - pass as the pair. +// +// Update item +// - pass as the pair. +// +// Delete item +// - pass as the pair. +func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool { + // This is unexpected, we should return false directly. + if previous == nil && current == nil { + return false + } + + svc := current + if svc == nil { + svc = previous + } + metrics.ServiceChangesTotal.Inc() + namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name} + + sct.lock.Lock() + defer sct.lock.Unlock() + + change, exists := sct.items[namespacedName] + if !exists { + change = &serviceChange{} + change.previous = sct.serviceToServiceMap(previous) + sct.items[namespacedName] = change + } + change.current = sct.serviceToServiceMap(current) + // if change.previous equal to change.current, it means no change + if reflect.DeepEqual(change.previous, change.current) { + delete(sct.items, namespacedName) + } else { + klog.V(4).InfoS("Service updated ports", "service", klog.KObj(svc), "portCount", len(change.current)) + } + metrics.ServiceChangesPending.Set(float64(len(sct.items))) + return len(sct.items) > 0 +} + +// UpdateServiceMapResult is the updated results after applying service changes. +type UpdateServiceMapResult struct { + // UpdatedServices lists the names of all services added/updated/deleted since the + // last Update. + UpdatedServices sets.Set[types.NamespacedName] + + // DeletedUDPClusterIPs holds stale (no longer assigned to a Service) Service IPs + // that had UDP ports. Callers can use this to abort timeout-waits or clear + // connection-tracking information. + DeletedUDPClusterIPs sets.Set[string] +} + +// HealthCheckNodePorts returns a map of Service names to HealthCheckNodePort values +// for all Services in sm with non-zero HealthCheckNodePort. +func (sm ServicePortMap) HealthCheckNodePorts() map[types.NamespacedName]uint16 { + // TODO: If this will appear to be computationally expensive, consider + // computing this incrementally similarly to svcPortMap. + ports := make(map[types.NamespacedName]uint16) + for svcPortName, info := range sm { + if info.HealthCheckNodePort() != 0 { + ports[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort()) + } + } + return ports +} + +// ServicePortMap maps a service to its ServicePort. +type ServicePortMap map[ServicePortName]ServicePort + +// serviceToServiceMap translates a single Service object to a ServicePortMap. +// +// NOTE: service object should NOT be modified. +func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) ServicePortMap { + if service == nil { + return nil + } + + if proxyutil.ShouldSkipService(service) { + return nil + } + + clusterIP := proxyutil.GetClusterIPByFamily(sct.ipFamily, service) + if clusterIP == "" { + return nil + } + + svcPortMap := make(ServicePortMap) + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name, Protocol: servicePort.Protocol} + baseSvcInfo := newBaseServiceInfo(service, sct.ipFamily, servicePort) + if sct.makeServiceInfo != nil { + svcPortMap[svcPortName] = sct.makeServiceInfo(servicePort, service, baseSvcInfo) + } else { + svcPortMap[svcPortName] = baseSvcInfo + } + } + return svcPortMap +} + +// Update updates ServicePortMap base on the given changes, returns information about the +// diff since the last Update, triggers processServiceMapChange on every change, and +// clears the changes map. +func (sm ServicePortMap) Update(sct *ServiceChangeTracker) UpdateServiceMapResult { + sct.lock.Lock() + defer sct.lock.Unlock() + + result := UpdateServiceMapResult{ + UpdatedServices: sets.New[types.NamespacedName](), + DeletedUDPClusterIPs: sets.New[string](), + } + + for nn, change := range sct.items { + if sct.processServiceMapChange != nil { + sct.processServiceMapChange(change.previous, change.current) + } + result.UpdatedServices.Insert(nn) + + sm.merge(change.current) + // filter out the Update event of current changes from previous changes + // before calling unmerge() so that can skip deleting the Update events. + change.previous.filter(change.current) + sm.unmerge(change.previous, result.DeletedUDPClusterIPs) + } + // clear changes after applying them to ServicePortMap. + sct.items = make(map[types.NamespacedName]*serviceChange) + metrics.ServiceChangesPending.Set(0) + + return result +} + +// merge adds other ServicePortMap's elements to current ServicePortMap. +// If collision, other ALWAYS win. Otherwise add the other to current. +// In other words, if some elements in current collisions with other, update the current by other. +func (sm *ServicePortMap) merge(other ServicePortMap) { + for svcPortName, info := range other { + _, exists := (*sm)[svcPortName] + if !exists { + klog.V(4).InfoS("Adding new service port", "portName", svcPortName, "servicePort", info) + } else { + klog.V(4).InfoS("Updating existing service port", "portName", svcPortName, "servicePort", info) + } + (*sm)[svcPortName] = info + } +} + +// filter filters out elements from ServicePortMap base on given ports string sets. +func (sm *ServicePortMap) filter(other ServicePortMap) { + for svcPortName := range *sm { + // skip the delete for Update event. + if _, ok := other[svcPortName]; ok { + delete(*sm, svcPortName) + } + } +} + +// unmerge deletes all other ServicePortMap's elements from current ServicePortMap and +// updates deletedUDPClusterIPs with all of the newly-deleted UDP cluster IPs. +func (sm *ServicePortMap) unmerge(other ServicePortMap, deletedUDPClusterIPs sets.Set[string]) { + for svcPortName := range other { + info, exists := (*sm)[svcPortName] + if exists { + klog.V(4).InfoS("Removing service port", "portName", svcPortName) + if info.Protocol() == v1.ProtocolUDP { + deletedUDPClusterIPs.Insert(info.ClusterIP().String()) + } + delete(*sm, svcPortName) + } else { + klog.ErrorS(nil, "Service port does not exists", "portName", svcPortName) + } + } +} diff --git a/pkg/proxy/service_test.go b/pkg/proxy/servicechangetracker_test.go similarity index 100% rename from pkg/proxy/service_test.go rename to pkg/proxy/servicechangetracker_test.go diff --git a/pkg/proxy/service.go b/pkg/proxy/serviceport.go similarity index 52% rename from pkg/proxy/service.go rename to pkg/proxy/serviceport.go index a28ec8fcb3b..5ed0b5d79b3 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/serviceport.go @@ -19,22 +19,56 @@ package proxy import ( "fmt" "net" - "reflect" "strings" - "sync" - - "k8s.io/client-go/tools/events" - "k8s.io/klog/v2" - netutils "k8s.io/utils/net" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" apiservice "k8s.io/kubernetes/pkg/api/v1/service" - "k8s.io/kubernetes/pkg/proxy/metrics" proxyutil "k8s.io/kubernetes/pkg/proxy/util" + netutils "k8s.io/utils/net" ) +// ServicePort is an interface which abstracts information about a service. +type ServicePort interface { + // String returns service string. An example format can be: `IP:Port/Protocol`. + String() string + // ClusterIP returns service cluster IP in net.IP format. + ClusterIP() net.IP + // Port returns service port if present. If return 0 means not present. + Port() int + // SessionAffinityType returns service session affinity type + SessionAffinityType() v1.ServiceAffinity + // StickyMaxAgeSeconds returns service max connection age + StickyMaxAgeSeconds() int + // ExternalIPStrings returns service ExternalIPs as a string array. + ExternalIPStrings() []string + // LoadBalancerVIPStrings returns service LoadBalancerIPs which are VIP mode as a string array. + LoadBalancerVIPStrings() []string + // Protocol returns service protocol. + Protocol() v1.Protocol + // LoadBalancerSourceRanges returns service LoadBalancerSourceRanges if present empty array if not + LoadBalancerSourceRanges() []string + // HealthCheckNodePort returns service health check node port if present. If return 0, it means not present. + HealthCheckNodePort() int + // NodePort returns a service Node port if present. If return 0, it means not present. + NodePort() int + // ExternalPolicyLocal returns if a service has only node local endpoints for external traffic. + ExternalPolicyLocal() bool + // InternalPolicyLocal returns if a service has only node local endpoints for internal traffic. + InternalPolicyLocal() bool + // HintsAnnotation returns the value of the v1.DeprecatedAnnotationTopologyAwareHints annotation. + HintsAnnotation() string + // ExternallyAccessible returns true if the service port is reachable via something + // other than ClusterIP (NodePort/ExternalIP/LoadBalancer) + ExternallyAccessible() bool + // UsesClusterEndpoints returns true if the service port ever sends traffic to + // endpoints based on "Cluster" traffic policy + UsesClusterEndpoints() bool + // UsesLocalEndpoints returns true if the service port ever sends traffic to + // endpoints based on "Local" traffic policy + UsesLocalEndpoints() bool +} + // BaseServicePortInfo contains base information that defines a service. // This could be used directly by proxier while processing services, // or can be used for constructing a more specific ServiceInfo struct @@ -240,220 +274,3 @@ func newBaseServiceInfo(service *v1.Service, ipFamily v1.IPFamily, port *v1.Serv return info } - -type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort - -// This handler is invoked by the apply function on every change. This function should not modify the -// ServicePortMap's but just use the changes for any Proxier specific cleanup. -type processServiceMapChangeFunc func(previous, current ServicePortMap) - -// serviceChange contains all changes to services that happened since proxy rules were synced. For a single object, -// changes are accumulated, i.e. previous is state from before applying the changes, -// current is state after applying all of the changes. -type serviceChange struct { - previous ServicePortMap - current ServicePortMap -} - -// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of -// Services, keyed by their namespace and name. -type ServiceChangeTracker struct { - // lock protects items. - lock sync.Mutex - // items maps a service to its serviceChange. - items map[types.NamespacedName]*serviceChange - // makeServiceInfo allows proxier to inject customized information when processing service. - makeServiceInfo makeServicePortFunc - processServiceMapChange processServiceMapChangeFunc - ipFamily v1.IPFamily - - recorder events.EventRecorder -} - -// NewServiceChangeTracker initializes a ServiceChangeTracker -func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker { - return &ServiceChangeTracker{ - items: make(map[types.NamespacedName]*serviceChange), - makeServiceInfo: makeServiceInfo, - recorder: recorder, - ipFamily: ipFamily, - processServiceMapChange: processServiceMapChange, - } -} - -// Update updates given service's change map based on the service pair. It returns true if items changed, -// otherwise return false. Update can be used to add/update/delete items of ServiceChangeMap. For example, -// Add item -// - pass as the pair. -// -// Update item -// - pass as the pair. -// -// Delete item -// - pass as the pair. -func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool { - // This is unexpected, we should return false directly. - if previous == nil && current == nil { - return false - } - - svc := current - if svc == nil { - svc = previous - } - metrics.ServiceChangesTotal.Inc() - namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name} - - sct.lock.Lock() - defer sct.lock.Unlock() - - change, exists := sct.items[namespacedName] - if !exists { - change = &serviceChange{} - change.previous = sct.serviceToServiceMap(previous) - sct.items[namespacedName] = change - } - change.current = sct.serviceToServiceMap(current) - // if change.previous equal to change.current, it means no change - if reflect.DeepEqual(change.previous, change.current) { - delete(sct.items, namespacedName) - } else { - klog.V(4).InfoS("Service updated ports", "service", klog.KObj(svc), "portCount", len(change.current)) - } - metrics.ServiceChangesPending.Set(float64(len(sct.items))) - return len(sct.items) > 0 -} - -// UpdateServiceMapResult is the updated results after applying service changes. -type UpdateServiceMapResult struct { - // UpdatedServices lists the names of all services added/updated/deleted since the - // last Update. - UpdatedServices sets.Set[types.NamespacedName] - - // DeletedUDPClusterIPs holds stale (no longer assigned to a Service) Service IPs - // that had UDP ports. Callers can use this to abort timeout-waits or clear - // connection-tracking information. - DeletedUDPClusterIPs sets.Set[string] -} - -// HealthCheckNodePorts returns a map of Service names to HealthCheckNodePort values -// for all Services in sm with non-zero HealthCheckNodePort. -func (sm ServicePortMap) HealthCheckNodePorts() map[types.NamespacedName]uint16 { - // TODO: If this will appear to be computationally expensive, consider - // computing this incrementally similarly to svcPortMap. - ports := make(map[types.NamespacedName]uint16) - for svcPortName, info := range sm { - if info.HealthCheckNodePort() != 0 { - ports[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort()) - } - } - return ports -} - -// ServicePortMap maps a service to its ServicePort. -type ServicePortMap map[ServicePortName]ServicePort - -// serviceToServiceMap translates a single Service object to a ServicePortMap. -// -// NOTE: service object should NOT be modified. -func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) ServicePortMap { - if service == nil { - return nil - } - - if proxyutil.ShouldSkipService(service) { - return nil - } - - clusterIP := proxyutil.GetClusterIPByFamily(sct.ipFamily, service) - if clusterIP == "" { - return nil - } - - svcPortMap := make(ServicePortMap) - svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] - svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name, Protocol: servicePort.Protocol} - baseSvcInfo := newBaseServiceInfo(service, sct.ipFamily, servicePort) - if sct.makeServiceInfo != nil { - svcPortMap[svcPortName] = sct.makeServiceInfo(servicePort, service, baseSvcInfo) - } else { - svcPortMap[svcPortName] = baseSvcInfo - } - } - return svcPortMap -} - -// Update updates ServicePortMap base on the given changes, returns information about the -// diff since the last Update, triggers processServiceMapChange on every change, and -// clears the changes map. -func (sm ServicePortMap) Update(sct *ServiceChangeTracker) UpdateServiceMapResult { - sct.lock.Lock() - defer sct.lock.Unlock() - - result := UpdateServiceMapResult{ - UpdatedServices: sets.New[types.NamespacedName](), - DeletedUDPClusterIPs: sets.New[string](), - } - - for nn, change := range sct.items { - if sct.processServiceMapChange != nil { - sct.processServiceMapChange(change.previous, change.current) - } - result.UpdatedServices.Insert(nn) - - sm.merge(change.current) - // filter out the Update event of current changes from previous changes - // before calling unmerge() so that can skip deleting the Update events. - change.previous.filter(change.current) - sm.unmerge(change.previous, result.DeletedUDPClusterIPs) - } - // clear changes after applying them to ServicePortMap. - sct.items = make(map[types.NamespacedName]*serviceChange) - metrics.ServiceChangesPending.Set(0) - - return result -} - -// merge adds other ServicePortMap's elements to current ServicePortMap. -// If collision, other ALWAYS win. Otherwise add the other to current. -// In other words, if some elements in current collisions with other, update the current by other. -func (sm *ServicePortMap) merge(other ServicePortMap) { - for svcPortName, info := range other { - _, exists := (*sm)[svcPortName] - if !exists { - klog.V(4).InfoS("Adding new service port", "portName", svcPortName, "servicePort", info) - } else { - klog.V(4).InfoS("Updating existing service port", "portName", svcPortName, "servicePort", info) - } - (*sm)[svcPortName] = info - } -} - -// filter filters out elements from ServicePortMap base on given ports string sets. -func (sm *ServicePortMap) filter(other ServicePortMap) { - for svcPortName := range *sm { - // skip the delete for Update event. - if _, ok := other[svcPortName]; ok { - delete(*sm, svcPortName) - } - } -} - -// unmerge deletes all other ServicePortMap's elements from current ServicePortMap and -// updates deletedUDPClusterIPs with all of the newly-deleted UDP cluster IPs. -func (sm *ServicePortMap) unmerge(other ServicePortMap, deletedUDPClusterIPs sets.Set[string]) { - for svcPortName := range other { - info, exists := (*sm)[svcPortName] - if exists { - klog.V(4).InfoS("Removing service port", "portName", svcPortName) - if info.Protocol() == v1.ProtocolUDP { - deletedUDPClusterIPs.Insert(info.ClusterIP().String()) - } - delete(*sm, svcPortName) - } else { - klog.ErrorS(nil, "Service port does not exists", "portName", svcPortName) - } - } -} diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index f1c89d353f5..1cc0c732650 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -18,11 +18,9 @@ package proxy import ( "fmt" - "net" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/proxy/config" ) @@ -59,75 +57,6 @@ func fmtPortName(in string) string { return fmt.Sprintf(":%s", in) } -// ServicePort is an interface which abstracts information about a service. -type ServicePort interface { - // String returns service string. An example format can be: `IP:Port/Protocol`. - String() string - // ClusterIP returns service cluster IP in net.IP format. - ClusterIP() net.IP - // Port returns service port if present. If return 0 means not present. - Port() int - // SessionAffinityType returns service session affinity type - SessionAffinityType() v1.ServiceAffinity - // StickyMaxAgeSeconds returns service max connection age - StickyMaxAgeSeconds() int - // ExternalIPStrings returns service ExternalIPs as a string array. - ExternalIPStrings() []string - // LoadBalancerVIPStrings returns service LoadBalancerIPs which are VIP mode as a string array. - LoadBalancerVIPStrings() []string - // Protocol returns service protocol. - Protocol() v1.Protocol - // LoadBalancerSourceRanges returns service LoadBalancerSourceRanges if present empty array if not - LoadBalancerSourceRanges() []string - // HealthCheckNodePort returns service health check node port if present. If return 0, it means not present. - HealthCheckNodePort() int - // NodePort returns a service Node port if present. If return 0, it means not present. - NodePort() int - // ExternalPolicyLocal returns if a service has only node local endpoints for external traffic. - ExternalPolicyLocal() bool - // InternalPolicyLocal returns if a service has only node local endpoints for internal traffic. - InternalPolicyLocal() bool - // HintsAnnotation returns the value of the v1.DeprecatedAnnotationTopologyAwareHints annotation. - HintsAnnotation() string - // ExternallyAccessible returns true if the service port is reachable via something - // other than ClusterIP (NodePort/ExternalIP/LoadBalancer) - ExternallyAccessible() bool - // UsesClusterEndpoints returns true if the service port ever sends traffic to - // endpoints based on "Cluster" traffic policy - UsesClusterEndpoints() bool - // UsesLocalEndpoints returns true if the service port ever sends traffic to - // endpoints based on "Local" traffic policy - UsesLocalEndpoints() bool -} - -// Endpoint in an interface which abstracts information about an endpoint. -// TODO: Rename functions to be consistent with ServicePort. -type Endpoint interface { - // String returns endpoint string. An example format can be: `IP:Port`. - // We take the returned value as ServiceEndpoint.Endpoint. - String() string - // IP returns IP part of the endpoint. - IP() string - // Port returns the Port part of the endpoint. - Port() int - - // IsLocal returns true if the endpoint is running on the same host as kube-proxy. - IsLocal() bool - // IsReady returns true if an endpoint is ready and not terminating, or - // if PublishNotReadyAddresses is set on the service. - IsReady() bool - // IsServing returns true if an endpoint is ready. It does not account - // for terminating state. - IsServing() bool - // IsTerminating returns true if an endpoint is terminating. For pods, - // that is any pod with a deletion timestamp. - IsTerminating() bool - - // ZoneHints returns the zone hint for the endpoint. This is based on - // endpoint.hints.forZones[0].name in the EndpointSlice API. - ZoneHints() sets.Set[string] -} - // ServiceEndpoint is used to identify a service and one of its endpoint pair. type ServiceEndpoint struct { Endpoint string