add --concurrent-horizontal-pod-autoscaler-syncs flag to kube-controller-manager

Signed-off-by: Zbynek Roubalik <zroubalik@gmail.com>
This commit is contained in:
Zbynek Roubalik
2022-03-10 13:00:13 +01:00
parent b307321c0a
commit 1cefcdea2d
11 changed files with 348 additions and 9 deletions

View File

@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math"
"sync"
"time"
autoscalingv1 "k8s.io/api/autoscaling/v1"
@@ -94,11 +95,14 @@ type HorizontalController struct {
queue workqueue.RateLimitingInterface
// Latest unstabilized recommendations for each autoscaler.
recommendations map[string][]timestampedRecommendation
recommendations map[string][]timestampedRecommendation
recommendationsLock sync.Mutex
// Latest autoscaler events
scaleUpEvents map[string][]timestampedScaleEvent
scaleDownEvents map[string][]timestampedScaleEvent
scaleUpEvents map[string][]timestampedScaleEvent
scaleUpEventsLock sync.RWMutex
scaleDownEvents map[string][]timestampedScaleEvent
scaleDownEventsLock sync.RWMutex
}
// NewHorizontalController creates a new HorizontalController.
@@ -130,8 +134,11 @@ func NewHorizontalController(
queue: workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler"),
mapper: mapper,
recommendations: map[string][]timestampedRecommendation{},
recommendationsLock: sync.Mutex{},
scaleUpEvents: map[string][]timestampedScaleEvent{},
scaleUpEventsLock: sync.RWMutex{},
scaleDownEvents: map[string][]timestampedScaleEvent{},
scaleDownEventsLock: sync.RWMutex{},
}
hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -161,7 +168,7 @@ func NewHorizontalController(
}
// Run begins watching and syncing.
func (a *HorizontalController) Run(ctx context.Context) {
func (a *HorizontalController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer a.queue.ShutDown()
@@ -172,8 +179,9 @@ func (a *HorizontalController) Run(ctx context.Context) {
return
}
// start a single worker (we may wish to start more in the future)
go wait.UntilWithContext(ctx, a.worker, time.Second)
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, a.worker, time.Second)
}
<-ctx.Done()
}
@@ -358,9 +366,19 @@ func (a *HorizontalController) reconcileKey(ctx context.Context, key string) (de
hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
if errors.IsNotFound(err) {
klog.Infof("Horizontal Pod Autoscaler %s has been deleted in %s", name, namespace)
a.recommendationsLock.Lock()
delete(a.recommendations, key)
a.recommendationsLock.Unlock()
a.scaleUpEventsLock.Lock()
delete(a.scaleUpEvents, key)
a.scaleUpEventsLock.Unlock()
a.scaleDownEventsLock.Lock()
delete(a.scaleDownEvents, key)
a.scaleDownEventsLock.Unlock()
return true, nil
}
if err != nil {
@@ -565,6 +583,8 @@ func (a *HorizontalController) computeStatusForExternalMetric(specReplicas, stat
}
func (a *HorizontalController) recordInitialRecommendation(currentReplicas int32, key string) {
a.recommendationsLock.Lock()
defer a.recommendationsLock.Unlock()
if a.recommendations[key] == nil {
a.recommendations[key] = []timestampedRecommendation{{currentReplicas, time.Now()}}
}
@@ -713,6 +733,9 @@ func (a *HorizontalController) stabilizeRecommendation(key string, prenormalized
foundOldSample := false
oldSampleIndex := 0
cutoff := time.Now().Add(-a.downscaleStabilisationWindow)
a.recommendationsLock.Lock()
defer a.recommendationsLock.Unlock()
for i, rec := range a.recommendations[key] {
if rec.timestamp.Before(cutoff) {
foundOldSample = true
@@ -837,6 +860,9 @@ func (a *HorizontalController) storeScaleEvent(behavior *autoscalingv2.Horizonta
foundOldSample := false
if newReplicas > prevReplicas {
longestPolicyPeriod = getLongestPolicyPeriod(behavior.ScaleUp)
a.scaleUpEventsLock.Lock()
defer a.scaleUpEventsLock.Unlock()
markScaleEventsOutdated(a.scaleUpEvents[key], longestPolicyPeriod)
replicaChange := newReplicas - prevReplicas
for i, event := range a.scaleUpEvents[key] {
@@ -853,6 +879,9 @@ func (a *HorizontalController) storeScaleEvent(behavior *autoscalingv2.Horizonta
}
} else {
longestPolicyPeriod = getLongestPolicyPeriod(behavior.ScaleDown)
a.scaleDownEventsLock.Lock()
defer a.scaleDownEventsLock.Unlock()
markScaleEventsOutdated(a.scaleDownEvents[key], longestPolicyPeriod)
replicaChange := prevReplicas - newReplicas
for i, event := range a.scaleDownEvents[key] {
@@ -888,6 +917,8 @@ func (a *HorizontalController) stabilizeRecommendationWithBehaviors(args Normali
downCutoff := now.Add(-time.Second * time.Duration(downDelaySeconds))
// Calculate the upper and lower stabilization limits.
a.recommendationsLock.Lock()
defer a.recommendationsLock.Unlock()
for i, rec := range a.recommendations[args.Key] {
if rec.timestamp.After(upCutoff) {
upRecommendation = min(rec.recommendation, upRecommendation)
@@ -935,7 +966,12 @@ func (a *HorizontalController) convertDesiredReplicasWithBehaviorRate(args Norma
var possibleLimitingReason, possibleLimitingMessage string
if args.DesiredReplicas > args.CurrentReplicas {
a.scaleUpEventsLock.RLock()
defer a.scaleUpEventsLock.RUnlock()
a.scaleDownEventsLock.RLock()
defer a.scaleDownEventsLock.RUnlock()
scaleUpLimit := calculateScaleUpLimitWithScalingRules(args.CurrentReplicas, a.scaleUpEvents[args.Key], a.scaleDownEvents[args.Key], args.ScaleUpBehavior)
if scaleUpLimit < args.CurrentReplicas {
// We shouldn't scale up further until the scaleUpEvents will be cleaned up
scaleUpLimit = args.CurrentReplicas
@@ -953,7 +989,12 @@ func (a *HorizontalController) convertDesiredReplicasWithBehaviorRate(args Norma
return maximumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage
}
} else if args.DesiredReplicas < args.CurrentReplicas {
a.scaleUpEventsLock.RLock()
defer a.scaleUpEventsLock.RUnlock()
a.scaleDownEventsLock.RLock()
defer a.scaleDownEventsLock.RUnlock()
scaleDownLimit := calculateScaleDownLimitWithBehaviors(args.CurrentReplicas, a.scaleUpEvents[args.Key], a.scaleDownEvents[args.Key], args.ScaleDownBehavior)
if scaleDownLimit > args.CurrentReplicas {
// We shouldn't scale down further until the scaleDownEvents will be cleaned up
scaleDownLimit = args.CurrentReplicas