From 74186d3e0601965780407421a60b4f5801e93ea0 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Wed, 8 Feb 2017 13:20:31 -0500 Subject: [PATCH] Switch disruption controller to shared informers --- cmd/kube-controller-manager/app/policy.go | 7 +- pkg/controller/disruption/BUILD | 18 +- pkg/controller/disruption/disruption.go | 177 +++++++----------- pkg/controller/disruption/disruption_test.go | 127 ++++++++----- .../authorizer/rbac/bootstrappolicy/policy.go | 2 + .../testdata/cluster-roles.yaml | 14 ++ test/integration/evictions/evictions_test.go | 21 ++- 7 files changed, 185 insertions(+), 181 deletions(-) diff --git a/cmd/kube-controller-manager/app/policy.go b/cmd/kube-controller-manager/app/policy.go index a9a11437fad..1da85c4a714 100644 --- a/cmd/kube-controller-manager/app/policy.go +++ b/cmd/kube-controller-manager/app/policy.go @@ -30,7 +30,12 @@ func startDisruptionController(ctx ControllerContext) (bool, error) { return false, nil } go disruption.NewDisruptionController( - ctx.InformerFactory.Pods().Informer(), + ctx.NewInformerFactory.Core().V1().Pods(), + ctx.NewInformerFactory.Policy().V1beta1().PodDisruptionBudgets(), + ctx.NewInformerFactory.Core().V1().ReplicationControllers(), + ctx.NewInformerFactory.Extensions().V1beta1().ReplicaSets(), + ctx.NewInformerFactory.Extensions().V1beta1().Deployments(), + ctx.NewInformerFactory.Apps().V1beta1().StatefulSets(), ctx.ClientBuilder.ClientOrDie("disruption-controller"), ).Run(ctx.Stop) return true, nil diff --git a/pkg/controller/disruption/BUILD b/pkg/controller/disruption/BUILD index efe808fd79f..fc9df6e4555 100644 --- a/pkg/controller/disruption/BUILD +++ b/pkg/controller/disruption/BUILD @@ -15,21 +15,25 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", - "//pkg/apis/apps/v1beta1:go_default_library", - "//pkg/apis/extensions/v1beta1:go_default_library", "//pkg/apis/policy/v1beta1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/clientset/typed/policy/v1beta1:go_default_library", - "//pkg/client/legacylisters:go_default_library", + "//pkg/client/informers/informers_generated/apps/v1beta1:go_default_library", + "//pkg/client/informers/informers_generated/core/v1:go_default_library", + "//pkg/client/informers/informers_generated/extensions/v1beta1:go_default_library", + "//pkg/client/informers/informers_generated/policy/v1beta1:go_default_library", + "//pkg/client/listers/apps/v1beta1:go_default_library", + "//pkg/client/listers/core/v1:go_default_library", + "//pkg/client/listers/extensions/v1beta1:go_default_library", + "//pkg/client/listers/policy/v1beta1:go_default_library", "//pkg/controller:go_default_library", "//vendor:github.com/golang/glog", + "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", - "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/intstr", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", - "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/client-go/kubernetes/typed/core/v1", "//vendor:k8s.io/client-go/pkg/api/v1", "//vendor:k8s.io/client-go/tools/cache", @@ -49,14 +53,12 @@ go_test( "//pkg/apis/apps/v1beta1:go_default_library", "//pkg/apis/extensions/v1beta1:go_default_library", "//pkg/apis/policy/v1beta1:go_default_library", - "//pkg/client/legacylisters:go_default_library", + "//pkg/client/informers/informers_generated:go_default_library", "//pkg/controller:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/util/intstr", "//vendor:k8s.io/apimachinery/pkg/util/uuid", - "//vendor:k8s.io/client-go/pkg/api/v1", "//vendor:k8s.io/client-go/tools/cache", - "//vendor:k8s.io/client-go/tools/record", "//vendor:k8s.io/client-go/util/workqueue", ], ) diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index 89175dc7593..7d251c6b883 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -21,13 +21,12 @@ import ( "reflect" "time" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" v1core "k8s.io/client-go/kubernetes/typed/core/v1" clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" @@ -35,12 +34,17 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" - apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" - extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" policy "k8s.io/kubernetes/pkg/apis/policy/v1beta1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" policyclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/policy/v1beta1" - "k8s.io/kubernetes/pkg/client/legacylisters" + appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/apps/v1beta1" + coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/core/v1" + extensionsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/extensions/v1beta1" + policyinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/policy/v1beta1" + appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" + extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1" + policylisters "k8s.io/kubernetes/pkg/client/listers/policy/v1beta1" "k8s.io/kubernetes/pkg/controller" "github.com/golang/glog" @@ -64,28 +68,23 @@ type updater func(*policy.PodDisruptionBudget) error type DisruptionController struct { kubeClient clientset.Interface - pdbStore cache.Store - pdbController cache.Controller - pdbLister listers.StoreToPodDisruptionBudgetLister + pdbLister policylisters.PodDisruptionBudgetLister + pdbListerSynced cache.InformerSynced - podController cache.Controller - podLister listers.StoreToPodLister + podLister corelisters.PodLister + podListerSynced cache.InformerSynced - rcIndexer cache.Indexer - rcController cache.Controller - rcLister listers.StoreToReplicationControllerLister + rcLister corelisters.ReplicationControllerLister + rcListerSynced cache.InformerSynced - rsStore cache.Store - rsController cache.Controller - rsLister listers.StoreToReplicaSetLister + rsLister extensionslisters.ReplicaSetLister + rsListerSynced cache.InformerSynced - dIndexer cache.Indexer - dController cache.Controller - dLister listers.StoreToDeploymentLister + dLister extensionslisters.DeploymentLister + dListerSynced cache.InformerSynced - ssStore cache.Store - ssController cache.Controller - ssLister listers.StoreToStatefulSetLister + ssLister appslisters.StatefulSetLister + ssListerSynced cache.InformerSynced // PodDisruptionBudget keys that need to be synced. queue workqueue.RateLimitingInterface @@ -108,108 +107,55 @@ type controllerAndScale struct { // controllers and their scale. type podControllerFinder func(*v1.Pod) ([]controllerAndScale, error) -func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface) *DisruptionController { +func NewDisruptionController( + podInformer coreinformers.PodInformer, + pdbInformer policyinformers.PodDisruptionBudgetInformer, + rcInformer coreinformers.ReplicationControllerInformer, + rsInformer extensionsinformers.ReplicaSetInformer, + dInformer extensionsinformers.DeploymentInformer, + ssInformer appsinformers.StatefulSetInformer, + kubeClient clientset.Interface, +) *DisruptionController { dc := &DisruptionController{ - kubeClient: kubeClient, - podController: podInformer.GetController(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"), - recheckQueue: workqueue.NewNamedDelayingQueue("disruption-recheck"), - broadcaster: record.NewBroadcaster(), + kubeClient: kubeClient, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"), + recheckQueue: workqueue.NewNamedDelayingQueue("disruption-recheck"), + broadcaster: record.NewBroadcaster(), } dc.recorder = dc.broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "controllermanager"}) dc.getUpdater = func() updater { return dc.writePdbStatus } - dc.podLister.Indexer = podInformer.GetIndexer() - - podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addPod, UpdateFunc: dc.updatePod, DeleteFunc: dc.deletePod, }) + dc.podLister = podInformer.Lister() + dc.podListerSynced = podInformer.Informer().HasSynced - dc.pdbStore, dc.pdbController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return dc.kubeClient.Policy().PodDisruptionBudgets(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return dc.kubeClient.Policy().PodDisruptionBudgets(metav1.NamespaceAll).Watch(options) - }, - }, - &policy.PodDisruptionBudget{}, - 30*time.Second, + pdbInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: dc.addDb, UpdateFunc: dc.updateDb, DeleteFunc: dc.removeDb, }, - ) - dc.pdbLister.Store = dc.pdbStore - - dc.rcIndexer, dc.rcController = cache.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return dc.kubeClient.Core().ReplicationControllers(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return dc.kubeClient.Core().ReplicationControllers(metav1.NamespaceAll).Watch(options) - }, - }, - &v1.ReplicationController{}, 30*time.Second, - cache.ResourceEventHandlerFuncs{}, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) + dc.pdbLister = pdbInformer.Lister() + dc.pdbListerSynced = pdbInformer.Informer().HasSynced - dc.rcLister.Indexer = dc.rcIndexer + dc.rcLister = rcInformer.Lister() + dc.rcListerSynced = rcInformer.Informer().HasSynced - dc.rsLister.Indexer, dc.rsController = cache.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return dc.kubeClient.Extensions().ReplicaSets(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return dc.kubeClient.Extensions().ReplicaSets(metav1.NamespaceAll).Watch(options) - }, - }, - &extensions.ReplicaSet{}, - 30*time.Second, - cache.ResourceEventHandlerFuncs{}, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - dc.rsStore = dc.rsLister.Indexer + dc.rsLister = rsInformer.Lister() + dc.rsListerSynced = rsInformer.Informer().HasSynced - dc.dIndexer, dc.dController = cache.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return dc.kubeClient.Extensions().Deployments(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return dc.kubeClient.Extensions().Deployments(metav1.NamespaceAll).Watch(options) - }, - }, - &extensions.Deployment{}, - 30*time.Second, - cache.ResourceEventHandlerFuncs{}, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - dc.dLister.Indexer = dc.dIndexer + dc.dLister = dInformer.Lister() + dc.dListerSynced = dInformer.Informer().HasSynced - dc.ssStore, dc.ssController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return dc.kubeClient.Apps().StatefulSets(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return dc.kubeClient.Apps().StatefulSets(metav1.NamespaceAll).Watch(options) - }, - }, - &apps.StatefulSet{}, - 30*time.Second, - cache.ResourceEventHandlerFuncs{}, - ) - dc.ssLister.Store = dc.ssStore + dc.ssLister = ssInformer.Lister() + dc.ssListerSynced = ssInformer.Informer().HasSynced return dc } @@ -317,19 +263,22 @@ func (dc *DisruptionController) getPodReplicationControllers(pod *v1.Pod) ([]con } func (dc *DisruptionController) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer dc.queue.ShutDown() + glog.V(0).Infof("Starting disruption controller") + + if !cache.WaitForCacheSync(stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + return + } + if dc.kubeClient != nil { glog.V(0).Infof("Sending events to api server.") dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(dc.kubeClient.Core().RESTClient()).Events("")}) } else { glog.V(0).Infof("No api server defined - no events will be sent to API server.") } - go dc.pdbController.Run(stopCh) - go dc.podController.Run(stopCh) - go dc.rcController.Run(stopCh) - go dc.rsController.Run(stopCh) - go dc.dController.Run(stopCh) - go dc.ssController.Run(stopCh) go wait.Until(dc.worker, time.Second, stopCh) go wait.Until(dc.recheckWorker, time.Second, stopCh) @@ -442,7 +391,7 @@ func (dc *DisruptionController) getPdbForPod(pod *v1.Pod) *policy.PodDisruptionB glog.Warning(msg) dc.recorder.Event(pod, v1.EventTypeWarning, "MultiplePodDisruptionBudgets", msg) } - return &pdbs[0] + return pdbs[0] } func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) ([]*v1.Pod, error) { @@ -510,8 +459,12 @@ func (dc *DisruptionController) sync(key string) error { glog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Now().Sub(startTime)) }() - obj, exists, err := dc.pdbLister.Store.GetByKey(key) - if !exists { + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + pdb, err := dc.pdbLister.PodDisruptionBudgets(namespace).Get(name) + if errors.IsNotFound(err) { glog.V(4).Infof("PodDisruptionBudget %q has been deleted", key) return nil } @@ -519,8 +472,6 @@ func (dc *DisruptionController) sync(key string) error { return err } - pdb := obj.(*policy.PodDisruptionBudget) - if err := dc.trySync(pdb); err != nil { glog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err) return dc.failSafe(pdb) diff --git a/pkg/controller/disruption/disruption_test.go b/pkg/controller/disruption/disruption_test.go index 2990a4b62df..01f8d2855ca 100644 --- a/pkg/controller/disruption/disruption_test.go +++ b/pkg/controller/disruption/disruption_test.go @@ -26,21 +26,21 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" - clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" policy "k8s.io/kubernetes/pkg/apis/policy/v1beta1" - "k8s.io/kubernetes/pkg/client/legacylisters" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated" "k8s.io/kubernetes/pkg/controller" ) type pdbStates map[string]policy.PodDisruptionBudget +var alwaysReady = func() bool { return true } + func (ps *pdbStates) Set(pdb *policy.PodDisruptionBudget) error { key, err := controller.KeyFunc(pdb) if err != nil { @@ -85,23 +85,48 @@ func (ps *pdbStates) VerifyDisruptionAllowed(t *testing.T, key string, disruptio } } -func newFakeDisruptionController() (*DisruptionController, *pdbStates) { +type disruptionController struct { + *DisruptionController + + podStore cache.Store + pdbStore cache.Store + rcStore cache.Store + rsStore cache.Store + dStore cache.Store + ssStore cache.Store +} + +func newFakeDisruptionController() (*disruptionController, *pdbStates) { ps := &pdbStates{} - dc := &DisruptionController{ - pdbLister: listers.StoreToPodDisruptionBudgetLister{Store: cache.NewStore(controller.KeyFunc)}, - podLister: listers.StoreToPodLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})}, - rcLister: listers.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, - rsLister: listers.StoreToReplicaSetLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, - dLister: listers.StoreToDeploymentLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})}, - ssLister: listers.StoreToStatefulSetLister{Store: cache.NewStore(controller.KeyFunc)}, - getUpdater: func() updater { return ps.Set }, - broadcaster: record.NewBroadcaster(), - } + informerFactory := informers.NewSharedInformerFactory(nil, nil, controller.NoResyncPeriodFunc()) - dc.recorder = dc.broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "disruption_test"}) + dc := NewDisruptionController( + informerFactory.Core().V1().Pods(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Extensions().V1beta1().Deployments(), + informerFactory.Apps().V1beta1().StatefulSets(), + nil, + ) + dc.getUpdater = func() updater { return ps.Set } + dc.podListerSynced = alwaysReady + dc.pdbListerSynced = alwaysReady + dc.rcListerSynced = alwaysReady + dc.rsListerSynced = alwaysReady + dc.dListerSynced = alwaysReady + dc.ssListerSynced = alwaysReady - return dc, ps + return &disruptionController{ + dc, + informerFactory.Core().V1().Pods().Informer().GetStore(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets().Informer().GetStore(), + informerFactory.Core().V1().ReplicationControllers().Informer().GetStore(), + informerFactory.Extensions().V1beta1().ReplicaSets().Informer().GetStore(), + informerFactory.Extensions().V1beta1().Deployments().Informer().GetStore(), + informerFactory.Apps().V1beta1().StatefulSets().Informer().GetStore(), + }, ps } func fooBar() map[string]string { @@ -283,11 +308,11 @@ func TestNoSelector(t *testing.T) { pdb.Spec.Selector = &metav1.LabelSelector{} pod, _ := newPod(t, "yo-yo-yo") - add(t, dc.pdbLister.Store, pdb) + add(t, dc.pdbStore, pdb) dc.sync(pdbName) ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0, map[string]metav1.Time{}) - add(t, dc.podLister.Indexer, pod) + add(t, dc.podStore, pod) dc.sync(pdbName) ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0, map[string]metav1.Time{}) } @@ -298,7 +323,7 @@ func TestUnavailable(t *testing.T) { dc, ps := newFakeDisruptionController() pdb, pdbName := newPodDisruptionBudget(t, intstr.FromInt(3)) - add(t, dc.pdbLister.Store, pdb) + add(t, dc.pdbStore, pdb) dc.sync(pdbName) // Add three pods, verifying that the counts go up at each step. @@ -307,14 +332,14 @@ func TestUnavailable(t *testing.T) { ps.VerifyPdbStatus(t, pdbName, 0, i, 3, i, map[string]metav1.Time{}) pod, _ := newPod(t, fmt.Sprintf("yo-yo-yo %d", i)) pods = append(pods, pod) - add(t, dc.podLister.Indexer, pod) + add(t, dc.podStore, pod) dc.sync(pdbName) } ps.VerifyPdbStatus(t, pdbName, 1, 4, 3, 4, map[string]metav1.Time{}) // Now set one pod as unavailable pods[0].Status.Conditions = []v1.PodCondition{} - update(t, dc.podLister.Indexer, pods[0]) + update(t, dc.podStore, pods[0]) dc.sync(pdbName) // Verify expected update @@ -327,13 +352,13 @@ func TestNakedPod(t *testing.T) { dc, ps := newFakeDisruptionController() pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("28%")) - add(t, dc.pdbLister.Store, pdb) + add(t, dc.pdbStore, pdb) dc.sync(pdbName) // This verifies that when a PDB has 0 pods, disruptions are not allowed. ps.VerifyDisruptionAllowed(t, pdbName, 0) pod, _ := newPod(t, "naked") - add(t, dc.podLister.Indexer, pod) + add(t, dc.podStore, pod) dc.sync(pdbName) ps.VerifyDisruptionAllowed(t, pdbName, 0) @@ -344,13 +369,13 @@ func TestReplicaSet(t *testing.T) { dc, ps := newFakeDisruptionController() pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("20%")) - add(t, dc.pdbLister.Store, pdb) + add(t, dc.pdbStore, pdb) rs, _ := newReplicaSet(t, 10) - add(t, dc.rsLister.Indexer, rs) + add(t, dc.rsStore, rs) pod, _ := newPod(t, "pod") - add(t, dc.podLister.Indexer, pod) + add(t, dc.podStore, pod) dc.sync(pdbName) ps.VerifyPdbStatus(t, pdbName, 0, 1, 2, 10, map[string]metav1.Time{}) } @@ -363,11 +388,11 @@ func TestMultipleControllers(t *testing.T) { dc, ps := newFakeDisruptionController() pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("1%")) - add(t, dc.pdbLister.Store, pdb) + add(t, dc.pdbStore, pdb) for i := 0; i < podCount; i++ { pod, _ := newPod(t, fmt.Sprintf("pod %d", i)) - add(t, dc.podLister.Indexer, pod) + add(t, dc.podStore, pod) } dc.sync(pdbName) @@ -376,7 +401,7 @@ func TestMultipleControllers(t *testing.T) { rc, _ := newReplicationController(t, 1) rc.Name = "rc 1" - add(t, dc.rcLister.Indexer, rc) + add(t, dc.rcStore, rc) dc.sync(pdbName) // One RC and 200%>1% healthy => disruption allowed @@ -384,7 +409,7 @@ func TestMultipleControllers(t *testing.T) { rc, _ = newReplicationController(t, 1) rc.Name = "rc 2" - add(t, dc.rcLister.Indexer, rc) + add(t, dc.rcStore, rc) dc.sync(pdbName) // 100%>1% healthy BUT two RCs => no disruption allowed @@ -405,10 +430,10 @@ func TestReplicationController(t *testing.T) { // 34% should round up to 2 pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("34%")) - add(t, dc.pdbLister.Store, pdb) + add(t, dc.pdbStore, pdb) rc, _ := newReplicationController(t, 3) rc.Spec.Selector = labels - add(t, dc.rcLister.Indexer, rc) + add(t, dc.rcStore, rc) dc.sync(pdbName) // It starts out at 0 expected because, with no pods, the PDB doesn't know @@ -421,7 +446,7 @@ func TestReplicationController(t *testing.T) { pod, _ := newPod(t, fmt.Sprintf("foobar %d", i)) pods = append(pods, pod) pod.Labels = labels - add(t, dc.podLister.Indexer, pod) + add(t, dc.podStore, pod) dc.sync(pdbName) if i < 2 { ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3, map[string]metav1.Time{}) @@ -431,7 +456,7 @@ func TestReplicationController(t *testing.T) { } rogue, _ := newPod(t, "rogue") - add(t, dc.podLister.Indexer, rogue) + add(t, dc.podStore, rogue) dc.sync(pdbName) ps.VerifyDisruptionAllowed(t, pdbName, 0) } @@ -446,9 +471,9 @@ func TestStatefulSetController(t *testing.T) { // 34% should round up to 2 pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("34%")) - add(t, dc.pdbLister.Store, pdb) + add(t, dc.pdbStore, pdb) ss, _ := newStatefulSet(t, 3) - add(t, dc.ssLister.Store, ss) + add(t, dc.ssStore, ss) dc.sync(pdbName) // It starts out at 0 expected because, with no pods, the PDB doesn't know @@ -461,7 +486,7 @@ func TestStatefulSetController(t *testing.T) { pod, _ := newPod(t, fmt.Sprintf("foobar %d", i)) pods = append(pods, pod) pod.Labels = labels - add(t, dc.podLister.Indexer, pod) + add(t, dc.podStore, pod) dc.sync(pdbName) if i < 2 { ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3, map[string]metav1.Time{}) @@ -494,10 +519,10 @@ func TestTwoControllers(t *testing.T) { const minimumTwo int32 = 7 // integer minimum with two controllers pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("28%")) - add(t, dc.pdbLister.Store, pdb) + add(t, dc.pdbStore, pdb) rc, _ := newReplicationController(t, collectionSize) rc.Spec.Selector = rcLabels - add(t, dc.rcLister.Indexer, rc) + add(t, dc.rcStore, rc) dc.sync(pdbName) ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]metav1.Time{}) @@ -512,7 +537,7 @@ func TestTwoControllers(t *testing.T) { if i <= unavailablePods { pod.Status.Conditions = []v1.PodCondition{} } - add(t, dc.podLister.Indexer, pod) + add(t, dc.podStore, pod) dc.sync(pdbName) if i <= unavailablePods { ps.VerifyPdbStatus(t, pdbName, 0, 0, minimumOne, collectionSize, map[string]metav1.Time{}) @@ -525,14 +550,14 @@ func TestTwoControllers(t *testing.T) { d, _ := newDeployment(t, collectionSize) d.Spec.Selector = newSel(dLabels) - add(t, dc.dLister.Indexer, d) + add(t, dc.dStore, d) dc.sync(pdbName) ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize, map[string]metav1.Time{}) rs, _ := newReplicaSet(t, collectionSize) rs.Spec.Selector = newSel(dLabels) rs.Labels = dLabels - add(t, dc.rsLister.Indexer, rs) + add(t, dc.rsStore, rs) dc.sync(pdbName) ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize, map[string]metav1.Time{}) @@ -545,7 +570,7 @@ func TestTwoControllers(t *testing.T) { if i <= unavailablePods { pod.Status.Conditions = []v1.PodCondition{} } - add(t, dc.podLister.Indexer, pod) + add(t, dc.podStore, pod) dc.sync(pdbName) if i <= unavailablePods { ps.VerifyPdbStatus(t, pdbName, 0, minimumOne+1, minimumTwo, 2*collectionSize, map[string]metav1.Time{}) @@ -562,17 +587,17 @@ func TestTwoControllers(t *testing.T) { // verify that a disruption is permitted again. ps.VerifyPdbStatus(t, pdbName, 2, 2+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{}) pods[collectionSize-1].Status.Conditions = []v1.PodCondition{} - update(t, dc.podLister.Indexer, pods[collectionSize-1]) + update(t, dc.podStore, pods[collectionSize-1]) dc.sync(pdbName) ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{}) pods[collectionSize-2].Status.Conditions = []v1.PodCondition{} - update(t, dc.podLister.Indexer, pods[collectionSize-2]) + update(t, dc.podStore, pods[collectionSize-2]) dc.sync(pdbName) ps.VerifyPdbStatus(t, pdbName, 0, minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{}) pods[collectionSize-1].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} - update(t, dc.podLister.Indexer, pods[collectionSize-1]) + update(t, dc.podStore, pods[collectionSize-1]) dc.sync(pdbName) ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{}) } @@ -581,7 +606,7 @@ func TestTwoControllers(t *testing.T) { func TestPDBNotExist(t *testing.T) { dc, _ := newFakeDisruptionController() pdb, _ := newPodDisruptionBudget(t, intstr.FromString("67%")) - add(t, dc.pdbLister.Store, pdb) + add(t, dc.pdbStore, pdb) if err := dc.sync("notExist"); err != nil { t.Errorf("Unexpected error: %v, expect nil", err) } @@ -598,16 +623,16 @@ func TestUpdateDisruptedPods(t *testing.T) { "p3": {Time: currentTime}, // Should remain, pod untouched. "notthere": {Time: currentTime}, // Should be removed, pod deleted. } - add(t, dc.pdbLister.Store, pdb) + add(t, dc.pdbStore, pdb) pod1, _ := newPod(t, "p1") pod1.DeletionTimestamp = &metav1.Time{Time: time.Now()} pod2, _ := newPod(t, "p2") pod3, _ := newPod(t, "p3") - add(t, dc.podLister.Indexer, pod1) - add(t, dc.podLister.Indexer, pod2) - add(t, dc.podLister.Indexer, pod3) + add(t, dc.podStore, pod1) + add(t, dc.podStore, pod2) + add(t, dc.podStore, pod3) dc.sync(pdbName) diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index dffc97d9710..17e834efc74 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -283,6 +283,8 @@ func ClusterRoles() []rbac.ClusterRole { "persistentvolumes", "pods", "secrets", "services", "serviceaccounts", "replicationcontrollers").RuleOrDie(), rbac.NewRule("list", "watch").Groups(extensionsGroup).Resources("daemonsets", "deployments", "replicasets").RuleOrDie(), rbac.NewRule("list", "watch").Groups(batchGroup).Resources("jobs", "cronjobs").RuleOrDie(), + rbac.NewRule("list", "watch").Groups(appsGroup).Resources("statefulsets").RuleOrDie(), + rbac.NewRule("list", "watch").Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(), }, }, { diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml index f1695e20bb9..737a07b8a8a 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml @@ -475,6 +475,20 @@ items: verbs: - list - watch + - apiGroups: + - apps + resources: + - statefulsets + verbs: + - list + - watch + - apiGroups: + - policy + resources: + - poddisruptionbudgets + verbs: + - list + - watch - apiVersion: rbac.authorization.k8s.io/v1beta1 kind: ClusterRole metadata: diff --git a/test/integration/evictions/evictions_test.go b/test/integration/evictions/evictions_test.go index 2d5fabe4fbd..c37595f8d9c 100644 --- a/test/integration/evictions/evictions_test.go +++ b/test/integration/evictions/evictions_test.go @@ -36,8 +36,8 @@ import ( "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/apis/policy/v1beta1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated" "k8s.io/kubernetes/pkg/controller/disruption" - "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/test/integration/framework" ) @@ -50,14 +50,14 @@ const ( func TestConcurrentEvictionRequests(t *testing.T) { podNameFormat := "test-pod-%d" - s, rm, podInformer, clientSet := rmSetup(t) + s, rm, informers, clientSet := rmSetup(t) defer s.Close() ns := framework.CreateTestingNamespace("concurrent-eviction-requests", s, t) defer framework.DeleteTestingNamespace(ns, s, t) stopCh := make(chan struct{}) - go podInformer.Run(stopCh) + informers.Start(stopCh) go rm.Run(stopCh) defer close(stopCh) @@ -87,7 +87,7 @@ func TestConcurrentEvictionRequests(t *testing.T) { } } - waitToObservePods(t, podInformer, numOfEvictions) + waitToObservePods(t, informers.Core().V1().Pods().Informer(), numOfEvictions) pdb := newPDB() if _, err := clientSet.Policy().PodDisruptionBudgets(ns.Name).Create(pdb); err != nil { @@ -227,7 +227,7 @@ func newEviction(ns, evictionName string, deleteOption *metav1.DeleteOptions) *v } } -func rmSetup(t *testing.T) (*httptest.Server, *disruption.DisruptionController, cache.SharedIndexInformer, clientset.Interface) { +func rmSetup(t *testing.T) (*httptest.Server, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface) { masterConfig := framework.NewIntegrationTestMasterConfig() _, s := framework.RunAMaster(masterConfig) @@ -237,13 +237,18 @@ func rmSetup(t *testing.T) (*httptest.Server, *disruption.DisruptionController, t.Fatalf("Error in create clientset: %v", err) } resyncPeriod := 12 * time.Hour - informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pdb-informers")), nil, resyncPeriod) + informers := informers.NewSharedInformerFactory(nil, clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pdb-informers")), resyncPeriod) rm := disruption.NewDisruptionController( - informers.Pods().Informer(), + informers.Core().V1().Pods(), + informers.Policy().V1beta1().PodDisruptionBudgets(), + informers.Core().V1().ReplicationControllers(), + informers.Extensions().V1beta1().ReplicaSets(), + informers.Extensions().V1beta1().Deployments(), + informers.Apps().V1beta1().StatefulSets(), clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "disruption-controller")), ) - return s, rm, informers.Pods().Informer(), clientSet + return s, rm, informers, clientSet } // wait for the podInformer to observe the pods. Call this function before