Adds the algorithm implementation for the Configurable HPA

This commit is contained in:
Ivan Glushkov 2019-10-14 19:33:09 +04:00
parent 5c70cda6e5
commit 27ffe439b6
No known key found for this signature in database
GPG Key ID: 10C5CFB0C77CA594
3 changed files with 336 additions and 1 deletions

View File

@ -17,5 +17,6 @@ limitations under the License.
// +k8s:conversion-gen=k8s.io/kubernetes/pkg/apis/autoscaling
// +k8s:conversion-gen-external-types=k8s.io/api/autoscaling/v2beta2
// +k8s:defaulter-gen=TypeMeta
// +k8s:defaulter-gen-input=../../../../vendor/k8s.io/api/autoscaling/v2beta2
package v2beta2 // import "k8s.io/kubernetes/pkg/apis/autoscaling/v2beta2"

View File

@ -61,6 +61,12 @@ type timestampedRecommendation struct {
timestamp time.Time
}
type timestampedScaleEvent struct {
replicaChange int32 // positive for scaleUp, negative for scaleDown
timestamp time.Time
outdated bool
}
// HorizontalController is responsible for the synchronizing HPA objects stored
// in the system with the actual deployments/replication controllers they
// control.
@ -89,6 +95,10 @@ type HorizontalController struct {
// Latest unstabilized recommendations for each autoscaler.
recommendations map[string][]timestampedRecommendation
// Latest autoscaler events
scaleUpEvents map[string][]timestampedScaleEvent
scaleDownEvents map[string][]timestampedScaleEvent
}
// NewHorizontalController creates a new HorizontalController.
@ -120,6 +130,8 @@ func NewHorizontalController(
queue: workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler"),
mapper: mapper,
recommendations: map[string][]timestampedRecommendation{},
scaleUpEvents: map[string][]timestampedScaleEvent{},
scaleDownEvents: map[string][]timestampedScaleEvent{},
}
hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
@ -340,6 +352,8 @@ func (a *HorizontalController) reconcileKey(key string) (deleted bool, err error
if errors.IsNotFound(err) {
klog.Infof("Horizontal Pod Autoscaler %s has been deleted in %s", name, namespace)
delete(a.recommendations, key)
delete(a.scaleUpEvents, key)
delete(a.scaleDownEvents, key)
return true, nil
}
@ -623,7 +637,11 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
if desiredReplicas < currentReplicas {
rescaleReason = "All metrics below target"
}
desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
if hpa.Spec.Behavior == nil {
desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
} else {
desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
}
rescale = desiredReplicas != currentReplicas
}
@ -641,6 +659,7 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
}
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
a.eventRecorder.Eventf(hpa, v1.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)
a.storeScaleEvent(hpa.Spec.Behavior, key, currentReplicas, desiredReplicas)
klog.Infof("Successful rescale of %s, old size: %d, new size: %d, reason: %s",
hpa.Name, currentReplicas, desiredReplicas, rescaleReason)
} else {
@ -697,6 +716,72 @@ func (a *HorizontalController) normalizeDesiredReplicas(hpa *autoscalingv2.Horiz
return desiredReplicas
}
// NormalizationArg is used to pass all needed information between functions as one structure
type NormalizationArg struct {
Key string
ScaleUpBehavior *autoscalingv2.HPAScalingRules
ScaleDownBehavior *autoscalingv2.HPAScalingRules
MinReplicas int32
MaxReplicas int32
CurrentReplicas int32
DesiredReplicas int32
}
// normalizeDesiredReplicasWithBehaviors takes the metrics desired replicas value and normalizes it:
// 1. Apply the basic conditions (i.e. < maxReplicas, > minReplicas, etc...)
// 2. Apply the scale up/down limits from the hpaSpec.Behaviors (i.e. add no more than 4 pods)
// 3. Apply the constraints period (i.e. add no more than 4 pods per minute)
// 4. Apply the stabilization (i.e. add no more than 4 pods per minute, and pick the smallest recommendation during last 5 minutes)
func (a *HorizontalController) normalizeDesiredReplicasWithBehaviors(hpa *autoscalingv2.HorizontalPodAutoscaler, key string, currentReplicas, prenormalizedDesiredReplicas, minReplicas int32) int32 {
a.maybeInitScaleDownStabilizationWindow(hpa)
normalizationArg := NormalizationArg{
Key: key,
ScaleUpBehavior: hpa.Spec.Behavior.ScaleUp,
ScaleDownBehavior: hpa.Spec.Behavior.ScaleDown,
MinReplicas: minReplicas,
MaxReplicas: hpa.Spec.MaxReplicas,
CurrentReplicas: currentReplicas,
DesiredReplicas: prenormalizedDesiredReplicas}
stabilizedRecommendation, reason, message := a.stabilizeRecommendationWithBehaviors(normalizationArg)
normalizationArg.DesiredReplicas = stabilizedRecommendation
if stabilizedRecommendation != prenormalizedDesiredReplicas {
// "ScaleUpStabilized" || "ScaleDownStabilized"
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, reason, message)
} else {
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "ReadyForNewScale", "recommended size matches current size")
}
desiredReplicas, reason, message := a.convertDesiredReplicasWithBehaviorRate(normalizationArg)
if desiredReplicas == stabilizedRecommendation {
setCondition(hpa, autoscalingv2.ScalingLimited, v1.ConditionFalse, reason, message)
} else {
setCondition(hpa, autoscalingv2.ScalingLimited, v1.ConditionTrue, reason, message)
}
return desiredReplicas
}
func (a *HorizontalController) maybeInitScaleDownStabilizationWindow(hpa *autoscalingv2.HorizontalPodAutoscaler) {
behavior := hpa.Spec.Behavior
if behavior != nil && behavior.ScaleDown != nil && behavior.ScaleDown.StabilizationWindowSeconds == nil {
stabilizationWindowSeconds := (int32)(a.downscaleStabilisationWindow.Seconds())
hpa.Spec.Behavior.ScaleDown.StabilizationWindowSeconds = &stabilizationWindowSeconds
}
}
// getReplicasChangePerPeriod function find all the replica changes per period
func getReplicasChangePerPeriod(periodSeconds int32, scaleEvents []timestampedScaleEvent) int32 {
period := time.Second * time.Duration(periodSeconds)
cutoff := time.Now().Add(-period)
var replicas int32
for _, rec := range scaleEvents {
if rec.timestamp.After(cutoff) {
replicas += rec.replicaChange
}
}
return replicas
}
func (a *HorizontalController) getUnableComputeReplicaCountCondition(hpa *autoscalingv2.HorizontalPodAutoscaler, reason string, err error) (condition autoscalingv2.HorizontalPodAutoscalerCondition) {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, reason, err.Error())
return autoscalingv2.HorizontalPodAutoscalerCondition{
@ -707,6 +792,140 @@ func (a *HorizontalController) getUnableComputeReplicaCountCondition(hpa *autosc
}
}
// storeScaleEvent stores (adds or replaces outdated) scale event.
// outdated events to be replaced were marked as outdated in the `markScaleEventsOutdated` function
func (a *HorizontalController) storeScaleEvent(behavior *autoscalingv2.HorizontalPodAutoscalerBehavior, key string, prevReplicas, newReplicas int32) {
if behavior == nil {
return // we should not store any event as they will not be used
}
var oldSampleIndex int
var longestPolicyPeriod int32
foundOldSample := false
if newReplicas > prevReplicas {
longestPolicyPeriod = getLongestPolicyPeriod(behavior.ScaleUp)
markScaleEventsOutdated(a.scaleUpEvents[key], longestPolicyPeriod)
replicaChange := newReplicas - prevReplicas
for i, event := range a.scaleUpEvents[key] {
if event.outdated {
foundOldSample = true
oldSampleIndex = i
}
}
newEvent := timestampedScaleEvent{replicaChange, time.Now(), false}
if foundOldSample {
a.scaleUpEvents[key][oldSampleIndex] = newEvent
} else {
a.scaleUpEvents[key] = append(a.scaleUpEvents[key], newEvent)
}
} else {
longestPolicyPeriod = getLongestPolicyPeriod(behavior.ScaleDown)
markScaleEventsOutdated(a.scaleDownEvents[key], longestPolicyPeriod)
replicaChange := prevReplicas - newReplicas
for i, event := range a.scaleDownEvents[key] {
if event.outdated {
foundOldSample = true
oldSampleIndex = i
}
}
newEvent := timestampedScaleEvent{replicaChange, time.Now(), false}
if foundOldSample {
a.scaleDownEvents[key][oldSampleIndex] = newEvent
} else {
a.scaleDownEvents[key] = append(a.scaleDownEvents[key], newEvent)
}
}
}
// stabilizeRecommendationWithBehaviors:
// - replaces old recommendation with the newest recommendation,
// - returns {max,min} of recommendations that are not older than constraints.Scale{Up,Down}.DelaySeconds
func (a *HorizontalController) stabilizeRecommendationWithBehaviors(args NormalizationArg) (int32, string, string) {
recommendation := args.DesiredReplicas
foundOldSample := false
oldSampleIndex := 0
var scaleDelaySeconds int32
var reason, message string
var betterRecommendation func(int32, int32) int32
if args.DesiredReplicas >= args.CurrentReplicas {
scaleDelaySeconds = *args.ScaleUpBehavior.StabilizationWindowSeconds
betterRecommendation = min
reason = "ScaleUpStabilized"
message = "recent recommendations were lower than current one, applying the lowest recent recommendation"
} else {
scaleDelaySeconds = *args.ScaleDownBehavior.StabilizationWindowSeconds
betterRecommendation = max
reason = "ScaleDownStabilized"
message = "recent recommendations were higher than current one, applying the highest recent recommendation"
}
maxDelaySeconds := max(*args.ScaleUpBehavior.StabilizationWindowSeconds, *args.ScaleDownBehavior.StabilizationWindowSeconds)
obsoleteCutoff := time.Now().Add(-time.Second * time.Duration(maxDelaySeconds))
cutoff := time.Now().Add(-time.Second * time.Duration(scaleDelaySeconds))
for i, rec := range a.recommendations[args.Key] {
if rec.timestamp.After(cutoff) {
recommendation = betterRecommendation(rec.recommendation, recommendation)
}
if rec.timestamp.Before(obsoleteCutoff) {
foundOldSample = true
oldSampleIndex = i
}
}
if foundOldSample {
a.recommendations[args.Key][oldSampleIndex] = timestampedRecommendation{args.DesiredReplicas, time.Now()}
} else {
a.recommendations[args.Key] = append(a.recommendations[args.Key], timestampedRecommendation{args.DesiredReplicas, time.Now()})
}
return recommendation, reason, message
}
// convertDesiredReplicasWithBehaviorRate performs the actual normalization, given the constraint rate
// It doesn't consider the stabilizationWindow, it is done separately
func (a *HorizontalController) convertDesiredReplicasWithBehaviorRate(args NormalizationArg) (int32, string, string) {
var possibleLimitingReason, possibleLimitingMessage string
if args.DesiredReplicas > args.CurrentReplicas {
scaleUpLimit := calculateScaleUpLimitWithScalingRules(args.CurrentReplicas, a.scaleUpEvents[args.Key], args.ScaleUpBehavior)
if scaleUpLimit < args.CurrentReplicas {
// We shouldn't scale up further until the scaleUpEvents will be cleaned up
scaleUpLimit = args.CurrentReplicas
}
maximumAllowedReplicas := args.MaxReplicas
if maximumAllowedReplicas > scaleUpLimit {
maximumAllowedReplicas = scaleUpLimit
possibleLimitingReason = "ScaleUpLimit"
possibleLimitingMessage = "the desired replica count is increasing faster than the maximum scale rate"
} else {
possibleLimitingReason = "TooManyReplicas"
possibleLimitingMessage = "the desired replica count is more than the maximum replica count"
}
if args.DesiredReplicas > maximumAllowedReplicas {
return maximumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage
}
} else if args.DesiredReplicas < args.CurrentReplicas {
scaleDownLimit := calculateScaleDownLimitWithBehaviors(args.CurrentReplicas, a.scaleDownEvents[args.Key], args.ScaleDownBehavior)
if scaleDownLimit > args.CurrentReplicas {
// We shouldn't scale down further until the scaleDownEvents will be cleaned up
scaleDownLimit = args.CurrentReplicas
}
minimumAllowedReplicas := args.MinReplicas
if minimumAllowedReplicas < scaleDownLimit {
minimumAllowedReplicas = scaleDownLimit
possibleLimitingReason = "ScaleDownLimit"
possibleLimitingMessage = "the desired replica count is decreasing faster than the maximum scale rate"
} else {
possibleLimitingMessage = "the desired replica count is less than the minimum replica count"
possibleLimitingReason = "TooFewReplicas"
}
if args.DesiredReplicas < minimumAllowedReplicas {
return minimumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage
}
}
return args.DesiredReplicas, "DesiredWithinRange", "the desired count is within the acceptable range"
}
// convertDesiredReplicas performs the actual normalization, without depending on `HorizontalController` or `HorizontalPodAutoscaler`
func convertDesiredReplicasWithRules(currentReplicas, desiredReplicas, hpaMinReplicas, hpaMaxReplicas int32) (int32, string, string) {
@ -750,6 +969,79 @@ func calculateScaleUpLimit(currentReplicas int32) int32 {
return int32(math.Max(scaleUpLimitFactor*float64(currentReplicas), scaleUpLimitMinimum))
}
// markScaleEventsOutdated set 'outdated=true' flag for all scale events that are not used by any HPA object
func markScaleEventsOutdated(scaleEvents []timestampedScaleEvent, longestPolicyPeriod int32) {
period := time.Second * time.Duration(longestPolicyPeriod)
cutoff := time.Now().Add(-period)
for i, event := range scaleEvents {
if event.timestamp.Before(cutoff) {
// outdated scale event are marked for later reuse
scaleEvents[i].outdated = true
}
}
}
func getLongestPolicyPeriod(scalingRules *autoscalingv2.HPAScalingRules) int32 {
var longestPolicyPeriod int32
for _, policy := range scalingRules.Policies {
if policy.PeriodSeconds > longestPolicyPeriod {
longestPolicyPeriod = policy.PeriodSeconds
}
}
return longestPolicyPeriod
}
// calculateScaleUpLimitWithScalingRules returns the maximum number of pods that could be added for the given HPAScalingRules
func calculateScaleUpLimitWithScalingRules(currentReplicas int32, scaleEvents []timestampedScaleEvent, scalingRules *autoscalingv2.HPAScalingRules) int32 {
var result int32 = 0
var proposed int32
var selectPolicyFn func(int32, int32) int32
if *scalingRules.SelectPolicy == autoscalingv2.DisabledPolicySelect {
return currentReplicas // Scaling is disabled
} else if *scalingRules.SelectPolicy == autoscalingv2.MinPolicySelect {
selectPolicyFn = min // For scaling up, the lowest change ('min' policy) produces a minimum value
} else {
selectPolicyFn = max // Use the default policy otherwise to produce a highest possible change
}
for _, policy := range scalingRules.Policies {
replicasAddedInCurrentPeriod := getReplicasChangePerPeriod(policy.PeriodSeconds, scaleEvents)
periodStartReplicas := currentReplicas - replicasAddedInCurrentPeriod
if policy.Type == autoscalingv2.PodsScalingPolicy {
proposed = int32(periodStartReplicas + policy.Value)
} else if policy.Type == autoscalingv2.PercentScalingPolicy {
// the proposal has to be rounded up because the proposed change might not increase the replica count causing the target to never scale up
proposed = int32(math.Ceil(float64(periodStartReplicas) * (1 + float64(policy.Value)/100)))
}
result = selectPolicyFn(result, proposed)
}
return result
}
// calculateScaleDownLimitWithBehavior returns the maximum number of pods that could be deleted for the given HPAScalingRules
func calculateScaleDownLimitWithBehaviors(currentReplicas int32, scaleEvents []timestampedScaleEvent, scalingRules *autoscalingv2.HPAScalingRules) int32 {
var result int32 = math.MaxInt32
var proposed int32
var selectPolicyFn func(int32, int32) int32
if *scalingRules.SelectPolicy == autoscalingv2.DisabledPolicySelect {
return currentReplicas // Scaling is disabled
} else if *scalingRules.SelectPolicy == autoscalingv2.MinPolicySelect {
selectPolicyFn = max // For scaling down, the lowest change ('min' policy) produces a maximum value
} else {
selectPolicyFn = min // Use the default policy otherwise to produce a highest possible change
}
for _, policy := range scalingRules.Policies {
replicasDeletedInCurrentPeriod := getReplicasChangePerPeriod(policy.PeriodSeconds, scaleEvents)
periodStartReplicas := currentReplicas + replicasDeletedInCurrentPeriod
if policy.Type == autoscalingv2.PodsScalingPolicy {
proposed = periodStartReplicas - policy.Value
} else if policy.Type == autoscalingv2.PercentScalingPolicy {
proposed = int32(float64(periodStartReplicas) * (1 - float64(policy.Value)/100))
}
result = selectPolicyFn(result, proposed)
}
return result
}
// scaleForResourceMappings attempts to fetch the scale for the
// resource with the given name and namespace, trying each RESTMapping
// in turn until a working one is found. If none work, the first error
@ -885,3 +1177,19 @@ func setConditionInList(inputList []autoscalingv2.HorizontalPodAutoscalerConditi
return resList
}
func max(a, b int32) int32 {
if a >= b {
return a
} else {
return b
}
}
func min(a, b int32) int32 {
if a <= b {
return a
} else {
return b
}
}

View File

@ -3392,6 +3392,12 @@ func describeHorizontalPodAutoscalerV2beta2(hpa *autoscalingv2beta2.HorizontalPo
}
w.Write(LEVEL_0, "Min replicas:\t%s\n", minReplicas)
w.Write(LEVEL_0, "Max replicas:\t%d\n", hpa.Spec.MaxReplicas)
// only print the hpa behavior if present
if hpa.Spec.Behavior != nil {
w.Write(LEVEL_0, "Behavior:\n")
printDirectionBehavior(w, "Scale Up", hpa.Spec.Behavior.ScaleUp)
printDirectionBehavior(w, "Scale Down", hpa.Spec.Behavior.ScaleDown)
}
w.Write(LEVEL_0, "%s pods:\t", hpa.Spec.ScaleTargetRef.Kind)
w.Write(LEVEL_0, "%d current / %d desired\n", hpa.Status.CurrentReplicas, hpa.Status.DesiredReplicas)
@ -3412,6 +3418,26 @@ func describeHorizontalPodAutoscalerV2beta2(hpa *autoscalingv2beta2.HorizontalPo
})
}
func printDirectionBehavior(w PrefixWriter, direction string, rules *autoscalingv2beta2.HPAScalingRules) {
if rules != nil {
w.Write(LEVEL_1, "%s:\n", direction)
if rules.StabilizationWindowSeconds != nil {
w.Write(LEVEL_2, "Stabilization Window: %d seconds\n", *rules.StabilizationWindowSeconds)
}
if len(rules.Policies) > 0 {
if rules.SelectPolicy != nil {
w.Write(LEVEL_2, "Select Policy: %s\n", *rules.SelectPolicy)
} else {
w.Write(LEVEL_2, "Select Policy: %s\n", autoscalingv2beta2.MaxPolicySelect)
}
w.Write(LEVEL_2, "Policies:\n")
for _, p := range rules.Policies {
w.Write(LEVEL_3, "- Type: %s\tValue: %d\tPeriod: %d seconds\n", p.Type, p.Value, p.PeriodSeconds)
}
}
}
}
func describeHorizontalPodAutoscalerV1(hpa *autoscalingv1.HorizontalPodAutoscaler, events *corev1.EventList, d *HorizontalPodAutoscalerDescriber) (string, error) {
return tabbedString(func(out io.Writer) error {
w := NewPrefixWriter(out)