Merge pull request #113544 from LiorLieberman/topology-hints-events

Added: publishing events for topologyAwareHints changes
This commit is contained in:
Kubernetes Prow Robot 2022-11-07 16:01:15 -08:00 committed by GitHub
commit 0e530f44af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 220 additions and 36 deletions

View File

@ -164,6 +164,7 @@ func NewController(podInformer coreinformers.PodInformer,
endpointSliceTracker: c.endpointSliceTracker,
metricsCache: endpointslicemetrics.NewCache(maxEndpointsPerSlice),
topologyCache: c.topologyCache,
eventRecorder: c.eventRecorder,
}
return c

View File

@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
"k8s.io/kubernetes/pkg/controller/endpointslice/topologycache"
@ -50,6 +51,8 @@ type reconciler struct {
// topologyCache tracks the distribution of Nodes and endpoints across zones
// to enable TopologyAwareHints.
topologyCache *topologycache.TopologyCache
// eventRecorder allows reconciler to record and publish events.
eventRecorder record.EventRecorder
}
// endpointMeta includes the attributes we group slices on, this type helps with
@ -133,6 +136,7 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
slicesToCreate := []*discovery.EndpointSlice{}
slicesToUpdate := []*discovery.EndpointSlice{}
slicesToDelete := []*discovery.EndpointSlice{}
events := []*topologycache.EventBuilder{}
// Build data structures for existing state.
existingSlicesByPortMap := map[endpointutil.PortMapKey][]*discovery.EndpointSlice{}
@ -252,25 +256,36 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
// theoretically possible for endpoints of one address type to be assigned
// hints while endpoints of another address type are not.
si := &topologycache.SliceInfo{
ServiceKey: fmt.Sprintf("%s/%s", service.Namespace, service.Name),
ToCreate: slicesToCreate,
ToUpdate: slicesToUpdate,
Unchanged: unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete),
ServiceKey: fmt.Sprintf("%s/%s", service.Namespace, service.Name),
AddressType: addressType,
ToCreate: slicesToCreate,
ToUpdate: slicesToUpdate,
Unchanged: unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete),
}
if r.topologyCache != nil && hintsEnabled(service.Annotations) {
slicesToCreate, slicesToUpdate = r.topologyCache.AddHints(si)
slicesToCreate, slicesToUpdate, events = r.topologyCache.AddHints(si)
} else {
if r.topologyCache != nil {
if r.topologyCache.HasPopulatedHints(si.ServiceKey) {
klog.InfoS("TopologyAwareHints annotation has changed, removing hints", "serviceKey", si.ServiceKey, "addressType", si.AddressType)
events = append(events, &topologycache.EventBuilder{
EventType: corev1.EventTypeWarning,
Reason: "TopologyAwareHintsDisabled",
Message: topologycache.FormatWithAddressType(topologycache.TopologyAwareHintsDisabled, si.AddressType),
})
}
r.topologyCache.RemoveHints(si.ServiceKey, addressType)
}
slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
}
err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
if err != nil {
errs = append(errs, err)
}
for _, event := range events {
r.eventRecorder.Event(service, event.EventType, event.Reason, event.Message)
}
return utilerrors.NewAggregate(errs)
}

View File

@ -36,6 +36,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
corelisters "k8s.io/client-go/listers/core/v1"
k8stesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
"k8s.io/component-base/metrics/testutil"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
@ -1908,6 +1909,7 @@ func TestReconcileTopology(t *testing.T) {
// Test Helpers
func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPerSlice int32) *reconciler {
eventRecorder := record.NewFakeRecorder(10)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
nodeInformer := informerFactory.Core().V1().Nodes()
indexer := nodeInformer.Informer().GetIndexer()
@ -1921,6 +1923,7 @@ func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPer
maxEndpointsPerSlice: maxEndpointsPerSlice,
endpointSliceTracker: endpointsliceutil.NewEndpointSliceTracker(),
metricsCache: metrics.NewCache(maxEndpointsPerSlice),
eventRecorder: eventRecorder,
}
}

View File

@ -0,0 +1,38 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologycache
// Human-readable messages carried by the events that report TopologyAwareHints
// changes.
const (
	// Messages announcing that hints were switched on or off.
	TopologyAwareHintsEnabled  = "Topology Aware Hints has been enabled"
	TopologyAwareHintsDisabled = "Topology Aware Hints has been disabled"

	// Messages describing problems with the endpoints themselves.
	NoZoneSpecified               = "One or more endpoints do not have a zone specified"
	InsufficientNumberOfEndpoints = "Insufficient number of endpoints"

	// Messages describing problems with the node or zone information.
	InsufficientNodeInfo    = "Insufficient Node information: allocatable CPU or zone not specified on one or more nodes"
	NodesReadyInOneZoneOnly = "Nodes only ready in one zone"

	// Messages describing a failed allocation of hints to zones.
	NoAllocatedHintsForZones              = "No hints allocated for zones"
	MinAllocationExceedsOverloadThreshold = "Unable to allocate minimum required endpoints to each zone without exceeding overload threshold"
)
// EventBuilder holds the data needed to publish an event at a later point.
// It lets a function build events and hand them back to its caller rather than
// publishing them itself.
// The fields mirror the identically named fields of v1.Event, see
// https://pkg.go.dev/k8s.io/api/core/v1#Event.
type EventBuilder struct {
	// EventType is the type of the event.
	EventType string
	// Reason is the short reason attached to the event.
	Reason string
	// Message is the human-readable description of the event.
	Message string
}

View File

@ -17,12 +17,14 @@ limitations under the License.
package topologycache
import (
"fmt"
"math"
"sync"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
endpointsliceutil "k8s.io/kubernetes/pkg/controller/util/endpointslice"
)
@ -35,11 +37,12 @@ const (
// TopologyCache tracks the distribution of Nodes and endpoints across zones.
type TopologyCache struct {
lock sync.Mutex
sufficientNodeInfo bool
cpuByZone map[string]*resource.Quantity
cpuRatiosByZone map[string]float64
endpointsByService map[string]map[discovery.AddressType]EndpointZoneInfo
lock sync.Mutex
sufficientNodeInfo bool
cpuByZone map[string]*resource.Quantity
cpuRatiosByZone map[string]float64
endpointsByService map[string]map[discovery.AddressType]EndpointZoneInfo
hintsPopulatedByService sets.Set[string]
}
// EndpointZoneInfo tracks the distribution of endpoints across zones for a
@ -57,9 +60,10 @@ type Allocation struct {
// NewTopologyCache initializes a new TopologyCache.
func NewTopologyCache() *TopologyCache {
return &TopologyCache{
cpuByZone: map[string]*resource.Quantity{},
cpuRatiosByZone: map[string]float64{},
endpointsByService: map[string]map[discovery.AddressType]EndpointZoneInfo{},
cpuByZone: map[string]*resource.Quantity{},
cpuRatiosByZone: map[string]float64{},
endpointsByService: map[string]map[discovery.AddressType]EndpointZoneInfo{},
hintsPopulatedByService: sets.Set[string]{},
}
}
@ -84,14 +88,17 @@ func (t *TopologyCache) GetOverloadedServices() []string {
// AddHints adds or updates topology hints on EndpointSlices and returns updated
// lists of EndpointSlices to create and update, along with any events to be
// published by the caller.
func (t *TopologyCache) AddHints(si *SliceInfo) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice) {
func (t *TopologyCache) AddHints(si *SliceInfo) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice, []*EventBuilder) {
totalEndpoints := si.getTotalReadyEndpoints()
allocations := t.getAllocations(totalEndpoints)
if allocations == nil {
klog.V(2).InfoS("Insufficient endpoints, removing hints from service", "serviceKey", si.ServiceKey)
allocations, allocationsEvent := t.getAllocations(totalEndpoints)
events := []*EventBuilder{}
if allocationsEvent != nil {
klog.InfoS(allocationsEvent.Message+", removing hints", "serviceKey", si.ServiceKey, "addressType", si.AddressType)
allocationsEvent.Message = FormatWithAddressType(allocationsEvent.Message, si.AddressType)
events = append(events, allocationsEvent)
t.RemoveHints(si.ServiceKey, si.AddressType)
return RemoveHintsFromSlices(si)
slicesToCreate, slicesToUpdate := RemoveHintsFromSlices(si)
return slicesToCreate, slicesToUpdate, events
}
allocatedHintsByZone := si.getAllocatedHintsByZone(allocations)
@ -109,9 +116,15 @@ func (t *TopologyCache) AddHints(si *SliceInfo) ([]*discovery.EndpointSlice, []*
continue
}
if endpoint.Zone == nil || *endpoint.Zone == "" {
klog.InfoS("Endpoint found without zone specified, removing hints from service", "serviceKey", si.ServiceKey)
klog.InfoS("Endpoint found without zone specified, removing hints", "serviceKey", si.ServiceKey, "addressType", si.AddressType)
events = append(events, &EventBuilder{
EventType: v1.EventTypeWarning,
Reason: "TopologyAwareHintsDisabled",
Message: FormatWithAddressType(NoZoneSpecified, si.AddressType),
})
t.RemoveHints(si.ServiceKey, si.AddressType)
return RemoveHintsFromSlices(si)
slicesToCreate, slicesToUpdate := RemoveHintsFromSlices(si)
return slicesToCreate, slicesToUpdate, events
}
allocatedHintsByZone[*endpoint.Zone]++
@ -129,19 +142,36 @@ func (t *TopologyCache) AddHints(si *SliceInfo) ([]*discovery.EndpointSlice, []*
allocatedHintsByZone[zone] += diff
}
if len(allocatedHintsByZone) == 0 {
klog.V(2).InfoS("No hints allocated for zones, removing them", "serviceKey", si.ServiceKey, "addressType", si.AddressType)
events = append(events, &EventBuilder{
EventType: v1.EventTypeWarning,
Reason: "TopologyAwareHintsDisabled",
Message: FormatWithAddressType(NoAllocatedHintsForZones, si.AddressType),
})
t.RemoveHints(si.ServiceKey, si.AddressType)
slicesToCreate, slicesToUpdate := RemoveHintsFromSlices(si)
return slicesToCreate, slicesToUpdate, events
}
hintsEnabled := t.hintsPopulatedByService.Has(si.ServiceKey)
t.SetHints(si.ServiceKey, si.AddressType, allocatedHintsByZone)
return si.ToCreate, si.ToUpdate
// if hints were not enabled before, we publish an event to indicate we enabled them.
if !hintsEnabled {
klog.InfoS("Topology Aware Hints has been enabled, adding hints.", "serviceKey", si.ServiceKey, "addressType", si.AddressType)
events = append(events, &EventBuilder{
EventType: v1.EventTypeNormal,
Reason: "TopologyAwareHintsEnabled",
Message: FormatWithAddressType(TopologyAwareHintsEnabled, si.AddressType),
})
}
return si.ToCreate, si.ToUpdate, events
}
// SetHints sets topology hints for the provided serviceKey and addrType in this
// cache.
func (t *TopologyCache) SetHints(serviceKey string, addrType discovery.AddressType, allocatedHintsByZone EndpointZoneInfo) {
if len(allocatedHintsByZone) == 0 {
klog.V(2).Infof("No hints allocated for zones, removing them from %s EndpointSlices for %s Service", addrType, serviceKey)
t.RemoveHints(serviceKey, addrType)
return
}
t.lock.Lock()
defer t.lock.Unlock()
@ -150,6 +180,8 @@ func (t *TopologyCache) SetHints(serviceKey string, addrType discovery.AddressTy
t.endpointsByService[serviceKey] = map[discovery.AddressType]EndpointZoneInfo{}
}
t.endpointsByService[serviceKey][addrType] = allocatedHintsByZone
t.hintsPopulatedByService.Insert(serviceKey)
}
// RemoveHints removes topology hints for the provided serviceKey and addrType
@ -165,6 +197,7 @@ func (t *TopologyCache) RemoveHints(serviceKey string, addrType discovery.Addres
if len(t.endpointsByService[serviceKey]) == 0 {
delete(t.endpointsByService, serviceKey)
}
t.hintsPopulatedByService.Delete(serviceKey)
}
// SetNodes updates the Node distribution for the TopologyCache.
@ -228,13 +261,36 @@ func (t *TopologyCache) SetNodes(nodes []*v1.Node) {
}
}
// HasPopulatedHints checks whether there are populated hints for a given
// service in the cache.
//
// It returns true when serviceKey is present in hintsPopulatedByService.
func (t *TopologyCache) HasPopulatedHints(serviceKey string) bool {
	// hintsPopulatedByService is mutated by SetHints while t.lock is held, so
	// the read must take the same lock to avoid a data race.
	t.lock.Lock()
	defer t.lock.Unlock()

	return t.hintsPopulatedByService.Has(serviceKey)
}
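// getAllocations returns a set of minimum and maximum allocations per zone. If
// allocations cannot be provided (insufficient node information, nodes in
// fewer than two zones, fewer endpoints than zones, or minimum allocations that
// would exceed the overload threshold), a nil map is returned together with an
// EventBuilder describing the reason.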
func (t *TopologyCache) getAllocations(numEndpoints int) map[string]Allocation {
if t.cpuRatiosByZone == nil || len(t.cpuRatiosByZone) < 2 || len(t.cpuRatiosByZone) > numEndpoints {
klog.V(2).Infof("Insufficient info to allocate endpoints (%d endpoints, %d zones)", numEndpoints, len(t.cpuRatiosByZone))
return nil
func (t *TopologyCache) getAllocations(numEndpoints int) (map[string]Allocation, *EventBuilder) {
// it is similar to checking !t.sufficientNodeInfo
if t.cpuRatiosByZone == nil {
return nil, &EventBuilder{
EventType: v1.EventTypeWarning,
Reason: "TopologyAwareHintsDisabled",
Message: InsufficientNodeInfo,
}
}
if len(t.cpuRatiosByZone) < 2 {
return nil, &EventBuilder{
EventType: v1.EventTypeWarning,
Reason: "TopologyAwareHintsDisabled",
Message: NodesReadyInOneZoneOnly,
}
}
if len(t.cpuRatiosByZone) > numEndpoints {
return nil, &EventBuilder{
EventType: v1.EventTypeWarning,
Reason: "TopologyAwareHintsDisabled",
Message: fmt.Sprintf("%s (%d endpoints, %d zones)", InsufficientNumberOfEndpoints, numEndpoints, len(t.cpuRatiosByZone)),
}
}
t.lock.Lock()
@ -254,7 +310,11 @@ func (t *TopologyCache) getAllocations(numEndpoints int) map[string]Allocation {
minTotal += minimum
remainingMinEndpoints -= minimum
if remainingMinEndpoints < 0 {
return nil
return nil, &EventBuilder{
EventType: v1.EventTypeWarning,
Reason: "TopologyAwareHintsDisabled",
Message: fmt.Sprintf("%s (%d endpoints, %d zones)", MinAllocationExceedsOverloadThreshold, numEndpoints, len(t.cpuRatiosByZone)),
}
}
}
@ -263,7 +323,7 @@ func (t *TopologyCache) getAllocations(numEndpoints int) map[string]Allocation {
allocations[zone] = allocation
}
return allocations
return allocations, nil
}
// Nodes with any of these labels set to any value will be excluded from

View File

@ -18,8 +18,11 @@ package topologycache
import (
"reflect"
"strings"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/resource"
@ -35,6 +38,7 @@ func TestAddHints(t *testing.T) {
expectedEndpointsByAddrType map[discovery.AddressType]EndpointZoneInfo
expectedSlicesToCreate []*discovery.EndpointSlice
expectedSlicesToUpdate []*discovery.EndpointSlice
expectedEvents []*EventBuilder
}{{
name: "empty",
cpuRatiosByZone: nil,
@ -45,6 +49,13 @@ func TestAddHints(t *testing.T) {
expectedEndpointsByAddrType: nil,
expectedSlicesToCreate: []*discovery.EndpointSlice{},
expectedSlicesToUpdate: []*discovery.EndpointSlice{},
expectedEvents: []*EventBuilder{
{
EventType: v1.EventTypeWarning,
Reason: "TopologyAwareHintsDisabled",
Message: InsufficientNodeInfo,
},
},
}, {
name: "slice to create, no zone ratios",
cpuRatiosByZone: nil,
@ -68,6 +79,13 @@ func TestAddHints(t *testing.T) {
}},
}},
expectedSlicesToUpdate: []*discovery.EndpointSlice{},
expectedEvents: []*EventBuilder{
{
EventType: v1.EventTypeWarning,
Reason: "TopologyAwareHintsDisabled",
Message: InsufficientNodeInfo,
},
},
}, {
name: "slice to create with 2 endpoints, zone ratios require 3",
cpuRatiosByZone: map[string]float64{
@ -103,6 +121,13 @@ func TestAddHints(t *testing.T) {
}},
}},
expectedSlicesToUpdate: []*discovery.EndpointSlice{},
expectedEvents: []*EventBuilder{
{
EventType: v1.EventTypeWarning,
Reason: "TopologyAwareHintsDisabled",
Message: InsufficientNumberOfEndpoints,
},
},
}, {
name: "slice to create with 2 endpoints, zone ratios only require 2",
cpuRatiosByZone: map[string]float64{
@ -144,6 +169,13 @@ func TestAddHints(t *testing.T) {
}},
}},
expectedSlicesToUpdate: []*discovery.EndpointSlice{},
expectedEvents: []*EventBuilder{
{
EventType: v1.EventTypeNormal,
Reason: "TopologyAwareHintsEnabled",
Message: TopologyAwareHintsEnabled,
},
},
}, {
name: "slice to create with 2 ready, 1 unready, 1 unknown endpoints, zone ratios only require 2",
cpuRatiosByZone: map[string]float64{
@ -201,6 +233,13 @@ func TestAddHints(t *testing.T) {
}},
}},
expectedSlicesToUpdate: []*discovery.EndpointSlice{},
expectedEvents: []*EventBuilder{
{
EventType: v1.EventTypeNormal,
Reason: "TopologyAwareHintsEnabled",
Message: TopologyAwareHintsEnabled,
},
},
}, {
name: "slices to create and update within 3 zone threshold",
cpuRatiosByZone: map[string]float64{
@ -329,6 +368,13 @@ func TestAddHints(t *testing.T) {
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
}},
}},
expectedEvents: []*EventBuilder{
{
EventType: v1.EventTypeNormal,
Reason: "TopologyAwareHintsEnabled",
Message: TopologyAwareHintsEnabled,
},
},
}}
for _, tc := range testCases {
@ -336,10 +382,11 @@ func TestAddHints(t *testing.T) {
cache := NewTopologyCache()
cache.cpuRatiosByZone = tc.cpuRatiosByZone
slicesToCreate, slicesToUpdate := cache.AddHints(tc.sliceInfo)
slicesToCreate, slicesToUpdate, events := cache.AddHints(tc.sliceInfo)
expectEquivalentSlices(t, slicesToCreate, tc.expectedSlicesToCreate)
expectEquivalentSlices(t, slicesToUpdate, tc.expectedSlicesToUpdate)
compareExpectedEvents(t, tc.expectedEvents, events)
endpointsByAddrType, ok := cache.endpointsByService[tc.sliceInfo.ServiceKey]
if tc.expectedEndpointsByAddrType == nil {
@ -602,3 +649,17 @@ func expectEquivalentSlices(t *testing.T, actualSlices, expectedSlices []*discov
}
}
}
// compareExpectedEvents reports a test error when events does not match
// expectedEvents.
//
// Events are compared pairwise by index: every field except Message must be
// equal, and each actual Message must start with the (non-empty) expected
// Message, so that suffixes added to the message do not have to be spelled
// out in the test tables.
func compareExpectedEvents(t *testing.T, expectedEvents, events []*EventBuilder) {
	t.Helper()

	if len(expectedEvents) != len(events) {
		t.Errorf("Expected %d event, got %d", len(expectedEvents), len(events))
		// Stop here: the pairwise comparison below indexes expectedEvents by
		// the position in events and would panic if events were longer.
		return
	}

	for i, event := range events {
		expected := expectedEvents[i]
		// cmp.Diff marks its first argument with "-" and its second with "+",
		// so the expected value goes first to match the (-want,+got) label.
		if diff := cmp.Diff(expected, event, cmpopts.IgnoreFields(EventBuilder{}, "Message")); diff != "" {
			t.Errorf("Unexpected event (-want,+got):\n%s", diff)
		}
		if got, want := event.Message, expected.Message; want == "" || !strings.HasPrefix(got, want) {
			t.Errorf("Unexpected event message:\ngot %q want a message with %q prefix", got, want)
		}
	}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package topologycache
import (
"fmt"
"math"
v1 "k8s.io/api/core/v1"
@ -67,6 +68,11 @@ func RemoveHintsFromSlices(si *SliceInfo) ([]*discovery.EndpointSlice, []*discov
return si.ToCreate, si.ToUpdate
}
// FormatWithAddressType returns s with the given addressType appended to it,
// in the form "<s>, addressType: <addressType>".
func FormatWithAddressType(s string, addressType discovery.AddressType) string {
	const layout = "%s, addressType: %s"
	formatted := fmt.Sprintf(layout, s, addressType)
	return formatted
}
// redistributeHints redistributes hints based in the provided EndpointSlices.
// It allocates endpoints from the provided givingZones to the provided
// receivingZones. This returns a map that represents the changes in allocated