From a3972947cc140cc527d20c955a75a0b7af109a8d Mon Sep 17 00:00:00 2001 From: Marcin Wielgus Date: Wed, 27 Jan 2016 21:14:45 +0100 Subject: [PATCH] CustomMetrics in HPA MetricsClient --- .../podautoscaler/metrics/metrics_client.go | 149 ++++++++++-------- .../metrics/metrics_client_test.go | 148 +++++------------ 2 files changed, 121 insertions(+), 176 deletions(-) diff --git a/pkg/controller/podautoscaler/metrics/metrics_client.go b/pkg/controller/podautoscaler/metrics/metrics_client.go index b3cf4e14e00..a6f15342d6c 100644 --- a/pkg/controller/podautoscaler/metrics/metrics_client.go +++ b/pkg/controller/podautoscaler/metrics/metrics_client.go @@ -24,7 +24,6 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/resource" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/labels" @@ -46,16 +45,19 @@ type MetricsClient interface { // (e.g. 70 means that an average pod uses 70% of the requested CPU) // and the time of generation of the oldest of utilization reports for pods. GetCPUUtilization(namespace string, selector map[string]string) (*int, time.Time, error) + + // GetCustomMetric returns the average value of the given custom metrics from the + // pods picked using the namespace and selector passed as arguments. + GetCustomMetric(customMetricName string, namespace string, selector map[string]string) (*float64, time.Time, error) } -// ResourceConsumption specifies consumption of a particular resource. -type ResourceConsumption struct { - Resource api.ResourceName - Quantity resource.Quantity +type intAndFloat struct { + intValue int64 + floatValue float64 } // Aggregates results into ResourceConsumption. Also returns number of pods included in the aggregation. 
-type metricAggregator func(heapster.MetricResultList) (ResourceConsumption, int, time.Time) +type metricAggregator func(heapster.MetricResultList) (intAndFloat, int, time.Time) type metricDefinition struct { name string @@ -64,98 +66,111 @@ type metricDefinition struct { // HeapsterMetricsClient is Heapster-based implementation of MetricsClient type HeapsterMetricsClient struct { - client client.Interface - resourceDefinitions map[api.ResourceName]metricDefinition - heapsterNamespace string - heapsterScheme string - heapsterService string - heapsterPort string + client client.Interface + heapsterNamespace string + heapsterScheme string + heapsterService string + heapsterPort string } -var heapsterMetricDefinitions = map[api.ResourceName]metricDefinition{ - api.ResourceCPU: {"cpu-usage", - func(metrics heapster.MetricResultList) (ResourceConsumption, int, time.Time) { - sum, count, timestamp := calculateSumFromLatestSample(metrics) - value := "0" - if count > 0 { - // assumes that cpu usage is in millis - value = fmt.Sprintf("%dm", sum/uint64(count)) - } - return ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse(value)}, count, timestamp - }}, - api.ResourceMemory: {"memory-usage", - func(metrics heapster.MetricResultList) (ResourceConsumption, int, time.Time) { - sum, count, timestamp := calculateSumFromLatestSample(metrics) - value := int64(0) - if count > 0 { - value = int64(sum) / int64(count) - } - return ResourceConsumption{Resource: api.ResourceMemory, Quantity: *resource.NewQuantity(value, resource.DecimalSI)}, count, timestamp - }}, +var averageFunction = func(metrics heapster.MetricResultList) (intAndFloat, int, time.Time) { + sum, count, timestamp := calculateSumFromLatestSample(metrics) + result := intAndFloat{0, 0} + if count > 0 { + result.intValue = sum.intValue / int64(count) + result.floatValue = sum.floatValue / float64(count) + } + return result, count, timestamp +} + +var heapsterCpuUsageMetricDefinition = 
metricDefinition{"cpu-usage", averageFunction} + +func getHeapsterCustomMetricDefinition(metricName string) metricDefinition { + return metricDefinition{"CM:" + metricName, averageFunction} } // NewHeapsterMetricsClient returns a new instance of Heapster-based implementation of MetricsClient interface. func NewHeapsterMetricsClient(client client.Interface, namespace, scheme, service, port string) *HeapsterMetricsClient { return &HeapsterMetricsClient{ - client: client, - resourceDefinitions: heapsterMetricDefinitions, - heapsterNamespace: namespace, - heapsterScheme: scheme, - heapsterService: service, - heapsterPort: port, + client: client, + heapsterNamespace: namespace, + heapsterScheme: scheme, + heapsterService: service, + heapsterPort: port, } } func (h *HeapsterMetricsClient) GetCPUUtilization(namespace string, selector map[string]string) (*int, time.Time, error) { - consumption, request, timestamp, err := h.GetResourceConsumptionAndRequest(api.ResourceCPU, namespace, selector) + avgConsumption, avgRequest, timestamp, err := h.GetCpuConsumptionAndRequestInMillis(namespace, selector) if err != nil { return nil, time.Time{}, fmt.Errorf("failed to get CPU consumption and request: %v", err) } - utilization := new(int) - *utilization = int(float64(consumption.Quantity.MilliValue()) / float64(request.MilliValue()) * 100) - return utilization, timestamp, nil + utilization := int((avgConsumption * 100) / avgRequest) + return &utilization, timestamp, nil } -func (h *HeapsterMetricsClient) GetResourceConsumptionAndRequest(resourceName api.ResourceName, namespace string, selector map[string]string) (consumption *ResourceConsumption, request *resource.Quantity, timestamp time.Time, err error) { +func (h *HeapsterMetricsClient) GetCpuConsumptionAndRequestInMillis(namespace string, selector map[string]string) (avgConsumption int64, + avgRequest int64, timestamp time.Time, err error) { + labelSelector := labels.SelectorFromSet(labels.Set(selector)) podList, err := 
h.client.Pods(namespace). List(api.ListOptions{LabelSelector: labelSelector}) if err != nil { - return nil, nil, time.Time{}, fmt.Errorf("failed to get pod list: %v", err) + return 0, 0, time.Time{}, fmt.Errorf("failed to get pod list: %v", err) } podNames := []string{} - sum := resource.MustParse("0") + requestSum := int64(0) missing := false for _, pod := range podList.Items { podNames = append(podNames, pod.Name) for _, container := range pod.Spec.Containers { - containerRequest := container.Resources.Requests[resourceName] + containerRequest := container.Resources.Requests[api.ResourceCPU] if containerRequest.Amount != nil { - sum.Add(containerRequest) + requestSum += containerRequest.MilliValue() } else { missing = true } } } - if missing || sum.Cmp(resource.MustParse("0")) == 0 { - return nil, nil, time.Time{}, fmt.Errorf("some pods do not have request for %s", resourceName) + if missing || requestSum == 0 { + return 0, 0, time.Time{}, fmt.Errorf("some pods do not have request for cpu") } - glog.Infof("Sum of %s requested: %v", resourceName, sum) - avg := resource.MustParse(fmt.Sprintf("%dm", sum.MilliValue()/int64(len(podList.Items)))) - request = &avg - consumption, timestamp, err = h.getForPods(resourceName, namespace, podNames) + glog.Infof("Sum of CPU requested: %d", requestSum) + requestAvg := requestSum / int64(len(podList.Items)) + // Consumption is already averaged and in millis. 
+ consumption, timestamp, err := h.getForPods(heapsterCpuUsageMetricDefinition, namespace, podNames) if err != nil { - return nil, nil, time.Time{}, err + return 0, 0, time.Time{}, err } - return consumption, request, timestamp, nil + return consumption.intValue, requestAvg, timestamp, nil } -func (h *HeapsterMetricsClient) getForPods(resourceName api.ResourceName, namespace string, podNames []string) (*ResourceConsumption, time.Time, error) { - metricSpec, metricDefined := h.resourceDefinitions[resourceName] - if !metricDefined { - return nil, time.Time{}, fmt.Errorf("heapster metric not defined for %v", resourceName) +// GetCustomMetric returns the average value of the given custom metric from the +// pods picked using the namespace and selector passed as arguments. +func (h *HeapsterMetricsClient) GetCustomMetric(customMetricName string, namespace string, selector map[string]string) (*float64, time.Time, error) { + metricSpec := getHeapsterCustomMetricDefinition(customMetricName) + + labelSelector := labels.SelectorFromSet(labels.Set(selector)) + podList, err := h.client.Pods(namespace).List(api.ListOptions{LabelSelector: labelSelector}) + + if err != nil { + return nil, time.Time{}, fmt.Errorf("failed to get pod list: %v", err) } + podNames := []string{} + for _, pod := range podList.Items { + podNames = append(podNames, pod.Name) + } + + value, timestamp, err := h.getForPods(metricSpec, namespace, podNames) + if err != nil { + return nil, time.Time{}, err + } + return &value.floatValue, timestamp, nil +} + +func (h *HeapsterMetricsClient) getForPods(metricSpec metricDefinition, namespace string, podNames []string) (*intAndFloat, time.Time, error) { + now := time.Now() startTime := now.Add(heapsterQueryStart) @@ -180,16 +195,16 @@ func (h *HeapsterMetricsClient) getForPods(resourceName api.ResourceName, namesp glog.Infof("Metrics available: %s", string(resultRaw)) - currentConsumption, count, timestamp := metricSpec.aggregator(metrics) + sum, count, timestamp 
:= metricSpec.aggregator(metrics) if count != len(podNames) { return nil, time.Time{}, fmt.Errorf("metrics obtained for %d/%d of pods", count, len(podNames)) } - return &currentConsumption, timestamp, nil + return &sum, timestamp, nil } -func calculateSumFromLatestSample(metrics heapster.MetricResultList) (sum uint64, count int, timestamp time.Time) { - sum = uint64(0) +func calculateSumFromLatestSample(metrics heapster.MetricResultList) (sum intAndFloat, count int, timestamp time.Time) { + sum = intAndFloat{0, 0} count = 0 timestamp = time.Time{} var oldest *time.Time // creation time of the oldest of used samples across pods @@ -206,7 +221,13 @@ func calculateSumFromLatestSample(metrics heapster.MetricResultList) (sum uint64 if oldest == nil || newest.Timestamp.Before(*oldest) { oldest = &newest.Timestamp } - sum += newest.Value + if newest.FloatValue == nil { + sum.intValue += int64(newest.Value) + sum.floatValue += float64(newest.Value) + } else { + sum.intValue += int64(*newest.FloatValue) + sum.floatValue += *newest.FloatValue + } count++ } } diff --git a/pkg/controller/podautoscaler/metrics/metrics_client_test.go b/pkg/controller/podautoscaler/metrics/metrics_client_test.go index a472b198f25..c87201c0d7a 100644 --- a/pkg/controller/podautoscaler/metrics/metrics_client_test.go +++ b/pkg/controller/podautoscaler/metrics/metrics_client_test.go @@ -61,9 +61,9 @@ type metricPoint struct { type testCase struct { replicas int - desiredValue int64 + desiredValue float64 desiredError error - targetResource api.ResourceName + targetResource string targetTimestamp int reportedMetricsPoints [][]metricPoint namespace string @@ -94,7 +94,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake { { Resources: api.ResourceRequirements{ Requests: api.ResourceList{ - tc.targetResource: resource.MustParse("10"), + api.ResourceCPU: resource.MustParse("10"), }, }, }, @@ -135,17 +135,15 @@ func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake { return 
fakeClient } -func (tc *testCase) verifyResults(t *testing.T, val *ResourceConsumption, timestamp time.Time, err error) { +func (tc *testCase) verifyResults(t *testing.T, val *float64, timestamp time.Time, err error) { assert.Equal(t, tc.desiredError, err) if tc.desiredError != nil { return } - if tc.targetResource == api.ResourceCPU { - assert.Equal(t, tc.desiredValue, val.Quantity.MilliValue()) - } - if tc.targetResource == api.ResourceMemory { - assert.Equal(t, tc.desiredValue, val.Quantity.Value()) - } + assert.NotNil(t, val) + assert.True(t, tc.desiredValue-0.001 < *val) + assert.True(t, tc.desiredValue+0.001 > *val) + targetTimestamp := fixedTimestamp.Add(time.Duration(tc.targetTimestamp) * time.Minute) assert.Equal(t, targetTimestamp, timestamp) } @@ -153,28 +151,34 @@ func (tc *testCase) verifyResults(t *testing.T, val *ResourceConsumption, timest func (tc *testCase) runTest(t *testing.T) { testClient := tc.prepareTestClient(t) metricsClient := NewHeapsterMetricsClient(testClient, DefaultHeapsterNamespace, DefaultHeapsterScheme, DefaultHeapsterService, DefaultHeapsterPort) - val, _, timestamp, err := metricsClient.GetResourceConsumptionAndRequest(tc.targetResource, tc.namespace, tc.selector) - tc.verifyResults(t, val, timestamp, err) + if tc.targetResource == "cpu-usage" { + val, _, timestamp, err := metricsClient.GetCpuConsumptionAndRequestInMillis(tc.namespace, tc.selector) + fval := float64(val) + tc.verifyResults(t, &fval, timestamp, err) + } else { + val, timestamp, err := metricsClient.GetCustomMetric(tc.targetResource, tc.namespace, tc.selector) + tc.verifyResults(t, val, timestamp, err) + } } func TestCPU(t *testing.T) { tc := testCase{ replicas: 3, desiredValue: 5000, - targetResource: api.ResourceCPU, + targetResource: "cpu-usage", targetTimestamp: 1, reportedMetricsPoints: [][]metricPoint{{{5000, 1}}, {{5000, 1}}, {{5000, 1}}}, } tc.runTest(t) } -func TestMemory(t *testing.T) { +func TestQPS(t *testing.T) { tc := testCase{ replicas: 3, - 
desiredValue: 5000, - targetResource: api.ResourceMemory, + desiredValue: 13.33333, + targetResource: "qps", targetTimestamp: 1, - reportedMetricsPoints: [][]metricPoint{{{5000, 1}}, {{5000, 2}}, {{5000, 4}}}, + reportedMetricsPoints: [][]metricPoint{{{10, 1}}, {{20, 1}}, {{10, 1}}}, } tc.runTest(t) } @@ -183,18 +187,18 @@ func TestCPUSumEqualZero(t *testing.T) { tc := testCase{ replicas: 3, desiredValue: 0, - targetResource: api.ResourceCPU, + targetResource: "cpu-usage", targetTimestamp: 0, reportedMetricsPoints: [][]metricPoint{{{0, 0}}, {{0, 0}}, {{0, 0}}}, } tc.runTest(t) } -func TestMemorySumEqualZero(t *testing.T) { +func TestQpsSumEqualZero(t *testing.T) { tc := testCase{ replicas: 3, desiredValue: 0, - targetResource: api.ResourceMemory, + targetResource: "qps", targetTimestamp: 0, reportedMetricsPoints: [][]metricPoint{{{0, 0}}, {{0, 0}}, {{0, 0}}}, } @@ -205,23 +209,7 @@ func TestCPUMoreMetrics(t *testing.T) { tc := testCase{ replicas: 5, desiredValue: 5000, - targetResource: api.ResourceCPU, - targetTimestamp: 10, - reportedMetricsPoints: [][]metricPoint{ - {{0, 3}, {0, 6}, {5, 4}, {9000, 10}}, - {{5000, 2}, {10, 5}, {66, 1}, {0, 10}}, - {{5000, 3}, {80, 5}, {6000, 10}}, - {{5000, 3}, {40, 3}, {0, 9}, {200, 2}, {8000, 10}}, - {{5000, 2}, {20, 2}, {2000, 10}}}, - } - tc.runTest(t) -} - -func TestMemoryMoreMetrics(t *testing.T) { - tc := testCase{ - replicas: 5, - desiredValue: 5000, - targetResource: api.ResourceMemory, + targetResource: "cpu-usage", targetTimestamp: 10, reportedMetricsPoints: [][]metricPoint{ {{0, 3}, {0, 6}, {5, 4}, {9000, 10}}, @@ -237,18 +225,7 @@ func TestCPUResultIsFloat(t *testing.T) { tc := testCase{ replicas: 6, desiredValue: 4783, - targetResource: api.ResourceCPU, - targetTimestamp: 4, - reportedMetricsPoints: [][]metricPoint{{{4000, 4}}, {{9500, 4}}, {{3000, 4}}, {{7000, 4}}, {{3200, 4}}, {{2000, 4}}}, - } - tc.runTest(t) -} - -func TestMemoryResultIsFloat(t *testing.T) { - tc := testCase{ - replicas: 6, - desiredValue: 4783, 
- targetResource: api.ResourceMemory, + targetResource: "cpu-usage", targetTimestamp: 4, reportedMetricsPoints: [][]metricPoint{{{4000, 4}}, {{9500, 4}}, {{3000, 4}}, {{7000, 4}}, {{3200, 4}}, {{2000, 4}}}, } @@ -259,7 +236,7 @@ func TestCPUSamplesWithRandomTimestamps(t *testing.T) { tc := testCase{ replicas: 3, desiredValue: 3000, - targetResource: api.ResourceCPU, + targetResource: "cpu-usage", targetTimestamp: 3, reportedMetricsPoints: [][]metricPoint{ {{1, 1}, {3000, 5}, {2, 2}}, @@ -269,43 +246,20 @@ func TestCPUSamplesWithRandomTimestamps(t *testing.T) { tc.runTest(t) } -func TestMemorySamplesWithRandomTimestamps(t *testing.T) { - tc := testCase{ - replicas: 3, - desiredValue: 3000, - targetResource: api.ResourceMemory, - targetTimestamp: 3, - reportedMetricsPoints: [][]metricPoint{ - {{1, 1}, {3000, 3}, {2, 2}}, - {{2, 2}, {1, 1}, {3000, 3}}, - {{3000, 3}, {1, 1}, {2, 2}}}, - } - tc.runTest(t) -} - -func TestErrorMetricNotDefined(t *testing.T) { - tc := testCase{ - replicas: 1, - desiredError: fmt.Errorf("heapster metric not defined for "), - reportedMetricsPoints: [][]metricPoint{{{4000, 4}}}, - } - tc.runTest(t) -} - func TestCPUMissingMetrics(t *testing.T) { tc := testCase{ replicas: 3, - targetResource: api.ResourceCPU, + targetResource: "cpu-usage", desiredError: fmt.Errorf("metrics obtained for 1/3 of pods"), reportedMetricsPoints: [][]metricPoint{{{4000, 4}}}, } tc.runTest(t) } -func TestMemoryMissingMetrics(t *testing.T) { +func TestQpsMissingMetrics(t *testing.T) { tc := testCase{ replicas: 3, - targetResource: api.ResourceMemory, + targetResource: "qps", desiredError: fmt.Errorf("metrics obtained for 1/3 of pods"), reportedMetricsPoints: [][]metricPoint{{{4000, 4}}}, } @@ -315,17 +269,17 @@ func TestMemoryMissingMetrics(t *testing.T) { func TestCPUSuperfluousMetrics(t *testing.T) { tc := testCase{ replicas: 3, - targetResource: api.ResourceCPU, + targetResource: "cpu-usage", desiredError: fmt.Errorf("metrics obtained for 6/3 of pods"), 
reportedMetricsPoints: [][]metricPoint{{{1000, 1}}, {{2000, 4}}, {{2000, 1}}, {{4000, 5}}, {{2000, 1}}, {{4000, 4}}}, } tc.runTest(t) } -func TestMemorySuperfluousMetrics(t *testing.T) { +func TestQpsSuperfluousMetrics(t *testing.T) { tc := testCase{ replicas: 3, - targetResource: api.ResourceMemory, + targetResource: "qps", desiredError: fmt.Errorf("metrics obtained for 6/3 of pods"), reportedMetricsPoints: [][]metricPoint{{{1000, 1}}, {{2000, 4}}, {{2000, 1}}, {{4000, 5}}, {{2000, 1}}, {{4000, 4}}}, } @@ -335,17 +289,7 @@ func TestMemorySuperfluousMetrics(t *testing.T) { func TestCPUEmptyMetrics(t *testing.T) { tc := testCase{ replicas: 3, - targetResource: api.ResourceCPU, - desiredError: fmt.Errorf("metrics obtained for 0/3 of pods"), - reportedMetricsPoints: [][]metricPoint{}, - } - tc.runTest(t) -} - -func TestMemoryEmptyMetrics(t *testing.T) { - tc := testCase{ - replicas: 3, - targetResource: api.ResourceMemory, + targetResource: "cpu-usage", desiredError: fmt.Errorf("metrics obtained for 0/3 of pods"), reportedMetricsPoints: [][]metricPoint{}, } @@ -355,37 +299,17 @@ func TestMemoryEmptyMetrics(t *testing.T) { func TestCPUZeroReplicas(t *testing.T) { tc := testCase{ replicas: 0, - targetResource: api.ResourceCPU, + targetResource: "cpu-usage", desiredError: fmt.Errorf("some pods do not have request for cpu"), reportedMetricsPoints: [][]metricPoint{}, } tc.runTest(t) } -func TestMemoryZeroReplicas(t *testing.T) { - tc := testCase{ - replicas: 0, - targetResource: api.ResourceMemory, - desiredError: fmt.Errorf("some pods do not have request for memory"), - reportedMetricsPoints: [][]metricPoint{}, - } - tc.runTest(t) -} - func TestCPUEmptyMetricsForOnePod(t *testing.T) { tc := testCase{ replicas: 3, - targetResource: api.ResourceCPU, - desiredError: fmt.Errorf("metrics obtained for 2/3 of pods"), - reportedMetricsPoints: [][]metricPoint{{}, {{100, 1}}, {{400, 2}, {300, 3}}}, - } - tc.runTest(t) -} - -func TestMemoryEmptyMetricsForOnePod(t *testing.T) { - tc 
:= testCase{ - replicas: 3, - targetResource: api.ResourceMemory, + targetResource: "cpu-usage", desiredError: fmt.Errorf("metrics obtained for 2/3 of pods"), reportedMetricsPoints: [][]metricPoint{{}, {{100, 1}}, {{400, 2}, {300, 3}}}, }