From 1cefcdea2d6195a794ef122cb7a36614a580d41b Mon Sep 17 00:00:00 2001
From: Zbynek Roubalik
Date: Thu, 10 Mar 2022 13:00:13 +0100
Subject: [PATCH] add `--concurrent-horizontal-pod-autoscaler-syncs` flag to
 kube-controller-manager

Signed-off-by: Zbynek Roubalik
---
 api/api-rules/violation_exceptions.list       |   1 +
 .../app/autoscaling.go                        |   2 +-
 .../app/options/hpacontroller.go              |   7 +
 .../app/options/options_test.go               |   3 +
 pkg/controller/podautoscaler/config/types.go  |   3 +
 .../podautoscaler/config/v1alpha1/defaults.go |   3 +
 .../v1alpha1/zz_generated.conversion.go       |   2 +
 pkg/controller/podautoscaler/horizontal.go    |  53 +++-
 .../podautoscaler/horizontal_test.go          | 270 +++++++++++++++++-
 pkg/generated/openapi/zz_generated.openapi.go |  10 +-
 .../config/v1alpha1/types.go                  |   3 +
 11 files changed, 348 insertions(+), 9 deletions(-)

diff --git a/api/api-rules/violation_exceptions.list b/api/api-rules/violation_exceptions.list
index b4131c24db3..03718b6691b 100644
--- a/api/api-rules/violation_exceptions.list
+++ b/api/api-rules/violation_exceptions.list
@@ -513,6 +513,7 @@ API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,G
 API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,GarbageCollectorControllerConfiguration,GCIgnoredResources
 API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,GroupResource,Group
 API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,GroupResource,Resource
+API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,HPAControllerConfiguration,ConcurrentHorizontalPodAutoscalerSyncs
 API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,HPAControllerConfiguration,HorizontalPodAutoscalerCPUInitializationPeriod
 API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,HPAControllerConfiguration,HorizontalPodAutoscalerDownscaleForbiddenWindow
 API rule violation: names_match,k8s.io/kube-controller-manager/config/v1alpha1,HPAControllerConfiguration,HorizontalPodAutoscalerDownscaleStabilizationWindow
diff --git a/cmd/kube-controller-manager/app/autoscaling.go b/cmd/kube-controller-manager/app/autoscaling.go
index 90cf272b4d0..28f8ada52b4 100644
--- a/cmd/kube-controller-manager/app/autoscaling.go
+++ b/cmd/kube-controller-manager/app/autoscaling.go
@@ -87,6 +87,6 @@ func startHPAControllerWithMetricsClient(ctx context.Context, controllerContext
 		controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
 		controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
 		controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
-	).Run(ctx)
+	).Run(ctx, int(controllerContext.ComponentConfig.HPAController.ConcurrentHorizontalPodAutoscalerSyncs))
 	return nil, true, nil
 }
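The `Run(ctx, workers)` change above fans the controller's sync loop out over several goroutines that all drain the same rate-limited workqueue, so keys are processed concurrently while each individual key is still handled by one worker at a time. The following standalone sketch (illustrative only, not code from this patch) shows that pattern with the same client-go and apimachinery primitives the controller uses:

// worker_pool_sketch.go - minimal sketch of N workers draining one shared workqueue.
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// worker drains the queue until it is shut down; a stand-in for the HPA
	// controller's worker/processNextWorkItem loop.
	worker := func(ctx context.Context) {
		for {
			key, shutdown := queue.Get()
			if shutdown {
				return
			}
			fmt.Println("syncing", key) // stand-in for reconciling one HPA key
			queue.Done(key)
		}
	}

	const workers = 5 // analogous to --concurrent-horizontal-pod-autoscaler-syncs
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, worker, time.Second)
	}

	for i := 0; i < 20; i++ {
		queue.Add(fmt.Sprintf("dummy-namespace/dummy-hpa-%d", i))
	}

	<-ctx.Done()
	queue.ShutDown()
}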
diff --git a/cmd/kube-controller-manager/app/options/hpacontroller.go b/cmd/kube-controller-manager/app/options/hpacontroller.go
index 3ca1a74cb51..a6cb41052f7 100644
--- a/cmd/kube-controller-manager/app/options/hpacontroller.go
+++ b/cmd/kube-controller-manager/app/options/hpacontroller.go
@@ -17,6 +17,8 @@ limitations under the License.
 package options

 import (
+	"fmt"
+
 	"github.com/spf13/pflag"

 	poautosclerconfig "k8s.io/kubernetes/pkg/controller/podautoscaler/config"
@@ -33,6 +35,7 @@ func (o *HPAControllerOptions) AddFlags(fs *pflag.FlagSet) {
 		return
 	}

+	fs.Int32Var(&o.ConcurrentHorizontalPodAutoscalerSyncs, "concurrent-horizontal-pod-autoscaler-syncs", o.ConcurrentHorizontalPodAutoscalerSyncs, "The number of horizontal pod autoscaler objects that are allowed to sync concurrently. Larger number = more responsive horizontal pod autoscaler objects processing, but more CPU (and network) load.")
 	fs.DurationVar(&o.HorizontalPodAutoscalerSyncPeriod.Duration, "horizontal-pod-autoscaler-sync-period", o.HorizontalPodAutoscalerSyncPeriod.Duration, "The period for syncing the number of pods in horizontal pod autoscaler.")
 	fs.DurationVar(&o.HorizontalPodAutoscalerUpscaleForbiddenWindow.Duration, "horizontal-pod-autoscaler-upscale-delay", o.HorizontalPodAutoscalerUpscaleForbiddenWindow.Duration, "The period since last upscale, before another upscale can be performed in horizontal pod autoscaler.")
 	fs.MarkDeprecated("horizontal-pod-autoscaler-upscale-delay", "This flag is currently no-op and will be deleted.")
@@ -50,6 +53,7 @@ func (o *HPAControllerOptions) ApplyTo(cfg *poautosclerconfig.HPAControllerConfi
 		return nil
 	}

+	cfg.ConcurrentHorizontalPodAutoscalerSyncs = o.ConcurrentHorizontalPodAutoscalerSyncs
 	cfg.HorizontalPodAutoscalerSyncPeriod = o.HorizontalPodAutoscalerSyncPeriod
 	cfg.HorizontalPodAutoscalerDownscaleStabilizationWindow = o.HorizontalPodAutoscalerDownscaleStabilizationWindow
 	cfg.HorizontalPodAutoscalerTolerance = o.HorizontalPodAutoscalerTolerance
@@ -68,5 +72,8 @@ func (o *HPAControllerOptions) Validate() []error {
 	}

 	errs := []error{}
+	if o.ConcurrentHorizontalPodAutoscalerSyncs < 1 {
+		errs = append(errs, fmt.Errorf("concurrent-horizontal-pod-autoscaler-syncs must be greater than 0, but got %d", o.ConcurrentHorizontalPodAutoscalerSyncs))
+	}
 	return errs
 }
diff --git a/cmd/kube-controller-manager/app/options/options_test.go b/cmd/kube-controller-manager/app/options/options_test.go
index 952a70adc4e..7c983b84b9b 100644
--- a/cmd/kube-controller-manager/app/options/options_test.go
+++ b/cmd/kube-controller-manager/app/options/options_test.go
@@ -84,6 +84,7 @@ var args = []string{
 	"--cluster-signing-legacy-unknown-cert-file=/cluster-signing-legacy-unknown/cert-file",
 	"--cluster-signing-legacy-unknown-key-file=/cluster-signing-legacy-unknown/key-file",
 	"--concurrent-deployment-syncs=10",
+	"--concurrent-horizontal-pod-autoscaler-syncs=10",
 	"--concurrent-statefulset-syncs=15",
 	"--concurrent-endpoint-syncs=10",
 	"--concurrent-ephemeralvolume-syncs=10",
@@ -304,6 +305,7 @@ func TestAddFlags(t *testing.T) {
 			},
 			HPAController: &HPAControllerOptions{
 				&poautosclerconfig.HPAControllerConfiguration{
+					ConcurrentHorizontalPodAutoscalerSyncs:          10,
 					HorizontalPodAutoscalerSyncPeriod:               metav1.Duration{Duration: 45 * time.Second},
 					HorizontalPodAutoscalerUpscaleForbiddenWindow:   metav1.Duration{Duration: 1 * time.Minute},
 					HorizontalPodAutoscalerDownscaleForbiddenWindow: metav1.Duration{Duration: 2 * time.Minute},
@@ -558,6 +560,7 @@ func TestApplyTo(t *testing.T) {
 			EnableGarbageCollector: false,
 		},
 		HPAController: poautosclerconfig.HPAControllerConfiguration{
+			ConcurrentHorizontalPodAutoscalerSyncs:          10,
 			HorizontalPodAutoscalerSyncPeriod:               metav1.Duration{Duration: 45 * time.Second},
 			HorizontalPodAutoscalerUpscaleForbiddenWindow:   metav1.Duration{Duration: 1 * time.Minute},
 			HorizontalPodAutoscalerDownscaleForbiddenWindow: metav1.Duration{Duration: 2 * time.Minute},
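The new flag follows the same convention as the other kube-controller-manager concurrency flags (for example `--concurrent-deployment-syncs`), and `Validate` above rejects anything below 1. A standalone sketch of that registration-plus-validation convention with pflag (illustrative only, not this patch's code):

// flag_sketch.go - register an int32 concurrency knob and require it to be >= 1.
package main

import (
	"fmt"
	"os"

	"github.com/spf13/pflag"
)

func main() {
	fs := pflag.NewFlagSet("demo", pflag.ExitOnError)
	concurrentSyncs := fs.Int32("concurrent-horizontal-pod-autoscaler-syncs", 5,
		"The number of horizontal pod autoscaler objects that are allowed to sync concurrently.")
	if err := fs.Parse(os.Args[1:]); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	if *concurrentSyncs < 1 {
		fmt.Fprintf(os.Stderr, "concurrent-horizontal-pod-autoscaler-syncs must be greater than 0, but got %d\n", *concurrentSyncs)
		os.Exit(1)
	}
	fmt.Println("workers:", *concurrentSyncs)
}

Running the sketch with `--concurrent-horizontal-pod-autoscaler-syncs=0` produces the same kind of error that the options validation above returns to the caller.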
diff --git a/pkg/controller/podautoscaler/config/types.go b/pkg/controller/podautoscaler/config/types.go
index 570e0970690..25d21920a37 100644
--- a/pkg/controller/podautoscaler/config/types.go
+++ b/pkg/controller/podautoscaler/config/types.go
@@ -22,6 +22,9 @@ import (

 // HPAControllerConfiguration contains elements describing HPAController.
 type HPAControllerConfiguration struct {
+	// ConcurrentHorizontalPodAutoscalerSyncs is the number of HPA objects that are allowed to sync concurrently.
+	// Larger number = more responsive HPA processing, but more CPU (and network) load.
+	ConcurrentHorizontalPodAutoscalerSyncs int32
 	// horizontalPodAutoscalerSyncPeriod is the period for syncing the number of
 	// pods in horizontal pod autoscaler.
 	HorizontalPodAutoscalerSyncPeriod metav1.Duration
diff --git a/pkg/controller/podautoscaler/config/v1alpha1/defaults.go b/pkg/controller/podautoscaler/config/v1alpha1/defaults.go
index f25e8d611ee..9bbb8efecb1 100644
--- a/pkg/controller/podautoscaler/config/v1alpha1/defaults.go
+++ b/pkg/controller/podautoscaler/config/v1alpha1/defaults.go
@@ -34,6 +34,9 @@ import (
 // run it in your wrapper struct of this type in its `SetDefaults_` method.
 func RecommendedDefaultHPAControllerConfiguration(obj *kubectrlmgrconfigv1alpha1.HPAControllerConfiguration) {
 	zero := metav1.Duration{}
+	if obj.ConcurrentHorizontalPodAutoscalerSyncs == 0 {
+		obj.ConcurrentHorizontalPodAutoscalerSyncs = 5
+	}
 	if obj.HorizontalPodAutoscalerSyncPeriod == zero {
 		obj.HorizontalPodAutoscalerSyncPeriod = metav1.Duration{Duration: 15 * time.Second}
 	}
diff --git a/pkg/controller/podautoscaler/config/v1alpha1/zz_generated.conversion.go b/pkg/controller/podautoscaler/config/v1alpha1/zz_generated.conversion.go
index f2cae9341f3..e83535a0d71 100644
--- a/pkg/controller/podautoscaler/config/v1alpha1/zz_generated.conversion.go
+++ b/pkg/controller/podautoscaler/config/v1alpha1/zz_generated.conversion.go
@@ -82,6 +82,7 @@ func Convert_v1_GroupResource_To_v1alpha1_GroupResource(in *v1.GroupResource, ou
 }

 func autoConvert_v1alpha1_HPAControllerConfiguration_To_config_HPAControllerConfiguration(in *v1alpha1.HPAControllerConfiguration, out *config.HPAControllerConfiguration, s conversion.Scope) error {
+	out.ConcurrentHorizontalPodAutoscalerSyncs = in.ConcurrentHorizontalPodAutoscalerSyncs
 	out.HorizontalPodAutoscalerSyncPeriod = in.HorizontalPodAutoscalerSyncPeriod
 	out.HorizontalPodAutoscalerUpscaleForbiddenWindow = in.HorizontalPodAutoscalerUpscaleForbiddenWindow
 	out.HorizontalPodAutoscalerDownscaleStabilizationWindow = in.HorizontalPodAutoscalerDownscaleStabilizationWindow
@@ -93,6 +94,7 @@ func autoConvert_v1alpha1_HPAControllerConfiguration_To_config_HPAControllerConf
 }

 func autoConvert_config_HPAControllerConfiguration_To_v1alpha1_HPAControllerConfiguration(in *config.HPAControllerConfiguration, out *v1alpha1.HPAControllerConfiguration, s conversion.Scope) error {
+	out.ConcurrentHorizontalPodAutoscalerSyncs = in.ConcurrentHorizontalPodAutoscalerSyncs
 	out.HorizontalPodAutoscalerSyncPeriod = in.HorizontalPodAutoscalerSyncPeriod
 	out.HorizontalPodAutoscalerUpscaleForbiddenWindow = in.HorizontalPodAutoscalerUpscaleForbiddenWindow
 	out.HorizontalPodAutoscalerDownscaleForbiddenWindow = in.HorizontalPodAutoscalerDownscaleForbiddenWindow
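As with the other HPA settings, the new field relies on zero-value defaulting: a value omitted from the component configuration arrives as 0, and `RecommendedDefaultHPAControllerConfiguration` above bumps it to 5. A standalone illustration of that convention, using simplified stand-in types rather than the actual Kubernetes ones:

// defaulting_sketch.go - zero means "unset", so defaulting fills in the recommended value.
package main

import "fmt"

type hpaControllerConfig struct {
	ConcurrentHorizontalPodAutoscalerSyncs int32
}

// recommendedDefaults mirrors the shape of the defaulting helper above.
func recommendedDefaults(cfg *hpaControllerConfig) {
	if cfg.ConcurrentHorizontalPodAutoscalerSyncs == 0 {
		cfg.ConcurrentHorizontalPodAutoscalerSyncs = 5
	}
}

func main() {
	cfg := hpaControllerConfig{} // field left unset by the user
	recommendedDefaults(&cfg)
	fmt.Println(cfg.ConcurrentHorizontalPodAutoscalerSyncs) // prints 5
}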
diff --git a/pkg/controller/podautoscaler/horizontal.go b/pkg/controller/podautoscaler/horizontal.go
index f4b298e74f6..67830e2a28c 100644
--- a/pkg/controller/podautoscaler/horizontal.go
+++ b/pkg/controller/podautoscaler/horizontal.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"sync"
 	"time"

 	autoscalingv1 "k8s.io/api/autoscaling/v1"
@@ -94,11 +95,14 @@ type HorizontalController struct {
 	queue workqueue.RateLimitingInterface

 	// Latest unstabilized recommendations for each autoscaler.
-	recommendations map[string][]timestampedRecommendation
+	recommendations     map[string][]timestampedRecommendation
+	recommendationsLock sync.Mutex

 	// Latest autoscaler events
-	scaleUpEvents   map[string][]timestampedScaleEvent
-	scaleDownEvents map[string][]timestampedScaleEvent
+	scaleUpEvents       map[string][]timestampedScaleEvent
+	scaleUpEventsLock   sync.RWMutex
+	scaleDownEvents     map[string][]timestampedScaleEvent
+	scaleDownEventsLock sync.RWMutex
 }

 // NewHorizontalController creates a new HorizontalController.
@@ -130,8 +134,11 @@ func NewHorizontalController(
 		queue:                        workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler"),
 		mapper:                       mapper,
 		recommendations:              map[string][]timestampedRecommendation{},
+		recommendationsLock:          sync.Mutex{},
 		scaleUpEvents:                map[string][]timestampedScaleEvent{},
+		scaleUpEventsLock:            sync.RWMutex{},
 		scaleDownEvents:              map[string][]timestampedScaleEvent{},
+		scaleDownEventsLock:          sync.RWMutex{},
 	}

 	hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -161,7 +168,7 @@ func NewHorizontalController(
 }

 // Run begins watching and syncing.
-func (a *HorizontalController) Run(ctx context.Context) {
+func (a *HorizontalController) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
 	defer a.queue.ShutDown()

@@ -172,8 +179,9 @@ func (a *HorizontalController) Run(ctx context.Context) {
 		return
 	}

-	// start a single worker (we may wish to start more in the future)
-	go wait.UntilWithContext(ctx, a.worker, time.Second)
+	for i := 0; i < workers; i++ {
+		go wait.UntilWithContext(ctx, a.worker, time.Second)
+	}

 	<-ctx.Done()
 }
@@ -358,9 +366,19 @@ func (a *HorizontalController) reconcileKey(ctx context.Context, key string) (de
 	hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
 	if errors.IsNotFound(err) {
 		klog.Infof("Horizontal Pod Autoscaler %s has been deleted in %s", name, namespace)
+
+		a.recommendationsLock.Lock()
 		delete(a.recommendations, key)
+		a.recommendationsLock.Unlock()
+
+		a.scaleUpEventsLock.Lock()
 		delete(a.scaleUpEvents, key)
+		a.scaleUpEventsLock.Unlock()
+
+		a.scaleDownEventsLock.Lock()
 		delete(a.scaleDownEvents, key)
+		a.scaleDownEventsLock.Unlock()
+
 		return true, nil
 	}
 	if err != nil {
@@ -565,6 +583,8 @@ func (a *HorizontalController) computeStatusForExternalMetric(specReplicas, stat
 }

 func (a *HorizontalController) recordInitialRecommendation(currentReplicas int32, key string) {
+	a.recommendationsLock.Lock()
+	defer a.recommendationsLock.Unlock()
 	if a.recommendations[key] == nil {
 		a.recommendations[key] = []timestampedRecommendation{{currentReplicas, time.Now()}}
 	}
@@ -713,6 +733,9 @@ func (a *HorizontalController) stabilizeRecommendation(key string, prenormalized
 	foundOldSample := false
 	oldSampleIndex := 0
 	cutoff := time.Now().Add(-a.downscaleStabilisationWindow)
+
+	a.recommendationsLock.Lock()
+	defer a.recommendationsLock.Unlock()
 	for i, rec := range a.recommendations[key] {
 		if rec.timestamp.Before(cutoff) {
 			foundOldSample = true
@@ -837,6 +860,9 @@ func (a *HorizontalController) storeScaleEvent(behavior *autoscalingv2.Horizonta
 	foundOldSample := false
 	if newReplicas > prevReplicas {
 		longestPolicyPeriod = getLongestPolicyPeriod(behavior.ScaleUp)
+
+		a.scaleUpEventsLock.Lock()
+		defer a.scaleUpEventsLock.Unlock()
 		markScaleEventsOutdated(a.scaleUpEvents[key], longestPolicyPeriod)
 		replicaChange := newReplicas - prevReplicas
 		for i, event := range a.scaleUpEvents[key] {
@@ -853,6 +879,9 @@ func (a *HorizontalController) storeScaleEvent(behavior *autoscalingv2.Horizonta
 		}
 	} else {
 		longestPolicyPeriod = getLongestPolicyPeriod(behavior.ScaleDown)
+
+		a.scaleDownEventsLock.Lock()
+		defer a.scaleDownEventsLock.Unlock()
 		markScaleEventsOutdated(a.scaleDownEvents[key], longestPolicyPeriod)
 		replicaChange := prevReplicas - newReplicas
 		for i, event := range a.scaleDownEvents[key] {
@@ -888,6 +917,8 @@ func (a *HorizontalController) stabilizeRecommendationWithBehaviors(args Normali
 	downCutoff := now.Add(-time.Second * time.Duration(downDelaySeconds))

 	// Calculate the upper and lower stabilization limits.
+	a.recommendationsLock.Lock()
+	defer a.recommendationsLock.Unlock()
 	for i, rec := range a.recommendations[args.Key] {
 		if rec.timestamp.After(upCutoff) {
 			upRecommendation = min(rec.recommendation, upRecommendation)
@@ -935,7 +966,12 @@ func (a *HorizontalController) convertDesiredReplicasWithBehaviorRate(args Norma
 	var possibleLimitingReason, possibleLimitingMessage string

 	if args.DesiredReplicas > args.CurrentReplicas {
+		a.scaleUpEventsLock.RLock()
+		defer a.scaleUpEventsLock.RUnlock()
+		a.scaleDownEventsLock.RLock()
+		defer a.scaleDownEventsLock.RUnlock()
 		scaleUpLimit := calculateScaleUpLimitWithScalingRules(args.CurrentReplicas, a.scaleUpEvents[args.Key], a.scaleDownEvents[args.Key], args.ScaleUpBehavior)
+
 		if scaleUpLimit < args.CurrentReplicas {
 			// We shouldn't scale up further until the scaleUpEvents will be cleaned up
 			scaleUpLimit = args.CurrentReplicas
@@ -953,7 +989,12 @@ func (a *HorizontalController) convertDesiredReplicasWithBehaviorRate(args Norma
 			return maximumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage
 		}
 	} else if args.DesiredReplicas < args.CurrentReplicas {
+		a.scaleUpEventsLock.RLock()
+		defer a.scaleUpEventsLock.RUnlock()
+		a.scaleDownEventsLock.RLock()
+		defer a.scaleDownEventsLock.RUnlock()
 		scaleDownLimit := calculateScaleDownLimitWithBehaviors(args.CurrentReplicas, a.scaleUpEvents[args.Key], a.scaleDownEvents[args.Key], args.ScaleDownBehavior)
+
 		if scaleDownLimit > args.CurrentReplicas {
 			// We shouldn't scale down further until the scaleDownEvents will be cleaned up
 			scaleDownLimit = args.CurrentReplicas
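With more than one worker, the `recommendations`, `scaleUpEvents` and `scaleDownEvents` maps become shared state, which is why the hunks above add locks: a `sync.Mutex` around the read-modify-write paths for recommendations, and a `sync.RWMutex` so the behavior rate-limit calculations can take read locks concurrently. A minimal standalone sketch of that guarded-map pattern (illustrative only, not the controller's code):

// guarded_maps_sketch.go - maps shared by several workers, protected by Mutex/RWMutex.
package main

import (
	"fmt"
	"sync"
)

type recorder struct {
	mu       sync.Mutex   // guards recommendations (read-modify-write)
	eventsMu sync.RWMutex // guards scaleUpEvents (mostly read-only callers)

	recommendations map[string][]int32
	scaleUpEvents   map[string][]int32
}

func (r *recorder) record(key string, replicas int32) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.recommendations[key] = append(r.recommendations[key], replicas)
}

func (r *recorder) upEventCount(key string) int {
	r.eventsMu.RLock()
	defer r.eventsMu.RUnlock()
	return len(r.scaleUpEvents[key])
}

func main() {
	r := &recorder{
		recommendations: map[string][]int32{},
		scaleUpEvents:   map[string][]int32{},
	}

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ { // five concurrent "workers"
		wg.Add(1)
		go func(n int32) {
			defer wg.Done()
			r.record("dummy-namespace/dummy-hpa-0", n)
			_ = r.upEventCount("dummy-namespace/dummy-hpa-0")
		}(int32(i))
	}
	wg.Wait()

	fmt.Println(len(r.recommendations["dummy-namespace/dummy-hpa-0"])) // 5
}

Without the locks, the same program run under `go run -race` reports data races on the maps, which is the failure mode the patch is defending against once multiple sync workers exist.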
diff --git a/pkg/controller/podautoscaler/horizontal_test.go b/pkg/controller/podautoscaler/horizontal_test.go
index de095c159f5..7d7281e0adb 100644
--- a/pkg/controller/podautoscaler/horizontal_test.go
+++ b/pkg/controller/podautoscaler/horizontal_test.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -756,7 +757,7 @@ func (tc *testCase) runTestWithController(t *testing.T, hpaController *Horizonta
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	informerFactory.Start(ctx.Done())
-	go hpaController.Run(ctx)
+	go hpaController.Run(ctx, 5)

 	tc.Lock()
 	shouldWait := tc.verifyEvents
@@ -4180,3 +4181,270 @@ func TestNoScaleDownOneMetricEmpty(t *testing.T) {
 	tc.testEMClient = testEMClient
 	tc.runTest(t)
 }
+
+func TestMultipleHPAs(t *testing.T) {
+	const hpaCount = 1000
+	const testNamespace = "dummy-namespace"
+
+	processed := make(chan string, hpaCount)
+
+	testClient := &fake.Clientset{}
+	testScaleClient := &scalefake.FakeScaleClient{}
+	testMetricsClient := &metricsfake.Clientset{}
+
+	hpaList := [hpaCount]autoscalingv2.HorizontalPodAutoscaler{}
+	scaleUpEventsMap := map[string][]timestampedScaleEvent{}
+	scaleDownEventsMap := map[string][]timestampedScaleEvent{}
+	scaleList := map[string]*autoscalingv1.Scale{}
+	podList := map[string]*v1.Pod{}
+
+	var minReplicas int32 = 1
+	var cpuTarget int32 = 10
+
+	// generate resources (HPAs, Scales, Pods...)
+	for i := 0; i < hpaCount; i++ {
+		hpaName := fmt.Sprintf("dummy-hpa-%v", i)
+		deploymentName := fmt.Sprintf("dummy-target-%v", i)
+		labelSet := map[string]string{"name": deploymentName}
+		selector := labels.SelectorFromSet(labelSet).String()
+
+		// generate HPAs
+		h := autoscalingv2.HorizontalPodAutoscaler{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      hpaName,
+				Namespace: testNamespace,
+			},
+			Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
+				ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
+					APIVersion: "apps/v1",
+					Kind:       "Deployment",
+					Name:       deploymentName,
+				},
+				MinReplicas: &minReplicas,
+				MaxReplicas: 10,
+				Behavior: &autoscalingv2.HorizontalPodAutoscalerBehavior{
+					ScaleUp:   generateScalingRules(100, 60, 0, 0, 0),
+					ScaleDown: generateScalingRules(2, 60, 1, 60, 300),
+				},
+				Metrics: []autoscalingv2.MetricSpec{
+					{
+						Type: autoscalingv2.ResourceMetricSourceType,
+						Resource: &autoscalingv2.ResourceMetricSource{
+							Name: v1.ResourceCPU,
+							Target: autoscalingv2.MetricTarget{
+								Type:               autoscalingv2.UtilizationMetricType,
+								AverageUtilization: &cpuTarget,
+							},
+						},
+					},
+				},
+			},
+			Status: autoscalingv2.HorizontalPodAutoscalerStatus{
+				CurrentReplicas: 1,
+				DesiredReplicas: 5,
+				LastScaleTime:   &metav1.Time{Time: time.Now()},
+			},
+		}
+		hpaList[i] = h
+
+		// generate Scale
+		scaleList[deploymentName] = &autoscalingv1.Scale{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      deploymentName,
+				Namespace: testNamespace,
+			},
+			Spec: autoscalingv1.ScaleSpec{
+				Replicas: 1,
+			},
+			Status: autoscalingv1.ScaleStatus{
+				Replicas: 1,
+				Selector: selector,
+			},
+		}
+
+		// generate Pods
+		cpuRequest := resource.MustParse("1.0")
+		pod := v1.Pod{
+			Status: v1.PodStatus{
+				Phase: v1.PodRunning,
+				Conditions: []v1.PodCondition{
+					{
+						Type:   v1.PodReady,
+						Status: v1.ConditionTrue,
+					},
+				},
+				StartTime: &metav1.Time{Time: time.Now().Add(-10 * time.Minute)},
+			},
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      fmt.Sprintf("%s-0", deploymentName),
+				Namespace: testNamespace,
+				Labels:    labelSet,
+			},
+
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{
+					{
+						Name: "container1",
+						Resources: v1.ResourceRequirements{
+							Requests: v1.ResourceList{
+								v1.ResourceCPU: *resource.NewMilliQuantity(cpuRequest.MilliValue()/2, resource.DecimalSI),
+							},
+						},
+					},
+					{
+						Name: "container2",
+						Resources: v1.ResourceRequirements{
+							Requests: v1.ResourceList{
+								v1.ResourceCPU: *resource.NewMilliQuantity(cpuRequest.MilliValue()/2, resource.DecimalSI),
+							},
+						},
+					},
+				},
+			},
+		}
+		podList[deploymentName] = &pod
+
+		scaleUpEventsMap[fmt.Sprintf("%s/%s", testNamespace, hpaName)] = generateEventsUniformDistribution([]int{8, 12, 9, 11}, 120)
+		scaleDownEventsMap[fmt.Sprintf("%s/%s", testNamespace, hpaName)] = generateEventsUniformDistribution([]int{10, 10, 10}, 120)
+	}
+
+	testMetricsClient.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
+		podNamePrefix := ""
+		labelSet := map[string]string{}
+
+		// selector should be in form: "name=dummy-target-X" where X is the number of resource
+		selector := action.(core.ListAction).GetListRestrictions().Labels
+		parsedSelector := strings.Split(selector.String(), "=")
+		if len(parsedSelector) > 1 {
+			labelSet[parsedSelector[0]] = parsedSelector[1]
+			podNamePrefix = parsedSelector[1]
+		}
+
+		podMetric := metricsapi.PodMetrics{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      fmt.Sprintf("%s-0", podNamePrefix),
+				Namespace: testNamespace,
+				Labels:    labelSet,
+			},
+			Timestamp: metav1.Time{Time: time.Now()},
+			Window:    metav1.Duration{Duration: time.Minute},
+			Containers: []metricsapi.ContainerMetrics{
+				{
+					Name: "container1",
+					Usage: v1.ResourceList{
+						v1.ResourceCPU: *resource.NewMilliQuantity(
+							int64(200),
+							resource.DecimalSI),
+						v1.ResourceMemory: *resource.NewQuantity(
+							int64(1024*1024/2),
+							resource.BinarySI),
+					},
+				},
+				{
+					Name: "container2",
+					Usage: v1.ResourceList{
+						v1.ResourceCPU: *resource.NewMilliQuantity(
+							int64(300),
+							resource.DecimalSI),
+						v1.ResourceMemory: *resource.NewQuantity(
+							int64(1024*1024/2),
+							resource.BinarySI),
+					},
+				},
+			},
+		}
+		metrics := &metricsapi.PodMetricsList{}
+		metrics.Items = append(metrics.Items, podMetric)
+
+		return true, metrics, nil
+	})
+
+	metricsClient := metrics.NewRESTMetricsClient(
+		testMetricsClient.MetricsV1beta1(),
+		&cmfake.FakeCustomMetricsClient{},
+		&emfake.FakeExternalMetricsClient{},
+	)
+
+	testScaleClient.AddReactor("get", "deployments", func(action core.Action) (handled bool, ret runtime.Object, err error) {
+		deploymentName := action.(core.GetAction).GetName()
+		obj := scaleList[deploymentName]
+		return true, obj, nil
+	})
+
+	testClient.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
+		obj := &v1.PodList{}
+
+		// selector should be in form: "name=dummy-target-X" where X is the number of resource
+		selector := action.(core.ListAction).GetListRestrictions().Labels
+		parsedSelector := strings.Split(selector.String(), "=")
+
+		// list with filter
+		if len(parsedSelector) > 1 {
+			obj.Items = append(obj.Items, *podList[parsedSelector[1]])
+		} else {
+			// no filter - return all pods
+			for _, p := range podList {
+				obj.Items = append(obj.Items, *p)
+			}
+		}
+
+		return true, obj, nil
+	})
+
+	testClient.AddReactor("list", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
+		obj := &autoscalingv2.HorizontalPodAutoscalerList{
+			Items: hpaList[:],
+		}
+		return true, obj, nil
+	})
+
+	testClient.AddReactor("update", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
+		handled, obj, err := func() (handled bool, ret *autoscalingv2.HorizontalPodAutoscaler, err error) {
+			obj := action.(core.UpdateAction).GetObject().(*autoscalingv2.HorizontalPodAutoscaler)
+			assert.Equal(t, testNamespace, obj.Namespace, "the HPA namespace should be as expected")
+
+			return true, obj, nil
+		}()
+		processed <- obj.Name
+
+		return handled, obj, err
+	})
+
+	informerFactory := informers.NewSharedInformerFactory(testClient, controller.NoResyncPeriodFunc())
+
+	hpaController := NewHorizontalController(
+		testClient.CoreV1(),
+		testScaleClient,
+		testClient.AutoscalingV2(),
+		testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Scheme),
+		metricsClient,
+		informerFactory.Autoscaling().V2().HorizontalPodAutoscalers(),
+		informerFactory.Core().V1().Pods(),
+		100*time.Millisecond,
+		5*time.Minute,
+		defaultTestingTolerance,
+		defaultTestingCPUInitializationPeriod,
+		defaultTestingDelayOfInitialReadinessStatus,
+	)
+	hpaController.scaleUpEvents = scaleUpEventsMap
+	hpaController.scaleDownEvents = scaleDownEventsMap
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	informerFactory.Start(ctx.Done())
+	go hpaController.Run(ctx, 5)
+
+	timeoutTime := time.After(15 * time.Second)
+	timeout := false
+	processedHPA := make(map[string]bool)
+	for timeout == false && len(processedHPA) < hpaCount {
+		select {
+		case hpaName := <-processed:
+			processedHPA[hpaName] = true
+		case <-timeoutTime:
+			timeout = true
+		}
+	}
+
+	assert.Equal(t, hpaCount, len(processedHPA), "Expected to process all HPAs")
+}
diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go
index da56df5a55c..8c751ce52c1 100644
--- a/pkg/generated/openapi/zz_generated.openapi.go
+++ b/pkg/generated/openapi/zz_generated.openapi.go
@@ -49086,6 +49086,14 @@ func schema_k8sio_kube_controller_manager_config_v1alpha1_HPAControllerConfigura
 				Description: "HPAControllerConfiguration contains elements describing HPAController.",
 				Type:        []string{"object"},
 				Properties: map[string]spec.Schema{
+					"ConcurrentHorizontalPodAutoscalerSyncs": {
+						SchemaProps: spec.SchemaProps{
+							Description: "ConcurrentHorizontalPodAutoscalerSyncs is the number of HPA objects that are allowed to sync concurrently. Larger number = more responsive HPA processing, but more CPU (and network) load.",
+							Default:     0,
+							Type:        []string{"integer"},
+							Format:      "int32",
+						},
+					},
 					"HorizontalPodAutoscalerSyncPeriod": {
 						SchemaProps: spec.SchemaProps{
 							Description: "HorizontalPodAutoscalerSyncPeriod is the period for syncing the number of pods in horizontal pod autoscaler.",
@@ -49137,7 +49145,7 @@ func schema_k8sio_kube_controller_manager_config_v1alpha1_HPAControllerConfigura
 					},
 				},
 			},
-			Required: []string{"HorizontalPodAutoscalerSyncPeriod", "HorizontalPodAutoscalerUpscaleForbiddenWindow", "HorizontalPodAutoscalerDownscaleStabilizationWindow", "HorizontalPodAutoscalerDownscaleForbiddenWindow", "HorizontalPodAutoscalerTolerance", "HorizontalPodAutoscalerCPUInitializationPeriod", "HorizontalPodAutoscalerInitialReadinessDelay"},
+			Required: []string{"ConcurrentHorizontalPodAutoscalerSyncs", "HorizontalPodAutoscalerSyncPeriod", "HorizontalPodAutoscalerUpscaleForbiddenWindow", "HorizontalPodAutoscalerDownscaleStabilizationWindow", "HorizontalPodAutoscalerDownscaleForbiddenWindow", "HorizontalPodAutoscalerTolerance", "HorizontalPodAutoscalerCPUInitializationPeriod", "HorizontalPodAutoscalerInitialReadinessDelay"},
 		},
 	},
 	Dependencies: []string{
diff --git a/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/types.go b/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/types.go
index 7162fb927df..3d02f6d4cd1 100644
--- a/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/types.go
+++ b/staging/src/k8s.io/kube-controller-manager/config/v1alpha1/types.go
@@ -315,6 +315,9 @@ type GarbageCollectorControllerConfiguration struct {

 // HPAControllerConfiguration contains elements describing HPAController.
 type HPAControllerConfiguration struct {
+	// ConcurrentHorizontalPodAutoscalerSyncs is the number of HPA objects that are allowed to sync concurrently.
+	// Larger number = more responsive HPA processing, but more CPU (and network) load.
+	ConcurrentHorizontalPodAutoscalerSyncs int32
 	// HorizontalPodAutoscalerSyncPeriod is the period for syncing the number of
 	// pods in horizontal pod autoscaler.
 	HorizontalPodAutoscalerSyncPeriod metav1.Duration
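`TestMultipleHPAs` above tracks completion by pushing every updated HPA name into a buffered channel from the update reactor, then selecting against `time.After` until all names have been seen or the deadline passes. The same idiom in a distilled standalone form (illustrative only, not code from this patch):

// completion_tracking_sketch.go - wait for N results or a timeout, whichever comes first.
package main

import (
	"fmt"
	"time"
)

func main() {
	const count = 10
	processed := make(chan string, count) // buffered so producers never block

	for i := 0; i < count; i++ {
		go func(i int) {
			processed <- fmt.Sprintf("dummy-hpa-%d", i)
		}(i)
	}

	timeoutTime := time.After(2 * time.Second)
	seen := make(map[string]bool)
	timedOut := false
	for !timedOut && len(seen) < count {
		select {
		case name := <-processed:
			seen[name] = true // dedupe, since an object may be updated more than once
		case <-timeoutTime:
			timedOut = true
		}
	}

	fmt.Println("processed:", len(seen), "timed out:", timedOut)
}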