diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index d893e08414a..23fe42fa0f9 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -18,7 +18,6 @@ package proxy import ( "net" - "reflect" "strconv" "sync" "time" @@ -32,7 +31,6 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/proxy/metrics" utilproxy "k8s.io/kubernetes/pkg/proxy/util" - utilnet "k8s.io/utils/net" ) var supportedEndpointSliceAddressTypes = sets.NewString( @@ -159,20 +157,12 @@ type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap Endpoin // EndpointChangeTracker carries state about uncommitted changes to an arbitrary number of // Endpoints, keyed by their namespace and name. type EndpointChangeTracker struct { - // lock protects items. + // lock protects lastChangeTriggerTimes lock sync.Mutex - // hostname is the host where kube-proxy is running. - hostname string - // items maps a service to is endpointsChange. - items map[types.NamespacedName]*endpointsChange - // makeEndpointInfo allows proxier to inject customized information when processing endpoint. - makeEndpointInfo makeEndpointFunc + processEndpointsMapChange processEndpointsMapChangeFunc // endpointSliceCache holds a simplified version of endpoint slices. endpointSliceCache *EndpointSliceCache - // ipfamily identify the ip family on which the tracker is operating on - ipFamily v1.IPFamily - recorder events.EventRecorder // Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints // object to change. Used to calculate the network-programming-latency. lastChangeTriggerTimes map[types.NamespacedName][]time.Time @@ -186,11 +176,6 @@ type EndpointChangeTracker struct { // NewEndpointChangeTracker initializes an EndpointsChangeMap func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointChangeTracker { return &EndpointChangeTracker{ - hostname: hostname, - items: make(map[types.NamespacedName]*endpointsChange), - makeEndpointInfo: makeEndpointInfo, - ipFamily: ipFamily, - recorder: recorder, lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), trackerStartTime: time.Now(), processEndpointsMapChange: processEndpointsMapChange, @@ -198,66 +183,6 @@ func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc } } -// Update updates given service's endpoints change map based on the endpoints pair. It returns true -// if items changed, otherwise return false. Update can be used to add/update/delete items of EndpointsChangeMap. For example, -// Add item -// - pass as the pair. -// -// Update item -// - pass as the pair. -// -// Delete item -// - pass as the pair. -func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool { - endpoints := current - if endpoints == nil { - endpoints = previous - } - // previous == nil && current == nil is unexpected, we should return false directly. - if endpoints == nil { - return false - } - metrics.EndpointChangesTotal.Inc() - namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - - ect.lock.Lock() - defer ect.lock.Unlock() - - change, exists := ect.items[namespacedName] - if !exists { - change = &endpointsChange{} - change.previous = ect.endpointsToEndpointsMap(previous) - ect.items[namespacedName] = change - } - - // In case of Endpoints deletion, the LastChangeTriggerTime annotation is - // by-definition coming from the time of last update, which is not what - // we want to measure. So we simply ignore it in this cases. - if t := getLastChangeTriggerTime(endpoints.Annotations); !t.IsZero() && current != nil && t.After(ect.trackerStartTime) { - ect.lastChangeTriggerTimes[namespacedName] = append(ect.lastChangeTriggerTimes[namespacedName], t) - } - - change.current = ect.endpointsToEndpointsMap(current) - // if change.previous equal to change.current, it means no change - if reflect.DeepEqual(change.previous, change.current) { - delete(ect.items, namespacedName) - // Reset the lastChangeTriggerTimes for the Endpoints object. Given that the network programming - // SLI is defined as the duration between a time of an event and a time when the network was - // programmed to incorporate that event, if there are events that happened between two - // consecutive syncs and that canceled each other out, e.g. pod A added -> pod A deleted, - // there will be no network programming for them and thus no network programming latency metric - // should be exported. - delete(ect.lastChangeTriggerTimes, namespacedName) - } else { - for spn, eps := range change.current { - klog.V(2).InfoS("Service port endpoints update", "servicePort", spn, "endpoints", len(eps)) - } - } - - metrics.EndpointChangesPending.Set(float64(len(ect.items))) - return len(ect.items) > 0 -} - // EndpointSliceUpdate updates given service's endpoints change map based on the endpoints pair. // It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeMap. // If removeSlice is true, slice will be removed, otherwise it will be added or updated. @@ -308,38 +233,15 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E // have changed since the last time ect was used to update an EndpointsMap. (You must call // this _before_ calling em.Update(ect).) func (ect *EndpointChangeTracker) PendingChanges() sets.String { - if ect.endpointSliceCache != nil { - return ect.endpointSliceCache.pendingChanges() - } - - ect.lock.Lock() - defer ect.lock.Unlock() - - changes := sets.NewString() - for name := range ect.items { - changes.Insert(name.String()) - } - return changes + return ect.endpointSliceCache.pendingChanges() } // checkoutChanges returns a list of pending endpointsChanges and marks them as // applied. func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange { - ect.lock.Lock() - defer ect.lock.Unlock() - metrics.EndpointChangesPending.Set(0) - if ect.endpointSliceCache != nil { - return ect.endpointSliceCache.checkoutChanges() - } - - changes := []*endpointsChange{} - for _, change := range ect.items { - changes = append(changes, change) - } - ect.items = make(map[types.NamespacedName]*endpointsChange) - return changes + return ect.endpointSliceCache.checkoutChanges() } // checkoutTriggerTimes applies the locally cached trigger times to a map of @@ -426,76 +328,6 @@ func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndp // EndpointsMap maps a service name to a list of all its Endpoints. type EndpointsMap map[ServicePortName][]Endpoint -// endpointsToEndpointsMap translates single Endpoints object to EndpointsMap. -// This function is used for incremental updated of endpointsMap. -// -// NOTE: endpoints object should NOT be modified. -func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoints) EndpointsMap { - if endpoints == nil { - return nil - } - - endpointsMap := make(EndpointsMap) - // We need to build a map of portname -> all ip:ports for that portname. - // Explode Endpoints.Subsets[*] into this structure. - for i := range endpoints.Subsets { - ss := &endpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - if port.Port == 0 { - klog.ErrorS(nil, "Ignoring invalid endpoint port", "portName", port.Name) - continue - } - svcPortName := ServicePortName{ - NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, - Port: port.Name, - Protocol: port.Protocol, - } - for i := range ss.Addresses { - addr := &ss.Addresses[i] - if addr.IP == "" { - klog.ErrorS(nil, "Ignoring invalid endpoint port with empty host", "portName", port.Name) - continue - } - - // Filter out the incorrect IP version case. - // Any endpoint port that contains incorrect IP version will be ignored. - if (ect.ipFamily == v1.IPv6Protocol) != utilnet.IsIPv6String(addr.IP) { - // Emit event on the corresponding service which had a different - // IP version than the endpoint. - utilproxy.LogAndEmitIncorrectIPVersionEvent(ect.recorder, "endpoints", addr.IP, endpoints.Namespace, endpoints.Name, "") - continue - } - - // it is safe to assume that any address in endpoints.subsets[*].addresses is - // ready and NOT terminating - isReady := true - isServing := true - isTerminating := false - isLocal := false - nodeName := "" - if addr.NodeName != nil { - isLocal = *addr.NodeName == ect.hostname - nodeName = *addr.NodeName - } - // Only supported with EndpointSlice API - zoneHints := sets.String{} - - // Zone information is only supported with EndpointSlice API - baseEndpointInfo := newBaseEndpointInfo(addr.IP, nodeName, "", int(port.Port), isLocal, isReady, isServing, isTerminating, zoneHints) - if ect.makeEndpointInfo != nil { - endpointsMap[svcPortName] = append(endpointsMap[svcPortName], ect.makeEndpointInfo(baseEndpointInfo, &svcPortName)) - } else { - endpointsMap[svcPortName] = append(endpointsMap[svcPortName], baseEndpointInfo) - } - } - - klog.V(3).InfoS("Setting endpoints for service port", "portName", svcPortName, "endpoints", formatEndpointsList(endpointsMap[svcPortName])) - } - } - return endpointsMap -} - // apply the changes to EndpointsMap and updates stale endpoints and service-endpoints pair. The `staleEndpoints` argument // is passed in to store the stale udp endpoints and `staleServiceNames` argument is passed in to store the stale udp service. // The changes map is cleared after applying them. diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 34d30ab2d68..cd5662cb11a 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -1536,9 +1536,6 @@ func TestEndpointSliceUpdate(t *testing.T) { if !reflect.DeepEqual(got, tc.expectedReturnVal) { t.Errorf("EndpointSliceUpdate return value got: %v, want %v", got, tc.expectedReturnVal) } - if tc.endpointChangeTracker.items == nil { - t.Errorf("Expected ect.items to not be nil") - } pendingChanges := tc.endpointChangeTracker.PendingChanges() if !pendingChanges.Equal(tc.expectedChangedEndpoints) { diff --git a/pkg/proxy/service_test.go b/pkg/proxy/service_test.go index 8cd03fc29d5..a45ae5e490a 100644 --- a/pkg/proxy/service_test.go +++ b/pkg/proxy/service_test.go @@ -518,11 +518,6 @@ func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier { serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil), endpointsMap: make(EndpointsMap), endpointsChanges: &EndpointChangeTracker{ - hostname: testHostname, - items: make(map[types.NamespacedName]*endpointsChange), - makeEndpointInfo: nil, - ipFamily: ipFamily, - recorder: nil, lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), trackerStartTime: t, processEndpointsMapChange: nil,