Merge pull request #72373 from krzysztof-jastrzebski/hpa_fix

Add request processing HPA into the queue after processing is finished.
Kubernetes Prow Robot, 2019-01-04 11:09:00 -08:00 (committed by GitHub)
commit 86691cad55
3 changed files with 64 additions and 27 deletions
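
For context: per the new comment in processNextWorkItem below, the worker now re-queues the HPA key itself once processing finishes, rather than relying on the next resync enqueue, which can be dropped if the previous request is still in flight. The following is a minimal sketch of that requeue-after-processing pattern using client-go's workqueue package; the queue name, key, rate limiter, and reconcile stub are placeholders, not the controller's actual wiring.

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// reconcile stands in for HorizontalController.reconcileKey: it reports whether
// the object behind the key no longer exists.
func reconcile(key string) (deleted bool, err error) {
	fmt.Println("reconciling", key)
	return false, nil
}

// processNextWorkItem mirrors the shape of the fixed worker loop: handle the key,
// then re-queue it unless the object was deleted, instead of waiting for the next
// resync to enqueue it.
func processNextWorkItem(queue workqueue.RateLimitingInterface) bool {
	key, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(key)

	deleted, err := reconcile(key.(string))
	if err != nil {
		fmt.Println("reconcile error:", err)
	}
	if !deleted {
		queue.AddRateLimited(key)
	}
	return true
}

func main() {
	// DefaultControllerRateLimiter is only a stand-in; the HPA controller uses a
	// fixed-interval limiter so each HPA is handled once per resync period.
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example")
	defer queue.ShutDown()

	queue.Add("default/my-hpa") // illustrative key
	processNextWorkItem(queue)  // the real controller runs this in a loop from its workers
}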


@@ -207,14 +207,18 @@ func (a *HorizontalController) processNextWorkItem() bool {
 	}
 	defer a.queue.Done(key)
 
-	err := a.reconcileKey(key.(string))
-	if err == nil {
-		// don't "forget" here because we want to only process a given HPA once per resync interval
-		return true
+	deleted, err := a.reconcileKey(key.(string))
+	if err != nil {
+		utilruntime.HandleError(err)
 	}
 
-	a.queue.AddRateLimited(key)
-	utilruntime.HandleError(err)
+	// Add request processing HPA after resync interval just in case last resync didn't insert
+	// request into the queue. Request is not inserted into queue by resync if previous one wasn't processed yet.
+	// This happens quite often because requests from previous resync are removed from the queue at the same moment
+	// as next resync inserts new requests.
+	if !deleted {
+		a.queue.AddRateLimited(key)
+	}
 
 	return true
 }
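
The comment above relies on the deduplication behaviour of client-go's workqueue: adding a key that is already sitting in the queue is a no-op, so a resync enqueue that races with the removal of the previous request can be silently dropped. A minimal sketch of that behaviour, with an illustrative key:

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.New()
	defer queue.ShutDown()

	queue.Add("default/my-hpa") // first enqueue
	queue.Add("default/my-hpa") // deduplicated: the key is already queued
	fmt.Println(queue.Len())    // prints 1, not 2
}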
@@ -298,20 +302,20 @@ func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.Hori
 	return replicas, metric, statuses, timestamp, nil
 }
 
-func (a *HorizontalController) reconcileKey(key string) error {
+func (a *HorizontalController) reconcileKey(key string) (deleted bool, err error) {
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
-		return err
+		return true, err
 	}
 
 	hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
 	if errors.IsNotFound(err) {
 		klog.Infof("Horizontal Pod Autoscaler %s has been deleted in %s", name, namespace)
 		delete(a.recommendations, key)
-		return nil
+		return true, nil
 	}
 
-	return a.reconcileAutoscaler(hpa, key)
+	return false, a.reconcileAutoscaler(hpa, key)
 }
 
 // computeStatusForObjectMetric computes the desired number of replicas for the specified metric of type ObjectMetricSourceType.
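
reconcileKey turns the queue key back into a namespace and name via cache.SplitMetaNamespaceKey; keys have the "<namespace>/<name>" form produced by cache.MetaNamespaceKeyFunc. A small usage sketch with an illustrative key:

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

func main() {
	namespace, name, err := cache.SplitMetaNamespaceKey("default/my-hpa")
	if err != nil {
		panic(err)
	}
	fmt.Println(namespace, name) // prints "default my-hpa"
}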


@@ -2176,7 +2176,7 @@ func TestComputedToleranceAlgImplementation(t *testing.T) {
 	finalPods := int32(math.Ceil(resourcesUsedRatio * float64(startPods)))
 
 	// To breach tolerance we will create a utilization ratio difference of tolerance to usageRatioToleranceValue)
-	tc := testCase{
+	tc1 := testCase{
 		minReplicas:             0,
 		maxReplicas:             1000,
 		initialReplicas:         startPods,
@@ -2209,22 +2209,49 @@ func TestComputedToleranceAlgImplementation(t *testing.T) {
 		useMetricsAPI:   true,
 		recommendations: []timestampedRecommendation{},
 	}
+	tc1.runTest(t)
 
-	tc.runTest(t)
-
-	// Reuse the data structure above, now testing "unscaling".
-	// Now, we test that no scaling happens if we are in a very close margin to the tolerance
 	target = math.Abs(1/(requestedToUsed*(1-defaultTestingTolerance))) + .004
 	finalCPUPercentTarget = int32(target * 100)
-	tc.CPUTarget = finalCPUPercentTarget
-	tc.initialReplicas = startPods
-	tc.expectedDesiredReplicas = startPods
-	tc.expectedConditions = statusOkWithOverrides(autoscalingv2.HorizontalPodAutoscalerCondition{
-		Type:   autoscalingv2.AbleToScale,
-		Status: v1.ConditionTrue,
-		Reason: "ReadyForNewScale",
-	})
-	tc.runTest(t)
+	tc2 := testCase{
+		minReplicas:             0,
+		maxReplicas:             1000,
+		initialReplicas:         startPods,
+		expectedDesiredReplicas: startPods,
+		CPUTarget:               finalCPUPercentTarget,
+		reportedLevels: []uint64{
+			totalUsedCPUOfAllPods / 10,
+			totalUsedCPUOfAllPods / 10,
+			totalUsedCPUOfAllPods / 10,
+			totalUsedCPUOfAllPods / 10,
+			totalUsedCPUOfAllPods / 10,
+			totalUsedCPUOfAllPods / 10,
+			totalUsedCPUOfAllPods / 10,
+			totalUsedCPUOfAllPods / 10,
+			totalUsedCPUOfAllPods / 10,
+			totalUsedCPUOfAllPods / 10,
+		},
+		reportedCPURequests: []resource.Quantity{
+			resource.MustParse(fmt.Sprint(perPodRequested+100) + "m"),
+			resource.MustParse(fmt.Sprint(perPodRequested-100) + "m"),
+			resource.MustParse(fmt.Sprint(perPodRequested+10) + "m"),
+			resource.MustParse(fmt.Sprint(perPodRequested-10) + "m"),
+			resource.MustParse(fmt.Sprint(perPodRequested+2) + "m"),
+			resource.MustParse(fmt.Sprint(perPodRequested-2) + "m"),
+			resource.MustParse(fmt.Sprint(perPodRequested+1) + "m"),
+			resource.MustParse(fmt.Sprint(perPodRequested-1) + "m"),
+			resource.MustParse(fmt.Sprint(perPodRequested) + "m"),
+			resource.MustParse(fmt.Sprint(perPodRequested) + "m"),
+		},
+		useMetricsAPI:   true,
+		recommendations: []timestampedRecommendation{},
+		expectedConditions: statusOkWithOverrides(autoscalingv2.HorizontalPodAutoscalerCondition{
+			Type:   autoscalingv2.AbleToScale,
+			Status: v1.ConditionTrue,
+			Reason: "ReadyForNewScale",
+		}),
+	}
+	tc2.runTest(t)
 }
 
 func TestScaleUpRCImmediately(t *testing.T) {


@@ -100,6 +100,8 @@ type legacyTestCase struct {
 	// Last scale time
 	lastScaleTime *metav1.Time
 	recommendations []timestampedRecommendation
+
+	finished bool
 }
 
 // Needs to be called under a lock.
@@ -462,12 +464,14 @@ func (tc *legacyTestCase) verifyResults(t *testing.T) {
 func (tc *legacyTestCase) runTest(t *testing.T) {
 	testClient, testScaleClient := tc.prepareTestClient(t)
 	metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort)
 
 	eventClient := &fake.Clientset{}
 	eventClient.AddReactor("*", "events", func(action core.Action) (handled bool, ret runtime.Object, err error) {
 		tc.Lock()
 		defer tc.Unlock()
-
+		if tc.finished {
+			return true, &v1.Event{}, nil
+		}
 		obj := action.(core.CreateAction).GetObject().(*v1.Event)
 		if tc.verifyEvents {
 			switch obj.Reason {
@@ -514,7 +518,10 @@ func (tc *legacyTestCase) runTest(t *testing.T) {
 	informerFactory.Start(stop)
 	go hpaController.Run(stop)
 
+	// Wait for HPA to be processed.
+	<-tc.processed
 	tc.Lock()
+	tc.finished = true
 	if tc.verifyEvents {
 		tc.Unlock()
 		// We need to wait for events to be broadcasted (sleep for longer than record.sleepDuration).
@@ -522,9 +529,8 @@ func (tc *legacyTestCase) runTest(t *testing.T) {
 	} else {
 		tc.Unlock()
 	}
-	// Wait for HPA to be processed.
-	<-tc.processed
+
 	tc.verifyResults(t)
 }
 
 func TestLegacyScaleUp(t *testing.T) {
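
The legacy test changes pair the new finished flag with moving the wait on tc.processed ahead of verification, so events recorded after the HPA has been processed can no longer mutate test state. A stripped-down sketch of that guard pattern, without the fake clientset machinery (the type and field names only mirror the diff):

package main

import (
	"fmt"
	"sync"
)

type testCase struct {
	sync.Mutex
	finished bool
	events   []string
}

// recordEvent plays the role of the "events" reactor: once the test has flipped
// finished under the lock, late events are swallowed instead of being recorded.
func (tc *testCase) recordEvent(reason string) {
	tc.Lock()
	defer tc.Unlock()
	if tc.finished {
		return
	}
	tc.events = append(tc.events, reason)
}

func main() {
	tc := &testCase{}
	tc.recordEvent("SuccessfulRescale")

	tc.Lock()
	tc.finished = true // set right after the HPA has been processed
	tc.Unlock()

	tc.recordEvent("TooLate") // ignored
	fmt.Println(tc.events)    // prints [SuccessfulRescale]
}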