mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 18:00:08 +00:00
Merge pull request #78503 from gjtempleton/Multiple-Metrics-HPA
Multiple metrics hpa
This commit is contained in:
commit
9f85c5c4b5
@ -23,7 +23,7 @@ import (
|
|||||||
|
|
||||||
autoscalingv1 "k8s.io/api/autoscaling/v1"
|
autoscalingv1 "k8s.io/api/autoscaling/v1"
|
||||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||||
@ -230,7 +230,7 @@ func (a *HorizontalController) processNextWorkItem() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// computeReplicasForMetrics computes the desired number of replicas for the metric specifications listed in the HPA,
|
// computeReplicasForMetrics computes the desired number of replicas for the metric specifications listed in the HPA,
|
||||||
// returning the maximum of the computed replica counts, a description of the associated metric, and the statuses of
|
// returning the maximum of the computed replica counts, a description of the associated metric, and the statuses of
|
||||||
// all metrics computed.
|
// all metrics computed.
|
||||||
func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
|
func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
|
||||||
metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {
|
metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {
|
||||||
@ -239,76 +239,96 @@ func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.Hori
|
|||||||
|
|
||||||
statuses = make([]autoscalingv2.MetricStatus, len(metricSpecs))
|
statuses = make([]autoscalingv2.MetricStatus, len(metricSpecs))
|
||||||
|
|
||||||
|
if scale.Status.Selector == "" {
|
||||||
|
errMsg := "selector is required"
|
||||||
|
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "SelectorRequired", errMsg)
|
||||||
|
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", "the HPA target's scale is missing a selector")
|
||||||
|
return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
selector, err := labels.Parse(scale.Status.Selector)
|
||||||
|
if err != nil {
|
||||||
|
errMsg := fmt.Sprintf("couldn't convert selector into a corresponding internal selector object: %v", err)
|
||||||
|
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "InvalidSelector", errMsg)
|
||||||
|
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", errMsg)
|
||||||
|
return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
invalidMetricsCount := 0
|
||||||
|
var invalidMetricError error
|
||||||
|
|
||||||
for i, metricSpec := range metricSpecs {
|
for i, metricSpec := range metricSpecs {
|
||||||
if scale.Status.Selector == "" {
|
replicaCountProposal, metricNameProposal, timestampProposal, err := a.computeReplicasForMetric(hpa, metricSpec, currentReplicas, selector, &statuses[i])
|
||||||
errMsg := "selector is required"
|
|
||||||
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "SelectorRequired", errMsg)
|
|
||||||
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", "the HPA target's scale is missing a selector")
|
|
||||||
return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
|
|
||||||
}
|
|
||||||
|
|
||||||
selector, err := labels.Parse(scale.Status.Selector)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMsg := fmt.Sprintf("couldn't convert selector into a corresponding internal selector object: %v", err)
|
invalidMetricsCount++
|
||||||
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "InvalidSelector", errMsg)
|
invalidMetricError = err
|
||||||
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", errMsg)
|
|
||||||
return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
|
|
||||||
}
|
}
|
||||||
|
if err == nil && (replicas == 0 || replicaCountProposal > replicas) {
|
||||||
var replicaCountProposal int32
|
|
||||||
var timestampProposal time.Time
|
|
||||||
var metricNameProposal string
|
|
||||||
|
|
||||||
switch metricSpec.Type {
|
|
||||||
case autoscalingv2.ObjectMetricSourceType:
|
|
||||||
metricSelector, err := metav1.LabelSelectorAsSelector(metricSpec.Object.Metric.Selector)
|
|
||||||
if err != nil {
|
|
||||||
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetObjectMetric", err.Error())
|
|
||||||
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "FailedGetObjectMetric", "the HPA was unable to compute the replica count: %v", err)
|
|
||||||
return 0, "", nil, time.Time{}, fmt.Errorf("failed to get object metric value: %v", err)
|
|
||||||
}
|
|
||||||
replicaCountProposal, timestampProposal, metricNameProposal, err = a.computeStatusForObjectMetric(currentReplicas, metricSpec, hpa, selector, &statuses[i], metricSelector)
|
|
||||||
if err != nil {
|
|
||||||
return 0, "", nil, time.Time{}, fmt.Errorf("failed to get object metric value: %v", err)
|
|
||||||
}
|
|
||||||
case autoscalingv2.PodsMetricSourceType:
|
|
||||||
metricSelector, err := metav1.LabelSelectorAsSelector(metricSpec.Pods.Metric.Selector)
|
|
||||||
if err != nil {
|
|
||||||
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetPodsMetric", err.Error())
|
|
||||||
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "FailedGetPodsMetric", "the HPA was unable to compute the replica count: %v", err)
|
|
||||||
return 0, "", nil, time.Time{}, fmt.Errorf("failed to get pods metric value: %v", err)
|
|
||||||
}
|
|
||||||
replicaCountProposal, timestampProposal, metricNameProposal, err = a.computeStatusForPodsMetric(currentReplicas, metricSpec, hpa, selector, &statuses[i], metricSelector)
|
|
||||||
if err != nil {
|
|
||||||
return 0, "", nil, time.Time{}, fmt.Errorf("failed to get object metric value: %v", err)
|
|
||||||
}
|
|
||||||
case autoscalingv2.ResourceMetricSourceType:
|
|
||||||
replicaCountProposal, timestampProposal, metricNameProposal, err = a.computeStatusForResourceMetric(currentReplicas, metricSpec, hpa, selector, &statuses[i])
|
|
||||||
if err != nil {
|
|
||||||
return 0, "", nil, time.Time{}, err
|
|
||||||
}
|
|
||||||
case autoscalingv2.ExternalMetricSourceType:
|
|
||||||
replicaCountProposal, timestampProposal, metricNameProposal, err = a.computeStatusForExternalMetric(currentReplicas, metricSpec, hpa, selector, &statuses[i])
|
|
||||||
if err != nil {
|
|
||||||
return 0, "", nil, time.Time{}, err
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
errMsg := fmt.Sprintf("unknown metric source type %q", string(metricSpec.Type))
|
|
||||||
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "InvalidMetricSourceType", errMsg)
|
|
||||||
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidMetricSourceType", "the HPA was unable to compute the replica count: %s", errMsg)
|
|
||||||
return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
|
|
||||||
}
|
|
||||||
if replicas == 0 || replicaCountProposal > replicas {
|
|
||||||
timestamp = timestampProposal
|
timestamp = timestampProposal
|
||||||
replicas = replicaCountProposal
|
replicas = replicaCountProposal
|
||||||
metric = metricNameProposal
|
metric = metricNameProposal
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionTrue, "ValidMetricFound", "the HPA was able to successfully calculate a replica count from %s", metric)
|
// If all metrics are invalid or some are invalid and we would scale down,
|
||||||
|
// return an error and set the condition of the hpa based on the first invalid metric.
|
||||||
|
// Otherwise set the condition as scaling active as we're going to scale
|
||||||
|
if invalidMetricsCount >= len(metricSpecs) || (invalidMetricsCount > 0 && replicas < currentReplicas) {
|
||||||
|
return 0, "", statuses, time.Time{}, fmt.Errorf("Invalid metrics (%v invalid out of %v), last error was: %v", invalidMetricsCount, len(metricSpecs), invalidMetricError)
|
||||||
|
} else {
|
||||||
|
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionTrue, "ValidMetricFound", "the HPA was able to successfully calculate a replica count from %v", metric)
|
||||||
|
}
|
||||||
return replicas, metric, statuses, timestamp, nil
|
return replicas, metric, statuses, timestamp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// computeReplicasForMetric computes the desired number of replicas for for a specific hpa and single metric specification.
|
||||||
|
func (a *HorizontalController) computeReplicasForMetric(hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
|
||||||
|
currentReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string,
|
||||||
|
timestampProposal time.Time, err error) {
|
||||||
|
|
||||||
|
switch spec.Type {
|
||||||
|
case autoscalingv2.ObjectMetricSourceType:
|
||||||
|
metricSelector, err := metav1.LabelSelectorAsSelector(spec.Object.Metric.Selector)
|
||||||
|
if err != nil {
|
||||||
|
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetObjectMetric", err.Error())
|
||||||
|
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "FailedGetObjectMetric", "the HPA was unable to compute the replica count: %v", err)
|
||||||
|
return 0, "", time.Time{}, fmt.Errorf("failed to get object metric value: %v", err)
|
||||||
|
}
|
||||||
|
replicaCountProposal, timestampProposal, metricNameProposal, err = a.computeStatusForObjectMetric(currentReplicas, spec, hpa, selector, status, metricSelector)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", time.Time{}, fmt.Errorf("failed to get object metric value: %v", err)
|
||||||
|
}
|
||||||
|
case autoscalingv2.PodsMetricSourceType:
|
||||||
|
metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)
|
||||||
|
if err != nil {
|
||||||
|
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetPodsMetric", err.Error())
|
||||||
|
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "FailedGetPodsMetric", "the HPA was unable to compute the replica count: %v", err)
|
||||||
|
return 0, "", time.Time{}, fmt.Errorf("failed to get pods metric value: %v", err)
|
||||||
|
}
|
||||||
|
replicaCountProposal, timestampProposal, metricNameProposal, err = a.computeStatusForPodsMetric(currentReplicas, spec, hpa, selector, status, metricSelector)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", time.Time{}, fmt.Errorf("failed to get object metric value: %v", err)
|
||||||
|
}
|
||||||
|
case autoscalingv2.ResourceMetricSourceType:
|
||||||
|
replicaCountProposal, timestampProposal, metricNameProposal, err = a.computeStatusForResourceMetric(currentReplicas, spec, hpa, selector, status)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", time.Time{}, err
|
||||||
|
}
|
||||||
|
case autoscalingv2.ExternalMetricSourceType:
|
||||||
|
replicaCountProposal, timestampProposal, metricNameProposal, err = a.computeStatusForExternalMetric(currentReplicas, spec, hpa, selector, status)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", time.Time{}, err
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type))
|
||||||
|
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "InvalidMetricSourceType", errMsg)
|
||||||
|
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidMetricSourceType", "the HPA was unable to compute the replica count: %v", fmt.Errorf(errMsg))
|
||||||
|
return 0, "", time.Time{}, fmt.Errorf(errMsg)
|
||||||
|
}
|
||||||
|
return replicaCountProposal, metricNameProposal, timestampProposal, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (a *HorizontalController) reconcileKey(key string) (deleted bool, err error) {
|
func (a *HorizontalController) reconcileKey(key string) (deleted bool, err error) {
|
||||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -26,7 +26,7 @@ import (
|
|||||||
|
|
||||||
autoscalingv1 "k8s.io/api/autoscaling/v1"
|
autoscalingv1 "k8s.io/api/autoscaling/v1"
|
||||||
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/meta/testrestmapper"
|
"k8s.io/apimachinery/pkg/api/meta/testrestmapper"
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
@ -2668,4 +2668,98 @@ func TestNormalizeDesiredReplicas(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestScaleUpOneMetricEmpty(t *testing.T) {
|
||||||
|
tc := testCase{
|
||||||
|
minReplicas: 2,
|
||||||
|
maxReplicas: 6,
|
||||||
|
initialReplicas: 3,
|
||||||
|
expectedDesiredReplicas: 4,
|
||||||
|
CPUTarget: 30,
|
||||||
|
verifyCPUCurrent: true,
|
||||||
|
metricsTarget: []autoscalingv2.MetricSpec{
|
||||||
|
{
|
||||||
|
Type: autoscalingv2.ExternalMetricSourceType,
|
||||||
|
External: &autoscalingv2.ExternalMetricSource{
|
||||||
|
Metric: autoscalingv2.MetricIdentifier{
|
||||||
|
Name: "qps",
|
||||||
|
Selector: &metav1.LabelSelector{},
|
||||||
|
},
|
||||||
|
Target: autoscalingv2.MetricTarget{
|
||||||
|
Value: resource.NewMilliQuantity(100, resource.DecimalSI),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
reportedLevels: []uint64{300, 400, 500},
|
||||||
|
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
|
||||||
|
}
|
||||||
|
_, _, _, testEMClient, _ := tc.prepareTestClient(t)
|
||||||
|
testEMClient.PrependReactor("list", "*", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
return true, &emapi.ExternalMetricValueList{}, fmt.Errorf("something went wrong")
|
||||||
|
})
|
||||||
|
tc.testEMClient = testEMClient
|
||||||
|
tc.runTest(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNoScaleDownOneMetricInvalid(t *testing.T) {
|
||||||
|
tc := testCase{
|
||||||
|
minReplicas: 2,
|
||||||
|
maxReplicas: 6,
|
||||||
|
initialReplicas: 5,
|
||||||
|
expectedDesiredReplicas: 5,
|
||||||
|
CPUTarget: 50,
|
||||||
|
metricsTarget: []autoscalingv2.MetricSpec{
|
||||||
|
{
|
||||||
|
Type: "CheddarCheese",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
reportedLevels: []uint64{100, 300, 500, 250, 250},
|
||||||
|
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
|
||||||
|
useMetricsAPI: true,
|
||||||
|
expectedConditions: []autoscalingv1.HorizontalPodAutoscalerCondition{
|
||||||
|
{Type: autoscalingv1.AbleToScale, Status: v1.ConditionTrue, Reason: "SucceededGetScale"},
|
||||||
|
{Type: autoscalingv1.ScalingActive, Status: v1.ConditionFalse, Reason: "InvalidMetricSourceType"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
tc.runTest(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNoScaleDownOneMetricEmpty(t *testing.T) {
|
||||||
|
tc := testCase{
|
||||||
|
minReplicas: 2,
|
||||||
|
maxReplicas: 6,
|
||||||
|
initialReplicas: 5,
|
||||||
|
expectedDesiredReplicas: 5,
|
||||||
|
CPUTarget: 50,
|
||||||
|
metricsTarget: []autoscalingv2.MetricSpec{
|
||||||
|
{
|
||||||
|
Type: autoscalingv2.ExternalMetricSourceType,
|
||||||
|
External: &autoscalingv2.ExternalMetricSource{
|
||||||
|
Metric: autoscalingv2.MetricIdentifier{
|
||||||
|
Name: "qps",
|
||||||
|
Selector: &metav1.LabelSelector{},
|
||||||
|
},
|
||||||
|
Target: autoscalingv2.MetricTarget{
|
||||||
|
Value: resource.NewMilliQuantity(1000, resource.DecimalSI),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
reportedLevels: []uint64{100, 300, 500, 250, 250},
|
||||||
|
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
|
||||||
|
useMetricsAPI: true,
|
||||||
|
expectedConditions: []autoscalingv1.HorizontalPodAutoscalerCondition{
|
||||||
|
{Type: autoscalingv1.AbleToScale, Status: v1.ConditionTrue, Reason: "SucceededGetScale"},
|
||||||
|
{Type: autoscalingv1.ScalingActive, Status: v1.ConditionFalse, Reason: "FailedGetExternalMetric"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
_, _, _, testEMClient, _ := tc.prepareTestClient(t)
|
||||||
|
testEMClient.PrependReactor("list", "*", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
return true, &emapi.ExternalMetricValueList{}, fmt.Errorf("something went wrong")
|
||||||
|
})
|
||||||
|
tc.testEMClient = testEMClient
|
||||||
|
tc.runTest(t)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: add more tests
|
// TODO: add more tests
|
||||||
|
Loading…
Reference in New Issue
Block a user