diff --git a/cmd/kube-controller-manager/app/policy.go b/cmd/kube-controller-manager/app/policy.go index 03c84b6644a..40aacd38bed 100644 --- a/cmd/kube-controller-manager/app/policy.go +++ b/cmd/kube-controller-manager/app/policy.go @@ -22,6 +22,8 @@ package app import ( "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/scale" "k8s.io/kubernetes/pkg/controller/disruption" "net/http" @@ -40,6 +42,15 @@ func startDisruptionController(ctx ControllerContext) (http.Handler, bool, error resource, group+"/"+version) return nil, false, nil } + + client := ctx.ClientBuilder.ClientOrDie("disruption-controller") + config := ctx.ClientBuilder.ConfigOrDie("disruption-controller") + scaleKindResolver := scale.NewDiscoveryScaleKindResolver(client.Discovery()) + scaleClient, err := scale.NewForConfig(config, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) + if err != nil { + return nil, false, err + } + go disruption.NewDisruptionController( ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(), @@ -47,7 +58,9 @@ func startDisruptionController(ctx ControllerContext) (http.Handler, bool, error ctx.InformerFactory.Apps().V1().ReplicaSets(), ctx.InformerFactory.Apps().V1().Deployments(), ctx.InformerFactory.Apps().V1().StatefulSets(), - ctx.ClientBuilder.ClientOrDie("disruption-controller"), + client, + ctx.RESTMapper, + scaleClient, ).Run(ctx.Stop) return nil, true, nil } diff --git a/pkg/controller/disruption/BUILD b/pkg/controller/disruption/BUILD index cca52c58c62..2bede6377a6 100644 --- a/pkg/controller/disruption/BUILD +++ b/pkg/controller/disruption/BUILD @@ -19,7 +19,9 @@ go_library( "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", @@ -34,6 +36,7 @@ go_library( "//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library", + "//staging/src/k8s.io/client-go/scale:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", @@ -49,13 +52,20 @@ go_test( "//pkg/apis/core/install:go_default_library", "//pkg/controller:go_default_library", "//staging/src/k8s.io/api/apps/v1:go_default_library", + "//staging/src/k8s.io/api/autoscaling/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/meta/testrestmapper:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", + "//staging/src/k8s.io/client-go/scale/fake:go_default_library", + "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//vendor/github.com/Azure/go-autorest/autorest/to:go_default_library", diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index 9f34d927fa6..c2468e913fd 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -26,7 +26,9 @@ import ( policy "k8s.io/api/policy/v1beta1" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" + apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -41,6 +43,7 @@ import ( appsv1listers "k8s.io/client-go/listers/apps/v1" corelisters "k8s.io/client-go/listers/core/v1" policylisters "k8s.io/client-go/listers/policy/v1beta1" + scaleclient "k8s.io/client-go/scale" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" @@ -67,6 +70,9 @@ type updater func(*policy.PodDisruptionBudget) error type DisruptionController struct { kubeClient clientset.Interface + mapper apimeta.RESTMapper + + scaleNamespacer scaleclient.ScalesGetter pdbLister policylisters.PodDisruptionBudgetLister pdbListerSynced cache.InformerSynced @@ -105,7 +111,7 @@ type controllerAndScale struct { // podControllerFinder is a function type that maps a pod to a list of // controllers and their scale. -type podControllerFinder func(*v1.Pod) (*controllerAndScale, error) +type podControllerFinder func(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) func NewDisruptionController( podInformer coreinformers.PodInformer, @@ -115,6 +121,8 @@ func NewDisruptionController( dInformer appsv1informers.DeploymentInformer, ssInformer appsv1informers.StatefulSetInformer, kubeClient clientset.Interface, + restMapper apimeta.RESTMapper, + scaleNamespacer scaleclient.ScalesGetter, ) *DisruptionController { dc := &DisruptionController{ kubeClient: kubeClient, @@ -157,19 +165,19 @@ func NewDisruptionController( dc.ssLister = ssInformer.Lister() dc.ssListerSynced = ssInformer.Informer().HasSynced + dc.mapper = restMapper + dc.scaleNamespacer = scaleNamespacer + return dc } -// TODO(mml): When controllerRef is implemented (#2210), we *could* simply -// return controllers without their scales, and access scale type-generically -// via the scale subresource. That may not be as much of a win as it sounds, -// however. We are accessing everything through the pkg/client/cache API that -// we have to set up and tune to the types we know we'll be accessing anyway, -// and we may well need further tweaks just to be able to access scale -// subresources. +// The workload resources do implement the scale subresource, so it would +// be possible to only check the scale subresource here. But since there is no +// way to take advantage of listers with scale subresources, we use the workload +// resources directly and only fall back to the scale subresource when needed. func (dc *DisruptionController) finders() []podControllerFinder { return []podControllerFinder{dc.getPodReplicationController, dc.getPodDeployment, dc.getPodReplicaSet, - dc.getPodStatefulSet} + dc.getPodStatefulSet, dc.getScaleController} } var ( @@ -180,15 +188,12 @@ var ( ) // getPodReplicaSet finds a replicaset which has no matching deployments. -func (dc *DisruptionController) getPodReplicaSet(pod *v1.Pod) (*controllerAndScale, error) { - controllerRef := metav1.GetControllerOf(pod) - if controllerRef == nil { - return nil, nil +func (dc *DisruptionController) getPodReplicaSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) { + ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"}) + if !ok || err != nil { + return nil, err } - if controllerRef.Kind != controllerKindRS.Kind { - return nil, nil - } - rs, err := dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name) + rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name) if err != nil { // The only possible error is NotFound, which is ok here. return nil, nil @@ -204,16 +209,13 @@ func (dc *DisruptionController) getPodReplicaSet(pod *v1.Pod) (*controllerAndSca return &controllerAndScale{rs.UID, *(rs.Spec.Replicas)}, nil } -// getPodStatefulSet returns the statefulset managing the given pod. -func (dc *DisruptionController) getPodStatefulSet(pod *v1.Pod) (*controllerAndScale, error) { - controllerRef := metav1.GetControllerOf(pod) - if controllerRef == nil { - return nil, nil +// getPodStatefulSet returns the statefulset referenced by the provided controllerRef. +func (dc *DisruptionController) getPodStatefulSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) { + ok, err := verifyGroupKind(controllerRef, controllerKindSS.Kind, []string{"apps"}) + if !ok || err != nil { + return nil, err } - if controllerRef.Kind != controllerKindSS.Kind { - return nil, nil - } - ss, err := dc.ssLister.StatefulSets(pod.Namespace).Get(controllerRef.Name) + ss, err := dc.ssLister.StatefulSets(namespace).Get(controllerRef.Name) if err != nil { // The only possible error is NotFound, which is ok here. return nil, nil @@ -226,15 +228,12 @@ func (dc *DisruptionController) getPodStatefulSet(pod *v1.Pod) (*controllerAndSc } // getPodDeployments finds deployments for any replicasets which are being managed by deployments. -func (dc *DisruptionController) getPodDeployment(pod *v1.Pod) (*controllerAndScale, error) { - controllerRef := metav1.GetControllerOf(pod) - if controllerRef == nil { - return nil, nil +func (dc *DisruptionController) getPodDeployment(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) { + ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"}) + if !ok || err != nil { + return nil, err } - if controllerRef.Kind != controllerKindRS.Kind { - return nil, nil - } - rs, err := dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name) + rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name) if err != nil { // The only possible error is NotFound, which is ok here. return nil, nil @@ -246,8 +245,10 @@ func (dc *DisruptionController) getPodDeployment(pod *v1.Pod) (*controllerAndSca if controllerRef == nil { return nil, nil } - if controllerRef.Kind != controllerKindDep.Kind { - return nil, nil + + ok, err = verifyGroupKind(controllerRef, controllerKindDep.Kind, []string{"apps", "extensions"}) + if !ok || err != nil { + return nil, err } deployment, err := dc.dLister.Deployments(rs.Namespace).Get(controllerRef.Name) if err != nil { @@ -260,15 +261,12 @@ func (dc *DisruptionController) getPodDeployment(pod *v1.Pod) (*controllerAndSca return &controllerAndScale{deployment.UID, *(deployment.Spec.Replicas)}, nil } -func (dc *DisruptionController) getPodReplicationController(pod *v1.Pod) (*controllerAndScale, error) { - controllerRef := metav1.GetControllerOf(pod) - if controllerRef == nil { - return nil, nil +func (dc *DisruptionController) getPodReplicationController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) { + ok, err := verifyGroupKind(controllerRef, controllerKindRC.Kind, []string{""}) + if !ok || err != nil { + return nil, err } - if controllerRef.Kind != controllerKindRC.Kind { - return nil, nil - } - rc, err := dc.rcLister.ReplicationControllers(pod.Namespace).Get(controllerRef.Name) + rc, err := dc.rcLister.ReplicationControllers(namespace).Get(controllerRef.Name) if err != nil { // The only possible error is NotFound, which is ok here. return nil, nil @@ -279,6 +277,55 @@ func (dc *DisruptionController) getPodReplicationController(pod *v1.Pod) (*contr return &controllerAndScale{rc.UID, *(rc.Spec.Replicas)}, nil } +func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) { + gv, err := schema.ParseGroupVersion(controllerRef.APIVersion) + if err != nil { + return nil, err + } + + gk := schema.GroupKind{ + Group: gv.Group, + Kind: controllerRef.Kind, + } + + mapping, err := dc.mapper.RESTMapping(gk, gv.Version) + if err != nil { + return nil, err + } + gr := mapping.Resource.GroupResource() + + scale, err := dc.scaleNamespacer.Scales(namespace).Get(gr, controllerRef.Name) + if err != nil { + if errors.IsNotFound(err) { + return nil, nil + } + return nil, err + } + if scale.UID != controllerRef.UID { + return nil, nil + } + return &controllerAndScale{scale.UID, scale.Spec.Replicas}, nil +} + +func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string, expectedGroups []string) (bool, error) { + gv, err := schema.ParseGroupVersion(controllerRef.APIVersion) + if err != nil { + return false, err + } + + if controllerRef.Kind != expectedKind { + return false, nil + } + + for _, group := range expectedGroups { + if group == gv.Group { + return true, nil + } + } + + return false, nil +} + func (dc *DisruptionController) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer dc.queue.ShutDown() @@ -583,10 +630,23 @@ func (dc *DisruptionController) getExpectedScale(pdb *policy.PodDisruptionBudget // 1. Find the controller for each pod. If any pod has 0 controllers, // that's an error. With ControllerRef, a pod can only have 1 controller. for _, pod := range pods { + controllerRef := metav1.GetControllerOf(pod) + if controllerRef == nil { + err = fmt.Errorf("found no controller ref for pod %q", pod.Name) + dc.recorder.Event(pdb, v1.EventTypeWarning, "NoControllerRef", err.Error()) + return + } + + // If we already know the scale of the controller there is no need to do anything. + if _, found := controllerScale[controllerRef.UID]; found { + continue + } + + // Check all the supported controllers to find the desired scale. foundController := false for _, finder := range dc.finders() { var controllerNScale *controllerAndScale - controllerNScale, err = finder(pod) + controllerNScale, err = finder(controllerRef, pod.Namespace) if err != nil { return } diff --git a/pkg/controller/disruption/disruption_test.go b/pkg/controller/disruption/disruption_test.go index 1f4e6c74efe..26341083317 100644 --- a/pkg/controller/disruption/disruption_test.go +++ b/pkg/controller/disruption/disruption_test.go @@ -23,13 +23,20 @@ import ( "time" apps "k8s.io/api/apps/v1" + autoscalingapi "k8s.io/api/autoscaling/v1" "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1beta1" apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/meta/testrestmapper" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/informers" + scalefake "k8s.io/client-go/scale/fake" + core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" _ "k8s.io/kubernetes/pkg/apis/core/install" @@ -90,6 +97,14 @@ type disruptionController struct { rsStore cache.Store dStore cache.Store ssStore cache.Store + + scaleClient *scalefake.FakeScaleClient +} + +var customGVK = schema.GroupVersionKind{ + Group: "custom.k8s.io", + Version: "v1", + Kind: "customresource", } func newFakeDisruptionController() (*disruptionController, *pdbStates) { @@ -97,6 +112,10 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) { informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + scheme := runtime.NewScheme() + scheme.AddKnownTypeWithName(customGVK, &v1.Service{}) + fakeScaleClient := &scalefake.FakeScaleClient{} + dc := NewDisruptionController( informerFactory.Core().V1().Pods(), informerFactory.Policy().V1beta1().PodDisruptionBudgets(), @@ -105,6 +124,8 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) { informerFactory.Apps().V1().Deployments(), informerFactory.Apps().V1().StatefulSets(), nil, + testrestmapper.TestOnlyStaticRESTMapper(scheme), + fakeScaleClient, ) dc.getUpdater = func() updater { return ps.Set } dc.podListerSynced = alwaysReady @@ -122,6 +143,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) { informerFactory.Apps().V1().ReplicaSets().Informer().GetStore(), informerFactory.Apps().V1().Deployments().Informer().GetStore(), informerFactory.Apps().V1().StatefulSets().Informer().GetStore(), + fakeScaleClient, }, ps } @@ -490,6 +512,52 @@ func TestReplicaSet(t *testing.T) { ps.VerifyPdbStatus(t, pdbName, 0, 1, 2, 10, map[string]metav1.Time{}) } +func TestScaleResource(t *testing.T) { + customResourceUID := uuid.NewUUID() + replicas := int32(10) + pods := int32(4) + maxUnavailable := int32(5) + + dc, ps := newFakeDisruptionController() + + dc.scaleClient.AddReactor("get", "customresources", func(action core.Action) (handled bool, ret runtime.Object, err error) { + obj := &autoscalingapi.Scale{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + UID: customResourceUID, + }, + Spec: autoscalingapi.ScaleSpec{ + Replicas: replicas, + }, + } + return true, obj, nil + }) + + pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromInt(int(maxUnavailable))) + add(t, dc.pdbStore, pdb) + + trueVal := true + for i := 0; i < int(pods); i++ { + pod, _ := newPod(t, fmt.Sprintf("pod-%d", i)) + pod.SetOwnerReferences([]metav1.OwnerReference{ + { + Kind: customGVK.Kind, + APIVersion: customGVK.GroupVersion().String(), + Controller: &trueVal, + UID: customResourceUID, + }, + }) + add(t, dc.podStore, pod) + } + + dc.sync(pdbName) + disruptionsAllowed := int32(0) + if replicas-pods < maxUnavailable { + disruptionsAllowed = maxUnavailable - (replicas - pods) + } + ps.VerifyPdbStatus(t, pdbName, disruptionsAllowed, pods, replicas-maxUnavailable, replicas, map[string]metav1.Time{}) +} + // Verify that multiple controllers doesn't allow the PDB to be set true. func TestMultipleControllers(t *testing.T) { const podCount = 2 @@ -759,3 +827,202 @@ func TestUpdateDisruptedPods(t *testing.T) { ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]metav1.Time{"p3": {Time: currentTime}}) } + +func TestBasicFinderFunctions(t *testing.T) { + dc, _ := newFakeDisruptionController() + + rs, _ := newReplicaSet(t, 10) + add(t, dc.rsStore, rs) + rc, _ := newReplicationController(t, 12) + add(t, dc.rcStore, rc) + ss, _ := newStatefulSet(t, 14) + add(t, dc.ssStore, ss) + + testCases := map[string]struct { + finderFunc podControllerFinder + apiVersion string + kind string + name string + uid types.UID + findsScale bool + expectedScale int32 + }{ + "replicaset controller with apps group": { + finderFunc: dc.getPodReplicaSet, + apiVersion: "apps/v1", + kind: controllerKindRS.Kind, + name: rs.Name, + uid: rs.UID, + findsScale: true, + expectedScale: 10, + }, + "replicaset controller with invalid group": { + finderFunc: dc.getPodReplicaSet, + apiVersion: "invalid/v1", + kind: controllerKindRS.Kind, + name: rs.Name, + uid: rs.UID, + findsScale: false, + }, + "replicationcontroller with empty group": { + finderFunc: dc.getPodReplicationController, + apiVersion: "/v1", + kind: controllerKindRC.Kind, + name: rc.Name, + uid: rc.UID, + findsScale: true, + expectedScale: 12, + }, + "replicationcontroller with invalid group": { + finderFunc: dc.getPodReplicationController, + apiVersion: "apps/v1", + kind: controllerKindRC.Kind, + name: rc.Name, + uid: rc.UID, + findsScale: false, + }, + "statefulset controller with extensions group": { + finderFunc: dc.getPodStatefulSet, + apiVersion: "apps/v1", + kind: controllerKindSS.Kind, + name: ss.Name, + uid: ss.UID, + findsScale: true, + expectedScale: 14, + }, + "statefulset controller with invalid kind": { + finderFunc: dc.getPodStatefulSet, + apiVersion: "apps/v1", + kind: controllerKindRS.Kind, + name: ss.Name, + uid: ss.UID, + findsScale: false, + }, + } + + for tn, tc := range testCases { + t.Run(tn, func(t *testing.T) { + controllerRef := &metav1.OwnerReference{ + APIVersion: tc.apiVersion, + Kind: tc.kind, + Name: tc.name, + UID: tc.uid, + } + + controllerAndScale, _ := tc.finderFunc(controllerRef, metav1.NamespaceDefault) + + if controllerAndScale == nil { + if tc.findsScale { + t.Error("Expected scale, but got nil") + } + return + } + + if got, want := controllerAndScale.scale, tc.expectedScale; got != want { + t.Errorf("Expected scale %d, but got %d", want, got) + } + + if got, want := controllerAndScale.UID, tc.uid; got != want { + t.Errorf("Expected uid %s, but got %s", want, got) + } + }) + } +} + +func TestDeploymentFinderFunction(t *testing.T) { + labels := map[string]string{ + "foo": "bar", + } + + testCases := map[string]struct { + rsApiVersion string + rsKind string + depApiVersion string + depKind string + findsScale bool + expectedScale int32 + }{ + "happy path": { + rsApiVersion: "apps/v1", + rsKind: controllerKindRS.Kind, + depApiVersion: "extensions/v1", + depKind: controllerKindDep.Kind, + findsScale: true, + expectedScale: 10, + }, + "invalid rs apiVersion": { + rsApiVersion: "invalid/v1", + rsKind: controllerKindRS.Kind, + depApiVersion: "apps/v1", + depKind: controllerKindDep.Kind, + findsScale: false, + }, + "invalid rs kind": { + rsApiVersion: "apps/v1", + rsKind: "InvalidKind", + depApiVersion: "apps/v1", + depKind: controllerKindDep.Kind, + findsScale: false, + }, + "invalid deployment apiVersion": { + rsApiVersion: "extensions/v1", + rsKind: controllerKindRS.Kind, + depApiVersion: "deployment/v1", + depKind: controllerKindDep.Kind, + findsScale: false, + }, + "invalid deployment kind": { + rsApiVersion: "apps/v1", + rsKind: controllerKindRS.Kind, + depApiVersion: "extensions/v1", + depKind: "InvalidKind", + findsScale: false, + }, + } + + for tn, tc := range testCases { + t.Run(tn, func(t *testing.T) { + dc, _ := newFakeDisruptionController() + + dep, _ := newDeployment(t, 10) + dep.Spec.Selector = newSel(labels) + add(t, dc.dStore, dep) + + rs, _ := newReplicaSet(t, 5) + rs.Labels = labels + trueVal := true + rs.OwnerReferences = append(rs.OwnerReferences, metav1.OwnerReference{ + APIVersion: tc.depApiVersion, + Kind: tc.depKind, + Name: dep.Name, + UID: dep.UID, + Controller: &trueVal, + }) + add(t, dc.rsStore, rs) + + controllerRef := &metav1.OwnerReference{ + APIVersion: tc.rsApiVersion, + Kind: tc.rsKind, + Name: rs.Name, + UID: rs.UID, + } + + controllerAndScale, _ := dc.getPodDeployment(controllerRef, metav1.NamespaceDefault) + + if controllerAndScale == nil { + if tc.findsScale { + t.Error("Expected scale, but got nil") + } + return + } + + if got, want := controllerAndScale.scale, tc.expectedScale; got != want { + t.Errorf("Expected scale %d, but got %d", want, got) + } + + if got, want := controllerAndScale.UID, dep.UID; got != want { + t.Errorf("Expected uid %s, but got %s", want, got) + } + }) + } +} diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index 1a95a6263cc..f82cd3d5903 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -138,6 +138,7 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) rbacv1helpers.NewRule("get", "list", "watch").Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(), rbacv1helpers.NewRule("get", "list", "watch").Groups(appsGroup).Resources("statefulsets").RuleOrDie(), rbacv1helpers.NewRule("update").Groups(policyGroup).Resources("poddisruptionbudgets/status").RuleOrDie(), + rbacv1helpers.NewRule("get").Groups("*").Resources("*/scale").RuleOrDie(), eventsRule(), }, }) diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy_test.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy_test.go index f1064f27084..365b5c3d20d 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy_test.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy_test.go @@ -33,6 +33,7 @@ var rolesWithAllowStar = sets.NewString( saRolePrefix+"resourcequota-controller", saRolePrefix+"horizontal-pod-autoscaler", saRolePrefix+"clusterrole-aggregation-controller", + saRolePrefix+"disruption-controller", ) // TestNoStarsForControllers confirms that no controller role has star verbs, groups, diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml index 34463b35b30..411a2749d11 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml @@ -382,6 +382,12 @@ items: - poddisruptionbudgets/status verbs: - update + - apiGroups: + - '*' + resources: + - '*/scale' + verbs: + - get - apiGroups: - "" resources: diff --git a/test/integration/BUILD b/test/integration/BUILD index acce949a91e..4d657752613 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -45,6 +45,7 @@ filegroup( "//test/integration/daemonset:all-srcs", "//test/integration/defaulttolerationseconds:all-srcs", "//test/integration/deployment:all-srcs", + "//test/integration/disruption:all-srcs", "//test/integration/dryrun:all-srcs", "//test/integration/etcd:all-srcs", "//test/integration/evictions:all-srcs", diff --git a/test/integration/disruption/BUILD b/test/integration/disruption/BUILD new file mode 100644 index 00000000000..8e0ad029dd2 --- /dev/null +++ b/test/integration/disruption/BUILD @@ -0,0 +1,47 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "go_default_test", + srcs = [ + "disruption_test.go", + "main_test.go", + ], + tags = ["integration"], + deps = [ + "//cmd/kube-apiserver/app/testing:go_default_library", + "//pkg/controller/disruption:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", + "//staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1:go_default_library", + "//staging/src/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library", + "//staging/src/k8s.io/client-go/dynamic:go_default_library", + "//staging/src/k8s.io/client-go/informers:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/client-go/restmapper:go_default_library", + "//staging/src/k8s.io/client-go/scale:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//test/integration/etcd:go_default_library", + "//test/integration/framework:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/test/integration/disruption/disruption_test.go b/test/integration/disruption/disruption_test.go new file mode 100644 index 00000000000..1edeecceb4a --- /dev/null +++ b/test/integration/disruption/disruption_test.go @@ -0,0 +1,284 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package disruption + +import ( + "fmt" + "testing" + "time" + + "k8s.io/api/core/v1" + "k8s.io/api/policy/v1beta1" + apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/wait" + cacheddiscovery "k8s.io/client-go/discovery/cached/memory" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/informers" + clientset "k8s.io/client-go/kubernetes" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/restmapper" + "k8s.io/client-go/scale" + "k8s.io/client-go/tools/cache" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/pkg/controller/disruption" + "k8s.io/kubernetes/test/integration/etcd" + "k8s.io/kubernetes/test/integration/framework" +) + +func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface, *apiextensionsclientset.Clientset, dynamic.Interface) { + server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins", "ServiceAccount"}, framework.SharedEtcd()) + + clientSet, err := clientset.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatalf("Error creating clientset: %v", err) + } + resyncPeriod := 12 * time.Hour + informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(server.ClientConfig, "pdb-informers")), resyncPeriod) + + client := clientset.NewForConfigOrDie(restclient.AddUserAgent(server.ClientConfig, "disruption-controller")) + + discoveryClient := cacheddiscovery.NewMemCacheClient(clientSet.Discovery()) + mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) + + scaleKindResolver := scale.NewDiscoveryScaleKindResolver(client.Discovery()) + scaleClient, err := scale.NewForConfig(server.ClientConfig, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) + if err != nil { + t.Fatalf("Error creating scaleClient: %v", err) + } + + apiExtensionClient, err := apiextensionsclientset.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatalf("Error creating extension clientset: %v", err) + } + + dynamicClient, err := dynamic.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatalf("Error creating dynamicClient: %v", err) + } + + pdbc := disruption.NewDisruptionController( + informers.Core().V1().Pods(), + informers.Policy().V1beta1().PodDisruptionBudgets(), + informers.Core().V1().ReplicationControllers(), + informers.Apps().V1().ReplicaSets(), + informers.Apps().V1().Deployments(), + informers.Apps().V1().StatefulSets(), + client, + mapper, + scaleClient, + ) + return server, pdbc, informers, clientSet, apiExtensionClient, dynamicClient +} + +func TestPDBWithScaleSubresource(t *testing.T) { + s, pdbc, informers, clientSet, apiExtensionClient, dynamicClient := setup(t) + defer s.TearDownFn() + + nsName := "pdb-scale-subresource" + createNs(t, nsName, clientSet) + + stopCh := make(chan struct{}) + informers.Start(stopCh) + go pdbc.Run(stopCh) + defer close(stopCh) + + crdDefinition := newCustomResourceDefinition() + etcd.CreateTestCRDs(t, apiExtensionClient, true, crdDefinition) + gvr := schema.GroupVersionResource{Group: crdDefinition.Spec.Group, Version: crdDefinition.Spec.Version, Resource: crdDefinition.Spec.Names.Plural} + resourceClient := dynamicClient.Resource(gvr).Namespace(nsName) + + replicas := 4 + maxUnavailable := int32(2) + podLabelValue := "test-crd" + + resource := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": crdDefinition.Spec.Names.Kind, + "apiVersion": crdDefinition.Spec.Group + "/" + crdDefinition.Spec.Version, + "metadata": map[string]interface{}{ + "name": "resource", + "namespace": nsName, + }, + "spec": map[string]interface{}{ + "replicas": replicas, + }, + }, + } + createdResource, err := resourceClient.Create(resource, metav1.CreateOptions{}) + if err != nil { + t.Error(err) + } + + trueValue := true + ownerRef := metav1.OwnerReference{ + Name: resource.GetName(), + Kind: crdDefinition.Spec.Names.Kind, + APIVersion: crdDefinition.Spec.Group + "/" + crdDefinition.Spec.Version, + UID: createdResource.GetUID(), + Controller: &trueValue, + } + for i := 0; i < replicas; i++ { + createPod(t, fmt.Sprintf("pod-%d", i), nsName, podLabelValue, clientSet, ownerRef) + } + + waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4, v1.PodRunning) + + pdb := &v1beta1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pdb", + }, + Spec: v1beta1.PodDisruptionBudgetSpec{ + MaxUnavailable: &intstr.IntOrString{ + Type: intstr.Int, + IntVal: maxUnavailable, + }, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": podLabelValue}, + }, + }, + } + if _, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(nsName).Create(pdb); err != nil { + t.Errorf("Error creating PodDisruptionBudget: %v", err) + } + + waitPDBStable(t, clientSet, 4, nsName, pdb.Name) + + newPdb, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(nsName).Get(pdb.Name, metav1.GetOptions{}) + + if expected, found := int32(replicas), newPdb.Status.ExpectedPods; expected != found { + t.Errorf("Expected %d, but found %d", expected, found) + } + if expected, found := int32(replicas)-maxUnavailable, newPdb.Status.DesiredHealthy; expected != found { + t.Errorf("Expected %d, but found %d", expected, found) + } + if expected, found := maxUnavailable, newPdb.Status.PodDisruptionsAllowed; expected != found { + t.Errorf("Expected %d, but found %d", expected, found) + } +} + +func createPod(t *testing.T, name, namespace, labelValue string, clientSet clientset.Interface, ownerRef metav1.OwnerReference) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: map[string]string{"app": labelValue}, + OwnerReferences: []metav1.OwnerReference{ + ownerRef, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "fake-name", + Image: "fakeimage", + }, + }, + }, + } + _, err := clientSet.CoreV1().Pods(namespace).Create(pod) + if err != nil { + t.Error(err) + } + addPodConditionReady(pod) + if _, err := clientSet.CoreV1().Pods(namespace).UpdateStatus(pod); err != nil { + t.Error(err) + } +} + +func createNs(t *testing.T, name string, clientSet clientset.Interface) { + _, err := clientSet.CoreV1().Namespaces().Create(&v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + }) + if err != nil { + t.Errorf("Error creating namespace: %v", err) + } +} + +func addPodConditionReady(pod *v1.Pod) { + pod.Status = v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + }, + } +} + +func newCustomResourceDefinition() *apiextensionsv1beta1.CustomResourceDefinition { + return &apiextensionsv1beta1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{Name: "crds.mygroup.example.com"}, + Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{ + Group: "mygroup.example.com", + Version: "v1beta1", + Names: apiextensionsv1beta1.CustomResourceDefinitionNames{ + Plural: "crds", + Singular: "crd", + Kind: "Crd", + ListKind: "CrdList", + }, + Scope: apiextensionsv1beta1.NamespaceScoped, + Subresources: &apiextensionsv1beta1.CustomResourceSubresources{ + Scale: &apiextensionsv1beta1.CustomResourceSubresourceScale{ + SpecReplicasPath: ".spec.replicas", + StatusReplicasPath: ".status.replicas", + }, + }, + }, + } +} + +func waitPDBStable(t *testing.T, clientSet clientset.Interface, podNum int32, ns, pdbName string) { + if err := wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) { + pdb, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns).Get(pdbName, metav1.GetOptions{}) + if err != nil { + return false, err + } + if pdb.Status.CurrentHealthy != podNum { + return false, nil + } + return true, nil + }); err != nil { + t.Fatal(err) + } +} + +func waitToObservePods(t *testing.T, podInformer cache.SharedIndexInformer, podNum int, phase v1.PodPhase) { + if err := wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) { + objects := podInformer.GetIndexer().List() + if len(objects) != podNum { + return false, nil + } + for _, obj := range objects { + pod := obj.(*v1.Pod) + if pod.Status.Phase != phase { + return false, nil + } + } + return true, nil + }); err != nil { + t.Fatal(err) + } +} diff --git a/test/integration/disruption/main_test.go b/test/integration/disruption/main_test.go new file mode 100644 index 00000000000..5e481cf889c --- /dev/null +++ b/test/integration/disruption/main_test.go @@ -0,0 +1,26 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package disruption + +import ( + "k8s.io/kubernetes/test/integration/framework" + "testing" +) + +func TestMain(m *testing.M) { + framework.EtcdMain(m.Run) +} diff --git a/test/integration/evictions/BUILD b/test/integration/evictions/BUILD index 153b79c25ba..edd4961be36 100644 --- a/test/integration/evictions/BUILD +++ b/test/integration/evictions/BUILD @@ -22,9 +22,13 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library", + "//staging/src/k8s.io/client-go/dynamic:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/client-go/restmapper:go_default_library", + "//staging/src/k8s.io/client-go/scale:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//test/integration/framework:go_default_library", ], diff --git a/test/integration/evictions/evictions_test.go b/test/integration/evictions/evictions_test.go index 479a8075d13..a0be6e50287 100644 --- a/test/integration/evictions/evictions_test.go +++ b/test/integration/evictions/evictions_test.go @@ -18,6 +18,7 @@ package evictions import ( "fmt" + "net/http/httptest" "reflect" "sync" @@ -32,9 +33,13 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" + cacheddiscovery "k8s.io/client-go/discovery/cached/memory" + "k8s.io/client-go/dynamic" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" + "k8s.io/client-go/restmapper" + "k8s.io/client-go/scale" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/controller/disruption" "k8s.io/kubernetes/test/integration/framework" @@ -329,6 +334,17 @@ func rmSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *disruption.D resyncPeriod := 12 * time.Hour informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pdb-informers")), resyncPeriod) + client := clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "disruption-controller")) + + discoveryClient := cacheddiscovery.NewMemCacheClient(clientSet.Discovery()) + mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) + + scaleKindResolver := scale.NewDiscoveryScaleKindResolver(client.Discovery()) + scaleClient, err := scale.NewForConfig(&config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) + if err != nil { + t.Fatalf("Error in create scaleClient: %v", err) + } + rm := disruption.NewDisruptionController( informers.Core().V1().Pods(), informers.Policy().V1beta1().PodDisruptionBudgets(), @@ -336,7 +352,9 @@ func rmSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *disruption.D informers.Apps().V1().ReplicaSets(), informers.Apps().V1().Deployments(), informers.Apps().V1().StatefulSets(), - clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "disruption-controller")), + client, + mapper, + scaleClient, ) return s, closeFn, rm, informers, clientSet } diff --git a/test/integration/scheduler/BUILD b/test/integration/scheduler/BUILD index 439f227fddb..bdcef5c5779 100644 --- a/test/integration/scheduler/BUILD +++ b/test/integration/scheduler/BUILD @@ -109,12 +109,16 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission:go_default_library", + "//staging/src/k8s.io/client-go/discovery/cached/memory:go_default_library", + "//staging/src/k8s.io/client-go/dynamic:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/client-go/restmapper:go_default_library", + "//staging/src/k8s.io/client-go/scale:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//test/integration/framework:go_default_library", "//test/utils/image:go_default_library", diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index 6baf474c340..6e019ee3811 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -736,7 +736,7 @@ func TestPDBInPreemption(t *testing.T) { defer cleanupTest(t, context) cs := context.clientSet - initDisruptionController(context) + initDisruptionController(t, context) defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI), diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 9ba8d0b8f1e..478757509f2 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -33,12 +33,16 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/admission" + cacheddiscovery "k8s.io/client-go/discovery/cached/memory" + "k8s.io/client-go/dynamic" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" clientv1core "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" restclient "k8s.io/client-go/rest" + "k8s.io/client-go/restmapper" + "k8s.io/client-go/scale" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api/legacyscheme" podutil "k8s.io/kubernetes/pkg/api/v1/pod" @@ -238,9 +242,19 @@ func initTestSchedulerWithOptions( // initDisruptionController initializes and runs a Disruption Controller to properly // update PodDisuptionBudget objects. -func initDisruptionController(context *testContext) *disruption.DisruptionController { +func initDisruptionController(t *testing.T, context *testContext) *disruption.DisruptionController { informers := informers.NewSharedInformerFactory(context.clientSet, 12*time.Hour) + discoveryClient := cacheddiscovery.NewMemCacheClient(context.clientSet.Discovery()) + mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) + + config := restclient.Config{Host: context.httpServer.URL} + scaleKindResolver := scale.NewDiscoveryScaleKindResolver(context.clientSet.Discovery()) + scaleClient, err := scale.NewForConfig(&config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) + if err != nil { + t.Fatalf("Error in create scaleClient: %v", err) + } + dc := disruption.NewDisruptionController( informers.Core().V1().Pods(), informers.Policy().V1beta1().PodDisruptionBudgets(), @@ -248,7 +262,9 @@ func initDisruptionController(context *testContext) *disruption.DisruptionContro informers.Apps().V1().ReplicaSets(), informers.Apps().V1().Deployments(), informers.Apps().V1().StatefulSets(), - context.clientSet) + context.clientSet, + mapper, + scaleClient) informers.Start(context.schedulerConfig.StopEverything) informers.WaitForCacheSync(context.schedulerConfig.StopEverything)