From 4faede03fa4a124ef9b91736eec313a31951f3b5 Mon Sep 17 00:00:00 2001 From: Lior Lieberman Date: Wed, 2 Nov 2022 12:15:40 +0000 Subject: [PATCH] Added events publishing for topologyHints changes --- .../endpointslice/endpointslice_controller.go | 1 + pkg/controller/endpointslice/reconciler.go | 27 +++- .../endpointslice/reconciler_test.go | 3 + .../endpointslice/topologycache/event.go | 38 ++++++ .../topologycache/topologycache.go | 118 +++++++++++++----- .../topologycache/topologycache_test.go | 63 +++++++++- .../endpointslice/topologycache/utils.go | 6 + 7 files changed, 220 insertions(+), 36 deletions(-) create mode 100644 pkg/controller/endpointslice/topologycache/event.go diff --git a/pkg/controller/endpointslice/endpointslice_controller.go b/pkg/controller/endpointslice/endpointslice_controller.go index 52b4da0d4fa..f4767f240de 100644 --- a/pkg/controller/endpointslice/endpointslice_controller.go +++ b/pkg/controller/endpointslice/endpointslice_controller.go @@ -164,6 +164,7 @@ func NewController(podInformer coreinformers.PodInformer, endpointSliceTracker: c.endpointSliceTracker, metricsCache: endpointslicemetrics.NewCache(maxEndpointsPerSlice), topologyCache: c.topologyCache, + eventRecorder: c.eventRecorder, } return c diff --git a/pkg/controller/endpointslice/reconciler.go b/pkg/controller/endpointslice/reconciler.go index f97d3ccf43d..c9e92db0b74 100644 --- a/pkg/controller/endpointslice/reconciler.go +++ b/pkg/controller/endpointslice/reconciler.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller/endpointslice/metrics" "k8s.io/kubernetes/pkg/controller/endpointslice/topologycache" @@ -50,6 +51,8 @@ type reconciler struct { // topologyCache tracks the distribution of Nodes and endpoints across zones // to enable TopologyAwareHints. topologyCache *topologycache.TopologyCache + // eventRecorder allows reconciler to record and publish events. + eventRecorder record.EventRecorder } // endpointMeta includes the attributes we group slices on, this type helps with @@ -133,6 +136,7 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor slicesToCreate := []*discovery.EndpointSlice{} slicesToUpdate := []*discovery.EndpointSlice{} slicesToDelete := []*discovery.EndpointSlice{} + events := []*topologycache.EventBuilder{} // Build data structures for existing state. existingSlicesByPortMap := map[endpointutil.PortMapKey][]*discovery.EndpointSlice{} @@ -252,25 +256,36 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor // theoretically possible for endpoints of one address type to be assigned // hints while another endpoints of another address type are not. si := &topologycache.SliceInfo{ - ServiceKey: fmt.Sprintf("%s/%s", service.Namespace, service.Name), - ToCreate: slicesToCreate, - ToUpdate: slicesToUpdate, - Unchanged: unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete), + ServiceKey: fmt.Sprintf("%s/%s", service.Namespace, service.Name), + AddressType: addressType, + ToCreate: slicesToCreate, + ToUpdate: slicesToUpdate, + Unchanged: unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete), } if r.topologyCache != nil && hintsEnabled(service.Annotations) { - slicesToCreate, slicesToUpdate = r.topologyCache.AddHints(si) + slicesToCreate, slicesToUpdate, events = r.topologyCache.AddHints(si) } else { if r.topologyCache != nil { + if r.topologyCache.HasPopulatedHints(si.ServiceKey) { + klog.InfoS("TopologyAwareHints annotation has changed, removing hints", "serviceKey", si.ServiceKey, "addressType", si.AddressType) + events = append(events, &topologycache.EventBuilder{ + EventType: corev1.EventTypeWarning, + Reason: "TopologyAwareHintsDisabled", + Message: topologycache.FormatWithAddressType(topologycache.TopologyAwareHintsDisabled, si.AddressType), + }) + } r.topologyCache.RemoveHints(si.ServiceKey, addressType) } slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si) } - err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime) if err != nil { errs = append(errs, err) } + for _, event := range events { + r.eventRecorder.Event(service, event.EventType, event.Reason, event.Message) + } return utilerrors.NewAggregate(errs) } diff --git a/pkg/controller/endpointslice/reconciler_test.go b/pkg/controller/endpointslice/reconciler_test.go index c2db2fe18f9..94ec53d8f14 100644 --- a/pkg/controller/endpointslice/reconciler_test.go +++ b/pkg/controller/endpointslice/reconciler_test.go @@ -36,6 +36,7 @@ import ( "k8s.io/client-go/kubernetes/fake" corelisters "k8s.io/client-go/listers/core/v1" k8stesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/record" "k8s.io/component-base/metrics/testutil" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/endpointslice/metrics" @@ -1908,6 +1909,7 @@ func TestReconcileTopology(t *testing.T) { // Test Helpers func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPerSlice int32) *reconciler { + eventRecorder := record.NewFakeRecorder(10) informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) nodeInformer := informerFactory.Core().V1().Nodes() indexer := nodeInformer.Informer().GetIndexer() @@ -1921,6 +1923,7 @@ func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPer maxEndpointsPerSlice: maxEndpointsPerSlice, endpointSliceTracker: endpointsliceutil.NewEndpointSliceTracker(), metricsCache: metrics.NewCache(maxEndpointsPerSlice), + eventRecorder: eventRecorder, } } diff --git a/pkg/controller/endpointslice/topologycache/event.go b/pkg/controller/endpointslice/topologycache/event.go new file mode 100644 index 00000000000..e4b92d186b9 --- /dev/null +++ b/pkg/controller/endpointslice/topologycache/event.go @@ -0,0 +1,38 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topologycache + +// TopologyAwareHints events messages list. +const ( + NoZoneSpecified = "One or more endpoints do not have a zone specified" + NoAllocatedHintsForZones = "No hints allocated for zones" + TopologyAwareHintsEnabled = "Topology Aware Hints has been enabled" + TopologyAwareHintsDisabled = "Topology Aware Hints has been disabled" + InsufficientNodeInfo = "Insufficient Node information: allocatable CPU or zone not specified on one or more nodes" + NodesReadyInOneZoneOnly = "Nodes only ready in one zone" + InsufficientNumberOfEndpoints = "Insufficient number of endpoints" + MinAllocationExceedsOverloadThreshold = "Unable to allocate minimum required endpoints to each zone without exceeding overload threshold" +) + +// EventBuilder let's us construct events in the code. +// We use it to build events and return them from a function instead of publishing them from within it. +// EventType, Reason, and Message fields are equivalent to the v1.Event fields - https://pkg.go.dev/k8s.io/api/core/v1#Event. +type EventBuilder struct { + EventType string + Reason string + Message string +} diff --git a/pkg/controller/endpointslice/topologycache/topologycache.go b/pkg/controller/endpointslice/topologycache/topologycache.go index 6889ed67192..fa499795281 100644 --- a/pkg/controller/endpointslice/topologycache/topologycache.go +++ b/pkg/controller/endpointslice/topologycache/topologycache.go @@ -17,12 +17,14 @@ limitations under the License. package topologycache import ( + "fmt" "math" "sync" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice" ) @@ -35,11 +37,12 @@ const ( // TopologyCache tracks the distribution of Nodes and endpoints across zones. type TopologyCache struct { - lock sync.Mutex - sufficientNodeInfo bool - cpuByZone map[string]*resource.Quantity - cpuRatiosByZone map[string]float64 - endpointsByService map[string]map[discovery.AddressType]EndpointZoneInfo + lock sync.Mutex + sufficientNodeInfo bool + cpuByZone map[string]*resource.Quantity + cpuRatiosByZone map[string]float64 + endpointsByService map[string]map[discovery.AddressType]EndpointZoneInfo + hintsPopulatedByService sets.Set[string] } // EndpointZoneInfo tracks the distribution of endpoints across zones for a @@ -57,9 +60,10 @@ type Allocation struct { // NewTopologyCache initializes a new TopologyCache. func NewTopologyCache() *TopologyCache { return &TopologyCache{ - cpuByZone: map[string]*resource.Quantity{}, - cpuRatiosByZone: map[string]float64{}, - endpointsByService: map[string]map[discovery.AddressType]EndpointZoneInfo{}, + cpuByZone: map[string]*resource.Quantity{}, + cpuRatiosByZone: map[string]float64{}, + endpointsByService: map[string]map[discovery.AddressType]EndpointZoneInfo{}, + hintsPopulatedByService: sets.Set[string]{}, } } @@ -84,14 +88,17 @@ func (t *TopologyCache) GetOverloadedServices() []string { // AddHints adds or updates topology hints on EndpointSlices and returns updated // lists of EndpointSlices to create and update. -func (t *TopologyCache) AddHints(si *SliceInfo) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice) { +func (t *TopologyCache) AddHints(si *SliceInfo) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice, []*EventBuilder) { totalEndpoints := si.getTotalReadyEndpoints() - allocations := t.getAllocations(totalEndpoints) - - if allocations == nil { - klog.V(2).InfoS("Insufficient endpoints, removing hints from service", "serviceKey", si.ServiceKey) + allocations, allocationsEvent := t.getAllocations(totalEndpoints) + events := []*EventBuilder{} + if allocationsEvent != nil { + klog.InfoS(allocationsEvent.Message+", removing hints", "serviceKey", si.ServiceKey, "addressType", si.AddressType) + allocationsEvent.Message = FormatWithAddressType(allocationsEvent.Message, si.AddressType) + events = append(events, allocationsEvent) t.RemoveHints(si.ServiceKey, si.AddressType) - return RemoveHintsFromSlices(si) + slicesToCreate, slicesToUpdate := RemoveHintsFromSlices(si) + return slicesToCreate, slicesToUpdate, events } allocatedHintsByZone := si.getAllocatedHintsByZone(allocations) @@ -109,9 +116,15 @@ func (t *TopologyCache) AddHints(si *SliceInfo) ([]*discovery.EndpointSlice, []* continue } if endpoint.Zone == nil || *endpoint.Zone == "" { - klog.InfoS("Endpoint found without zone specified, removing hints from service", "serviceKey", si.ServiceKey) + klog.InfoS("Endpoint found without zone specified, removing hints", "serviceKey", si.ServiceKey, "addressType", si.AddressType) + events = append(events, &EventBuilder{ + EventType: v1.EventTypeWarning, + Reason: "TopologyAwareHintsDisabled", + Message: FormatWithAddressType(NoZoneSpecified, si.AddressType), + }) t.RemoveHints(si.ServiceKey, si.AddressType) - return RemoveHintsFromSlices(si) + slicesToCreate, slicesToUpdate := RemoveHintsFromSlices(si) + return slicesToCreate, slicesToUpdate, events } allocatedHintsByZone[*endpoint.Zone]++ @@ -129,19 +142,36 @@ func (t *TopologyCache) AddHints(si *SliceInfo) ([]*discovery.EndpointSlice, []* allocatedHintsByZone[zone] += diff } + if len(allocatedHintsByZone) == 0 { + klog.V(2).InfoS("No hints allocated for zones, removing them", "serviceKey", si.ServiceKey, "addressType", si.AddressType) + events = append(events, &EventBuilder{ + EventType: v1.EventTypeWarning, + Reason: "TopologyAwareHintsDisabled", + Message: FormatWithAddressType(NoAllocatedHintsForZones, si.AddressType), + }) + t.RemoveHints(si.ServiceKey, si.AddressType) + slicesToCreate, slicesToUpdate := RemoveHintsFromSlices(si) + return slicesToCreate, slicesToUpdate, events + } + + hintsEnabled := t.hintsPopulatedByService.Has(si.ServiceKey) t.SetHints(si.ServiceKey, si.AddressType, allocatedHintsByZone) - return si.ToCreate, si.ToUpdate + + // if hints were not enabled before, we publish an event to indicate we enabled them. + if !hintsEnabled { + klog.InfoS("Topology Aware Hints has been enabled, adding hints.", "serviceKey", si.ServiceKey, "addressType", si.AddressType) + events = append(events, &EventBuilder{ + EventType: v1.EventTypeNormal, + Reason: "TopologyAwareHintsEnabled", + Message: FormatWithAddressType(TopologyAwareHintsEnabled, si.AddressType), + }) + } + return si.ToCreate, si.ToUpdate, events } // SetHints sets topology hints for the provided serviceKey and addrType in this // cache. func (t *TopologyCache) SetHints(serviceKey string, addrType discovery.AddressType, allocatedHintsByZone EndpointZoneInfo) { - if len(allocatedHintsByZone) == 0 { - klog.V(2).Infof("No hints allocated for zones, removing them from %s EndpointSlices for %s Service", addrType, serviceKey) - t.RemoveHints(serviceKey, addrType) - return - } - t.lock.Lock() defer t.lock.Unlock() @@ -150,6 +180,8 @@ func (t *TopologyCache) SetHints(serviceKey string, addrType discovery.AddressTy t.endpointsByService[serviceKey] = map[discovery.AddressType]EndpointZoneInfo{} } t.endpointsByService[serviceKey][addrType] = allocatedHintsByZone + + t.hintsPopulatedByService.Insert(serviceKey) } // RemoveHints removes topology hints for the provided serviceKey and addrType @@ -165,6 +197,7 @@ func (t *TopologyCache) RemoveHints(serviceKey string, addrType discovery.Addres if len(t.endpointsByService[serviceKey]) == 0 { delete(t.endpointsByService, serviceKey) } + t.hintsPopulatedByService.Delete(serviceKey) } // SetNodes updates the Node distribution for the TopologyCache. @@ -228,13 +261,36 @@ func (t *TopologyCache) SetNodes(nodes []*v1.Node) { } } +// HasPopulatedHints checks whether there are populated hints for a given service in the cache. +func (t *TopologyCache) HasPopulatedHints(serviceKey string) bool { + return t.hintsPopulatedByService.Has(serviceKey) +} + // getAllocations returns a set of minimum and maximum allocations per zone. If // it is not possible to provide allocations that are below the overload // threshold, a nil value will be returned. -func (t *TopologyCache) getAllocations(numEndpoints int) map[string]Allocation { - if t.cpuRatiosByZone == nil || len(t.cpuRatiosByZone) < 2 || len(t.cpuRatiosByZone) > numEndpoints { - klog.V(2).Infof("Insufficient info to allocate endpoints (%d endpoints, %d zones)", numEndpoints, len(t.cpuRatiosByZone)) - return nil +func (t *TopologyCache) getAllocations(numEndpoints int) (map[string]Allocation, *EventBuilder) { + // it is similar to checking !t.sufficientNodeInfo + if t.cpuRatiosByZone == nil { + return nil, &EventBuilder{ + EventType: v1.EventTypeWarning, + Reason: "TopologyAwareHintsDisabled", + Message: InsufficientNodeInfo, + } + } + if len(t.cpuRatiosByZone) < 2 { + return nil, &EventBuilder{ + EventType: v1.EventTypeWarning, + Reason: "TopologyAwareHintsDisabled", + Message: NodesReadyInOneZoneOnly, + } + } + if len(t.cpuRatiosByZone) > numEndpoints { + return nil, &EventBuilder{ + EventType: v1.EventTypeWarning, + Reason: "TopologyAwareHintsDisabled", + Message: fmt.Sprintf("%s (%d endpoints, %d zones)", InsufficientNumberOfEndpoints, numEndpoints, len(t.cpuRatiosByZone)), + } } t.lock.Lock() @@ -254,7 +310,11 @@ func (t *TopologyCache) getAllocations(numEndpoints int) map[string]Allocation { minTotal += minimum remainingMinEndpoints -= minimum if remainingMinEndpoints < 0 { - return nil + return nil, &EventBuilder{ + EventType: v1.EventTypeWarning, + Reason: "TopologyAwareHintsDisabled", + Message: fmt.Sprintf("%s (%d endpoints, %d zones)", MinAllocationExceedsOverloadThreshold, numEndpoints, len(t.cpuRatiosByZone)), + } } } @@ -263,7 +323,7 @@ func (t *TopologyCache) getAllocations(numEndpoints int) map[string]Allocation { allocations[zone] = allocation } - return allocations + return allocations, nil } // Nodes with any of these labels set to any value will be excluded from diff --git a/pkg/controller/endpointslice/topologycache/topologycache_test.go b/pkg/controller/endpointslice/topologycache/topologycache_test.go index a12dd97c40c..1ca16d63f66 100644 --- a/pkg/controller/endpointslice/topologycache/topologycache_test.go +++ b/pkg/controller/endpointslice/topologycache/topologycache_test.go @@ -18,8 +18,11 @@ package topologycache import ( "reflect" + "strings" "testing" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -35,6 +38,7 @@ func TestAddHints(t *testing.T) { expectedEndpointsByAddrType map[discovery.AddressType]EndpointZoneInfo expectedSlicesToCreate []*discovery.EndpointSlice expectedSlicesToUpdate []*discovery.EndpointSlice + expectedEvents []*EventBuilder }{{ name: "empty", cpuRatiosByZone: nil, @@ -45,6 +49,13 @@ func TestAddHints(t *testing.T) { expectedEndpointsByAddrType: nil, expectedSlicesToCreate: []*discovery.EndpointSlice{}, expectedSlicesToUpdate: []*discovery.EndpointSlice{}, + expectedEvents: []*EventBuilder{ + { + EventType: v1.EventTypeWarning, + Reason: "TopologyAwareHintsDisabled", + Message: InsufficientNodeInfo, + }, + }, }, { name: "slice to create, no zone ratios", cpuRatiosByZone: nil, @@ -68,6 +79,13 @@ func TestAddHints(t *testing.T) { }}, }}, expectedSlicesToUpdate: []*discovery.EndpointSlice{}, + expectedEvents: []*EventBuilder{ + { + EventType: v1.EventTypeWarning, + Reason: "TopologyAwareHintsDisabled", + Message: InsufficientNodeInfo, + }, + }, }, { name: "slice to create with 2 endpoints, zone ratios require 3", cpuRatiosByZone: map[string]float64{ @@ -103,6 +121,13 @@ func TestAddHints(t *testing.T) { }}, }}, expectedSlicesToUpdate: []*discovery.EndpointSlice{}, + expectedEvents: []*EventBuilder{ + { + EventType: v1.EventTypeWarning, + Reason: "TopologyAwareHintsDisabled", + Message: InsufficientNumberOfEndpoints, + }, + }, }, { name: "slice to create with 2 endpoints, zone ratios only require 2", cpuRatiosByZone: map[string]float64{ @@ -144,6 +169,13 @@ func TestAddHints(t *testing.T) { }}, }}, expectedSlicesToUpdate: []*discovery.EndpointSlice{}, + expectedEvents: []*EventBuilder{ + { + EventType: v1.EventTypeNormal, + Reason: "TopologyAwareHintsEnabled", + Message: TopologyAwareHintsEnabled, + }, + }, }, { name: "slice to create with 2 ready, 1 unready, 1 unknown endpoints, zone ratios only require 2", cpuRatiosByZone: map[string]float64{ @@ -201,6 +233,13 @@ func TestAddHints(t *testing.T) { }}, }}, expectedSlicesToUpdate: []*discovery.EndpointSlice{}, + expectedEvents: []*EventBuilder{ + { + EventType: v1.EventTypeNormal, + Reason: "TopologyAwareHintsEnabled", + Message: TopologyAwareHintsEnabled, + }, + }, }, { name: "slices to create and update within 3 zone threshold", cpuRatiosByZone: map[string]float64{ @@ -329,6 +368,13 @@ func TestAddHints(t *testing.T) { Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}, }}, }}, + expectedEvents: []*EventBuilder{ + { + EventType: v1.EventTypeNormal, + Reason: "TopologyAwareHintsEnabled", + Message: TopologyAwareHintsEnabled, + }, + }, }} for _, tc := range testCases { @@ -336,10 +382,11 @@ func TestAddHints(t *testing.T) { cache := NewTopologyCache() cache.cpuRatiosByZone = tc.cpuRatiosByZone - slicesToCreate, slicesToUpdate := cache.AddHints(tc.sliceInfo) + slicesToCreate, slicesToUpdate, events := cache.AddHints(tc.sliceInfo) expectEquivalentSlices(t, slicesToCreate, tc.expectedSlicesToCreate) expectEquivalentSlices(t, slicesToUpdate, tc.expectedSlicesToUpdate) + compareExpectedEvents(t, tc.expectedEvents, events) endpointsByAddrType, ok := cache.endpointsByService[tc.sliceInfo.ServiceKey] if tc.expectedEndpointsByAddrType == nil { @@ -602,3 +649,17 @@ func expectEquivalentSlices(t *testing.T, actualSlices, expectedSlices []*discov } } } + +func compareExpectedEvents(t *testing.T, expectedEvents, events []*EventBuilder) { + if len(expectedEvents) != len(events) { + t.Errorf("Expected %d event, got %d", len(expectedEvents), len(events)) + } + for i, event := range events { + if diff := cmp.Diff(event, expectedEvents[i], cmpopts.IgnoreFields(EventBuilder{}, "Message")); diff != "" { + t.Errorf("Unexpected event (-want,+got):\n%s", diff) + } + if got, want := event.Message, expectedEvents[i].Message; !strings.HasPrefix(got, want) || want == "" { + t.Errorf("Unexpected event message:\ngot %q want a message with %q prefix", got, want) + } + } +} diff --git a/pkg/controller/endpointslice/topologycache/utils.go b/pkg/controller/endpointslice/topologycache/utils.go index e650e7d7dd8..b6d0f2d5388 100644 --- a/pkg/controller/endpointslice/topologycache/utils.go +++ b/pkg/controller/endpointslice/topologycache/utils.go @@ -17,6 +17,7 @@ limitations under the License. package topologycache import ( + "fmt" "math" v1 "k8s.io/api/core/v1" @@ -67,6 +68,11 @@ func RemoveHintsFromSlices(si *SliceInfo) ([]*discovery.EndpointSlice, []*discov return si.ToCreate, si.ToUpdate } +// FormatWithAddressType foramts a given string by adding an addressType to the end of it. +func FormatWithAddressType(s string, addressType discovery.AddressType) string { + return fmt.Sprintf("%s, addressType: %s", s, addressType) +} + // redistributeHints redistributes hints based in the provided EndpointSlices. // It allocates endpoints from the provided givingZones to the provided // receivingZones. This returns a map that represents the changes in allocated