diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 5dbcd41f0c5..dacd9cba920 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -427,7 +427,8 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort, ) - go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), metricsClient, s.HorizontalPodAutoscalerSyncPeriod.Duration). + replicaCalc := podautoscaler.NewReplicaCalculator(metricsClient, hpaClient.Core()) + go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), replicaCalc, s.HorizontalPodAutoscalerSyncPeriod.Duration). Run(wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } diff --git a/pkg/controller/podautoscaler/BUILD b/pkg/controller/podautoscaler/BUILD index 3dd0fbf10b5..f14b2566863 100644 --- a/pkg/controller/podautoscaler/BUILD +++ b/pkg/controller/podautoscaler/BUILD @@ -15,6 +15,7 @@ go_library( srcs = [ "doc.go", "horizontal.go", + "replica_calculator.go", ], tags = ["automanaged"], deps = [ @@ -29,8 +30,10 @@ go_library( "//pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion:go_default_library", "//pkg/client/record:go_default_library", "//pkg/controller/podautoscaler/metrics:go_default_library", + "//pkg/labels:go_default_library", "//pkg/runtime:go_default_library", "//pkg/util/runtime:go_default_library", + "//pkg/util/sets:go_default_library", "//pkg/watch:go_default_library", "//vendor:github.com/golang/glog", ], @@ -38,7 +41,10 @@ go_library( go_test( name = "go_default_test", - srcs = ["horizontal_test.go"], + srcs = [ + "horizontal_test.go", + "replica_calculator_test.go", + ], library = "go_default_library", tags = ["automanaged"], deps = [ @@ -58,6 +64,7 @@ go_test( "//pkg/runtime:go_default_library", "//pkg/watch:go_default_library", "//vendor:github.com/stretchr/testify/assert", + "//vendor:github.com/stretchr/testify/require", "//vendor:k8s.io/heapster/metrics/api/v1/types", "//vendor:k8s.io/heapster/metrics/apis/metrics/v1alpha1", ], diff --git a/pkg/controller/podautoscaler/horizontal.go b/pkg/controller/podautoscaler/horizontal.go index 5bec9a902ec..8f741e218cb 100644 --- a/pkg/controller/podautoscaler/horizontal.go +++ b/pkg/controller/podautoscaler/horizontal.go @@ -33,7 +33,6 @@ import ( unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion" "k8s.io/kubernetes/pkg/client/record" - "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" "k8s.io/kubernetes/pkg/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/watch" @@ -61,7 +60,7 @@ type HorizontalController struct { scaleNamespacer unversionedextensions.ScalesGetter hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter - metricsClient metrics.MetricsClient + replicaCalc *ReplicaCalculator eventRecorder record.EventRecorder // A store of HPA objects, populated by the controller. 
@@ -110,13 +109,13 @@ func newInformer(controller *HorizontalController, resyncPeriod time.Duration) ( ) } -func NewHorizontalController(evtNamespacer unversionedcore.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter, metricsClient metrics.MetricsClient, resyncPeriod time.Duration) *HorizontalController { +func NewHorizontalController(evtNamespacer unversionedcore.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter, replicaCalc *ReplicaCalculator, resyncPeriod time.Duration) *HorizontalController { broadcaster := record.NewBroadcaster() broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: evtNamespacer.Events("")}) recorder := broadcaster.NewRecorder(api.EventSource{Component: "horizontal-pod-autoscaler"}) controller := &HorizontalController{ - metricsClient: metricsClient, + replicaCalc: replicaCalc, eventRecorder: recorder, scaleNamespacer: scaleNamespacer, hpaNamespacer: hpaNamespacer, @@ -164,9 +163,8 @@ func (a *HorizontalController) computeReplicasForCPUUtilization(hpa *autoscaling a.eventRecorder.Event(hpa, api.EventTypeWarning, "InvalidSelector", errMsg) return 0, nil, time.Time{}, fmt.Errorf(errMsg) } - currentUtilization, numRunningPods, timestamp, err := a.metricsClient.GetCPUUtilization(hpa.Namespace, selector) - // TODO: what to do on partial errors (like metrics obtained for 75% of pods). + desiredReplicas, utilization, timestamp, err := a.replicaCalc.GetResourceReplicas(currentReplicas, targetUtilization, api.ResourceCPU, hpa.Namespace, selector) if err != nil { lastScaleTime := getLastScaleTime(hpa) if time.Now().After(lastScaleTime.Add(upscaleForbiddenWindow)) { @@ -178,20 +176,13 @@ func (a *HorizontalController) computeReplicasForCPUUtilization(hpa *autoscaling return 0, nil, time.Time{}, fmt.Errorf("failed to get CPU utilization: %v", err) } - utilization := int32(*currentUtilization) - - usageRatio := float64(utilization) / float64(targetUtilization) - if math.Abs(1.0-usageRatio) <= tolerance { - return currentReplicas, &utilization, timestamp, nil + if desiredReplicas != currentReplicas { + a.eventRecorder.Eventf(hpa, api.EventTypeNormal, "DesiredReplicasComputed", + "Computed the desired num of replicas: %d (avgCPUutil: %d, current replicas: %d)", + desiredReplicas, utilization, scale.Status.Replicas) } - desiredReplicas := math.Ceil(usageRatio * float64(numRunningPods)) - - a.eventRecorder.Eventf(hpa, api.EventTypeNormal, "DesiredReplicasComputed", - "Computed the desired num of replicas: %d, on a base of %d report(s) (avgCPUutil: %d, current replicas: %d)", - int32(desiredReplicas), numRunningPods, utilization, scale.Status.Replicas) - - return int32(desiredReplicas), &utilization, timestamp, nil + return desiredReplicas, &utilization, timestamp, nil } // computeReplicasForCustomMetrics computes the desired number of replicas based on the CustomMetrics passed in cmAnnotation @@ -233,8 +224,8 @@ func (a *HorizontalController) computeReplicasForCustomMetrics(hpa *autoscaling. a.eventRecorder.Event(hpa, api.EventTypeWarning, "InvalidSelector", errMsg) return 0, "", "", time.Time{}, fmt.Errorf("couldn't convert selector string to a corresponding selector object: %v", err) } - value, currentTimestamp, err := a.metricsClient.GetCustomMetric(customMetricTarget.Name, hpa.Namespace, selector) - // TODO: what to do on partial errors (like metrics obtained for 75% of pods). 
+ floatTarget := float64(customMetricTarget.TargetValue.MilliValue()) / 1000.0 + replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetMetricReplicas(currentReplicas, floatTarget, fmt.Sprintf("custom/%s", customMetricTarget.Name), hpa.Namespace, selector) if err != nil { lastScaleTime := getLastScaleTime(hpa) if time.Now().After(lastScaleTime.Add(upscaleForbiddenWindow)) { @@ -245,21 +236,13 @@ func (a *HorizontalController) computeReplicasForCustomMetrics(hpa *autoscaling. return 0, "", "", time.Time{}, fmt.Errorf("failed to get custom metric value: %v", err) } - floatTarget := float64(customMetricTarget.TargetValue.MilliValue()) / 1000.0 - usageRatio := *value / floatTarget - replicaCountProposal := int32(0) - if math.Abs(1.0-usageRatio) > tolerance { - replicaCountProposal = int32(math.Ceil(usageRatio * float64(currentReplicas))) - } else { - replicaCountProposal = currentReplicas - } if replicaCountProposal > replicas { - timestamp = currentTimestamp + timestamp = timestampProposal replicas = replicaCountProposal metric = fmt.Sprintf("Custom metric %s", customMetricTarget.Name) } - quantity, err := resource.ParseQuantity(fmt.Sprintf("%.3f", *value)) + quantity, err := resource.ParseQuantity(fmt.Sprintf("%.3f", utilizationProposal)) if err != nil { return 0, "", "", time.Time{}, fmt.Errorf("failed to set custom metric value: %v", err) } diff --git a/pkg/controller/podautoscaler/horizontal_test.go b/pkg/controller/podautoscaler/horizontal_test.go index e59a4ad9240..0652a4d8ba0 100644 --- a/pkg/controller/podautoscaler/horizontal_test.go +++ b/pkg/controller/podautoscaler/horizontal_test.go @@ -21,6 +21,8 @@ import ( "fmt" "io" "math" + "strconv" + "strings" "sync" "testing" "time" @@ -77,17 +79,18 @@ type testCase struct { desiredReplicas int32 // CPU target utilization as a percentage of the requested resources. - CPUTarget int32 - CPUCurrent int32 - verifyCPUCurrent bool - reportedLevels []uint64 - reportedCPURequests []resource.Quantity - cmTarget *extensions.CustomMetricTargetList - scaleUpdated bool - statusUpdated bool - eventCreated bool - verifyEvents bool - useMetricsApi bool + CPUTarget int32 + CPUCurrent int32 + verifyCPUCurrent bool + reportedLevels []uint64 + reportedCPURequests []resource.Quantity + reportedPodReadiness []api.ConditionStatus + cmTarget *extensions.CustomMetricTargetList + scaleUpdated bool + statusUpdated bool + eventCreated bool + verifyEvents bool + useMetricsApi bool // Channel with names of HPA objects which we have reconciled. processed chan string @@ -125,7 +128,9 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset { tc.statusUpdated = false tc.eventCreated = false tc.processed = make(chan string, 100) - tc.computeCPUCurrent() + if tc.CPUCurrent == 0 { + tc.computeCPUCurrent() + } // TODO(madhusudancs): HPA only supports resources in extensions/v1beta1 right now. Add // tests for "v1" replicationcontrollers when HPA adds support for cross-group scale. 
@@ -248,10 +253,20 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset { obj := &api.PodList{} for i := 0; i < len(tc.reportedCPURequests); i++ { + podReadiness := api.ConditionTrue + if tc.reportedPodReadiness != nil { + podReadiness = tc.reportedPodReadiness[i] + } podName := fmt.Sprintf("%s-%d", podNamePrefix, i) pod := api.Pod{ Status: api.PodStatus{ Phase: api.PodRunning, + Conditions: []api.PodCondition{ + { + Type: api.PodReady, + Status: podReadiness, + }, + }, }, ObjectMeta: api.ObjectMeta{ Name: podName, @@ -310,9 +325,34 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset { } heapsterRawMemResponse, _ = json.Marshal(&metrics) } else { + // only return the pods that we actually asked for + proxyAction := action.(core.ProxyGetAction) + pathParts := strings.Split(proxyAction.GetPath(), "/") + // pathParts should look like [ api, v1, model, namespaces, $NS, pod-list, $PODS, metrics, $METRIC... ] + if len(pathParts) < 9 { + return true, nil, fmt.Errorf("invalid heapster path %q", proxyAction.GetPath()) + } + + podNames := strings.Split(pathParts[7], ",") + podPresent := make([]bool, len(tc.reportedLevels)) + for _, name := range podNames { + if len(name) <= len(podNamePrefix)+1 { + return true, nil, fmt.Errorf("unknown pod %q", name) + } + num, err := strconv.Atoi(name[len(podNamePrefix)+1:]) + if err != nil { + return true, nil, fmt.Errorf("unknown pod %q", name) + } + podPresent[num] = true + } + timestamp := time.Now() metrics := heapster.MetricResultList{} - for _, level := range tc.reportedLevels { + for i, level := range tc.reportedLevels { + if !podPresent[i] { + continue + } + metric := heapster.MetricResult{ Metrics: []heapster.MetricPoint{{Timestamp: timestamp, Value: level, FloatValue: nil}}, LatestTimestamp: timestamp, @@ -331,7 +371,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset { obj := action.(core.UpdateAction).GetObject().(*extensions.Scale) replicas := action.(core.UpdateAction).GetObject().(*extensions.Scale).Spec.Replicas - assert.Equal(t, tc.desiredReplicas, replicas) + assert.Equal(t, tc.desiredReplicas, replicas, "the replica count of the RC should be as expected") tc.scaleUpdated = true return true, obj, nil }) @@ -342,7 +382,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset { obj := action.(core.UpdateAction).GetObject().(*extensions.Scale) replicas := action.(core.UpdateAction).GetObject().(*extensions.Scale).Spec.Replicas - assert.Equal(t, tc.desiredReplicas, replicas) + assert.Equal(t, tc.desiredReplicas, replicas, "the replica count of the deployment should be as expected") tc.scaleUpdated = true return true, obj, nil }) @@ -353,7 +393,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset { obj := action.(core.UpdateAction).GetObject().(*extensions.Scale) replicas := action.(core.UpdateAction).GetObject().(*extensions.Scale).Spec.Replicas - assert.Equal(t, tc.desiredReplicas, replicas) + assert.Equal(t, tc.desiredReplicas, replicas, "the replica count of the replicaset should be as expected") tc.scaleUpdated = true return true, obj, nil }) @@ -363,12 +403,12 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset { defer tc.Unlock() obj := action.(core.UpdateAction).GetObject().(*autoscaling.HorizontalPodAutoscaler) - assert.Equal(t, namespace, obj.Namespace) - assert.Equal(t, hpaName, obj.Name) - assert.Equal(t, tc.desiredReplicas, obj.Status.DesiredReplicas) + assert.Equal(t, namespace, obj.Namespace, "the HPA namespace should be as 
expected") + assert.Equal(t, hpaName, obj.Name, "the HPA name should be as expected") + assert.Equal(t, tc.desiredReplicas, obj.Status.DesiredReplicas, "the desired replica count reported in the object status should be as expected") if tc.verifyCPUCurrent { - assert.NotNil(t, obj.Status.CurrentCPUUtilizationPercentage) - assert.Equal(t, tc.CPUCurrent, *obj.Status.CurrentCPUUtilizationPercentage) + assert.NotNil(t, obj.Status.CurrentCPUUtilizationPercentage, "the reported CPU utilization percentage should be non-nil") + assert.Equal(t, tc.CPUCurrent, *obj.Status.CurrentCPUUtilizationPercentage, "the report CPU utilization percentage should be as expected") } tc.statusUpdated = true // Every time we reconcile HPA object we are updating status. @@ -387,8 +427,8 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset { assert.Equal(t, fmt.Sprintf("New size: %d; reason: CPU utilization above target", tc.desiredReplicas), obj.Message) case "DesiredReplicasComputed": assert.Equal(t, fmt.Sprintf( - "Computed the desired num of replicas: %d, on a base of %d report(s) (avgCPUutil: %d, current replicas: %d)", - tc.desiredReplicas, len(tc.reportedLevels), + "Computed the desired num of replicas: %d (avgCPUutil: %d, current replicas: %d)", + tc.desiredReplicas, (int64(tc.reportedLevels[0])*100)/tc.reportedCPURequests[0].MilliValue(), tc.initialReplicas), obj.Message) default: assert.False(t, true, fmt.Sprintf("Unexpected event: %s / %s", obj.Reason, obj.Message)) @@ -408,10 +448,10 @@ func (tc *testCase) verifyResults(t *testing.T) { tc.Lock() defer tc.Unlock() - assert.Equal(t, tc.initialReplicas != tc.desiredReplicas, tc.scaleUpdated) - assert.True(t, tc.statusUpdated) + assert.Equal(t, tc.initialReplicas != tc.desiredReplicas, tc.scaleUpdated, "the scale should only be updated if we expected a change in replicas") + assert.True(t, tc.statusUpdated, "the status should have been updated") if tc.verifyEvents { - assert.Equal(t, tc.initialReplicas != tc.desiredReplicas, tc.eventCreated) + assert.Equal(t, tc.initialReplicas != tc.desiredReplicas, tc.eventCreated, "an event should have been created only if we expected a change in replicas") } } @@ -423,8 +463,13 @@ func (tc *testCase) runTest(t *testing.T) { broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: testClient.Core().Events("")}) recorder := broadcaster.NewRecorder(api.EventSource{Component: "horizontal-pod-autoscaler"}) + replicaCalc := &ReplicaCalculator{ + metricsClient: metricsClient, + podsGetter: testClient.Core(), + } + hpaController := &HorizontalController{ - metricsClient: metricsClient, + replicaCalc: replicaCalc, eventRecorder: recorder, scaleNamespacer: testClient.Extensions(), hpaNamespacer: testClient.Autoscaling(), @@ -518,6 +563,40 @@ func TestScaleUp(t *testing.T) { tc.runTest(t) } +func TestScaleUpUnreadyLessScale(t *testing.T) { + tc := testCase{ + minReplicas: 2, + maxReplicas: 6, + initialReplicas: 3, + desiredReplicas: 4, + CPUTarget: 30, + CPUCurrent: 60, + verifyCPUCurrent: true, + reportedLevels: []uint64{300, 500, 700}, + reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, + reportedPodReadiness: []api.ConditionStatus{api.ConditionFalse, api.ConditionTrue, api.ConditionTrue}, + useMetricsApi: true, + } + tc.runTest(t) +} + +func TestScaleUpUnreadyNoScale(t *testing.T) { + tc := testCase{ + minReplicas: 2, + maxReplicas: 6, + initialReplicas: 3, + desiredReplicas: 3, + CPUTarget: 30, + CPUCurrent: 40, + 
verifyCPUCurrent: true, + reportedLevels: []uint64{400, 500, 700}, + reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, + reportedPodReadiness: []api.ConditionStatus{api.ConditionTrue, api.ConditionFalse, api.ConditionFalse}, + useMetricsApi: true, + } + tc.runTest(t) +} + func TestScaleUpDeployment(t *testing.T) { tc := testCase{ minReplicas: 2, @@ -577,6 +656,46 @@ func TestScaleUpCM(t *testing.T) { tc.runTest(t) } +func TestScaleUpCMUnreadyLessScale(t *testing.T) { + tc := testCase{ + minReplicas: 2, + maxReplicas: 6, + initialReplicas: 3, + desiredReplicas: 4, + CPUTarget: 0, + cmTarget: &extensions.CustomMetricTargetList{ + Items: []extensions.CustomMetricTarget{{ + Name: "qps", + TargetValue: resource.MustParse("15.0"), + }}, + }, + reportedLevels: []uint64{50, 10, 30}, + reportedPodReadiness: []api.ConditionStatus{api.ConditionTrue, api.ConditionTrue, api.ConditionFalse}, + reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, + } + tc.runTest(t) +} + +func TestScaleUpCMUnreadyNoScaleWouldScaleDown(t *testing.T) { + tc := testCase{ + minReplicas: 2, + maxReplicas: 6, + initialReplicas: 3, + desiredReplicas: 3, + CPUTarget: 0, + cmTarget: &extensions.CustomMetricTargetList{ + Items: []extensions.CustomMetricTarget{{ + Name: "qps", + TargetValue: resource.MustParse("15.0"), + }}, + }, + reportedLevels: []uint64{50, 15, 30}, + reportedPodReadiness: []api.ConditionStatus{api.ConditionFalse, api.ConditionTrue, api.ConditionFalse}, + reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, + } + tc.runTest(t) +} + func TestDefaultScaleDown(t *testing.T) { tc := testCase{ minReplicas: 2, @@ -624,6 +743,23 @@ func TestScaleDownCM(t *testing.T) { tc.runTest(t) } +func TestScaleDownIgnoresUnreadyPods(t *testing.T) { + tc := testCase{ + minReplicas: 2, + maxReplicas: 6, + initialReplicas: 5, + desiredReplicas: 2, + CPUTarget: 50, + CPUCurrent: 30, + verifyCPUCurrent: true, + reportedLevels: []uint64{100, 300, 500, 250, 250}, + reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, + useMetricsApi: true, + reportedPodReadiness: []api.ConditionStatus{api.ConditionTrue, api.ConditionTrue, api.ConditionTrue, api.ConditionFalse, api.ConditionFalse}, + } + tc.runTest(t) +} + func TestTolerance(t *testing.T) { tc := testCase{ minReplicas: 1, @@ -730,7 +866,7 @@ func TestSuperfluousMetrics(t *testing.T) { minReplicas: 2, maxReplicas: 6, initialReplicas: 4, - desiredReplicas: 4, + desiredReplicas: 6, CPUTarget: 100, reportedLevels: []uint64{4000, 9500, 3000, 7000, 3200, 2000}, reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, @@ -744,7 +880,7 @@ func TestMissingMetrics(t *testing.T) { minReplicas: 2, maxReplicas: 6, initialReplicas: 4, - desiredReplicas: 4, + desiredReplicas: 3, CPUTarget: 100, reportedLevels: []uint64{400, 95}, reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, diff --git a/pkg/controller/podautoscaler/metrics/BUILD b/pkg/controller/podautoscaler/metrics/BUILD index 4cdde5a4c92..71421ab3172 100644 --- a/pkg/controller/podautoscaler/metrics/BUILD +++ 
b/pkg/controller/podautoscaler/metrics/BUILD
@@ -12,14 +12,17 @@ load(
 go_library(
     name = "go_default_library",
-    srcs = ["metrics_client.go"],
+    srcs = [
+        "metrics_client.go",
+        "utilization.go",
+    ],
     tags = ["automanaged"],
     deps = [
         "//pkg/api:go_default_library",
         "//pkg/api/v1:go_default_library",
         "//pkg/client/clientset_generated/internalclientset:go_default_library",
+        "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
         "//pkg/labels:go_default_library",
-        "//pkg/util/sets:go_default_library",
         "//vendor:github.com/golang/glog",
         "//vendor:k8s.io/heapster/metrics/api/v1/types",
         "//vendor:k8s.io/heapster/metrics/apis/metrics/v1alpha1",
diff --git a/pkg/controller/podautoscaler/metrics/metrics_client.go b/pkg/controller/podautoscaler/metrics/metrics_client.go
index 422e986e37a..76a2387c869 100644
--- a/pkg/controller/podautoscaler/metrics/metrics_client.go
+++ b/pkg/controller/podautoscaler/metrics/metrics_client.go
@@ -26,13 +26,33 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/v1"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
 	"k8s.io/kubernetes/pkg/labels"
-	"k8s.io/kubernetes/pkg/util/sets"
 	heapster "k8s.io/heapster/metrics/api/v1/types"
 	metrics_api "k8s.io/heapster/metrics/apis/metrics/v1alpha1"
 )
+// PodResourceInfo contains pod resource metric values as a map from pod names to
+// metric values
+type PodResourceInfo map[string]int64
+
+// PodMetricsInfo contains pod metric values as a map from pod names to
+// metric values
+type PodMetricsInfo map[string]float64
+
+// MetricsClient knows how to query a remote interface to retrieve container-level
+// resource metrics as well as pod-level arbitrary metrics
+type MetricsClient interface {
+	// GetResourceMetric gets the given resource metric (and an associated oldest timestamp)
+	// for all pods matching the specified selector in the given namespace
+	GetResourceMetric(resource api.ResourceName, namespace string, selector labels.Selector) (PodResourceInfo, time.Time, error)
+
+	// GetRawMetric gets the given metric (and an associated oldest timestamp)
+	// for all pods matching the specified selector in the given namespace
+	GetRawMetric(metricName string, namespace string, selector labels.Selector) (PodMetricsInfo, time.Time, error)
+}
+
 const (
 	DefaultHeapsterNamespace = "kube-system"
 	DefaultHeapsterScheme    = "http"
@@ -42,126 +62,33 @@ const (
 var heapsterQueryStart = -5 * time.Minute
-// MetricsClient is an interface for getting metrics for pods.
-type MetricsClient interface {
-	// GetCPUUtilization returns the average utilization over all pods represented as a percent of requested CPU
-	// (e.g. 70 means that an average pod uses 70% of the requested CPU),
-	// the number of running pods from which CPU usage was collected and the time of generation of the oldest of utilization reports for pods.
-	GetCPUUtilization(namespace string, selector labels.Selector) (*int, int, time.Time, error)
-
-	// GetCustomMetric returns the average value of the given custom metrics from the
-	// pods picked using the namespace and selector passed as arguments.
-	GetCustomMetric(customMetricName string, namespace string, selector labels.Selector) (*float64, time.Time, error)
-}
-
-type intAndFloat struct {
-	intValue   int64
-	floatValue float64
-}
-
-// Aggregates results into ResourceConsumption. 
Also returns number of pods included in the aggregation. -type metricAggregator func(heapster.MetricResultList) (intAndFloat, int, time.Time) - -type metricDefinition struct { - name string - aggregator metricAggregator -} - -// HeapsterMetricsClient is Heapster-based implementation of MetricsClient type HeapsterMetricsClient struct { - client clientset.Interface - heapsterNamespace string - heapsterScheme string - heapsterService string - heapsterPort string + services unversionedcore.ServiceInterface + podsGetter unversionedcore.PodsGetter + heapsterScheme string + heapsterService string + heapsterPort string } -var averageFunction = func(metrics heapster.MetricResultList) (intAndFloat, int, time.Time) { - sum, count, timestamp := calculateSumFromTimeSample(metrics, time.Minute) - result := intAndFloat{0, 0} - if count > 0 { - result.intValue = sum.intValue / int64(count) - result.floatValue = sum.floatValue / float64(count) - } - return result, count, timestamp -} - -func getHeapsterCustomMetricDefinition(metricName string) metricDefinition { - return metricDefinition{"custom/" + metricName, averageFunction} -} - -// NewHeapsterMetricsClient returns a new instance of Heapster-based implementation of MetricsClient interface. -func NewHeapsterMetricsClient(client clientset.Interface, namespace, scheme, service, port string) *HeapsterMetricsClient { +func NewHeapsterMetricsClient(client clientset.Interface, namespace, scheme, service, port string) MetricsClient { return &HeapsterMetricsClient{ - client: client, - heapsterNamespace: namespace, - heapsterScheme: scheme, - heapsterService: service, - heapsterPort: port, + services: client.Core().Services(namespace), + podsGetter: client.Core(), + heapsterScheme: scheme, + heapsterService: service, + heapsterPort: port, } } -func (h *HeapsterMetricsClient) GetCPUUtilization(namespace string, selector labels.Selector) (utilization *int, numRunningPods int, timestamp time.Time, err error) { - avgConsumption, avgRequest, numRunningPods, timestamp, err := h.GetCpuConsumptionAndRequestInMillis(namespace, selector) - if err != nil { - return nil, 0, time.Time{}, fmt.Errorf("failed to get CPU consumption and request: %v", err) - } - tmp := int((avgConsumption * 100) / avgRequest) - return &tmp, numRunningPods, timestamp, nil -} - -func (h *HeapsterMetricsClient) GetCpuConsumptionAndRequestInMillis(namespace string, selector labels.Selector) (avgConsumption int64, - avgRequest int64, numRunningPods int, timestamp time.Time, err error) { - - podList, err := h.client.Core().Pods(namespace). - List(api.ListOptions{LabelSelector: selector}) - - if err != nil { - return 0, 0, 0, time.Time{}, fmt.Errorf("failed to get pod list: %v", err) - } - podNames := map[string]struct{}{} - requestSum := int64(0) - missing := false - for _, pod := range podList.Items { - if pod.Status.Phase != api.PodRunning { - // Count only running pods. 
- continue - } - - podNames[pod.Name] = struct{}{} - for _, container := range pod.Spec.Containers { - if containerRequest, ok := container.Resources.Requests[api.ResourceCPU]; ok { - requestSum += containerRequest.MilliValue() - } else { - missing = true - } - } - } - if len(podNames) == 0 && len(podList.Items) > 0 { - return 0, 0, 0, time.Time{}, fmt.Errorf("no running pods") - } - if missing || requestSum == 0 { - return 0, 0, 0, time.Time{}, fmt.Errorf("some pods do not have request for cpu") - } - glog.V(4).Infof("%s %s - sum of CPU requested: %d", namespace, selector, requestSum) - requestAvg := requestSum / int64(len(podNames)) - // Consumption is already averaged and in millis. - consumption, timestamp, err := h.getCpuUtilizationForPods(namespace, selector, podNames) - if err != nil { - return 0, 0, 0, time.Time{}, err - } - return consumption, requestAvg, len(podNames), timestamp, nil -} - -func (h *HeapsterMetricsClient) getCpuUtilizationForPods(namespace string, selector labels.Selector, podNames map[string]struct{}) (int64, time.Time, error) { +func (h *HeapsterMetricsClient) GetResourceMetric(resource api.ResourceName, namespace string, selector labels.Selector) (PodResourceInfo, time.Time, error) { metricPath := fmt.Sprintf("/apis/metrics/v1alpha1/namespaces/%s/pods", namespace) params := map[string]string{"labelSelector": selector.String()} - resultRaw, err := h.client.Core().Services(h.heapsterNamespace). + resultRaw, err := h.services. ProxyGet(h.heapsterScheme, h.heapsterService, h.heapsterPort, metricPath, params). DoRaw() if err != nil { - return 0, time.Time{}, fmt.Errorf("failed to get pods metrics: %v", err) + return nil, time.Time{}, fmt.Errorf("failed to get pod resource metrics: %v", err) } glog.V(4).Infof("Heapster metrics result: %s", string(resultRaw)) @@ -169,75 +96,55 @@ func (h *HeapsterMetricsClient) getCpuUtilizationForPods(namespace string, selec metrics := metrics_api.PodMetricsList{} err = json.Unmarshal(resultRaw, &metrics) if err != nil { - return 0, time.Time{}, fmt.Errorf("failed to unmarshall heapster response: %v", err) + return nil, time.Time{}, fmt.Errorf("failed to unmarshal heapster response: %v", err) } - if len(metrics.Items) != len(podNames) { - present := sets.NewString() - for _, m := range metrics.Items { - present.Insert(m.Name) - } - missing := make([]string, 0) - for expected := range podNames { - if !present.Has(expected) { - missing = append(missing, expected) - } - } - hint := "" - if len(missing) > 0 { - hint = fmt.Sprintf(" (sample missing pod: %s/%s)", namespace, missing[0]) - } - return 0, time.Time{}, fmt.Errorf("metrics obtained for %d/%d of pods%s", len(metrics.Items), len(podNames), hint) + if len(metrics.Items) == 0 { + return nil, time.Time{}, fmt.Errorf("no metrics returned from heapster") } - sum := int64(0) + res := make(PodResourceInfo, len(metrics.Items)) + for _, m := range metrics.Items { - if _, found := podNames[m.Name]; found { - for _, c := range m.Containers { - cpu, found := c.Usage[v1.ResourceCPU] - if !found { - return 0, time.Time{}, fmt.Errorf("no cpu for container %v in pod %v/%v", c.Name, namespace, m.Name) - } - sum += cpu.MilliValue() + podSum := int64(0) + missing := len(m.Containers) == 0 + for _, c := range m.Containers { + resValue, found := c.Usage[v1.ResourceName(resource)] + if !found { + missing = true + glog.V(2).Infof("missing resource metric %v for container %s in pod %s/%s", resource, c.Name, namespace, m.Name) + continue } - } else { - return 0, time.Time{}, fmt.Errorf("not expected 
metrics for pod %v/%v", namespace, m.Name) + podSum += resValue.MilliValue() + } + + if !missing { + res[m.Name] = int64(podSum) } } - return sum / int64(len(metrics.Items)), metrics.Items[0].Timestamp.Time, nil + timestamp := time.Time{} + if len(metrics.Items) > 0 { + timestamp = metrics.Items[0].Timestamp.Time + } + + return res, timestamp, nil } -// GetCustomMetric returns the average value of the given custom metric from the -// pods picked using the namespace and selector passed as arguments. -func (h *HeapsterMetricsClient) GetCustomMetric(customMetricName string, namespace string, selector labels.Selector) (*float64, time.Time, error) { - metricSpec := getHeapsterCustomMetricDefinition(customMetricName) - - podList, err := h.client.Core().Pods(namespace).List(api.ListOptions{LabelSelector: selector}) - +func (h *HeapsterMetricsClient) GetRawMetric(metricName string, namespace string, selector labels.Selector) (PodMetricsInfo, time.Time, error) { + podList, err := h.podsGetter.Pods(namespace).List(api.ListOptions{LabelSelector: selector}) if err != nil { - return nil, time.Time{}, fmt.Errorf("failed to get pod list: %v", err) - } - podNames := []string{} - for _, pod := range podList.Items { - if pod.Status.Phase == api.PodPending { - // Skip pending pods. - continue - } - podNames = append(podNames, pod.Name) - } - if len(podNames) == 0 && len(podList.Items) > 0 { - return nil, time.Time{}, fmt.Errorf("no running pods") + return nil, time.Time{}, fmt.Errorf("failed to get pod list while fetching metrics: %v", err) } - value, timestamp, err := h.getCustomMetricForPods(metricSpec, namespace, podNames) - if err != nil { - return nil, time.Time{}, err + if len(podList.Items) == 0 { + return nil, time.Time{}, fmt.Errorf("no pods matched the provided selector") } - return &value.floatValue, timestamp, nil -} -func (h *HeapsterMetricsClient) getCustomMetricForPods(metricSpec metricDefinition, namespace string, podNames []string) (*intAndFloat, time.Time, error) { + podNames := make([]string, len(podList.Items)) + for i, pod := range podList.Items { + podNames[i] = pod.Name + } now := time.Now() @@ -245,89 +152,79 @@ func (h *HeapsterMetricsClient) getCustomMetricForPods(metricSpec metricDefiniti metricPath := fmt.Sprintf("/api/v1/model/namespaces/%s/pod-list/%s/metrics/%s", namespace, strings.Join(podNames, ","), - metricSpec.name) + metricName) - resultRaw, err := h.client.Core().Services(h.heapsterNamespace). + resultRaw, err := h.services. ProxyGet(h.heapsterScheme, h.heapsterService, h.heapsterPort, metricPath, map[string]string{"start": startTime.Format(time.RFC3339)}). 
DoRaw() - if err != nil { - return nil, time.Time{}, fmt.Errorf("failed to get pods metrics: %v", err) + return nil, time.Time{}, fmt.Errorf("failed to get pod metrics: %v", err) } var metrics heapster.MetricResultList err = json.Unmarshal(resultRaw, &metrics) if err != nil { - return nil, time.Time{}, fmt.Errorf("failed to unmarshall heapster response: %v", err) + return nil, time.Time{}, fmt.Errorf("failed to unmarshal heapster response: %v", err) } glog.V(4).Infof("Heapster metrics result: %s", string(resultRaw)) - sum, count, timestamp := metricSpec.aggregator(metrics) - if count != len(podNames) { - missing := make([]string, 0) - for i, expected := range podNames { - if len(metrics.Items) > i && len(metrics.Items[i].Metrics) == 0 { - missing = append(missing, expected) - } - } - hint := "" - if len(missing) > 0 { - hint = fmt.Sprintf(" (sample missing pod: %s/%s)", namespace, missing[0]) - } - return nil, time.Time{}, fmt.Errorf("metrics obtained for %d/%d of pods%s", count, len(podNames), hint) + if len(metrics.Items) != len(podNames) { + // if we get too many metrics or two few metrics, we have no way of knowing which metric goes to which pod + // (note that Heapster returns *empty* metric items when a pod does not exist or have that metric, so this + // does not cover the "missing metric entry" case) + return nil, time.Time{}, fmt.Errorf("requested metrics for %v pods, got metrics for %v", len(podNames), len(metrics.Items)) } - return &sum, timestamp, nil + var timestamp *time.Time + res := make(PodMetricsInfo, len(metrics.Items)) + for i, podMetrics := range metrics.Items { + val, podTimestamp, hadMetrics := collapseTimeSamples(podMetrics, time.Minute) + if hadMetrics { + res[podNames[i]] = val + if timestamp == nil || podTimestamp.Before(*timestamp) { + timestamp = &podTimestamp + } + } + } + + if timestamp == nil { + timestamp = &time.Time{} + } + + return res, *timestamp, nil } -func calculateSumFromTimeSample(metrics heapster.MetricResultList, duration time.Duration) (sum intAndFloat, count int, timestamp time.Time) { - sum = intAndFloat{0, 0} - count = 0 - timestamp = time.Time{} - var oldest *time.Time // creation time of the oldest of used samples across pods - oldest = nil - for _, metrics := range metrics.Items { - var newest *heapster.MetricPoint // creation time of the newest sample for pod - newest = nil - for i, metricPoint := range metrics.Metrics { - if newest == nil || newest.Timestamp.Before(metricPoint.Timestamp) { - newest = &metrics.Metrics[i] - } - } - if newest != nil { - if oldest == nil || newest.Timestamp.Before(*oldest) { - oldest = &newest.Timestamp - } - intervalSum := intAndFloat{0, 0} - intSumCount := 0 - floatSumCount := 0 - for _, metricPoint := range metrics.Metrics { - if metricPoint.Timestamp.Add(duration).After(newest.Timestamp) { - intervalSum.intValue += int64(metricPoint.Value) - intSumCount++ - if metricPoint.FloatValue != nil { - intervalSum.floatValue += *metricPoint.FloatValue - floatSumCount++ - } - } - } - if newest.FloatValue == nil { - if intSumCount > 0 { - sum.intValue += int64(intervalSum.intValue / int64(intSumCount)) - sum.floatValue += float64(intervalSum.intValue / int64(intSumCount)) - } - } else { - if floatSumCount > 0 { - sum.intValue += int64(intervalSum.floatValue / float64(floatSumCount)) - sum.floatValue += intervalSum.floatValue / float64(floatSumCount) - } - } - count++ +func collapseTimeSamples(metrics heapster.MetricResult, duration time.Duration) (float64, time.Time, bool) { + floatSum := float64(0) + intSum := 
int64(0) + intSumCount := 0 + floatSumCount := 0 + + var newest *heapster.MetricPoint // creation time of the newest sample for this pod + for i, metricPoint := range metrics.Metrics { + if newest == nil || newest.Timestamp.Before(metricPoint.Timestamp) { + newest = &metrics.Metrics[i] } } - if oldest != nil { - timestamp = *oldest + if newest != nil { + for _, metricPoint := range metrics.Metrics { + if metricPoint.Timestamp.Add(duration).After(newest.Timestamp) { + intSum += int64(metricPoint.Value) + intSumCount++ + if metricPoint.FloatValue != nil { + floatSum += *metricPoint.FloatValue + floatSumCount++ + } + } + } + + if newest.FloatValue != nil { + return floatSum / float64(floatSumCount), newest.Timestamp, true + } else { + return float64(intSum / int64(intSumCount)), newest.Timestamp, true + } } - return sum, count, timestamp + + return 0, time.Time{}, false } diff --git a/pkg/controller/podautoscaler/metrics/metrics_client_test.go b/pkg/controller/podautoscaler/metrics/metrics_client_test.go index 3d3cd12e7b4..5e3e95a97f9 100644 --- a/pkg/controller/podautoscaler/metrics/metrics_client_test.go +++ b/pkg/controller/podautoscaler/metrics/metrics_client_test.go @@ -65,19 +65,19 @@ type metricPoint struct { } type testCase struct { - replicas int - desiredValue float64 - desiredRequest *float64 + desiredResourceValues PodResourceInfo + desiredMetricValues PodMetricsInfo desiredError error - desiredRunningPods int - targetResource string + + replicas int targetTimestamp int reportedMetricsPoints [][]metricPoint reportedPodMetrics [][]int64 - namespace string - podListOverride *api.PodList - selector labels.Selector - useMetricsApi bool + + namespace string + selector labels.Selector + resourceName api.ResourceName + metricName string } func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset { @@ -87,12 +87,12 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset { podLabels := map[string]string{"name": podNamePrefix} tc.selector = labels.SelectorFromSet(podLabels) + // it's a resource test if we have a resource name + isResource := len(tc.resourceName) > 0 + fakeClient := &fake.Clientset{} fakeClient.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { - if tc.podListOverride != nil { - return true, tc.podListOverride, nil - } obj := &api.PodList{} for i := 0; i < tc.replicas; i++ { podName := fmt.Sprintf("%s-%d", podNamePrefix, i) @@ -102,7 +102,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset { return true, obj, nil }) - if tc.useMetricsApi { + if isResource { fakeClient.AddProxyReactor("services", func(action core.Action) (handled bool, ret restclient.ResponseWrapper, err error) { metrics := metrics_api.PodMetricsList{} for i, containers := range tc.reportedPodMetrics { @@ -181,182 +181,83 @@ func buildPod(namespace, podName string, podLabels map[string]string, phase api. 
},
 		Status: api.PodStatus{
 			Phase: phase,
+			Conditions: []api.PodCondition{
+				{
+					Type:   api.PodReady,
+					Status: api.ConditionTrue,
+				},
+			},
 		},
 	}
 }
-func (tc *testCase) verifyResults(t *testing.T, val *float64, req *float64, pods int, timestamp time.Time, err error) {
+func (tc *testCase) verifyResults(t *testing.T, metrics interface{}, timestamp time.Time, err error) {
 	if tc.desiredError != nil {
-		assert.Error(t, err)
-		assert.Contains(t, fmt.Sprintf("%v", err), fmt.Sprintf("%v", tc.desiredError))
+		assert.Error(t, err, "there should be an error retrieving the metrics")
+		assert.Contains(t, fmt.Sprintf("%v", err), fmt.Sprintf("%v", tc.desiredError), "the error message should be as expected")
 		return
 	}
-	assert.NoError(t, err)
-	assert.NotNil(t, val)
-	assert.True(t, tc.desiredValue-0.001 < *val)
-	assert.True(t, tc.desiredValue+0.001 > *val)
-	assert.Equal(t, tc.desiredRunningPods, pods)
+	assert.NoError(t, err, "there should be no error retrieving the metrics")
+	assert.NotNil(t, metrics, "there should be metrics returned")
-	if tc.desiredRequest != nil {
-		assert.True(t, *tc.desiredRequest-0.001 < *req)
-		assert.True(t, *tc.desiredRequest+0.001 > *req)
+	if metricsInfo, wasRaw := metrics.(PodMetricsInfo); wasRaw {
+		assert.Equal(t, tc.desiredMetricValues, metricsInfo, "the raw metrics values should be as expected")
+	} else if resourceInfo, wasResource := metrics.(PodResourceInfo); wasResource {
+		assert.Equal(t, tc.desiredResourceValues, resourceInfo, "the resource metrics values should be as expected")
+	} else {
+		assert.False(t, true, "should return either resource metrics info or raw metrics info")
 	}
 	targetTimestamp := fixedTimestamp.Add(time.Duration(tc.targetTimestamp) * time.Minute)
-	assert.True(t, targetTimestamp.Equal(timestamp))
+	assert.True(t, targetTimestamp.Equal(timestamp), fmt.Sprintf("the timestamp should be as expected (%s) but was %s", targetTimestamp, timestamp))
 }
 func (tc *testCase) runTest(t *testing.T) {
 	testClient := tc.prepareTestClient(t)
 	metricsClient := NewHeapsterMetricsClient(testClient, DefaultHeapsterNamespace, DefaultHeapsterScheme, DefaultHeapsterService, DefaultHeapsterPort)
-	if tc.targetResource == "cpu-usage" {
-		val, req, pods, timestamp, err := metricsClient.GetCpuConsumptionAndRequestInMillis(tc.namespace, tc.selector)
-		fval := float64(val)
-		freq := float64(req)
-		tc.verifyResults(t, &fval, &freq, pods, timestamp, err)
+	isResource := len(tc.resourceName) > 0
+	if isResource {
+		info, timestamp, err := metricsClient.GetResourceMetric(tc.resourceName, tc.namespace, tc.selector)
+		tc.verifyResults(t, info, timestamp, err)
 	} else {
-		val, timestamp, err := metricsClient.GetCustomMetric(tc.targetResource, tc.namespace, tc.selector)
-		tc.verifyResults(t, val, nil, 0, timestamp, err)
+		info, timestamp, err := metricsClient.GetRawMetric(tc.metricName, tc.namespace, tc.selector)
+		tc.verifyResults(t, info, timestamp, err)
 	}
 }
 func TestCPU(t *testing.T) {
 	tc := testCase{
-		replicas:           3,
-		desiredValue:       5000,
-		desiredRunningPods: 3,
-		targetResource:     "cpu-usage",
+		replicas: 3,
+		desiredResourceValues: PodResourceInfo{
+			"test-pod-0": 5000, "test-pod-1": 5000, "test-pod-2": 5000,
+		},
+		resourceName:       api.ResourceCPU,
 		targetTimestamp:    1,
 		reportedPodMetrics: [][]int64{{5000}, {5000}, {5000}},
-		useMetricsApi:      true,
-	}
-	tc.runTest(t)
-}
-
-func TestCPUPending(t *testing.T) {
-	desiredRequest := float64(2048 * 1000)
-	tc := testCase{
-		replicas:           5,
-		desiredValue:       5000,
-		desiredRequest:     &desiredRequest,
-		desiredRunningPods: 3,
-		targetResource:     
"cpu-usage", - targetTimestamp: 1, - reportedPodMetrics: [][]int64{{5000}, {5000}, {5000}}, - useMetricsApi: true, - podListOverride: &api.PodList{}, - } - - namespace := "test-namespace" - podNamePrefix := "test-pod" - podLabels := map[string]string{"name": podNamePrefix} - podRequest := []string{"1024", "2048", "3072", "200", "100"} - for i := 0; i < tc.replicas; i++ { - podName := fmt.Sprintf("%s-%d", podNamePrefix, i) - pod := buildPod(namespace, podName, podLabels, api.PodRunning, podRequest[i]) - tc.podListOverride.Items = append(tc.podListOverride.Items, pod) - } - tc.podListOverride.Items[3].Status.Phase = api.PodPending - tc.podListOverride.Items[4].Status.Phase = api.PodFailed - - tc.runTest(t) -} - -func TestCPUAllPending(t *testing.T) { - tc := testCase{ - replicas: 4, - targetResource: "cpu-usage", - targetTimestamp: 1, - reportedPodMetrics: [][]int64{}, - useMetricsApi: true, - podListOverride: &api.PodList{}, - desiredError: fmt.Errorf("no running pods"), - } - - namespace := "test-namespace" - podNamePrefix := "test-pod" - podLabels := map[string]string{"name": podNamePrefix} - for i := 0; i < tc.replicas; i++ { - podName := fmt.Sprintf("%s-%d", podNamePrefix, i) - pod := buildPod(namespace, podName, podLabels, api.PodPending, "2048") - tc.podListOverride.Items = append(tc.podListOverride.Items, pod) } tc.runTest(t) } func TestQPS(t *testing.T) { tc := testCase{ - replicas: 3, - desiredValue: 13.33333, - targetResource: "qps", + replicas: 3, + desiredMetricValues: PodMetricsInfo{ + "test-pod-0": 10, "test-pod-1": 20, "test-pod-2": 10, + }, + metricName: "qps", targetTimestamp: 1, reportedMetricsPoints: [][]metricPoint{{{10, 1}}, {{20, 1}}, {{10, 1}}}, } tc.runTest(t) } -func TestQPSPending(t *testing.T) { - tc := testCase{ - replicas: 4, - desiredValue: 13.33333, - targetResource: "qps", - targetTimestamp: 1, - reportedMetricsPoints: [][]metricPoint{{{10, 1}}, {{20, 1}}, {{10, 1}}}, - podListOverride: &api.PodList{}, - } - - namespace := "test-namespace" - podNamePrefix := "test-pod" - podLabels := map[string]string{"name": podNamePrefix} - for i := 0; i < tc.replicas; i++ { - podName := fmt.Sprintf("%s-%d", podNamePrefix, i) - pod := buildPod(namespace, podName, podLabels, api.PodRunning, "256") - tc.podListOverride.Items = append(tc.podListOverride.Items, pod) - } - tc.podListOverride.Items[0].Status.Phase = api.PodPending - tc.runTest(t) -} - -func TestQPSAllPending(t *testing.T) { - tc := testCase{ - replicas: 4, - desiredError: fmt.Errorf("no running pods"), - targetResource: "qps", - targetTimestamp: 1, - reportedMetricsPoints: [][]metricPoint{}, - podListOverride: &api.PodList{}, - } - - namespace := "test-namespace" - podNamePrefix := "test-pod" - podLabels := map[string]string{"name": podNamePrefix} - for i := 0; i < tc.replicas; i++ { - podName := fmt.Sprintf("%s-%d", podNamePrefix, i) - pod := buildPod(namespace, podName, podLabels, api.PodPending, "512") - tc.podListOverride.Items = append(tc.podListOverride.Items, pod) - } - tc.podListOverride.Items[0].Status.Phase = api.PodPending - tc.runTest(t) -} - -func TestCPUSumEqualZero(t *testing.T) { - tc := testCase{ - replicas: 3, - desiredValue: 0, - desiredRunningPods: 3, - targetResource: "cpu-usage", - targetTimestamp: 0, - reportedPodMetrics: [][]int64{{0}, {0}, {0}}, - useMetricsApi: true, - } - tc.runTest(t) -} - func TestQpsSumEqualZero(t *testing.T) { tc := testCase{ - replicas: 3, - desiredValue: 0, - targetResource: "qps", + replicas: 3, + desiredMetricValues: PodMetricsInfo{ + "test-pod-0": 0, 
"test-pod-1": 0, "test-pod-2": 0, + }, + metricName: "qps", targetTimestamp: 0, reportedMetricsPoints: [][]metricPoint{{{0, 0}}, {{0, 0}}, {{0, 0}}}, } @@ -365,24 +266,26 @@ func TestQpsSumEqualZero(t *testing.T) { func TestCPUMoreMetrics(t *testing.T) { tc := testCase{ - replicas: 5, - desiredValue: 5000, - desiredRunningPods: 5, - targetResource: "cpu-usage", + replicas: 5, + desiredResourceValues: PodResourceInfo{ + "test-pod-0": 5000, "test-pod-1": 5000, "test-pod-2": 5000, + "test-pod-3": 5000, "test-pod-4": 5000, + }, + resourceName: api.ResourceCPU, targetTimestamp: 10, reportedPodMetrics: [][]int64{{1000, 2000, 2000}, {5000}, {1000, 1000, 1000, 2000}, {4000, 1000}, {5000}}, - useMetricsApi: true, } tc.runTest(t) } func TestCPUMissingMetrics(t *testing.T) { tc := testCase{ - replicas: 3, - targetResource: "cpu-usage", - desiredError: fmt.Errorf("metrics obtained for 1/3 of pods"), + replicas: 3, + desiredResourceValues: PodResourceInfo{ + "test-pod-0": 4000, + }, + resourceName: api.ResourceCPU, reportedPodMetrics: [][]int64{{4000}}, - useMetricsApi: true, } tc.runTest(t) } @@ -390,29 +293,19 @@ func TestCPUMissingMetrics(t *testing.T) { func TestQpsMissingMetrics(t *testing.T) { tc := testCase{ replicas: 3, - targetResource: "qps", - desiredError: fmt.Errorf("metrics obtained for 1/3 of pods"), + desiredError: fmt.Errorf("requested metrics for 3 pods, got metrics for 1"), + metricName: "qps", + targetTimestamp: 1, reportedMetricsPoints: [][]metricPoint{{{4000, 4}}}, } tc.runTest(t) } -func TestCPUSuperfluousMetrics(t *testing.T) { - tc := testCase{ - replicas: 3, - targetResource: "cpu-usage", - desiredError: fmt.Errorf("metrics obtained for 6/3 of pods"), - reportedPodMetrics: [][]int64{{1000}, {2000}, {4000}, {4000}, {2000}, {4000}}, - useMetricsApi: true, - } - tc.runTest(t) -} - func TestQpsSuperfluousMetrics(t *testing.T) { tc := testCase{ replicas: 3, - targetResource: "qps", - desiredError: fmt.Errorf("metrics obtained for 6/3 of pods"), + desiredError: fmt.Errorf("requested metrics for 3 pods, got metrics for 6"), + metricName: "qps", reportedMetricsPoints: [][]metricPoint{{{1000, 1}}, {{2000, 4}}, {{2000, 1}}, {{4000, 5}}, {{2000, 1}}, {{4000, 4}}}, } tc.runTest(t) @@ -421,11 +314,23 @@ func TestQpsSuperfluousMetrics(t *testing.T) { func TestCPUEmptyMetrics(t *testing.T) { tc := testCase{ replicas: 3, - targetResource: "cpu-usage", - desiredError: fmt.Errorf("metrics obtained for 0/3 of pods"), + resourceName: api.ResourceCPU, + desiredError: fmt.Errorf("no metrics returned from heapster"), reportedMetricsPoints: [][]metricPoint{}, reportedPodMetrics: [][]int64{}, - useMetricsApi: true, + } + tc.runTest(t) +} + +func TestQpsEmptyEntries(t *testing.T) { + tc := testCase{ + replicas: 3, + metricName: "qps", + desiredMetricValues: PodMetricsInfo{ + "test-pod-0": 4000, "test-pod-2": 2000, + }, + targetTimestamp: 4, + reportedMetricsPoints: [][]metricPoint{{{4000, 4}}, {}, {{2000, 4}}}, } tc.runTest(t) } @@ -433,61 +338,37 @@ func TestCPUEmptyMetrics(t *testing.T) { func TestCPUZeroReplicas(t *testing.T) { tc := testCase{ replicas: 0, - targetResource: "cpu-usage", - desiredError: fmt.Errorf("some pods do not have request for cpu"), + resourceName: api.ResourceCPU, + desiredError: fmt.Errorf("no metrics returned from heapster"), reportedPodMetrics: [][]int64{}, - useMetricsApi: true, } tc.runTest(t) } func TestCPUEmptyMetricsForOnePod(t *testing.T) { tc := testCase{ - replicas: 3, - targetResource: "cpu-usage", - desiredError: fmt.Errorf("metrics obtained for 2/3 of pods 
(sample missing pod: test-namespace/test-pod-2)"),
-		reportedPodMetrics: [][]int64{{100}, {300, 400}},
-		useMetricsApi:      true,
+		replicas:     3,
+		resourceName: api.ResourceCPU,
+		desiredResourceValues: PodResourceInfo{
+			"test-pod-0": 100, "test-pod-1": 700,
+		},
+		reportedPodMetrics: [][]int64{{100}, {300, 400}, {}},
 	}
 	tc.runTest(t)
 }
-func TestAggregateSum(t *testing.T) {
-	//calculateSumFromTimeSample(metrics heapster.MetricResultList, duration time.Duration) (sum intAndFloat, count int, timestamp time.Time) {
+func TestCollapseTimeSamples(t *testing.T) {
 	now := time.Now()
-	result := heapster.MetricResultList{
-		Items: []heapster.MetricResult{
-			{
-				Metrics: []heapster.MetricPoint{
-					{Timestamp: now, Value: 50, FloatValue: nil},
-					{Timestamp: now.Add(-15 * time.Second), Value: 100, FloatValue: nil},
-					{Timestamp: now.Add(-60 * time.Second), Value: 100000, FloatValue: nil}},
-				LatestTimestamp: now,
-			},
-		},
+	metrics := heapster.MetricResult{
+		Metrics: []heapster.MetricPoint{
+			{Timestamp: now, Value: 50, FloatValue: nil},
+			{Timestamp: now.Add(-15 * time.Second), Value: 100, FloatValue: nil},
+			{Timestamp: now.Add(-60 * time.Second), Value: 100000, FloatValue: nil}},
+		LatestTimestamp: now,
 	}
-	sum, cnt, _ := calculateSumFromTimeSample(result, time.Minute)
-	assert.Equal(t, int64(75), sum.intValue)
-	assert.InEpsilon(t, 75.0, sum.floatValue, 0.1)
-	assert.Equal(t, 1, cnt)
-}
-func TestAggregateSumSingle(t *testing.T) {
-	now := time.Now()
-	result := heapster.MetricResultList{
-		Items: []heapster.MetricResult{
-			{
-				Metrics: []heapster.MetricPoint{
-					{Timestamp: now, Value: 50, FloatValue: nil},
-					{Timestamp: now.Add(-65 * time.Second), Value: 100000, FloatValue: nil}},
-				LatestTimestamp: now,
-			},
-		},
-	}
-	sum, cnt, _ := calculateSumFromTimeSample(result, time.Minute)
-	assert.Equal(t, int64(50), sum.intValue)
-	assert.InEpsilon(t, 50.0, sum.floatValue, 0.1)
-	assert.Equal(t, 1, cnt)
+	val, timestamp, hadMetrics := collapseTimeSamples(metrics, time.Minute)
+	assert.True(t, hadMetrics, "should report that it received a populated list of metrics")
+	assert.InEpsilon(t, float64(75), val, 0.1, "collapsed sample value should be as expected")
+	assert.True(t, timestamp.Equal(now), "timestamp should be the current time (the newest)")
 }
-
-// TODO: add proper tests for request
diff --git a/pkg/controller/podautoscaler/metrics/utilization.go b/pkg/controller/podautoscaler/metrics/utilization.go
new file mode 100644
index 00000000000..f0e909207f7
--- /dev/null
+++ b/pkg/controller/podautoscaler/metrics/utilization.go
@@ -0,0 +1,54 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. 
+*/
+
+package metrics
+
+// GetResourceUtilizationRatio takes in a set of metrics, a set of matching requests,
+// and a target utilization percentage, and calculates the ratio of
+// desired to actual utilization (returning that and the actual utilization)
+func GetResourceUtilizationRatio(metrics PodResourceInfo, requests map[string]int64, targetUtilization int32) (float64, int32, error) {
+	metricsTotal := int64(0)
+	requestsTotal := int64(0)
+
+	for podName, metricValue := range metrics {
+		request, hasRequest := requests[podName]
+		if !hasRequest {
+			// we check for missing requests elsewhere, so assuming missing requests == extraneous metrics
+			continue
+		}
+
+		metricsTotal += metricValue
+		requestsTotal += request
+	}
+
+	currentUtilization := int32((metricsTotal * 100) / requestsTotal)
+
+	return float64(currentUtilization) / float64(targetUtilization), currentUtilization, nil
+}
+
+// GetMetricUtilizationRatio takes in a set of metrics and a target utilization value,
+// and calculates the ratio of desired to actual utilization
+// (returning that and the actual utilization)
+func GetMetricUtilizationRatio(metrics PodMetricsInfo, targetUtilization float64) (float64, float64) {
+	metricsTotal := float64(0)
+	for _, metricValue := range metrics {
+		metricsTotal += metricValue
+	}
+
+	currentUtilization := metricsTotal / float64(len(metrics))
+
+	return currentUtilization / targetUtilization, currentUtilization
+}
diff --git a/pkg/controller/podautoscaler/replica_calculator.go b/pkg/controller/podautoscaler/replica_calculator.go
new file mode 100644
index 00000000000..4074b7801e7
--- /dev/null
+++ b/pkg/controller/podautoscaler/replica_calculator.go
@@ -0,0 +1,246 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. 
+*/ + +package podautoscaler + +import ( + "fmt" + "math" + "time" + + "k8s.io/kubernetes/pkg/api" + unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" + metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util/sets" +) + +type ReplicaCalculator struct { + metricsClient metricsclient.MetricsClient + podsGetter unversionedcore.PodsGetter +} + +func NewReplicaCalculator(metricsClient metricsclient.MetricsClient, podsGetter unversionedcore.PodsGetter) *ReplicaCalculator { + return &ReplicaCalculator{ + metricsClient: metricsClient, + podsGetter: podsGetter, + } +} + +// GetResourceReplicas calculates the desired replica count based on a target resource utilization percentage +// of the given resource for pods matching the given selector in the given namespace, and the current replica count +func (c *ReplicaCalculator) GetResourceReplicas(currentReplicas int32, targetUtilization int32, resource api.ResourceName, namespace string, selector labels.Selector) (replicaCount int32, utilization int32, timestamp time.Time, err error) { + metrics, timestamp, err := c.metricsClient.GetResourceMetric(resource, namespace, selector) + if err != nil { + return 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err) + } + + podList, err := c.podsGetter.Pods(namespace).List(api.ListOptions{LabelSelector: selector}) + if err != nil { + return 0, 0, time.Time{}, fmt.Errorf("unable to get pods while calculating replica count: %v", err) + } + + if len(podList.Items) == 0 { + return 0, 0, time.Time{}, fmt.Errorf("no pods returned by selector while calculating replica count") + } + + requests := make(map[string]int64, len(podList.Items)) + readyPodCount := 0 + unreadyPods := sets.NewString() + missingPods := sets.NewString() + + for _, pod := range podList.Items { + podSum := int64(0) + for _, container := range pod.Spec.Containers { + if containerRequest, ok := container.Resources.Requests[resource]; ok { + podSum += containerRequest.MilliValue() + } else { + return 0, 0, time.Time{}, fmt.Errorf("missing request for %s on container %s in pod %s/%s", resource, container.Name, namespace, pod.Name) + } + } + + requests[pod.Name] = podSum + + if pod.Status.Phase != api.PodRunning || !api.IsPodReady(&pod) { + // save this pod name for later, but pretend it doesn't exist for now + unreadyPods.Insert(pod.Name) + delete(metrics, pod.Name) + continue + } + + if _, found := metrics[pod.Name]; !found { + // save this pod name for later, but pretend it doesn't exist for now + missingPods.Insert(pod.Name) + continue + } + + readyPodCount++ + } + + if len(metrics) == 0 { + return 0, 0, time.Time{}, fmt.Errorf("did not receive metrics for any ready pods") + } + + usageRatio, utilization, err := metricsclient.GetResourceUtilizationRatio(metrics, requests, targetUtilization) + if err != nil { + return 0, 0, time.Time{}, err + } + + rebalanceUnready := len(unreadyPods) > 0 && usageRatio > 1.0 + if !rebalanceUnready && len(missingPods) == 0 { + if math.Abs(1.0-usageRatio) <= tolerance { + // return the current replicas if the change would be too small + return currentReplicas, utilization, timestamp, nil + } + + // if we don't have any unready or missing pods, we can calculate the new replica count now + return int32(math.Ceil(usageRatio * float64(readyPodCount))), utilization, timestamp, nil + } + + if len(missingPods) > 0 { + if usageRatio < 1.0 { + // on a 
scale-down, treat missing pods as using 100% of the resource request + for podName := range missingPods { + metrics[podName] = requests[podName] + } + } else { + // on a scale-up, treat missing pods as using 0% of the resource request + for podName := range missingPods { + metrics[podName] = 0 + } + } + } + + if rebalanceUnready { + // on a scale-up, treat unready pods as using 0% of the resource request + for podName := range unreadyPods { + metrics[podName] = 0 + } + } + + // re-run the utilization calculation with our new numbers + newUsageRatio, _, err := metricsclient.GetResourceUtilizationRatio(metrics, requests, targetUtilization) + if err != nil { + return 0, utilization, time.Time{}, err + } + + if math.Abs(1.0-newUsageRatio) <= tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) { + // return the current replicas if the change would be too small, + // or if the new usage ratio would cause a change in scale direction + return currentReplicas, utilization, timestamp, nil + } + + // return the result, where the number of replicas considered is + // however many replicas factored into our calculation + return int32(math.Ceil(newUsageRatio * float64(len(metrics)))), utilization, timestamp, nil +} + +// GetMetricReplicas calculates the desired replica count based on a target value for the given custom metric +// for pods matching the given selector in the given namespace, and the current replica count +func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUtilization float64, metricName string, namespace string, selector labels.Selector) (replicaCount int32, utilization float64, timestamp time.Time, err error) { + metrics, timestamp, err := c.metricsClient.GetRawMetric(metricName, namespace, selector) + if err != nil { + return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v", metricName, err) + } + + podList, err := c.podsGetter.Pods(namespace).List(api.ListOptions{LabelSelector: selector}) + if err != nil { + return 0, 0, time.Time{}, fmt.Errorf("unable to get pods while calculating replica count: %v", err) + } + + if len(podList.Items) == 0 { + return 0, 0, time.Time{}, fmt.Errorf("no pods returned by selector while calculating replica count") + } + + readyPodCount := 0 + unreadyPods := sets.NewString() + missingPods := sets.NewString() + + for _, pod := range podList.Items { + if pod.Status.Phase != api.PodRunning || !api.IsPodReady(&pod) { + // save this pod name for later, but pretend it doesn't exist for now + unreadyPods.Insert(pod.Name) + delete(metrics, pod.Name) + continue + } + + if _, found := metrics[pod.Name]; !found { + // save this pod name for later, but pretend it doesn't exist for now + missingPods.Insert(pod.Name) + continue + } + + readyPodCount++ + } + + if len(metrics) == 0 { + return 0, 0, time.Time{}, fmt.Errorf("did not receive metrics for any ready pods") + } + + usageRatio, utilization := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization) + + rebalanceUnready := len(unreadyPods) > 0 && usageRatio > 1.0 + + if !rebalanceUnready && len(missingPods) == 0 { + if math.Abs(1.0-usageRatio) <= tolerance { + // return the current replicas if the change would be too small + return currentReplicas, utilization, timestamp, nil + } + + // if we don't have any unready or missing pods, we can calculate the new replica count now + return int32(math.Ceil(usageRatio * float64(readyPodCount))), utilization, 
timestamp, nil + } + + if len(missingPods) > 0 { + if usageRatio < 1.0 { + // on a scale-down, treat missing pods as though they were using exactly the target amount of the metric + for podName := range missingPods { + metrics[podName] = targetUtilization + } + } else { + // on a scale-up, treat missing pods as though they were using none of the metric + for podName := range missingPods { + metrics[podName] = 0 + } + } + } + + if rebalanceUnready { + // on a scale-up, treat unready pods as though they were using none of the metric + for podName := range unreadyPods { + metrics[podName] = 0 + } + } + + // re-run the utilization calculation with our new numbers + newUsageRatio, _ := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization) + + if math.Abs(1.0-newUsageRatio) <= tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) { + // return the current replicas if the change would be too small, + // or if the new usage ratio would cause a change in scale direction + return currentReplicas, utilization, timestamp, nil + } + + // return the result, where the number of replicas considered is + // however many replicas factored into our calculation + return int32(math.Ceil(newUsageRatio * float64(len(metrics)))), utilization, timestamp, nil +} diff --git a/pkg/controller/podautoscaler/replica_calculator_test.go b/pkg/controller/podautoscaler/replica_calculator_test.go new file mode 100644 index 00000000000..0819cc406fc --- /dev/null +++ b/pkg/controller/podautoscaler/replica_calculator_test.go @@ -0,0 +1,556 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package podautoscaler + +import ( + "encoding/json" + "fmt" + "math" + "strconv" + "strings" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/api/v1" + _ "k8s.io/kubernetes/pkg/apimachinery/registered" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" + "k8s.io/kubernetes/pkg/runtime" + + heapster "k8s.io/heapster/metrics/api/v1/types" + metrics_api "k8s.io/heapster/metrics/apis/metrics/v1alpha1" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type resourceInfo struct { + name api.ResourceName + requests []resource.Quantity + levels []int64 + + targetUtilization int32 + expectedUtilization int32 +} + +type metricInfo struct { + name string + levels []float64 + + targetUtilization float64 + expectedUtilization float64 +} + +type replicaCalcTestCase struct { + currentReplicas int32 + expectedReplicas int32 + expectedError error + + timestamp time.Time + + resource *resourceInfo + metric *metricInfo + + podReadiness []api.ConditionStatus +} + +const ( + testNamespace = "test-namespace" + podNamePrefix = "test-pod" +) + +func (tc *replicaCalcTestCase) prepareTestClient(t *testing.T) *fake.Clientset { + + fakeClient := &fake.Clientset{} + fakeClient.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { + obj := &api.PodList{} + for i := 0; i < int(tc.currentReplicas); i++ { + podReadiness := api.ConditionTrue + if tc.podReadiness != nil { + podReadiness = tc.podReadiness[i] + } + podName := fmt.Sprintf("%s-%d", podNamePrefix, i) + pod := api.Pod{ + Status: api.PodStatus{ + Phase: api.PodRunning, + Conditions: []api.PodCondition{ + { + Type: api.PodReady, + Status: podReadiness, + }, + }, + }, + ObjectMeta: api.ObjectMeta{ + Name: podName, + Namespace: testNamespace, + Labels: map[string]string{ + "name": podNamePrefix, + }, + }, + Spec: api.PodSpec{ + Containers: []api.Container{{}, {}}, + }, + } + + if tc.resource != nil && i < len(tc.resource.requests) { + pod.Spec.Containers[0].Resources = api.ResourceRequirements{ + Requests: api.ResourceList{ + tc.resource.name: tc.resource.requests[i], + }, + } + pod.Spec.Containers[1].Resources = api.ResourceRequirements{ + Requests: api.ResourceList{ + tc.resource.name: tc.resource.requests[i], + }, + } + } + obj.Items = append(obj.Items, pod) + } + return true, obj, nil + }) + + fakeClient.AddProxyReactor("services", func(action core.Action) (handled bool, ret restclient.ResponseWrapper, err error) { + var heapsterRawMemResponse []byte + + if tc.resource != nil { + metrics := metrics_api.PodMetricsList{} + for i, resValue := range tc.resource.levels { + podMetric := metrics_api.PodMetrics{ + ObjectMeta: v1.ObjectMeta{ + Name: fmt.Sprintf("%s-%d", podNamePrefix, i), + Namespace: testNamespace, + }, + Timestamp: unversioned.Time{Time: tc.timestamp}, + Containers: []metrics_api.ContainerMetrics{ + { + Name: "container1", + Usage: v1.ResourceList{ + v1.ResourceName(tc.resource.name): *resource.NewMilliQuantity( + int64(resValue), + resource.DecimalSI), + }, + }, + { + Name: "container2", + Usage: v1.ResourceList{ + v1.ResourceName(tc.resource.name): *resource.NewMilliQuantity( + int64(resValue), + resource.DecimalSI), + }, + }, + }, + } + metrics.Items = append(metrics.Items, podMetric) 
+ } + heapsterRawMemResponse, _ = json.Marshal(&metrics) + } else { + // only return the pods that we actually asked for + proxyAction := action.(core.ProxyGetAction) + pathParts := strings.Split(proxyAction.GetPath(), "/") + // pathParts should look like [ api, v1, model, namespaces, $NS, pod-list, $PODS, metrics, $METRIC... ] + if len(pathParts) < 9 { + return true, nil, fmt.Errorf("invalid heapster path %q", proxyAction.GetPath()) + } + + podNames := strings.Split(pathParts[7], ",") + podPresent := make([]bool, len(tc.metric.levels)) + for _, name := range podNames { + if len(name) <= len(podNamePrefix)+1 { + return true, nil, fmt.Errorf("unknown pod %q", name) + } + num, err := strconv.Atoi(name[len(podNamePrefix)+1:]) + if err != nil { + return true, nil, fmt.Errorf("unknown pod %q", name) + } + podPresent[num] = true + } + + timestamp := tc.timestamp + metrics := heapster.MetricResultList{} + for i, level := range tc.metric.levels { + if !podPresent[i] { + continue + } + + metric := heapster.MetricResult{ + Metrics: []heapster.MetricPoint{{Timestamp: timestamp, Value: uint64(level), FloatValue: &tc.metric.levels[i]}}, + LatestTimestamp: timestamp, + } + metrics.Items = append(metrics.Items, metric) + } + heapsterRawMemResponse, _ = json.Marshal(&metrics) + } + + return true, newFakeResponseWrapper(heapsterRawMemResponse), nil + }) + + return fakeClient +} + +func (tc *replicaCalcTestCase) runTest(t *testing.T) { + testClient := tc.prepareTestClient(t) + metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort) + + replicaCalc := &ReplicaCalculator{ + metricsClient: metricsClient, + podsGetter: testClient.Core(), + } + + selector, err := unversioned.LabelSelectorAsSelector(&unversioned.LabelSelector{ + MatchLabels: map[string]string{"name": podNamePrefix}, + }) + if err != nil { + require.Nil(t, err, "something went horribly wrong...") + } + + if tc.resource != nil { + outReplicas, outUtilization, outTimestamp, err := replicaCalc.GetResourceReplicas(tc.currentReplicas, tc.resource.targetUtilization, tc.resource.name, testNamespace, selector) + + if tc.expectedError != nil { + require.Error(t, err, "there should be an error calculating the replica count") + assert.Contains(t, err.Error(), tc.expectedError.Error(), "the error message should have contained the expected error message") + return + } + require.NoError(t, err, "there should not have been an error calculating the replica count") + assert.Equal(t, tc.expectedReplicas, outReplicas, "replicas should be as expected") + assert.Equal(t, tc.resource.expectedUtilization, outUtilization, "utilization should be as expected") + assert.True(t, tc.timestamp.Equal(outTimestamp), "timestamp should be as expected") + + } else { + outReplicas, outUtilization, outTimestamp, err := replicaCalc.GetMetricReplicas(tc.currentReplicas, tc.metric.targetUtilization, tc.metric.name, testNamespace, selector) + + if tc.expectedError != nil { + require.Error(t, err, "there should be an error calculating the replica count") + assert.Contains(t, err.Error(), tc.expectedError.Error(), "the error message should have contained the expected error message") + return + } + require.NoError(t, err, "there should not have been an error calculating the replica count") + assert.Equal(t, tc.expectedReplicas, outReplicas, "replicas should be as expected") + assert.InDelta(t, tc.metric.expectedUtilization, outUtilization, 0.1, "utilization 
should be as expected") + assert.True(t, tc.timestamp.Equal(outTimestamp), "timestamp should be as expected") + } +} + +func TestReplicaCalcScaleUp(t *testing.T) { + tc := replicaCalcTestCase{ + currentReplicas: 3, + expectedReplicas: 5, + resource: &resourceInfo{ + name: api.ResourceCPU, + requests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, + levels: []int64{300, 500, 700}, + + targetUtilization: 30, + expectedUtilization: 50, + }, + } + tc.runTest(t) +} + +func TestReplicaCalcScaleUpUnreadyLessScale(t *testing.T) { + tc := replicaCalcTestCase{ + currentReplicas: 3, + expectedReplicas: 4, + podReadiness: []api.ConditionStatus{api.ConditionFalse, api.ConditionTrue, api.ConditionTrue}, + resource: &resourceInfo{ + name: api.ResourceCPU, + requests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, + levels: []int64{300, 500, 700}, + + targetUtilization: 30, + expectedUtilization: 60, + }, + } + tc.runTest(t) +} + +func TestReplicaCalcScaleUpUnreadyNoScale(t *testing.T) { + tc := replicaCalcTestCase{ + currentReplicas: 3, + expectedReplicas: 3, + podReadiness: []api.ConditionStatus{api.ConditionTrue, api.ConditionFalse, api.ConditionFalse}, + resource: &resourceInfo{ + name: api.ResourceCPU, + requests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, + levels: []int64{400, 500, 700}, + + targetUtilization: 30, + expectedUtilization: 40, + }, + } + tc.runTest(t) +} + +func TestReplicaCalcScaleUpCM(t *testing.T) { + tc := replicaCalcTestCase{ + currentReplicas: 3, + expectedReplicas: 4, + metric: &metricInfo{ + name: "qps", + levels: []float64{20.0, 10.0, 30.0}, + targetUtilization: 15.0, + expectedUtilization: 20.0, + }, + } + tc.runTest(t) +} + +func TestReplicaCalcScaleUpCMUnreadyLessScale(t *testing.T) { + tc := replicaCalcTestCase{ + currentReplicas: 3, + expectedReplicas: 4, + podReadiness: []api.ConditionStatus{api.ConditionTrue, api.ConditionTrue, api.ConditionFalse}, + metric: &metricInfo{ + name: "qps", + levels: []float64{50.0, 10.0, 30.0}, + targetUtilization: 15.0, + expectedUtilization: 30.0, + }, + } + tc.runTest(t) +} + +func TestReplicaCalcScaleUpCMUnreadyNoScaleWouldScaleDown(t *testing.T) { + tc := replicaCalcTestCase{ + currentReplicas: 3, + expectedReplicas: 3, + podReadiness: []api.ConditionStatus{api.ConditionFalse, api.ConditionTrue, api.ConditionFalse}, + metric: &metricInfo{ + name: "qps", + levels: []float64{50.0, 15.0, 30.0}, + targetUtilization: 15.0, + expectedUtilization: 15.0, + }, + } + tc.runTest(t) +} + +func TestReplicaCalcScaleDown(t *testing.T) { + tc := replicaCalcTestCase{ + currentReplicas: 5, + expectedReplicas: 3, + resource: &resourceInfo{ + name: api.ResourceCPU, + requests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, + levels: []int64{100, 300, 500, 250, 250}, + + targetUtilization: 50, + expectedUtilization: 28, + }, + } + tc.runTest(t) +} + +func TestReplicaCalcScaleDownCM(t *testing.T) { + tc := replicaCalcTestCase{ + currentReplicas: 5, + expectedReplicas: 3, + metric: &metricInfo{ + name: "qps", + levels: []float64{12.0, 12.0, 12.0, 12.0, 12.0}, + targetUtilization: 20.0, + expectedUtilization: 12.0, + }, + } + tc.runTest(t) +} + +func TestReplicaCalcScaleDownIgnoresUnreadyPods(t *testing.T) { + tc := replicaCalcTestCase{ + currentReplicas: 5, + expectedReplicas: 2, + 
podReadiness: []api.ConditionStatus{api.ConditionTrue, api.ConditionTrue, api.ConditionTrue, api.ConditionFalse, api.ConditionFalse}, + resource: &resourceInfo{ + name: api.ResourceCPU, + requests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, + levels: []int64{100, 300, 500, 250, 250}, + + targetUtilization: 50, + expectedUtilization: 30, + }, + } + tc.runTest(t) +} + +func TestReplicaCalcTolerance(t *testing.T) { + tc := replicaCalcTestCase{ + currentReplicas: 3, + expectedReplicas: 3, + resource: &resourceInfo{ + name: api.ResourceCPU, + requests: []resource.Quantity{resource.MustParse("0.9"), resource.MustParse("1.0"), resource.MustParse("1.1")}, + levels: []int64{1010, 1030, 1020}, + + targetUtilization: 100, + expectedUtilization: 102, + }, + } + tc.runTest(t) +} + +func TestReplicaCalcToleranceCM(t *testing.T) { + tc := replicaCalcTestCase{ + currentReplicas: 3, + expectedReplicas: 3, + metric: &metricInfo{ + name: "qps", + levels: []float64{20.0, 21.0, 21.0}, + targetUtilization: 20.0, + expectedUtilization: 20.66666, + }, + } + tc.runTest(t) +} + +func TestReplicaCalcSuperfluousMetrics(t *testing.T) { + tc := replicaCalcTestCase{ + currentReplicas: 4, + expectedReplicas: 24, + resource: &resourceInfo{ + name: api.ResourceCPU, + requests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, + levels: []int64{4000, 9500, 3000, 7000, 3200, 2000}, + targetUtilization: 100, + expectedUtilization: 587, + }, + } + tc.runTest(t) +} + +func TestReplicaCalcMissingMetrics(t *testing.T) { + tc := replicaCalcTestCase{ + currentReplicas: 4, + expectedReplicas: 3, + resource: &resourceInfo{ + name: api.ResourceCPU, + requests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, + levels: []int64{400, 95}, + + targetUtilization: 100, + expectedUtilization: 24, + }, + } + tc.runTest(t) +} + +func TestReplicaCalcEmptyMetrics(t *testing.T) { + tc := replicaCalcTestCase{ + currentReplicas: 4, + expectedError: fmt.Errorf("unable to get metrics for resource cpu: no metrics returned from heapster"), + resource: &resourceInfo{ + name: api.ResourceCPU, + requests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, + levels: []int64{}, + + targetUtilization: 100, + }, + } + tc.runTest(t) +} + +func TestReplicaCalcEmptyCPURequest(t *testing.T) { + tc := replicaCalcTestCase{ + currentReplicas: 1, + expectedError: fmt.Errorf("missing request for"), + resource: &resourceInfo{ + name: api.ResourceCPU, + requests: []resource.Quantity{}, + levels: []int64{200}, + + targetUtilization: 100, + }, + } + tc.runTest(t) +} + +// TestComputedToleranceAlgImplementation is a regression test which +// back-calculates a minimal percentage for downscaling based on a small percentage +// increase in pod utilization which is calibrated against the tolerance value. +func TestReplicaCalcComputedToleranceAlgImplementation(t *testing.T) { + + startPods := int32(10) + // 150 mCPU per pod. + totalUsedCPUOfAllPods := int64(startPods * 150) + // Each pod starts out asking for 2X what is really needed. 
+ // This means we will have a 50% ratio of used/requested + totalRequestedCPUOfAllPods := int32(2 * totalUsedCPUOfAllPods) + requestedToUsed := float64(totalRequestedCPUOfAllPods) / float64(totalUsedCPUOfAllPods) + // Spread the amount we ask for over 10 pods. We can add some jitter later in the per-pod requests. + perPodRequested := totalRequestedCPUOfAllPods / startPods + + // Force a minimal scaling event by satisfying (tolerance < 1 - resourcesUsedRatio). + target := math.Abs(1/(requestedToUsed*(1-tolerance))) + .01 + finalCpuPercentTarget := int32(target * 100) + resourcesUsedRatio := float64(totalUsedCPUOfAllPods) / float64(float64(totalRequestedCPUOfAllPods)*target) + + // i.e. ceil(resourcesUsedRatio * startPods) gives the scaled-down expectation. + finalPods := int32(math.Ceil(resourcesUsedRatio * float64(startPods))) + + // To breach the tolerance, the target is picked so that the utilization ratio differs from 1.0 by slightly more than the tolerance. + tc := replicaCalcTestCase{ + currentReplicas: startPods, + expectedReplicas: finalPods, + resource: &resourceInfo{ + name: api.ResourceCPU, + levels: []int64{ + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + totalUsedCPUOfAllPods / 10, + }, + requests: []resource.Quantity{ + resource.MustParse(fmt.Sprint(perPodRequested+100) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested-100) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested+10) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested-10) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested+2) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested-2) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested+1) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested-1) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested) + "m"), + resource.MustParse(fmt.Sprint(perPodRequested) + "m"), + }, + + targetUtilization: finalCpuPercentTarget, + expectedUtilization: int32(totalUsedCPUOfAllPods*100) / totalRequestedCPUOfAllPods, + }, + } + + tc.runTest(t) + + // Reuse the data structure above, now testing "unscaling": + // verify that no scaling happens when we stay just within the tolerance margin + target = math.Abs(1/(requestedToUsed*(1-tolerance))) + .004 + finalCpuPercentTarget = int32(target * 100) + tc.resource.targetUtilization = finalCpuPercentTarget + tc.currentReplicas = startPods + tc.expectedReplicas = startPods + tc.runTest(t) +} + +// TODO: add more tests
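For reference, a minimal standalone sketch (not part of the patch above) of the scaling rule GetResourceReplicas applies in the common case of all pods ready and reporting metrics: derive the current utilization from summed usage and requests, then take ceil(usageRatio * readyPods) unless the ratio falls inside the tolerance band. The tolerance constant and all names below are illustrative assumptions, not code from this change.

package main

import (
	"fmt"
	"math"
)

// desiredReplicas mirrors the happy-path arithmetic: total usage over total
// requests gives the current utilization, and the ratio of that to the target
// decides whether and how far to scale.
func desiredReplicas(perPodUsage, perPodRequest []int64, targetUtilization int32, currentReplicas int32) int32 {
	const tolerance = 0.1 // assumed value for illustration only

	var usageTotal, requestTotal int64
	for i := range perPodUsage {
		usageTotal += perPodUsage[i]
		requestTotal += perPodRequest[i]
	}
	currentUtilization := float64(usageTotal*100) / float64(requestTotal)
	usageRatio := currentUtilization / float64(targetUtilization)

	// Inside the tolerance band the replica count is left alone.
	if math.Abs(1.0-usageRatio) <= tolerance {
		return currentReplicas
	}
	return int32(math.Ceil(usageRatio * float64(len(perPodUsage))))
}

func main() {
	// Three ready pods using 600m/1000m, 1000m/2000m and 1400m/2000m against a
	// 30% target: current utilization is 60%, usage ratio is 2.0, so 3 pods become 6.
	fmt.Println(desiredReplicas([]int64{600, 1000, 1400}, []int64{1000, 2000, 2000}, 30, 3))
}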