Make HPA controller use polymorphic scale client

This updates the HPA controller to use the polymorphic scale client from
client-go.  This should enable HPAs to work with arbitrary scalable
resources, instead of just those in the extensions API group (meaning we
can deprecate the copy of ReplicationController in extensions/v1beta1).
It also means that the HPA controller now pays attention to the
APIVersion field in `scaleTargetRef` (more specifically, the group part
of it).

Note that currently, discovery information on which resources are
available where is only fetched once (the first time that it's
requested).  In the future, we may want a refreshing discovery REST
mapper.
This commit is contained in:
Solly Ross
2017-10-11 14:31:04 -04:00
parent 3f51babdd1
commit d2b41120ea
7 changed files with 344 additions and 264 deletions

View File

@@ -27,15 +27,16 @@ import (
autoscalingv1 "k8s.io/api/autoscaling/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2beta1"
"k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
clientfake "k8s.io/client-go/kubernetes/fake"
scalefake "k8s.io/client-go/scale/fake"
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/autoscaling"
@@ -121,6 +122,7 @@ type testCase struct {
testClient *fake.Clientset
testMetricsClient *metricsfake.Clientset
testCMClient *cmfake.FakeCustomMetricsClient
testScaleClient *scalefake.FakeScaleClient
}
// Needs to be called under a lock.
@@ -144,12 +146,12 @@ func init() {
scaleUpLimitFactor = 8
}
func (tc *testCase) prepareTestClient(t *testing.T) (*fake.Clientset, *metricsfake.Clientset, *cmfake.FakeCustomMetricsClient) {
func (tc *testCase) prepareTestClient(t *testing.T) (*fake.Clientset, *metricsfake.Clientset, *cmfake.FakeCustomMetricsClient, *scalefake.FakeScaleClient) {
namespace := "test-namespace"
hpaName := "test-hpa"
podNamePrefix := "test-pod"
// TODO: also test with TargetSelector
selector := map[string]string{"name": podNamePrefix}
labelSet := map[string]string{"name": podNamePrefix}
selector := labels.SelectorFromSet(labelSet).String()
tc.Lock()
@@ -161,13 +163,11 @@ func (tc *testCase) prepareTestClient(t *testing.T) (*fake.Clientset, *metricsfa
tc.computeCPUCurrent()
}
// TODO(madhusudancs): HPA only supports resources in extensions/v1beta1 right now. Add
// tests for "v1" replicationcontrollers when HPA adds support for cross-group scale.
if tc.resource == nil {
tc.resource = &fakeResource{
name: "test-rc",
apiVersion: "extensions/v1beta1",
kind: "replicationcontrollers",
apiVersion: "v1",
kind: "ReplicationController",
}
}
tc.Unlock()
@@ -239,66 +239,6 @@ func (tc *testCase) prepareTestClient(t *testing.T) (*fake.Clientset, *metricsfa
return true, objv1, nil
})
fakeClient.AddReactor("get", "replicationcontrollers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()
obj := &extensions.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: tc.resource.name,
Namespace: namespace,
},
Spec: extensions.ScaleSpec{
Replicas: tc.initialReplicas,
},
Status: extensions.ScaleStatus{
Replicas: tc.initialReplicas,
Selector: selector,
},
}
return true, obj, nil
})
fakeClient.AddReactor("get", "deployments", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()
obj := &extensions.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: tc.resource.name,
Namespace: namespace,
},
Spec: extensions.ScaleSpec{
Replicas: tc.initialReplicas,
},
Status: extensions.ScaleStatus{
Replicas: tc.initialReplicas,
Selector: selector,
},
}
return true, obj, nil
})
fakeClient.AddReactor("get", "replicasets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()
obj := &extensions.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: tc.resource.name,
Namespace: namespace,
},
Spec: extensions.ScaleSpec{
Replicas: tc.initialReplicas,
},
Status: extensions.ScaleStatus{
Replicas: tc.initialReplicas,
Selector: selector,
},
}
return true, obj, nil
})
fakeClient.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()
@@ -344,39 +284,6 @@ func (tc *testCase) prepareTestClient(t *testing.T) (*fake.Clientset, *metricsfa
return true, obj, nil
})
fakeClient.AddReactor("update", "replicationcontrollers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()
obj := action.(core.UpdateAction).GetObject().(*extensions.Scale)
replicas := action.(core.UpdateAction).GetObject().(*extensions.Scale).Spec.Replicas
assert.Equal(t, tc.desiredReplicas, replicas, "the replica count of the RC should be as expected")
tc.scaleUpdated = true
return true, obj, nil
})
fakeClient.AddReactor("update", "deployments", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()
obj := action.(core.UpdateAction).GetObject().(*extensions.Scale)
replicas := action.(core.UpdateAction).GetObject().(*extensions.Scale).Spec.Replicas
assert.Equal(t, tc.desiredReplicas, replicas, "the replica count of the deployment should be as expected")
tc.scaleUpdated = true
return true, obj, nil
})
fakeClient.AddReactor("update", "replicasets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()
obj := action.(core.UpdateAction).GetObject().(*extensions.Scale)
replicas := action.(core.UpdateAction).GetObject().(*extensions.Scale).Spec.Replicas
assert.Equal(t, tc.desiredReplicas, replicas, "the replica count of the replicaset should be as expected")
tc.scaleUpdated = true
return true, obj, nil
})
fakeClient.AddReactor("update", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()
@@ -386,8 +293,9 @@ func (tc *testCase) prepareTestClient(t *testing.T) (*fake.Clientset, *metricsfa
assert.Equal(t, hpaName, obj.Name, "the HPA name should be as expected")
assert.Equal(t, tc.desiredReplicas, obj.Status.DesiredReplicas, "the desired replica count reported in the object status should be as expected")
if tc.verifyCPUCurrent {
assert.NotNil(t, obj.Status.CurrentCPUUtilizationPercentage, "the reported CPU utilization percentage should be non-nil")
assert.Equal(t, tc.CPUCurrent, *obj.Status.CurrentCPUUtilizationPercentage, "the report CPU utilization percentage should be as expected")
if assert.NotNil(t, obj.Status.CurrentCPUUtilizationPercentage, "the reported CPU utilization percentage should be non-nil") {
assert.Equal(t, tc.CPUCurrent, *obj.Status.CurrentCPUUtilizationPercentage, "the report CPU utilization percentage should be as expected")
}
}
var actualConditions []autoscalingv1.HorizontalPodAutoscalerCondition
if err := json.Unmarshal([]byte(obj.ObjectMeta.Annotations[autoscaling.HorizontalPodAutoscalerConditionsAnnotation]), &actualConditions); err != nil {
@@ -411,6 +319,100 @@ func (tc *testCase) prepareTestClient(t *testing.T) (*fake.Clientset, *metricsfa
return true, obj, nil
})
fakeScaleClient := &scalefake.FakeScaleClient{}
fakeScaleClient.AddReactor("get", "replicationcontrollers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()
obj := &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: tc.resource.name,
Namespace: namespace,
},
Spec: autoscalingv1.ScaleSpec{
Replicas: tc.initialReplicas,
},
Status: autoscalingv1.ScaleStatus{
Replicas: tc.initialReplicas,
Selector: selector,
},
}
return true, obj, nil
})
fakeScaleClient.AddReactor("get", "deployments", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()
obj := &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: tc.resource.name,
Namespace: namespace,
},
Spec: autoscalingv1.ScaleSpec{
Replicas: tc.initialReplicas,
},
Status: autoscalingv1.ScaleStatus{
Replicas: tc.initialReplicas,
Selector: selector,
},
}
return true, obj, nil
})
fakeScaleClient.AddReactor("get", "replicasets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()
obj := &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: tc.resource.name,
Namespace: namespace,
},
Spec: autoscalingv1.ScaleSpec{
Replicas: tc.initialReplicas,
},
Status: autoscalingv1.ScaleStatus{
Replicas: tc.initialReplicas,
Selector: selector,
},
}
return true, obj, nil
})
fakeScaleClient.AddReactor("update", "replicationcontrollers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()
obj := action.(core.UpdateAction).GetObject().(*autoscalingv1.Scale)
replicas := action.(core.UpdateAction).GetObject().(*autoscalingv1.Scale).Spec.Replicas
assert.Equal(t, tc.desiredReplicas, replicas, "the replica count of the RC should be as expected")
tc.scaleUpdated = true
return true, obj, nil
})
fakeScaleClient.AddReactor("update", "deployments", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()
obj := action.(core.UpdateAction).GetObject().(*autoscalingv1.Scale)
replicas := action.(core.UpdateAction).GetObject().(*autoscalingv1.Scale).Spec.Replicas
assert.Equal(t, tc.desiredReplicas, replicas, "the replica count of the deployment should be as expected")
tc.scaleUpdated = true
return true, obj, nil
})
fakeScaleClient.AddReactor("update", "replicasets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()
obj := action.(core.UpdateAction).GetObject().(*autoscalingv1.Scale)
replicas := action.(core.UpdateAction).GetObject().(*autoscalingv1.Scale).Spec.Replicas
assert.Equal(t, tc.desiredReplicas, replicas, "the replica count of the replicaset should be as expected")
tc.scaleUpdated = true
return true, obj, nil
})
fakeWatch := watch.NewFake()
fakeClient.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
@@ -427,7 +429,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) (*fake.Clientset, *metricsfa
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%d", podNamePrefix, i),
Namespace: namespace,
Labels: selector,
Labels: labelSet,
},
Timestamp: metav1.Time{Time: time.Now()},
Containers: []metricsapi.ContainerMetrics{
@@ -522,7 +524,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) (*fake.Clientset, *metricsfa
return true, metrics, nil
})
return fakeClient, fakeMetricsClient, fakeCMClient
return fakeClient, fakeMetricsClient, fakeCMClient, fakeScaleClient
}
func (tc *testCase) verifyResults(t *testing.T) {
@@ -537,7 +539,7 @@ func (tc *testCase) verifyResults(t *testing.T) {
}
func (tc *testCase) setupController(t *testing.T) (*HorizontalController, informers.SharedInformerFactory) {
testClient, testMetricsClient, testCMClient := tc.prepareTestClient(t)
testClient, testMetricsClient, testCMClient, testScaleClient := tc.prepareTestClient(t)
if tc.testClient != nil {
testClient = tc.testClient
}
@@ -547,6 +549,9 @@ func (tc *testCase) setupController(t *testing.T) (*HorizontalController, inform
if tc.testCMClient != nil {
testCMClient = tc.testCMClient
}
if tc.testScaleClient != nil {
testScaleClient = tc.testScaleClient
}
metricsClient := metrics.NewRESTMetricsClient(
testMetricsClient.MetricsV1beta1(),
testCMClient,
@@ -587,8 +592,9 @@ func (tc *testCase) setupController(t *testing.T) (*HorizontalController, inform
hpaController := NewHorizontalController(
eventClient.Core(),
testClient.Extensions(),
testScaleClient,
testClient.Autoscaling(),
legacyscheme.Registry.RESTMapper(),
replicaCalc,
informerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
controller.NoResyncPeriodFunc(),
@@ -692,7 +698,7 @@ func TestScaleUpDeployment(t *testing.T) {
resource: &fakeResource{
name: "test-dep",
apiVersion: "extensions/v1beta1",
kind: "deployments",
kind: "Deployment",
},
}
tc.runTest(t)
@@ -712,7 +718,7 @@ func TestScaleUpReplicaSet(t *testing.T) {
resource: &fakeResource{
name: "test-replicaset",
apiVersion: "extensions/v1beta1",
kind: "replicasets",
kind: "ReplicaSet",
},
}
tc.runTest(t)
@@ -1267,18 +1273,18 @@ func TestConditionInvalidSelectorMissing(t *testing.T) {
},
}
testClient, _, _ := tc.prepareTestClient(t)
tc.testClient = testClient
_, _, _, testScaleClient := tc.prepareTestClient(t)
tc.testScaleClient = testScaleClient
testClient.PrependReactor("get", "replicationcontrollers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
obj := &extensions.Scale{
testScaleClient.PrependReactor("get", "replicationcontrollers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
obj := &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: tc.resource.name,
},
Spec: extensions.ScaleSpec{
Spec: autoscalingv1.ScaleSpec{
Replicas: tc.initialReplicas,
},
Status: extensions.ScaleStatus{
Status: autoscalingv1.ScaleStatus{
Replicas: tc.initialReplicas,
},
}
@@ -1312,20 +1318,20 @@ func TestConditionInvalidSelectorUnparsable(t *testing.T) {
},
}
testClient, _, _ := tc.prepareTestClient(t)
tc.testClient = testClient
_, _, _, testScaleClient := tc.prepareTestClient(t)
tc.testScaleClient = testScaleClient
testClient.PrependReactor("get", "replicationcontrollers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
obj := &extensions.Scale{
testScaleClient.PrependReactor("get", "replicationcontrollers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
obj := &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: tc.resource.name,
},
Spec: extensions.ScaleSpec{
Spec: autoscalingv1.ScaleSpec{
Replicas: tc.initialReplicas,
},
Status: extensions.ScaleStatus{
Replicas: tc.initialReplicas,
TargetSelector: "cheddar cheese",
Status: autoscalingv1.ScaleStatus{
Replicas: tc.initialReplicas,
Selector: "cheddar cheese",
},
}
return true, obj, nil
@@ -1373,7 +1379,7 @@ func TestConditionFailedGetMetrics(t *testing.T) {
reportedCPURequests: []resource.Quantity{resource.MustParse("0.1"), resource.MustParse("0.1"), resource.MustParse("0.1")},
useMetricsAPI: true,
}
_, testMetricsClient, testCMClient := tc.prepareTestClient(t)
_, testMetricsClient, testCMClient, _ := tc.prepareTestClient(t)
tc.testMetricsClient = testMetricsClient
tc.testCMClient = testCMClient
@@ -1446,11 +1452,11 @@ func TestConditionFailedGetScale(t *testing.T) {
},
}
testClient, _, _ := tc.prepareTestClient(t)
tc.testClient = testClient
_, _, _, testScaleClient := tc.prepareTestClient(t)
tc.testScaleClient = testScaleClient
testClient.PrependReactor("get", "replicationcontrollers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, &extensions.Scale{}, fmt.Errorf("something went wrong")
testScaleClient.PrependReactor("get", "replicationcontrollers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, &autoscalingv1.Scale{}, fmt.Errorf("something went wrong")
})
tc.runTest(t)
@@ -1473,11 +1479,11 @@ func TestConditionFailedUpdateScale(t *testing.T) {
}),
}
testClient, _, _ := tc.prepareTestClient(t)
tc.testClient = testClient
_, _, _, testScaleClient := tc.prepareTestClient(t)
tc.testScaleClient = testScaleClient
testClient.PrependReactor("update", "replicationcontrollers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, &extensions.Scale{}, fmt.Errorf("something went wrong")
testScaleClient.PrependReactor("update", "replicationcontrollers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, &autoscalingv1.Scale{}, fmt.Errorf("something went wrong")
})
tc.runTest(t)
@@ -1659,7 +1665,7 @@ func TestAvoidUncessaryUpdates(t *testing.T) {
reportedPodReadiness: []v1.ConditionStatus{v1.ConditionTrue, v1.ConditionFalse, v1.ConditionFalse},
useMetricsAPI: true,
}
testClient, _, _ := tc.prepareTestClient(t)
testClient, _, _, _ := tc.prepareTestClient(t)
tc.testClient = testClient
var savedHPA *autoscalingv1.HorizontalPodAutoscaler
testClient.PrependReactor("list", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) {