diff --git a/pkg/apis/autoscaling/v2beta2/doc.go b/pkg/apis/autoscaling/v2beta2/doc.go
index 7228e86c609..c7283aa40bb 100644
--- a/pkg/apis/autoscaling/v2beta2/doc.go
+++ b/pkg/apis/autoscaling/v2beta2/doc.go
@@ -17,5 +17,6 @@ limitations under the License.
 // +k8s:conversion-gen=k8s.io/kubernetes/pkg/apis/autoscaling
 // +k8s:conversion-gen-external-types=k8s.io/api/autoscaling/v2beta2
 // +k8s:defaulter-gen=TypeMeta
+// +k8s:defaulter-gen-input=../../../../vendor/k8s.io/api/autoscaling/v2beta2
 package v2beta2 // import "k8s.io/kubernetes/pkg/apis/autoscaling/v2beta2"
diff --git a/pkg/controller/podautoscaler/horizontal.go b/pkg/controller/podautoscaler/horizontal.go
index e2502ab4fa0..85019a4dea7 100644
--- a/pkg/controller/podautoscaler/horizontal.go
+++ b/pkg/controller/podautoscaler/horizontal.go
@@ -61,6 +61,12 @@ type timestampedRecommendation struct {
 	timestamp time.Time
 }
 
+type timestampedScaleEvent struct {
+	replicaChange int32 // positive for scaleUp, negative for scaleDown
+	timestamp     time.Time
+	outdated      bool
+}
+
 // HorizontalController is responsible for the synchronizing HPA objects stored
 // in the system with the actual deployments/replication controllers they
 // control.
@@ -89,6 +95,10 @@ type HorizontalController struct {
 
 	// Latest unstabilized recommendations for each autoscaler.
 	recommendations map[string][]timestampedRecommendation
+
+	// Latest autoscaler events
+	scaleUpEvents   map[string][]timestampedScaleEvent
+	scaleDownEvents map[string][]timestampedScaleEvent
 }
 
 // NewHorizontalController creates a new HorizontalController.
@@ -120,6 +130,8 @@ func NewHorizontalController(
 		queue:           workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler"),
 		mapper:          mapper,
 		recommendations: map[string][]timestampedRecommendation{},
+		scaleUpEvents:   map[string][]timestampedScaleEvent{},
+		scaleDownEvents: map[string][]timestampedScaleEvent{},
 	}
 
 	hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -340,6 +352,8 @@ func (a *HorizontalController) reconcileKey(key string) (deleted bool, err error
 	if errors.IsNotFound(err) {
 		klog.Infof("Horizontal Pod Autoscaler %s has been deleted in %s", name, namespace)
 		delete(a.recommendations, key)
+		delete(a.scaleUpEvents, key)
+		delete(a.scaleDownEvents, key)
 		return true, nil
 	}
 
@@ -623,7 +637,11 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
 		if desiredReplicas < currentReplicas {
 			rescaleReason = "All metrics below target"
 		}
-		desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
+		if hpa.Spec.Behavior == nil {
+			desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
+		} else {
+			desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
+		}
 		rescale = desiredReplicas != currentReplicas
 	}
 
@@ -641,6 +659,7 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
 		}
 		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
 		a.eventRecorder.Eventf(hpa, v1.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)
+		a.storeScaleEvent(hpa.Spec.Behavior, key, currentReplicas, desiredReplicas)
 		klog.Infof("Successful rescale of %s, old size: %d, new size: %d, reason: %s",
 			hpa.Name, currentReplicas, desiredReplicas, rescaleReason)
 	} else {
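The two per-HPA event maps introduced above feed the rate limiting further down: when the controller computes how much a policy still allows, only events newer than that policy's `PeriodSeconds` are summed. A minimal, self-contained sketch of that window arithmetic (hypothetical numbers; the struct shape is copied from this diff):

```go
package main

import (
	"fmt"
	"time"
)

type timestampedScaleEvent struct {
	replicaChange int32 // positive for scaleUp, negative for scaleDown
	timestamp     time.Time
	outdated      bool
}

func main() {
	now := time.Now()
	events := []timestampedScaleEvent{
		{replicaChange: 4, timestamp: now.Add(-30 * time.Second)}, // inside a 60s period
		{replicaChange: 2, timestamp: now.Add(-2 * time.Minute)},  // outside the period
	}
	cutoff := now.Add(-60 * time.Second) // PeriodSeconds = 60
	var change int32
	for _, e := range events {
		if e.timestamp.After(cutoff) {
			change += e.replicaChange
		}
	}
	fmt.Println(change) // 4: only the event inside the period counts
}
```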
@@ -697,6 +716,72 @@ func (a *HorizontalController) normalizeDesiredReplicas(hpa *autoscalingv2.Horiz
 	return desiredReplicas
 }
 
+// NormalizationArg is used to pass all needed information between functions as one structure
+type NormalizationArg struct {
+	Key               string
+	ScaleUpBehavior   *autoscalingv2.HPAScalingRules
+	ScaleDownBehavior *autoscalingv2.HPAScalingRules
+	MinReplicas       int32
+	MaxReplicas       int32
+	CurrentReplicas   int32
+	DesiredReplicas   int32
+}
+
+// normalizeDesiredReplicasWithBehaviors takes the metrics desired replicas value and normalizes it:
+// 1. Apply the basic conditions (i.e. < maxReplicas, > minReplicas, etc...)
+// 2. Apply the scale up/down limits from the hpaSpec.Behaviors (i.e. add no more than 4 pods)
+// 3. Apply the constraints period (i.e. add no more than 4 pods per minute)
+// 4. Apply the stabilization (i.e. pick the smallest recommendation seen during the stabilization window)
+func (a *HorizontalController) normalizeDesiredReplicasWithBehaviors(hpa *autoscalingv2.HorizontalPodAutoscaler, key string, currentReplicas, prenormalizedDesiredReplicas, minReplicas int32) int32 {
+	a.maybeInitScaleDownStabilizationWindow(hpa)
+	normalizationArg := NormalizationArg{
+		Key:               key,
+		ScaleUpBehavior:   hpa.Spec.Behavior.ScaleUp,
+		ScaleDownBehavior: hpa.Spec.Behavior.ScaleDown,
+		MinReplicas:       minReplicas,
+		MaxReplicas:       hpa.Spec.MaxReplicas,
+		CurrentReplicas:   currentReplicas,
+		DesiredReplicas:   prenormalizedDesiredReplicas}
+	stabilizedRecommendation, reason, message := a.stabilizeRecommendationWithBehaviors(normalizationArg)
+	normalizationArg.DesiredReplicas = stabilizedRecommendation
+	if stabilizedRecommendation != prenormalizedDesiredReplicas {
+		// "ScaleUpStabilized" || "ScaleDownStabilized"
+		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, reason, message)
+	} else {
+		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "ReadyForNewScale", "recommended size matches current size")
+	}
+	desiredReplicas, reason, message := a.convertDesiredReplicasWithBehaviorRate(normalizationArg)
+	if desiredReplicas == stabilizedRecommendation {
+		setCondition(hpa, autoscalingv2.ScalingLimited, v1.ConditionFalse, reason, message)
+	} else {
+		setCondition(hpa, autoscalingv2.ScalingLimited, v1.ConditionTrue, reason, message)
+	}
+
+	return desiredReplicas
+}
+
+func (a *HorizontalController) maybeInitScaleDownStabilizationWindow(hpa *autoscalingv2.HorizontalPodAutoscaler) {
+	behavior := hpa.Spec.Behavior
+	if behavior != nil && behavior.ScaleDown != nil && behavior.ScaleDown.StabilizationWindowSeconds == nil {
+		stabilizationWindowSeconds := (int32)(a.downscaleStabilisationWindow.Seconds())
+		hpa.Spec.Behavior.ScaleDown.StabilizationWindowSeconds = &stabilizationWindowSeconds
+	}
+}
+
+// getReplicasChangePerPeriod sums the replica changes from the scale events that fall within the given period
+func getReplicasChangePerPeriod(periodSeconds int32, scaleEvents []timestampedScaleEvent) int32 {
+	period := time.Second * time.Duration(periodSeconds)
+	cutoff := time.Now().Add(-period)
+	var replicas int32
+	for _, rec := range scaleEvents {
+		if rec.timestamp.After(cutoff) {
+			replicas += rec.replicaChange
+		}
+	}
+	return replicas
+
+}
+
 func (a *HorizontalController) getUnableComputeReplicaCountCondition(hpa *autoscalingv2.HorizontalPodAutoscaler, reason string, err error) (condition autoscalingv2.HorizontalPodAutoscalerCondition) {
 	a.eventRecorder.Event(hpa, v1.EventTypeWarning, reason, err.Error())
 	return autoscalingv2.HorizontalPodAutoscalerCondition{
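For context, this is the shape of `Spec.Behavior` that `normalizeDesiredReplicasWithBehaviors` consumes. A sketch using the v2beta2 API types that appear in this diff (the `int32Ptr` helper is local to the example):

```go
package main

import (
	"fmt"

	autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
)

func int32Ptr(i int32) *int32 { return &i }

func main() {
	// Add at most 4 pods per 60s window; before scaling down, look at the
	// last 300s of recommendations and take the highest one.
	behavior := &autoscalingv2.HorizontalPodAutoscalerBehavior{
		ScaleUp: &autoscalingv2.HPAScalingRules{
			Policies: []autoscalingv2.HPAScalingPolicy{
				{Type: autoscalingv2.PodsScalingPolicy, Value: 4, PeriodSeconds: 60},
			},
		},
		ScaleDown: &autoscalingv2.HPAScalingRules{
			StabilizationWindowSeconds: int32Ptr(300),
		},
	}
	fmt.Printf("scale up by at most %d pods per %ds\n",
		behavior.ScaleUp.Policies[0].Value, behavior.ScaleUp.Policies[0].PeriodSeconds)
}
```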
@@ -707,6 +792,140 @@ func (a *HorizontalController) getUnableComputeReplicaCountCondition(hpa *autosc
 	}
 }
 
+// storeScaleEvent stores (adds or replaces an outdated) scale event.
+// Events eligible for replacement were marked as outdated by the `markScaleEventsOutdated` function.
+func (a *HorizontalController) storeScaleEvent(behavior *autoscalingv2.HorizontalPodAutoscalerBehavior, key string, prevReplicas, newReplicas int32) {
+	if behavior == nil {
+		return // we should not store any event as they will not be used
+	}
+	var oldSampleIndex int
+	var longestPolicyPeriod int32
+	foundOldSample := false
+	if newReplicas > prevReplicas {
+		longestPolicyPeriod = getLongestPolicyPeriod(behavior.ScaleUp)
+		markScaleEventsOutdated(a.scaleUpEvents[key], longestPolicyPeriod)
+		replicaChange := newReplicas - prevReplicas
+		for i, event := range a.scaleUpEvents[key] {
+			if event.outdated {
+				foundOldSample = true
+				oldSampleIndex = i
+			}
+		}
+		newEvent := timestampedScaleEvent{replicaChange, time.Now(), false}
+		if foundOldSample {
+			a.scaleUpEvents[key][oldSampleIndex] = newEvent
+		} else {
+			a.scaleUpEvents[key] = append(a.scaleUpEvents[key], newEvent)
+		}
+	} else {
+		longestPolicyPeriod = getLongestPolicyPeriod(behavior.ScaleDown)
+		markScaleEventsOutdated(a.scaleDownEvents[key], longestPolicyPeriod)
+		replicaChange := prevReplicas - newReplicas
+		for i, event := range a.scaleDownEvents[key] {
+			if event.outdated {
+				foundOldSample = true
+				oldSampleIndex = i
+			}
+		}
+		newEvent := timestampedScaleEvent{replicaChange, time.Now(), false}
+		if foundOldSample {
+			a.scaleDownEvents[key][oldSampleIndex] = newEvent
+		} else {
+			a.scaleDownEvents[key] = append(a.scaleDownEvents[key], newEvent)
+		}
+	}
+}
+
+// stabilizeRecommendationWithBehaviors:
+// - replaces old recommendation with the newest recommendation,
+// - returns {max,min} of recommendations that are not older than the Scale{Up,Down} stabilization window
+func (a *HorizontalController) stabilizeRecommendationWithBehaviors(args NormalizationArg) (int32, string, string) {
+	recommendation := args.DesiredReplicas
+	foundOldSample := false
+	oldSampleIndex := 0
+	var scaleDelaySeconds int32
+	var reason, message string
+
+	var betterRecommendation func(int32, int32) int32
+
+	if args.DesiredReplicas >= args.CurrentReplicas {
+		scaleDelaySeconds = *args.ScaleUpBehavior.StabilizationWindowSeconds
+		betterRecommendation = min
+		reason = "ScaleUpStabilized"
+		message = "recent recommendations were lower than current one, applying the lowest recent recommendation"
+	} else {
+		scaleDelaySeconds = *args.ScaleDownBehavior.StabilizationWindowSeconds
+		betterRecommendation = max
+		reason = "ScaleDownStabilized"
+		message = "recent recommendations were higher than current one, applying the highest recent recommendation"
+	}
+
+	maxDelaySeconds := max(*args.ScaleUpBehavior.StabilizationWindowSeconds, *args.ScaleDownBehavior.StabilizationWindowSeconds)
+	obsoleteCutoff := time.Now().Add(-time.Second * time.Duration(maxDelaySeconds))
+
+	cutoff := time.Now().Add(-time.Second * time.Duration(scaleDelaySeconds))
+	for i, rec := range a.recommendations[args.Key] {
+		if rec.timestamp.After(cutoff) {
+			recommendation = betterRecommendation(rec.recommendation, recommendation)
+		}
+		if rec.timestamp.Before(obsoleteCutoff) {
+			foundOldSample = true
+			oldSampleIndex = i
+		}
+	}
+	if foundOldSample {
+		a.recommendations[args.Key][oldSampleIndex] = timestampedRecommendation{args.DesiredReplicas, time.Now()}
+	} else {
+		a.recommendations[args.Key] = append(a.recommendations[args.Key], timestampedRecommendation{args.DesiredReplicas, time.Now()})
+	}
+	return recommendation, reason, message
+}
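A worked example of the scale-down stabilization above, with hypothetical numbers: the target currently runs 8 replicas, the fresh recommendation is 5, and recommendations of 7 and 6 are still inside the window. Because `betterRecommendation = max` for scale down, the controller settles on 7 rather than dropping straight to 5:

```go
package main

import "fmt"

func main() {
	recommendation := int32(5) // fresh, pre-stabilization recommendation
	windowed := []int32{7, 6}  // recommendations still inside the 300s window
	for _, r := range windowed {
		if r > recommendation { // betterRecommendation = max for scale down
			recommendation = r
		}
	}
	fmt.Println(recommendation) // 7: the target shrinks gradually instead of jumping to 5
}
```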
+
+// convertDesiredReplicasWithBehaviorRate performs the actual normalization, given the constraint rate.
+// It doesn't consider the stabilization window; that is applied separately.
+func (a *HorizontalController) convertDesiredReplicasWithBehaviorRate(args NormalizationArg) (int32, string, string) {
+	var possibleLimitingReason, possibleLimitingMessage string
+
+	if args.DesiredReplicas > args.CurrentReplicas {
+		scaleUpLimit := calculateScaleUpLimitWithScalingRules(args.CurrentReplicas, a.scaleUpEvents[args.Key], args.ScaleUpBehavior)
+		if scaleUpLimit < args.CurrentReplicas {
+			// We shouldn't scale up further until the scaleUpEvents are cleaned up
+			scaleUpLimit = args.CurrentReplicas
+		}
+		maximumAllowedReplicas := args.MaxReplicas
+		if maximumAllowedReplicas > scaleUpLimit {
+			maximumAllowedReplicas = scaleUpLimit
+			possibleLimitingReason = "ScaleUpLimit"
+			possibleLimitingMessage = "the desired replica count is increasing faster than the maximum scale rate"
+		} else {
+			possibleLimitingReason = "TooManyReplicas"
+			possibleLimitingMessage = "the desired replica count is more than the maximum replica count"
+		}
+		if args.DesiredReplicas > maximumAllowedReplicas {
+			return maximumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage
+		}
+	} else if args.DesiredReplicas < args.CurrentReplicas {
+		scaleDownLimit := calculateScaleDownLimitWithBehaviors(args.CurrentReplicas, a.scaleDownEvents[args.Key], args.ScaleDownBehavior)
+		if scaleDownLimit > args.CurrentReplicas {
+			// We shouldn't scale down further until the scaleDownEvents are cleaned up
+			scaleDownLimit = args.CurrentReplicas
+		}
+		minimumAllowedReplicas := args.MinReplicas
+		if minimumAllowedReplicas < scaleDownLimit {
+			minimumAllowedReplicas = scaleDownLimit
+			possibleLimitingReason = "ScaleDownLimit"
+			possibleLimitingMessage = "the desired replica count is decreasing faster than the maximum scale rate"
+		} else {
+			possibleLimitingReason = "TooFewReplicas"
+			possibleLimitingMessage = "the desired replica count is less than the minimum replica count"
+		}
+		if args.DesiredReplicas < minimumAllowedReplicas {
+			return minimumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage
+		}
+	}
+	return args.DesiredReplicas, "DesiredWithinRange", "the desired count is within the acceptable range"
+}
+
 // convertDesiredReplicas performs the actual normalization, without depending on `HorizontalController` or `HorizontalPodAutoscaler`
 func convertDesiredReplicasWithRules(currentReplicas, desiredReplicas, hpaMinReplicas, hpaMaxReplicas int32) (int32, string, string) {
 
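The clamping above picks whichever bound is tighter: the policy-derived rate limit or the HPA's configured min/max. For example (hypothetical numbers): metrics ask for 20 replicas while 10 are running, and the scale-up policies only allow growing to 14 within the current period, so the result is 14 with reason ScaleUpLimit. A compact sketch of that decision:

```go
package main

import "fmt"

func main() {
	current, desired := int32(10), int32(20)
	scaleUpLimit := int32(14) // what the scale-up policies still allow this period
	maxReplicas := int32(50)  // hpa.Spec.MaxReplicas

	allowed, reason := maxReplicas, "TooManyReplicas"
	if scaleUpLimit < allowed {
		allowed, reason = scaleUpLimit, "ScaleUpLimit"
	}
	if desired > allowed {
		desired = allowed
	}
	fmt.Println(current, desired, reason) // 10 14 ScaleUpLimit
}
```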
@@ -750,6 +969,79 @@ func calculateScaleUpLimit(currentReplicas int32) int32 {
 	return int32(math.Max(scaleUpLimitFactor*float64(currentReplicas), scaleUpLimitMinimum))
 }
 
+// markScaleEventsOutdated sets the 'outdated=true' flag on all scale events that are older than the longest policy period, so their slots can be reused
+func markScaleEventsOutdated(scaleEvents []timestampedScaleEvent, longestPolicyPeriod int32) {
+	period := time.Second * time.Duration(longestPolicyPeriod)
+	cutoff := time.Now().Add(-period)
+	for i, event := range scaleEvents {
+		if event.timestamp.Before(cutoff) {
+			// outdated scale events are marked for later reuse
+			scaleEvents[i].outdated = true
+		}
+	}
+}
+
+func getLongestPolicyPeriod(scalingRules *autoscalingv2.HPAScalingRules) int32 {
+	var longestPolicyPeriod int32
+	for _, policy := range scalingRules.Policies {
+		if policy.PeriodSeconds > longestPolicyPeriod {
+			longestPolicyPeriod = policy.PeriodSeconds
+		}
+	}
+	return longestPolicyPeriod
+}
+
+// calculateScaleUpLimitWithScalingRules returns the highest replica count that the given HPAScalingRules allow scaling up to
+func calculateScaleUpLimitWithScalingRules(currentReplicas int32, scaleEvents []timestampedScaleEvent, scalingRules *autoscalingv2.HPAScalingRules) int32 {
+	var result int32 = 0
+	var proposed int32
+	var selectPolicyFn func(int32, int32) int32
+	if *scalingRules.SelectPolicy == autoscalingv2.DisabledPolicySelect {
+		return currentReplicas // Scaling is disabled
+	} else if *scalingRules.SelectPolicy == autoscalingv2.MinPolicySelect {
+		selectPolicyFn = min // For scaling up, the 'Min' policy selection picks the proposal with the smallest change
+	} else {
+		selectPolicyFn = max // Use the default policy otherwise to produce the highest possible change
+	}
+	for _, policy := range scalingRules.Policies {
+		replicasAddedInCurrentPeriod := getReplicasChangePerPeriod(policy.PeriodSeconds, scaleEvents)
+		periodStartReplicas := currentReplicas - replicasAddedInCurrentPeriod
+		if policy.Type == autoscalingv2.PodsScalingPolicy {
+			proposed = int32(periodStartReplicas + policy.Value)
+		} else if policy.Type == autoscalingv2.PercentScalingPolicy {
+			// the proposal has to be rounded up because the proposed change might not increase the replica count causing the target to never scale up
+			proposed = int32(math.Ceil(float64(periodStartReplicas) * (1 + float64(policy.Value)/100)))
+		}
+		result = selectPolicyFn(result, proposed)
+	}
+	return result
+}
+
+// calculateScaleDownLimitWithBehaviors returns the lowest replica count that the given HPAScalingRules allow scaling down to
+func calculateScaleDownLimitWithBehaviors(currentReplicas int32, scaleEvents []timestampedScaleEvent, scalingRules *autoscalingv2.HPAScalingRules) int32 {
+	var result int32 = math.MaxInt32
+	var proposed int32
+	var selectPolicyFn func(int32, int32) int32
+	if *scalingRules.SelectPolicy == autoscalingv2.DisabledPolicySelect {
+		return currentReplicas // Scaling is disabled
+	} else if *scalingRules.SelectPolicy == autoscalingv2.MinPolicySelect {
+		selectPolicyFn = max // For scaling down, the 'Min' policy selection picks the proposal with the smallest change (the highest remaining replica count)
+	} else {
+		selectPolicyFn = min // Use the default policy otherwise to produce the highest possible change
+	}
+	for _, policy := range scalingRules.Policies {
+		replicasDeletedInCurrentPeriod := getReplicasChangePerPeriod(policy.PeriodSeconds, scaleEvents)
+		periodStartReplicas := currentReplicas + replicasDeletedInCurrentPeriod
+		if policy.Type == autoscalingv2.PodsScalingPolicy {
+			proposed = periodStartReplicas - policy.Value
+		} else if policy.Type == autoscalingv2.PercentScalingPolicy {
+			proposed = int32(float64(periodStartReplicas) * (1 - float64(policy.Value)/100))
+		}
+		result = selectPolicyFn(result, proposed)
+	}
+	return result
+}
+
 // scaleForResourceMappings attempts to fetch the scale for the
 // resource with the given name and namespace, trying each RESTMapping
 // in turn until a working one is found.  If none work, the first error
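Two details of the policy math above are easy to miss. First, the percent proposal for scale-up is rounded up so small targets can still grow: from 1 replica, a 50% policy proposes ceil(1 * 1.5) = 2, where plain truncation would stay at 1 forever. Second, `selectPolicy` chooses between per-policy proposals: scaling down from 20 replicas, a Pods=5 policy proposes a floor of 15 and a Percent=50 policy a floor of 10; the default Max selection allows the larger change (down to 10) while Min allows only the smaller one (down to 15). A hypothetical-numbers check:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	// Percent scale-up rounds up: from 1 replica, a 50% policy proposes 2.
	fmt.Println(int32(math.Ceil(float64(1) * (1 + 50.0/100)))) // 2

	// Scale-down from 20 replicas with no recent scale events:
	byPods := int32(20 - 5)                          // Pods policy, Value=5  -> floor of 15
	byPercent := int32(float64(20) * (1 - 50.0/100)) // Percent policy, Value=50 -> floor of 10
	fmt.Println(byPods, byPercent) // default Max selection allows 10, Min allows 15
}
```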
@@ -885,3 +1177,19 @@ func setConditionInList(inputList []autoscalingv2.HorizontalPodAutoscalerConditi
 	return resList
 }
+
+func max(a, b int32) int32 {
+	if a >= b {
+		return a
+	} else {
+		return b
+	}
+}
+
+func min(a, b int32) int32 {
+	if a <= b {
+		return a
+	} else {
+		return b
+	}
+}
diff --git a/staging/src/k8s.io/kubectl/pkg/describe/versioned/describe.go b/staging/src/k8s.io/kubectl/pkg/describe/versioned/describe.go
index 0c0b7abb0a2..acf5ef125e1 100644
--- a/staging/src/k8s.io/kubectl/pkg/describe/versioned/describe.go
+++ b/staging/src/k8s.io/kubectl/pkg/describe/versioned/describe.go
@@ -3392,6 +3392,12 @@ func describeHorizontalPodAutoscalerV2beta2(hpa *autoscalingv2beta2.HorizontalPo
 	}
 	w.Write(LEVEL_0, "Min replicas:\t%s\n", minReplicas)
 	w.Write(LEVEL_0, "Max replicas:\t%d\n", hpa.Spec.MaxReplicas)
+	// only print the hpa behavior if present
+	if hpa.Spec.Behavior != nil {
+		w.Write(LEVEL_0, "Behavior:\n")
+		printDirectionBehavior(w, "Scale Up", hpa.Spec.Behavior.ScaleUp)
+		printDirectionBehavior(w, "Scale Down", hpa.Spec.Behavior.ScaleDown)
+	}
 	w.Write(LEVEL_0, "%s pods:\t", hpa.Spec.ScaleTargetRef.Kind)
 	w.Write(LEVEL_0, "%d current / %d desired\n", hpa.Status.CurrentReplicas, hpa.Status.DesiredReplicas)
 
@@ -3412,6 +3418,26 @@ func describeHorizontalPodAutoscalerV2beta2(hpa *autoscalingv2beta2.HorizontalPo
 	})
 }
 
+func printDirectionBehavior(w PrefixWriter, direction string, rules *autoscalingv2beta2.HPAScalingRules) {
+	if rules != nil {
+		w.Write(LEVEL_1, "%s:\n", direction)
+		if rules.StabilizationWindowSeconds != nil {
+			w.Write(LEVEL_2, "Stabilization Window: %d seconds\n", *rules.StabilizationWindowSeconds)
+		}
+		if len(rules.Policies) > 0 {
+			if rules.SelectPolicy != nil {
+				w.Write(LEVEL_2, "Select Policy: %s\n", *rules.SelectPolicy)
+			} else {
+				w.Write(LEVEL_2, "Select Policy: %s\n", autoscalingv2beta2.MaxPolicySelect)
+			}
+			w.Write(LEVEL_2, "Policies:\n")
+			for _, p := range rules.Policies {
+				w.Write(LEVEL_3, "- Type: %s\tValue: %d\tPeriod: %d seconds\n", p.Type, p.Value, p.PeriodSeconds)
+			}
+		}
+	}
+}
+
 func describeHorizontalPodAutoscalerV1(hpa *autoscalingv1.HorizontalPodAutoscaler, events *corev1.EventList, d *HorizontalPodAutoscalerDescriber) (string, error) {
 	return tabbedString(func(out io.Writer) error {
 		w := NewPrefixWriter(out)
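With this printer, `kubectl describe hpa` for the behavior sketched earlier (a 4-pods-per-60s scale-up policy and a 300s scale-down stabilization window) would render roughly as follows; this is illustrative output, not captured from a real cluster, and tab stops may align differently:

```
Behavior:
  Scale Up:
    Select Policy: Max
    Policies:
      - Type: Pods  Value: 4  Period: 60 seconds
  Scale Down:
    Stabilization Window: 300 seconds
```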