Merge pull request #122050 from danwinship/proxy-change-tracker-cleanup-1

proxy service/endpoints change tracker cleanups
This commit is contained in:
Kubernetes Prow Robot 2023-12-22 14:32:28 +01:00 committed by GitHub
commit 784c7cec63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 736 additions and 727 deletions

138
pkg/proxy/endpoint.go Normal file
View File

@ -0,0 +1,138 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"net"
"strconv"
"k8s.io/apimachinery/pkg/util/sets"
)
// Endpoint in an interface which abstracts information about an endpoint.
type Endpoint interface {
// String returns endpoint string. An example format can be: `IP:Port`.
// We take the returned value as ServiceEndpoint.Endpoint.
String() string
// IP returns IP part of the endpoint.
IP() string
// Port returns the Port part of the endpoint.
Port() int
// IsLocal returns true if the endpoint is running on the same host as kube-proxy.
IsLocal() bool
// IsReady returns true if an endpoint is ready and not terminating, or
// if PublishNotReadyAddresses is set on the service.
IsReady() bool
// IsServing returns true if an endpoint is ready. It does not account
// for terminating state.
IsServing() bool
// IsTerminating returns true if an endpoint is terminating. For pods,
// that is any pod with a deletion timestamp.
IsTerminating() bool
// ZoneHints returns the zone hint for the endpoint. This is based on
// endpoint.hints.forZones[0].name in the EndpointSlice API.
ZoneHints() sets.Set[string]
}
// BaseEndpointInfo contains base information that defines an endpoint.
// This could be used directly by proxier while processing endpoints,
// or can be used for constructing a more specific EndpointInfo struct
// defined by the proxier if needed.
type BaseEndpointInfo struct {
// Cache this values to improve performance
ip string
port int
// endpoint is the same as net.JoinHostPort(ip,port)
endpoint string
// isLocal indicates whether the endpoint is running on same host as kube-proxy.
isLocal bool
// ready indicates whether this endpoint is ready and NOT terminating, unless
// PublishNotReadyAddresses is set on the service, in which case it will just
// always be true.
ready bool
// serving indicates whether this endpoint is ready regardless of its terminating state.
// For pods this is true if it has a ready status regardless of its deletion timestamp.
serving bool
// terminating indicates whether this endpoint is terminating.
// For pods this is true if it has a non-nil deletion timestamp.
terminating bool
// zoneHints represent the zone hints for the endpoint. This is based on
// endpoint.hints.forZones[*].name in the EndpointSlice API.
zoneHints sets.Set[string]
}
var _ Endpoint = &BaseEndpointInfo{}
// String is part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) String() string {
return info.endpoint
}
// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) IP() string {
return info.ip
}
// Port returns just the Port part of the endpoint.
func (info *BaseEndpointInfo) Port() int {
return info.port
}
// IsLocal is part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) IsLocal() bool {
return info.isLocal
}
// IsReady returns true if an endpoint is ready and not terminating.
func (info *BaseEndpointInfo) IsReady() bool {
return info.ready
}
// IsServing returns true if an endpoint is ready, regardless of if the
// endpoint is terminating.
func (info *BaseEndpointInfo) IsServing() bool {
return info.serving
}
// IsTerminating retruns true if an endpoint is terminating. For pods,
// that is any pod with a deletion timestamp.
func (info *BaseEndpointInfo) IsTerminating() bool {
return info.terminating
}
// ZoneHints returns the zone hint for the endpoint.
func (info *BaseEndpointInfo) ZoneHints() sets.Set[string] {
return info.zoneHints
}
func newBaseEndpointInfo(ip string, port int, isLocal, ready, serving, terminating bool, zoneHints sets.Set[string]) *BaseEndpointInfo {
return &BaseEndpointInfo{
ip: ip,
port: port,
endpoint: net.JoinHostPort(ip, strconv.Itoa(port)),
isLocal: isLocal,
ready: ready,
serving: serving,
terminating: terminating,
zoneHints: zoneHints,
}
}

View File

@ -17,138 +17,50 @@ limitations under the License.
package proxy
import (
"net"
"strconv"
"sync"
"time"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy/metrics"
)
var supportedEndpointSliceAddressTypes = sets.New[string](
string(discovery.AddressTypeIPv4),
string(discovery.AddressTypeIPv6),
var supportedEndpointSliceAddressTypes = sets.New[discovery.AddressType](
discovery.AddressTypeIPv4,
discovery.AddressTypeIPv6,
)
// BaseEndpointInfo contains base information that defines an endpoint.
// This could be used directly by proxier while processing endpoints,
// or can be used for constructing a more specific EndpointInfo struct
// defined by the proxier if needed.
type BaseEndpointInfo struct {
// Cache this values to improve performance
ip string
port int
// endpoint is the same as net.JoinHostPort(ip,port)
endpoint string
// isLocal indicates whether the endpoint is running on same host as kube-proxy.
isLocal bool
// ready indicates whether this endpoint is ready and NOT terminating, unless
// PublishNotReadyAddresses is set on the service, in which case it will just
// always be true.
ready bool
// serving indicates whether this endpoint is ready regardless of its terminating state.
// For pods this is true if it has a ready status regardless of its deletion timestamp.
serving bool
// terminating indicates whether this endpoint is terminating.
// For pods this is true if it has a non-nil deletion timestamp.
terminating bool
// zoneHints represent the zone hints for the endpoint. This is based on
// endpoint.hints.forZones[*].name in the EndpointSlice API.
zoneHints sets.Set[string]
}
var _ Endpoint = &BaseEndpointInfo{}
// String is part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) String() string {
return info.endpoint
}
// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) IP() string {
return info.ip
}
// Port returns just the Port part of the endpoint.
func (info *BaseEndpointInfo) Port() int {
return info.port
}
// IsLocal is part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) IsLocal() bool {
return info.isLocal
}
// IsReady returns true if an endpoint is ready and not terminating.
func (info *BaseEndpointInfo) IsReady() bool {
return info.ready
}
// IsServing returns true if an endpoint is ready, regardless of if the
// endpoint is terminating.
func (info *BaseEndpointInfo) IsServing() bool {
return info.serving
}
// IsTerminating retruns true if an endpoint is terminating. For pods,
// that is any pod with a deletion timestamp.
func (info *BaseEndpointInfo) IsTerminating() bool {
return info.terminating
}
// ZoneHints returns the zone hint for the endpoint.
func (info *BaseEndpointInfo) ZoneHints() sets.Set[string] {
return info.zoneHints
}
func newBaseEndpointInfo(ip string, port int, isLocal, ready, serving, terminating bool, zoneHints sets.Set[string]) *BaseEndpointInfo {
return &BaseEndpointInfo{
ip: ip,
port: port,
endpoint: net.JoinHostPort(ip, strconv.Itoa(port)),
isLocal: isLocal,
ready: ready,
serving: serving,
terminating: terminating,
zoneHints: zoneHints,
}
}
type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) Endpoint
// This handler is invoked by the apply function on every change. This function should not modify the
// EndpointsMap's but just use the changes for any Proxier specific cleanup.
type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap)
// EndpointsChangeTracker carries state about uncommitted changes to an arbitrary number of
// Endpoints, keyed by their namespace and name.
type EndpointsChangeTracker struct {
// lock protects lastChangeTriggerTimes
lock sync.Mutex
// processEndpointsMapChange is invoked by the apply function on every change.
// This function should not modify the EndpointsMaps, but just use the changes for
// any Proxier-specific cleanup.
processEndpointsMapChange processEndpointsMapChangeFunc
// endpointSliceCache holds a simplified version of endpoint slices.
endpointSliceCache *EndpointSliceCache
// Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints
// object to change. Used to calculate the network-programming-latency.
// lastChangeTriggerTimes maps from the Service's NamespacedName to the times of
// the triggers that caused its EndpointSlice objects to change. Used to calculate
// the network-programming-latency metric.
lastChangeTriggerTimes map[types.NamespacedName][]time.Time
// record the time when the endpointsChangeTracker was created so we can ignore the endpoints
// that were generated before, because we can't estimate the network-programming-latency on those.
// This is specially problematic on restarts, because we process all the endpoints that may have been
// created hours or days before.
// trackerStartTime is the time when the EndpointsChangeTracker was created, so
// we can avoid generating network-programming-latency metrics for changes that
// occurred before that.
trackerStartTime time.Time
}
type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) Endpoint
type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap)
// NewEndpointsChangeTracker initializes an EndpointsChangeTracker
func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker {
return &EndpointsChangeTracker{
@ -159,11 +71,12 @@ func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFun
}
}
// EndpointSliceUpdate updates given service's endpoints change map based on the <previous, current> endpoints pair.
// It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeTracker.
// If removeSlice is true, slice will be removed, otherwise it will be added or updated.
// EndpointSliceUpdate updates the EndpointsChangeTracker by adding/updating or removing
// endpointSlice (depending on removeSlice). It returns true if this update contained a
// change that needs to be synced; note that this is different from the return value of
// ServiceChangeTracker.Update().
func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool {
if !supportedEndpointSliceAddressTypes.Has(string(endpointSlice.AddressType)) {
if !supportedEndpointSliceAddressTypes.Has(endpointSlice.AddressType) {
klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType)
return false
}

View File

@ -1745,6 +1745,6 @@ func initializeCache(endpointSliceCache *EndpointSliceCache, endpointSlices []*d
for _, tracker := range endpointSliceCache.trackerByServiceMap {
tracker.applied = tracker.pending
tracker.pending = endpointSliceInfoByName{}
tracker.pending = endpointSliceDataByName{}
}
}

View File

@ -58,26 +58,26 @@ type EndpointSliceCache struct {
// by a proxier along with any pending EndpointSlices that have been updated
// in this cache but not yet applied by a proxier.
type endpointSliceTracker struct {
applied endpointSliceInfoByName
pending endpointSliceInfoByName
applied endpointSliceDataByName
pending endpointSliceDataByName
}
// endpointSliceInfoByName groups endpointSliceInfo by the names of the
// endpointSliceDataByName groups endpointSliceData by the names of the
// corresponding EndpointSlices.
type endpointSliceInfoByName map[string]*endpointSliceInfo
type endpointSliceDataByName map[string]*endpointSliceData
// endpointSliceInfo contains just the attributes kube-proxy cares about.
// endpointSliceData contains just the attributes kube-proxy cares about.
// Used for caching. Intentionally small to limit memory util.
type endpointSliceInfo struct {
type endpointSliceData struct {
Ports []discovery.EndpointPort
Endpoints []*endpointInfo
Endpoints []*endpointData
Remove bool
}
// endpointInfo contains just the attributes kube-proxy cares about.
// endpointData contains just the attributes kube-proxy cares about.
// Used for caching. Intentionally small to limit memory util.
// Addresses, NodeName, and Zone are copied from EndpointSlice Endpoints.
type endpointInfo struct {
type endpointData struct {
Addresses []string
NodeName *string
Zone *string
@ -88,10 +88,6 @@ type endpointInfo struct {
Terminating bool
}
// spToEndpointMap stores groups Endpoint objects by ServicePortName and
// endpoint string (returned by Endpoint.String()).
type spToEndpointMap map[ServicePortName]map[string]Endpoint
// NewEndpointSliceCache initializes an EndpointSliceCache.
func NewEndpointSliceCache(hostname string, ipFamily v1.IPFamily, recorder events.EventRecorder, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache {
if makeEndpointInfo == nil {
@ -109,26 +105,26 @@ func NewEndpointSliceCache(hostname string, ipFamily v1.IPFamily, recorder event
// newEndpointSliceTracker initializes an endpointSliceTracker.
func newEndpointSliceTracker() *endpointSliceTracker {
return &endpointSliceTracker{
applied: endpointSliceInfoByName{},
pending: endpointSliceInfoByName{},
applied: endpointSliceDataByName{},
pending: endpointSliceDataByName{},
}
}
// newEndpointSliceInfo generates endpointSliceInfo from an EndpointSlice.
func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceInfo {
esInfo := &endpointSliceInfo{
// newEndpointSliceData generates endpointSliceData from an EndpointSlice.
func newEndpointSliceData(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceData {
esData := &endpointSliceData{
Ports: make([]discovery.EndpointPort, len(endpointSlice.Ports)),
Endpoints: []*endpointInfo{},
Endpoints: []*endpointData{},
Remove: remove,
}
// copy here to avoid mutating shared EndpointSlice object.
copy(esInfo.Ports, endpointSlice.Ports)
sort.Sort(byPort(esInfo.Ports))
copy(esData.Ports, endpointSlice.Ports)
sort.Sort(byPort(esData.Ports))
if !remove {
for _, endpoint := range endpointSlice.Endpoints {
epInfo := &endpointInfo{
epData := &endpointData{
Addresses: endpoint.Addresses,
Zone: endpoint.Zone,
NodeName: endpoint.NodeName,
@ -141,20 +137,20 @@ func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 {
epInfo.ZoneHints = sets.New[string]()
epData.ZoneHints = sets.New[string]()
for _, zone := range endpoint.Hints.ForZones {
epInfo.ZoneHints.Insert(zone.Name)
epData.ZoneHints.Insert(zone.Name)
}
}
}
esInfo.Endpoints = append(esInfo.Endpoints, epInfo)
esData.Endpoints = append(esData.Endpoints, epData)
}
sort.Sort(byAddress(esInfo.Endpoints))
sort.Sort(byAddress(esData.Endpoints))
}
return esInfo
return esData
}
// standardEndpointInfo is the default makeEndpointFunc.
@ -170,7 +166,7 @@ func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.Endpoint
return false
}
esInfo := newEndpointSliceInfo(endpointSlice, remove)
esData := newEndpointSliceData(endpointSlice, remove)
cache.lock.Lock()
defer cache.lock.Unlock()
@ -179,10 +175,10 @@ func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.Endpoint
cache.trackerByServiceMap[serviceKey] = newEndpointSliceTracker()
}
changed := cache.esInfoChanged(serviceKey, sliceKey, esInfo)
changed := cache.esDataChanged(serviceKey, sliceKey, esData)
if changed {
cache.trackerByServiceMap[serviceKey].pending[sliceKey] = esInfo
cache.trackerByServiceMap[serviceKey].pending[sliceKey] = esData
}
return changed
@ -205,11 +201,11 @@ func (cache *EndpointSliceCache) checkoutChanges() map[types.NamespacedName]*end
change.previous = cache.getEndpointsMap(serviceNN, esTracker.applied)
for name, sliceInfo := range esTracker.pending {
if sliceInfo.Remove {
for name, sliceData := range esTracker.pending {
if sliceData.Remove {
delete(esTracker.applied, name)
} else {
esTracker.applied[name] = sliceInfo
esTracker.applied[name] = sliceData
}
delete(esTracker.pending, name)
@ -222,18 +218,22 @@ func (cache *EndpointSliceCache) checkoutChanges() map[types.NamespacedName]*end
return changes
}
// spToEndpointMap stores groups Endpoint objects by ServicePortName and
// endpoint string (returned by Endpoint.String()).
type spToEndpointMap map[ServicePortName]map[string]Endpoint
// getEndpointsMap computes an EndpointsMap for a given set of EndpointSlices.
func (cache *EndpointSliceCache) getEndpointsMap(serviceNN types.NamespacedName, sliceInfoByName endpointSliceInfoByName) EndpointsMap {
endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN, sliceInfoByName)
func (cache *EndpointSliceCache) getEndpointsMap(serviceNN types.NamespacedName, sliceDataByName endpointSliceDataByName) EndpointsMap {
endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN, sliceDataByName)
return endpointsMapFromEndpointInfo(endpointInfoBySP)
}
// endpointInfoByServicePort groups endpoint info by service port name and address.
func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.NamespacedName, sliceInfoByName endpointSliceInfoByName) spToEndpointMap {
func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.NamespacedName, sliceDataByName endpointSliceDataByName) spToEndpointMap {
endpointInfoBySP := spToEndpointMap{}
for _, sliceInfo := range sliceInfoByName {
for _, port := range sliceInfo.Ports {
for _, sliceData := range sliceDataByName {
for _, port := range sliceData.Ports {
if port.Name == nil {
klog.ErrorS(nil, "Ignoring port with nil name", "portName", port.Name)
continue
@ -250,7 +250,7 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names
Protocol: *port.Protocol,
}
endpointInfoBySP[svcPortName] = cache.addEndpoints(&svcPortName, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints)
endpointInfoBySP[svcPortName] = cache.addEndpoints(&svcPortName, int(*port.Port), endpointInfoBySP[svcPortName], sliceData.Endpoints)
}
}
@ -258,7 +258,7 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names
}
// addEndpoints adds endpointInfo for each unique endpoint.
func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, portNum int, endpointSet map[string]Endpoint, endpoints []*endpointInfo) map[string]Endpoint {
func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, portNum int, endpointSet map[string]Endpoint, endpoints []*endpointData) map[string]Endpoint {
if endpointSet == nil {
endpointSet = map[string]Endpoint{}
}
@ -299,29 +299,29 @@ func (cache *EndpointSliceCache) isLocal(hostname string) bool {
return len(cache.hostname) > 0 && hostname == cache.hostname
}
// esInfoChanged returns true if the esInfo parameter should be set as a new
// esDataChanged returns true if the esData parameter should be set as a new
// pending value in the cache.
func (cache *EndpointSliceCache) esInfoChanged(serviceKey types.NamespacedName, sliceKey string, esInfo *endpointSliceInfo) bool {
func (cache *EndpointSliceCache) esDataChanged(serviceKey types.NamespacedName, sliceKey string, esData *endpointSliceData) bool {
if _, ok := cache.trackerByServiceMap[serviceKey]; ok {
appliedInfo, appliedOk := cache.trackerByServiceMap[serviceKey].applied[sliceKey]
pendingInfo, pendingOk := cache.trackerByServiceMap[serviceKey].pending[sliceKey]
appliedData, appliedOk := cache.trackerByServiceMap[serviceKey].applied[sliceKey]
pendingData, pendingOk := cache.trackerByServiceMap[serviceKey].pending[sliceKey]
// If there's already a pending value, return whether or not this would
// change that.
if pendingOk {
return !reflect.DeepEqual(esInfo, pendingInfo)
return !reflect.DeepEqual(esData, pendingData)
}
// If there's already an applied value, return whether or not this would
// change that.
if appliedOk {
return !reflect.DeepEqual(esInfo, appliedInfo)
return !reflect.DeepEqual(esData, appliedData)
}
}
// If this is marked for removal and does not exist in the cache, no changes
// are necessary.
if esInfo.Remove {
if esData.Remove {
return false
}
@ -373,8 +373,8 @@ func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.Names
return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}, endpointSlice.Name, err
}
// byAddress helps sort endpointInfo
type byAddress []*endpointInfo
// byAddress helps sort endpointData
type byAddress []*endpointData
func (e byAddress) Len() int {
return len(e)

View File

@ -328,7 +328,7 @@ func TestEndpointInfoByServicePort(t *testing.T) {
}
}
func TestEsInfoChanged(t *testing.T) {
func TestEsDataChanged(t *testing.T) {
p80 := int32(80)
p443 := int32(443)
port80 := discovery.EndpointPort{Port: &p80, Name: ptr.To("http"), Protocol: ptr.To(v1.ProtocolTCP)}
@ -454,11 +454,11 @@ func TestEsInfoChanged(t *testing.T) {
t.Fatalf("Expected no error calling endpointSliceCacheKeys(): %v", err)
}
esInfo := newEndpointSliceInfo(tc.updatedSlice, false)
changed := tc.cache.esInfoChanged(serviceKey, sliceKey, esInfo)
esData := newEndpointSliceData(tc.updatedSlice, false)
changed := tc.cache.esDataChanged(serviceKey, sliceKey, esData)
if tc.expectChanged != changed {
t.Errorf("Expected esInfoChanged() to return %t, got %t", tc.expectChanged, changed)
t.Errorf("Expected esDataChanged() to return %t, got %t", tc.expectChanged, changed)
}
cmc.Check(t)

View File

@ -1,489 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"fmt"
"net"
"reflect"
"strings"
"sync"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
apiservice "k8s.io/kubernetes/pkg/api/v1/service"
"k8s.io/kubernetes/pkg/proxy/metrics"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
)
// BaseServicePortInfo contains base information that defines a service.
// This could be used directly by proxier while processing services,
// or can be used for constructing a more specific ServiceInfo struct
// defined by the proxier if needed.
type BaseServicePortInfo struct {
clusterIP net.IP
port int
protocol v1.Protocol
nodePort int
loadBalancerVIPs []string
sessionAffinityType v1.ServiceAffinity
stickyMaxAgeSeconds int
externalIPs []string
loadBalancerSourceRanges []string
healthCheckNodePort int
externalPolicyLocal bool
internalPolicyLocal bool
internalTrafficPolicy *v1.ServiceInternalTrafficPolicy
hintsAnnotation string
}
var _ ServicePort = &BaseServicePortInfo{}
// String is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) String() string {
return fmt.Sprintf("%s:%d/%s", bsvcPortInfo.clusterIP, bsvcPortInfo.port, bsvcPortInfo.protocol)
}
// ClusterIP is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) ClusterIP() net.IP {
return bsvcPortInfo.clusterIP
}
// Port is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) Port() int {
return bsvcPortInfo.port
}
// SessionAffinityType is part of the ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) SessionAffinityType() v1.ServiceAffinity {
return bsvcPortInfo.sessionAffinityType
}
// StickyMaxAgeSeconds is part of the ServicePort interface
func (bsvcPortInfo *BaseServicePortInfo) StickyMaxAgeSeconds() int {
return bsvcPortInfo.stickyMaxAgeSeconds
}
// Protocol is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) Protocol() v1.Protocol {
return bsvcPortInfo.protocol
}
// LoadBalancerSourceRanges is part of ServicePort interface
func (bsvcPortInfo *BaseServicePortInfo) LoadBalancerSourceRanges() []string {
return bsvcPortInfo.loadBalancerSourceRanges
}
// HealthCheckNodePort is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) HealthCheckNodePort() int {
return bsvcPortInfo.healthCheckNodePort
}
// NodePort is part of the ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) NodePort() int {
return bsvcPortInfo.nodePort
}
// ExternalIPStrings is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) ExternalIPStrings() []string {
return bsvcPortInfo.externalIPs
}
// LoadBalancerVIPStrings is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) LoadBalancerVIPStrings() []string {
return bsvcPortInfo.loadBalancerVIPs
}
// ExternalPolicyLocal is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) ExternalPolicyLocal() bool {
return bsvcPortInfo.externalPolicyLocal
}
// InternalPolicyLocal is part of ServicePort interface
func (bsvcPortInfo *BaseServicePortInfo) InternalPolicyLocal() bool {
return bsvcPortInfo.internalPolicyLocal
}
// InternalTrafficPolicy is part of ServicePort interface
func (bsvcPortInfo *BaseServicePortInfo) InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicy {
return bsvcPortInfo.internalTrafficPolicy
}
// HintsAnnotation is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) HintsAnnotation() string {
return bsvcPortInfo.hintsAnnotation
}
// ExternallyAccessible is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) ExternallyAccessible() bool {
return bsvcPortInfo.nodePort != 0 || len(bsvcPortInfo.loadBalancerVIPs) != 0 || len(bsvcPortInfo.externalIPs) != 0
}
// UsesClusterEndpoints is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) UsesClusterEndpoints() bool {
// The service port uses Cluster endpoints if the internal traffic policy is "Cluster",
// or if it accepts external traffic at all. (Even if the external traffic policy is
// "Local", we need Cluster endpoints to implement short circuiting.)
return !bsvcPortInfo.internalPolicyLocal || bsvcPortInfo.ExternallyAccessible()
}
// UsesLocalEndpoints is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) UsesLocalEndpoints() bool {
return bsvcPortInfo.internalPolicyLocal || (bsvcPortInfo.externalPolicyLocal && bsvcPortInfo.ExternallyAccessible())
}
func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServicePortInfo {
externalPolicyLocal := apiservice.ExternalPolicyLocal(service)
internalPolicyLocal := apiservice.InternalPolicyLocal(service)
var stickyMaxAgeSeconds int
if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
// Kube-apiserver side guarantees SessionAffinityConfig won't be nil when session affinity type is ClientIP
stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds)
}
clusterIP := proxyutil.GetClusterIPByFamily(sct.ipFamily, service)
info := &BaseServicePortInfo{
clusterIP: netutils.ParseIPSloppy(clusterIP),
port: int(port.Port),
protocol: port.Protocol,
nodePort: int(port.NodePort),
sessionAffinityType: service.Spec.SessionAffinity,
stickyMaxAgeSeconds: stickyMaxAgeSeconds,
externalPolicyLocal: externalPolicyLocal,
internalPolicyLocal: internalPolicyLocal,
internalTrafficPolicy: service.Spec.InternalTrafficPolicy,
}
// v1.DeprecatedAnnotationTopologyAwareHints has precedence over v1.AnnotationTopologyMode.
var ok bool
info.hintsAnnotation, ok = service.Annotations[v1.DeprecatedAnnotationTopologyAwareHints]
if !ok {
info.hintsAnnotation, _ = service.Annotations[v1.AnnotationTopologyMode]
}
loadBalancerSourceRanges := make([]string, len(service.Spec.LoadBalancerSourceRanges))
for i, sourceRange := range service.Spec.LoadBalancerSourceRanges {
loadBalancerSourceRanges[i] = strings.TrimSpace(sourceRange)
}
// filter external ips, source ranges and ingress ips
// prior to dual stack services, this was considered an error, but with dual stack
// services, this is actually expected. Hence we downgraded from reporting by events
// to just log lines with high verbosity
ipFamilyMap := proxyutil.MapIPsByIPFamily(service.Spec.ExternalIPs)
info.externalIPs = ipFamilyMap[sct.ipFamily]
// Log the IPs not matching the ipFamily
if ips, ok := ipFamilyMap[proxyutil.OtherIPFamily(sct.ipFamily)]; ok && len(ips) > 0 {
klog.V(4).InfoS("Service change tracker ignored the following external IPs for given service as they don't match IP Family",
"ipFamily", sct.ipFamily, "externalIPs", strings.Join(ips, ", "), "service", klog.KObj(service))
}
ipFamilyMap = proxyutil.MapCIDRsByIPFamily(loadBalancerSourceRanges)
info.loadBalancerSourceRanges = ipFamilyMap[sct.ipFamily]
// Log the CIDRs not matching the ipFamily
if cidrs, ok := ipFamilyMap[proxyutil.OtherIPFamily(sct.ipFamily)]; ok && len(cidrs) > 0 {
klog.V(4).InfoS("Service change tracker ignored the following load balancer source ranges for given Service as they don't match IP Family",
"ipFamily", sct.ipFamily, "loadBalancerSourceRanges", strings.Join(cidrs, ", "), "service", klog.KObj(service))
}
// Obtain Load Balancer Ingress
var invalidIPs []string
for _, ing := range service.Status.LoadBalancer.Ingress {
if ing.IP == "" {
continue
}
// proxy mode load balancers do not need to track the IPs in the service cache
// and they can also implement IP family translation, so no need to check if
// the status ingress.IP and the ClusterIP belong to the same family.
if !proxyutil.IsVIPMode(ing) {
klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IP for given Service as it using Proxy mode",
"ipFamily", sct.ipFamily, "loadBalancerIngressIP", ing.IP, "service", klog.KObj(service))
continue
}
// kube-proxy does not implement IP family translation, skip addresses with
// different IP family
if ipFamily := proxyutil.GetIPFamilyFromIP(ing.IP); ipFamily == sct.ipFamily {
info.loadBalancerVIPs = append(info.loadBalancerVIPs, ing.IP)
} else {
invalidIPs = append(invalidIPs, ing.IP)
}
}
if len(invalidIPs) > 0 {
klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IPs for given Service as they don't match the IP Family",
"ipFamily", sct.ipFamily, "loadBalancerIngressIPs", strings.Join(invalidIPs, ", "), "service", klog.KObj(service))
}
if apiservice.NeedsHealthCheck(service) {
p := service.Spec.HealthCheckNodePort
if p == 0 {
klog.ErrorS(nil, "Service has no healthcheck nodeport", "service", klog.KObj(service))
} else {
info.healthCheckNodePort = int(p)
}
}
return info
}
type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort
// This handler is invoked by the apply function on every change. This function should not modify the
// ServicePortMap's but just use the changes for any Proxier specific cleanup.
type processServiceMapChangeFunc func(previous, current ServicePortMap)
// serviceChange contains all changes to services that happened since proxy rules were synced. For a single object,
// changes are accumulated, i.e. previous is state from before applying the changes,
// current is state after applying all of the changes.
type serviceChange struct {
previous ServicePortMap
current ServicePortMap
}
// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of
// Services, keyed by their namespace and name.
type ServiceChangeTracker struct {
// lock protects items.
lock sync.Mutex
// items maps a service to its serviceChange.
items map[types.NamespacedName]*serviceChange
// makeServiceInfo allows proxier to inject customized information when processing service.
makeServiceInfo makeServicePortFunc
processServiceMapChange processServiceMapChangeFunc
ipFamily v1.IPFamily
recorder events.EventRecorder
}
// NewServiceChangeTracker initializes a ServiceChangeTracker
func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker {
return &ServiceChangeTracker{
items: make(map[types.NamespacedName]*serviceChange),
makeServiceInfo: makeServiceInfo,
recorder: recorder,
ipFamily: ipFamily,
processServiceMapChange: processServiceMapChange,
}
}
// Update updates given service's change map based on the <previous, current> service pair. It returns true if items changed,
// otherwise return false. Update can be used to add/update/delete items of ServiceChangeMap. For example,
// Add item
// - pass <nil, service> as the <previous, current> pair.
//
// Update item
// - pass <oldService, service> as the <previous, current> pair.
//
// Delete item
// - pass <service, nil> as the <previous, current> pair.
func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
// This is unexpected, we should return false directly.
if previous == nil && current == nil {
return false
}
svc := current
if svc == nil {
svc = previous
}
metrics.ServiceChangesTotal.Inc()
namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}
sct.lock.Lock()
defer sct.lock.Unlock()
change, exists := sct.items[namespacedName]
if !exists {
change = &serviceChange{}
change.previous = sct.serviceToServiceMap(previous)
sct.items[namespacedName] = change
}
change.current = sct.serviceToServiceMap(current)
// if change.previous equal to change.current, it means no change
if reflect.DeepEqual(change.previous, change.current) {
delete(sct.items, namespacedName)
} else {
klog.V(4).InfoS("Service updated ports", "service", klog.KObj(svc), "portCount", len(change.current))
}
metrics.ServiceChangesPending.Set(float64(len(sct.items)))
return len(sct.items) > 0
}
// UpdateServiceMapResult is the updated results after applying service changes.
type UpdateServiceMapResult struct {
// UpdatedServices lists the names of all services added/updated/deleted since the
// last Update.
UpdatedServices sets.Set[types.NamespacedName]
// DeletedUDPClusterIPs holds stale (no longer assigned to a Service) Service IPs
// that had UDP ports. Callers can use this to abort timeout-waits or clear
// connection-tracking information.
DeletedUDPClusterIPs sets.Set[string]
}
// HealthCheckNodePorts returns a map of Service names to HealthCheckNodePort values
// for all Services in sm with non-zero HealthCheckNodePort.
func (sm ServicePortMap) HealthCheckNodePorts() map[types.NamespacedName]uint16 {
// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to svcPortMap.
ports := make(map[types.NamespacedName]uint16)
for svcPortName, info := range sm {
if info.HealthCheckNodePort() != 0 {
ports[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort())
}
}
return ports
}
// ServicePortMap maps a service to its ServicePort.
type ServicePortMap map[ServicePortName]ServicePort
// serviceToServiceMap translates a single Service object to a ServicePortMap.
//
// NOTE: service object should NOT be modified.
func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) ServicePortMap {
if service == nil {
return nil
}
if proxyutil.ShouldSkipService(service) {
return nil
}
clusterIP := proxyutil.GetClusterIPByFamily(sct.ipFamily, service)
if clusterIP == "" {
return nil
}
svcPortMap := make(ServicePortMap)
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name, Protocol: servicePort.Protocol}
baseSvcInfo := sct.newBaseServiceInfo(servicePort, service)
if sct.makeServiceInfo != nil {
svcPortMap[svcPortName] = sct.makeServiceInfo(servicePort, service, baseSvcInfo)
} else {
svcPortMap[svcPortName] = baseSvcInfo
}
}
return svcPortMap
}
// Update updates ServicePortMap base on the given changes, returns information about the
// diff since the last Update, triggers processServiceMapChange on every change, and
// clears the changes map.
func (sm ServicePortMap) Update(sct *ServiceChangeTracker) UpdateServiceMapResult {
sct.lock.Lock()
defer sct.lock.Unlock()
result := UpdateServiceMapResult{
UpdatedServices: sets.New[types.NamespacedName](),
DeletedUDPClusterIPs: sets.New[string](),
}
for nn, change := range sct.items {
if sct.processServiceMapChange != nil {
sct.processServiceMapChange(change.previous, change.current)
}
result.UpdatedServices.Insert(nn)
sm.merge(change.current)
// filter out the Update event of current changes from previous changes
// before calling unmerge() so that can skip deleting the Update events.
change.previous.filter(change.current)
sm.unmerge(change.previous, result.DeletedUDPClusterIPs)
}
// clear changes after applying them to ServicePortMap.
sct.items = make(map[types.NamespacedName]*serviceChange)
metrics.ServiceChangesPending.Set(0)
return result
}
// merge adds other ServicePortMap's elements to current ServicePortMap.
// If collision, other ALWAYS win. Otherwise add the other to current.
// In other words, if some elements in current collisions with other, update the current by other.
// It returns a string type set which stores all the newly merged services' identifier, ServicePortName.String(), to help users
// tell if a service is deleted or updated.
// The returned value is one of the arguments of ServicePortMap.unmerge().
// ServicePortMap A Merge ServicePortMap B will do following 2 things:
// - update ServicePortMap A.
// - produce a string set which stores all other ServicePortMap's ServicePortName.String().
//
// For example,
//
// A{}
// B{{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}}
// A updated to be {{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}}
// produce string set {"ns/cluster-ip:http"}
//
// A{{"ns", "cluster-ip", "http"}: {"172.16.55.10", 345, "UDP"}}
// B{{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}}
// A updated to be {{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}}
// produce string set {"ns/cluster-ip:http"}
func (sm *ServicePortMap) merge(other ServicePortMap) sets.Set[string] {
// existingPorts is going to store all identifiers of all services in `other` ServicePortMap.
existingPorts := sets.New[string]()
for svcPortName, info := range other {
// Take ServicePortName.String() as the newly merged service's identifier and put it into existingPorts.
existingPorts.Insert(svcPortName.String())
_, exists := (*sm)[svcPortName]
if !exists {
klog.V(4).InfoS("Adding new service port", "portName", svcPortName, "servicePort", info)
} else {
klog.V(4).InfoS("Updating existing service port", "portName", svcPortName, "servicePort", info)
}
(*sm)[svcPortName] = info
}
return existingPorts
}
// filter filters out elements from ServicePortMap base on given ports string sets.
func (sm *ServicePortMap) filter(other ServicePortMap) {
for svcPortName := range *sm {
// skip the delete for Update event.
if _, ok := other[svcPortName]; ok {
delete(*sm, svcPortName)
}
}
}
// unmerge deletes all other ServicePortMap's elements from current ServicePortMap and
// updates deletedUDPClusterIPs with all of the newly-deleted UDP cluster IPs.
func (sm *ServicePortMap) unmerge(other ServicePortMap, deletedUDPClusterIPs sets.Set[string]) {
for svcPortName := range other {
info, exists := (*sm)[svcPortName]
if exists {
klog.V(4).InfoS("Removing service port", "portName", svcPortName)
if info.Protocol() == v1.ProtocolUDP {
deletedUDPClusterIPs.Insert(info.ClusterIP().String())
}
delete(*sm, svcPortName)
} else {
klog.ErrorS(nil, "Service port does not exists", "portName", svcPortName)
}
}
}

View File

@ -0,0 +1,244 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"reflect"
"sync"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy/metrics"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
)
// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of
// Services, keyed by their namespace and name.
type ServiceChangeTracker struct {
// lock protects items.
lock sync.Mutex
// items maps a service to its serviceChange.
items map[types.NamespacedName]*serviceChange
// makeServiceInfo allows the proxier to inject customized information when
// processing services.
makeServiceInfo makeServicePortFunc
// processServiceMapChange is invoked by the apply function on every change. This
// function should not modify the ServicePortMaps, but just use the changes for
// any Proxier-specific cleanup.
processServiceMapChange processServiceMapChangeFunc
ipFamily v1.IPFamily
recorder events.EventRecorder
}
type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort
type processServiceMapChangeFunc func(previous, current ServicePortMap)
// serviceChange contains all changes to services that happened since proxy rules were synced. For a single object,
// changes are accumulated, i.e. previous is state from before applying the changes,
// current is state after applying all of the changes.
type serviceChange struct {
previous ServicePortMap
current ServicePortMap
}
// NewServiceChangeTracker initializes a ServiceChangeTracker
func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker {
return &ServiceChangeTracker{
items: make(map[types.NamespacedName]*serviceChange),
makeServiceInfo: makeServiceInfo,
recorder: recorder,
ipFamily: ipFamily,
processServiceMapChange: processServiceMapChange,
}
}
// Update updates the ServiceChangeTracker based on the <previous, current> service pair
// (where either previous or current, but not both, can be nil). It returns true if sct
// contains changes that need to be synced (whether or not those changes were caused by
// this update); note that this is different from the return value of
// EndpointChangeTracker.EndpointSliceUpdate().
func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
// This is unexpected, we should return false directly.
if previous == nil && current == nil {
return false
}
svc := current
if svc == nil {
svc = previous
}
metrics.ServiceChangesTotal.Inc()
namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}
sct.lock.Lock()
defer sct.lock.Unlock()
change, exists := sct.items[namespacedName]
if !exists {
change = &serviceChange{}
change.previous = sct.serviceToServiceMap(previous)
sct.items[namespacedName] = change
}
change.current = sct.serviceToServiceMap(current)
// if change.previous equal to change.current, it means no change
if reflect.DeepEqual(change.previous, change.current) {
delete(sct.items, namespacedName)
} else {
klog.V(4).InfoS("Service updated ports", "service", klog.KObj(svc), "portCount", len(change.current))
}
metrics.ServiceChangesPending.Set(float64(len(sct.items)))
return len(sct.items) > 0
}
// ServicePortMap maps a service to its ServicePort.
type ServicePortMap map[ServicePortName]ServicePort
// UpdateServiceMapResult is the updated results after applying service changes.
type UpdateServiceMapResult struct {
// UpdatedServices lists the names of all services added/updated/deleted since the
// last Update.
UpdatedServices sets.Set[types.NamespacedName]
// DeletedUDPClusterIPs holds stale (no longer assigned to a Service) Service IPs
// that had UDP ports. Callers can use this to abort timeout-waits or clear
// connection-tracking information.
DeletedUDPClusterIPs sets.Set[string]
}
// HealthCheckNodePorts returns a map of Service names to HealthCheckNodePort values
// for all Services in sm with non-zero HealthCheckNodePort.
func (sm ServicePortMap) HealthCheckNodePorts() map[types.NamespacedName]uint16 {
// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to svcPortMap.
ports := make(map[types.NamespacedName]uint16)
for svcPortName, info := range sm {
if info.HealthCheckNodePort() != 0 {
ports[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort())
}
}
return ports
}
// serviceToServiceMap translates a single Service object to a ServicePortMap.
//
// NOTE: service object should NOT be modified.
func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) ServicePortMap {
if service == nil {
return nil
}
if proxyutil.ShouldSkipService(service) {
return nil
}
clusterIP := proxyutil.GetClusterIPByFamily(sct.ipFamily, service)
if clusterIP == "" {
return nil
}
svcPortMap := make(ServicePortMap)
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name, Protocol: servicePort.Protocol}
baseSvcInfo := newBaseServiceInfo(service, sct.ipFamily, servicePort)
if sct.makeServiceInfo != nil {
svcPortMap[svcPortName] = sct.makeServiceInfo(servicePort, service, baseSvcInfo)
} else {
svcPortMap[svcPortName] = baseSvcInfo
}
}
return svcPortMap
}
// Update updates ServicePortMap base on the given changes, returns information about the
// diff since the last Update, triggers processServiceMapChange on every change, and
// clears the changes map.
func (sm ServicePortMap) Update(sct *ServiceChangeTracker) UpdateServiceMapResult {
sct.lock.Lock()
defer sct.lock.Unlock()
result := UpdateServiceMapResult{
UpdatedServices: sets.New[types.NamespacedName](),
DeletedUDPClusterIPs: sets.New[string](),
}
for nn, change := range sct.items {
if sct.processServiceMapChange != nil {
sct.processServiceMapChange(change.previous, change.current)
}
result.UpdatedServices.Insert(nn)
sm.merge(change.current)
// filter out the Update event of current changes from previous changes
// before calling unmerge() so that can skip deleting the Update events.
change.previous.filter(change.current)
sm.unmerge(change.previous, result.DeletedUDPClusterIPs)
}
// clear changes after applying them to ServicePortMap.
sct.items = make(map[types.NamespacedName]*serviceChange)
metrics.ServiceChangesPending.Set(0)
return result
}
// merge adds other ServicePortMap's elements to current ServicePortMap.
// If collision, other ALWAYS win. Otherwise add the other to current.
// In other words, if some elements in current collisions with other, update the current by other.
func (sm *ServicePortMap) merge(other ServicePortMap) {
for svcPortName, info := range other {
_, exists := (*sm)[svcPortName]
if !exists {
klog.V(4).InfoS("Adding new service port", "portName", svcPortName, "servicePort", info)
} else {
klog.V(4).InfoS("Updating existing service port", "portName", svcPortName, "servicePort", info)
}
(*sm)[svcPortName] = info
}
}
// filter filters out elements from ServicePortMap base on given ports string sets.
func (sm *ServicePortMap) filter(other ServicePortMap) {
for svcPortName := range *sm {
// skip the delete for Update event.
if _, ok := other[svcPortName]; ok {
delete(*sm, svcPortName)
}
}
}
// unmerge deletes all other ServicePortMap's elements from current ServicePortMap and
// updates deletedUDPClusterIPs with all of the newly-deleted UDP cluster IPs.
func (sm *ServicePortMap) unmerge(other ServicePortMap, deletedUDPClusterIPs sets.Set[string]) {
for svcPortName := range other {
info, exists := (*sm)[svcPortName]
if exists {
klog.V(4).InfoS("Removing service port", "portName", svcPortName)
if info.Protocol() == v1.ProtocolUDP {
deletedUDPClusterIPs.Insert(info.ClusterIP().String())
}
delete(*sm, svcPortName)
} else {
klog.ErrorS(nil, "Service port does not exists", "portName", svcPortName)
}
}
}

276
pkg/proxy/serviceport.go Normal file
View File

@ -0,0 +1,276 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"fmt"
"net"
"strings"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
apiservice "k8s.io/kubernetes/pkg/api/v1/service"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
netutils "k8s.io/utils/net"
)
// ServicePort is an interface which abstracts information about a service.
type ServicePort interface {
// String returns service string. An example format can be: `IP:Port/Protocol`.
String() string
// ClusterIP returns service cluster IP in net.IP format.
ClusterIP() net.IP
// Port returns service port if present. If return 0 means not present.
Port() int
// SessionAffinityType returns service session affinity type
SessionAffinityType() v1.ServiceAffinity
// StickyMaxAgeSeconds returns service max connection age
StickyMaxAgeSeconds() int
// ExternalIPStrings returns service ExternalIPs as a string array.
ExternalIPStrings() []string
// LoadBalancerVIPStrings returns service LoadBalancerIPs which are VIP mode as a string array.
LoadBalancerVIPStrings() []string
// Protocol returns service protocol.
Protocol() v1.Protocol
// LoadBalancerSourceRanges returns service LoadBalancerSourceRanges if present empty array if not
LoadBalancerSourceRanges() []string
// HealthCheckNodePort returns service health check node port if present. If return 0, it means not present.
HealthCheckNodePort() int
// NodePort returns a service Node port if present. If return 0, it means not present.
NodePort() int
// ExternalPolicyLocal returns if a service has only node local endpoints for external traffic.
ExternalPolicyLocal() bool
// InternalPolicyLocal returns if a service has only node local endpoints for internal traffic.
InternalPolicyLocal() bool
// HintsAnnotation returns the value of the v1.DeprecatedAnnotationTopologyAwareHints annotation.
HintsAnnotation() string
// ExternallyAccessible returns true if the service port is reachable via something
// other than ClusterIP (NodePort/ExternalIP/LoadBalancer)
ExternallyAccessible() bool
// UsesClusterEndpoints returns true if the service port ever sends traffic to
// endpoints based on "Cluster" traffic policy
UsesClusterEndpoints() bool
// UsesLocalEndpoints returns true if the service port ever sends traffic to
// endpoints based on "Local" traffic policy
UsesLocalEndpoints() bool
}
// BaseServicePortInfo contains base information that defines a service.
// This could be used directly by proxier while processing services,
// or can be used for constructing a more specific ServiceInfo struct
// defined by the proxier if needed.
type BaseServicePortInfo struct {
clusterIP net.IP
port int
protocol v1.Protocol
nodePort int
loadBalancerVIPs []string
sessionAffinityType v1.ServiceAffinity
stickyMaxAgeSeconds int
externalIPs []string
loadBalancerSourceRanges []string
healthCheckNodePort int
externalPolicyLocal bool
internalPolicyLocal bool
hintsAnnotation string
}
var _ ServicePort = &BaseServicePortInfo{}
// String is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) String() string {
return fmt.Sprintf("%s:%d/%s", bsvcPortInfo.clusterIP, bsvcPortInfo.port, bsvcPortInfo.protocol)
}
// ClusterIP is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) ClusterIP() net.IP {
return bsvcPortInfo.clusterIP
}
// Port is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) Port() int {
return bsvcPortInfo.port
}
// SessionAffinityType is part of the ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) SessionAffinityType() v1.ServiceAffinity {
return bsvcPortInfo.sessionAffinityType
}
// StickyMaxAgeSeconds is part of the ServicePort interface
func (bsvcPortInfo *BaseServicePortInfo) StickyMaxAgeSeconds() int {
return bsvcPortInfo.stickyMaxAgeSeconds
}
// Protocol is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) Protocol() v1.Protocol {
return bsvcPortInfo.protocol
}
// LoadBalancerSourceRanges is part of ServicePort interface
func (bsvcPortInfo *BaseServicePortInfo) LoadBalancerSourceRanges() []string {
return bsvcPortInfo.loadBalancerSourceRanges
}
// HealthCheckNodePort is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) HealthCheckNodePort() int {
return bsvcPortInfo.healthCheckNodePort
}
// NodePort is part of the ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) NodePort() int {
return bsvcPortInfo.nodePort
}
// ExternalIPStrings is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) ExternalIPStrings() []string {
return bsvcPortInfo.externalIPs
}
// LoadBalancerVIPStrings is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) LoadBalancerVIPStrings() []string {
return bsvcPortInfo.loadBalancerVIPs
}
// ExternalPolicyLocal is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) ExternalPolicyLocal() bool {
return bsvcPortInfo.externalPolicyLocal
}
// InternalPolicyLocal is part of ServicePort interface
func (bsvcPortInfo *BaseServicePortInfo) InternalPolicyLocal() bool {
return bsvcPortInfo.internalPolicyLocal
}
// HintsAnnotation is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) HintsAnnotation() string {
return bsvcPortInfo.hintsAnnotation
}
// ExternallyAccessible is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) ExternallyAccessible() bool {
return bsvcPortInfo.nodePort != 0 || len(bsvcPortInfo.loadBalancerVIPs) != 0 || len(bsvcPortInfo.externalIPs) != 0
}
// UsesClusterEndpoints is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) UsesClusterEndpoints() bool {
// The service port uses Cluster endpoints if the internal traffic policy is "Cluster",
// or if it accepts external traffic at all. (Even if the external traffic policy is
// "Local", we need Cluster endpoints to implement short circuiting.)
return !bsvcPortInfo.internalPolicyLocal || bsvcPortInfo.ExternallyAccessible()
}
// UsesLocalEndpoints is part of ServicePort interface.
func (bsvcPortInfo *BaseServicePortInfo) UsesLocalEndpoints() bool {
return bsvcPortInfo.internalPolicyLocal || (bsvcPortInfo.externalPolicyLocal && bsvcPortInfo.ExternallyAccessible())
}
func newBaseServiceInfo(service *v1.Service, ipFamily v1.IPFamily, port *v1.ServicePort) *BaseServicePortInfo {
externalPolicyLocal := apiservice.ExternalPolicyLocal(service)
internalPolicyLocal := apiservice.InternalPolicyLocal(service)
var stickyMaxAgeSeconds int
if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
// Kube-apiserver side guarantees SessionAffinityConfig won't be nil when session affinity type is ClientIP
stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds)
}
clusterIP := proxyutil.GetClusterIPByFamily(ipFamily, service)
info := &BaseServicePortInfo{
clusterIP: netutils.ParseIPSloppy(clusterIP),
port: int(port.Port),
protocol: port.Protocol,
nodePort: int(port.NodePort),
sessionAffinityType: service.Spec.SessionAffinity,
stickyMaxAgeSeconds: stickyMaxAgeSeconds,
externalPolicyLocal: externalPolicyLocal,
internalPolicyLocal: internalPolicyLocal,
}
// v1.DeprecatedAnnotationTopologyAwareHints has precedence over v1.AnnotationTopologyMode.
var exists bool
info.hintsAnnotation, exists = service.Annotations[v1.DeprecatedAnnotationTopologyAwareHints]
if !exists {
info.hintsAnnotation = service.Annotations[v1.AnnotationTopologyMode]
}
loadBalancerSourceRanges := make([]string, len(service.Spec.LoadBalancerSourceRanges))
for i, sourceRange := range service.Spec.LoadBalancerSourceRanges {
loadBalancerSourceRanges[i] = strings.TrimSpace(sourceRange)
}
// filter external ips, source ranges and ingress ips
// prior to dual stack services, this was considered an error, but with dual stack
// services, this is actually expected. Hence we downgraded from reporting by events
// to just log lines with high verbosity
ipFamilyMap := proxyutil.MapIPsByIPFamily(service.Spec.ExternalIPs)
info.externalIPs = ipFamilyMap[ipFamily]
// Log the IPs not matching the ipFamily
if ips, ok := ipFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 {
klog.V(4).InfoS("Service change tracker ignored the following external IPs for given service as they don't match IP Family",
"ipFamily", ipFamily, "externalIPs", strings.Join(ips, ", "), "service", klog.KObj(service))
}
ipFamilyMap = proxyutil.MapCIDRsByIPFamily(loadBalancerSourceRanges)
info.loadBalancerSourceRanges = ipFamilyMap[ipFamily]
// Log the CIDRs not matching the ipFamily
if cidrs, ok := ipFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(cidrs) > 0 {
klog.V(4).InfoS("Service change tracker ignored the following load balancer source ranges for given Service as they don't match IP Family",
"ipFamily", ipFamily, "loadBalancerSourceRanges", strings.Join(cidrs, ", "), "service", klog.KObj(service))
}
// Obtain Load Balancer Ingress
var invalidIPs []string
for _, ing := range service.Status.LoadBalancer.Ingress {
if ing.IP == "" {
continue
}
// proxy mode load balancers do not need to track the IPs in the service cache
// and they can also implement IP family translation, so no need to check if
// the status ingress.IP and the ClusterIP belong to the same family.
if !proxyutil.IsVIPMode(ing) {
klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IP for given Service as it using Proxy mode",
"ipFamily", ipFamily, "loadBalancerIngressIP", ing.IP, "service", klog.KObj(service))
continue
}
// kube-proxy does not implement IP family translation, skip addresses with
// different IP family
if ingFamily := proxyutil.GetIPFamilyFromIP(ing.IP); ingFamily == ipFamily {
info.loadBalancerVIPs = append(info.loadBalancerVIPs, ing.IP)
} else {
invalidIPs = append(invalidIPs, ing.IP)
}
}
if len(invalidIPs) > 0 {
klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IPs for given Service as they don't match the IP Family",
"ipFamily", ipFamily, "loadBalancerIngressIPs", strings.Join(invalidIPs, ", "), "service", klog.KObj(service))
}
if apiservice.NeedsHealthCheck(service) {
p := service.Spec.HealthCheckNodePort
if p == 0 {
klog.ErrorS(nil, "Service has no healthcheck nodeport", "service", klog.KObj(service))
} else {
info.healthCheckNodePort = int(p)
}
}
return info
}

View File

@ -18,11 +18,9 @@ package proxy
import (
"fmt"
"net"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/proxy/config"
)
@ -59,77 +57,6 @@ func fmtPortName(in string) string {
return fmt.Sprintf(":%s", in)
}
// ServicePort is an interface which abstracts information about a service.
type ServicePort interface {
// String returns service string. An example format can be: `IP:Port/Protocol`.
String() string
// ClusterIP returns service cluster IP in net.IP format.
ClusterIP() net.IP
// Port returns service port if present. If return 0 means not present.
Port() int
// SessionAffinityType returns service session affinity type
SessionAffinityType() v1.ServiceAffinity
// StickyMaxAgeSeconds returns service max connection age
StickyMaxAgeSeconds() int
// ExternalIPStrings returns service ExternalIPs as a string array.
ExternalIPStrings() []string
// LoadBalancerVIPStrings returns service LoadBalancerIPs which are VIP mode as a string array.
LoadBalancerVIPStrings() []string
// Protocol returns service protocol.
Protocol() v1.Protocol
// LoadBalancerSourceRanges returns service LoadBalancerSourceRanges if present empty array if not
LoadBalancerSourceRanges() []string
// HealthCheckNodePort returns service health check node port if present. If return 0, it means not present.
HealthCheckNodePort() int
// NodePort returns a service Node port if present. If return 0, it means not present.
NodePort() int
// ExternalPolicyLocal returns if a service has only node local endpoints for external traffic.
ExternalPolicyLocal() bool
// InternalPolicyLocal returns if a service has only node local endpoints for internal traffic.
InternalPolicyLocal() bool
// InternalTrafficPolicy returns service InternalTrafficPolicy
InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicy
// HintsAnnotation returns the value of the v1.DeprecatedAnnotationTopologyAwareHints annotation.
HintsAnnotation() string
// ExternallyAccessible returns true if the service port is reachable via something
// other than ClusterIP (NodePort/ExternalIP/LoadBalancer)
ExternallyAccessible() bool
// UsesClusterEndpoints returns true if the service port ever sends traffic to
// endpoints based on "Cluster" traffic policy
UsesClusterEndpoints() bool
// UsesLocalEndpoints returns true if the service port ever sends traffic to
// endpoints based on "Local" traffic policy
UsesLocalEndpoints() bool
}
// Endpoint in an interface which abstracts information about an endpoint.
// TODO: Rename functions to be consistent with ServicePort.
type Endpoint interface {
// String returns endpoint string. An example format can be: `IP:Port`.
// We take the returned value as ServiceEndpoint.Endpoint.
String() string
// IP returns IP part of the endpoint.
IP() string
// Port returns the Port part of the endpoint.
Port() int
// IsLocal returns true if the endpoint is running on the same host as kube-proxy.
IsLocal() bool
// IsReady returns true if an endpoint is ready and not terminating, or
// if PublishNotReadyAddresses is set on the service.
IsReady() bool
// IsServing returns true if an endpoint is ready. It does not account
// for terminating state.
IsServing() bool
// IsTerminating returns true if an endpoint is terminating. For pods,
// that is any pod with a deletion timestamp.
IsTerminating() bool
// ZoneHints returns the zone hint for the endpoint. This is based on
// endpoint.hints.forZones[0].name in the EndpointSlice API.
ZoneHints() sets.Set[string]
}
// ServiceEndpoint is used to identify a service and one of its endpoint pair.
type ServiceEndpoint struct {
Endpoint string