diff --git a/pkg/controller/podautoscaler/horizontal.go b/pkg/controller/podautoscaler/horizontal.go index fe0feb34497..4517b0d7299 100644 --- a/pkg/controller/podautoscaler/horizontal.go +++ b/pkg/controller/podautoscaler/horizontal.go @@ -207,14 +207,18 @@ func (a *HorizontalController) processNextWorkItem() bool { } defer a.queue.Done(key) - err := a.reconcileKey(key.(string)) - if err == nil { - // don't "forget" here because we want to only process a given HPA once per resync interval - return true + deleted, err := a.reconcileKey(key.(string)) + if err != nil { + utilruntime.HandleError(err) + } + // Add request processing HPA after resync interval just in case last resync didn't insert + // request into the queue. Request is not inserted into queue by resync if previous one wasn't processed yet. + // This happens quite often because requests from previous resync are removed from the queue at the same moment + // as next resync inserts new requests. + if !deleted { + a.queue.AddRateLimited(key) } - a.queue.AddRateLimited(key) - utilruntime.HandleError(err) return true } @@ -298,20 +302,20 @@ func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.Hori return replicas, metric, statuses, timestamp, nil } -func (a *HorizontalController) reconcileKey(key string) error { +func (a *HorizontalController) reconcileKey(key string) (deleted bool, err error) { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - return err + return true, err } hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name) if errors.IsNotFound(err) { klog.Infof("Horizontal Pod Autoscaler %s has been deleted in %s", name, namespace) delete(a.recommendations, key) - return nil + return true, nil } - return a.reconcileAutoscaler(hpa, key) + return false, a.reconcileAutoscaler(hpa, key) } // computeStatusForObjectMetric computes the desired number of replicas for the specified metric of type ObjectMetricSourceType. diff --git a/pkg/controller/podautoscaler/horizontal_test.go b/pkg/controller/podautoscaler/horizontal_test.go index 06d20d012d5..e9f57836f4f 100644 --- a/pkg/controller/podautoscaler/horizontal_test.go +++ b/pkg/controller/podautoscaler/horizontal_test.go @@ -2176,7 +2176,7 @@ func TestComputedToleranceAlgImplementation(t *testing.T) { finalPods := int32(math.Ceil(resourcesUsedRatio * float64(startPods))) // To breach tolerance we will create a utilization ratio difference of tolerance to usageRatioToleranceValue) - tc := testCase{ + tc1 := testCase{ minReplicas: 0, maxReplicas: 1000, initialReplicas: startPods, @@ -2209,22 +2209,49 @@ func TestComputedToleranceAlgImplementation(t *testing.T) { useMetricsAPI: true, recommendations: []timestampedRecommendation{}, } + tc1.runTest(t) - tc.runTest(t) - - // Reuse the data structure above, now testing "unscaling". - // Now, we test that no scaling happens if we are in a very close margin to the tolerance target = math.Abs(1/(requestedToUsed*(1-defaultTestingTolerance))) + .004 finalCPUPercentTarget = int32(target * 100) - tc.CPUTarget = finalCPUPercentTarget - tc.initialReplicas = startPods - tc.expectedDesiredReplicas = startPods - tc.expectedConditions = statusOkWithOverrides(autoscalingv2.HorizontalPodAutoscalerCondition{ - Type: autoscalingv2.AbleToScale, - Status: v1.ConditionTrue, - Reason: "ReadyForNewScale", - }) - tc.runTest(t) + tc2 := testCase{ + minReplicas: 0, + maxReplicas: 1000, + initialReplicas: startPods, + expectedDesiredReplicas: startPods, + CPUTarget: finalCPUPercentTarget, + reportedLevels: []uint64{ + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + }, + reportedCPURequests: []resource.Quantity{ + resource.MustParse(fmt.Sprint(perPodRequested+100) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested-100) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested+10) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested-10) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested+2) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested-2) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested+1) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested-1) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested) + "m"), + }, + useMetricsAPI: true, + recommendations: []timestampedRecommendation{}, + expectedConditions: statusOkWithOverrides(autoscalingv2.HorizontalPodAutoscalerCondition{ + Type: autoscalingv2.AbleToScale, + Status: v1.ConditionTrue, + Reason: "ReadyForNewScale", + }), + } + tc2.runTest(t) } func TestScaleUpRCImmediately(t *testing.T) { diff --git a/pkg/controller/podautoscaler/legacy_horizontal_test.go b/pkg/controller/podautoscaler/legacy_horizontal_test.go index deeb649b1d1..4f97232a522 100644 --- a/pkg/controller/podautoscaler/legacy_horizontal_test.go +++ b/pkg/controller/podautoscaler/legacy_horizontal_test.go @@ -100,6 +100,8 @@ type legacyTestCase struct { // Last scale time lastScaleTime *metav1.Time recommendations []timestampedRecommendation + + finished bool } // Needs to be called under a lock. @@ -462,12 +464,14 @@ func (tc *legacyTestCase) verifyResults(t *testing.T) { func (tc *legacyTestCase) runTest(t *testing.T) { testClient, testScaleClient := tc.prepareTestClient(t) metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort) - eventClient := &fake.Clientset{} eventClient.AddReactor("*", "events", func(action core.Action) (handled bool, ret runtime.Object, err error) { tc.Lock() defer tc.Unlock() + if tc.finished { + return true, &v1.Event{}, nil + } obj := action.(core.CreateAction).GetObject().(*v1.Event) if tc.verifyEvents { switch obj.Reason { @@ -514,7 +518,10 @@ func (tc *legacyTestCase) runTest(t *testing.T) { informerFactory.Start(stop) go hpaController.Run(stop) + // Wait for HPA to be processed. + <-tc.processed tc.Lock() + tc.finished = true if tc.verifyEvents { tc.Unlock() // We need to wait for events to be broadcasted (sleep for longer than record.sleepDuration). @@ -522,9 +529,8 @@ func (tc *legacyTestCase) runTest(t *testing.T) { } else { tc.Unlock() } - // Wait for HPA to be processed. - <-tc.processed tc.verifyResults(t) + } func TestLegacyScaleUp(t *testing.T) {