Merge pull request #41272 from DirectXMan12/feature/hpa-v2-controller

Automatic merge from submit-queue

Convert HPA controller to support HPA v2 mechanics

This PR converts the HPA controller to support the mechanics from HPA v2.
The HPA controller continues to make use of the HPA v1 client, but utilizes
the conversion logic to work with autoscaling/v2alpha1 objects internally.

It is the follow-up PR to #36033 and part of kubernetes/features#117.

**Release note**:
```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2017-02-20 01:52:19 -08:00 committed by GitHub
commit 2f0e5ba786
11 changed files with 466 additions and 417 deletions

View File

@ -547,11 +547,23 @@ func autoscalingFuncs(t apitesting.TestingCommon) []interface{} {
minReplicas := int32(c.Rand.Int31())
s.MinReplicas = &minReplicas
// NB: since this is used for round-tripping, we can only fuzz
// fields that round-trip successfully, so only the resource source
// type is usable here
randomQuantity := func() resource.Quantity {
var q resource.Quantity
c.Fuzz(&q)
// precalc the string for benchmarking purposes
_ = q.String()
return q
}
targetUtilization := int32(c.RandUint64())
s.Metrics = []autoscaling.MetricSpec{
{
Type: autoscaling.PodsMetricSourceType,
Pods: &autoscaling.PodsMetricSource{
MetricName: c.RandString(),
TargetAverageValue: randomQuantity(),
},
},
{
Type: autoscaling.ResourceMetricSourceType,
Resource: &autoscaling.ResourceMetricSource{
@ -563,11 +575,22 @@ func autoscalingFuncs(t apitesting.TestingCommon) []interface{} {
},
func(s *autoscaling.HorizontalPodAutoscalerStatus, c fuzz.Continue) {
c.FuzzNoCustom(s) // fuzz self without calling this function again
// NB: since this is used for round-tripping, we can only fuzz
// fields that round-trip successfully, so only the resource status
// type is usable here
randomQuantity := func() resource.Quantity {
var q resource.Quantity
c.Fuzz(&q)
// precalc the string for benchmarking purposes
_ = q.String()
return q
}
currentUtilization := int32(c.RandUint64())
s.CurrentMetrics = []autoscaling.MetricStatus{
{
Type: autoscaling.PodsMetricSourceType,
Pods: &autoscaling.PodsMetricStatus{
MetricName: c.RandString(),
CurrentAverageValue: randomQuantity(),
},
},
{
Type: autoscaling.ResourceMetricSourceType,
Resource: &autoscaling.ResourceMetricStatus{

View File

@ -108,12 +108,17 @@ func Convert_v1_HorizontalPodAutoscaler_To_autoscaling_HorizontalPodAutoscaler(i
return err
}
out.Spec.Metrics = make([]autoscaling.MetricSpec, len(otherMetrics))
// the normal Spec conversion could have populated out.Spec.Metrics with a single element, so deal with that
outMetrics := make([]autoscaling.MetricSpec, len(otherMetrics)+len(out.Spec.Metrics))
for i, metric := range otherMetrics {
if err := Convert_v1_MetricSpec_To_autoscaling_MetricSpec(&metric, &out.Spec.Metrics[i], s); err != nil {
if err := Convert_v1_MetricSpec_To_autoscaling_MetricSpec(&metric, &outMetrics[i], s); err != nil {
return err
}
}
if out.Spec.Metrics != nil {
outMetrics[len(otherMetrics)] = out.Spec.Metrics[0]
}
out.Spec.Metrics = outMetrics
delete(out.Annotations, autoscaling.MetricSpecsAnnotation)
}

View File

@ -20,6 +20,7 @@ go_library(
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/autoscaling/v1:go_default_library",
"//pkg/apis/autoscaling/v2alpha1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset/typed/autoscaling/v1:go_default_library",
"//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library",
@ -31,6 +32,8 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/api/resource",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
@ -53,6 +56,7 @@ go_test(
"//pkg/api/v1:go_default_library",
"//pkg/apis/autoscaling/install:go_default_library",
"//pkg/apis/autoscaling/v1:go_default_library",
"//pkg/apis/autoscaling/v2alpha1:go_default_library",
"//pkg/apis/extensions/install:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",

View File

@ -17,7 +17,6 @@ limitations under the License.
package podautoscaler
import (
"encoding/json"
"fmt"
"math"
"time"
@ -25,6 +24,9 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1"
@ -32,10 +34,11 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
autoscaling "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
extensionsv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
unversionedautoscaling "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/autoscaling/v1"
unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/extensions/v1beta1"
autoscalingv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
autoscalingv2 "k8s.io/kubernetes/pkg/apis/autoscaling/v2alpha1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
autoscalingclient "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/autoscaling/v1"
extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/extensions/v1beta1"
autoscalinginformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/autoscaling/v1"
autoscalinglisters "k8s.io/kubernetes/pkg/client/listers/autoscaling/v1"
)
@ -45,11 +48,6 @@ const (
// TODO: make it a flag or HPA spec element.
tolerance = 0.1
defaultTargetCPUUtilizationPercentage = 80
HpaCustomMetricsTargetAnnotationName = "alpha/target.custom-metrics.podautoscaler.kubernetes.io"
HpaCustomMetricsStatusAnnotationName = "alpha/status.custom-metrics.podautoscaler.kubernetes.io"
scaleUpLimitFactor = 2
scaleUpLimitMinimum = 4
)
@ -58,9 +56,27 @@ func calculateScaleUpLimit(currentReplicas int32) int32 {
return int32(math.Max(scaleUpLimitFactor*float64(currentReplicas), scaleUpLimitMinimum))
}
// ConvertToVersionVia is like api.Scheme.ConvertToVersion, but it does so via an internal version first.
// We use it since working with v2alpha1 is convinient here, but we want to use the v1 client (and
// can't just use the internal version). Note that it does *not* guarantee a copy is made -- this should
// be done separately if we need to mutate the object.
func UnsafeConvertToVersionVia(obj runtime.Object, externalVersion schema.GroupVersion) (runtime.Object, error) {
objInt, err := api.Scheme.UnsafeConvertToVersion(obj, schema.GroupVersion{Group: externalVersion.Group, Version: runtime.APIVersionInternal})
if err != nil {
return nil, fmt.Errorf("failed to convert the given object to the internal version: %v", err)
}
objExt, err := api.Scheme.UnsafeConvertToVersion(objInt, externalVersion)
if err != nil {
return nil, fmt.Errorf("failed to convert the given object back to the external version: %v", err)
}
return objExt, err
}
type HorizontalController struct {
scaleNamespacer unversionedextensions.ScalesGetter
hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter
scaleNamespacer extensionsclient.ScalesGetter
hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter
replicaCalc *ReplicaCalculator
eventRecorder record.EventRecorder
@ -76,8 +92,8 @@ var upscaleForbiddenWindow = 3 * time.Minute
func NewHorizontalController(
evtNamespacer v1core.EventsGetter,
scaleNamespacer unversionedextensions.ScalesGetter,
hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter,
scaleNamespacer extensionsclient.ScalesGetter,
hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter,
replicaCalc *ReplicaCalculator,
hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,
resyncPeriod time.Duration,
@ -97,19 +113,14 @@ func NewHorizontalController(
hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
hpa := obj.(*autoscaling.HorizontalPodAutoscaler)
hasCPUPolicy := hpa.Spec.TargetCPUUtilizationPercentage != nil
_, hasCustomMetricsPolicy := hpa.Annotations[HpaCustomMetricsTargetAnnotationName]
if !hasCPUPolicy && !hasCustomMetricsPolicy {
controller.eventRecorder.Event(hpa, v1.EventTypeNormal, "DefaultPolicy", "No scaling policy specified - will use default one. See documentation for details")
}
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
err := controller.reconcileAutoscaler(hpa)
if err != nil {
glog.Warningf("Failed to reconcile %s: %v", hpa.Name, err)
}
},
UpdateFunc: func(old, cur interface{}) {
hpa := cur.(*autoscaling.HorizontalPodAutoscaler)
hpa := cur.(*autoscalingv1.HorizontalPodAutoscaler)
err := controller.reconcileAutoscaler(hpa)
if err != nil {
glog.Warningf("Failed to reconcile %s: %v", hpa.Name, err)
@ -139,141 +150,140 @@ func (a *HorizontalController) Run(stopCh <-chan struct{}) {
glog.Infof("Shutting down HPA Controller")
}
// getLastScaleTime returns the hpa's last scale time or the hpa's creation time if the last scale time is nil.
func getLastScaleTime(hpa *autoscaling.HorizontalPodAutoscaler) time.Time {
lastScaleTime := hpa.Status.LastScaleTime
if lastScaleTime == nil {
lastScaleTime = &hpa.CreationTimestamp
}
return lastScaleTime.Time
}
func (a *HorizontalController) computeReplicasForCPUUtilization(hpa *autoscaling.HorizontalPodAutoscaler, scale *extensionsv1beta1.Scale) (int32, *int32, time.Time, error) {
targetUtilization := int32(defaultTargetCPUUtilizationPercentage)
if hpa.Spec.TargetCPUUtilizationPercentage != nil {
targetUtilization = *hpa.Spec.TargetCPUUtilizationPercentage
}
currentReplicas := scale.Status.Replicas
if scale.Status.Selector == nil {
errMsg := "selector is required"
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "SelectorRequired", errMsg)
return 0, nil, time.Time{}, fmt.Errorf(errMsg)
}
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: scale.Status.Selector})
if err != nil {
errMsg := fmt.Sprintf("couldn't convert selector string to a corresponding selector object: %v", err)
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "InvalidSelector", errMsg)
return 0, nil, time.Time{}, fmt.Errorf(errMsg)
}
desiredReplicas, utilization, timestamp, err := a.replicaCalc.GetResourceReplicas(currentReplicas, targetUtilization, v1.ResourceCPU, hpa.Namespace, selector)
if err != nil {
lastScaleTime := getLastScaleTime(hpa)
if time.Now().After(lastScaleTime.Add(upscaleForbiddenWindow)) {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetMetrics", err.Error())
} else {
a.eventRecorder.Event(hpa, v1.EventTypeNormal, "MetricsNotAvailableYet", err.Error())
}
return 0, nil, time.Time{}, fmt.Errorf("failed to get CPU utilization: %v", err)
}
if desiredReplicas != currentReplicas {
a.eventRecorder.Eventf(hpa, v1.EventTypeNormal, "DesiredReplicasComputed",
"Computed the desired num of replicas: %d (avgCPUutil: %d, current replicas: %d)",
desiredReplicas, utilization, scale.Status.Replicas)
}
return desiredReplicas, &utilization, timestamp, nil
}
// computeReplicasForCustomMetrics computes the desired number of replicas based on the CustomMetrics passed in cmAnnotation
// as json-serialized extensions.CustomMetricsTargetList.
// Returns number of replicas, metric which required highest number of replicas,
// status string (also json-serialized extensions.CustomMetricsCurrentStatusList),
// last timestamp of the metrics involved in computations or error, if occurred.
func (a *HorizontalController) computeReplicasForCustomMetrics(hpa *autoscaling.HorizontalPodAutoscaler, scale *extensionsv1beta1.Scale,
cmAnnotation string) (replicas int32, metric string, status string, timestamp time.Time, err error) {
if cmAnnotation == "" {
return
}
// Computes the desired number of replicas for the metric specifications listed in the HPA, returning the maximum
// of the computed replica counts, a description of the associated metric, and the statuses of all metrics
// computed.
func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler, scale *extensions.Scale,
metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {
currentReplicas := scale.Status.Replicas
var targetList extensionsv1beta1.CustomMetricTargetList
if err := json.Unmarshal([]byte(cmAnnotation), &targetList); err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedParseCustomMetricsAnnotation", err.Error())
return 0, "", "", time.Time{}, fmt.Errorf("failed to parse custom metrics annotation: %v", err)
}
if len(targetList.Items) == 0 {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "NoCustomMetricsInAnnotation", err.Error())
return 0, "", "", time.Time{}, fmt.Errorf("no custom metrics in annotation")
}
statuses = make([]autoscalingv2.MetricStatus, len(metricSpecs))
statusList := extensionsv1beta1.CustomMetricCurrentStatusList{
Items: make([]extensionsv1beta1.CustomMetricCurrentStatus, 0),
}
for _, customMetricTarget := range targetList.Items {
if scale.Status.Selector == nil {
for i, metricSpec := range metricSpecs {
if len(scale.Status.Selector) == 0 && len(scale.Status.TargetSelector) == 0 {
errMsg := "selector is required"
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "SelectorRequired", errMsg)
return 0, "", "", time.Time{}, fmt.Errorf("selector is required")
return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
}
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: scale.Status.Selector})
var selector labels.Selector
var err error
if len(scale.Status.Selector) > 0 {
selector = labels.SelectorFromSet(labels.Set(scale.Status.Selector))
err = nil
} else {
selector, err = labels.Parse(scale.Status.TargetSelector)
}
if err != nil {
errMsg := fmt.Sprintf("couldn't convert selector string to a corresponding selector object: %v", err)
errMsg := fmt.Sprintf("couldn't convert selector into a corresponding internal selector object: %v", err)
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "InvalidSelector", errMsg)
return 0, "", "", time.Time{}, fmt.Errorf("couldn't convert selector string to a corresponding selector object: %v", err)
return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
}
floatTarget := float64(customMetricTarget.TargetValue.MilliValue()) / 1000.0
replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetMetricReplicas(currentReplicas, floatTarget, fmt.Sprintf("custom/%s", customMetricTarget.Name), hpa.Namespace, selector)
if err != nil {
lastScaleTime := getLastScaleTime(hpa)
if time.Now().After(lastScaleTime.Add(upscaleForbiddenWindow)) {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetCustomMetrics", err.Error())
} else {
a.eventRecorder.Event(hpa, v1.EventTypeNormal, "CustomMetricsNotAvailableYet", err.Error())
var replicaCountProposal int32
var utilizationProposal int64
var timestampProposal time.Time
var metricNameProposal string
switch metricSpec.Type {
case autoscalingv2.ObjectMetricSourceType:
replicaCountProposal, utilizationProposal, timestampProposal, err = a.replicaCalc.GetObjectMetricReplicas(currentReplicas, metricSpec.Object.TargetValue.MilliValue(), metricSpec.Object.MetricName, hpa.Namespace, &metricSpec.Object.Target)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetObjectMetric", err.Error())
return 0, "", nil, time.Time{}, fmt.Errorf("failed to get object metric value: %v", err)
}
metricNameProposal = fmt.Sprintf("%s metric %s", metricSpec.Object.Target.Kind, metricSpec.Object.MetricName)
statuses[i] = autoscalingv2.MetricStatus{
Type: autoscalingv2.ObjectMetricSourceType,
Object: &autoscalingv2.ObjectMetricStatus{
Target: metricSpec.Object.Target,
MetricName: metricSpec.Object.MetricName,
CurrentValue: *resource.NewMilliQuantity(utilizationProposal, resource.DecimalSI),
},
}
case autoscalingv2.PodsMetricSourceType:
replicaCountProposal, utilizationProposal, timestampProposal, err = a.replicaCalc.GetMetricReplicas(currentReplicas, metricSpec.Pods.TargetAverageValue.MilliValue(), metricSpec.Pods.MetricName, hpa.Namespace, selector)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetPodsMetric", err.Error())
return 0, "", nil, time.Time{}, fmt.Errorf("failed to get pods metric value: %v", err)
}
metricNameProposal = fmt.Sprintf("pods metric %s", metricSpec.Pods.MetricName)
statuses[i] = autoscalingv2.MetricStatus{
Type: autoscalingv2.PodsMetricSourceType,
Pods: &autoscalingv2.PodsMetricStatus{
MetricName: metricSpec.Pods.MetricName,
CurrentAverageValue: *resource.NewMilliQuantity(utilizationProposal, resource.DecimalSI),
},
}
case autoscalingv2.ResourceMetricSourceType:
if metricSpec.Resource.TargetAverageValue != nil {
var rawProposal int64
replicaCountProposal, rawProposal, timestampProposal, err = a.replicaCalc.GetRawResourceReplicas(currentReplicas, metricSpec.Resource.TargetAverageValue.MilliValue(), metricSpec.Resource.Name, hpa.Namespace, selector)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetResourceMetric", err.Error())
return 0, "", nil, time.Time{}, fmt.Errorf("failed to get %s utilization: %v", metricSpec.Resource.Name, err)
}
metricNameProposal = fmt.Sprintf("%s resource", metricSpec.Resource.Name)
statuses[i] = autoscalingv2.MetricStatus{
Type: autoscalingv2.ResourceMetricSourceType,
Resource: &autoscalingv2.ResourceMetricStatus{
Name: metricSpec.Resource.Name,
CurrentAverageValue: *resource.NewMilliQuantity(rawProposal, resource.DecimalSI),
},
}
} else {
// set a default utilization percentage if none is set
if metricSpec.Resource.TargetAverageUtilization == nil {
errMsg := "invalid resource metric source: neither a utilization target nor a value target was set"
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetResourceMetric", errMsg)
return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
}
return 0, "", "", time.Time{}, fmt.Errorf("failed to get custom metric value: %v", err)
targetUtilization := *metricSpec.Resource.TargetAverageUtilization
var percentageProposal int32
var rawProposal int64
replicaCountProposal, percentageProposal, rawProposal, timestampProposal, err = a.replicaCalc.GetResourceReplicas(currentReplicas, targetUtilization, metricSpec.Resource.Name, hpa.Namespace, selector)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetResourceMetric", err.Error())
return 0, "", nil, time.Time{}, fmt.Errorf("failed to get %s utilization: %v", metricSpec.Resource.Name, err)
}
metricNameProposal = fmt.Sprintf("%s resource utilization (percentage of request)", metricSpec.Resource.Name)
statuses[i] = autoscalingv2.MetricStatus{
Type: autoscalingv2.ResourceMetricSourceType,
Resource: &autoscalingv2.ResourceMetricStatus{
Name: metricSpec.Resource.Name,
CurrentAverageUtilization: &percentageProposal,
CurrentAverageValue: *resource.NewMilliQuantity(rawProposal, resource.DecimalSI),
},
}
}
default:
errMsg := fmt.Sprintf("unknown metric source type %q", string(metricSpec.Type))
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "InvalidMetricSourceType", errMsg)
return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
}
if replicaCountProposal > replicas {
if replicas == 0 || replicaCountProposal > replicas {
timestamp = timestampProposal
replicas = replicaCountProposal
metric = fmt.Sprintf("Custom metric %s", customMetricTarget.Name)
metric = metricNameProposal
}
quantity, err := resource.ParseQuantity(fmt.Sprintf("%.3f", utilizationProposal))
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedSetCustomMetrics", err.Error())
return 0, "", "", time.Time{}, fmt.Errorf("failed to set custom metric value: %v", err)
}
statusList.Items = append(statusList.Items, extensionsv1beta1.CustomMetricCurrentStatus{
Name: customMetricTarget.Name,
CurrentValue: quantity,
})
}
byteStatusList, err := json.Marshal(statusList)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedSerializeCustomMetrics", err.Error())
return 0, "", "", time.Time{}, fmt.Errorf("failed to serialize custom metric status: %v", err)
}
if replicas != currentReplicas {
a.eventRecorder.Eventf(hpa, v1.EventTypeNormal, "DesiredReplicasComputedCustomMetric",
"Computed the desired num of replicas: %d, metric: %s, current replicas: %d",
func() *int32 { i := int32(replicas); return &i }(), metric, scale.Status.Replicas)
}
return replicas, metric, string(byteStatusList), timestamp, nil
return replicas, metric, statuses, timestamp, nil
}
func (a *HorizontalController) reconcileAutoscaler(hpa *autoscaling.HorizontalPodAutoscaler) error {
func (a *HorizontalController) reconcileAutoscaler(hpav1 *autoscalingv1.HorizontalPodAutoscaler) error {
// first, convert to autoscaling/v2, which makes our lives easier when calculating metrics
hpaRaw, err := UnsafeConvertToVersionVia(hpav1, autoscalingv2.SchemeGroupVersion)
if err != nil {
a.eventRecorder.Event(hpav1, v1.EventTypeWarning, "FailedConvertHPA", err.Error())
return fmt.Errorf("failed to convert the given HPA to %s: %v", autoscalingv2.SchemeGroupVersion.String(), err)
}
hpa := hpaRaw.(*autoscalingv2.HorizontalPodAutoscaler)
reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)
scale, err := a.scaleNamespacer.Scales(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Kind, hpa.Spec.ScaleTargetRef.Name)
@ -283,14 +293,10 @@ func (a *HorizontalController) reconcileAutoscaler(hpa *autoscaling.HorizontalPo
}
currentReplicas := scale.Status.Replicas
cpuDesiredReplicas := int32(0)
cpuCurrentUtilization := new(int32)
cpuTimestamp := time.Time{}
cmDesiredReplicas := int32(0)
cmMetric := ""
cmStatus := ""
cmTimestamp := time.Time{}
var metricStatuses []autoscalingv2.MetricStatus
metricDesiredReplicas := int32(0)
metricName := ""
metricTimestamp := time.Time{}
desiredReplicas := int32(0)
rescaleReason := ""
@ -312,35 +318,20 @@ func (a *HorizontalController) reconcileAutoscaler(hpa *autoscaling.HorizontalPo
rescaleReason = "Current number of replicas must be greater than 0"
desiredReplicas = 1
} else {
// All basic scenarios covered, the state should be sane, lets use metrics.
cmAnnotation, cmAnnotationFound := hpa.Annotations[HpaCustomMetricsTargetAnnotationName]
if hpa.Spec.TargetCPUUtilizationPercentage != nil || !cmAnnotationFound {
cpuDesiredReplicas, cpuCurrentUtilization, cpuTimestamp, err = a.computeReplicasForCPUUtilization(hpa, scale)
if err != nil {
a.updateCurrentReplicasInStatus(hpa, currentReplicas)
return fmt.Errorf("failed to compute desired number of replicas based on CPU utilization for %s: %v", reference, err)
}
metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics)
if err != nil {
a.updateCurrentReplicasInStatus(hpa, currentReplicas)
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
}
if cmAnnotationFound {
cmDesiredReplicas, cmMetric, cmStatus, cmTimestamp, err = a.computeReplicasForCustomMetrics(hpa, scale, cmAnnotation)
if err != nil {
a.updateCurrentReplicasInStatus(hpa, currentReplicas)
return fmt.Errorf("failed to compute desired number of replicas based on Custom Metrics for %s: %v", reference, err)
}
}
glog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, timestamp, reference)
rescaleMetric := ""
if cpuDesiredReplicas > desiredReplicas {
desiredReplicas = cpuDesiredReplicas
timestamp = cpuTimestamp
rescaleMetric = "CPU utilization"
}
if cmDesiredReplicas > desiredReplicas {
desiredReplicas = cmDesiredReplicas
timestamp = cmTimestamp
rescaleMetric = cmMetric
if metricDesiredReplicas > desiredReplicas {
desiredReplicas = metricDesiredReplicas
timestamp = metricTimestamp
rescaleMetric = metricName
}
if desiredReplicas > currentReplicas {
rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
@ -382,13 +373,14 @@ func (a *HorizontalController) reconcileAutoscaler(hpa *autoscaling.HorizontalPo
glog.Infof("Successfull rescale of %s, old size: %d, new size: %d, reason: %s",
hpa.Name, currentReplicas, desiredReplicas, rescaleReason)
} else {
glog.V(4).Infof("decided not to scale %s to %v (last scale time was %s)", reference, desiredReplicas, hpa.Status.LastScaleTime)
desiredReplicas = currentReplicas
}
return a.updateStatus(hpa, currentReplicas, desiredReplicas, cpuCurrentUtilization, cmStatus, rescale)
return a.updateStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
}
func shouldScale(hpa *autoscaling.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, timestamp time.Time) bool {
func shouldScale(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, timestamp time.Time) bool {
if desiredReplicas == currentReplicas {
return false
}
@ -408,32 +400,29 @@ func shouldScale(hpa *autoscaling.HorizontalPodAutoscaler, currentReplicas, desi
if desiredReplicas > currentReplicas && hpa.Status.LastScaleTime.Add(upscaleForbiddenWindow).Before(timestamp) {
return true
}
return false
}
func (a *HorizontalController) updateCurrentReplicasInStatus(hpa *autoscaling.HorizontalPodAutoscaler, currentReplicas int32) {
err := a.updateStatus(hpa, currentReplicas, hpa.Status.DesiredReplicas, hpa.Status.CurrentCPUUtilizationPercentage, hpa.Annotations[HpaCustomMetricsStatusAnnotationName], false)
func (a *HorizontalController) updateCurrentReplicasInStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas int32) {
err := a.updateStatus(hpa, currentReplicas, hpa.Status.DesiredReplicas, hpa.Status.CurrentMetrics, false)
if err != nil {
utilruntime.HandleError(err)
}
}
func (a *HorizontalController) updateStatus(hpa *autoscaling.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, cpuCurrentUtilization *int32, cmStatus string, rescale bool) error {
// Make a copy so we don't mutate the object in the shared cache
copy, err := api.Scheme.DeepCopy(hpa)
func (a *HorizontalController) updateStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, metricStatuses []autoscalingv2.MetricStatus, rescale bool) error {
// make a copy so that we don't mutate the shared informer cache
hpaCopy, err := api.Scheme.DeepCopy(hpa)
if err != nil {
return nil
}
hpa = copy.(*autoscaling.HorizontalPodAutoscaler)
hpa.Status = autoscaling.HorizontalPodAutoscalerStatus{
CurrentReplicas: currentReplicas,
DesiredReplicas: desiredReplicas,
CurrentCPUUtilizationPercentage: cpuCurrentUtilization,
LastScaleTime: hpa.Status.LastScaleTime,
}
if cmStatus != "" {
hpa.Annotations[HpaCustomMetricsStatusAnnotationName] = cmStatus
hpa = hpaCopy.(*autoscalingv2.HorizontalPodAutoscaler)
hpa.Status = autoscalingv2.HorizontalPodAutoscalerStatus{
CurrentReplicas: currentReplicas,
DesiredReplicas: desiredReplicas,
LastScaleTime: hpa.Status.LastScaleTime,
CurrentMetrics: metricStatuses,
}
if rescale {
@ -441,7 +430,15 @@ func (a *HorizontalController) updateStatus(hpa *autoscaling.HorizontalPodAutosc
hpa.Status.LastScaleTime = &now
}
_, err = a.hpaNamespacer.HorizontalPodAutoscalers(hpa.Namespace).UpdateStatus(hpa)
// convert back to autoscalingv1
hpaRaw, err := UnsafeConvertToVersionVia(hpa, autoscalingv1.SchemeGroupVersion)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedConvertHPA", err.Error())
return fmt.Errorf("failed to convert the given HPA to %s: %v", autoscalingv2.SchemeGroupVersion.String(), err)
}
hpav1 := hpaRaw.(*autoscalingv1.HorizontalPodAutoscaler)
_, err = a.hpaNamespacer.HorizontalPodAutoscalers(hpav1.Namespace).UpdateStatus(hpav1)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedUpdateStatus", err.Error())
return fmt.Errorf("failed to update status for %s: %v", hpa.Name, err)

View File

@ -37,7 +37,8 @@ import (
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
autoscaling "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
autoscalingv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
autoscalingv2 "k8s.io/kubernetes/pkg/apis/autoscaling/v2alpha1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
@ -91,12 +92,12 @@ type testCase struct {
reportedLevels []uint64
reportedCPURequests []resource.Quantity
reportedPodReadiness []v1.ConditionStatus
cmTarget *extensions.CustomMetricTargetList
scaleUpdated bool
statusUpdated bool
eventCreated bool
verifyEvents bool
useMetricsApi bool
metricsTarget []autoscalingv2.MetricSpec
// Channel with names of HPA objects which we have reconciled.
processed chan string
@ -127,9 +128,8 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
namespace := "test-namespace"
hpaName := "test-hpa"
podNamePrefix := "test-pod"
selector := &metav1.LabelSelector{
MatchLabels: map[string]string{"name": podNamePrefix},
}
// TODO: also test with TargetSelector
selector := map[string]string{"name": podNamePrefix}
tc.Lock()
@ -157,16 +157,16 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
tc.Lock()
defer tc.Unlock()
obj := &autoscaling.HorizontalPodAutoscalerList{
Items: []autoscaling.HorizontalPodAutoscaler{
obj := &autoscalingv2.HorizontalPodAutoscalerList{
Items: []autoscalingv2.HorizontalPodAutoscaler{
{
ObjectMeta: metav1.ObjectMeta{
Name: hpaName,
Namespace: namespace,
SelfLink: "experimental/v1/namespaces/" + namespace + "/horizontalpodautoscalers/" + hpaName,
},
Spec: autoscaling.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscaling.CrossVersionObjectReference{
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
Kind: tc.resource.kind,
Name: tc.resource.name,
APIVersion: tc.resource.apiVersion,
@ -174,7 +174,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
MinReplicas: &tc.minReplicas,
MaxReplicas: tc.maxReplicas,
},
Status: autoscaling.HorizontalPodAutoscalerStatus{
Status: autoscalingv2.HorizontalPodAutoscalerStatus{
CurrentReplicas: tc.initialReplicas,
DesiredReplicas: tc.initialReplicas,
},
@ -183,17 +183,39 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
}
if tc.CPUTarget > 0.0 {
obj.Items[0].Spec.TargetCPUUtilizationPercentage = &tc.CPUTarget
}
if tc.cmTarget != nil {
b, err := json.Marshal(tc.cmTarget)
if err != nil {
t.Fatalf("Failed to marshal cm: %v", err)
obj.Items[0].Spec.Metrics = []autoscalingv2.MetricSpec{
{
Type: autoscalingv2.ResourceMetricSourceType,
Resource: &autoscalingv2.ResourceMetricSource{
Name: v1.ResourceCPU,
TargetAverageUtilization: &tc.CPUTarget,
},
},
}
obj.Items[0].Annotations = make(map[string]string)
obj.Items[0].Annotations[HpaCustomMetricsTargetAnnotationName] = string(b)
}
return true, obj, nil
if len(tc.metricsTarget) > 0 {
obj.Items[0].Spec.Metrics = append(obj.Items[0].Spec.Metrics, tc.metricsTarget...)
}
if len(obj.Items[0].Spec.Metrics) == 0 {
// manually add in the defaulting logic
obj.Items[0].Spec.Metrics = []autoscalingv2.MetricSpec{
{
Type: autoscalingv2.ResourceMetricSourceType,
Resource: &autoscalingv2.ResourceMetricSource{
Name: v1.ResourceCPU,
},
},
}
}
// and... convert to autoscaling v1 to return the right type
objv1, err := UnsafeConvertToVersionVia(obj, autoscalingv1.SchemeGroupVersion)
if err != nil {
return true, nil, err
}
return true, objv1, nil
})
fakeClient.AddReactor("get", "replicationcontrollers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
@ -210,7 +232,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
},
Status: extensions.ScaleStatus{
Replicas: tc.initialReplicas,
Selector: selector.MatchLabels,
Selector: selector,
},
}
return true, obj, nil
@ -230,7 +252,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
},
Status: extensions.ScaleStatus{
Replicas: tc.initialReplicas,
Selector: selector.MatchLabels,
Selector: selector,
},
}
return true, obj, nil
@ -250,7 +272,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
},
Status: extensions.ScaleStatus{
Replicas: tc.initialReplicas,
Selector: selector.MatchLabels,
Selector: selector,
},
}
return true, obj, nil
@ -411,7 +433,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
tc.Lock()
defer tc.Unlock()
obj := action.(core.UpdateAction).GetObject().(*autoscaling.HorizontalPodAutoscaler)
obj := action.(core.UpdateAction).GetObject().(*autoscalingv1.HorizontalPodAutoscaler)
assert.Equal(t, namespace, obj.Namespace, "the HPA namespace should be as expected")
assert.Equal(t, hpaName, obj.Name, "the HPA name should be as expected")
assert.Equal(t, tc.desiredReplicas, obj.Status.DesiredReplicas, "the desired replica count reported in the object status should be as expected")
@ -455,7 +477,7 @@ func (tc *testCase) runTest(t *testing.T) {
if tc.verifyEvents {
switch obj.Reason {
case "SuccessfulRescale":
assert.Equal(t, fmt.Sprintf("New size: %d; reason: CPU utilization above target", tc.desiredReplicas), obj.Message)
assert.Equal(t, fmt.Sprintf("New size: %d; reason: cpu resource utilization (percentage of request) above target", tc.desiredReplicas), obj.Message)
case "DesiredReplicasComputed":
assert.Equal(t, fmt.Sprintf(
"Computed the desired num of replicas: %d (avgCPUutil: %d, current replicas: %d)",
@ -504,58 +526,6 @@ func (tc *testCase) runTest(t *testing.T) {
tc.verifyResults(t)
}
func TestDefaultScaleUpRC(t *testing.T) {
tc := testCase{
minReplicas: 2,
maxReplicas: 6,
initialReplicas: 4,
desiredReplicas: 5,
verifyCPUCurrent: true,
reportedLevels: []uint64{900, 950, 950, 1000},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsApi: true,
}
tc.runTest(t)
}
func TestDefaultScaleUpDeployment(t *testing.T) {
tc := testCase{
minReplicas: 2,
maxReplicas: 6,
initialReplicas: 4,
desiredReplicas: 5,
verifyCPUCurrent: true,
reportedLevels: []uint64{900, 950, 950, 1000},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsApi: true,
resource: &fakeResource{
name: "test-dep",
apiVersion: "extensions/v1beta1",
kind: "deployments",
},
}
tc.runTest(t)
}
func TestDefaultScaleUpReplicaSet(t *testing.T) {
tc := testCase{
minReplicas: 2,
maxReplicas: 6,
initialReplicas: 4,
desiredReplicas: 5,
verifyCPUCurrent: true,
reportedLevels: []uint64{900, 950, 950, 1000},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsApi: true,
resource: &fakeResource{
name: "test-replicaset",
apiVersion: "extensions/v1beta1",
kind: "replicasets",
},
}
tc.runTest(t)
}
func TestScaleUp(t *testing.T) {
tc := testCase{
minReplicas: 2,
@ -652,11 +622,14 @@ func TestScaleUpCM(t *testing.T) {
initialReplicas: 3,
desiredReplicas: 4,
CPUTarget: 0,
cmTarget: &extensions.CustomMetricTargetList{
Items: []extensions.CustomMetricTarget{{
Name: "qps",
TargetValue: resource.MustParse("15.0"),
}},
metricsTarget: []autoscalingv2.MetricSpec{
{
Type: autoscalingv2.PodsMetricSourceType,
Pods: &autoscalingv2.PodsMetricSource{
MetricName: "qps",
TargetAverageValue: resource.MustParse("15.0"),
},
},
},
reportedLevels: []uint64{20, 10, 30},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
@ -671,11 +644,14 @@ func TestScaleUpCMUnreadyLessScale(t *testing.T) {
initialReplicas: 3,
desiredReplicas: 4,
CPUTarget: 0,
cmTarget: &extensions.CustomMetricTargetList{
Items: []extensions.CustomMetricTarget{{
Name: "qps",
TargetValue: resource.MustParse("15.0"),
}},
metricsTarget: []autoscalingv2.MetricSpec{
{
Type: autoscalingv2.PodsMetricSourceType,
Pods: &autoscalingv2.PodsMetricSource{
MetricName: "qps",
TargetAverageValue: resource.MustParse("15.0"),
},
},
},
reportedLevels: []uint64{50, 10, 30},
reportedPodReadiness: []v1.ConditionStatus{v1.ConditionTrue, v1.ConditionTrue, v1.ConditionFalse},
@ -691,11 +667,14 @@ func TestScaleUpCMUnreadyNoScaleWouldScaleDown(t *testing.T) {
initialReplicas: 3,
desiredReplicas: 3,
CPUTarget: 0,
cmTarget: &extensions.CustomMetricTargetList{
Items: []extensions.CustomMetricTarget{{
Name: "qps",
TargetValue: resource.MustParse("15.0"),
}},
metricsTarget: []autoscalingv2.MetricSpec{
{
Type: autoscalingv2.PodsMetricSourceType,
Pods: &autoscalingv2.PodsMetricSource{
MetricName: "qps",
TargetAverageValue: resource.MustParse("15.0"),
},
},
},
reportedLevels: []uint64{50, 15, 30},
reportedPodReadiness: []v1.ConditionStatus{v1.ConditionFalse, v1.ConditionTrue, v1.ConditionFalse},
@ -704,20 +683,6 @@ func TestScaleUpCMUnreadyNoScaleWouldScaleDown(t *testing.T) {
tc.runTest(t)
}
func TestDefaultScaleDown(t *testing.T) {
tc := testCase{
minReplicas: 2,
maxReplicas: 6,
initialReplicas: 5,
desiredReplicas: 4,
verifyCPUCurrent: true,
reportedLevels: []uint64{400, 500, 600, 700, 800},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsApi: true,
}
tc.runTest(t)
}
func TestScaleDown(t *testing.T) {
tc := testCase{
minReplicas: 2,
@ -740,11 +705,15 @@ func TestScaleDownCM(t *testing.T) {
initialReplicas: 5,
desiredReplicas: 3,
CPUTarget: 0,
cmTarget: &extensions.CustomMetricTargetList{
Items: []extensions.CustomMetricTarget{{
Name: "qps",
TargetValue: resource.MustParse("20"),
}}},
metricsTarget: []autoscalingv2.MetricSpec{
{
Type: autoscalingv2.PodsMetricSourceType,
Pods: &autoscalingv2.PodsMetricSource{
MetricName: "qps",
TargetAverageValue: resource.MustParse("20.0"),
},
},
},
reportedLevels: []uint64{12, 12, 12, 12, 12},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
}
@ -788,11 +757,15 @@ func TestToleranceCM(t *testing.T) {
maxReplicas: 5,
initialReplicas: 3,
desiredReplicas: 3,
cmTarget: &extensions.CustomMetricTargetList{
Items: []extensions.CustomMetricTarget{{
Name: "qps",
TargetValue: resource.MustParse("20"),
}}},
metricsTarget: []autoscalingv2.MetricSpec{
{
Type: autoscalingv2.PodsMetricSourceType,
Pods: &autoscalingv2.PodsMetricSource{
MetricName: "qps",
TargetAverageValue: resource.MustParse("20.0"),
},
},
},
reportedLevels: []uint64{20, 21, 21},
reportedCPURequests: []resource.Quantity{resource.MustParse("0.9"), resource.MustParse("1.0"), resource.MustParse("1.1")},
}
@ -1058,7 +1031,7 @@ func TestScaleUpRCImmediately(t *testing.T) {
maxReplicas: 6,
initialReplicas: 1,
desiredReplicas: 2,
verifyCPUCurrent: true,
verifyCPUCurrent: false,
reportedLevels: []uint64{0, 0, 0, 0},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsApi: true,

View File

@ -17,6 +17,7 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/apis/autoscaling/v2alpha1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library",
"//vendor:github.com/golang/glog",

View File

@ -29,28 +29,29 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/api/v1"
autoscaling "k8s.io/kubernetes/pkg/apis/autoscaling/v2alpha1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
v1core "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
)
// PodResourceInfo contains pod resourcemetric values as a map from pod names to
// metric values
type PodResourceInfo map[string]int64
// PodMetricsInfo contains pod resourcemetric values as a map from pod names to
// metric values
type PodMetricsInfo map[string]float64
// PodMetricsInfo contains pod metric values as a map from pod names to
// metric values (the metric values are expected to be the metric as a milli-value)
type PodMetricsInfo map[string]int64
// MetricsClient knows how to query a remote interface to retrieve container-level
// resource metrics as well as pod-level arbitrary metrics
type MetricsClient interface {
// GetResourceMetric gets the given resource metric (and an associated oldest timestamp)
// for all pods matching the specified selector in the given namespace
GetResourceMetric(resource v1.ResourceName, namespace string, selector labels.Selector) (PodResourceInfo, time.Time, error)
GetResourceMetric(resource v1.ResourceName, namespace string, selector labels.Selector) (PodMetricsInfo, time.Time, error)
// GetRawMetric gets the given metric (and an associated oldest timestamp)
// for all pods matching the specified selector in the given namespace
GetRawMetric(metricName string, namespace string, selector labels.Selector) (PodMetricsInfo, time.Time, error)
// GetObjectMetric gets the given metric (and an associated timestamp) for the given
// object in the given namespace
GetObjectMetric(metricName string, namespace string, objectRef *autoscaling.CrossVersionObjectReference) (int64, time.Time, error)
}
const (
@ -80,7 +81,7 @@ func NewHeapsterMetricsClient(client clientset.Interface, namespace, scheme, ser
}
}
func (h *HeapsterMetricsClient) GetResourceMetric(resource v1.ResourceName, namespace string, selector labels.Selector) (PodResourceInfo, time.Time, error) {
func (h *HeapsterMetricsClient) GetResourceMetric(resource v1.ResourceName, namespace string, selector labels.Selector) (PodMetricsInfo, time.Time, error) {
metricPath := fmt.Sprintf("/apis/metrics/v1alpha1/namespaces/%s/pods", namespace)
params := map[string]string{"labelSelector": selector.String()}
@ -88,7 +89,7 @@ func (h *HeapsterMetricsClient) GetResourceMetric(resource v1.ResourceName, name
ProxyGet(h.heapsterScheme, h.heapsterService, h.heapsterPort, metricPath, params).
DoRaw()
if err != nil {
return nil, time.Time{}, fmt.Errorf("failed to get heapster service: %v", err)
return nil, time.Time{}, fmt.Errorf("failed to get pod resource metrics: %v", err)
}
glog.V(4).Infof("Heapster metrics result: %s", string(resultRaw))
@ -103,7 +104,7 @@ func (h *HeapsterMetricsClient) GetResourceMetric(resource v1.ResourceName, name
return nil, time.Time{}, fmt.Errorf("no metrics returned from heapster")
}
res := make(PodResourceInfo, len(metrics.Items))
res := make(PodMetricsInfo, len(metrics.Items))
for _, m := range metrics.Items {
podSum := int64(0)
@ -155,7 +156,7 @@ func (h *HeapsterMetricsClient) GetRawMetric(metricName string, namespace string
ProxyGet(h.heapsterScheme, h.heapsterService, h.heapsterPort, metricPath, map[string]string{"start": startTime.Format(time.RFC3339)}).
DoRaw()
if err != nil {
return nil, time.Time{}, fmt.Errorf("failed to get heapster service: %v", err)
return nil, time.Time{}, fmt.Errorf("failed to get pod metrics: %v", err)
}
var metrics heapster.MetricResultList
@ -192,7 +193,11 @@ func (h *HeapsterMetricsClient) GetRawMetric(metricName string, namespace string
return res, *timestamp, nil
}
func collapseTimeSamples(metrics heapster.MetricResult, duration time.Duration) (float64, time.Time, bool) {
func (h *HeapsterMetricsClient) GetObjectMetric(metricName string, namespace string, objectRef *autoscaling.CrossVersionObjectReference) (int64, time.Time, error) {
return 0, time.Time{}, fmt.Errorf("object metrics are not yet supported")
}
func collapseTimeSamples(metrics heapster.MetricResult, duration time.Duration) (int64, time.Time, bool) {
floatSum := float64(0)
intSum := int64(0)
intSumCount := 0
@ -217,9 +222,9 @@ func collapseTimeSamples(metrics heapster.MetricResult, duration time.Duration)
}
if newest.FloatValue != nil {
return floatSum / float64(floatSumCount), newest.Timestamp, true
return int64(floatSum / float64(floatSumCount) * 1000), newest.Timestamp, true
} else {
return float64(intSum / int64(intSumCount)), newest.Timestamp, true
return (intSum * 1000) / int64(intSumCount), newest.Timestamp, true
}
}

View File

@ -64,9 +64,8 @@ type metricPoint struct {
}
type testCase struct {
desiredResourceValues PodResourceInfo
desiredMetricValues PodMetricsInfo
desiredError error
desiredMetricValues PodMetricsInfo
desiredError error
replicas int
targetTimestamp int
@ -190,7 +189,7 @@ func buildPod(namespace, podName string, podLabels map[string]string, phase v1.P
}
}
func (tc *testCase) verifyResults(t *testing.T, metrics interface{}, timestamp time.Time, err error) {
func (tc *testCase) verifyResults(t *testing.T, metrics PodMetricsInfo, timestamp time.Time, err error) {
if tc.desiredError != nil {
assert.Error(t, err, "there should be an error retrieving the metrics")
assert.Contains(t, fmt.Sprintf("%v", err), fmt.Sprintf("%v", tc.desiredError), "the error message should be eas expected")
@ -199,13 +198,7 @@ func (tc *testCase) verifyResults(t *testing.T, metrics interface{}, timestamp t
assert.NoError(t, err, "there should be no error retrieving the metrics")
assert.NotNil(t, metrics, "there should be metrics returned")
if metricsInfo, wasRaw := metrics.(PodMetricsInfo); wasRaw {
assert.Equal(t, tc.desiredMetricValues, metricsInfo, "the raw metrics values should be as expected")
} else if resourceInfo, wasResource := metrics.(PodResourceInfo); wasResource {
assert.Equal(t, tc.desiredResourceValues, resourceInfo, "the resource metrics values be been as expected")
} else {
assert.False(t, true, "should return either resource metrics info or raw metrics info")
}
assert.Equal(t, tc.desiredMetricValues, metrics, "the metrics values should be as expected")
targetTimestamp := fixedTimestamp.Add(time.Duration(tc.targetTimestamp) * time.Minute)
assert.True(t, targetTimestamp.Equal(timestamp), fmt.Sprintf("the timestamp should be as expected (%s) but was %s", targetTimestamp, timestamp))
@ -227,7 +220,7 @@ func (tc *testCase) runTest(t *testing.T) {
func TestCPU(t *testing.T) {
tc := testCase{
replicas: 3,
desiredResourceValues: PodResourceInfo{
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 5000, "test-pod-1": 5000, "test-pod-2": 5000,
},
resourceName: v1.ResourceCPU,
@ -241,7 +234,7 @@ func TestQPS(t *testing.T) {
tc := testCase{
replicas: 3,
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 10, "test-pod-1": 20, "test-pod-2": 10,
"test-pod-0": 10000, "test-pod-1": 20000, "test-pod-2": 10000,
},
metricName: "qps",
targetTimestamp: 1,
@ -266,7 +259,7 @@ func TestQpsSumEqualZero(t *testing.T) {
func TestCPUMoreMetrics(t *testing.T) {
tc := testCase{
replicas: 5,
desiredResourceValues: PodResourceInfo{
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 5000, "test-pod-1": 5000, "test-pod-2": 5000,
"test-pod-3": 5000, "test-pod-4": 5000,
},
@ -280,7 +273,7 @@ func TestCPUMoreMetrics(t *testing.T) {
func TestCPUMissingMetrics(t *testing.T) {
tc := testCase{
replicas: 3,
desiredResourceValues: PodResourceInfo{
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 4000,
},
resourceName: v1.ResourceCPU,
@ -326,7 +319,7 @@ func TestQpsEmptyEntries(t *testing.T) {
replicas: 3,
metricName: "qps",
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 4000, "test-pod-2": 2000,
"test-pod-0": 4000000, "test-pod-2": 2000000,
},
targetTimestamp: 4,
reportedMetricsPoints: [][]metricPoint{{{4000, 4}}, {}, {{2000, 4}}},
@ -348,7 +341,7 @@ func TestCPUEmptyMetricsForOnePod(t *testing.T) {
tc := testCase{
replicas: 3,
resourceName: v1.ResourceCPU,
desiredResourceValues: PodResourceInfo{
desiredMetricValues: PodMetricsInfo{
"test-pod-0": 100, "test-pod-1": 700,
},
reportedPodMetrics: [][]int64{{100}, {300, 400}, {}},

View File

@ -22,10 +22,11 @@ import (
// GetResourceUtilizationRatio takes in a set of metrics, a set of matching requests,
// and a target utilization percentage, and calcuates the the ratio of
// desired to actual utilization (returning that and the actual utilization)
func GetResourceUtilizationRatio(metrics PodResourceInfo, requests map[string]int64, targetUtilization int32) (float64, int32, error) {
// desired to actual utilization (returning that, the actual utilization, and the raw average value)
func GetResourceUtilizationRatio(metrics PodMetricsInfo, requests map[string]int64, targetUtilization int32) (utilizationRatio float64, currentUtilization int32, rawAverageValue int64, err error) {
metricsTotal := int64(0)
requestsTotal := int64(0)
numEntries := 0
for podName, metricValue := range metrics {
request, hasRequest := requests[podName]
@ -36,29 +37,30 @@ func GetResourceUtilizationRatio(metrics PodResourceInfo, requests map[string]in
metricsTotal += metricValue
requestsTotal += request
numEntries++
}
// if the set of requests is completely disjoint from the set of metrics,
// then we could have an issue where the requests total is zero
if requestsTotal == 0 {
return 0, 0, fmt.Errorf("no metrics returned matched known pods")
return 0, 0, 0, fmt.Errorf("no metrics returned matched known pods")
}
currentUtilization := int32((metricsTotal * 100) / requestsTotal)
currentUtilization = int32((metricsTotal * 100) / requestsTotal)
return float64(currentUtilization) / float64(targetUtilization), currentUtilization, nil
return float64(currentUtilization) / float64(targetUtilization), currentUtilization, metricsTotal / int64(numEntries), nil
}
// GetMetricUtilizationRatio takes in a set of metrics and a target utilization value,
// and calcuates the ratio of desired to actual utilization
// (returning that and the actual utilization)
func GetMetricUtilizationRatio(metrics PodMetricsInfo, targetUtilization float64) (float64, float64) {
metricsTotal := float64(0)
func GetMetricUtilizationRatio(metrics PodMetricsInfo, targetUtilization int64) (utilizationRatio float64, currentUtilization int64) {
metricsTotal := int64(0)
for _, metricValue := range metrics {
metricsTotal += metricValue
}
currentUtilization := metricsTotal / float64(len(metrics))
currentUtilization = metricsTotal / int64(len(metrics))
return currentUtilization / targetUtilization, currentUtilization
return float64(currentUtilization) / float64(targetUtilization), currentUtilization
}

View File

@ -25,16 +25,17 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api/v1"
v1core "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
autoscaling "k8s.io/kubernetes/pkg/apis/autoscaling/v2alpha1"
v1coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
)
type ReplicaCalculator struct {
metricsClient metricsclient.MetricsClient
podsGetter v1core.PodsGetter
podsGetter v1coreclient.PodsGetter
}
func NewReplicaCalculator(metricsClient metricsclient.MetricsClient, podsGetter v1core.PodsGetter) *ReplicaCalculator {
func NewReplicaCalculator(metricsClient metricsclient.MetricsClient, podsGetter v1coreclient.PodsGetter) *ReplicaCalculator {
return &ReplicaCalculator{
metricsClient: metricsClient,
podsGetter: podsGetter,
@ -43,19 +44,19 @@ func NewReplicaCalculator(metricsClient metricsclient.MetricsClient, podsGetter
// GetResourceReplicas calculates the desired replica count based on a target resource utilization percentage
// of the given resource for pods matching the given selector in the given namespace, and the current replica count
func (c *ReplicaCalculator) GetResourceReplicas(currentReplicas int32, targetUtilization int32, resource v1.ResourceName, namespace string, selector labels.Selector) (replicaCount int32, utilization int32, timestamp time.Time, err error) {
func (c *ReplicaCalculator) GetResourceReplicas(currentReplicas int32, targetUtilization int32, resource v1.ResourceName, namespace string, selector labels.Selector) (replicaCount int32, utilization int32, rawUtilization int64, timestamp time.Time, err error) {
metrics, timestamp, err := c.metricsClient.GetResourceMetric(resource, namespace, selector)
if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err)
return 0, 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err)
}
podList, err := c.podsGetter.Pods(namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
return 0, 0, 0, time.Time{}, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
}
if len(podList.Items) == 0 {
return 0, 0, time.Time{}, fmt.Errorf("no pods returned by selector while calculating replica count")
return 0, 0, 0, time.Time{}, fmt.Errorf("no pods returned by selector while calculating replica count")
}
requests := make(map[string]int64, len(podList.Items))
@ -69,7 +70,7 @@ func (c *ReplicaCalculator) GetResourceReplicas(currentReplicas int32, targetUti
if containerRequest, ok := container.Resources.Requests[resource]; ok {
podSum += containerRequest.MilliValue()
} else {
return 0, 0, time.Time{}, fmt.Errorf("missing request for %s on container %s in pod %s/%s", resource, container.Name, namespace, pod.Name)
return 0, 0, 0, time.Time{}, fmt.Errorf("missing request for %s on container %s in pod %s/%s", resource, container.Name, namespace, pod.Name)
}
}
@ -92,23 +93,23 @@ func (c *ReplicaCalculator) GetResourceReplicas(currentReplicas int32, targetUti
}
if len(metrics) == 0 {
return 0, 0, time.Time{}, fmt.Errorf("did not receive metrics for any ready pods")
return 0, 0, 0, time.Time{}, fmt.Errorf("did not receive metrics for any ready pods")
}
usageRatio, utilization, err := metricsclient.GetResourceUtilizationRatio(metrics, requests, targetUtilization)
usageRatio, utilization, rawUtilization, err := metricsclient.GetResourceUtilizationRatio(metrics, requests, targetUtilization)
if err != nil {
return 0, 0, time.Time{}, err
return 0, 0, 0, time.Time{}, err
}
rebalanceUnready := len(unreadyPods) > 0 && usageRatio > 1.0
if !rebalanceUnready && len(missingPods) == 0 {
if math.Abs(1.0-usageRatio) <= tolerance {
// return the current replicas if the change would be too small
return currentReplicas, utilization, timestamp, nil
return currentReplicas, utilization, rawUtilization, timestamp, nil
}
// if we don't have any unready or missing pods, we can calculate the new replica count now
return int32(math.Ceil(usageRatio * float64(readyPodCount))), utilization, timestamp, nil
return int32(math.Ceil(usageRatio * float64(readyPodCount))), utilization, rawUtilization, timestamp, nil
}
if len(missingPods) > 0 {
@ -133,37 +134,56 @@ func (c *ReplicaCalculator) GetResourceReplicas(currentReplicas int32, targetUti
}
// re-run the utilization calculation with our new numbers
newUsageRatio, _, err := metricsclient.GetResourceUtilizationRatio(metrics, requests, targetUtilization)
newUsageRatio, _, _, err := metricsclient.GetResourceUtilizationRatio(metrics, requests, targetUtilization)
if err != nil {
return 0, utilization, time.Time{}, err
return 0, utilization, rawUtilization, time.Time{}, err
}
if math.Abs(1.0-newUsageRatio) <= tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) {
// return the current replicas if the change would be too small,
// or if the new usage ratio would cause a change in scale direction
return currentReplicas, utilization, timestamp, nil
return currentReplicas, utilization, rawUtilization, timestamp, nil
}
// return the result, where the number of replicas considered is
// however many replicas factored into our calculation
return int32(math.Ceil(newUsageRatio * float64(len(metrics)))), utilization, timestamp, nil
return int32(math.Ceil(newUsageRatio * float64(len(metrics)))), utilization, rawUtilization, timestamp, nil
}
// GetMetricReplicas calculates the desired replica count based on a target resource utilization percentage
// of the given resource for pods matching the given selector in the given namespace, and the current replica count
func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUtilization float64, metricName string, namespace string, selector labels.Selector) (replicaCount int32, utilization float64, timestamp time.Time, err error) {
metrics, timestamp, err := c.metricsClient.GetRawMetric(metricName, namespace, selector)
// GetRawResourceReplicas calculates the desired replica count based on a target resource utilization (as a raw milli-value)
// for pods matching the given selector in the given namespace, and the current replica count
func (c *ReplicaCalculator) GetRawResourceReplicas(currentReplicas int32, targetUtilization int64, resource v1.ResourceName, namespace string, selector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
metrics, timestamp, err := c.metricsClient.GetResourceMetric(resource, namespace, selector)
if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v", metricName, err)
return 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err)
}
replicaCount, utilization, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUtilization, namespace, selector)
return replicaCount, utilization, timestamp, err
}
// GetMetricReplicas calculates the desired replica count based on a target metric utilization
// (as a milli-value) for pods matching the given selector in the given namespace, and the
// current replica count
func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUtilization int64, metricName string, namespace string, selector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
metrics, timestamp, err := c.metricsClient.GetRawMetric(metricName, namespace, selector)
if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v", metricName, err)
}
replicaCount, utilization, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUtilization, namespace, selector)
return replicaCount, utilization, timestamp, err
}
// calcPlainMetricReplicas calculates the desired replicas for plain (i.e. non-utilization percentage) metrics.
func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector) (replicaCount int32, utilization int64, err error) {
podList, err := c.podsGetter.Pods(namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
return 0, 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
}
if len(podList.Items) == 0 {
return 0, 0, time.Time{}, fmt.Errorf("no pods returned by selector while calculating replica count")
return 0, 0, fmt.Errorf("no pods returned by selector while calculating replica count")
}
readyPodCount := 0
@ -188,24 +208,21 @@ func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUtili
}
if len(metrics) == 0 {
return 0, 0, time.Time{}, fmt.Errorf("did not recieve metrics for any ready pods")
return 0, 0, fmt.Errorf("did not recieve metrics for any ready pods")
}
usageRatio, utilization := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization)
if err != nil {
return 0, 0, time.Time{}, err
}
rebalanceUnready := len(unreadyPods) > 0 && usageRatio > 1.0
if !rebalanceUnready && len(missingPods) == 0 {
if math.Abs(1.0-usageRatio) <= tolerance {
// return the current replicas if the change would be too small
return currentReplicas, utilization, timestamp, nil
return currentReplicas, utilization, nil
}
// if we don't have any unready or missing pods, we can calculate the new replica count now
return int32(math.Ceil(usageRatio * float64(readyPodCount))), utilization, timestamp, nil
return int32(math.Ceil(usageRatio * float64(readyPodCount))), utilization, nil
}
if len(missingPods) > 0 {
@ -232,16 +249,33 @@ func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUtili
// re-run the utilization calculation with our new numbers
newUsageRatio, _ := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization)
if err != nil {
return 0, utilization, time.Time{}, err
return 0, utilization, err
}
if math.Abs(1.0-newUsageRatio) <= tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) {
// return the current replicas if the change would be too small,
// or if the new usage ratio would cause a change in scale direction
return currentReplicas, utilization, timestamp, nil
return currentReplicas, utilization, nil
}
// return the result, where the number of replicas considered is
// however many replicas factored into our calculation
return int32(math.Ceil(newUsageRatio * float64(len(metrics)))), utilization, timestamp, nil
return int32(math.Ceil(newUsageRatio * float64(len(metrics)))), utilization, nil
}
// GetObjectMetricReplicas calculates the desired replica count based on a target metric utilization (as a milli-value)
// for the given object in the given namespace, and the current replica count.
func (c *ReplicaCalculator) GetObjectMetricReplicas(currentReplicas int32, targetUtilization int64, metricName string, namespace string, objectRef *autoscaling.CrossVersionObjectReference) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
utilization, timestamp, err = c.metricsClient.GetObjectMetric(metricName, namespace, objectRef)
if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v on %s %s/%s", metricName, objectRef.Kind, namespace, objectRef.Name, err)
}
usageRatio := float64(utilization) / float64(targetUtilization)
if math.Abs(1.0-usageRatio) <= tolerance {
// return the current replicas if the change would be too small
return currentReplicas, utilization, timestamp, nil
}
return int32(math.Ceil(usageRatio * float64(currentReplicas))), utilization, timestamp, nil
}

View File

@ -51,14 +51,15 @@ type resourceInfo struct {
targetUtilization int32
expectedUtilization int32
expectedValue int64
}
type metricInfo struct {
name string
levels []float64
targetUtilization float64
expectedUtilization float64
targetUtilization int64
expectedUtilization int64
}
type replicaCalcTestCase struct {
@ -75,8 +76,9 @@ type replicaCalcTestCase struct {
}
const (
testNamespace = "test-namespace"
podNamePrefix = "test-pod"
testNamespace = "test-namespace"
podNamePrefix = "test-pod"
numContainersPerPod = 2
)
func (tc *replicaCalcTestCase) prepareTestClient(t *testing.T) *fake.Clientset {
@ -144,25 +146,19 @@ func (tc *replicaCalcTestCase) prepareTestClient(t *testing.T) *fake.Clientset {
Name: podName,
Namespace: testNamespace,
},
Timestamp: unversioned.Time{Time: tc.timestamp},
Containers: []metricsapi.ContainerMetrics{
{
Name: "container1",
Usage: v1.ResourceList{
v1.ResourceName(tc.resource.name): *resource.NewMilliQuantity(
int64(resValue),
resource.DecimalSI),
},
Timestamp: unversioned.Time{Time: tc.timestamp},
Containers: make([]metricsapi.ContainerMetrics, numContainersPerPod),
}
for i := 0; i < numContainersPerPod; i++ {
podMetric.Containers[i] = metricsapi.ContainerMetrics{
Name: fmt.Sprintf("container%v", i),
Usage: v1.ResourceList{
v1.ResourceName(tc.resource.name): *resource.NewMilliQuantity(
int64(resValue),
resource.DecimalSI),
},
{
Name: "container2",
Usage: v1.ResourceList{
v1.ResourceName(tc.resource.name): *resource.NewMilliQuantity(
int64(resValue),
resource.DecimalSI),
},
},
},
}
}
metrics.Items = append(metrics.Items, podMetric)
}
@ -228,7 +224,7 @@ func (tc *replicaCalcTestCase) runTest(t *testing.T) {
}
if tc.resource != nil {
outReplicas, outUtilization, outTimestamp, err := replicaCalc.GetResourceReplicas(tc.currentReplicas, tc.resource.targetUtilization, tc.resource.name, testNamespace, selector)
outReplicas, outUtilization, outRawValue, outTimestamp, err := replicaCalc.GetResourceReplicas(tc.currentReplicas, tc.resource.targetUtilization, tc.resource.name, testNamespace, selector)
if tc.expectedError != nil {
require.Error(t, err, "there should be an error calculating the replica count")
@ -238,6 +234,7 @@ func (tc *replicaCalcTestCase) runTest(t *testing.T) {
require.NoError(t, err, "there should not have been an error calculating the replica count")
assert.Equal(t, tc.expectedReplicas, outReplicas, "replicas should be as expected")
assert.Equal(t, tc.resource.expectedUtilization, outUtilization, "utilization should be as expected")
assert.Equal(t, tc.resource.expectedValue, outRawValue, "raw value should be as expected")
assert.True(t, tc.timestamp.Equal(outTimestamp), "timestamp should be as expected")
} else {
@ -250,7 +247,7 @@ func (tc *replicaCalcTestCase) runTest(t *testing.T) {
}
require.NoError(t, err, "there should not have been an error calculating the replica count")
assert.Equal(t, tc.expectedReplicas, outReplicas, "replicas should be as expected")
assert.InDelta(t, tc.metric.expectedUtilization, 0.1, outUtilization, "utilization should be as expected")
assert.Equal(t, tc.metric.expectedUtilization, outUtilization, "utilization should be as expected")
assert.True(t, tc.timestamp.Equal(outTimestamp), "timestamp should be as expected")
}
}
@ -282,6 +279,7 @@ func TestReplicaCalcScaleUp(t *testing.T) {
targetUtilization: 30,
expectedUtilization: 50,
expectedValue: numContainersPerPod * 500,
},
}
tc.runTest(t)
@ -299,6 +297,7 @@ func TestReplicaCalcScaleUpUnreadyLessScale(t *testing.T) {
targetUtilization: 30,
expectedUtilization: 60,
expectedValue: numContainersPerPod * 600,
},
}
tc.runTest(t)
@ -316,6 +315,7 @@ func TestReplicaCalcScaleUpUnreadyNoScale(t *testing.T) {
targetUtilization: 30,
expectedUtilization: 40,
expectedValue: numContainersPerPod * 400,
},
}
tc.runTest(t)
@ -328,8 +328,8 @@ func TestReplicaCalcScaleUpCM(t *testing.T) {
metric: &metricInfo{
name: "qps",
levels: []float64{20.0, 10.0, 30.0},
targetUtilization: 15.0,
expectedUtilization: 20.0,
targetUtilization: 15000,
expectedUtilization: 20000,
},
}
tc.runTest(t)
@ -343,8 +343,8 @@ func TestReplicaCalcScaleUpCMUnreadyLessScale(t *testing.T) {
metric: &metricInfo{
name: "qps",
levels: []float64{50.0, 10.0, 30.0},
targetUtilization: 15.0,
expectedUtilization: 30.0,
targetUtilization: 15000,
expectedUtilization: 30000,
},
}
tc.runTest(t)
@ -358,8 +358,8 @@ func TestReplicaCalcScaleUpCMUnreadyNoScaleWouldScaleDown(t *testing.T) {
metric: &metricInfo{
name: "qps",
levels: []float64{50.0, 15.0, 30.0},
targetUtilization: 15.0,
expectedUtilization: 15.0,
targetUtilization: 15000,
expectedUtilization: 15000,
},
}
tc.runTest(t)
@ -376,6 +376,7 @@ func TestReplicaCalcScaleDown(t *testing.T) {
targetUtilization: 50,
expectedUtilization: 28,
expectedValue: numContainersPerPod * 280,
},
}
tc.runTest(t)
@ -388,8 +389,8 @@ func TestReplicaCalcScaleDownCM(t *testing.T) {
metric: &metricInfo{
name: "qps",
levels: []float64{12.0, 12.0, 12.0, 12.0, 12.0},
targetUtilization: 20.0,
expectedUtilization: 12.0,
targetUtilization: 20000,
expectedUtilization: 12000,
},
}
tc.runTest(t)
@ -407,6 +408,7 @@ func TestReplicaCalcScaleDownIgnoresUnreadyPods(t *testing.T) {
targetUtilization: 50,
expectedUtilization: 30,
expectedValue: numContainersPerPod * 300,
},
}
tc.runTest(t)
@ -423,6 +425,7 @@ func TestReplicaCalcTolerance(t *testing.T) {
targetUtilization: 100,
expectedUtilization: 102,
expectedValue: numContainersPerPod * 1020,
},
}
tc.runTest(t)
@ -435,8 +438,8 @@ func TestReplicaCalcToleranceCM(t *testing.T) {
metric: &metricInfo{
name: "qps",
levels: []float64{20.0, 21.0, 21.0},
targetUtilization: 20.0,
expectedUtilization: 20.66666,
targetUtilization: 20000,
expectedUtilization: 20666,
},
}
tc.runTest(t)
@ -452,6 +455,7 @@ func TestReplicaCalcSuperfluousMetrics(t *testing.T) {
levels: []int64{4000, 9500, 3000, 7000, 3200, 2000},
targetUtilization: 100,
expectedUtilization: 587,
expectedValue: numContainersPerPod * 5875,
},
}
tc.runTest(t)
@ -468,6 +472,7 @@ func TestReplicaCalcMissingMetrics(t *testing.T) {
targetUtilization: 100,
expectedUtilization: 24,
expectedValue: 495, // numContainersPerPod * 247, for sufficiently large values of 247
},
}
tc.runTest(t)
@ -514,6 +519,7 @@ func TestReplicaCalcMissingMetricsNoChangeEq(t *testing.T) {
targetUtilization: 100,
expectedUtilization: 100,
expectedValue: numContainersPerPod * 1000,
},
}
tc.runTest(t)
@ -530,6 +536,7 @@ func TestReplicaCalcMissingMetricsNoChangeGt(t *testing.T) {
targetUtilization: 100,
expectedUtilization: 190,
expectedValue: numContainersPerPod * 1900,
},
}
tc.runTest(t)
@ -546,6 +553,7 @@ func TestReplicaCalcMissingMetricsNoChangeLt(t *testing.T) {
targetUtilization: 100,
expectedUtilization: 60,
expectedValue: numContainersPerPod * 600,
},
}
tc.runTest(t)
@ -563,6 +571,7 @@ func TestReplicaCalcMissingMetricsUnreadyNoChange(t *testing.T) {
targetUtilization: 50,
expectedUtilization: 45,
expectedValue: numContainersPerPod * 450,
},
}
tc.runTest(t)
@ -580,6 +589,7 @@ func TestReplicaCalcMissingMetricsUnreadyScaleUp(t *testing.T) {
targetUtilization: 50,
expectedUtilization: 200,
expectedValue: numContainersPerPod * 2000,
},
}
tc.runTest(t)
@ -597,6 +607,7 @@ func TestReplicaCalcMissingMetricsUnreadyScaleDown(t *testing.T) {
targetUtilization: 50,
expectedUtilization: 10,
expectedValue: numContainersPerPod * 100,
},
}
tc.runTest(t)
@ -658,6 +669,7 @@ func TestReplicaCalcComputedToleranceAlgImplementation(t *testing.T) {
targetUtilization: finalCpuPercentTarget,
expectedUtilization: int32(totalUsedCPUOfAllPods*100) / totalRequestedCPUOfAllPods,
expectedValue: numContainersPerPod * totalUsedCPUOfAllPods / 10,
},
}