Use Metrics API in HPA

This commit is contained in:
Piotr Szczesniak 2016-05-17 21:31:31 +02:00
parent 6addbf2b17
commit 26ad827893
3 changed files with 225 additions and 110 deletions

View File

@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
_ "k8s.io/kubernetes/pkg/apimachinery/registered" _ "k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apis/autoscaling" "k8s.io/kubernetes/pkg/apis/autoscaling"
"k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/extensions"
@ -41,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
heapster "k8s.io/heapster/metrics/api/v1/types" heapster "k8s.io/heapster/metrics/api/v1/types"
metrics_api "k8s.io/heapster/metrics/apis/metrics/v1alpha1"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -85,6 +87,7 @@ type testCase struct {
statusUpdated bool statusUpdated bool
eventCreated bool eventCreated bool
verifyEvents bool verifyEvents bool
useMetricsApi bool
// Channel with names of HPA objects which we have reconciled. // Channel with names of HPA objects which we have reconciled.
processed chan string processed chan string
@ -278,16 +281,47 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
tc.Lock() tc.Lock()
defer tc.Unlock() defer tc.Unlock()
timestamp := time.Now() var heapsterRawMemResponse []byte
metrics := heapster.MetricResultList{}
for _, level := range tc.reportedLevels { if tc.useMetricsApi {
metric := heapster.MetricResult{ metrics := []*metrics_api.PodMetrics{}
Metrics: []heapster.MetricPoint{{timestamp, level, nil}}, for i, cpu := range tc.reportedLevels {
LatestTimestamp: timestamp, podMetric := &metrics_api.PodMetrics{
ObjectMeta: v1.ObjectMeta{
Name: fmt.Sprintf("%s-%d", podNamePrefix, i),
Namespace: namespace,
},
Timestamp: unversioned.Time{Time: time.Now()},
Containers: []metrics_api.ContainerMetrics{
{
Name: "container",
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(
int64(cpu),
resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(
int64(1024*1024),
resource.BinarySI),
},
},
},
}
metrics = append(metrics, podMetric)
} }
metrics.Items = append(metrics.Items, metric) heapsterRawMemResponse, _ = json.Marshal(&metrics)
} else {
timestamp := time.Now()
metrics := heapster.MetricResultList{}
for _, level := range tc.reportedLevels {
metric := heapster.MetricResult{
Metrics: []heapster.MetricPoint{{timestamp, level, nil}},
LatestTimestamp: timestamp,
}
metrics.Items = append(metrics.Items, metric)
}
heapsterRawMemResponse, _ = json.Marshal(&metrics)
} }
heapsterRawMemResponse, _ := json.Marshal(&metrics)
return true, newFakeResponseWrapper(heapsterRawMemResponse), nil return true, newFakeResponseWrapper(heapsterRawMemResponse), nil
}) })
@ -417,6 +451,7 @@ func TestDefaultScaleUpRC(t *testing.T) {
verifyCPUCurrent: true, verifyCPUCurrent: true,
reportedLevels: []uint64{900, 950, 950, 1000}, reportedLevels: []uint64{900, 950, 950, 1000},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -430,6 +465,7 @@ func TestDefaultScaleUpDeployment(t *testing.T) {
verifyCPUCurrent: true, verifyCPUCurrent: true,
reportedLevels: []uint64{900, 950, 950, 1000}, reportedLevels: []uint64{900, 950, 950, 1000},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsApi: true,
resource: &fakeResource{ resource: &fakeResource{
name: "test-dep", name: "test-dep",
apiVersion: "extensions/v1beta1", apiVersion: "extensions/v1beta1",
@ -448,6 +484,7 @@ func TestDefaultScaleUpReplicaSet(t *testing.T) {
verifyCPUCurrent: true, verifyCPUCurrent: true,
reportedLevels: []uint64{900, 950, 950, 1000}, reportedLevels: []uint64{900, 950, 950, 1000},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsApi: true,
resource: &fakeResource{ resource: &fakeResource{
name: "test-replicaset", name: "test-replicaset",
apiVersion: "extensions/v1beta1", apiVersion: "extensions/v1beta1",
@ -467,6 +504,7 @@ func TestScaleUp(t *testing.T) {
verifyCPUCurrent: true, verifyCPUCurrent: true,
reportedLevels: []uint64{300, 500, 700}, reportedLevels: []uint64{300, 500, 700},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -481,6 +519,7 @@ func TestScaleUpDeployment(t *testing.T) {
verifyCPUCurrent: true, verifyCPUCurrent: true,
reportedLevels: []uint64{300, 500, 700}, reportedLevels: []uint64{300, 500, 700},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsApi: true,
resource: &fakeResource{ resource: &fakeResource{
name: "test-dep", name: "test-dep",
apiVersion: "extensions/v1beta1", apiVersion: "extensions/v1beta1",
@ -500,6 +539,7 @@ func TestScaleUpReplicaSet(t *testing.T) {
verifyCPUCurrent: true, verifyCPUCurrent: true,
reportedLevels: []uint64{300, 500, 700}, reportedLevels: []uint64{300, 500, 700},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsApi: true,
resource: &fakeResource{ resource: &fakeResource{
name: "test-replicaset", name: "test-replicaset",
apiVersion: "extensions/v1beta1", apiVersion: "extensions/v1beta1",
@ -537,6 +577,7 @@ func TestDefaultScaleDown(t *testing.T) {
verifyCPUCurrent: true, verifyCPUCurrent: true,
reportedLevels: []uint64{400, 500, 600, 700, 800}, reportedLevels: []uint64{400, 500, 600, 700, 800},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -551,6 +592,7 @@ func TestScaleDown(t *testing.T) {
verifyCPUCurrent: true, verifyCPUCurrent: true,
reportedLevels: []uint64{100, 300, 500, 250, 250}, reportedLevels: []uint64{100, 300, 500, 250, 250},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -582,6 +624,7 @@ func TestTolerance(t *testing.T) {
CPUTarget: 100, CPUTarget: 100,
reportedLevels: []uint64{1010, 1030, 1020}, reportedLevels: []uint64{1010, 1030, 1020},
reportedCPURequests: []resource.Quantity{resource.MustParse("0.9"), resource.MustParse("1.0"), resource.MustParse("1.1")}, reportedCPURequests: []resource.Quantity{resource.MustParse("0.9"), resource.MustParse("1.0"), resource.MustParse("1.1")},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -612,6 +655,7 @@ func TestMinReplicas(t *testing.T) {
CPUTarget: 90, CPUTarget: 90,
reportedLevels: []uint64{10, 95, 10}, reportedLevels: []uint64{10, 95, 10},
reportedCPURequests: []resource.Quantity{resource.MustParse("0.9"), resource.MustParse("1.0"), resource.MustParse("1.1")}, reportedCPURequests: []resource.Quantity{resource.MustParse("0.9"), resource.MustParse("1.0"), resource.MustParse("1.1")},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -625,6 +669,7 @@ func TestZeroReplicas(t *testing.T) {
CPUTarget: 90, CPUTarget: 90,
reportedLevels: []uint64{}, reportedLevels: []uint64{},
reportedCPURequests: []resource.Quantity{}, reportedCPURequests: []resource.Quantity{},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -638,6 +683,7 @@ func TestTooFewReplicas(t *testing.T) {
CPUTarget: 90, CPUTarget: 90,
reportedLevels: []uint64{}, reportedLevels: []uint64{},
reportedCPURequests: []resource.Quantity{}, reportedCPURequests: []resource.Quantity{},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -651,6 +697,7 @@ func TestTooManyReplicas(t *testing.T) {
CPUTarget: 90, CPUTarget: 90,
reportedLevels: []uint64{}, reportedLevels: []uint64{},
reportedCPURequests: []resource.Quantity{}, reportedCPURequests: []resource.Quantity{},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -664,6 +711,7 @@ func TestMaxReplicas(t *testing.T) {
CPUTarget: 90, CPUTarget: 90,
reportedLevels: []uint64{8000, 9500, 1000}, reportedLevels: []uint64{8000, 9500, 1000},
reportedCPURequests: []resource.Quantity{resource.MustParse("0.9"), resource.MustParse("1.0"), resource.MustParse("1.1")}, reportedCPURequests: []resource.Quantity{resource.MustParse("0.9"), resource.MustParse("1.0"), resource.MustParse("1.1")},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -677,6 +725,7 @@ func TestSuperfluousMetrics(t *testing.T) {
CPUTarget: 100, CPUTarget: 100,
reportedLevels: []uint64{4000, 9500, 3000, 7000, 3200, 2000}, reportedLevels: []uint64{4000, 9500, 3000, 7000, 3200, 2000},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -690,6 +739,7 @@ func TestMissingMetrics(t *testing.T) {
CPUTarget: 100, CPUTarget: 100,
reportedLevels: []uint64{400, 95}, reportedLevels: []uint64{400, 95},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -703,6 +753,7 @@ func TestEmptyMetrics(t *testing.T) {
CPUTarget: 100, CPUTarget: 100,
reportedLevels: []uint64{}, reportedLevels: []uint64{},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -715,6 +766,7 @@ func TestEmptyCPURequest(t *testing.T) {
desiredReplicas: 1, desiredReplicas: 1,
CPUTarget: 100, CPUTarget: 100,
reportedLevels: []uint64{200}, reportedLevels: []uint64{200},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -729,6 +781,7 @@ func TestEventCreated(t *testing.T) {
reportedLevels: []uint64{200}, reportedLevels: []uint64{200},
reportedCPURequests: []resource.Quantity{resource.MustParse("0.2")}, reportedCPURequests: []resource.Quantity{resource.MustParse("0.2")},
verifyEvents: true, verifyEvents: true,
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -743,6 +796,7 @@ func TestEventNotCreated(t *testing.T) {
reportedLevels: []uint64{200, 200}, reportedLevels: []uint64{200, 200},
reportedCPURequests: []resource.Quantity{resource.MustParse("0.4"), resource.MustParse("0.4")}, reportedCPURequests: []resource.Quantity{resource.MustParse("0.4"), resource.MustParse("0.4")},
verifyEvents: true, verifyEvents: true,
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -801,6 +855,7 @@ func TestComputedToleranceAlgImplementation(t *testing.T) {
resource.MustParse(fmt.Sprint(perPodRequested) + "m"), resource.MustParse(fmt.Sprint(perPodRequested) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested) + "m"), resource.MustParse(fmt.Sprint(perPodRequested) + "m"),
}, },
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)

View File

@ -24,10 +24,12 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
heapster "k8s.io/heapster/metrics/api/v1/types" heapster "k8s.io/heapster/metrics/api/v1/types"
metrics_api "k8s.io/heapster/metrics/apis/metrics/v1alpha1"
) )
const ( const (
@ -83,8 +85,6 @@ var averageFunction = func(metrics heapster.MetricResultList) (intAndFloat, int,
return result, count, timestamp return result, count, timestamp
} }
var heapsterCpuUsageMetricDefinition = metricDefinition{"cpu-usage", averageFunction}
func getHeapsterCustomMetricDefinition(metricName string) metricDefinition { func getHeapsterCustomMetricDefinition(metricName string) metricDefinition {
return metricDefinition{"custom/" + metricName, averageFunction} return metricDefinition{"custom/" + metricName, averageFunction}
} }
@ -118,7 +118,7 @@ func (h *HeapsterMetricsClient) GetCpuConsumptionAndRequestInMillis(namespace st
if err != nil { if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("failed to get pod list: %v", err) return 0, 0, time.Time{}, fmt.Errorf("failed to get pod list: %v", err)
} }
podNames := []string{} podNames := map[string]struct{}{}
requestSum := int64(0) requestSum := int64(0)
missing := false missing := false
for _, pod := range podList.Items { for _, pod := range podList.Items {
@ -127,7 +127,7 @@ func (h *HeapsterMetricsClient) GetCpuConsumptionAndRequestInMillis(namespace st
continue continue
} }
podNames = append(podNames, pod.Name) podNames[pod.Name] = struct{}{}
for _, container := range pod.Spec.Containers { for _, container := range pod.Spec.Containers {
if containerRequest, ok := container.Resources.Requests[api.ResourceCPU]; ok { if containerRequest, ok := container.Resources.Requests[api.ResourceCPU]; ok {
requestSum += containerRequest.MilliValue() requestSum += containerRequest.MilliValue()
@ -145,11 +145,52 @@ func (h *HeapsterMetricsClient) GetCpuConsumptionAndRequestInMillis(namespace st
glog.V(4).Infof("%s %s - sum of CPU requested: %d", namespace, selector, requestSum) glog.V(4).Infof("%s %s - sum of CPU requested: %d", namespace, selector, requestSum)
requestAvg := requestSum / int64(len(podList.Items)) requestAvg := requestSum / int64(len(podList.Items))
// Consumption is already averaged and in millis. // Consumption is already averaged and in millis.
consumption, timestamp, err := h.getForPods(heapsterCpuUsageMetricDefinition, namespace, podNames) consumption, timestamp, err := h.getCpuUtilizationForPods(namespace, selector, podNames)
if err != nil { if err != nil {
return 0, 0, time.Time{}, err return 0, 0, time.Time{}, err
} }
return consumption.intValue, requestAvg, timestamp, nil return consumption, requestAvg, timestamp, nil
}
func (h *HeapsterMetricsClient) getCpuUtilizationForPods(namespace string, selector labels.Selector, podNames map[string]struct{}) (int64, time.Time, error) {
metricPath := fmt.Sprintf("/apis/metrics/v1alpha1/namespaces/%s/pods", namespace)
params := map[string]string{"labelSelector": selector.String()}
resultRaw, err := h.client.Core().Services(h.heapsterNamespace).
ProxyGet(h.heapsterScheme, h.heapsterService, h.heapsterPort, metricPath, params).
DoRaw()
if err != nil {
return 0, time.Time{}, fmt.Errorf("failed to get pods metrics: %v", err)
}
glog.V(4).Infof("Heapster metrics result: %s", string(resultRaw))
metrics := make([]metrics_api.PodMetrics, 0)
err = json.Unmarshal(resultRaw, &metrics)
if err != nil {
return 0, time.Time{}, fmt.Errorf("failed to unmarshall heapster response: %v", err)
}
if len(metrics) != len(podNames) {
return 0, time.Time{}, fmt.Errorf("metrics obtained for %d/%d of pods", len(metrics), len(podNames))
}
sum := int64(0)
for _, m := range metrics {
if _, found := podNames[m.Name]; found {
for _, c := range m.Containers {
cpu, found := c.Usage[v1.ResourceCPU]
if !found {
return 0, time.Time{}, fmt.Errorf("no cpu for container %v in pod %v/%v", c.Name, namespace, m.Name)
}
sum += cpu.MilliValue()
}
} else {
return 0, time.Time{}, fmt.Errorf("not expected metrics for pod %v/%v", namespace, m.Name)
}
}
return sum / int64(len(metrics)), metrics[0].Timestamp.Time, nil
} }
// GetCustomMetric returns the average value of the given custom metric from the // GetCustomMetric returns the average value of the given custom metric from the
@ -174,14 +215,14 @@ func (h *HeapsterMetricsClient) GetCustomMetric(customMetricName string, namespa
return nil, time.Time{}, fmt.Errorf("no running pods") return nil, time.Time{}, fmt.Errorf("no running pods")
} }
value, timestamp, err := h.getForPods(metricSpec, namespace, podNames) value, timestamp, err := h.getCustomMetricForPods(metricSpec, namespace, podNames)
if err != nil { if err != nil {
return nil, time.Time{}, err return nil, time.Time{}, err
} }
return &value.floatValue, timestamp, nil return &value.floatValue, timestamp, nil
} }
func (h *HeapsterMetricsClient) getForPods(metricSpec metricDefinition, namespace string, podNames []string) (*intAndFloat, time.Time, error) { func (h *HeapsterMetricsClient) getCustomMetricForPods(metricSpec metricDefinition, namespace string, podNames []string) (*intAndFloat, time.Time, error) {
now := time.Now() now := time.Now()

View File

@ -25,6 +25,8 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
_ "k8s.io/kubernetes/pkg/apimachinery/registered" _ "k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/restclient"
@ -33,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
heapster "k8s.io/heapster/metrics/api/v1/types" heapster "k8s.io/heapster/metrics/api/v1/types"
metrics_api "k8s.io/heapster/metrics/apis/metrics/v1alpha1"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -68,9 +71,11 @@ type testCase struct {
targetResource string targetResource string
targetTimestamp int targetTimestamp int
reportedMetricsPoints [][]metricPoint reportedMetricsPoints [][]metricPoint
reportedPodMetrics [][]int64
namespace string namespace string
podListOverride *api.PodList podListOverride *api.PodList
selector labels.Selector selector labels.Selector
useMetricsApi bool
} }
func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset { func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
@ -95,28 +100,61 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
return true, obj, nil return true, obj, nil
}) })
fakeClient.AddProxyReactor("services", func(action core.Action) (handled bool, ret restclient.ResponseWrapper, err error) { if tc.useMetricsApi {
metrics := heapster.MetricResultList{} fakeClient.AddProxyReactor("services", func(action core.Action) (handled bool, ret restclient.ResponseWrapper, err error) {
var latestTimestamp time.Time metrics := []*metrics_api.PodMetrics{}
for _, reportedMetricPoints := range tc.reportedMetricsPoints { for i, containers := range tc.reportedPodMetrics {
var heapsterMetricPoints []heapster.MetricPoint metric := &metrics_api.PodMetrics{
for _, reportedMetricPoint := range reportedMetricPoints { ObjectMeta: v1.ObjectMeta{
timestamp := fixedTimestamp.Add(time.Duration(reportedMetricPoint.timestamp) * time.Minute) Name: fmt.Sprintf("%s-%d", podNamePrefix, i),
if latestTimestamp.Before(timestamp) { Namespace: namespace,
latestTimestamp = timestamp },
Timestamp: unversioned.Time{Time: fixedTimestamp.Add(time.Duration(tc.targetTimestamp) * time.Minute)},
Containers: []metrics_api.ContainerMetrics{},
} }
heapsterMetricPoint := heapster.MetricPoint{Timestamp: timestamp, Value: reportedMetricPoint.level, FloatValue: nil} for j, cpu := range containers {
heapsterMetricPoints = append(heapsterMetricPoints, heapsterMetricPoint) cm := metrics_api.ContainerMetrics{
Name: fmt.Sprintf("%s-%d-container-%d", podNamePrefix, i, j),
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(
cpu,
resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(
int64(1024*1024),
resource.BinarySI),
},
}
metric.Containers = append(metric.Containers, cm)
}
metrics = append(metrics, metric)
} }
metric := heapster.MetricResult{ heapsterRawMemResponse, _ := json.Marshal(&metrics)
Metrics: heapsterMetricPoints, return true, newFakeResponseWrapper(heapsterRawMemResponse), nil
LatestTimestamp: latestTimestamp, })
} else {
fakeClient.AddProxyReactor("services", func(action core.Action) (handled bool, ret restclient.ResponseWrapper, err error) {
metrics := heapster.MetricResultList{}
var latestTimestamp time.Time
for _, reportedMetricPoints := range tc.reportedMetricsPoints {
var heapsterMetricPoints []heapster.MetricPoint
for _, reportedMetricPoint := range reportedMetricPoints {
timestamp := fixedTimestamp.Add(time.Duration(reportedMetricPoint.timestamp) * time.Minute)
if latestTimestamp.Before(timestamp) {
latestTimestamp = timestamp
}
heapsterMetricPoint := heapster.MetricPoint{Timestamp: timestamp, Value: reportedMetricPoint.level, FloatValue: nil}
heapsterMetricPoints = append(heapsterMetricPoints, heapsterMetricPoint)
}
metric := heapster.MetricResult{
Metrics: heapsterMetricPoints,
LatestTimestamp: latestTimestamp,
}
metrics.Items = append(metrics.Items, metric)
} }
metrics.Items = append(metrics.Items, metric) heapsterRawMemResponse, _ := json.Marshal(&metrics)
} return true, newFakeResponseWrapper(heapsterRawMemResponse), nil
heapsterRawMemResponse, _ := json.Marshal(&metrics) })
return true, newFakeResponseWrapper(heapsterRawMemResponse), nil }
})
return fakeClient return fakeClient
} }
@ -155,7 +193,7 @@ func (tc *testCase) verifyResults(t *testing.T, val *float64, timestamp time.Tim
assert.True(t, tc.desiredValue+0.001 > *val) assert.True(t, tc.desiredValue+0.001 > *val)
targetTimestamp := fixedTimestamp.Add(time.Duration(tc.targetTimestamp) * time.Minute) targetTimestamp := fixedTimestamp.Add(time.Duration(tc.targetTimestamp) * time.Minute)
assert.Equal(t, targetTimestamp, timestamp) assert.True(t, targetTimestamp.Equal(timestamp))
} }
func (tc *testCase) runTest(t *testing.T) { func (tc *testCase) runTest(t *testing.T) {
@ -173,23 +211,25 @@ func (tc *testCase) runTest(t *testing.T) {
func TestCPU(t *testing.T) { func TestCPU(t *testing.T) {
tc := testCase{ tc := testCase{
replicas: 3, replicas: 3,
desiredValue: 5000, desiredValue: 5000,
targetResource: "cpu-usage", targetResource: "cpu-usage",
targetTimestamp: 1, targetTimestamp: 1,
reportedMetricsPoints: [][]metricPoint{{{5000, 1}}, {{5000, 1}}, {{5000, 1}}}, reportedPodMetrics: [][]int64{{5000}, {5000}, {5000}},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
func TestCPUPending(t *testing.T) { func TestCPUPending(t *testing.T) {
tc := testCase{ tc := testCase{
replicas: 4, replicas: 4,
desiredValue: 5000, desiredValue: 5000,
targetResource: "cpu-usage", targetResource: "cpu-usage",
targetTimestamp: 1, targetTimestamp: 1,
reportedMetricsPoints: [][]metricPoint{{{5000, 1}}, {{5000, 1}}, {{5000, 1}}}, reportedPodMetrics: [][]int64{{5000}, {5000}, {5000}},
podListOverride: &api.PodList{}, useMetricsApi: true,
podListOverride: &api.PodList{},
} }
namespace := "test-namespace" namespace := "test-namespace"
@ -200,19 +240,20 @@ func TestCPUPending(t *testing.T) {
pod := buildPod(namespace, podName, podLabels, api.PodRunning) pod := buildPod(namespace, podName, podLabels, api.PodRunning)
tc.podListOverride.Items = append(tc.podListOverride.Items, pod) tc.podListOverride.Items = append(tc.podListOverride.Items, pod)
} }
tc.podListOverride.Items[0].Status.Phase = api.PodPending tc.podListOverride.Items[3].Status.Phase = api.PodPending
tc.runTest(t) tc.runTest(t)
} }
func TestCPUAllPending(t *testing.T) { func TestCPUAllPending(t *testing.T) {
tc := testCase{ tc := testCase{
replicas: 4, replicas: 4,
targetResource: "cpu-usage", targetResource: "cpu-usage",
targetTimestamp: 1, targetTimestamp: 1,
reportedMetricsPoints: [][]metricPoint{}, reportedPodMetrics: [][]int64{},
podListOverride: &api.PodList{}, useMetricsApi: true,
desiredError: fmt.Errorf("no running pods"), podListOverride: &api.PodList{},
desiredError: fmt.Errorf("no running pods"),
} }
namespace := "test-namespace" namespace := "test-namespace"
@ -283,11 +324,12 @@ func TestQPSAllPending(t *testing.T) {
func TestCPUSumEqualZero(t *testing.T) { func TestCPUSumEqualZero(t *testing.T) {
tc := testCase{ tc := testCase{
replicas: 3, replicas: 3,
desiredValue: 0, desiredValue: 0,
targetResource: "cpu-usage", targetResource: "cpu-usage",
targetTimestamp: 0, targetTimestamp: 0,
reportedMetricsPoints: [][]metricPoint{{{0, 0}}, {{0, 0}}, {{0, 0}}}, reportedPodMetrics: [][]int64{{0}, {0}, {0}},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -305,51 +347,23 @@ func TestQpsSumEqualZero(t *testing.T) {
func TestCPUMoreMetrics(t *testing.T) { func TestCPUMoreMetrics(t *testing.T) {
tc := testCase{ tc := testCase{
replicas: 5, replicas: 5,
desiredValue: 5000, desiredValue: 5000,
targetResource: "cpu-usage", targetResource: "cpu-usage",
targetTimestamp: 10, targetTimestamp: 10,
reportedMetricsPoints: [][]metricPoint{ reportedPodMetrics: [][]int64{{1000, 2000, 2000}, {5000}, {1000, 1000, 1000, 2000}, {4000, 1000}, {5000}},
{{0, 3}, {0, 6}, {5, 4}, {9000, 10}}, useMetricsApi: true,
{{5000, 2}, {10, 5}, {66, 1}, {0, 10}},
{{5000, 3}, {80, 5}, {6000, 10}},
{{5000, 3}, {40, 3}, {0, 9}, {200, 2}, {8000, 10}},
{{5000, 2}, {20, 2}, {2000, 10}}},
}
tc.runTest(t)
}
func TestCPUResultIsFloat(t *testing.T) {
tc := testCase{
replicas: 6,
desiredValue: 4783,
targetResource: "cpu-usage",
targetTimestamp: 4,
reportedMetricsPoints: [][]metricPoint{{{4000, 4}}, {{9500, 4}}, {{3000, 4}}, {{7000, 4}}, {{3200, 4}}, {{2000, 4}}},
}
tc.runTest(t)
}
func TestCPUSamplesWithRandomTimestamps(t *testing.T) {
tc := testCase{
replicas: 3,
desiredValue: 3000,
targetResource: "cpu-usage",
targetTimestamp: 3,
reportedMetricsPoints: [][]metricPoint{
{{1, 1}, {3000, 5}, {2, 2}},
{{2, 2}, {1, 1}, {3000, 3}},
{{3000, 4}, {1, 1}, {2, 2}}},
} }
tc.runTest(t) tc.runTest(t)
} }
func TestCPUMissingMetrics(t *testing.T) { func TestCPUMissingMetrics(t *testing.T) {
tc := testCase{ tc := testCase{
replicas: 3, replicas: 3,
targetResource: "cpu-usage", targetResource: "cpu-usage",
desiredError: fmt.Errorf("metrics obtained for 1/3 of pods"), desiredError: fmt.Errorf("metrics obtained for 1/3 of pods"),
reportedMetricsPoints: [][]metricPoint{{{4000, 4}}}, reportedPodMetrics: [][]int64{{4000}},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -366,10 +380,11 @@ func TestQpsMissingMetrics(t *testing.T) {
func TestCPUSuperfluousMetrics(t *testing.T) { func TestCPUSuperfluousMetrics(t *testing.T) {
tc := testCase{ tc := testCase{
replicas: 3, replicas: 3,
targetResource: "cpu-usage", targetResource: "cpu-usage",
desiredError: fmt.Errorf("metrics obtained for 6/3 of pods"), desiredError: fmt.Errorf("metrics obtained for 6/3 of pods"),
reportedMetricsPoints: [][]metricPoint{{{1000, 1}}, {{2000, 4}}, {{2000, 1}}, {{4000, 5}}, {{2000, 1}}, {{4000, 4}}}, reportedPodMetrics: [][]int64{{1000}, {2000}, {4000}, {4000}, {2000}, {4000}},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
@ -390,26 +405,30 @@ func TestCPUEmptyMetrics(t *testing.T) {
targetResource: "cpu-usage", targetResource: "cpu-usage",
desiredError: fmt.Errorf("metrics obtained for 0/3 of pods"), desiredError: fmt.Errorf("metrics obtained for 0/3 of pods"),
reportedMetricsPoints: [][]metricPoint{}, reportedMetricsPoints: [][]metricPoint{},
reportedPodMetrics: [][]int64{},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
func TestCPUZeroReplicas(t *testing.T) { func TestCPUZeroReplicas(t *testing.T) {
tc := testCase{ tc := testCase{
replicas: 0, replicas: 0,
targetResource: "cpu-usage", targetResource: "cpu-usage",
desiredError: fmt.Errorf("some pods do not have request for cpu"), desiredError: fmt.Errorf("some pods do not have request for cpu"),
reportedMetricsPoints: [][]metricPoint{}, reportedPodMetrics: [][]int64{},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }
func TestCPUEmptyMetricsForOnePod(t *testing.T) { func TestCPUEmptyMetricsForOnePod(t *testing.T) {
tc := testCase{ tc := testCase{
replicas: 3, replicas: 3,
targetResource: "cpu-usage", targetResource: "cpu-usage",
desiredError: fmt.Errorf("metrics obtained for 2/3 of pods"), desiredError: fmt.Errorf("metrics obtained for 2/3 of pods"),
reportedMetricsPoints: [][]metricPoint{{}, {{100, 1}}, {{400, 2}, {300, 3}}}, reportedPodMetrics: [][]int64{{100}, {300, 400}},
useMetricsApi: true,
} }
tc.runTest(t) tc.runTest(t)
} }