Merge pull request #114757 from danwinship/drop-endpointstracker

Drop unused Endpoints-tracking code from pkg/proxy/
This commit is contained in:
Kubernetes Prow Robot 2023-01-03 10:35:35 -08:00 committed by GitHub
commit 5914f30fd7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 443 additions and 895 deletions

View File

@ -18,7 +18,6 @@ package proxy
import (
"net"
"reflect"
"strconv"
"sync"
"time"
@ -32,7 +31,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/proxy/metrics"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
utilnet "k8s.io/utils/net"
)
var supportedEndpointSliceAddressTypes = sets.NewString(
@ -159,20 +157,12 @@ type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap Endpoin
// EndpointChangeTracker carries state about uncommitted changes to an arbitrary number of
// Endpoints, keyed by their namespace and name.
type EndpointChangeTracker struct {
// lock protects items.
// lock protects lastChangeTriggerTimes
lock sync.Mutex
// hostname is the host where kube-proxy is running.
hostname string
// items maps a service to is endpointsChange.
items map[types.NamespacedName]*endpointsChange
// makeEndpointInfo allows proxier to inject customized information when processing endpoint.
makeEndpointInfo makeEndpointFunc
processEndpointsMapChange processEndpointsMapChangeFunc
// endpointSliceCache holds a simplified version of endpoint slices.
endpointSliceCache *EndpointSliceCache
// ipfamily identify the ip family on which the tracker is operating on
ipFamily v1.IPFamily
recorder events.EventRecorder
// Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints
// object to change. Used to calculate the network-programming-latency.
lastChangeTriggerTimes map[types.NamespacedName][]time.Time
@ -186,11 +176,6 @@ type EndpointChangeTracker struct {
// NewEndpointChangeTracker initializes an EndpointsChangeMap
func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointChangeTracker {
return &EndpointChangeTracker{
hostname: hostname,
items: make(map[types.NamespacedName]*endpointsChange),
makeEndpointInfo: makeEndpointInfo,
ipFamily: ipFamily,
recorder: recorder,
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
trackerStartTime: time.Now(),
processEndpointsMapChange: processEndpointsMapChange,
@ -198,66 +183,6 @@ func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc
}
}
// Update updates given service's endpoints change map based on the <previous, current> endpoints pair. It returns true
// if items changed, otherwise return false. Update can be used to add/update/delete items of EndpointsChangeMap. For example,
// Add item
// - pass <nil, endpoints> as the <previous, current> pair.
//
// Update item
// - pass <oldEndpoints, endpoints> as the <previous, current> pair.
//
// Delete item
// - pass <endpoints, nil> as the <previous, current> pair.
func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool {
endpoints := current
if endpoints == nil {
endpoints = previous
}
// previous == nil && current == nil is unexpected, we should return false directly.
if endpoints == nil {
return false
}
metrics.EndpointChangesTotal.Inc()
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
ect.lock.Lock()
defer ect.lock.Unlock()
change, exists := ect.items[namespacedName]
if !exists {
change = &endpointsChange{}
change.previous = ect.endpointsToEndpointsMap(previous)
ect.items[namespacedName] = change
}
// In case of Endpoints deletion, the LastChangeTriggerTime annotation is
// by-definition coming from the time of last update, which is not what
// we want to measure. So we simply ignore it in this cases.
if t := getLastChangeTriggerTime(endpoints.Annotations); !t.IsZero() && current != nil && t.After(ect.trackerStartTime) {
ect.lastChangeTriggerTimes[namespacedName] = append(ect.lastChangeTriggerTimes[namespacedName], t)
}
change.current = ect.endpointsToEndpointsMap(current)
// if change.previous equal to change.current, it means no change
if reflect.DeepEqual(change.previous, change.current) {
delete(ect.items, namespacedName)
// Reset the lastChangeTriggerTimes for the Endpoints object. Given that the network programming
// SLI is defined as the duration between a time of an event and a time when the network was
// programmed to incorporate that event, if there are events that happened between two
// consecutive syncs and that canceled each other out, e.g. pod A added -> pod A deleted,
// there will be no network programming for them and thus no network programming latency metric
// should be exported.
delete(ect.lastChangeTriggerTimes, namespacedName)
} else {
for spn, eps := range change.current {
klog.V(2).InfoS("Service port endpoints update", "servicePort", spn, "endpoints", len(eps))
}
}
metrics.EndpointChangesPending.Set(float64(len(ect.items)))
return len(ect.items) > 0
}
// EndpointSliceUpdate updates given service's endpoints change map based on the <previous, current> endpoints pair.
// It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeMap.
// If removeSlice is true, slice will be removed, otherwise it will be added or updated.
@ -293,7 +218,9 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E
// we want to measure. So we simply ignore it in this cases.
// TODO(wojtek-t, robscott): Address the problem for EndpointSlice deletion
// when other EndpointSlice for that service still exist.
if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && !removeSlice && t.After(ect.trackerStartTime) {
if removeSlice {
delete(ect.lastChangeTriggerTimes, namespacedName)
} else if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && t.After(ect.trackerStartTime) {
ect.lastChangeTriggerTimes[namespacedName] =
append(ect.lastChangeTriggerTimes[namespacedName], t)
}
@ -306,38 +233,15 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E
// have changed since the last time ect was used to update an EndpointsMap. (You must call
// this _before_ calling em.Update(ect).)
func (ect *EndpointChangeTracker) PendingChanges() sets.String {
if ect.endpointSliceCache != nil {
return ect.endpointSliceCache.pendingChanges()
}
ect.lock.Lock()
defer ect.lock.Unlock()
changes := sets.NewString()
for name := range ect.items {
changes.Insert(name.String())
}
return changes
return ect.endpointSliceCache.pendingChanges()
}
// checkoutChanges returns a list of pending endpointsChanges and marks them as
// applied.
func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange {
ect.lock.Lock()
defer ect.lock.Unlock()
metrics.EndpointChangesPending.Set(0)
if ect.endpointSliceCache != nil {
return ect.endpointSliceCache.checkoutChanges()
}
changes := []*endpointsChange{}
for _, change := range ect.items {
changes = append(changes, change)
}
ect.items = make(map[types.NamespacedName]*endpointsChange)
return changes
return ect.endpointSliceCache.checkoutChanges()
}
// checkoutTriggerTimes applies the locally cached trigger times to a map of
@ -424,76 +328,6 @@ func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndp
// EndpointsMap maps a service name to a list of all its Endpoints.
type EndpointsMap map[ServicePortName][]Endpoint
// endpointsToEndpointsMap translates single Endpoints object to EndpointsMap.
// This function is used for incremental updated of endpointsMap.
//
// NOTE: endpoints object should NOT be modified.
func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoints) EndpointsMap {
if endpoints == nil {
return nil
}
endpointsMap := make(EndpointsMap)
// We need to build a map of portname -> all ip:ports for that portname.
// Explode Endpoints.Subsets[*] into this structure.
for i := range endpoints.Subsets {
ss := &endpoints.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]
if port.Port == 0 {
klog.ErrorS(nil, "Ignoring invalid endpoint port", "portName", port.Name)
continue
}
svcPortName := ServicePortName{
NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name},
Port: port.Name,
Protocol: port.Protocol,
}
for i := range ss.Addresses {
addr := &ss.Addresses[i]
if addr.IP == "" {
klog.ErrorS(nil, "Ignoring invalid endpoint port with empty host", "portName", port.Name)
continue
}
// Filter out the incorrect IP version case.
// Any endpoint port that contains incorrect IP version will be ignored.
if (ect.ipFamily == v1.IPv6Protocol) != utilnet.IsIPv6String(addr.IP) {
// Emit event on the corresponding service which had a different
// IP version than the endpoint.
utilproxy.LogAndEmitIncorrectIPVersionEvent(ect.recorder, "endpoints", addr.IP, endpoints.Namespace, endpoints.Name, "")
continue
}
// it is safe to assume that any address in endpoints.subsets[*].addresses is
// ready and NOT terminating
isReady := true
isServing := true
isTerminating := false
isLocal := false
nodeName := ""
if addr.NodeName != nil {
isLocal = *addr.NodeName == ect.hostname
nodeName = *addr.NodeName
}
// Only supported with EndpointSlice API
zoneHints := sets.String{}
// Zone information is only supported with EndpointSlice API
baseEndpointInfo := newBaseEndpointInfo(addr.IP, nodeName, "", int(port.Port), isLocal, isReady, isServing, isTerminating, zoneHints)
if ect.makeEndpointInfo != nil {
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], ect.makeEndpointInfo(baseEndpointInfo, &svcPortName))
} else {
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], baseEndpointInfo)
}
}
klog.V(3).InfoS("Setting endpoints for service port", "portName", svcPortName, "endpoints", formatEndpointsList(endpointsMap[svcPortName]))
}
}
return endpointsMap
}
// apply the changes to EndpointsMap and updates stale endpoints and service-endpoints pair. The `staleEndpoints` argument
// is passed in to store the stale udp endpoints and `staleServiceNames` argument is passed in to store the stale udp service.
// The changes map is cleared after applying them.

File diff suppressed because it is too large Load Diff

View File

@ -518,14 +518,10 @@ func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier {
serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil),
endpointsMap: make(EndpointsMap),
endpointsChanges: &EndpointChangeTracker{
hostname: testHostname,
items: make(map[types.NamespacedName]*endpointsChange),
makeEndpointInfo: nil,
ipFamily: ipFamily,
recorder: nil,
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
trackerStartTime: t,
processEndpointsMapChange: nil,
endpointSliceCache: NewEndpointSliceCache(testHostname, ipFamily, nil, nil),
},
}
}