diff --git a/pkg/controller/podautoscaler/metrics/metrics_client.go b/pkg/controller/podautoscaler/metrics/metrics_client.go index 7f631309e43..5d2f4309f15 100644 --- a/pkg/controller/podautoscaler/metrics/metrics_client.go +++ b/pkg/controller/podautoscaler/metrics/metrics_client.go @@ -161,9 +161,9 @@ func calculateSumFromLatestSample(metrics heapster.MetricResultList) (uint64, in for _, metrics := range metrics.Items { var newest *heapster.MetricPoint newest = nil - for _, metricPoint := range metrics.Metrics { + for i, metricPoint := range metrics.Metrics { if newest == nil || newest.Timestamp.Before(metricPoint.Timestamp) { - newest = &metricPoint + newest = &metrics.Metrics[i] } } if newest != nil { diff --git a/pkg/controller/podautoscaler/metrics/metrics_client_test.go b/pkg/controller/podautoscaler/metrics/metrics_client_test.go index 38211856c14..478d2a65699 100644 --- a/pkg/controller/podautoscaler/metrics/metrics_client_test.go +++ b/pkg/controller/podautoscaler/metrics/metrics_client_test.go @@ -19,136 +19,350 @@ package metrics import ( "encoding/json" "fmt" - "net/http" - "net/http/httptest" + "io" "testing" "time" "k8s.io/kubernetes/pkg/api" _ "k8s.io/kubernetes/pkg/api/latest" - "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/apis/experimental" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/client/unversioned/testclient" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" heapster "k8s.io/heapster/api/v1/types" - "github.com/golang/glog" "github.com/stretchr/testify/assert" ) -const ( - namespace = "test-namespace" - podName = "pod1" - podListHandler = "podlisthandler" - heapsterCpuHandler = "heapstercpuhandler" - heapsterMemHandler = "heapstermemhandler" - cpu = 650 - memory = 20000000 -) - -type serverResponse struct { - statusCode int - obj interface{} +func (w fakeResponseWrapper) DoRaw() ([]byte, error) { + return w.raw, nil } -func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httptest.Server, map[string]*util.FakeHandler) { - handlers := map[string]*util.FakeHandler{} - mux := http.NewServeMux() - - mkHandler := func(url string, response serverResponse) *util.FakeHandler { - handler := util.FakeHandler{ - StatusCode: response.statusCode, - ResponseBody: runtime.EncodeOrDie(testapi.Experimental.Codec(), response.obj.(runtime.Object)), - } - mux.Handle(url, &handler) - glog.Infof("Will handle %s", url) - return &handler - } - - mkRawHandler := func(url string, response serverResponse) *util.FakeHandler { - handler := util.FakeHandler{ - StatusCode: response.statusCode, - ResponseBody: *response.obj.(*string), - } - mux.Handle(url, &handler) - glog.Infof("Will handle %s", url) - return &handler - } - - if responses[podListHandler] != nil { - handlers[podListHandler] = mkHandler(fmt.Sprintf("/api/v1/namespaces/%s/pods", namespace), *responses[podListHandler]) - } - - if responses[heapsterCpuHandler] != nil { - handlers[heapsterCpuHandler] = mkRawHandler( - fmt.Sprintf("/api/v1/proxy/namespaces/kube-system/services/monitoring-heapster/api/v1/model/namespaces/%s/pod-list/%s/metrics/cpu-usage", - namespace, podName), *responses[heapsterCpuHandler]) - } - - if responses[heapsterMemHandler] != nil { - handlers[heapsterMemHandler] = mkRawHandler( - fmt.Sprintf("/api/v1/proxy/namespaces/kube-system/services/monitoring-heapster/api/v1/model/namespaces/%s/pod-list/%s/metrics/memory-usage", - namespace, podName), *responses[heapsterMemHandler]) - } - - mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { - t.Errorf("unexpected request: %v", req.RequestURI) - res.WriteHeader(http.StatusNotFound) - }) - return httptest.NewServer(mux), handlers +func (w fakeResponseWrapper) Stream() (io.ReadCloser, error) { + return nil, nil } -func TestHeapsterResourceConsumptionGet(t *testing.T) { - podListResponse := serverResponse{http.StatusOK, &api.PodList{ - Items: []api.Pod{ - { +func newFakeResponseWrapper(raw []byte) fakeResponseWrapper { + return fakeResponseWrapper{raw: raw} +} + +type fakeResponseWrapper struct { + raw []byte +} + +// timestamp is used for establishing order on metricPoints +type metricPoint struct { + level uint64 + timestamp int +} + +type testCase struct { + replicas int + desiredValue int64 + desiredError error + targetResource api.ResourceName + reportedMetricsPoints [][]metricPoint + namespace string + selector map[string]string +} + +func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake { + namespace := "test-namespace" + tc.namespace = namespace + podNamePrefix := "test-pod" + selector := map[string]string{"name": podNamePrefix} + tc.selector = selector + + fakeClient := &testclient.Fake{} + + fakeClient.AddReactor("list", "pods", func(action testclient.Action) (handled bool, ret runtime.Object, err error) { + obj := &api.PodList{} + for i := 0; i < tc.replicas; i++ { + podName := fmt.Sprintf("%s-%d", podNamePrefix, i) + pod := api.Pod{ + Status: api.PodStatus{ + Phase: api.PodRunning, + }, ObjectMeta: api.ObjectMeta{ Name: podName, Namespace: namespace, + Labels: selector, }, - }}}} + } + obj.Items = append(obj.Items, pod) + } + return true, obj, nil + }) - timestamp := time.Now() - metricsCpu := heapster.MetricResultList{ - Items: []heapster.MetricResult{{ - Metrics: []heapster.MetricPoint{{timestamp, cpu}}, - LatestTimestamp: timestamp, - }}} - heapsterRawCpuResponse, _ := json.Marshal(&metricsCpu) - heapsterStrCpuResponse := string(heapsterRawCpuResponse) - heapsterCpuResponse := serverResponse{http.StatusOK, &heapsterStrCpuResponse} + fakeClient.AddProxyReactor("services", func(action testclient.Action) (handled bool, ret client.ResponseWrapper, err error) { + metrics := heapster.MetricResultList{} + firstTimestamp := time.Now() + var latestTimestamp time.Time + for _, reportedMetricPoints := range tc.reportedMetricsPoints { + var heapsterMetricPoints []heapster.MetricPoint + for _, reportedMetricPoint := range reportedMetricPoints { + timestamp := firstTimestamp.Add(time.Duration(reportedMetricPoint.timestamp) * time.Minute) + if latestTimestamp.Before(timestamp) { + latestTimestamp = timestamp + } + heapsterMetricPoint := heapster.MetricPoint{timestamp, reportedMetricPoint.level} + heapsterMetricPoints = append(heapsterMetricPoints, heapsterMetricPoint) + } + metric := heapster.MetricResult{ + Metrics: heapsterMetricPoints, + LatestTimestamp: latestTimestamp, + } + metrics.Items = append(metrics.Items, metric) + } + heapsterRawMemResponse, _ := json.Marshal(&metrics) + return true, newFakeResponseWrapper(heapsterRawMemResponse), nil + }) - metricsMem := heapster.MetricResultList{ - Items: []heapster.MetricResult{{ - Metrics: []heapster.MetricPoint{{timestamp, memory}}, - LatestTimestamp: timestamp, - }}} - heapsterRawMemResponse, _ := json.Marshal(&metricsMem) - heapsterStrMemResponse := string(heapsterRawMemResponse) - heapsterMemResponse := serverResponse{http.StatusOK, &heapsterStrMemResponse} - - testServer, _ := makeTestServer(t, - map[string]*serverResponse{ - heapsterCpuHandler: &heapsterCpuResponse, - heapsterMemHandler: &heapsterMemResponse, - podListHandler: &podListResponse, - }) - - defer testServer.Close() - kubeClient := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Experimental.Version()}) - - metricsClient := NewHeapsterMetricsClient(kubeClient) - - val, err := metricsClient.ResourceConsumption(namespace).Get(api.ResourceCPU, map[string]string{"app": "test"}) - if err != nil { - t.Fatalf("Error while getting consumption: %v", err) - } - assert.Equal(t, int64(cpu), val.Quantity.MilliValue()) - - val, err = metricsClient.ResourceConsumption(namespace).Get(api.ResourceMemory, map[string]string{"app": "test"}) - if err != nil { - t.Fatalf("Error while getting consumption: %v", err) - } - assert.Equal(t, int64(memory), val.Quantity.Value()) + return fakeClient +} + +func (tc *testCase) verifyResults(t *testing.T, val *experimental.ResourceConsumption, err error) { + assert.Equal(t, tc.desiredError, err) + if tc.desiredError != nil { + return + } + if tc.targetResource == api.ResourceCPU { + assert.Equal(t, tc.desiredValue, val.Quantity.MilliValue()) + } + if tc.targetResource == api.ResourceMemory { + assert.Equal(t, tc.desiredValue, val.Quantity.Value()) + } +} + +func (tc *testCase) runTest(t *testing.T) { + testClient := tc.prepareTestClient(t) + metricsClient := NewHeapsterMetricsClient(testClient) + val, err := metricsClient.ResourceConsumption(tc.namespace).Get(tc.targetResource, tc.selector) + tc.verifyResults(t, val, err) +} + +func TestCPU(t *testing.T) { + tc := testCase{ + replicas: 3, + desiredValue: 5000, + targetResource: api.ResourceCPU, + reportedMetricsPoints: [][]metricPoint{{{5000, 1}}, {{5000, 1}}, {{5000, 1}}}, + } + tc.runTest(t) +} + +func TestMemory(t *testing.T) { + tc := testCase{ + replicas: 3, + desiredValue: 5000, + targetResource: api.ResourceMemory, + reportedMetricsPoints: [][]metricPoint{{{5000, 1}}, {{5000, 2}}, {{5000, 4}}}, + } + tc.runTest(t) +} + +func TestCPUSumEqualZero(t *testing.T) { + tc := testCase{ + replicas: 3, + desiredValue: 0, + targetResource: api.ResourceCPU, + reportedMetricsPoints: [][]metricPoint{{{0, 0}}, {{0, 0}}, {{0, 0}}}, + } + tc.runTest(t) +} + +func TestMemorySumEqualZero(t *testing.T) { + tc := testCase{ + replicas: 3, + desiredValue: 0, + targetResource: api.ResourceMemory, + reportedMetricsPoints: [][]metricPoint{{{0, 0}}, {{0, 0}}, {{0, 0}}}, + } + tc.runTest(t) +} + +func TestCPUMoreMetrics(t *testing.T) { + tc := testCase{ + replicas: 5, + desiredValue: 5000, + targetResource: api.ResourceCPU, + reportedMetricsPoints: [][]metricPoint{ + {{0, 3}, {0, 6}, {5, 4}, {9000, 10}}, + {{5000, 2}, {10, 5}, {66, 1}, {0, 10}}, + {{5000, 3}, {80, 5}, {6000, 10}}, + {{5000, 3}, {40, 3}, {0, 9}, {200, 2}, {8000, 10}}, + {{5000, 2}, {20, 2}, {2000, 10}}}, + } + tc.runTest(t) +} + +func TestMemoryMoreMetrics(t *testing.T) { + tc := testCase{ + replicas: 5, + desiredValue: 5000, + targetResource: api.ResourceMemory, + reportedMetricsPoints: [][]metricPoint{ + {{0, 3}, {0, 6}, {5, 4}, {9000, 10}}, + {{5000, 2}, {10, 5}, {66, 1}, {0, 10}}, + {{5000, 3}, {80, 5}, {6000, 10}}, + {{5000, 3}, {40, 3}, {0, 9}, {200, 2}, {8000, 10}}, + {{5000, 2}, {20, 2}, {2000, 10}}}, + } + tc.runTest(t) +} + +func TestCPUResultIsFloat(t *testing.T) { + tc := testCase{ + replicas: 6, + desiredValue: 4783, + targetResource: api.ResourceCPU, + reportedMetricsPoints: [][]metricPoint{{{4000, 4}}, {{9500, 4}}, {{3000, 4}}, {{7000, 4}}, {{3200, 4}}, {{2000, 4}}}, + } + tc.runTest(t) +} + +func TestMemoryResultIsFloat(t *testing.T) { + tc := testCase{ + replicas: 6, + desiredValue: 4783, + targetResource: api.ResourceMemory, + reportedMetricsPoints: [][]metricPoint{{{4000, 4}}, {{9500, 4}}, {{3000, 4}}, {{7000, 4}}, {{3200, 4}}, {{2000, 4}}}, + } + tc.runTest(t) +} + +func TestCPUSamplesWithRandomTimestamps(t *testing.T) { + tc := testCase{ + replicas: 3, + desiredValue: 3000, + targetResource: api.ResourceCPU, + reportedMetricsPoints: [][]metricPoint{ + {{1, 1}, {3000, 3}, {2, 2}}, + {{2, 2}, {1, 1}, {3000, 3}}, + {{3000, 3}, {1, 1}, {2, 2}}}, + } + tc.runTest(t) +} + +func TestMemorySamplesWithRandomTimestamps(t *testing.T) { + tc := testCase{ + replicas: 3, + desiredValue: 3000, + targetResource: api.ResourceMemory, + reportedMetricsPoints: [][]metricPoint{ + {{1, 1}, {3000, 3}, {2, 2}}, + {{2, 2}, {1, 1}, {3000, 3}}, + {{3000, 3}, {1, 1}, {2, 2}}}, + } + tc.runTest(t) +} + +func TestErrorMetricNotDefined(t *testing.T) { + tc := testCase{ + replicas: 1, + desiredError: fmt.Errorf("heapster metric not defined for "), + reportedMetricsPoints: [][]metricPoint{{{4000, 4}}}, + } + tc.runTest(t) +} + +func TestCPUMissingMetrics(t *testing.T) { + tc := testCase{ + replicas: 3, + targetResource: api.ResourceCPU, + desiredError: fmt.Errorf("metrics obtained for 1/3 of pods"), + reportedMetricsPoints: [][]metricPoint{{{4000, 4}}}, + } + tc.runTest(t) +} + +func TestMemoryMissingMetrics(t *testing.T) { + tc := testCase{ + replicas: 3, + targetResource: api.ResourceMemory, + desiredError: fmt.Errorf("metrics obtained for 1/3 of pods"), + reportedMetricsPoints: [][]metricPoint{{{4000, 4}}}, + } + tc.runTest(t) +} + +func TestCPUSuperfluousMetrics(t *testing.T) { + tc := testCase{ + replicas: 3, + targetResource: api.ResourceCPU, + desiredError: fmt.Errorf("metrics obtained for 6/3 of pods"), + reportedMetricsPoints: [][]metricPoint{{{1000, 1}}, {{2000, 4}}, {{2000, 1}}, {{4000, 5}}, {{2000, 1}}, {{4000, 4}}}, + } + tc.runTest(t) +} + +func TestMemorySuperfluousMetrics(t *testing.T) { + tc := testCase{ + replicas: 3, + targetResource: api.ResourceMemory, + desiredError: fmt.Errorf("metrics obtained for 6/3 of pods"), + reportedMetricsPoints: [][]metricPoint{{{1000, 1}}, {{2000, 4}}, {{2000, 1}}, {{4000, 5}}, {{2000, 1}}, {{4000, 4}}}, + } + tc.runTest(t) +} + +func TestCPUEmptyMetrics(t *testing.T) { + tc := testCase{ + replicas: 3, + targetResource: api.ResourceCPU, + desiredError: fmt.Errorf("metrics obtained for 0/3 of pods"), + reportedMetricsPoints: [][]metricPoint{}, + } + tc.runTest(t) +} + +func TestMemoryEmptyMetrics(t *testing.T) { + tc := testCase{ + replicas: 3, + targetResource: api.ResourceMemory, + desiredError: fmt.Errorf("metrics obtained for 0/3 of pods"), + reportedMetricsPoints: [][]metricPoint{}, + } + tc.runTest(t) +} + +func TestCPUZeroReplicas(t *testing.T) { + tc := testCase{ + replicas: 0, + targetResource: api.ResourceCPU, + desiredValue: 0, + reportedMetricsPoints: [][]metricPoint{}, + } + tc.runTest(t) +} + +func TestMemoryZeroReplicas(t *testing.T) { + tc := testCase{ + replicas: 0, + targetResource: api.ResourceMemory, + desiredValue: 0, + reportedMetricsPoints: [][]metricPoint{}, + } + tc.runTest(t) +} + +func TestCPUEmptyMetricsForOnePod(t *testing.T) { + tc := testCase{ + replicas: 3, + targetResource: api.ResourceCPU, + desiredError: fmt.Errorf("metrics obtained for 2/3 of pods"), + reportedMetricsPoints: [][]metricPoint{{}, {{100, 1}}, {{400, 2}, {300, 3}}}, + } + tc.runTest(t) +} + +func TestMemoryEmptyMetricsForOnePod(t *testing.T) { + tc := testCase{ + replicas: 3, + targetResource: api.ResourceMemory, + desiredError: fmt.Errorf("metrics obtained for 2/3 of pods"), + reportedMetricsPoints: [][]metricPoint{{}, {{100, 1}}, {{400, 2}, {300, 3}}}, + } + tc.runTest(t) }