mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 19:01:49 +00:00
Drop unused Endpoints-only tracking code in kube-proxy
This leaves the code in a somewhat messy state but it can be improved further later.
This commit is contained in:
parent
c78b057d85
commit
fe2b658ef5
@ -18,7 +18,6 @@ package proxy
|
||||
|
||||
import (
|
||||
"net"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
@ -32,7 +31,6 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/proxy/metrics"
|
||||
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
|
||||
utilnet "k8s.io/utils/net"
|
||||
)
|
||||
|
||||
var supportedEndpointSliceAddressTypes = sets.NewString(
|
||||
@ -159,20 +157,12 @@ type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap Endpoin
|
||||
// EndpointChangeTracker carries state about uncommitted changes to an arbitrary number of
|
||||
// Endpoints, keyed by their namespace and name.
|
||||
type EndpointChangeTracker struct {
|
||||
// lock protects items.
|
||||
// lock protects lastChangeTriggerTimes
|
||||
lock sync.Mutex
|
||||
// hostname is the host where kube-proxy is running.
|
||||
hostname string
|
||||
// items maps a service to is endpointsChange.
|
||||
items map[types.NamespacedName]*endpointsChange
|
||||
// makeEndpointInfo allows proxier to inject customized information when processing endpoint.
|
||||
makeEndpointInfo makeEndpointFunc
|
||||
|
||||
processEndpointsMapChange processEndpointsMapChangeFunc
|
||||
// endpointSliceCache holds a simplified version of endpoint slices.
|
||||
endpointSliceCache *EndpointSliceCache
|
||||
// ipfamily identify the ip family on which the tracker is operating on
|
||||
ipFamily v1.IPFamily
|
||||
recorder events.EventRecorder
|
||||
// Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints
|
||||
// object to change. Used to calculate the network-programming-latency.
|
||||
lastChangeTriggerTimes map[types.NamespacedName][]time.Time
|
||||
@ -186,11 +176,6 @@ type EndpointChangeTracker struct {
|
||||
// NewEndpointChangeTracker initializes an EndpointsChangeMap
|
||||
func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointChangeTracker {
|
||||
return &EndpointChangeTracker{
|
||||
hostname: hostname,
|
||||
items: make(map[types.NamespacedName]*endpointsChange),
|
||||
makeEndpointInfo: makeEndpointInfo,
|
||||
ipFamily: ipFamily,
|
||||
recorder: recorder,
|
||||
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
|
||||
trackerStartTime: time.Now(),
|
||||
processEndpointsMapChange: processEndpointsMapChange,
|
||||
@ -198,66 +183,6 @@ func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc
|
||||
}
|
||||
}
|
||||
|
||||
// Update updates given service's endpoints change map based on the <previous, current> endpoints pair. It returns true
|
||||
// if items changed, otherwise return false. Update can be used to add/update/delete items of EndpointsChangeMap. For example,
|
||||
// Add item
|
||||
// - pass <nil, endpoints> as the <previous, current> pair.
|
||||
//
|
||||
// Update item
|
||||
// - pass <oldEndpoints, endpoints> as the <previous, current> pair.
|
||||
//
|
||||
// Delete item
|
||||
// - pass <endpoints, nil> as the <previous, current> pair.
|
||||
func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool {
|
||||
endpoints := current
|
||||
if endpoints == nil {
|
||||
endpoints = previous
|
||||
}
|
||||
// previous == nil && current == nil is unexpected, we should return false directly.
|
||||
if endpoints == nil {
|
||||
return false
|
||||
}
|
||||
metrics.EndpointChangesTotal.Inc()
|
||||
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
|
||||
|
||||
ect.lock.Lock()
|
||||
defer ect.lock.Unlock()
|
||||
|
||||
change, exists := ect.items[namespacedName]
|
||||
if !exists {
|
||||
change = &endpointsChange{}
|
||||
change.previous = ect.endpointsToEndpointsMap(previous)
|
||||
ect.items[namespacedName] = change
|
||||
}
|
||||
|
||||
// In case of Endpoints deletion, the LastChangeTriggerTime annotation is
|
||||
// by-definition coming from the time of last update, which is not what
|
||||
// we want to measure. So we simply ignore it in this cases.
|
||||
if t := getLastChangeTriggerTime(endpoints.Annotations); !t.IsZero() && current != nil && t.After(ect.trackerStartTime) {
|
||||
ect.lastChangeTriggerTimes[namespacedName] = append(ect.lastChangeTriggerTimes[namespacedName], t)
|
||||
}
|
||||
|
||||
change.current = ect.endpointsToEndpointsMap(current)
|
||||
// if change.previous equal to change.current, it means no change
|
||||
if reflect.DeepEqual(change.previous, change.current) {
|
||||
delete(ect.items, namespacedName)
|
||||
// Reset the lastChangeTriggerTimes for the Endpoints object. Given that the network programming
|
||||
// SLI is defined as the duration between a time of an event and a time when the network was
|
||||
// programmed to incorporate that event, if there are events that happened between two
|
||||
// consecutive syncs and that canceled each other out, e.g. pod A added -> pod A deleted,
|
||||
// there will be no network programming for them and thus no network programming latency metric
|
||||
// should be exported.
|
||||
delete(ect.lastChangeTriggerTimes, namespacedName)
|
||||
} else {
|
||||
for spn, eps := range change.current {
|
||||
klog.V(2).InfoS("Service port endpoints update", "servicePort", spn, "endpoints", len(eps))
|
||||
}
|
||||
}
|
||||
|
||||
metrics.EndpointChangesPending.Set(float64(len(ect.items)))
|
||||
return len(ect.items) > 0
|
||||
}
|
||||
|
||||
// EndpointSliceUpdate updates given service's endpoints change map based on the <previous, current> endpoints pair.
|
||||
// It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeMap.
|
||||
// If removeSlice is true, slice will be removed, otherwise it will be added or updated.
|
||||
@ -308,40 +233,17 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E
|
||||
// have changed since the last time ect was used to update an EndpointsMap. (You must call
|
||||
// this _before_ calling em.Update(ect).)
|
||||
func (ect *EndpointChangeTracker) PendingChanges() sets.String {
|
||||
if ect.endpointSliceCache != nil {
|
||||
return ect.endpointSliceCache.pendingChanges()
|
||||
}
|
||||
|
||||
ect.lock.Lock()
|
||||
defer ect.lock.Unlock()
|
||||
|
||||
changes := sets.NewString()
|
||||
for name := range ect.items {
|
||||
changes.Insert(name.String())
|
||||
}
|
||||
return changes
|
||||
}
|
||||
|
||||
// checkoutChanges returns a list of pending endpointsChanges and marks them as
|
||||
// applied.
|
||||
func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange {
|
||||
ect.lock.Lock()
|
||||
defer ect.lock.Unlock()
|
||||
|
||||
metrics.EndpointChangesPending.Set(0)
|
||||
|
||||
if ect.endpointSliceCache != nil {
|
||||
return ect.endpointSliceCache.checkoutChanges()
|
||||
}
|
||||
|
||||
changes := []*endpointsChange{}
|
||||
for _, change := range ect.items {
|
||||
changes = append(changes, change)
|
||||
}
|
||||
ect.items = make(map[types.NamespacedName]*endpointsChange)
|
||||
return changes
|
||||
}
|
||||
|
||||
// checkoutTriggerTimes applies the locally cached trigger times to a map of
|
||||
// trigger times that have been passed in and empties the local cache.
|
||||
func (ect *EndpointChangeTracker) checkoutTriggerTimes(lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
|
||||
@ -426,76 +328,6 @@ func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndp
|
||||
// EndpointsMap maps a service name to a list of all its Endpoints.
|
||||
type EndpointsMap map[ServicePortName][]Endpoint
|
||||
|
||||
// endpointsToEndpointsMap translates single Endpoints object to EndpointsMap.
|
||||
// This function is used for incremental updated of endpointsMap.
|
||||
//
|
||||
// NOTE: endpoints object should NOT be modified.
|
||||
func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoints) EndpointsMap {
|
||||
if endpoints == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
endpointsMap := make(EndpointsMap)
|
||||
// We need to build a map of portname -> all ip:ports for that portname.
|
||||
// Explode Endpoints.Subsets[*] into this structure.
|
||||
for i := range endpoints.Subsets {
|
||||
ss := &endpoints.Subsets[i]
|
||||
for i := range ss.Ports {
|
||||
port := &ss.Ports[i]
|
||||
if port.Port == 0 {
|
||||
klog.ErrorS(nil, "Ignoring invalid endpoint port", "portName", port.Name)
|
||||
continue
|
||||
}
|
||||
svcPortName := ServicePortName{
|
||||
NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name},
|
||||
Port: port.Name,
|
||||
Protocol: port.Protocol,
|
||||
}
|
||||
for i := range ss.Addresses {
|
||||
addr := &ss.Addresses[i]
|
||||
if addr.IP == "" {
|
||||
klog.ErrorS(nil, "Ignoring invalid endpoint port with empty host", "portName", port.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
// Filter out the incorrect IP version case.
|
||||
// Any endpoint port that contains incorrect IP version will be ignored.
|
||||
if (ect.ipFamily == v1.IPv6Protocol) != utilnet.IsIPv6String(addr.IP) {
|
||||
// Emit event on the corresponding service which had a different
|
||||
// IP version than the endpoint.
|
||||
utilproxy.LogAndEmitIncorrectIPVersionEvent(ect.recorder, "endpoints", addr.IP, endpoints.Namespace, endpoints.Name, "")
|
||||
continue
|
||||
}
|
||||
|
||||
// it is safe to assume that any address in endpoints.subsets[*].addresses is
|
||||
// ready and NOT terminating
|
||||
isReady := true
|
||||
isServing := true
|
||||
isTerminating := false
|
||||
isLocal := false
|
||||
nodeName := ""
|
||||
if addr.NodeName != nil {
|
||||
isLocal = *addr.NodeName == ect.hostname
|
||||
nodeName = *addr.NodeName
|
||||
}
|
||||
// Only supported with EndpointSlice API
|
||||
zoneHints := sets.String{}
|
||||
|
||||
// Zone information is only supported with EndpointSlice API
|
||||
baseEndpointInfo := newBaseEndpointInfo(addr.IP, nodeName, "", int(port.Port), isLocal, isReady, isServing, isTerminating, zoneHints)
|
||||
if ect.makeEndpointInfo != nil {
|
||||
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], ect.makeEndpointInfo(baseEndpointInfo, &svcPortName))
|
||||
} else {
|
||||
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], baseEndpointInfo)
|
||||
}
|
||||
}
|
||||
|
||||
klog.V(3).InfoS("Setting endpoints for service port", "portName", svcPortName, "endpoints", formatEndpointsList(endpointsMap[svcPortName]))
|
||||
}
|
||||
}
|
||||
return endpointsMap
|
||||
}
|
||||
|
||||
// apply the changes to EndpointsMap and updates stale endpoints and service-endpoints pair. The `staleEndpoints` argument
|
||||
// is passed in to store the stale udp endpoints and `staleServiceNames` argument is passed in to store the stale udp service.
|
||||
// The changes map is cleared after applying them.
|
||||
|
@ -1536,9 +1536,6 @@ func TestEndpointSliceUpdate(t *testing.T) {
|
||||
if !reflect.DeepEqual(got, tc.expectedReturnVal) {
|
||||
t.Errorf("EndpointSliceUpdate return value got: %v, want %v", got, tc.expectedReturnVal)
|
||||
}
|
||||
if tc.endpointChangeTracker.items == nil {
|
||||
t.Errorf("Expected ect.items to not be nil")
|
||||
}
|
||||
|
||||
pendingChanges := tc.endpointChangeTracker.PendingChanges()
|
||||
if !pendingChanges.Equal(tc.expectedChangedEndpoints) {
|
||||
|
@ -518,11 +518,6 @@ func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier {
|
||||
serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil),
|
||||
endpointsMap: make(EndpointsMap),
|
||||
endpointsChanges: &EndpointChangeTracker{
|
||||
hostname: testHostname,
|
||||
items: make(map[types.NamespacedName]*endpointsChange),
|
||||
makeEndpointInfo: nil,
|
||||
ipFamily: ipFamily,
|
||||
recorder: nil,
|
||||
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
|
||||
trackerStartTime: t,
|
||||
processEndpointsMapChange: nil,
|
||||
|
Loading…
Reference in New Issue
Block a user