HPA: Consider unready pods and missing metrics

Currently, the HPA considers unready pods the same as ready pods when
looking at their CPU and custom metric usage.  However, pods frequently
use extra CPU during initialization, so we want to consider them
separately.

This commit causes the HPA to consider unready pods as having 0 CPU
usage when scaling up, and to ignore them when scaling down.  If, when
scaling up, factoring in the unready pods at 0 CPU would cause a
downscale instead, we simply choose not to scale.  Otherwise, we scale
up by the reduced amount calculated by factoring the pods in at zero
CPU usage.
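
As an illustration only (not code from this commit), here is a minimal
Go sketch of that scale-up rebalancing.  It assumes a single CPU
request value shared by all pods and uses hypothetical helper names;
the real logic is the new ReplicaCalculator.GetResourceReplicas further
down in this diff.

package sketch

import "math"

// tolerance is the HPA's existing "close enough, don't scale" band
// (value assumed for the sketch).
const tolerance = 0.1

// utilizationRatio is a stand-in helper: total observed usage divided
// by the total usage the target percentage allows.
func utilizationRatio(metrics map[string]int64, requestPerPod int64, targetPercent int32) float64 {
    if len(metrics) == 0 || requestPerPod == 0 || targetPercent == 0 {
        return 0
    }
    total := int64(0)
    for _, usage := range metrics {
        total += usage
    }
    targetTotal := requestPerPod * int64(len(metrics)) * int64(targetPercent) / 100
    return float64(total) / float64(targetTotal)
}

// desiredReplicas ignores unready pods at first; on a scale-up it
// re-adds them at zero usage, and if that would flip (or erase) the
// scale-up, it keeps the current replica count.
func desiredReplicas(metrics map[string]int64, unreadyPods []string, requestPerPod int64, targetPercent int32, currentReplicas int32) int32 {
    usageRatio := utilizationRatio(metrics, requestPerPod, targetPercent)
    if math.Abs(1.0-usageRatio) <= tolerance {
        return currentReplicas
    }
    if usageRatio < 1.0 || len(unreadyPods) == 0 {
        // scaling down (or nothing is unready): unready pods stay ignored
        return int32(math.Ceil(usageRatio * float64(len(metrics))))
    }
    // scaling up: factor each unready pod in at 0 usage and recompute
    for _, name := range unreadyPods {
        metrics[name] = 0
    }
    newUsageRatio := utilizationRatio(metrics, requestPerPod, targetPercent)
    if newUsageRatio < 1.0 || math.Abs(1.0-newUsageRatio) <= tolerance {
        // the zero-usage pods turned the upscale into a downscale (or
        // into a no-op), so we choose not to scale at all
        return currentReplicas
    }
    return int32(math.Ceil(newUsageRatio * float64(len(metrics))))
}

With three 1-core pods targeting 30%, one of them unready, and ready
usage of 500m and 700m, the sketch yields 4 replicas -- the case
exercised by TestScaleUpUnreadyLessScale below.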

The effect is that unready pods make the autoscaler a bit more
conservative -- large increases in CPU usage can still trigger a scale
up, even with unready pods in the mix, but the resulting scale factor
will be smaller, in anticipation of the new pods later becoming ready
and handling load.

Similarly, if there are pods for which no metrics have been retrieved,
these pods are treated as having 100% of the requested metric when
scaling down, and 0% when scaling up.  As above, this cannot change the
direction of the scale.
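
For example, the fill-in step for missing metrics looks roughly like
the following (a sketch under the same assumptions and hypothetical
names as above; the real logic lives in the new
ReplicaCalculator.GetResourceReplicas and GetMetricReplicas later in
this diff, where the custom-metric variant substitutes the target value
for the request).

package sketch

// fillMissing assigns placeholder usage to pods whose metrics never
// arrived, biasing the recomputed ratio toward "no change"; the same
// direction-flip check used for unready pods then prevents the missing
// pods from reversing the scale on their own.
func fillMissing(metrics map[string]int64, requests map[string]int64, missingPods []string, scalingUp bool) {
    for _, podName := range missingPods {
        if scalingUp {
            // scale-up: a missing pod counts as using 0% of its request
            metrics[podName] = 0
        } else {
            // scale-down: a missing pod counts as using 100% of its request
            metrics[podName] = requests[podName]
        }
    }
}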

This commit also changes the HPA to ignore superfluous metrics -- as
long as metrics for all ready pods are present, the HPA will make
scaling decisions.  Currently, this only works for CPU.  For custom
metrics, we cannot identify which metrics belong to which pods if we
receive superfluous metrics, so we abort the scale.
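
In code terms, the resource path can simply drop metrics for pods it
did not ask about, because those metrics are keyed by pod name; the
custom-metric path only gets a positional list back from Heapster, so a
count mismatch aborts the scale.  A rough sketch of the two behaviors
(hypothetical helper names; the corresponding checks live in
GetResourceUtilizationRatio and GetRawMetric below):

package sketch

import "fmt"

// ignoreSuperfluous drops resource metrics for pods we never asked
// about; extras keyed by unknown pod names are simply skipped.
func ignoreSuperfluous(metrics map[string]int64, requests map[string]int64) {
    for podName := range metrics {
        if _, wanted := requests[podName]; !wanted {
            delete(metrics, podName)
        }
    }
}

// checkRawMetricCount refuses to guess when the number of custom-metric
// entries differs from the number of pods requested, since the entries
// carry no pod names to match against.
func checkRawMetricCount(podNames []string, entries int) error {
    if entries != len(podNames) {
        return fmt.Errorf("requested metrics for %v pods, got metrics for %v", len(podNames), entries)
    }
    return nil
}
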
This commit is contained in:
Solly Ross 2016-09-27 14:47:52 -04:00
parent 8558768650
commit 2c66d47786
10 changed files with 1274 additions and 510 deletions

View File

@ -427,7 +427,8 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
metrics.DefaultHeapsterService,
metrics.DefaultHeapsterPort,
)
go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), metricsClient, s.HorizontalPodAutoscalerSyncPeriod.Duration).
replicaCalc := podautoscaler.NewReplicaCalculator(metricsClient, hpaClient.Core())
go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), replicaCalc, s.HorizontalPodAutoscalerSyncPeriod.Duration).
Run(wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}

View File

@ -15,6 +15,7 @@ go_library(
srcs = [
"doc.go",
"horizontal.go",
"replica_calculator.go",
],
tags = ["automanaged"],
deps = [
@ -29,8 +30,10 @@ go_library(
"//pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/controller/podautoscaler/metrics:go_default_library",
"//pkg/labels:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/util/runtime:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/golang/glog",
],
@ -38,7 +41,10 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["horizontal_test.go"],
srcs = [
"horizontal_test.go",
"replica_calculator_test.go",
],
library = "go_default_library",
tags = ["automanaged"],
deps = [
@ -58,6 +64,7 @@ go_test(
"//pkg/runtime:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/stretchr/testify/assert",
"//vendor:github.com/stretchr/testify/require",
"//vendor:k8s.io/heapster/metrics/api/v1/types",
"//vendor:k8s.io/heapster/metrics/apis/metrics/v1alpha1",
],

View File

@ -33,7 +33,6 @@ import (
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/watch"
@ -61,7 +60,7 @@ type HorizontalController struct {
scaleNamespacer unversionedextensions.ScalesGetter
hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter
metricsClient metrics.MetricsClient
replicaCalc *ReplicaCalculator
eventRecorder record.EventRecorder
// A store of HPA objects, populated by the controller.
@ -110,13 +109,13 @@ func newInformer(controller *HorizontalController, resyncPeriod time.Duration) (
)
}
func NewHorizontalController(evtNamespacer unversionedcore.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter, metricsClient metrics.MetricsClient, resyncPeriod time.Duration) *HorizontalController {
func NewHorizontalController(evtNamespacer unversionedcore.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter, replicaCalc *ReplicaCalculator, resyncPeriod time.Duration) *HorizontalController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: evtNamespacer.Events("")})
recorder := broadcaster.NewRecorder(api.EventSource{Component: "horizontal-pod-autoscaler"})
controller := &HorizontalController{
metricsClient: metricsClient,
replicaCalc: replicaCalc,
eventRecorder: recorder,
scaleNamespacer: scaleNamespacer,
hpaNamespacer: hpaNamespacer,
@ -164,9 +163,8 @@ func (a *HorizontalController) computeReplicasForCPUUtilization(hpa *autoscaling
a.eventRecorder.Event(hpa, api.EventTypeWarning, "InvalidSelector", errMsg)
return 0, nil, time.Time{}, fmt.Errorf(errMsg)
}
currentUtilization, numRunningPods, timestamp, err := a.metricsClient.GetCPUUtilization(hpa.Namespace, selector)
// TODO: what to do on partial errors (like metrics obtained for 75% of pods).
desiredReplicas, utilization, timestamp, err := a.replicaCalc.GetResourceReplicas(currentReplicas, targetUtilization, api.ResourceCPU, hpa.Namespace, selector)
if err != nil {
lastScaleTime := getLastScaleTime(hpa)
if time.Now().After(lastScaleTime.Add(upscaleForbiddenWindow)) {
@ -178,20 +176,13 @@ func (a *HorizontalController) computeReplicasForCPUUtilization(hpa *autoscaling
return 0, nil, time.Time{}, fmt.Errorf("failed to get CPU utilization: %v", err)
}
utilization := int32(*currentUtilization)
usageRatio := float64(utilization) / float64(targetUtilization)
if math.Abs(1.0-usageRatio) <= tolerance {
return currentReplicas, &utilization, timestamp, nil
if desiredReplicas != currentReplicas {
a.eventRecorder.Eventf(hpa, api.EventTypeNormal, "DesiredReplicasComputed",
"Computed the desired num of replicas: %d (avgCPUutil: %d, current replicas: %d)",
desiredReplicas, utilization, scale.Status.Replicas)
}
desiredReplicas := math.Ceil(usageRatio * float64(numRunningPods))
a.eventRecorder.Eventf(hpa, api.EventTypeNormal, "DesiredReplicasComputed",
"Computed the desired num of replicas: %d, on a base of %d report(s) (avgCPUutil: %d, current replicas: %d)",
int32(desiredReplicas), numRunningPods, utilization, scale.Status.Replicas)
return int32(desiredReplicas), &utilization, timestamp, nil
return desiredReplicas, &utilization, timestamp, nil
}
// computeReplicasForCustomMetrics computes the desired number of replicas based on the CustomMetrics passed in cmAnnotation
@ -233,8 +224,8 @@ func (a *HorizontalController) computeReplicasForCustomMetrics(hpa *autoscaling.
a.eventRecorder.Event(hpa, api.EventTypeWarning, "InvalidSelector", errMsg)
return 0, "", "", time.Time{}, fmt.Errorf("couldn't convert selector string to a corresponding selector object: %v", err)
}
value, currentTimestamp, err := a.metricsClient.GetCustomMetric(customMetricTarget.Name, hpa.Namespace, selector)
// TODO: what to do on partial errors (like metrics obtained for 75% of pods).
floatTarget := float64(customMetricTarget.TargetValue.MilliValue()) / 1000.0
replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetMetricReplicas(currentReplicas, floatTarget, fmt.Sprintf("custom/%s", customMetricTarget.Name), hpa.Namespace, selector)
if err != nil {
lastScaleTime := getLastScaleTime(hpa)
if time.Now().After(lastScaleTime.Add(upscaleForbiddenWindow)) {
@ -245,21 +236,13 @@ func (a *HorizontalController) computeReplicasForCustomMetrics(hpa *autoscaling.
return 0, "", "", time.Time{}, fmt.Errorf("failed to get custom metric value: %v", err)
}
floatTarget := float64(customMetricTarget.TargetValue.MilliValue()) / 1000.0
usageRatio := *value / floatTarget
replicaCountProposal := int32(0)
if math.Abs(1.0-usageRatio) > tolerance {
replicaCountProposal = int32(math.Ceil(usageRatio * float64(currentReplicas)))
} else {
replicaCountProposal = currentReplicas
}
if replicaCountProposal > replicas {
timestamp = currentTimestamp
timestamp = timestampProposal
replicas = replicaCountProposal
metric = fmt.Sprintf("Custom metric %s", customMetricTarget.Name)
}
quantity, err := resource.ParseQuantity(fmt.Sprintf("%.3f", *value))
quantity, err := resource.ParseQuantity(fmt.Sprintf("%.3f", utilizationProposal))
if err != nil {
return 0, "", "", time.Time{}, fmt.Errorf("failed to set custom metric value: %v", err)
}

View File

@ -21,6 +21,8 @@ import (
"fmt"
"io"
"math"
"strconv"
"strings"
"sync"
"testing"
"time"
@ -77,17 +79,18 @@ type testCase struct {
desiredReplicas int32
// CPU target utilization as a percentage of the requested resources.
CPUTarget int32
CPUCurrent int32
verifyCPUCurrent bool
reportedLevels []uint64
reportedCPURequests []resource.Quantity
cmTarget *extensions.CustomMetricTargetList
scaleUpdated bool
statusUpdated bool
eventCreated bool
verifyEvents bool
useMetricsApi bool
CPUTarget int32
CPUCurrent int32
verifyCPUCurrent bool
reportedLevels []uint64
reportedCPURequests []resource.Quantity
reportedPodReadiness []api.ConditionStatus
cmTarget *extensions.CustomMetricTargetList
scaleUpdated bool
statusUpdated bool
eventCreated bool
verifyEvents bool
useMetricsApi bool
// Channel with names of HPA objects which we have reconciled.
processed chan string
@ -125,7 +128,9 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
tc.statusUpdated = false
tc.eventCreated = false
tc.processed = make(chan string, 100)
tc.computeCPUCurrent()
if tc.CPUCurrent == 0 {
tc.computeCPUCurrent()
}
// TODO(madhusudancs): HPA only supports resources in extensions/v1beta1 right now. Add
// tests for "v1" replicationcontrollers when HPA adds support for cross-group scale.
@ -248,10 +253,20 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
obj := &api.PodList{}
for i := 0; i < len(tc.reportedCPURequests); i++ {
podReadiness := api.ConditionTrue
if tc.reportedPodReadiness != nil {
podReadiness = tc.reportedPodReadiness[i]
}
podName := fmt.Sprintf("%s-%d", podNamePrefix, i)
pod := api.Pod{
Status: api.PodStatus{
Phase: api.PodRunning,
Conditions: []api.PodCondition{
{
Type: api.PodReady,
Status: podReadiness,
},
},
},
ObjectMeta: api.ObjectMeta{
Name: podName,
@ -310,9 +325,34 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
}
heapsterRawMemResponse, _ = json.Marshal(&metrics)
} else {
// only return the pods that we actually asked for
proxyAction := action.(core.ProxyGetAction)
pathParts := strings.Split(proxyAction.GetPath(), "/")
// pathParts should look like [ api, v1, model, namespaces, $NS, pod-list, $PODS, metrics, $METRIC... ]
if len(pathParts) < 9 {
return true, nil, fmt.Errorf("invalid heapster path %q", proxyAction.GetPath())
}
podNames := strings.Split(pathParts[7], ",")
podPresent := make([]bool, len(tc.reportedLevels))
for _, name := range podNames {
if len(name) <= len(podNamePrefix)+1 {
return true, nil, fmt.Errorf("unknown pod %q", name)
}
num, err := strconv.Atoi(name[len(podNamePrefix)+1:])
if err != nil {
return true, nil, fmt.Errorf("unknown pod %q", name)
}
podPresent[num] = true
}
timestamp := time.Now()
metrics := heapster.MetricResultList{}
for _, level := range tc.reportedLevels {
for i, level := range tc.reportedLevels {
if !podPresent[i] {
continue
}
metric := heapster.MetricResult{
Metrics: []heapster.MetricPoint{{Timestamp: timestamp, Value: level, FloatValue: nil}},
LatestTimestamp: timestamp,
@ -331,7 +371,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
obj := action.(core.UpdateAction).GetObject().(*extensions.Scale)
replicas := action.(core.UpdateAction).GetObject().(*extensions.Scale).Spec.Replicas
assert.Equal(t, tc.desiredReplicas, replicas)
assert.Equal(t, tc.desiredReplicas, replicas, "the replica count of the RC should be as expected")
tc.scaleUpdated = true
return true, obj, nil
})
@ -342,7 +382,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
obj := action.(core.UpdateAction).GetObject().(*extensions.Scale)
replicas := action.(core.UpdateAction).GetObject().(*extensions.Scale).Spec.Replicas
assert.Equal(t, tc.desiredReplicas, replicas)
assert.Equal(t, tc.desiredReplicas, replicas, "the replica count of the deployment should be as expected")
tc.scaleUpdated = true
return true, obj, nil
})
@ -353,7 +393,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
obj := action.(core.UpdateAction).GetObject().(*extensions.Scale)
replicas := action.(core.UpdateAction).GetObject().(*extensions.Scale).Spec.Replicas
assert.Equal(t, tc.desiredReplicas, replicas)
assert.Equal(t, tc.desiredReplicas, replicas, "the replica count of the replicaset should be as expected")
tc.scaleUpdated = true
return true, obj, nil
})
@ -363,12 +403,12 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
defer tc.Unlock()
obj := action.(core.UpdateAction).GetObject().(*autoscaling.HorizontalPodAutoscaler)
assert.Equal(t, namespace, obj.Namespace)
assert.Equal(t, hpaName, obj.Name)
assert.Equal(t, tc.desiredReplicas, obj.Status.DesiredReplicas)
assert.Equal(t, namespace, obj.Namespace, "the HPA namespace should be as expected")
assert.Equal(t, hpaName, obj.Name, "the HPA name should be as expected")
assert.Equal(t, tc.desiredReplicas, obj.Status.DesiredReplicas, "the desired replica count reported in the object status should be as expected")
if tc.verifyCPUCurrent {
assert.NotNil(t, obj.Status.CurrentCPUUtilizationPercentage)
assert.Equal(t, tc.CPUCurrent, *obj.Status.CurrentCPUUtilizationPercentage)
assert.NotNil(t, obj.Status.CurrentCPUUtilizationPercentage, "the reported CPU utilization percentage should be non-nil")
assert.Equal(t, tc.CPUCurrent, *obj.Status.CurrentCPUUtilizationPercentage, "the reported CPU utilization percentage should be as expected")
}
tc.statusUpdated = true
// Every time we reconcile HPA object we are updating status.
@ -387,8 +427,8 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
assert.Equal(t, fmt.Sprintf("New size: %d; reason: CPU utilization above target", tc.desiredReplicas), obj.Message)
case "DesiredReplicasComputed":
assert.Equal(t, fmt.Sprintf(
"Computed the desired num of replicas: %d, on a base of %d report(s) (avgCPUutil: %d, current replicas: %d)",
tc.desiredReplicas, len(tc.reportedLevels),
"Computed the desired num of replicas: %d (avgCPUutil: %d, current replicas: %d)",
tc.desiredReplicas,
(int64(tc.reportedLevels[0])*100)/tc.reportedCPURequests[0].MilliValue(), tc.initialReplicas), obj.Message)
default:
assert.False(t, true, fmt.Sprintf("Unexpected event: %s / %s", obj.Reason, obj.Message))
@ -408,10 +448,10 @@ func (tc *testCase) verifyResults(t *testing.T) {
tc.Lock()
defer tc.Unlock()
assert.Equal(t, tc.initialReplicas != tc.desiredReplicas, tc.scaleUpdated)
assert.True(t, tc.statusUpdated)
assert.Equal(t, tc.initialReplicas != tc.desiredReplicas, tc.scaleUpdated, "the scale should only be updated if we expected a change in replicas")
assert.True(t, tc.statusUpdated, "the status should have been updated")
if tc.verifyEvents {
assert.Equal(t, tc.initialReplicas != tc.desiredReplicas, tc.eventCreated)
assert.Equal(t, tc.initialReplicas != tc.desiredReplicas, tc.eventCreated, "an event should have been created only if we expected a change in replicas")
}
}
@ -423,8 +463,13 @@ func (tc *testCase) runTest(t *testing.T) {
broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: testClient.Core().Events("")})
recorder := broadcaster.NewRecorder(api.EventSource{Component: "horizontal-pod-autoscaler"})
replicaCalc := &ReplicaCalculator{
metricsClient: metricsClient,
podsGetter: testClient.Core(),
}
hpaController := &HorizontalController{
metricsClient: metricsClient,
replicaCalc: replicaCalc,
eventRecorder: recorder,
scaleNamespacer: testClient.Extensions(),
hpaNamespacer: testClient.Autoscaling(),
@ -518,6 +563,40 @@ func TestScaleUp(t *testing.T) {
tc.runTest(t)
}
func TestScaleUpUnreadyLessScale(t *testing.T) {
tc := testCase{
minReplicas: 2,
maxReplicas: 6,
initialReplicas: 3,
desiredReplicas: 4,
CPUTarget: 30,
CPUCurrent: 60,
verifyCPUCurrent: true,
reportedLevels: []uint64{300, 500, 700},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
reportedPodReadiness: []api.ConditionStatus{api.ConditionFalse, api.ConditionTrue, api.ConditionTrue},
useMetricsApi: true,
}
tc.runTest(t)
}
func TestScaleUpUnreadyNoScale(t *testing.T) {
tc := testCase{
minReplicas: 2,
maxReplicas: 6,
initialReplicas: 3,
desiredReplicas: 3,
CPUTarget: 30,
CPUCurrent: 40,
verifyCPUCurrent: true,
reportedLevels: []uint64{400, 500, 700},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
reportedPodReadiness: []api.ConditionStatus{api.ConditionTrue, api.ConditionFalse, api.ConditionFalse},
useMetricsApi: true,
}
tc.runTest(t)
}
func TestScaleUpDeployment(t *testing.T) {
tc := testCase{
minReplicas: 2,
@ -577,6 +656,46 @@ func TestScaleUpCM(t *testing.T) {
tc.runTest(t)
}
func TestScaleUpCMUnreadyLessScale(t *testing.T) {
tc := testCase{
minReplicas: 2,
maxReplicas: 6,
initialReplicas: 3,
desiredReplicas: 4,
CPUTarget: 0,
cmTarget: &extensions.CustomMetricTargetList{
Items: []extensions.CustomMetricTarget{{
Name: "qps",
TargetValue: resource.MustParse("15.0"),
}},
},
reportedLevels: []uint64{50, 10, 30},
reportedPodReadiness: []api.ConditionStatus{api.ConditionTrue, api.ConditionTrue, api.ConditionFalse},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
}
tc.runTest(t)
}
func TestScaleUpCMUnreadyNoScaleWouldScaleDown(t *testing.T) {
tc := testCase{
minReplicas: 2,
maxReplicas: 6,
initialReplicas: 3,
desiredReplicas: 3,
CPUTarget: 0,
cmTarget: &extensions.CustomMetricTargetList{
Items: []extensions.CustomMetricTarget{{
Name: "qps",
TargetValue: resource.MustParse("15.0"),
}},
},
reportedLevels: []uint64{50, 15, 30},
reportedPodReadiness: []api.ConditionStatus{api.ConditionFalse, api.ConditionTrue, api.ConditionFalse},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
}
tc.runTest(t)
}
func TestDefaultScaleDown(t *testing.T) {
tc := testCase{
minReplicas: 2,
@ -624,6 +743,23 @@ func TestScaleDownCM(t *testing.T) {
tc.runTest(t)
}
func TestScaleDownIgnoresUnreadyPods(t *testing.T) {
tc := testCase{
minReplicas: 2,
maxReplicas: 6,
initialReplicas: 5,
desiredReplicas: 2,
CPUTarget: 50,
CPUCurrent: 30,
verifyCPUCurrent: true,
reportedLevels: []uint64{100, 300, 500, 250, 250},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsApi: true,
reportedPodReadiness: []api.ConditionStatus{api.ConditionTrue, api.ConditionTrue, api.ConditionTrue, api.ConditionFalse, api.ConditionFalse},
}
tc.runTest(t)
}
func TestTolerance(t *testing.T) {
tc := testCase{
minReplicas: 1,
@ -730,7 +866,7 @@ func TestSuperfluousMetrics(t *testing.T) {
minReplicas: 2,
maxReplicas: 6,
initialReplicas: 4,
desiredReplicas: 4,
desiredReplicas: 6,
CPUTarget: 100,
reportedLevels: []uint64{4000, 9500, 3000, 7000, 3200, 2000},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
@ -744,7 +880,7 @@ func TestMissingMetrics(t *testing.T) {
minReplicas: 2,
maxReplicas: 6,
initialReplicas: 4,
desiredReplicas: 4,
desiredReplicas: 3,
CPUTarget: 100,
reportedLevels: []uint64{400, 95},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},

View File

@ -12,14 +12,17 @@ load(
go_library(
name = "go_default_library",
srcs = ["metrics_client.go"],
srcs = [
"metrics_client.go",
"utilization.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/labels:go_default_library",
"//pkg/util/sets:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/heapster/metrics/api/v1/types",
"//vendor:k8s.io/heapster/metrics/apis/metrics/v1alpha1",

View File

@ -26,13 +26,33 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/sets"
heapster "k8s.io/heapster/metrics/api/v1/types"
metrics_api "k8s.io/heapster/metrics/apis/metrics/v1alpha1"
)
// PodResourceInfo contains pod resource metric values as a map from pod names to
// metric values
type PodResourceInfo map[string]int64
// PodMetricsInfo contains pod metric values as a map from pod names to
// metric values
type PodMetricsInfo map[string]float64
// MetricsClient knows how to query a remote interface to retrieve container-level
// resource metrics as well as pod-level arbitrary metrics
type MetricsClient interface {
// GetResourceMetric gets the given resource metric (and an associated oldest timestamp)
// for all pods matching the specified selector in the given namespace
GetResourceMetric(resource api.ResourceName, namespace string, selector labels.Selector) (PodResourceInfo, time.Time, error)
// GetRawMetric gets the given metric (and an associated oldest timestamp)
// for all pods matching the specified selector in the given namespace
GetRawMetric(metricName string, namespace string, selector labels.Selector) (PodMetricsInfo, time.Time, error)
}
const (
DefaultHeapsterNamespace = "kube-system"
DefaultHeapsterScheme = "http"
@ -42,126 +62,33 @@ const (
var heapsterQueryStart = -5 * time.Minute
// MetricsClient is an interface for getting metrics for pods.
type MetricsClient interface {
// GetCPUUtilization returns the average utilization over all pods represented as a percent of requested CPU
// (e.g. 70 means that an average pod uses 70% of the requested CPU),
// the number of running pods from which CPU usage was collected and the time of generation of the oldest of utilization reports for pods.
GetCPUUtilization(namespace string, selector labels.Selector) (*int, int, time.Time, error)
// GetCustomMetric returns the average value of the given custom metrics from the
// pods picked using the namespace and selector passed as arguments.
GetCustomMetric(customMetricName string, namespace string, selector labels.Selector) (*float64, time.Time, error)
}
type intAndFloat struct {
intValue int64
floatValue float64
}
// Aggregates results into ResourceConsumption. Also returns number of pods included in the aggregation.
type metricAggregator func(heapster.MetricResultList) (intAndFloat, int, time.Time)
type metricDefinition struct {
name string
aggregator metricAggregator
}
// HeapsterMetricsClient is Heapster-based implementation of MetricsClient
type HeapsterMetricsClient struct {
client clientset.Interface
heapsterNamespace string
heapsterScheme string
heapsterService string
heapsterPort string
services unversionedcore.ServiceInterface
podsGetter unversionedcore.PodsGetter
heapsterScheme string
heapsterService string
heapsterPort string
}
var averageFunction = func(metrics heapster.MetricResultList) (intAndFloat, int, time.Time) {
sum, count, timestamp := calculateSumFromTimeSample(metrics, time.Minute)
result := intAndFloat{0, 0}
if count > 0 {
result.intValue = sum.intValue / int64(count)
result.floatValue = sum.floatValue / float64(count)
}
return result, count, timestamp
}
func getHeapsterCustomMetricDefinition(metricName string) metricDefinition {
return metricDefinition{"custom/" + metricName, averageFunction}
}
// NewHeapsterMetricsClient returns a new instance of Heapster-based implementation of MetricsClient interface.
func NewHeapsterMetricsClient(client clientset.Interface, namespace, scheme, service, port string) *HeapsterMetricsClient {
func NewHeapsterMetricsClient(client clientset.Interface, namespace, scheme, service, port string) MetricsClient {
return &HeapsterMetricsClient{
client: client,
heapsterNamespace: namespace,
heapsterScheme: scheme,
heapsterService: service,
heapsterPort: port,
services: client.Core().Services(namespace),
podsGetter: client.Core(),
heapsterScheme: scheme,
heapsterService: service,
heapsterPort: port,
}
}
func (h *HeapsterMetricsClient) GetCPUUtilization(namespace string, selector labels.Selector) (utilization *int, numRunningPods int, timestamp time.Time, err error) {
avgConsumption, avgRequest, numRunningPods, timestamp, err := h.GetCpuConsumptionAndRequestInMillis(namespace, selector)
if err != nil {
return nil, 0, time.Time{}, fmt.Errorf("failed to get CPU consumption and request: %v", err)
}
tmp := int((avgConsumption * 100) / avgRequest)
return &tmp, numRunningPods, timestamp, nil
}
func (h *HeapsterMetricsClient) GetCpuConsumptionAndRequestInMillis(namespace string, selector labels.Selector) (avgConsumption int64,
avgRequest int64, numRunningPods int, timestamp time.Time, err error) {
podList, err := h.client.Core().Pods(namespace).
List(api.ListOptions{LabelSelector: selector})
if err != nil {
return 0, 0, 0, time.Time{}, fmt.Errorf("failed to get pod list: %v", err)
}
podNames := map[string]struct{}{}
requestSum := int64(0)
missing := false
for _, pod := range podList.Items {
if pod.Status.Phase != api.PodRunning {
// Count only running pods.
continue
}
podNames[pod.Name] = struct{}{}
for _, container := range pod.Spec.Containers {
if containerRequest, ok := container.Resources.Requests[api.ResourceCPU]; ok {
requestSum += containerRequest.MilliValue()
} else {
missing = true
}
}
}
if len(podNames) == 0 && len(podList.Items) > 0 {
return 0, 0, 0, time.Time{}, fmt.Errorf("no running pods")
}
if missing || requestSum == 0 {
return 0, 0, 0, time.Time{}, fmt.Errorf("some pods do not have request for cpu")
}
glog.V(4).Infof("%s %s - sum of CPU requested: %d", namespace, selector, requestSum)
requestAvg := requestSum / int64(len(podNames))
// Consumption is already averaged and in millis.
consumption, timestamp, err := h.getCpuUtilizationForPods(namespace, selector, podNames)
if err != nil {
return 0, 0, 0, time.Time{}, err
}
return consumption, requestAvg, len(podNames), timestamp, nil
}
func (h *HeapsterMetricsClient) getCpuUtilizationForPods(namespace string, selector labels.Selector, podNames map[string]struct{}) (int64, time.Time, error) {
func (h *HeapsterMetricsClient) GetResourceMetric(resource api.ResourceName, namespace string, selector labels.Selector) (PodResourceInfo, time.Time, error) {
metricPath := fmt.Sprintf("/apis/metrics/v1alpha1/namespaces/%s/pods", namespace)
params := map[string]string{"labelSelector": selector.String()}
resultRaw, err := h.client.Core().Services(h.heapsterNamespace).
resultRaw, err := h.services.
ProxyGet(h.heapsterScheme, h.heapsterService, h.heapsterPort, metricPath, params).
DoRaw()
if err != nil {
return 0, time.Time{}, fmt.Errorf("failed to get pods metrics: %v", err)
return nil, time.Time{}, fmt.Errorf("failed to get pod resource metrics: %v", err)
}
glog.V(4).Infof("Heapster metrics result: %s", string(resultRaw))
@ -169,75 +96,55 @@ func (h *HeapsterMetricsClient) getCpuUtilizationForPods(namespace string, selec
metrics := metrics_api.PodMetricsList{}
err = json.Unmarshal(resultRaw, &metrics)
if err != nil {
return 0, time.Time{}, fmt.Errorf("failed to unmarshall heapster response: %v", err)
return nil, time.Time{}, fmt.Errorf("failed to unmarshal heapster response: %v", err)
}
if len(metrics.Items) != len(podNames) {
present := sets.NewString()
for _, m := range metrics.Items {
present.Insert(m.Name)
}
missing := make([]string, 0)
for expected := range podNames {
if !present.Has(expected) {
missing = append(missing, expected)
}
}
hint := ""
if len(missing) > 0 {
hint = fmt.Sprintf(" (sample missing pod: %s/%s)", namespace, missing[0])
}
return 0, time.Time{}, fmt.Errorf("metrics obtained for %d/%d of pods%s", len(metrics.Items), len(podNames), hint)
if len(metrics.Items) == 0 {
return nil, time.Time{}, fmt.Errorf("no metrics returned from heapster")
}
sum := int64(0)
res := make(PodResourceInfo, len(metrics.Items))
for _, m := range metrics.Items {
if _, found := podNames[m.Name]; found {
for _, c := range m.Containers {
cpu, found := c.Usage[v1.ResourceCPU]
if !found {
return 0, time.Time{}, fmt.Errorf("no cpu for container %v in pod %v/%v", c.Name, namespace, m.Name)
}
sum += cpu.MilliValue()
podSum := int64(0)
missing := len(m.Containers) == 0
for _, c := range m.Containers {
resValue, found := c.Usage[v1.ResourceName(resource)]
if !found {
missing = true
glog.V(2).Infof("missing resource metric %v for container %s in pod %s/%s", resource, c.Name, namespace, m.Name)
continue
}
} else {
return 0, time.Time{}, fmt.Errorf("not expected metrics for pod %v/%v", namespace, m.Name)
podSum += resValue.MilliValue()
}
if !missing {
res[m.Name] = int64(podSum)
}
}
return sum / int64(len(metrics.Items)), metrics.Items[0].Timestamp.Time, nil
timestamp := time.Time{}
if len(metrics.Items) > 0 {
timestamp = metrics.Items[0].Timestamp.Time
}
return res, timestamp, nil
}
// GetCustomMetric returns the average value of the given custom metric from the
// pods picked using the namespace and selector passed as arguments.
func (h *HeapsterMetricsClient) GetCustomMetric(customMetricName string, namespace string, selector labels.Selector) (*float64, time.Time, error) {
metricSpec := getHeapsterCustomMetricDefinition(customMetricName)
podList, err := h.client.Core().Pods(namespace).List(api.ListOptions{LabelSelector: selector})
func (h *HeapsterMetricsClient) GetRawMetric(metricName string, namespace string, selector labels.Selector) (PodMetricsInfo, time.Time, error) {
podList, err := h.podsGetter.Pods(namespace).List(api.ListOptions{LabelSelector: selector})
if err != nil {
return nil, time.Time{}, fmt.Errorf("failed to get pod list: %v", err)
}
podNames := []string{}
for _, pod := range podList.Items {
if pod.Status.Phase == api.PodPending {
// Skip pending pods.
continue
}
podNames = append(podNames, pod.Name)
}
if len(podNames) == 0 && len(podList.Items) > 0 {
return nil, time.Time{}, fmt.Errorf("no running pods")
return nil, time.Time{}, fmt.Errorf("failed to get pod list while fetching metrics: %v", err)
}
value, timestamp, err := h.getCustomMetricForPods(metricSpec, namespace, podNames)
if err != nil {
return nil, time.Time{}, err
if len(podList.Items) == 0 {
return nil, time.Time{}, fmt.Errorf("no pods matched the provided selector")
}
return &value.floatValue, timestamp, nil
}
func (h *HeapsterMetricsClient) getCustomMetricForPods(metricSpec metricDefinition, namespace string, podNames []string) (*intAndFloat, time.Time, error) {
podNames := make([]string, len(podList.Items))
for i, pod := range podList.Items {
podNames[i] = pod.Name
}
now := time.Now()
@ -245,89 +152,79 @@ func (h *HeapsterMetricsClient) getCustomMetricForPods(metricSpec metricDefiniti
metricPath := fmt.Sprintf("/api/v1/model/namespaces/%s/pod-list/%s/metrics/%s",
namespace,
strings.Join(podNames, ","),
metricSpec.name)
metricName)
resultRaw, err := h.client.Core().Services(h.heapsterNamespace).
resultRaw, err := h.services.
ProxyGet(h.heapsterScheme, h.heapsterService, h.heapsterPort, metricPath, map[string]string{"start": startTime.Format(time.RFC3339)}).
DoRaw()
if err != nil {
return nil, time.Time{}, fmt.Errorf("failed to get pods metrics: %v", err)
return nil, time.Time{}, fmt.Errorf("failed to get pod metrics: %v", err)
}
var metrics heapster.MetricResultList
err = json.Unmarshal(resultRaw, &metrics)
if err != nil {
return nil, time.Time{}, fmt.Errorf("failed to unmarshall heapster response: %v", err)
return nil, time.Time{}, fmt.Errorf("failed to unmarshal heapster response: %v", err)
}
glog.V(4).Infof("Heapster metrics result: %s", string(resultRaw))
sum, count, timestamp := metricSpec.aggregator(metrics)
if count != len(podNames) {
missing := make([]string, 0)
for i, expected := range podNames {
if len(metrics.Items) > i && len(metrics.Items[i].Metrics) == 0 {
missing = append(missing, expected)
}
}
hint := ""
if len(missing) > 0 {
hint = fmt.Sprintf(" (sample missing pod: %s/%s)", namespace, missing[0])
}
return nil, time.Time{}, fmt.Errorf("metrics obtained for %d/%d of pods%s", count, len(podNames), hint)
if len(metrics.Items) != len(podNames) {
// if we get too many metrics or too few metrics, we have no way of knowing which metric goes to which pod
// (note that Heapster returns *empty* metric items when a pod does not exist or have that metric, so this
// does not cover the "missing metric entry" case)
return nil, time.Time{}, fmt.Errorf("requested metrics for %v pods, got metrics for %v", len(podNames), len(metrics.Items))
}
return &sum, timestamp, nil
var timestamp *time.Time
res := make(PodMetricsInfo, len(metrics.Items))
for i, podMetrics := range metrics.Items {
val, podTimestamp, hadMetrics := collapseTimeSamples(podMetrics, time.Minute)
if hadMetrics {
res[podNames[i]] = val
if timestamp == nil || podTimestamp.Before(*timestamp) {
timestamp = &podTimestamp
}
}
}
if timestamp == nil {
timestamp = &time.Time{}
}
return res, *timestamp, nil
}
func calculateSumFromTimeSample(metrics heapster.MetricResultList, duration time.Duration) (sum intAndFloat, count int, timestamp time.Time) {
sum = intAndFloat{0, 0}
count = 0
timestamp = time.Time{}
var oldest *time.Time // creation time of the oldest of used samples across pods
oldest = nil
for _, metrics := range metrics.Items {
var newest *heapster.MetricPoint // creation time of the newest sample for pod
newest = nil
for i, metricPoint := range metrics.Metrics {
if newest == nil || newest.Timestamp.Before(metricPoint.Timestamp) {
newest = &metrics.Metrics[i]
}
}
if newest != nil {
if oldest == nil || newest.Timestamp.Before(*oldest) {
oldest = &newest.Timestamp
}
intervalSum := intAndFloat{0, 0}
intSumCount := 0
floatSumCount := 0
for _, metricPoint := range metrics.Metrics {
if metricPoint.Timestamp.Add(duration).After(newest.Timestamp) {
intervalSum.intValue += int64(metricPoint.Value)
intSumCount++
if metricPoint.FloatValue != nil {
intervalSum.floatValue += *metricPoint.FloatValue
floatSumCount++
}
}
}
if newest.FloatValue == nil {
if intSumCount > 0 {
sum.intValue += int64(intervalSum.intValue / int64(intSumCount))
sum.floatValue += float64(intervalSum.intValue / int64(intSumCount))
}
} else {
if floatSumCount > 0 {
sum.intValue += int64(intervalSum.floatValue / float64(floatSumCount))
sum.floatValue += intervalSum.floatValue / float64(floatSumCount)
}
}
count++
func collapseTimeSamples(metrics heapster.MetricResult, duration time.Duration) (float64, time.Time, bool) {
floatSum := float64(0)
intSum := int64(0)
intSumCount := 0
floatSumCount := 0
var newest *heapster.MetricPoint // creation time of the newest sample for this pod
for i, metricPoint := range metrics.Metrics {
if newest == nil || newest.Timestamp.Before(metricPoint.Timestamp) {
newest = &metrics.Metrics[i]
}
}
if oldest != nil {
timestamp = *oldest
if newest != nil {
for _, metricPoint := range metrics.Metrics {
if metricPoint.Timestamp.Add(duration).After(newest.Timestamp) {
intSum += int64(metricPoint.Value)
intSumCount++
if metricPoint.FloatValue != nil {
floatSum += *metricPoint.FloatValue
floatSumCount++
}
}
}
if newest.FloatValue != nil {
return floatSum / float64(floatSumCount), newest.Timestamp, true
} else {
return float64(intSum / int64(intSumCount)), newest.Timestamp, true
}
}
return sum, count, timestamp
return 0, time.Time{}, false
}

View File

@ -65,19 +65,19 @@ type metricPoint struct {
}
type testCase struct {
replicas int
desiredValue float64
desiredRequest *float64
desiredResourceValues PodResourceInfo
desiredMetricValues PodMetricsInfo
desiredError error
desiredRunningPods int
targetResource string
replicas int
targetTimestamp int
reportedMetricsPoints [][]metricPoint
reportedPodMetrics [][]int64
namespace string
podListOverride *api.PodList
selector labels.Selector
useMetricsApi bool
namespace string
selector labels.Selector
resourceName api.ResourceName
metricName string
}
func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
@ -87,12 +87,12 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
podLabels := map[string]string{"name": podNamePrefix}
tc.selector = labels.SelectorFromSet(podLabels)
// it's a resource test if we have a resource name
isResource := len(tc.resourceName) > 0
fakeClient := &fake.Clientset{}
fakeClient.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
if tc.podListOverride != nil {
return true, tc.podListOverride, nil
}
obj := &api.PodList{}
for i := 0; i < tc.replicas; i++ {
podName := fmt.Sprintf("%s-%d", podNamePrefix, i)
@ -102,7 +102,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
return true, obj, nil
})
if tc.useMetricsApi {
if isResource {
fakeClient.AddProxyReactor("services", func(action core.Action) (handled bool, ret restclient.ResponseWrapper, err error) {
metrics := metrics_api.PodMetricsList{}
for i, containers := range tc.reportedPodMetrics {
@ -181,182 +181,83 @@ func buildPod(namespace, podName string, podLabels map[string]string, phase api.
},
Status: api.PodStatus{
Phase: phase,
Conditions: []api.PodCondition{
{
Type: api.PodReady,
Status: api.ConditionTrue,
},
},
},
}
}
func (tc *testCase) verifyResults(t *testing.T, val *float64, req *float64, pods int, timestamp time.Time, err error) {
func (tc *testCase) verifyResults(t *testing.T, metrics interface{}, timestamp time.Time, err error) {
if tc.desiredError != nil {
assert.Error(t, err)
assert.Contains(t, fmt.Sprintf("%v", err), fmt.Sprintf("%v", tc.desiredError))
assert.Error(t, err, "there should be an error retrieving the metrics")
assert.Contains(t, fmt.Sprintf("%v", err), fmt.Sprintf("%v", tc.desiredError), "the error message should be as expected")
return
}
assert.NoError(t, err)
assert.NotNil(t, val)
assert.True(t, tc.desiredValue-0.001 < *val)
assert.True(t, tc.desiredValue+0.001 > *val)
assert.Equal(t, tc.desiredRunningPods, pods)
assert.NoError(t, err, "there should be no error retrieving the metrics")
assert.NotNil(t, metrics, "there should be metrics returned")
if tc.desiredRequest != nil {
assert.True(t, *tc.desiredRequest-0.001 < *req)
assert.True(t, *tc.desiredRequest+0.001 > *req)
if metricsInfo, wasRaw := metrics.(PodMetricsInfo); wasRaw {
assert.Equal(t, tc.desiredMetricValues, metricsInfo, "the raw metrics values should be as expected")
} else if resourceInfo, wasResource := metrics.(PodResourceInfo); wasResource {
assert.Equal(t, tc.desiredResourceValues, resourceInfo, "the resource metrics values should be as expected")
} else {
assert.False(t, true, "should return either resource metrics info or raw metrics info")
}
targetTimestamp := fixedTimestamp.Add(time.Duration(tc.targetTimestamp) * time.Minute)
assert.True(t, targetTimestamp.Equal(timestamp))
assert.True(t, targetTimestamp.Equal(timestamp), fmt.Sprintf("the timestamp should be as expected (%s) but was %s", targetTimestamp, timestamp))
}
func (tc *testCase) runTest(t *testing.T) {
testClient := tc.prepareTestClient(t)
metricsClient := NewHeapsterMetricsClient(testClient, DefaultHeapsterNamespace, DefaultHeapsterScheme, DefaultHeapsterService, DefaultHeapsterPort)
if tc.targetResource == "cpu-usage" {
val, req, pods, timestamp, err := metricsClient.GetCpuConsumptionAndRequestInMillis(tc.namespace, tc.selector)
fval := float64(val)
freq := float64(req)
tc.verifyResults(t, &fval, &freq, pods, timestamp, err)
isResource := len(tc.resourceName) > 0
if isResource {
info, timestamp, err := metricsClient.GetResourceMetric(tc.resourceName, tc.namespace, tc.selector)
tc.verifyResults(t, info, timestamp, err)
} else {
val, timestamp, err := metricsClient.GetCustomMetric(tc.targetResource, tc.namespace, tc.selector)
tc.verifyResults(t, val, nil, 0, timestamp, err)
info, timestamp, err := metricsClient.GetRawMetric(tc.metricName, tc.namespace, tc.selector)
tc.verifyResults(t, info, timestamp, err)
}
}
func TestCPU(t *testing.T) {
tc := testCase{
replicas: 3,
desiredValue: 5000,
desiredRunningPods: 3,
targetResource: "cpu-usage",
replicas: 3,
desiredResourceValues: PodResourceInfo{
"test-pod-0": 5000, "test-pod-1": 5000, "test-pod-2": 5000,
},
resourceName: api.ResourceCPU,
targetTimestamp: 1,
reportedPodMetrics: [][]int64{{5000}, {5000}, {5000}},
useMetricsApi: true,
}
tc.runTest(t)
}
func TestCPUPending(t *testing.T) {
desiredRequest := float64(2048 * 1000)
tc := testCase{
replicas: 5,
desiredValue: 5000,
desiredRequest: &desiredRequest,
desiredRunningPods: 3,
targetResource: "cpu-usage",
targetTimestamp: 1,
reportedPodMetrics: [][]int64{{5000}, {5000}, {5000}},
useMetricsApi: true,
podListOverride: &api.PodList{},
}
namespace := "test-namespace"
podNamePrefix := "test-pod"
podLabels := map[string]string{"name": podNamePrefix}
podRequest := []string{"1024", "2048", "3072", "200", "100"}
for i := 0; i < tc.replicas; i++ {
podName := fmt.Sprintf("%s-%d", podNamePrefix, i)
pod := buildPod(namespace, podName, podLabels, api.PodRunning, podRequest[i])
tc.podListOverride.Items = append(tc.podListOverride.Items, pod)
}
tc.podListOverride.Items[3].Status.Phase = api.PodPending
tc.podListOverride.Items[4].Status.Phase = api.PodFailed
tc.runTest(t)
}
func TestCPUAllPending(t *testing.T) {
tc := testCase{
replicas: 4,
targetResource: "cpu-usage",
targetTimestamp: 1,
reportedPodMetrics: [][]int64{},
useMetricsApi: true,
podListOverride: &api.PodList{},
desiredError: fmt.Errorf("no running pods"),
}
namespace := "test-namespace"
podNamePrefix := "test-pod"
podLabels := map[string]string{"name": podNamePrefix}
for i := 0; i < tc.replicas; i++ {
podName := fmt.Sprintf("%s-%d", podNamePrefix, i)
pod := buildPod(namespace, podName, podLabels, api.PodPending, "2048")
tc.podListOverride.Items = append(tc.podListOverride.Items, pod)
}
tc.runTest(t)
}
func TestQPS(t *testing.T) {
tc := testCase{
replicas: 3,
desiredValue: 13.33333,
targetResource: "qps",
replicas: 3,
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 10, "test-pod-1": 20, "test-pod-2": 10,
},
metricName: "qps",
targetTimestamp: 1,
reportedMetricsPoints: [][]metricPoint{{{10, 1}}, {{20, 1}}, {{10, 1}}},
}
tc.runTest(t)
}
func TestQPSPending(t *testing.T) {
tc := testCase{
replicas: 4,
desiredValue: 13.33333,
targetResource: "qps",
targetTimestamp: 1,
reportedMetricsPoints: [][]metricPoint{{{10, 1}}, {{20, 1}}, {{10, 1}}},
podListOverride: &api.PodList{},
}
namespace := "test-namespace"
podNamePrefix := "test-pod"
podLabels := map[string]string{"name": podNamePrefix}
for i := 0; i < tc.replicas; i++ {
podName := fmt.Sprintf("%s-%d", podNamePrefix, i)
pod := buildPod(namespace, podName, podLabels, api.PodRunning, "256")
tc.podListOverride.Items = append(tc.podListOverride.Items, pod)
}
tc.podListOverride.Items[0].Status.Phase = api.PodPending
tc.runTest(t)
}
func TestQPSAllPending(t *testing.T) {
tc := testCase{
replicas: 4,
desiredError: fmt.Errorf("no running pods"),
targetResource: "qps",
targetTimestamp: 1,
reportedMetricsPoints: [][]metricPoint{},
podListOverride: &api.PodList{},
}
namespace := "test-namespace"
podNamePrefix := "test-pod"
podLabels := map[string]string{"name": podNamePrefix}
for i := 0; i < tc.replicas; i++ {
podName := fmt.Sprintf("%s-%d", podNamePrefix, i)
pod := buildPod(namespace, podName, podLabels, api.PodPending, "512")
tc.podListOverride.Items = append(tc.podListOverride.Items, pod)
}
tc.podListOverride.Items[0].Status.Phase = api.PodPending
tc.runTest(t)
}
func TestCPUSumEqualZero(t *testing.T) {
tc := testCase{
replicas: 3,
desiredValue: 0,
desiredRunningPods: 3,
targetResource: "cpu-usage",
targetTimestamp: 0,
reportedPodMetrics: [][]int64{{0}, {0}, {0}},
useMetricsApi: true,
}
tc.runTest(t)
}
func TestQpsSumEqualZero(t *testing.T) {
tc := testCase{
replicas: 3,
desiredValue: 0,
targetResource: "qps",
replicas: 3,
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 0, "test-pod-1": 0, "test-pod-2": 0,
},
metricName: "qps",
targetTimestamp: 0,
reportedMetricsPoints: [][]metricPoint{{{0, 0}}, {{0, 0}}, {{0, 0}}},
}
@ -365,24 +266,26 @@ func TestQpsSumEqualZero(t *testing.T) {
func TestCPUMoreMetrics(t *testing.T) {
tc := testCase{
replicas: 5,
desiredValue: 5000,
desiredRunningPods: 5,
targetResource: "cpu-usage",
replicas: 5,
desiredResourceValues: PodResourceInfo{
"test-pod-0": 5000, "test-pod-1": 5000, "test-pod-2": 5000,
"test-pod-3": 5000, "test-pod-4": 5000,
},
resourceName: api.ResourceCPU,
targetTimestamp: 10,
reportedPodMetrics: [][]int64{{1000, 2000, 2000}, {5000}, {1000, 1000, 1000, 2000}, {4000, 1000}, {5000}},
useMetricsApi: true,
}
tc.runTest(t)
}
func TestCPUMissingMetrics(t *testing.T) {
tc := testCase{
replicas: 3,
targetResource: "cpu-usage",
desiredError: fmt.Errorf("metrics obtained for 1/3 of pods"),
replicas: 3,
desiredResourceValues: PodResourceInfo{
"test-pod-0": 4000,
},
resourceName: api.ResourceCPU,
reportedPodMetrics: [][]int64{{4000}},
useMetricsApi: true,
}
tc.runTest(t)
}
@ -390,29 +293,19 @@ func TestCPUMissingMetrics(t *testing.T) {
func TestQpsMissingMetrics(t *testing.T) {
tc := testCase{
replicas: 3,
targetResource: "qps",
desiredError: fmt.Errorf("metrics obtained for 1/3 of pods"),
desiredError: fmt.Errorf("requested metrics for 3 pods, got metrics for 1"),
metricName: "qps",
targetTimestamp: 1,
reportedMetricsPoints: [][]metricPoint{{{4000, 4}}},
}
tc.runTest(t)
}
func TestCPUSuperfluousMetrics(t *testing.T) {
tc := testCase{
replicas: 3,
targetResource: "cpu-usage",
desiredError: fmt.Errorf("metrics obtained for 6/3 of pods"),
reportedPodMetrics: [][]int64{{1000}, {2000}, {4000}, {4000}, {2000}, {4000}},
useMetricsApi: true,
}
tc.runTest(t)
}
func TestQpsSuperfluousMetrics(t *testing.T) {
tc := testCase{
replicas: 3,
targetResource: "qps",
desiredError: fmt.Errorf("metrics obtained for 6/3 of pods"),
desiredError: fmt.Errorf("requested metrics for 3 pods, got metrics for 6"),
metricName: "qps",
reportedMetricsPoints: [][]metricPoint{{{1000, 1}}, {{2000, 4}}, {{2000, 1}}, {{4000, 5}}, {{2000, 1}}, {{4000, 4}}},
}
tc.runTest(t)
@ -421,11 +314,23 @@ func TestQpsSuperfluousMetrics(t *testing.T) {
func TestCPUEmptyMetrics(t *testing.T) {
tc := testCase{
replicas: 3,
targetResource: "cpu-usage",
desiredError: fmt.Errorf("metrics obtained for 0/3 of pods"),
resourceName: api.ResourceCPU,
desiredError: fmt.Errorf("no metrics returned from heapster"),
reportedMetricsPoints: [][]metricPoint{},
reportedPodMetrics: [][]int64{},
useMetricsApi: true,
}
tc.runTest(t)
}
func TestQpsEmptyEntries(t *testing.T) {
tc := testCase{
replicas: 3,
metricName: "qps",
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 4000, "test-pod-2": 2000,
},
targetTimestamp: 4,
reportedMetricsPoints: [][]metricPoint{{{4000, 4}}, {}, {{2000, 4}}},
}
tc.runTest(t)
}
@ -433,61 +338,37 @@ func TestCPUEmptyMetrics(t *testing.T) {
func TestCPUZeroReplicas(t *testing.T) {
tc := testCase{
replicas: 0,
targetResource: "cpu-usage",
desiredError: fmt.Errorf("some pods do not have request for cpu"),
resourceName: api.ResourceCPU,
desiredError: fmt.Errorf("no metrics returned from heapster"),
reportedPodMetrics: [][]int64{},
useMetricsApi: true,
}
tc.runTest(t)
}
func TestCPUEmptyMetricsForOnePod(t *testing.T) {
tc := testCase{
replicas: 3,
targetResource: "cpu-usage",
desiredError: fmt.Errorf("metrics obtained for 2/3 of pods (sample missing pod: test-namespace/test-pod-2)"),
reportedPodMetrics: [][]int64{{100}, {300, 400}},
useMetricsApi: true,
replicas: 3,
resourceName: api.ResourceCPU,
desiredResourceValues: PodResourceInfo{
"test-pod-0": 100, "test-pod-1": 700,
},
reportedPodMetrics: [][]int64{{100}, {300, 400}, {}},
}
tc.runTest(t)
}
func TestAggregateSum(t *testing.T) {
//calculateSumFromTimeSample(metrics heapster.MetricResultList, duration time.Duration) (sum intAndFloat, count int, timestamp time.Time) {
func testCollapseTimeSamples(t *testing.T) {
now := time.Now()
result := heapster.MetricResultList{
Items: []heapster.MetricResult{
{
Metrics: []heapster.MetricPoint{
{Timestamp: now, Value: 50, FloatValue: nil},
{Timestamp: now.Add(-15 * time.Second), Value: 100, FloatValue: nil},
{Timestamp: now.Add(-60 * time.Second), Value: 100000, FloatValue: nil}},
LatestTimestamp: now,
},
},
metrics := heapster.MetricResult{
Metrics: []heapster.MetricPoint{
{Timestamp: now, Value: 50, FloatValue: nil},
{Timestamp: now.Add(-15 * time.Second), Value: 100, FloatValue: nil},
{Timestamp: now.Add(-60 * time.Second), Value: 100000, FloatValue: nil}},
LatestTimestamp: now,
}
sum, cnt, _ := calculateSumFromTimeSample(result, time.Minute)
assert.Equal(t, int64(75), sum.intValue)
assert.InEpsilon(t, 75.0, sum.floatValue, 0.1)
assert.Equal(t, 1, cnt)
}
func TestAggregateSumSingle(t *testing.T) {
now := time.Now()
result := heapster.MetricResultList{
Items: []heapster.MetricResult{
{
Metrics: []heapster.MetricPoint{
{Timestamp: now, Value: 50, FloatValue: nil},
{Timestamp: now.Add(-65 * time.Second), Value: 100000, FloatValue: nil}},
LatestTimestamp: now,
},
},
}
sum, cnt, _ := calculateSumFromTimeSample(result, time.Minute)
assert.Equal(t, int64(50), sum.intValue)
assert.InEpsilon(t, 50.0, sum.floatValue, 0.1)
assert.Equal(t, 1, cnt)
val, timestamp, hadMetrics := collapseTimeSamples(metrics, time.Minute)
assert.True(t, hadMetrics, "should report that it received a populated list of metrics")
assert.InEpsilon(t, float64(75), val, 0.1, "collapsed sample value should be as expected")
assert.True(t, timestamp.Equal(now), "timestamp should be the current time (the newest)")
}
// TODO: add proper tests for request

View File

@ -0,0 +1,54 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
// GetResourceUtilizationRatio takes in a set of metrics, a set of matching requests,
// and a target utilization percentage, and calculates the ratio of
// actual to desired utilization (returning that and the actual utilization)
func GetResourceUtilizationRatio(metrics PodResourceInfo, requests map[string]int64, targetUtilization int32) (float64, int32, error) {
metricsTotal := int64(0)
requestsTotal := int64(0)
for podName, metricValue := range metrics {
request, hasRequest := requests[podName]
if !hasRequest {
// we check for missing requests elsewhere, so assuming missing requests == extraneous metrics
continue
}
metricsTotal += metricValue
requestsTotal += request
}
currentUtilization := int32((metricsTotal * 100) / requestsTotal)
return float64(currentUtilization) / float64(targetUtilization), currentUtilization, nil
}
// GetMetricUtilizationRatio takes in a set of metrics and a target utilization value,
// and calculates the ratio of actual to desired utilization
// (returning that and the actual utilization)
func GetMetricUtilizationRatio(metrics PodMetricsInfo, targetUtilization float64) (float64, float64) {
metricsTotal := float64(0)
for _, metricValue := range metrics {
metricsTotal += metricValue
}
currentUtilization := metricsTotal / float64(len(metrics))
return currentUtilization / targetUtilization, currentUtilization
}

View File

@ -0,0 +1,246 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podautoscaler
import (
"fmt"
"math"
"time"
"k8s.io/kubernetes/pkg/api"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/sets"
)
type ReplicaCalculator struct {
metricsClient metricsclient.MetricsClient
podsGetter unversionedcore.PodsGetter
}
func NewReplicaCalculator(metricsClient metricsclient.MetricsClient, podsGetter unversionedcore.PodsGetter) *ReplicaCalculator {
return &ReplicaCalculator{
metricsClient: metricsClient,
podsGetter: podsGetter,
}
}
// GetResourceReplicas calculates the desired replica count based on a target resource utilization percentage
// of the given resource for pods matching the given selector in the given namespace, and the current replica count
func (c *ReplicaCalculator) GetResourceReplicas(currentReplicas int32, targetUtilization int32, resource api.ResourceName, namespace string, selector labels.Selector) (replicaCount int32, utilization int32, timestamp time.Time, err error) {
metrics, timestamp, err := c.metricsClient.GetResourceMetric(resource, namespace, selector)
if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err)
}
podList, err := c.podsGetter.Pods(namespace).List(api.ListOptions{LabelSelector: selector})
if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
}
if len(podList.Items) == 0 {
return 0, 0, time.Time{}, fmt.Errorf("no pods returned by selector while calculating replica count")
}
requests := make(map[string]int64, len(podList.Items))
readyPodCount := 0
unreadyPods := sets.NewString()
missingPods := sets.NewString()
for _, pod := range podList.Items {
podSum := int64(0)
for _, container := range pod.Spec.Containers {
if containerRequest, ok := container.Resources.Requests[resource]; ok {
podSum += containerRequest.MilliValue()
} else {
return 0, 0, time.Time{}, fmt.Errorf("missing request for %s on container %s in pod %s/%s", resource, container.Name, namespace, pod.Name)
}
}
requests[pod.Name] = podSum
if pod.Status.Phase != api.PodRunning || !api.IsPodReady(&pod) {
// save this pod name for later, but pretend it doesn't exist for now
unreadyPods.Insert(pod.Name)
delete(metrics, pod.Name)
continue
}
if _, found := metrics[pod.Name]; !found {
// save this pod name for later, but pretend it doesn't exist for now
missingPods.Insert(pod.Name)
continue
}
readyPodCount++
}
if len(metrics) == 0 {
return 0, 0, time.Time{}, fmt.Errorf("did not receive metrics for any ready pods")
}
usageRatio, utilization, err := metricsclient.GetResourceUtilizationRatio(metrics, requests, targetUtilization)
if err != nil {
return 0, 0, time.Time{}, err
}
rebalanceUnready := len(unreadyPods) > 0 && usageRatio > 1.0
if !rebalanceUnready && len(missingPods) == 0 {
if math.Abs(1.0-usageRatio) <= tolerance {
// return the current replicas if the change would be too small
return currentReplicas, utilization, timestamp, nil
}
// if we don't have any unready or missing pods, we can calculate the new replica count now
return int32(math.Ceil(usageRatio * float64(readyPodCount))), utilization, timestamp, nil
}
if len(missingPods) > 0 {
if usageRatio < 1.0 {
// on a scale-down, treat missing pods as using 100% of the resource request
for podName := range missingPods {
metrics[podName] = requests[podName]
}
} else {
// on a scale-up, treat missing pods as using 0% of the resource request
for podName := range missingPods {
metrics[podName] = 0
}
}
}
if rebalanceUnready {
// on a scale-up, treat unready pods as using 0% of the resource request
for podName := range unreadyPods {
metrics[podName] = 0
}
}
// re-run the utilization calculation with our new numbers
newUsageRatio, _, err := metricsclient.GetResourceUtilizationRatio(metrics, requests, targetUtilization)
if err != nil {
return 0, utilization, time.Time{}, err
}
if math.Abs(1.0-newUsageRatio) <= tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) {
// return the current replicas if the change would be too small,
// or if the new usage ratio would cause a change in scale direction
return currentReplicas, utilization, timestamp, nil
}
// return the result, where the number of replicas considered is
// however many replicas factored into our calculation
return int32(math.Ceil(newUsageRatio * float64(len(metrics)))), utilization, timestamp, nil
}
// GetMetricReplicas calculates the desired replica count based on a target value
// of the given raw (custom) metric for pods matching the given selector in the given namespace, and the current replica count
func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUtilization float64, metricName string, namespace string, selector labels.Selector) (replicaCount int32, utilization float64, timestamp time.Time, err error) {
metrics, timestamp, err := c.metricsClient.GetRawMetric(metricName, namespace, selector)
if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v", metricName, err)
}
podList, err := c.podsGetter.Pods(namespace).List(api.ListOptions{LabelSelector: selector})
if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
}
if len(podList.Items) == 0 {
return 0, 0, time.Time{}, fmt.Errorf("no pods returned by selector while calculating replica count")
}
readyPodCount := 0
unreadyPods := sets.NewString()
missingPods := sets.NewString()
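// as in GetResourceReplicas, bucket pods into ready, unready, and missing-metrics sets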
for _, pod := range podList.Items {
if pod.Status.Phase != api.PodRunning || !api.IsPodReady(&pod) {
// save this pod name for later, but pretend it doesn't exist for now
unreadyPods.Insert(pod.Name)
delete(metrics, pod.Name)
continue
}
if _, found := metrics[pod.Name]; !found {
// save this pod name for later, but pretend it doesn't exist for now
missingPods.Insert(pod.Name)
continue
}
readyPodCount++
}
if len(metrics) == 0 {
return 0, 0, time.Time{}, fmt.Errorf("did not receive metrics for any ready pods")
}
usageRatio, utilization := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization)
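// as in the resource path, only fold unready pods in when the raw ratio points at a scale-up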
rebalanceUnready := len(unreadyPods) > 0 && usageRatio > 1.0
if !rebalanceUnready && len(missingPods) == 0 {
if math.Abs(1.0-usageRatio) <= tolerance {
// return the current replicas if the change would be too small
return currentReplicas, utilization, timestamp, nil
}
// if we don't have any unready or missing pods, we can calculate the new replica count now
return int32(math.Ceil(usageRatio * float64(readyPodCount))), utilization, timestamp, nil
}
if len(missingPods) > 0 {
if usageRatio < 1.0 {
// on a scale-down, treat missing pods as being exactly at the target metric value
for podName := range missingPods {
metrics[podName] = targetUtilization
}
} else {
// on a scale-up, treat missing pods as having a zero metric value
for podName := range missingPods {
metrics[podName] = 0
}
}
}
if rebalanceUnready {
// on a scale-up, treat unready pods as having a zero metric value
for podName := range unreadyPods {
metrics[podName] = 0
}
}
// re-run the utilization calculation with our new numbers
newUsageRatio, _ := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization)
if math.Abs(1.0-newUsageRatio) <= tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) {
// return the current replicas if the change would be too small,
// or if the new usage ratio would cause a change in scale direction
return currentReplicas, utilization, timestamp, nil
}
// return the result, where the number of replicas considered is
// however many replicas factored into our calculation
return int32(math.Ceil(newUsageRatio * float64(len(metrics)))), utilization, timestamp, nil
}
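// For orientation, a minimal sketch (not part of this commit) of how a caller in
// this package might ask the calculator for a CPU recommendation; the "default"
// namespace, the "app=frontend" label, and the 80% target are illustrative assumptions.
func exampleRecommendation(metricsClient metricsclient.MetricsClient, podsGetter unversionedcore.PodsGetter, currentReplicas int32) (int32, error) {
	calc := NewReplicaCalculator(metricsClient, podsGetter)
	selector := labels.SelectorFromSet(labels.Set{"app": "frontend"}) // hypothetical selector
	replicas, _, _, err := calc.GetResourceReplicas(currentReplicas, 80, api.ResourceCPU, "default", selector)
	if err != nil {
		// keep the current count rather than scaling on bad or missing data
		return currentReplicas, err
	}
	return replicas, nil
}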

View File

@ -0,0 +1,556 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podautoscaler
import (
"encoding/json"
"fmt"
"math"
"strconv"
"strings"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
_ "k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
"k8s.io/kubernetes/pkg/runtime"
heapster "k8s.io/heapster/metrics/api/v1/types"
metrics_api "k8s.io/heapster/metrics/apis/metrics/v1alpha1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type resourceInfo struct {
name api.ResourceName
requests []resource.Quantity
levels []int64
targetUtilization int32
expectedUtilization int32
}
type metricInfo struct {
name string
levels []float64
targetUtilization float64
expectedUtilization float64
}
type replicaCalcTestCase struct {
currentReplicas int32
expectedReplicas int32
expectedError error
timestamp time.Time
resource *resourceInfo
metric *metricInfo
podReadiness []api.ConditionStatus
}
const (
testNamespace = "test-namespace"
podNamePrefix = "test-pod"
)
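// prepareTestClient builds a fake clientset whose reactors serve the pod list and
// the (resource or custom) metric levels described by the test case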
func (tc *replicaCalcTestCase) prepareTestClient(t *testing.T) *fake.Clientset {
fakeClient := &fake.Clientset{}
fakeClient.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
obj := &api.PodList{}
for i := 0; i < int(tc.currentReplicas); i++ {
podReadiness := api.ConditionTrue
if tc.podReadiness != nil {
podReadiness = tc.podReadiness[i]
}
podName := fmt.Sprintf("%s-%d", podNamePrefix, i)
pod := api.Pod{
Status: api.PodStatus{
Phase: api.PodRunning,
Conditions: []api.PodCondition{
{
Type: api.PodReady,
Status: podReadiness,
},
},
},
ObjectMeta: api.ObjectMeta{
Name: podName,
Namespace: testNamespace,
Labels: map[string]string{
"name": podNamePrefix,
},
},
Spec: api.PodSpec{
Containers: []api.Container{{}, {}},
},
}
if tc.resource != nil && i < len(tc.resource.requests) {
pod.Spec.Containers[0].Resources = api.ResourceRequirements{
Requests: api.ResourceList{
tc.resource.name: tc.resource.requests[i],
},
}
pod.Spec.Containers[1].Resources = api.ResourceRequirements{
Requests: api.ResourceList{
tc.resource.name: tc.resource.requests[i],
},
}
}
obj.Items = append(obj.Items, pod)
}
return true, obj, nil
})
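// the Heapster proxy reactor serves either resource metrics (a PodMetricsList) or
// raw custom metrics (a MetricResultList), depending on which the test case sets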
fakeClient.AddProxyReactor("services", func(action core.Action) (handled bool, ret restclient.ResponseWrapper, err error) {
var heapsterRawMemResponse []byte
if tc.resource != nil {
metrics := metrics_api.PodMetricsList{}
for i, resValue := range tc.resource.levels {
podMetric := metrics_api.PodMetrics{
ObjectMeta: v1.ObjectMeta{
Name: fmt.Sprintf("%s-%d", podNamePrefix, i),
Namespace: testNamespace,
},
Timestamp: unversioned.Time{Time: tc.timestamp},
Containers: []metrics_api.ContainerMetrics{
{
Name: "container1",
Usage: v1.ResourceList{
v1.ResourceName(tc.resource.name): *resource.NewMilliQuantity(
int64(resValue),
resource.DecimalSI),
},
},
{
Name: "container2",
Usage: v1.ResourceList{
v1.ResourceName(tc.resource.name): *resource.NewMilliQuantity(
int64(resValue),
resource.DecimalSI),
},
},
},
}
metrics.Items = append(metrics.Items, podMetric)
}
heapsterRawMemResponse, _ = json.Marshal(&metrics)
} else {
// only return the pods that we actually asked for
proxyAction := action.(core.ProxyGetAction)
pathParts := strings.Split(proxyAction.GetPath(), "/")
// pathParts should look like [ api, v1, model, namespaces, $NS, pod-list, $PODS, metrics, $METRIC... ]
if len(pathParts) < 9 {
return true, nil, fmt.Errorf("invalid heapster path %q", proxyAction.GetPath())
}
podNames := strings.Split(pathParts[7], ",")
podPresent := make([]bool, len(tc.metric.levels))
for _, name := range podNames {
if len(name) <= len(podNamePrefix)+1 {
return true, nil, fmt.Errorf("unknown pod %q", name)
}
num, err := strconv.Atoi(name[len(podNamePrefix)+1:])
if err != nil {
return true, nil, fmt.Errorf("unknown pod %q", name)
}
podPresent[num] = true
}
timestamp := tc.timestamp
metrics := heapster.MetricResultList{}
for i, level := range tc.metric.levels {
if !podPresent[i] {
continue
}
metric := heapster.MetricResult{
Metrics: []heapster.MetricPoint{{Timestamp: timestamp, Value: uint64(level), FloatValue: &tc.metric.levels[i]}},
LatestTimestamp: timestamp,
}
metrics.Items = append(metrics.Items, metric)
}
heapsterRawMemResponse, _ = json.Marshal(&metrics)
}
return true, newFakeResponseWrapper(heapsterRawMemResponse), nil
})
return fakeClient
}
func (tc *replicaCalcTestCase) runTest(t *testing.T) {
testClient := tc.prepareTestClient(t)
metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort)
replicaCalc := &ReplicaCalculator{
metricsClient: metricsClient,
podsGetter: testClient.Core(),
}
selector, err := unversioned.LabelSelectorAsSelector(&unversioned.LabelSelector{
MatchLabels: map[string]string{"name": podNamePrefix},
})
require.NoError(t, err, "something went horribly wrong...")
if tc.resource != nil {
outReplicas, outUtilization, outTimestamp, err := replicaCalc.GetResourceReplicas(tc.currentReplicas, tc.resource.targetUtilization, tc.resource.name, testNamespace, selector)
if tc.expectedError != nil {
require.Error(t, err, "there should be an error calculating the replica count")
assert.Contains(t, err.Error(), tc.expectedError.Error(), "the error message should have contained the expected error message")
return
}
require.NoError(t, err, "there should not have been an error calculating the replica count")
assert.Equal(t, tc.expectedReplicas, outReplicas, "replicas should be as expected")
assert.Equal(t, tc.resource.expectedUtilization, outUtilization, "utilization should be as expected")
assert.True(t, tc.timestamp.Equal(outTimestamp), "timestamp should be as expected")
} else {
outReplicas, outUtilization, outTimestamp, err := replicaCalc.GetMetricReplicas(tc.currentReplicas, tc.metric.targetUtilization, tc.metric.name, testNamespace, selector)
if tc.expectedError != nil {
require.Error(t, err, "there should be an error calculating the replica count")
assert.Contains(t, err.Error(), tc.expectedError.Error(), "the error message should have contained the expected error message")
return
}
require.NoError(t, err, "there should not have been an error calculating the replica count")
assert.Equal(t, tc.expectedReplicas, outReplicas, "replicas should be as expected")
assert.InDelta(t, tc.metric.expectedUtilization, outUtilization, 0.1, "utilization should be as expected")
assert.True(t, tc.timestamp.Equal(outTimestamp), "timestamp should be as expected")
}
}
func TestReplicaCalcScaleUp(t *testing.T) {
tc := replicaCalcTestCase{
currentReplicas: 3,
expectedReplicas: 5,
resource: &resourceInfo{
name: api.ResourceCPU,
requests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
levels: []int64{300, 500, 700},
targetUtilization: 30,
expectedUtilization: 50,
},
}
tc.runTest(t)
}
func TestReplicaCalcScaleUpUnreadyLessScale(t *testing.T) {
tc := replicaCalcTestCase{
currentReplicas: 3,
expectedReplicas: 4,
podReadiness: []api.ConditionStatus{api.ConditionFalse, api.ConditionTrue, api.ConditionTrue},
resource: &resourceInfo{
name: api.ResourceCPU,
requests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
levels: []int64{300, 500, 700},
targetUtilization: 30,
expectedUtilization: 60,
},
}
tc.runTest(t)
}
func TestReplicaCalcScaleUpUnreadyNoScale(t *testing.T) {
tc := replicaCalcTestCase{
currentReplicas: 3,
expectedReplicas: 3,
podReadiness: []api.ConditionStatus{api.ConditionTrue, api.ConditionFalse, api.ConditionFalse},
resource: &resourceInfo{
name: api.ResourceCPU,
requests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
levels: []int64{400, 500, 700},
targetUtilization: 30,
expectedUtilization: 40,
},
}
tc.runTest(t)
}
func TestReplicaCalcScaleUpCM(t *testing.T) {
tc := replicaCalcTestCase{
currentReplicas: 3,
expectedReplicas: 4,
metric: &metricInfo{
name: "qps",
levels: []float64{20.0, 10.0, 30.0},
targetUtilization: 15.0,
expectedUtilization: 20.0,
},
}
tc.runTest(t)
}
func TestReplicaCalcScaleUpCMUnreadyLessScale(t *testing.T) {
tc := replicaCalcTestCase{
currentReplicas: 3,
expectedReplicas: 4,
podReadiness: []api.ConditionStatus{api.ConditionTrue, api.ConditionTrue, api.ConditionFalse},
metric: &metricInfo{
name: "qps",
levels: []float64{50.0, 10.0, 30.0},
targetUtilization: 15.0,
expectedUtilization: 30.0,
},
}
tc.runTest(t)
}
func TestReplicaCalcScaleUpCMUnreadyNoScaleWouldScaleDown(t *testing.T) {
tc := replicaCalcTestCase{
currentReplicas: 3,
expectedReplicas: 3,
podReadiness: []api.ConditionStatus{api.ConditionFalse, api.ConditionTrue, api.ConditionFalse},
metric: &metricInfo{
name: "qps",
levels: []float64{50.0, 15.0, 30.0},
targetUtilization: 15.0,
expectedUtilization: 15.0,
},
}
tc.runTest(t)
}
func TestReplicaCalcScaleDown(t *testing.T) {
tc := replicaCalcTestCase{
currentReplicas: 5,
expectedReplicas: 3,
resource: &resourceInfo{
name: api.ResourceCPU,
requests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
levels: []int64{100, 300, 500, 250, 250},
targetUtilization: 50,
expectedUtilization: 28,
},
}
tc.runTest(t)
}
func TestReplicaCalcScaleDownCM(t *testing.T) {
tc := replicaCalcTestCase{
currentReplicas: 5,
expectedReplicas: 3,
metric: &metricInfo{
name: "qps",
levels: []float64{12.0, 12.0, 12.0, 12.0, 12.0},
targetUtilization: 20.0,
expectedUtilization: 12.0,
},
}
tc.runTest(t)
}
func TestReplicaCalcScaleDownIgnoresUnreadyPods(t *testing.T) {
tc := replicaCalcTestCase{
currentReplicas: 5,
expectedReplicas: 2,
podReadiness: []api.ConditionStatus{api.ConditionTrue, api.ConditionTrue, api.ConditionTrue, api.ConditionFalse, api.ConditionFalse},
resource: &resourceInfo{
name: api.ResourceCPU,
requests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
levels: []int64{100, 300, 500, 250, 250},
targetUtilization: 50,
expectedUtilization: 30,
},
}
tc.runTest(t)
}
func TestReplicaCalcTolerance(t *testing.T) {
tc := replicaCalcTestCase{
currentReplicas: 3,
expectedReplicas: 3,
resource: &resourceInfo{
name: api.ResourceCPU,
requests: []resource.Quantity{resource.MustParse("0.9"), resource.MustParse("1.0"), resource.MustParse("1.1")},
levels: []int64{1010, 1030, 1020},
targetUtilization: 100,
expectedUtilization: 102,
},
}
tc.runTest(t)
}
func TestReplicaCalcToleranceCM(t *testing.T) {
tc := replicaCalcTestCase{
currentReplicas: 3,
expectedReplicas: 3,
metric: &metricInfo{
name: "qps",
levels: []float64{20.0, 21.0, 21.0},
targetUtilization: 20.0,
expectedUtilization: 20.66666,
},
}
tc.runTest(t)
}
func TestReplicaCalcSuperfluousMetrics(t *testing.T) {
tc := replicaCalcTestCase{
currentReplicas: 4,
expectedReplicas: 24,
resource: &resourceInfo{
name: api.ResourceCPU,
requests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
levels: []int64{4000, 9500, 3000, 7000, 3200, 2000},
targetUtilization: 100,
expectedUtilization: 587,
},
}
tc.runTest(t)
}
func TestReplicaCalcMissingMetrics(t *testing.T) {
tc := replicaCalcTestCase{
currentReplicas: 4,
expectedReplicas: 3,
resource: &resourceInfo{
name: api.ResourceCPU,
requests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
levels: []int64{400, 95},
targetUtilization: 100,
expectedUtilization: 24,
},
}
tc.runTest(t)
}
func TestReplicaCalcEmptyMetrics(t *testing.T) {
tc := replicaCalcTestCase{
currentReplicas: 4,
expectedError: fmt.Errorf("unable to get metrics for resource cpu: no metrics returned from heapster"),
resource: &resourceInfo{
name: api.ResourceCPU,
requests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
levels: []int64{},
targetUtilization: 100,
},
}
tc.runTest(t)
}
func TestReplicaCalcEmptyCPURequest(t *testing.T) {
tc := replicaCalcTestCase{
currentReplicas: 1,
expectedError: fmt.Errorf("missing request for"),
resource: &resourceInfo{
name: api.ResourceCPU,
requests: []resource.Quantity{},
levels: []int64{200},
targetUtilization: 100,
},
}
tc.runTest(t)
}
// TestReplicaCalcComputedToleranceAlgImplementation is a regression test which
// back-calculates a minimal percentage for downscaling based on a small percentage
// increase in pod utilization which is calibrated against the tolerance value.
func TestReplicaCalcComputedToleranceAlgImplementation(t *testing.T) {
startPods := int32(10)
// 150 mCPU per pod.
totalUsedCPUOfAllPods := int64(startPods * 150)
// Each pod starts out asking for 2X what is really needed.
// This means we will have a 50% ratio of used/requested
totalRequestedCPUOfAllPods := int32(2 * totalUsedCPUOfAllPods)
requestedToUsed := float64(totalRequestedCPUOfAllPods / int32(totalUsedCPUOfAllPods))
// Spread the amount we ask over 10 pods. We can add some jitter later in reportedLevels.
perPodRequested := totalRequestedCPUOfAllPods / startPods
// Force a minimal scaling event by satisfying (tolerance < 1 - resourcesUsedRatio).
target := math.Abs(1/(requestedToUsed*(1-tolerance))) + .01
finalCpuPercentTarget := int32(target * 100)
resourcesUsedRatio := float64(totalUsedCPUOfAllPods) / float64(float64(totalRequestedCPUOfAllPods)*target)
// i.e. resourcesUsedRatio * startPods -> scaled-down expectation.
finalPods := int32(math.Ceil(resourcesUsedRatio * float64(startPods)))
// To breach the tolerance, the target is chosen so the utilization ratio differs from 1.0 by slightly more than the tolerance value.
tc := replicaCalcTestCase{
currentReplicas: startPods,
expectedReplicas: finalPods,
resource: &resourceInfo{
name: api.ResourceCPU,
levels: []int64{
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
},
requests: []resource.Quantity{
resource.MustParse(fmt.Sprint(perPodRequested+100) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested-100) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested+10) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested-10) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested+2) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested-2) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested+1) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested-1) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested) + "m"),
},
targetUtilization: finalCpuPercentTarget,
expectedUtilization: int32(totalUsedCPUOfAllPods*100) / totalRequestedCPUOfAllPods,
},
}
tc.runTest(t)
// Reuse the data structure above, now testing "unscaling":
// verify that no scaling happens when we are within a very close margin of the tolerance.
target = math.Abs(1/(requestedToUsed*(1-tolerance))) + .004
finalCpuPercentTarget = int32(target * 100)
tc.resource.targetUtilization = finalCpuPercentTarget
tc.currentReplicas = startPods
tc.expectedReplicas = startPods
tc.runTest(t)
}
// TODO: add more tests
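// One shape that TODO could take, sketched under the assumption that the harness
// above stays as-is: a scale-up in which one pod is unready and another reports no
// metrics at all.  The two ready pods together run at 125% of request against a 50%
// target; folding the unready pod and the metric-less pod back in at zero usage gives
// 62%, i.e. a ratio of 1.24, so the damped recommendation is ceil(1.24 * 4) = 5 replicas.
func TestReplicaCalcScaleUpUnreadyAndMissing(t *testing.T) {
	tc := replicaCalcTestCase{
		currentReplicas:  4,
		expectedReplicas: 5,
		podReadiness:     []api.ConditionStatus{api.ConditionFalse, api.ConditionTrue, api.ConditionTrue, api.ConditionTrue},
		resource: &resourceInfo{
			name:     api.ResourceCPU,
			requests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
			levels:   []int64{100, 1000, 1500},
			targetUtilization:   50,
			expectedUtilization: 125,
		},
	}
	tc.runTest(t)
}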