mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 04:33:26 +00:00
Implement external metrics in HPA
This commit is contained in:
parent
66f4f9080d
commit
e58411c600
@ -82,6 +82,7 @@ go_test(
|
|||||||
"//vendor/k8s.io/client-go/testing:go_default_library",
|
"//vendor/k8s.io/client-go/testing:go_default_library",
|
||||||
"//vendor/k8s.io/heapster/metrics/api/v1/types:go_default_library",
|
"//vendor/k8s.io/heapster/metrics/api/v1/types:go_default_library",
|
||||||
"//vendor/k8s.io/metrics/pkg/apis/custom_metrics/v1beta1:go_default_library",
|
"//vendor/k8s.io/metrics/pkg/apis/custom_metrics/v1beta1:go_default_library",
|
||||||
|
"//vendor/k8s.io/metrics/pkg/apis/external_metrics/v1beta1:go_default_library",
|
||||||
"//vendor/k8s.io/metrics/pkg/apis/metrics/v1alpha1:go_default_library",
|
"//vendor/k8s.io/metrics/pkg/apis/metrics/v1alpha1:go_default_library",
|
||||||
"//vendor/k8s.io/metrics/pkg/apis/metrics/v1beta1:go_default_library",
|
"//vendor/k8s.io/metrics/pkg/apis/metrics/v1beta1:go_default_library",
|
||||||
"//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset/fake:go_default_library",
|
"//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset/fake:go_default_library",
|
||||||
|
@ -299,6 +299,45 @@ func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.Hori
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case autoscalingv2.ExternalMetricSourceType:
|
||||||
|
if metricSpec.External.TargetAverageValue != nil {
|
||||||
|
replicaCountProposal, utilizationProposal, timestampProposal, err = a.replicaCalc.GetExternalPerPodMetricReplicas(currentReplicas, metricSpec.External.TargetAverageValue.MilliValue(), metricSpec.External.MetricName, hpa.Namespace, metricSpec.External.MetricSelector)
|
||||||
|
if err != nil {
|
||||||
|
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetExternalMetric", err.Error())
|
||||||
|
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "FailedGetExternalMetric", "the HPA was unable to compute the replica count: %v", err)
|
||||||
|
return 0, "", nil, time.Time{}, fmt.Errorf("failed to get %s external metric: %v", metricSpec.External.MetricName, err)
|
||||||
|
}
|
||||||
|
metricNameProposal = fmt.Sprintf("external metric %s(%+v)", metricSpec.External.MetricName, metricSpec.External.MetricSelector)
|
||||||
|
statuses[i] = autoscalingv2.MetricStatus{
|
||||||
|
Type: autoscalingv2.ExternalMetricSourceType,
|
||||||
|
External: &autoscalingv2.ExternalMetricStatus{
|
||||||
|
MetricSelector: metricSpec.External.MetricSelector,
|
||||||
|
MetricName: metricSpec.External.MetricName,
|
||||||
|
CurrentAverageValue: resource.NewMilliQuantity(utilizationProposal, resource.DecimalSI),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
} else if metricSpec.External.TargetValue != nil {
|
||||||
|
replicaCountProposal, utilizationProposal, timestampProposal, err = a.replicaCalc.GetExternalMetricReplicas(currentReplicas, metricSpec.External.TargetValue.MilliValue(), metricSpec.External.MetricName, hpa.Namespace, metricSpec.External.MetricSelector)
|
||||||
|
if err != nil {
|
||||||
|
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetExternalMetric", err.Error())
|
||||||
|
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "FailedGetExternalMetric", "the HPA was unable to compute the replica count: %v", err)
|
||||||
|
return 0, "", nil, time.Time{}, fmt.Errorf("failed to get external metric %s: %v", metricSpec.External.MetricName, err)
|
||||||
|
}
|
||||||
|
metricNameProposal = fmt.Sprintf("external metric %s(%+v)", metricSpec.External.MetricName, metricSpec.External.MetricSelector)
|
||||||
|
statuses[i] = autoscalingv2.MetricStatus{
|
||||||
|
Type: autoscalingv2.ExternalMetricSourceType,
|
||||||
|
External: &autoscalingv2.ExternalMetricStatus{
|
||||||
|
MetricSelector: metricSpec.External.MetricSelector,
|
||||||
|
MetricName: metricSpec.External.MetricName,
|
||||||
|
CurrentValue: *resource.NewMilliQuantity(utilizationProposal, resource.DecimalSI),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
errMsg := "invalid external metric source: neither a value target nor an average value target was set"
|
||||||
|
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetExternalMetric", errMsg)
|
||||||
|
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "FailedGetExternalMetric", "the HPA was unable to compute the replica count: %v", err)
|
||||||
|
return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
errMsg := fmt.Sprintf("unknown metric source type %q", string(metricSpec.Type))
|
errMsg := fmt.Sprintf("unknown metric source type %q", string(metricSpec.Type))
|
||||||
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "InvalidMetricSourceType", errMsg)
|
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "InvalidMetricSourceType", errMsg)
|
||||||
|
@ -42,6 +42,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
|
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
|
||||||
cmapi "k8s.io/metrics/pkg/apis/custom_metrics/v1beta1"
|
cmapi "k8s.io/metrics/pkg/apis/custom_metrics/v1beta1"
|
||||||
|
emapi "k8s.io/metrics/pkg/apis/external_metrics/v1beta1"
|
||||||
metricsapi "k8s.io/metrics/pkg/apis/metrics/v1beta1"
|
metricsapi "k8s.io/metrics/pkg/apis/metrics/v1beta1"
|
||||||
metricsfake "k8s.io/metrics/pkg/client/clientset_generated/clientset/fake"
|
metricsfake "k8s.io/metrics/pkg/client/clientset_generated/clientset/fake"
|
||||||
cmfake "k8s.io/metrics/pkg/client/custom_metrics/fake"
|
cmfake "k8s.io/metrics/pkg/client/custom_metrics/fake"
|
||||||
@ -526,6 +527,31 @@ func (tc *testCase) prepareTestClient(t *testing.T) (*fake.Clientset, *metricsfa
|
|||||||
|
|
||||||
fakeEMClient := &emfake.FakeExternalMetricsClient{}
|
fakeEMClient := &emfake.FakeExternalMetricsClient{}
|
||||||
|
|
||||||
|
fakeEMClient.AddReactor("list", "*", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
tc.Lock()
|
||||||
|
defer tc.Unlock()
|
||||||
|
|
||||||
|
listAction, wasList := action.(core.ListAction)
|
||||||
|
if !wasList {
|
||||||
|
return true, nil, fmt.Errorf("expected a list action, got %v instead", action)
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics := &emapi.ExternalMetricValueList{}
|
||||||
|
|
||||||
|
assert.Equal(t, "qps", listAction.GetResource().Resource, "the metric name requested should have been qps, as specified in the metric spec")
|
||||||
|
|
||||||
|
for _, level := range tc.reportedLevels {
|
||||||
|
metric := emapi.ExternalMetricValue{
|
||||||
|
Timestamp: metav1.Time{Time: time.Now()},
|
||||||
|
MetricName: "qps",
|
||||||
|
Value: *resource.NewMilliQuantity(int64(level), resource.DecimalSI),
|
||||||
|
}
|
||||||
|
metrics.Items = append(metrics.Items, metric)
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, metrics, nil
|
||||||
|
})
|
||||||
|
|
||||||
return fakeClient, fakeMetricsClient, fakeCMClient, fakeEMClient, fakeScaleClient
|
return fakeClient, fakeMetricsClient, fakeCMClient, fakeEMClient, fakeScaleClient
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -826,6 +852,48 @@ func TestScaleUpCMObject(t *testing.T) {
|
|||||||
tc.runTest(t)
|
tc.runTest(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestScaleUpCMExternal(t *testing.T) {
|
||||||
|
tc := testCase{
|
||||||
|
minReplicas: 2,
|
||||||
|
maxReplicas: 6,
|
||||||
|
initialReplicas: 3,
|
||||||
|
desiredReplicas: 4,
|
||||||
|
metricsTarget: []autoscalingv2.MetricSpec{
|
||||||
|
{
|
||||||
|
Type: autoscalingv2.ExternalMetricSourceType,
|
||||||
|
External: &autoscalingv2.ExternalMetricSource{
|
||||||
|
MetricSelector: &metav1.LabelSelector{},
|
||||||
|
MetricName: "qps",
|
||||||
|
TargetValue: resource.NewMilliQuantity(6666, resource.DecimalSI),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
reportedLevels: []uint64{8600},
|
||||||
|
}
|
||||||
|
tc.runTest(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestScaleUpPerPodCMExternal(t *testing.T) {
|
||||||
|
tc := testCase{
|
||||||
|
minReplicas: 2,
|
||||||
|
maxReplicas: 6,
|
||||||
|
initialReplicas: 3,
|
||||||
|
desiredReplicas: 4,
|
||||||
|
metricsTarget: []autoscalingv2.MetricSpec{
|
||||||
|
{
|
||||||
|
Type: autoscalingv2.ExternalMetricSourceType,
|
||||||
|
External: &autoscalingv2.ExternalMetricSource{
|
||||||
|
MetricSelector: &metav1.LabelSelector{},
|
||||||
|
MetricName: "qps",
|
||||||
|
TargetAverageValue: resource.NewMilliQuantity(2222, resource.DecimalSI),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
reportedLevels: []uint64{8600},
|
||||||
|
}
|
||||||
|
tc.runTest(t)
|
||||||
|
}
|
||||||
|
|
||||||
func TestScaleDown(t *testing.T) {
|
func TestScaleDown(t *testing.T) {
|
||||||
tc := testCase{
|
tc := testCase{
|
||||||
minReplicas: 2,
|
minReplicas: 2,
|
||||||
@ -890,6 +958,48 @@ func TestScaleDownCMObject(t *testing.T) {
|
|||||||
tc.runTest(t)
|
tc.runTest(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestScaleDownCMExternal(t *testing.T) {
|
||||||
|
tc := testCase{
|
||||||
|
minReplicas: 2,
|
||||||
|
maxReplicas: 6,
|
||||||
|
initialReplicas: 5,
|
||||||
|
desiredReplicas: 3,
|
||||||
|
metricsTarget: []autoscalingv2.MetricSpec{
|
||||||
|
{
|
||||||
|
Type: autoscalingv2.ExternalMetricSourceType,
|
||||||
|
External: &autoscalingv2.ExternalMetricSource{
|
||||||
|
MetricSelector: &metav1.LabelSelector{},
|
||||||
|
MetricName: "qps",
|
||||||
|
TargetValue: resource.NewMilliQuantity(14400, resource.DecimalSI),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
reportedLevels: []uint64{8600},
|
||||||
|
}
|
||||||
|
tc.runTest(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestScaleDownPerPodCMExternal(t *testing.T) {
|
||||||
|
tc := testCase{
|
||||||
|
minReplicas: 2,
|
||||||
|
maxReplicas: 6,
|
||||||
|
initialReplicas: 5,
|
||||||
|
desiredReplicas: 3,
|
||||||
|
metricsTarget: []autoscalingv2.MetricSpec{
|
||||||
|
{
|
||||||
|
Type: autoscalingv2.ExternalMetricSourceType,
|
||||||
|
External: &autoscalingv2.ExternalMetricSource{
|
||||||
|
MetricSelector: &metav1.LabelSelector{},
|
||||||
|
MetricName: "qps",
|
||||||
|
TargetAverageValue: resource.NewMilliQuantity(3000, resource.DecimalSI),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
reportedLevels: []uint64{8600},
|
||||||
|
}
|
||||||
|
tc.runTest(t)
|
||||||
|
}
|
||||||
|
|
||||||
func TestScaleDownIgnoresUnreadyPods(t *testing.T) {
|
func TestScaleDownIgnoresUnreadyPods(t *testing.T) {
|
||||||
tc := testCase{
|
tc := testCase{
|
||||||
minReplicas: 2,
|
minReplicas: 2,
|
||||||
@ -983,6 +1093,58 @@ func TestToleranceCMObject(t *testing.T) {
|
|||||||
tc.runTest(t)
|
tc.runTest(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestToleranceCMExternal(t *testing.T) {
|
||||||
|
tc := testCase{
|
||||||
|
minReplicas: 2,
|
||||||
|
maxReplicas: 6,
|
||||||
|
initialReplicas: 4,
|
||||||
|
desiredReplicas: 4,
|
||||||
|
metricsTarget: []autoscalingv2.MetricSpec{
|
||||||
|
{
|
||||||
|
Type: autoscalingv2.ExternalMetricSourceType,
|
||||||
|
External: &autoscalingv2.ExternalMetricSource{
|
||||||
|
MetricSelector: &metav1.LabelSelector{},
|
||||||
|
MetricName: "qps",
|
||||||
|
TargetValue: resource.NewMilliQuantity(8666, resource.DecimalSI),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
reportedLevels: []uint64{8600},
|
||||||
|
expectedConditions: statusOkWithOverrides(autoscalingv2.HorizontalPodAutoscalerCondition{
|
||||||
|
Type: autoscalingv2.AbleToScale,
|
||||||
|
Status: v1.ConditionTrue,
|
||||||
|
Reason: "ReadyForNewScale",
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
tc.runTest(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTolerancePerPodCMExternal(t *testing.T) {
|
||||||
|
tc := testCase{
|
||||||
|
minReplicas: 2,
|
||||||
|
maxReplicas: 6,
|
||||||
|
initialReplicas: 4,
|
||||||
|
desiredReplicas: 4,
|
||||||
|
metricsTarget: []autoscalingv2.MetricSpec{
|
||||||
|
{
|
||||||
|
Type: autoscalingv2.ExternalMetricSourceType,
|
||||||
|
External: &autoscalingv2.ExternalMetricSource{
|
||||||
|
MetricSelector: &metav1.LabelSelector{},
|
||||||
|
MetricName: "qps",
|
||||||
|
TargetAverageValue: resource.NewMilliQuantity(2200, resource.DecimalSI),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
reportedLevels: []uint64{8600},
|
||||||
|
expectedConditions: statusOkWithOverrides(autoscalingv2.HorizontalPodAutoscalerCondition{
|
||||||
|
Type: autoscalingv2.AbleToScale,
|
||||||
|
Status: v1.ConditionTrue,
|
||||||
|
Reason: "ReadyForNewScale",
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
tc.runTest(t)
|
||||||
|
}
|
||||||
|
|
||||||
func TestMinReplicas(t *testing.T) {
|
func TestMinReplicas(t *testing.T) {
|
||||||
tc := testCase{
|
tc := testCase{
|
||||||
minReplicas: 2,
|
minReplicas: 2,
|
||||||
@ -1272,7 +1434,7 @@ func TestConditionInvalidSelectorMissing(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
_, _, _, _,testScaleClient := tc.prepareTestClient(t)
|
_, _, _, _, testScaleClient := tc.prepareTestClient(t)
|
||||||
tc.testScaleClient = testScaleClient
|
tc.testScaleClient = testScaleClient
|
||||||
|
|
||||||
testScaleClient.PrependReactor("get", "replicationcontrollers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
testScaleClient.PrependReactor("get", "replicationcontrollers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
@ -152,7 +152,7 @@ type externalMetricsClient struct {
|
|||||||
// GetExternalMetric gets all the values of a given external metric
|
// GetExternalMetric gets all the values of a given external metric
|
||||||
// that match the specified selector.
|
// that match the specified selector.
|
||||||
func (c *externalMetricsClient) GetExternalMetric(metricName, namespace string, selector labels.Selector) ([]int64, time.Time, error) {
|
func (c *externalMetricsClient) GetExternalMetric(metricName, namespace string, selector labels.Selector) ([]int64, time.Time, error) {
|
||||||
metrics, err := c.client.NamespacedMetrics(namespace).Get(metricName, selector)
|
metrics, err := c.client.NamespacedMetrics(namespace).List(metricName, selector)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return []int64{}, time.Time{}, fmt.Errorf("unable to fetch metrics from external metrics API: %v", err)
|
return []int64{}, time.Time{}, fmt.Errorf("unable to fetch metrics from external metrics API: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -107,10 +107,10 @@ func (tc *restClientTestCase) prepareTestClient(t *testing.T) (*metricsfake.Clie
|
|||||||
return true, metrics, nil
|
return true, metrics, nil
|
||||||
})
|
})
|
||||||
} else if isExternal {
|
} else if isExternal {
|
||||||
fakeEMClient.AddReactor("get", "externalmetrics", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
fakeEMClient.AddReactor("list", "*", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
getForAction := action.(emfake.GetForAction)
|
listAction := action.(core.ListAction)
|
||||||
assert.Equal(t, tc.metricName, getForAction.GetMetricName(), "the metric requested should have matched the one specified.")
|
assert.Equal(t, tc.metricName, listAction.GetResource().Resource, "the metric requested should have matched the one specified.")
|
||||||
assert.Equal(t, tc.metricLabelSelector, getForAction.GetMetricSelector(), "the metric selector should have matched the one specified")
|
assert.Equal(t, tc.metricLabelSelector, listAction.GetListRestrictions().Labels, "the metric selector should have matched the one specified")
|
||||||
|
|
||||||
metrics := emapi.ExternalMetricValueList{}
|
metrics := emapi.ExternalMetricValueList{}
|
||||||
for _, metricPoint := range tc.reportedMetricPoints {
|
for _, metricPoint := range tc.reportedMetricPoints {
|
||||||
@ -325,7 +325,7 @@ func TestRESTClientQpsEmptyMetrics(t *testing.T) {
|
|||||||
func TestRESTClientExternalEmptyMetrics(t *testing.T) {
|
func TestRESTClientExternalEmptyMetrics(t *testing.T) {
|
||||||
tc := restClientTestCase{
|
tc := restClientTestCase{
|
||||||
metricName: "external",
|
metricName: "external",
|
||||||
metricSelector: &metav1.LabelSelector{},
|
metricSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"label": "value"}},
|
||||||
desiredError: fmt.Errorf("no metrics returned from external metrics API"),
|
desiredError: fmt.Errorf("no metrics returned from external metrics API"),
|
||||||
reportedMetricPoints: []metricPoint{},
|
reportedMetricPoints: []metricPoint{},
|
||||||
}
|
}
|
||||||
|
@ -278,6 +278,33 @@ func (c *ReplicaCalculator) GetObjectMetricReplicas(currentReplicas int32, targe
|
|||||||
return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v on %s %s/%s", metricName, objectRef.Kind, namespace, objectRef.Name, err)
|
return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v on %s %s/%s", metricName, objectRef.Kind, namespace, objectRef.Name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
usageRatio := float64(utilization) / float64(targetUtilization)
|
||||||
|
if math.Abs(1.0-usageRatio) <= c.tolerance {
|
||||||
|
// return the current replicas if the change would be too small
|
||||||
|
return currentReplicas, utilization, timestamp, nil
|
||||||
|
}
|
||||||
|
replicaCount = int32(math.Ceil(usageRatio * float64(currentReplicas)))
|
||||||
|
|
||||||
|
return replicaCount, utilization, timestamp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetExternalMetricReplicas calculates the desired replica count based on a
|
||||||
|
// target metric value (as a milli-value) for the external metric in the given
|
||||||
|
// namespace, and the current replica count.
|
||||||
|
func (c *ReplicaCalculator) GetExternalMetricReplicas(currentReplicas int32, targetUtilization int64, metricName, namespace string, selector *metav1.LabelSelector) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
|
||||||
|
labelSelector, err := metav1.LabelSelectorAsSelector(selector)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, time.Time{}, err
|
||||||
|
}
|
||||||
|
metrics, timestamp, err := c.metricsClient.GetExternalMetric(metricName, namespace, labelSelector)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, time.Time{}, fmt.Errorf("unable to get external metric %s/%s/%+v: %s", namespace, metricName, selector, err)
|
||||||
|
}
|
||||||
|
utilization = 0
|
||||||
|
for _, val := range metrics {
|
||||||
|
utilization = utilization + val
|
||||||
|
}
|
||||||
|
|
||||||
usageRatio := float64(utilization) / float64(targetUtilization)
|
usageRatio := float64(utilization) / float64(targetUtilization)
|
||||||
if math.Abs(1.0-usageRatio) <= c.tolerance {
|
if math.Abs(1.0-usageRatio) <= c.tolerance {
|
||||||
// return the current replicas if the change would be too small
|
// return the current replicas if the change would be too small
|
||||||
@ -286,3 +313,30 @@ func (c *ReplicaCalculator) GetObjectMetricReplicas(currentReplicas int32, targe
|
|||||||
|
|
||||||
return int32(math.Ceil(usageRatio * float64(currentReplicas))), utilization, timestamp, nil
|
return int32(math.Ceil(usageRatio * float64(currentReplicas))), utilization, timestamp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetExternalPerPodMetricReplicas calculates the desired replica count based on a
|
||||||
|
// target metric value per pod (as a milli-value) for the external metric in the
|
||||||
|
// given namespace, and the current replica count.
|
||||||
|
func (c *ReplicaCalculator) GetExternalPerPodMetricReplicas(currentReplicas int32, targetUtilizationPerPod int64, metricName, namespace string, selector *metav1.LabelSelector) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
|
||||||
|
labelSelector, err := metav1.LabelSelectorAsSelector(selector)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, time.Time{}, err
|
||||||
|
}
|
||||||
|
metrics, timestamp, err := c.metricsClient.GetExternalMetric(metricName, namespace, labelSelector)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, time.Time{}, fmt.Errorf("unable to get external metric %s/%s/%+v: %s", namespace, metricName, selector, err)
|
||||||
|
}
|
||||||
|
utilization = 0
|
||||||
|
for _, val := range metrics {
|
||||||
|
utilization = utilization + val
|
||||||
|
}
|
||||||
|
|
||||||
|
replicaCount = currentReplicas
|
||||||
|
usageRatio := float64(utilization) / (float64(targetUtilizationPerPod) * float64(replicaCount))
|
||||||
|
if math.Abs(1.0-usageRatio) > c.tolerance {
|
||||||
|
// update number of replicas if the change is large enough
|
||||||
|
replicaCount = int32(math.Ceil(float64(utilization) / float64(targetUtilizationPerPod)))
|
||||||
|
}
|
||||||
|
utilization = int64(math.Ceil(float64(utilization) / float64(currentReplicas)))
|
||||||
|
return replicaCount, utilization, timestamp, nil
|
||||||
|
}
|
||||||
|
@ -33,6 +33,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||||
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
|
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
|
||||||
cmapi "k8s.io/metrics/pkg/apis/custom_metrics/v1beta1"
|
cmapi "k8s.io/metrics/pkg/apis/custom_metrics/v1beta1"
|
||||||
|
emapi "k8s.io/metrics/pkg/apis/external_metrics/v1beta1"
|
||||||
metricsapi "k8s.io/metrics/pkg/apis/metrics/v1beta1"
|
metricsapi "k8s.io/metrics/pkg/apis/metrics/v1beta1"
|
||||||
metricsfake "k8s.io/metrics/pkg/client/clientset_generated/clientset/fake"
|
metricsfake "k8s.io/metrics/pkg/client/clientset_generated/clientset/fake"
|
||||||
cmfake "k8s.io/metrics/pkg/client/custom_metrics/fake"
|
cmfake "k8s.io/metrics/pkg/client/custom_metrics/fake"
|
||||||
@ -58,8 +59,10 @@ type metricInfo struct {
|
|||||||
name string
|
name string
|
||||||
levels []int64
|
levels []int64
|
||||||
singleObject *autoscalingv2.CrossVersionObjectReference
|
singleObject *autoscalingv2.CrossVersionObjectReference
|
||||||
|
selector *metav1.LabelSelector
|
||||||
|
|
||||||
targetUtilization int64
|
targetUtilization int64
|
||||||
|
perPodTargetUtilization int64
|
||||||
expectedUtilization int64
|
expectedUtilization int64
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -238,6 +241,37 @@ func (tc *replicaCalcTestCase) prepareTestClient(t *testing.T) (*fake.Clientset,
|
|||||||
})
|
})
|
||||||
|
|
||||||
fakeEMClient := &emfake.FakeExternalMetricsClient{}
|
fakeEMClient := &emfake.FakeExternalMetricsClient{}
|
||||||
|
fakeEMClient.AddReactor("list", "*", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||||
|
listAction, wasList := action.(core.ListAction)
|
||||||
|
if !wasList {
|
||||||
|
return true, nil, fmt.Errorf("expected a list-for action, got %v instead", action)
|
||||||
|
}
|
||||||
|
|
||||||
|
if tc.metric == nil {
|
||||||
|
return true, nil, fmt.Errorf("no external metrics specified in test client")
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, tc.metric.name, listAction.GetResource().Resource, "the metric requested should have matched the one specified")
|
||||||
|
|
||||||
|
selector, err := metav1.LabelSelectorAsSelector(tc.metric.selector)
|
||||||
|
if err != nil {
|
||||||
|
return true, nil, fmt.Errorf("failed to convert label selector specified in test client")
|
||||||
|
}
|
||||||
|
assert.Equal(t, selector, listAction.GetListRestrictions().Labels, "the metric selector should have matched the one specified")
|
||||||
|
|
||||||
|
metrics := emapi.ExternalMetricValueList{}
|
||||||
|
|
||||||
|
for _, level := range tc.metric.levels {
|
||||||
|
metric := emapi.ExternalMetricValue{
|
||||||
|
Timestamp: metav1.Time{Time: tc.timestamp},
|
||||||
|
MetricName: tc.metric.name,
|
||||||
|
Value: *resource.NewMilliQuantity(level, resource.DecimalSI),
|
||||||
|
}
|
||||||
|
metrics.Items = append(metrics.Items, metric)
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, &metrics, nil
|
||||||
|
})
|
||||||
|
|
||||||
return fakeClient, fakeMetricsClient, fakeCMClient, fakeEMClient
|
return fakeClient, fakeMetricsClient, fakeCMClient, fakeEMClient
|
||||||
}
|
}
|
||||||
@ -280,6 +314,12 @@ func (tc *replicaCalcTestCase) runTest(t *testing.T) {
|
|||||||
var err error
|
var err error
|
||||||
if tc.metric.singleObject != nil {
|
if tc.metric.singleObject != nil {
|
||||||
outReplicas, outUtilization, outTimestamp, err = replicaCalc.GetObjectMetricReplicas(tc.currentReplicas, tc.metric.targetUtilization, tc.metric.name, testNamespace, tc.metric.singleObject)
|
outReplicas, outUtilization, outTimestamp, err = replicaCalc.GetObjectMetricReplicas(tc.currentReplicas, tc.metric.targetUtilization, tc.metric.name, testNamespace, tc.metric.singleObject)
|
||||||
|
} else if tc.metric.selector != nil {
|
||||||
|
if tc.metric.targetUtilization > 0 {
|
||||||
|
outReplicas, outUtilization, outTimestamp, err = replicaCalc.GetExternalMetricReplicas(tc.currentReplicas, tc.metric.targetUtilization, tc.metric.name, testNamespace, tc.metric.selector)
|
||||||
|
} else if tc.metric.perPodTargetUtilization > 0 {
|
||||||
|
outReplicas, outUtilization, outTimestamp, err = replicaCalc.GetExternalPerPodMetricReplicas(tc.currentReplicas, tc.metric.perPodTargetUtilization, tc.metric.name, testNamespace, tc.metric.selector)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
outReplicas, outUtilization, outTimestamp, err = replicaCalc.GetMetricReplicas(tc.currentReplicas, tc.metric.targetUtilization, tc.metric.name, testNamespace, selector)
|
outReplicas, outUtilization, outTimestamp, err = replicaCalc.GetMetricReplicas(tc.currentReplicas, tc.metric.targetUtilization, tc.metric.name, testNamespace, selector)
|
||||||
}
|
}
|
||||||
@ -428,6 +468,50 @@ func TestReplicaCalcScaleUpCMObject(t *testing.T) {
|
|||||||
tc.runTest(t)
|
tc.runTest(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReplicaCalcScaleUpCMExternal(t *testing.T) {
|
||||||
|
tc := replicaCalcTestCase{
|
||||||
|
currentReplicas: 1,
|
||||||
|
expectedReplicas: 2,
|
||||||
|
metric: &metricInfo{
|
||||||
|
name: "qps",
|
||||||
|
levels: []int64{8600},
|
||||||
|
targetUtilization: 4400,
|
||||||
|
expectedUtilization: 8600,
|
||||||
|
selector: &metav1.LabelSelector{MatchLabels: map[string]string{"label": "value"}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
tc.runTest(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReplicaCalcScaleUpCMExternalNoLabels(t *testing.T) {
|
||||||
|
tc := replicaCalcTestCase{
|
||||||
|
currentReplicas: 1,
|
||||||
|
expectedReplicas: 2,
|
||||||
|
metric: &metricInfo{
|
||||||
|
name: "qps",
|
||||||
|
levels: []int64{8600},
|
||||||
|
targetUtilization: 4400,
|
||||||
|
expectedUtilization: 8600,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
tc.runTest(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReplicaCalcScaleUpPerPodCMExternal(t *testing.T) {
|
||||||
|
tc := replicaCalcTestCase{
|
||||||
|
currentReplicas: 3,
|
||||||
|
expectedReplicas: 4,
|
||||||
|
metric: &metricInfo{
|
||||||
|
name: "qps",
|
||||||
|
levels: []int64{8600},
|
||||||
|
perPodTargetUtilization: 2150,
|
||||||
|
expectedUtilization: 2867,
|
||||||
|
selector: &metav1.LabelSelector{MatchLabels: map[string]string{"label": "value"}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
tc.runTest(t)
|
||||||
|
}
|
||||||
|
|
||||||
func TestReplicaCalcScaleDown(t *testing.T) {
|
func TestReplicaCalcScaleDown(t *testing.T) {
|
||||||
tc := replicaCalcTestCase{
|
tc := replicaCalcTestCase{
|
||||||
currentReplicas: 5,
|
currentReplicas: 5,
|
||||||
@ -478,6 +562,36 @@ func TestReplicaCalcScaleDownCMObject(t *testing.T) {
|
|||||||
tc.runTest(t)
|
tc.runTest(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReplicaCalcScaleDownCMExternal(t *testing.T) {
|
||||||
|
tc := replicaCalcTestCase{
|
||||||
|
currentReplicas: 5,
|
||||||
|
expectedReplicas: 3,
|
||||||
|
metric: &metricInfo{
|
||||||
|
name: "qps",
|
||||||
|
levels: []int64{8600},
|
||||||
|
targetUtilization: 14334,
|
||||||
|
expectedUtilization: 8600,
|
||||||
|
selector: &metav1.LabelSelector{MatchLabels: map[string]string{"label": "value"}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
tc.runTest(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReplicaCalcScaleDownPerPodCMExternal(t *testing.T) {
|
||||||
|
tc := replicaCalcTestCase{
|
||||||
|
currentReplicas: 5,
|
||||||
|
expectedReplicas: 3,
|
||||||
|
metric: &metricInfo{
|
||||||
|
name: "qps",
|
||||||
|
levels: []int64{8600},
|
||||||
|
perPodTargetUtilization: 2867,
|
||||||
|
expectedUtilization: 1720,
|
||||||
|
selector: &metav1.LabelSelector{MatchLabels: map[string]string{"label": "value"}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
tc.runTest(t)
|
||||||
|
}
|
||||||
|
|
||||||
func TestReplicaCalcScaleDownIgnoresUnreadyPods(t *testing.T) {
|
func TestReplicaCalcScaleDownIgnoresUnreadyPods(t *testing.T) {
|
||||||
tc := replicaCalcTestCase{
|
tc := replicaCalcTestCase{
|
||||||
currentReplicas: 5,
|
currentReplicas: 5,
|
||||||
@ -546,6 +660,36 @@ func TestReplicaCalcToleranceCMObject(t *testing.T) {
|
|||||||
tc.runTest(t)
|
tc.runTest(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReplicaCalcToleranceCMExternal(t *testing.T) {
|
||||||
|
tc := replicaCalcTestCase{
|
||||||
|
currentReplicas: 3,
|
||||||
|
expectedReplicas: 3,
|
||||||
|
metric: &metricInfo{
|
||||||
|
name: "qps",
|
||||||
|
levels: []int64{8600},
|
||||||
|
targetUtilization: 8888,
|
||||||
|
expectedUtilization: 8600,
|
||||||
|
selector: &metav1.LabelSelector{MatchLabels: map[string]string{"label": "value"}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
tc.runTest(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReplicaCalcTolerancePerPodCMExternal(t *testing.T) {
|
||||||
|
tc := replicaCalcTestCase{
|
||||||
|
currentReplicas: 3,
|
||||||
|
expectedReplicas: 3,
|
||||||
|
metric: &metricInfo{
|
||||||
|
name: "qps",
|
||||||
|
levels: []int64{8600},
|
||||||
|
perPodTargetUtilization: 2900,
|
||||||
|
expectedUtilization: 2867,
|
||||||
|
selector: &metav1.LabelSelector{MatchLabels: map[string]string{"label": "value"}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
tc.runTest(t)
|
||||||
|
}
|
||||||
|
|
||||||
func TestReplicaCalcSuperfluousMetrics(t *testing.T) {
|
func TestReplicaCalcSuperfluousMetrics(t *testing.T) {
|
||||||
tc := replicaCalcTestCase{
|
tc := replicaCalcTestCase{
|
||||||
currentReplicas: 4,
|
currentReplicas: 4,
|
||||||
|
Loading…
Reference in New Issue
Block a user