diff --git a/pkg/controller/podautoscaler/BUILD b/pkg/controller/podautoscaler/BUILD index 45a781042b6..dbd1ad4488d 100644 --- a/pkg/controller/podautoscaler/BUILD +++ b/pkg/controller/podautoscaler/BUILD @@ -32,6 +32,7 @@ go_library( "//pkg/controller:go_default_library", "//pkg/controller/podautoscaler/metrics:go_default_library", "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/controller/podautoscaler/horizontal.go b/pkg/controller/podautoscaler/horizontal.go index 31e0fb990d9..6182d175c2d 100644 --- a/pkg/controller/podautoscaler/horizontal.go +++ b/pkg/controller/podautoscaler/horizontal.go @@ -22,6 +22,7 @@ import ( "time" "github.com/golang/glog" + apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -371,6 +372,12 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho return fmt.Errorf("failed to convert the given HPA to %s: %v", autoscalingv2.SchemeGroupVersion.String(), err) } hpa := hpaRaw.(*autoscalingv2.HorizontalPodAutoscaler) + hpaStatusOriginalRaw, err := api.Scheme.DeepCopy(&hpa.Status) + if err != nil { + a.eventRecorder.Event(hpav1Shared, v1.EventTypeWarning, "FailedConvertHPA", err.Error()) + return fmt.Errorf("failed to deep-copy the HPA status: %v", err) + } + hpaStatusOriginal := hpaStatusOriginalRaw.(*autoscalingv2.HorizontalPodAutoscalerStatus) reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name) @@ -378,7 +385,7 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho if err != nil { a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error()) setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err) - a.update(hpa) + a.updateStatusIfNeeded(hpaStatusOriginal, hpa) return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err) } setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededGetScale", "the HPA controller was able to get the target's current scale") @@ -412,7 +419,10 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho } else { metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics) if err != nil { - a.updateCurrentReplicasInStatus(hpa, currentReplicas) + a.setCurrentReplicasInStatus(hpa, currentReplicas) + if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil { + utilruntime.HandleError(err) + } a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error()) return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err) } @@ -489,7 +499,10 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho if err != nil { a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error()) setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err) - a.updateCurrentReplicasInStatus(hpa, currentReplicas) + a.setCurrentReplicasInStatus(hpa, currentReplicas) + if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil { + utilruntime.HandleError(err) + } return fmt.Errorf("failed to rescale %s: %v", reference, err) } setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas) @@ -501,7 +514,8 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho desiredReplicas = currentReplicas } - return a.updateStatusWithReplicas(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale) + a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale) + return a.updateStatusIfNeeded(hpaStatusOriginal, hpa) } func (a *HorizontalController) shouldScale(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, timestamp time.Time) bool { @@ -528,14 +542,14 @@ func (a *HorizontalController) shouldScale(hpa *autoscalingv2.HorizontalPodAutos return false } -func (a *HorizontalController) updateCurrentReplicasInStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas int32) { - err := a.updateStatusWithReplicas(hpa, currentReplicas, hpa.Status.DesiredReplicas, hpa.Status.CurrentMetrics, false) - if err != nil { - utilruntime.HandleError(err) - } +// setCurrentReplicasInStatus sets the current replica count in the status of the HPA. +func (a *HorizontalController) setCurrentReplicasInStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas int32) { + a.setStatus(hpa, currentReplicas, hpa.Status.DesiredReplicas, hpa.Status.CurrentMetrics, false) } -func (a *HorizontalController) updateStatusWithReplicas(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, metricStatuses []autoscalingv2.MetricStatus, rescale bool) error { +// setStatus recreates the status of the given HPA, updating the current and +// desired replicas, as well as the metric statuses +func (a *HorizontalController) setStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, metricStatuses []autoscalingv2.MetricStatus, rescale bool) { hpa.Status = autoscalingv2.HorizontalPodAutoscalerStatus{ CurrentReplicas: currentReplicas, DesiredReplicas: desiredReplicas, @@ -548,11 +562,19 @@ func (a *HorizontalController) updateStatusWithReplicas(hpa *autoscalingv2.Horiz now := metav1.NewTime(time.Now()) hpa.Status.LastScaleTime = &now } - - return a.update(hpa) } -func (a *HorizontalController) update(hpa *autoscalingv2.HorizontalPodAutoscaler) error { +// updateStatusIfNeeded calls updateStatus only if the status of the new HPA is not the same as the old status +func (a *HorizontalController) updateStatusIfNeeded(oldStatus *autoscalingv2.HorizontalPodAutoscalerStatus, newHPA *autoscalingv2.HorizontalPodAutoscaler) error { + // skip a write if we wouldn't need to update + if apiequality.Semantic.DeepEqual(oldStatus, &newHPA.Status) { + return nil + } + return a.updateStatus(newHPA) +} + +// updateStatus actually does the update request for the status of the given HPA +func (a *HorizontalController) updateStatus(hpa *autoscalingv2.HorizontalPodAutoscaler) error { // convert back to autoscalingv1 hpaRaw, err := UnsafeConvertToVersionVia(hpa, autoscalingv1.SchemeGroupVersion) if err != nil { diff --git a/pkg/controller/podautoscaler/horizontal_test.go b/pkg/controller/podautoscaler/horizontal_test.go index 3ff79b7bc7a..dfe9d756b73 100644 --- a/pkg/controller/podautoscaler/horizontal_test.go +++ b/pkg/controller/podautoscaler/horizontal_test.go @@ -538,7 +538,7 @@ func (tc *testCase) verifyResults(t *testing.T) { } } -func (tc *testCase) runTest(t *testing.T) { +func (tc *testCase) setupController(t *testing.T) (*HorizontalController, informers.SharedInformerFactory) { testClient, testMetricsClient, testCMClient := tc.prepareTestClient(t) if tc.testClient != nil { testClient = tc.testClient @@ -598,6 +598,10 @@ func (tc *testCase) runTest(t *testing.T) { ) hpaController.hpaListerSynced = alwaysReady + return hpaController, informerFactory +} + +func (tc *testCase) runTestWithController(t *testing.T, hpaController *HorizontalController, informerFactory informers.SharedInformerFactory) { stop := make(chan struct{}) defer close(stop) informerFactory.Start(stop) @@ -616,6 +620,11 @@ func (tc *testCase) runTest(t *testing.T) { tc.verifyResults(t) } +func (tc *testCase) runTest(t *testing.T) { + hpaController, informerFactory := tc.setupController(t) + tc.runTestWithController(t, hpaController, informerFactory) +} + func TestScaleUp(t *testing.T) { tc := testCase{ minReplicas: 2, @@ -1594,4 +1603,73 @@ func TestScaleDownRCImmediately(t *testing.T) { tc.runTest(t) } +func TestAvoidUncessaryUpdates(t *testing.T) { + tc := testCase{ + minReplicas: 2, + maxReplicas: 6, + initialReplicas: 3, + desiredReplicas: 3, + CPUTarget: 30, + CPUCurrent: 40, + verifyCPUCurrent: true, + reportedLevels: []uint64{400, 500, 700}, + reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, + reportedPodReadiness: []v1.ConditionStatus{v1.ConditionTrue, v1.ConditionFalse, v1.ConditionFalse}, + useMetricsApi: true, + } + testClient, _, _ := tc.prepareTestClient(t) + tc.testClient = testClient + var savedHPA *autoscalingv1.HorizontalPodAutoscaler + testClient.PrependReactor("list", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) { + tc.Lock() + defer tc.Unlock() + + if savedHPA != nil { + // fake out the verification logic and mark that we're done processing + go func() { + // wait a tick and then mark that we're finished (otherwise, we have no + // way to indicate that we're finished, because the function decides not to do anything) + time.Sleep(1 * time.Second) + tc.statusUpdated = true + tc.processed <- "test-hpa" + }() + return true, &autoscalingv1.HorizontalPodAutoscalerList{ + Items: []autoscalingv1.HorizontalPodAutoscaler{*savedHPA}, + }, nil + } + + // fallthrough + return false, nil, nil + }) + testClient.PrependReactor("update", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) { + tc.Lock() + defer tc.Unlock() + + if savedHPA == nil { + // save the HPA and return it + savedHPA = action.(core.UpdateAction).GetObject().(*autoscalingv1.HorizontalPodAutoscaler) + return true, savedHPA, nil + } + + assert.Fail(t, "should not have attempted to update the HPA when nothing changed") + // mark that we've processed this HPA + tc.processed <- "" + return true, nil, fmt.Errorf("unexpected call") + }) + + controller, informerFactory := tc.setupController(t) + + // fake an initial processing loop to populate savedHPA + initialHPAs, err := testClient.Autoscaling().HorizontalPodAutoscalers("test-namespace").List(metav1.ListOptions{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := controller.reconcileAutoscaler(&initialHPAs.Items[0]); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // actually run the test + tc.runTestWithController(t, controller, informerFactory) +} + // TODO: add more tests