diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index ae13517a9cd..ce1161c27d6 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -36,6 +36,7 @@ import ( clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller/autoscaler" + "k8s.io/kubernetes/pkg/controller/autoscaler/metrics" "k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/pkg/controller/namespace" "k8s.io/kubernetes/pkg/controller/node" @@ -276,7 +277,8 @@ func (s *CMServer) Run(_ []string) error { if err != nil { glog.Fatalf("Invalid API configuration: %v", err) } - horizontalPodAutoscalerController := autoscalercontroller.New(kubeClient, expClient) + horizontalPodAutoscalerController := autoscalercontroller.New(kubeClient, expClient, + metrics.NewHeapsterMetricsClient(kubeClient)) horizontalPodAutoscalerController.Run(s.HorizontalPodAutoscalerSyncPeriod) } diff --git a/pkg/controller/autoscaler/horizontalpodautoscaler_controller.go b/pkg/controller/autoscaler/horizontalpodautoscaler_controller.go index e376c92d249..a3cce83833b 100644 --- a/pkg/controller/autoscaler/horizontalpodautoscaler_controller.go +++ b/pkg/controller/autoscaler/horizontalpodautoscaler_controller.go @@ -17,64 +17,33 @@ limitations under the License. package autoscalercontroller import ( - "encoding/json" "fmt" - "strings" "time" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/resource" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller/autoscaler/metrics" "k8s.io/kubernetes/pkg/expapi" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util" - - heapster "k8s.io/heapster/api/v1/types" -) - -const ( - heapsterNamespace = "kube-system" - heapsterService = "monitoring-heapster" ) type HorizontalPodAutoscalerController struct { - client client.Interface - expClient client.ExperimentalInterface + client client.Interface + expClient client.ExperimentalInterface + metricsClient metrics.MetricsClient } -// Aggregates results into ResourceConsumption. Also returns number of -// pods included in the aggregation. -type metricAggregator func(heapster.MetricResultList) (expapi.ResourceConsumption, int) - -type metricDefinition struct { - name string - aggregator metricAggregator -} - -var resourceDefinitions = map[api.ResourceName]metricDefinition{ - //TODO: add memory - api.ResourceCPU: {"cpu-usage", - func(metrics heapster.MetricResultList) (expapi.ResourceConsumption, int) { - sum, count := calculateSumFromLatestSample(metrics) - value := "0" - if count > 0 { - // assumes that cpu usage is in millis - value = fmt.Sprintf("%dm", sum/uint64(count)) - } - return expapi.ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse(value)}, count - }}, -} - -var heapsterQueryStart, _ = time.ParseDuration("-5m") var downscaleForbiddenWindow, _ = time.ParseDuration("20m") var upscaleForbiddenWindow, _ = time.ParseDuration("3m") -func New(client client.Interface, expClient client.ExperimentalInterface) *HorizontalPodAutoscalerController { +func New(client client.Interface, expClient client.ExperimentalInterface, metricsClient metrics.MetricsClient) *HorizontalPodAutoscalerController { return &HorizontalPodAutoscalerController{ - client: client, - expClient: expClient, + client: client, + expClient: expClient, + metricsClient: metricsClient, } } @@ -100,57 +69,18 @@ func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error { glog.Warningf("Failed to query scale subresource for %s: %v", reference, err) continue } - podList, err := a.client.Pods(hpa.Spec.ScaleRef.Namespace). - List(labels.SelectorFromSet(labels.Set(scale.Status.Selector)), fields.Everything()) + currentReplicas := scale.Status.Replicas + currentConsumption, err := a.metricsClient.ResourceConsumption(hpa.Spec.ScaleRef.Namespace).Get(hpa.Spec.Target.Resource, + scale.Status.Selector) + // TODO: what to do on partial errors (like metrics obtained for 75% of pods). if err != nil { - glog.Warningf("Failed to get pod list for %s: %v", reference, err) - continue - } - podNames := []string{} - for _, pod := range podList.Items { - podNames = append(podNames, pod.Name) - } - - metricSpec, metricDefined := resourceDefinitions[hpa.Spec.Target.Resource] - if !metricDefined { - glog.Warningf("Heapster metric not defined for %s %v", reference, hpa.Spec.Target.Resource) - continue - } - now := time.Now() - - startTime := now.Add(heapsterQueryStart) - metricPath := fmt.Sprintf("/api/v1/model/namespaces/%s/pod-list/%s/metrics/%s", - hpa.Spec.ScaleRef.Namespace, - strings.Join(podNames, ","), - metricSpec.name) - - resultRaw, err := a.client.Services(heapsterNamespace). - ProxyGet(heapsterService, metricPath, map[string]string{"start": startTime.Format(time.RFC3339)}). - DoRaw() - - if err != nil { - glog.Warningf("Failed to get pods metrics for %s: %v", reference, err) - continue - } - - var metrics heapster.MetricResultList - err = json.Unmarshal(resultRaw, &metrics) - if err != nil { - glog.Warningf("Failed to unmarshall heapster response: %v", err) - continue - } - - glog.Infof("Metrics available for %s: %s", reference, string(resultRaw)) - - currentConsumption, count := metricSpec.aggregator(metrics) - if count != len(podList.Items) { - glog.Warningf("Metrics obtained for %d/%d of pods", count, len(podList.Items)) + glog.Warningf("Error while getting metrics for %s: %v", reference, err) continue } // if the ratio is 1.2 we want to have 2 replicas - desiredReplicas := 1 + int((currentConsumption.Quantity.MilliValue()*int64(count))/hpa.Spec.Target.Quantity.MilliValue()) + desiredReplicas := 1 + int((currentConsumption.Quantity.MilliValue()*int64(currentReplicas))/hpa.Spec.Target.Quantity.MilliValue()) if desiredReplicas < hpa.Spec.MinCount { desiredReplicas = hpa.Spec.MinCount @@ -158,18 +88,17 @@ func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error { if desiredReplicas > hpa.Spec.MaxCount { desiredReplicas = hpa.Spec.MaxCount } - + now := time.Now() rescale := false - - if desiredReplicas != count { + if desiredReplicas != currentReplicas { // Going down - if desiredReplicas < count && (hpa.Status == nil || hpa.Status.LastScaleTimestamp == nil || + if desiredReplicas < currentReplicas && (hpa.Status == nil || hpa.Status.LastScaleTimestamp == nil || hpa.Status.LastScaleTimestamp.Add(downscaleForbiddenWindow).Before(now)) { rescale = true } // Going up - if desiredReplicas > count && (hpa.Status == nil || hpa.Status.LastScaleTimestamp == nil || + if desiredReplicas > currentReplicas && (hpa.Status == nil || hpa.Status.LastScaleTimestamp == nil || hpa.Status.LastScaleTimestamp.Add(upscaleForbiddenWindow).Before(now)) { rescale = true } @@ -185,9 +114,9 @@ func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error { } status := expapi.HorizontalPodAutoscalerStatus{ - CurrentReplicas: count, + CurrentReplicas: currentReplicas, DesiredReplicas: desiredReplicas, - CurrentConsumption: ¤tConsumption, + CurrentConsumption: currentConsumption, } hpa.Status = &status if rescale { @@ -203,22 +132,3 @@ func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error { } return nil } - -func calculateSumFromLatestSample(metrics heapster.MetricResultList) (uint64, int) { - sum := uint64(0) - count := 0 - for _, metrics := range metrics.Items { - var newest *heapster.MetricPoint - newest = nil - for _, metricPoint := range metrics.Metrics { - if newest == nil || newest.Timestamp.Before(metricPoint.Timestamp) { - newest = &metricPoint - } - } - if newest != nil { - sum += newest.Value - count++ - } - } - return sum, count -} diff --git a/pkg/controller/autoscaler/horizontalpodautoscaler_controller_test.go b/pkg/controller/autoscaler/horizontalpodautoscaler_controller_test.go index 3049edca0df..276dcff2c84 100644 --- a/pkg/controller/autoscaler/horizontalpodautoscaler_controller_test.go +++ b/pkg/controller/autoscaler/horizontalpodautoscaler_controller_test.go @@ -17,39 +17,33 @@ limitations under the License. package autoscalercontroller import ( - "encoding/json" "fmt" "net/http" "net/http/httptest" "testing" - "time" "k8s.io/kubernetes/pkg/api" _ "k8s.io/kubernetes/pkg/api/latest" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/testapi" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller/autoscaler/metrics" "k8s.io/kubernetes/pkg/expapi" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" "github.com/golang/glog" "github.com/stretchr/testify/assert" - - heapster "k8s.io/heapster/api/v1/types" ) const ( namespace = api.NamespaceDefault rcName = "app-rc" podNameLabel = "app" - podName = "p1" hpaName = "foo" hpaListHandler = "HpaList" scaleHandler = "Scale" - podListHandler = "PodList" - heapsterHandler = "Heapster" updateHpaHandler = "HpaUpdate" ) @@ -58,6 +52,26 @@ type serverResponse struct { obj interface{} } +type fakeMetricsClient struct { + consumption metrics.ResourceConsumptionClient +} + +type fakeResourceConsumptionClient struct { + metrics map[api.ResourceName]expapi.ResourceConsumption +} + +func (f *fakeMetricsClient) ResourceConsumption(namespace string) metrics.ResourceConsumptionClient { + return f.consumption +} + +func (f *fakeResourceConsumptionClient) Get(resource api.ResourceName, selector map[string]string) (*expapi.ResourceConsumption, error) { + consumption, found := f.metrics[resource] + if !found { + return nil, fmt.Errorf("resource not found: %v", resource) + } + return &consumption, nil +} + func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httptest.Server, map[string]*util.FakeHandler) { handlers := map[string]*util.FakeHandler{} @@ -73,16 +87,6 @@ func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httpte return &handler } - mkRawHandler := func(url string, response serverResponse) *util.FakeHandler { - handler := util.FakeHandler{ - StatusCode: response.statusCode, - ResponseBody: *response.obj.(*string), - } - mux.Handle(url, &handler) - glog.Infof("Will handle %s", url) - return &handler - } - if responses[hpaListHandler] != nil { handlers[hpaListHandler] = mkHandler("/experimental/v1/horizontalpodautoscalers", *responses[hpaListHandler]) } @@ -92,16 +96,6 @@ func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httpte fmt.Sprintf("/experimental/v1/namespaces/%s/replicationcontrollers/%s/scale", namespace, rcName), *responses[scaleHandler]) } - if responses[podListHandler] != nil { - handlers[podListHandler] = mkHandler(fmt.Sprintf("/api/v1/namespaces/%s/pods", namespace), *responses[podListHandler]) - } - - if responses[heapsterHandler] != nil { - handlers[heapsterHandler] = mkRawHandler( - fmt.Sprintf("/api/v1/proxy/namespaces/kube-system/services/monitoring-heapster/api/v1/model/namespaces/%s/pod-list/%s/metrics/cpu-usage", - namespace, podName), *responses[heapsterHandler]) - } - if responses[updateHpaHandler] != nil { handlers[updateHpaHandler] = mkHandler(fmt.Sprintf("/experimental/v1/namespaces/%s/horizontalpodautoscalers/%s", namespace, hpaName), *responses[updateHpaHandler]) @@ -150,21 +144,6 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { }, }} - podListResponse := serverResponse{http.StatusOK, &api.PodList{ - Items: []api.Pod{ - { - ObjectMeta: api.ObjectMeta{ - Name: podName, - Namespace: namespace, - }, - }}}} - timestamp := time.Now() - metrics := heapster.MetricResultList{ - Items: []heapster.MetricResult{{ - Metrics: []heapster.MetricPoint{{timestamp, 650}}, - LatestTimestamp: timestamp, - }}} - status := expapi.HorizontalPodAutoscalerStatus{ CurrentReplicas: 1, DesiredReplicas: 3, @@ -189,16 +168,10 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { Status: &status, }} - heapsterRawResponse, _ := json.Marshal(&metrics) - heapsterStrResponse := string(heapsterRawResponse) - heapsterResponse := serverResponse{http.StatusOK, &heapsterStrResponse} - testServer, handlers := makeTestServer(t, map[string]*serverResponse{ hpaListHandler: &hpaResponse, scaleHandler: &scaleResponse, - podListHandler: &podListResponse, - heapsterHandler: &heapsterResponse, updateHpaHandler: &updateHpaResponse, }) @@ -206,7 +179,13 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { kubeClient := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) expClient := client.NewExperimentalOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) - hpaController := New(kubeClient, expClient) + fakeRC := fakeResourceConsumptionClient{metrics: map[api.ResourceName]expapi.ResourceConsumption{ + api.ResourceCPU: {Resource: api.ResourceCPU, Quantity: resource.MustParse("650m")}, + }} + fake := fakeMetricsClient{consumption: &fakeRC} + + hpaController := New(kubeClient, expClient, &fake) + err := hpaController.reconcileAutoscalers() if err != nil { t.Fatal("Failed to reconcile: %v", err) diff --git a/pkg/controller/autoscaler/metrics/metrics_client.go b/pkg/controller/autoscaler/metrics/metrics_client.go new file mode 100644 index 00000000000..b061a0c209c --- /dev/null +++ b/pkg/controller/autoscaler/metrics/metrics_client.go @@ -0,0 +1,168 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/resource" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/expapi" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + + heapster "k8s.io/heapster/api/v1/types" +) + +const ( + heapsterNamespace = "kube-system" + heapsterService = "monitoring-heapster" +) + +var heapsterQueryStart, _ = time.ParseDuration("-5m") + +// An interface for getting metrics for pods. +type MetricsClient interface { + ResourceConsumption(namespace string) ResourceConsumptionClient +} + +type ResourceConsumptionClient interface { + // Gets average resource consumption for pods under the given selector. + Get(resourceName api.ResourceName, selector map[string]string) (*expapi.ResourceConsumption, error) +} + +// Aggregates results into ResourceConsumption. Also returns number of +// pods included in the aggregation. +type metricAggregator func(heapster.MetricResultList) (expapi.ResourceConsumption, int) + +type metricDefinition struct { + name string + aggregator metricAggregator +} + +// Heapster-based implementation of MetricsClient +type HeapsterMetricsClient struct { + client client.Interface +} + +type HeapsterResourceConsumptionClient struct { + namespace string + client client.Interface + resourceDefinitions map[api.ResourceName]metricDefinition +} + +func NewHeapsterMetricsClient(client client.Interface) *HeapsterMetricsClient { + return &HeapsterMetricsClient{client: client} +} + +var heapsterMetricDefinitions = map[api.ResourceName]metricDefinition{ + //TODO: add memory + api.ResourceCPU: {"cpu-usage", + func(metrics heapster.MetricResultList) (expapi.ResourceConsumption, int) { + sum, count := calculateSumFromLatestSample(metrics) + value := "0" + if count > 0 { + // assumes that cpu usage is in millis + value = fmt.Sprintf("%dm", sum/uint64(count)) + } + return expapi.ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse(value)}, count + }}, +} + +func (h *HeapsterMetricsClient) ResourceConsumption(namespace string) ResourceConsumptionClient { + return &HeapsterResourceConsumptionClient{ + namespace: namespace, + client: h.client, + resourceDefinitions: heapsterMetricDefinitions, + } +} + +func (h *HeapsterResourceConsumptionClient) Get(resourceName api.ResourceName, selector map[string]string) (*expapi.ResourceConsumption, error) { + podList, err := h.client.Pods(h.namespace). + List(labels.SelectorFromSet(labels.Set(selector)), fields.Everything()) + + if err != nil { + return nil, fmt.Errorf("failed to get pod list: %v", err) + } + podNames := []string{} + for _, pod := range podList.Items { + podNames = append(podNames, pod.Name) + } + return h.getForPods(resourceName, podNames) +} + +func (h *HeapsterResourceConsumptionClient) getForPods(resourceName api.ResourceName, podNames []string) (*expapi.ResourceConsumption, error) { + + metricSpec, metricDefined := h.resourceDefinitions[resourceName] + if !metricDefined { + return nil, fmt.Errorf("heapster metric not defined for %v", resourceName) + } + now := time.Now() + + startTime := now.Add(heapsterQueryStart) + metricPath := fmt.Sprintf("/api/v1/model/namespaces/%s/pod-list/%s/metrics/%s", + h.namespace, + strings.Join(podNames, ","), + metricSpec.name) + + resultRaw, err := h.client.Services(heapsterNamespace). + ProxyGet(heapsterService, metricPath, map[string]string{"start": startTime.Format(time.RFC3339)}). + DoRaw() + + if err != nil { + return nil, fmt.Errorf("failed to get pods metrics: %v", err) + } + + var metrics heapster.MetricResultList + err = json.Unmarshal(resultRaw, &metrics) + if err != nil { + return nil, fmt.Errorf("failed to unmarshall heapster response: %v", err) + } + + glog.Infof("Metrics available: %s", string(resultRaw)) + + currentConsumption, count := metricSpec.aggregator(metrics) + if count != len(podNames) { + return nil, fmt.Errorf("metrics obtained for %d/%d of pods", count, len(podNames)) + } + + return ¤tConsumption, nil +} + +func calculateSumFromLatestSample(metrics heapster.MetricResultList) (uint64, int) { + sum := uint64(0) + count := 0 + for _, metrics := range metrics.Items { + var newest *heapster.MetricPoint + newest = nil + for _, metricPoint := range metrics.Metrics { + if newest == nil || newest.Timestamp.Before(metricPoint.Timestamp) { + newest = &metricPoint + } + } + if newest != nil { + sum += newest.Value + count++ + } + } + return sum, count +} diff --git a/pkg/controller/autoscaler/metrics/metrics_client_test.go b/pkg/controller/autoscaler/metrics/metrics_client_test.go new file mode 100644 index 00000000000..22277e8806a --- /dev/null +++ b/pkg/controller/autoscaler/metrics/metrics_client_test.go @@ -0,0 +1,131 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + _ "k8s.io/kubernetes/pkg/api/latest" + "k8s.io/kubernetes/pkg/api/testapi" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" + + heapster "k8s.io/heapster/api/v1/types" + + "github.com/golang/glog" + "github.com/stretchr/testify/assert" +) + +const ( + namespace = "test-namespace" + podName = "pod1" + podListHandler = "podlisthandler" + heapsterHandler = "heapsterhandler" +) + +type serverResponse struct { + statusCode int + obj interface{} +} + +func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httptest.Server, map[string]*util.FakeHandler) { + + handlers := map[string]*util.FakeHandler{} + mux := http.NewServeMux() + + mkHandler := func(url string, response serverResponse) *util.FakeHandler { + handler := util.FakeHandler{ + StatusCode: response.statusCode, + ResponseBody: runtime.EncodeOrDie(testapi.Codec(), response.obj.(runtime.Object)), + } + mux.Handle(url, &handler) + glog.Infof("Will handle %s", url) + return &handler + } + + mkRawHandler := func(url string, response serverResponse) *util.FakeHandler { + handler := util.FakeHandler{ + StatusCode: response.statusCode, + ResponseBody: *response.obj.(*string), + } + mux.Handle(url, &handler) + glog.Infof("Will handle %s", url) + return &handler + } + + if responses[podListHandler] != nil { + handlers[podListHandler] = mkHandler(fmt.Sprintf("/api/v1/namespaces/%s/pods", namespace), *responses[podListHandler]) + } + + if responses[heapsterHandler] != nil { + handlers[heapsterHandler] = mkRawHandler( + fmt.Sprintf("/api/v1/proxy/namespaces/kube-system/services/monitoring-heapster/api/v1/model/namespaces/%s/pod-list/%s/metrics/cpu-usage", + namespace, podName), *responses[heapsterHandler]) + } + + mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { + t.Errorf("unexpected request: %v", req.RequestURI) + res.WriteHeader(http.StatusNotFound) + }) + return httptest.NewServer(mux), handlers +} + +func TestHeapsterResourceConsumptionGet(t *testing.T) { + + podListResponse := serverResponse{http.StatusOK, &api.PodList{ + Items: []api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + Name: podName, + Namespace: namespace, + }, + }}}} + + timestamp := time.Now() + metrics := heapster.MetricResultList{ + Items: []heapster.MetricResult{{ + Metrics: []heapster.MetricPoint{{timestamp, 650}}, + LatestTimestamp: timestamp, + }}} + heapsterRawResponse, _ := json.Marshal(&metrics) + heapsterStrResponse := string(heapsterRawResponse) + heapsterResponse := serverResponse{http.StatusOK, &heapsterStrResponse} + + testServer, _ := makeTestServer(t, + map[string]*serverResponse{ + heapsterHandler: &heapsterResponse, + podListHandler: &podListResponse, + }) + + defer testServer.Close() + kubeClient := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + + metricsClient := NewHeapsterMetricsClient(kubeClient) + + val, err := metricsClient.ResourceConsumption(namespace).Get(api.ResourceCPU, map[string]string{"app": "test"}) + if err != nil { + t.Fatalf("Error while getting consumption: %v", err) + } + assert.Equal(t, int64(650), val.Quantity.MilliValue()) +}