diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 791f3b41c32..b4b88a4d9a4 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -483,6 +483,9 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root VolumePlugins: ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration), Cloud: cloud, ClusterName: s.ClusterName, + VolumeInformer: newSharedInformers.Core().V1().PersistentVolumes(), + ClaimInformer: newSharedInformers.Core().V1().PersistentVolumeClaims(), + ClassInformer: newSharedInformers.Storage().V1beta1().StorageClasses(), EnableDynamicProvisioning: s.VolumeConfiguration.EnableDynamicProvisioning, } volumeController := persistentvolumecontroller.NewController(params) diff --git a/pkg/controller/volume/persistentvolume/BUILD b/pkg/controller/volume/persistentvolume/BUILD index 799a94f0487..b33660b39b1 100644 --- a/pkg/controller/volume/persistentvolume/BUILD +++ b/pkg/controller/volume/persistentvolume/BUILD @@ -23,6 +23,10 @@ go_library( "//pkg/apis/storage/v1beta1:go_default_library", "//pkg/apis/storage/v1beta1/util:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library", + "//pkg/client/informers/informers_generated/externalversions/storage/v1beta1:go_default_library", + "//pkg/client/listers/core/v1:go_default_library", + "//pkg/client/listers/storage/v1beta1:go_default_library", "//pkg/cloudprovider:go_default_library", "//pkg/controller:go_default_library", "//pkg/util/goroutinemap:go_default_library", @@ -34,10 +38,9 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/api/meta", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/labels", - "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/types", + "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", - "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/client-go/kubernetes/typed/core/v1", "//vendor:k8s.io/client-go/pkg/api/v1", "//vendor:k8s.io/client-go/tools/cache", @@ -67,7 +70,9 @@ go_test( "//pkg/apis/storage/v1beta1/util:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/clientset/fake:go_default_library", - "//pkg/controller/volume/persistentvolume/testing:go_default_library", + "//pkg/client/informers/informers_generated/externalversions:go_default_library", + "//pkg/client/listers/storage/v1beta1:go_default_library", + "//pkg/controller:go_default_library", "//pkg/volume:go_default_library", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/api/resource", @@ -76,6 +81,7 @@ go_test( "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/diff", "//vendor:k8s.io/apimachinery/pkg/util/wait", + "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/client-go/testing", "//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/tools/record", @@ -94,7 +100,6 @@ filegroup( srcs = [ ":package-srcs", "//pkg/controller/volume/persistentvolume/options:all-srcs", - "//pkg/controller/volume/persistentvolume/testing:all-srcs", ], tags = ["automanaged"], ) diff --git a/pkg/controller/volume/persistentvolume/framework_test.go b/pkg/controller/volume/persistentvolume/framework_test.go index f22bf556217..5d631072f5f 100644 --- a/pkg/controller/volume/persistentvolume/framework_test.go +++ b/pkg/controller/volume/persistentvolume/framework_test.go @@ -35,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" @@ -45,7 +46,9 @@ import ( storageutil "k8s.io/kubernetes/pkg/apis/storage/v1beta1/util" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" - fcache "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" + storagelisters "k8s.io/kubernetes/pkg/client/listers/storage/v1beta1" + "k8s.io/kubernetes/pkg/controller" vol "k8s.io/kubernetes/pkg/volume" ) @@ -112,9 +115,10 @@ var noerrors = []reactorError{} // is updated first and claim.Phase second. This queue will then contain both // updates as separate entries. // - Number of changes since the last call to volumeReactor.syncAll(). -// - Optionally, volume and claim event sources. When set, all changed -// volumes/claims are sent as Modify event to these sources. These sources can -// be linked back to the controller watcher as "volume/claim updated" events. +// - Optionally, volume and claim fake watchers which should be the same ones +// used by the controller. Any time an event function like deleteVolumeEvent +// is called to simulate an event, the reactor's stores are updated and the +// controller is sent the event via the fake watcher. // - Optionally, list of error that should be returned by reactor, simulating // etcd / API server failures. These errors are evaluated in order and every // error is returned only once. I.e. when the reactor finds matching @@ -126,8 +130,8 @@ type volumeReactor struct { changedObjects []interface{} changedSinceLastSync int ctrl *PersistentVolumeController - volumeSource *fcache.FakePVControllerSource - claimSource *fcache.FakePVCControllerSource + fakeVolumeWatch *watch.FakeWatcher + fakeClaimWatch *watch.FakeWatcher lock sync.Mutex errors []reactorError } @@ -176,9 +180,6 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj } // Store the updated object to appropriate places. - if r.volumeSource != nil { - r.volumeSource.Add(volume) - } r.volumes[volume.Name] = volume r.changedObjects = append(r.changedObjects, volume) r.changedSinceLastSync++ @@ -203,9 +204,6 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj } // Store the updated object to appropriate places. - if r.volumeSource != nil { - r.volumeSource.Modify(volume) - } r.volumes[volume.Name] = volume r.changedObjects = append(r.changedObjects, volume) r.changedSinceLastSync++ @@ -231,9 +229,6 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj // Store the updated object to appropriate places. r.claims[claim.Name] = claim - if r.claimSource != nil { - r.claimSource.Modify(claim) - } r.changedObjects = append(r.changedObjects, claim) r.changedSinceLastSync++ glog.V(4).Infof("saved updated claim %s", claim.Name) @@ -513,9 +508,11 @@ func (r *volumeReactor) deleteVolumeEvent(volume *v1.PersistentVolume) { // Generate deletion event. Cloned volume is needed to prevent races (and we // would get a clone from etcd too). - clone, _ := api.Scheme.DeepCopy(volume) - volumeClone := clone.(*v1.PersistentVolume) - r.volumeSource.Delete(volumeClone) + if r.fakeVolumeWatch != nil { + clone, _ := api.Scheme.DeepCopy(volume) + volumeClone := clone.(*v1.PersistentVolume) + r.fakeVolumeWatch.Delete(volumeClone) + } } // deleteClaimEvent simulates that a claim has been deleted in etcd and the @@ -529,9 +526,11 @@ func (r *volumeReactor) deleteClaimEvent(claim *v1.PersistentVolumeClaim) { // Generate deletion event. Cloned volume is needed to prevent races (and we // would get a clone from etcd too). - clone, _ := api.Scheme.DeepCopy(claim) - claimClone := clone.(*v1.PersistentVolumeClaim) - r.claimSource.Delete(claimClone) + if r.fakeClaimWatch != nil { + clone, _ := api.Scheme.DeepCopy(claim) + claimClone := clone.(*v1.PersistentVolumeClaim) + r.fakeClaimWatch.Delete(claimClone) + } } // addVolumeEvent simulates that a volume has been added in etcd and the @@ -543,7 +542,9 @@ func (r *volumeReactor) addVolumeEvent(volume *v1.PersistentVolume) { r.volumes[volume.Name] = volume // Generate event. No cloning is needed, this claim is not stored in the // controller cache yet. - r.volumeSource.Add(volume) + if r.fakeVolumeWatch != nil { + r.fakeVolumeWatch.Add(volume) + } } // modifyVolumeEvent simulates that a volume has been modified in etcd and the @@ -555,9 +556,11 @@ func (r *volumeReactor) modifyVolumeEvent(volume *v1.PersistentVolume) { r.volumes[volume.Name] = volume // Generate deletion event. Cloned volume is needed to prevent races (and we // would get a clone from etcd too). - clone, _ := api.Scheme.DeepCopy(volume) - volumeClone := clone.(*v1.PersistentVolume) - r.volumeSource.Modify(volumeClone) + if r.fakeVolumeWatch != nil { + clone, _ := api.Scheme.DeepCopy(volume) + volumeClone := clone.(*v1.PersistentVolume) + r.fakeVolumeWatch.Modify(volumeClone) + } } // addClaimEvent simulates that a claim has been deleted in etcd and the @@ -569,45 +572,49 @@ func (r *volumeReactor) addClaimEvent(claim *v1.PersistentVolumeClaim) { r.claims[claim.Name] = claim // Generate event. No cloning is needed, this claim is not stored in the // controller cache yet. - r.claimSource.Add(claim) + if r.fakeClaimWatch != nil { + r.fakeClaimWatch.Add(claim) + } } -func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, volumeSource *fcache.FakePVControllerSource, claimSource *fcache.FakePVCControllerSource, errors []reactorError) *volumeReactor { +func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, fakeVolumeWatch, fakeClaimWatch *watch.FakeWatcher, errors []reactorError) *volumeReactor { reactor := &volumeReactor{ - volumes: make(map[string]*v1.PersistentVolume), - claims: make(map[string]*v1.PersistentVolumeClaim), - ctrl: ctrl, - volumeSource: volumeSource, - claimSource: claimSource, - errors: errors, + volumes: make(map[string]*v1.PersistentVolume), + claims: make(map[string]*v1.PersistentVolumeClaim), + ctrl: ctrl, + fakeVolumeWatch: fakeVolumeWatch, + fakeClaimWatch: fakeClaimWatch, + errors: errors, } - client.AddReactor("*", "*", reactor.React) + client.AddReactor("create", "persistentvolumes", reactor.React) + client.AddReactor("update", "persistentvolumes", reactor.React) + client.AddReactor("update", "persistentvolumeclaims", reactor.React) + client.AddReactor("get", "persistentvolumes", reactor.React) + client.AddReactor("delete", "persistentvolumes", reactor.React) + client.AddReactor("delete", "persistentvolumeclaims", reactor.React) + return reactor } +func alwaysReady() bool { return true } -func newTestController(kubeClient clientset.Interface, volumeSource, claimSource, classSource cache.ListerWatcher, enableDynamicProvisioning bool) *PersistentVolumeController { - if volumeSource == nil { - volumeSource = fcache.NewFakePVControllerSource() +func newTestController(kubeClient clientset.Interface, informerFactory informers.SharedInformerFactory, enableDynamicProvisioning bool) *PersistentVolumeController { + if informerFactory == nil { + informerFactory = informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) } - if claimSource == nil { - claimSource = fcache.NewFakePVCControllerSource() - } - if classSource == nil { - classSource = fcache.NewFakeControllerSource() - } - params := ControllerParameters{ KubeClient: kubeClient, SyncPeriod: 5 * time.Second, VolumePlugins: []vol.VolumePlugin{}, - VolumeSource: volumeSource, - ClaimSource: claimSource, - ClassSource: classSource, + VolumeInformer: informerFactory.Core().V1().PersistentVolumes(), + ClaimInformer: informerFactory.Core().V1().PersistentVolumeClaims(), + ClassInformer: informerFactory.Storage().V1beta1().StorageClasses(), EventRecorder: record.NewFakeRecorder(1000), EnableDynamicProvisioning: enableDynamicProvisioning, } ctrl := NewController(params) - + ctrl.volumeListerSynced = alwaysReady + ctrl.claimListerSynced = alwaysReady + ctrl.classListerSynced = alwaysReady // Speed up the test ctrl.createProvisionedPVInterval = 5 * time.Millisecond return ctrl @@ -924,7 +931,7 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag // Initialize the controller client := &fake.Clientset{} - ctrl := newTestController(client, nil, nil, nil, true) + ctrl := newTestController(client, nil, true) reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors) for _, claim := range test.initialClaims { ctrl.claims.Add(claim) @@ -935,14 +942,12 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag reactor.volumes[volume.Name] = volume } - // Convert classes to []interface{} and forcefully inject them into - // controller. - storageClassPtrs := make([]interface{}, len(storageClasses)) - for i, s := range storageClasses { - storageClassPtrs[i] = s + // Inject classes into controller via a custom lister. + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + for _, class := range storageClasses { + indexer.Add(class) } - // 1 is the resource version - ctrl.classes.Replace(storageClassPtrs, "1") + ctrl.classLister = storagelisters.NewStorageClassLister(indexer) // Run the tested functions err := test.test(ctrl, reactor, test) @@ -980,15 +985,14 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s // Initialize the controller client := &fake.Clientset{} - ctrl := newTestController(client, nil, nil, nil, true) + ctrl := newTestController(client, nil, true) - // Convert classes to []interface{} and forcefully inject them into - // controller. - storageClassPtrs := make([]interface{}, len(storageClasses)) - for i, s := range storageClasses { - storageClassPtrs[i] = s + // Inject classes into controller via a custom lister. + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + for _, class := range storageClasses { + indexer.Add(class) } - ctrl.classes.Replace(storageClassPtrs, "1") + ctrl.classLister = storagelisters.NewStorageClassLister(indexer) reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors) for _, claim := range test.initialClaims { diff --git a/pkg/controller/volume/persistentvolume/provision_test.go b/pkg/controller/volume/persistentvolume/provision_test.go index cc391bbac32..83e802d6304 100644 --- a/pkg/controller/volume/persistentvolume/provision_test.go +++ b/pkg/controller/volume/persistentvolume/provision_test.go @@ -422,7 +422,7 @@ func TestProvisionMultiSync(t *testing.T) { // When provisioning is disabled, provisioning a claim should instantly return nil func TestDisablingDynamicProvisioner(t *testing.T) { - ctrl := newTestController(nil, nil, nil, nil, false) + ctrl := newTestController(nil, nil, false) retVal := ctrl.provisionClaim(nil) if retVal != nil { t.Errorf("Expected nil return but got %v", retVal) diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go index 81f5fa23248..630aeca1a69 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller.go +++ b/pkg/controller/volume/persistentvolume/pv_controller.go @@ -31,6 +31,8 @@ import ( storage "k8s.io/kubernetes/pkg/apis/storage/v1beta1" storageutil "k8s.io/kubernetes/pkg/apis/storage/v1beta1/util" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" + storagelisters "k8s.io/kubernetes/pkg/client/listers/storage/v1beta1" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/util/goroutinemap" vol "k8s.io/kubernetes/pkg/volume" @@ -146,14 +148,13 @@ const createProvisionedPVInterval = 10 * time.Second // cache.Controllers that watch PersistentVolume and PersistentVolumeClaim // changes. type PersistentVolumeController struct { - volumeController cache.Controller - volumeInformer cache.Indexer - volumeSource cache.ListerWatcher - claimController cache.Controller - claimInformer cache.Store - claimSource cache.ListerWatcher - classReflector *cache.Reflector - classSource cache.ListerWatcher + volumeLister corelisters.PersistentVolumeLister + volumeListerSynced cache.InformerSynced + claimLister corelisters.PersistentVolumeClaimLister + claimListerSynced cache.InformerSynced + classLister storagelisters.StorageClassLister + classListerSynced cache.InformerSynced + kubeClient clientset.Interface eventRecorder record.EventRecorder cloud cloudprovider.Interface @@ -182,7 +183,6 @@ type PersistentVolumeController struct { // have been already written. volumes persistentVolumeOrderedIndex claims cache.Store - classes cache.Store // Work queues of claims and volumes to process. Every queue should have // exactly one worker thread, especially syncClaim() is not reentrant. @@ -1464,17 +1464,10 @@ func (ctrl *PersistentVolumeController) findProvisionablePlugin(claim *v1.Persis // provisionClaim() which leads here is never called with claimClass=="", we // can save some checks. claimClass := storageutil.GetClaimStorageClass(claim) - classObj, found, err := ctrl.classes.GetByKey(claimClass) + class, err := ctrl.classLister.Get(claimClass) if err != nil { return nil, nil, err } - if !found { - return nil, nil, fmt.Errorf("StorageClass %q not found", claimClass) - } - class, ok := classObj.(*storage.StorageClass) - if !ok { - return nil, nil, fmt.Errorf("Cannot convert object to StorageClass: %+v", classObj) - } // Find a plugin for the class plugin, err := ctrl.volumePluginMgr.FindProvisionablePluginByName(class.Provisioner) diff --git a/pkg/controller/volume/persistentvolume/pv_controller_base.go b/pkg/controller/volume/persistentvolume/pv_controller_base.go index 0c480865274..ee83a7e304a 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller_base.go +++ b/pkg/controller/volume/persistentvolume/pv_controller_base.go @@ -24,9 +24,9 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/labels" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" v1core "k8s.io/client-go/kubernetes/typed/core/v1" clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" @@ -36,6 +36,9 @@ import ( "k8s.io/kubernetes/pkg/api/v1" storage "k8s.io/kubernetes/pkg/apis/storage/v1beta1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1" + storageinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/storage/v1beta1" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/util/goroutinemap" @@ -51,15 +54,17 @@ import ( // ControllerParameters contains arguments for creation of a new // PersistentVolume controller. type ControllerParameters struct { - KubeClient clientset.Interface - SyncPeriod time.Duration - AlphaProvisioner vol.ProvisionableVolumePlugin - VolumePlugins []vol.VolumePlugin - Cloud cloudprovider.Interface - ClusterName string - VolumeSource, ClaimSource, ClassSource cache.ListerWatcher - EventRecorder record.EventRecorder - EnableDynamicProvisioning bool + KubeClient clientset.Interface + SyncPeriod time.Duration + AlphaProvisioner vol.ProvisionableVolumePlugin + VolumePlugins []vol.VolumePlugin + Cloud cloudprovider.Interface + ClusterName string + VolumeInformer coreinformers.PersistentVolumeInformer + ClaimInformer coreinformers.PersistentVolumeClaimInformer + ClassInformer storageinformers.StorageClassInformer + EventRecorder record.EventRecorder + EnableDynamicProvisioning bool } // NewController creates a new PersistentVolume controller @@ -94,98 +99,47 @@ func NewController(p ControllerParameters) *PersistentVolumeController { } } - volumeSource := p.VolumeSource - if volumeSource == nil { - volumeSource = &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return p.KubeClient.Core().PersistentVolumes().List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return p.KubeClient.Core().PersistentVolumes().Watch(options) - }, - } - } - controller.volumeSource = volumeSource - - claimSource := p.ClaimSource - if claimSource == nil { - claimSource = &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return p.KubeClient.Core().PersistentVolumeClaims(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return p.KubeClient.Core().PersistentVolumeClaims(metav1.NamespaceAll).Watch(options) - }, - } - } - controller.claimSource = claimSource - - classSource := p.ClassSource - if classSource == nil { - classSource = &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return p.KubeClient.Storage().StorageClasses().List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return p.KubeClient.Storage().StorageClasses().Watch(options) - }, - } - } - controller.classSource = classSource - - controller.volumeInformer, controller.volumeController = cache.NewIndexerInformer( - volumeSource, - &v1.PersistentVolume{}, - p.SyncPeriod, + p.VolumeInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) }, UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) }, DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) }, }, - cache.Indexers{"accessmodes": accessModesIndexFunc}, - ) - controller.claimInformer, controller.claimController = cache.NewInformer( - claimSource, - &v1.PersistentVolumeClaim{}, p.SyncPeriod, + ) + controller.volumeLister = p.VolumeInformer.Lister() + controller.volumeListerSynced = p.VolumeInformer.Informer().HasSynced + + p.ClaimInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) }, UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) }, DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) }, }, - ) - - // This is just a cache of StorageClass instances, no special actions are - // needed when a class is created/deleted/updated. - controller.classes = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) - controller.classReflector = cache.NewReflector( - classSource, - &storage.StorageClass{}, - controller.classes, p.SyncPeriod, ) + controller.claimLister = p.ClaimInformer.Lister() + controller.claimListerSynced = p.ClaimInformer.Informer().HasSynced + + controller.classLister = p.ClassInformer.Lister() + controller.classListerSynced = p.ClassInformer.Informer().HasSynced return controller } // initializeCaches fills all controller caches with initial data from etcd in // order to have the caches already filled when first addClaim/addVolume to // perform initial synchronization of the controller. -func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSource cache.ListerWatcher) { - volumeListObj, err := volumeSource.List(metav1.ListOptions{}) +func (ctrl *PersistentVolumeController) initializeCaches(volumeLister corelisters.PersistentVolumeLister, claimLister corelisters.PersistentVolumeClaimLister) { + volumeList, err := volumeLister.List(labels.Everything()) if err != nil { glog.Errorf("PersistentVolumeController can't initialize caches: %v", err) return } - volumeList, ok := volumeListObj.(*v1.PersistentVolumeList) - if !ok { - glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %#v", volumeListObj) - return - } - for _, volume := range volumeList.Items { + for _, volume := range volumeList { // Ignore template volumes from kubernetes 1.2 - deleted := ctrl.upgradeVolumeFrom1_2(&volume) + deleted := ctrl.upgradeVolumeFrom1_2(volume) if !deleted { - clone, err := api.Scheme.DeepCopy(&volume) + clone, err := api.Scheme.DeepCopy(volume) if err != nil { glog.Errorf("error cloning volume %q: %v", volume.Name, err) continue @@ -195,20 +149,15 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour } } - claimListObj, err := claimSource.List(metav1.ListOptions{}) + claimList, err := claimLister.List(labels.Everything()) if err != nil { glog.Errorf("PersistentVolumeController can't initialize caches: %v", err) return } - claimList, ok := claimListObj.(*v1.PersistentVolumeClaimList) - if !ok { - glog.Errorf("PersistentVolumeController can't initialize caches, expected list of claims, got: %#v", claimListObj) - return - } - for _, claim := range claimList.Items { - clone, err := api.Scheme.DeepCopy(&claim) + for _, claim := range claimList { + clone, err := api.Scheme.DeepCopy(claim) if err != nil { - glog.Errorf("error cloning claim %q: %v", claimToClaimKey(&claim), err) + glog.Errorf("error cloning claim %q: %v", claimToClaimKey(claim), err) continue } claimClone := clone.(*v1.PersistentVolumeClaim) @@ -326,10 +275,11 @@ func (ctrl *PersistentVolumeController) deleteClaim(claim *v1.PersistentVolumeCl // Run starts all of this controller's control loops func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) { glog.V(1).Infof("starting PersistentVolumeController") - ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource) - go ctrl.volumeController.Run(stopCh) - go ctrl.claimController.Run(stopCh) - go ctrl.classReflector.RunUntil(stopCh) + if !cache.WaitForCacheSync(stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for volume caches to sync")) + return + } + ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister) go wait.Until(ctrl.volumeWorker, time.Second, stopCh) go wait.Until(ctrl.claimWorker, time.Second, stopCh) @@ -351,27 +301,26 @@ func (ctrl *PersistentVolumeController) volumeWorker() { key := keyObj.(string) glog.V(5).Infof("volumeWorker[%s]", key) - volumeObj, found, err := ctrl.volumeInformer.GetByKey(key) + _, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - glog.V(2).Infof("error getting volume %q from informer: %v", key, err) + glog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err) return false } - - if found { + volume, err := ctrl.volumeLister.Get(name) + if err == nil { // The volume still exists in informer cache, the event must have // been add/update/sync - volume, ok := volumeObj.(*v1.PersistentVolume) - if !ok { - glog.Errorf("expected volume, got %+v", volumeObj) - return false - } ctrl.updateVolume(volume) return false } + if !errors.IsNotFound(err) { + glog.V(2).Infof("error getting volume %q from informer: %v", key, err) + return false + } // The volume is not in informer cache, the event must have been // "delete" - volumeObj, found, err = ctrl.volumes.store.GetByKey(key) + volumeObj, found, err := ctrl.volumes.store.GetByKey(key) if err != nil { glog.V(2).Infof("error getting volume %q from cache: %v", key, err) return false @@ -410,26 +359,25 @@ func (ctrl *PersistentVolumeController) claimWorker() { key := keyObj.(string) glog.V(5).Infof("claimWorker[%s]", key) - claimObj, found, err := ctrl.claimInformer.GetByKey(key) + namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { + glog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err) + return false + } + claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name) + if err == nil { + // The claim still exists in informer cache, the event must have + // been add/update/sync + ctrl.updateClaim(claim) + return false + } + if !errors.IsNotFound(err) { glog.V(2).Infof("error getting claim %q from informer: %v", key, err) return false } - if found { - // The claim still exists in informer cache, the event must have - // been add/update/sync - claim, ok := claimObj.(*v1.PersistentVolumeClaim) - if !ok { - glog.Errorf("expected claim, got %+v", claimObj) - return false - } - ctrl.updateClaim(claim) - return false - } - // The claim is not in informer cache, the event must have been "delete" - claimObj, found, err = ctrl.claims.GetByKey(key) + claimObj, found, err := ctrl.claims.GetByKey(key) if err != nil { glog.V(2).Infof("error getting claim %q from cache: %v", key, err) return false diff --git a/pkg/controller/volume/persistentvolume/pv_controller_test.go b/pkg/controller/volume/persistentvolume/pv_controller_test.go index 2346d036b8d..903b0b3610f 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller_test.go +++ b/pkg/controller/volume/persistentvolume/pv_controller_test.go @@ -21,10 +21,13 @@ import ( "time" "github.com/golang/glog" + "k8s.io/apimachinery/pkg/watch" + core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" - fcache "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" + "k8s.io/kubernetes/pkg/controller" ) // Test the real controller methods (add/update/delete claim/volume) with @@ -161,26 +164,38 @@ func TestControllerSync(t *testing.T) { // Initialize the controller client := &fake.Clientset{} - volumeSource := fcache.NewFakePVControllerSource() - claimSource := fcache.NewFakePVCControllerSource() - ctrl := newTestController(client, volumeSource, claimSource, nil, true) - reactor := newVolumeReactor(client, ctrl, volumeSource, claimSource, test.errors) + + fakeVolumeWatch := watch.NewFake() + client.PrependWatchReactor("persistentvolumes", core.DefaultWatchReactor(fakeVolumeWatch, nil)) + fakeClaimWatch := watch.NewFake() + client.PrependWatchReactor("persistentvolumeclaims", core.DefaultWatchReactor(fakeClaimWatch, nil)) + client.PrependWatchReactor("storageclasses", core.DefaultWatchReactor(watch.NewFake(), nil)) + + informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + ctrl := newTestController(client, informers, true) + + reactor := newVolumeReactor(client, ctrl, fakeVolumeWatch, fakeClaimWatch, test.errors) for _, claim := range test.initialClaims { - claimSource.Add(claim) reactor.claims[claim.Name] = claim + go func(claim *v1.PersistentVolumeClaim) { + fakeClaimWatch.Add(claim) + }(claim) } for _, volume := range test.initialVolumes { - volumeSource.Add(volume) reactor.volumes[volume.Name] = volume + go func(volume *v1.PersistentVolume) { + fakeVolumeWatch.Add(volume) + }(volume) } // Start the controller stopCh := make(chan struct{}) + informers.Start(stopCh) go ctrl.Run(stopCh) // Wait for the controller to pass initial sync and fill its caches. - for !ctrl.volumeController.HasSynced() || - !ctrl.claimController.HasSynced() || + for !ctrl.volumeListerSynced() || + !ctrl.claimListerSynced() || len(ctrl.claims.ListKeys()) < len(test.initialClaims) || len(ctrl.volumes.store.ListKeys()) < len(test.initialVolumes) { diff --git a/pkg/controller/volume/persistentvolume/testing/BUILD b/pkg/controller/volume/persistentvolume/testing/BUILD deleted file mode 100644 index 18900281556..00000000000 --- a/pkg/controller/volume/persistentvolume/testing/BUILD +++ /dev/null @@ -1,50 +0,0 @@ -package(default_visibility = ["//visibility:public"]) - -licenses(["notice"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) - -go_test( - name = "go_default_test", - srcs = ["fake_controller_source_test.go"], - library = ":go_default_library", - tags = ["automanaged"], - deps = [ - "//pkg/api:go_default_library", - "//pkg/api/v1:go_default_library", - "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", - "//vendor:k8s.io/apimachinery/pkg/watch", - ], -) - -go_library( - name = "go_default_library", - srcs = ["fake_controller_source.go"], - tags = ["automanaged"], - deps = [ - "//pkg/api:go_default_library", - "//pkg/api/v1:go_default_library", - "//vendor:k8s.io/apimachinery/pkg/api/meta", - "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", - "//vendor:k8s.io/apimachinery/pkg/runtime", - "//vendor:k8s.io/apimachinery/pkg/types", - "//vendor:k8s.io/apimachinery/pkg/watch", - ], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [":package-srcs"], - tags = ["automanaged"], -) diff --git a/pkg/controller/volume/persistentvolume/testing/fake_controller_source.go b/pkg/controller/volume/persistentvolume/testing/fake_controller_source.go deleted file mode 100644 index e75071e3a3a..00000000000 --- a/pkg/controller/volume/persistentvolume/testing/fake_controller_source.go +++ /dev/null @@ -1,264 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package framework - -import ( - "errors" - "math/rand" - "strconv" - "sync" - - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/v1" -) - -func NewFakeControllerSource() *FakeControllerSource { - return &FakeControllerSource{ - Items: map[nnu]runtime.Object{}, - Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull), - } -} - -func NewFakePVControllerSource() *FakePVControllerSource { - return &FakePVControllerSource{ - FakeControllerSource{ - Items: map[nnu]runtime.Object{}, - Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull), - }} -} - -func NewFakePVCControllerSource() *FakePVCControllerSource { - return &FakePVCControllerSource{ - FakeControllerSource{ - Items: map[nnu]runtime.Object{}, - Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull), - }} -} - -// FakeControllerSource implements listing/watching for testing. -type FakeControllerSource struct { - lock sync.RWMutex - Items map[nnu]runtime.Object - changes []watch.Event // one change per resourceVersion - Broadcaster *watch.Broadcaster -} - -type FakePVControllerSource struct { - FakeControllerSource -} - -type FakePVCControllerSource struct { - FakeControllerSource -} - -// namespace, name, uid to be used as a key. -type nnu struct { - namespace, name string - uid types.UID -} - -// Add adds an object to the set and sends an add event to watchers. -// obj's ResourceVersion is set. -func (f *FakeControllerSource) Add(obj runtime.Object) { - f.Change(watch.Event{Type: watch.Added, Object: obj}, 1) -} - -// Modify updates an object in the set and sends a modified event to watchers. -// obj's ResourceVersion is set. -func (f *FakeControllerSource) Modify(obj runtime.Object) { - f.Change(watch.Event{Type: watch.Modified, Object: obj}, 1) -} - -// Delete deletes an object from the set and sends a delete event to watchers. -// obj's ResourceVersion is set. -func (f *FakeControllerSource) Delete(lastValue runtime.Object) { - f.Change(watch.Event{Type: watch.Deleted, Object: lastValue}, 1) -} - -// AddDropWatch adds an object to the set but forgets to send an add event to -// watchers. -// obj's ResourceVersion is set. -func (f *FakeControllerSource) AddDropWatch(obj runtime.Object) { - f.Change(watch.Event{Type: watch.Added, Object: obj}, 0) -} - -// ModifyDropWatch updates an object in the set but forgets to send a modify -// event to watchers. -// obj's ResourceVersion is set. -func (f *FakeControllerSource) ModifyDropWatch(obj runtime.Object) { - f.Change(watch.Event{Type: watch.Modified, Object: obj}, 0) -} - -// DeleteDropWatch deletes an object from the set but forgets to send a delete -// event to watchers. -// obj's ResourceVersion is set. -func (f *FakeControllerSource) DeleteDropWatch(lastValue runtime.Object) { - f.Change(watch.Event{Type: watch.Deleted, Object: lastValue}, 0) -} - -func (f *FakeControllerSource) key(accessor metav1.Object) nnu { - return nnu{accessor.GetNamespace(), accessor.GetName(), accessor.GetUID()} -} - -// Change records the given event (setting the object's resource version) and -// sends a watch event with the specified probability. -func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) { - f.lock.Lock() - defer f.lock.Unlock() - - accessor, err := meta.Accessor(e.Object) - if err != nil { - panic(err) // this is test code only - } - - resourceVersion := len(f.changes) + 1 - accessor.SetResourceVersion(strconv.Itoa(resourceVersion)) - f.changes = append(f.changes, e) - key := f.key(accessor) - switch e.Type { - case watch.Added, watch.Modified: - f.Items[key] = e.Object - case watch.Deleted: - delete(f.Items, key) - } - - if rand.Float64() < watchProbability { - f.Broadcaster.Action(e.Type, e.Object) - } -} - -func (f *FakeControllerSource) getListItemsLocked() ([]runtime.Object, error) { - list := make([]runtime.Object, 0, len(f.Items)) - for _, obj := range f.Items { - // Must make a copy to allow clients to modify the object. - // Otherwise, if they make a change and write it back, they - // will inadvertently change our canonical copy (in - // addition to racing with other clients). - objCopy, err := api.Scheme.DeepCopy(obj) - if err != nil { - return nil, err - } - list = append(list, objCopy.(runtime.Object)) - } - return list, nil -} - -// List returns a list object, with its resource version set. -func (f *FakeControllerSource) List(options metav1.ListOptions) (runtime.Object, error) { - f.lock.RLock() - defer f.lock.RUnlock() - list, err := f.getListItemsLocked() - if err != nil { - return nil, err - } - listObj := &api.List{} - if err := meta.SetList(listObj, list); err != nil { - return nil, err - } - objMeta, err := metav1.ListMetaFor(listObj) - if err != nil { - return nil, err - } - resourceVersion := len(f.changes) - objMeta.ResourceVersion = strconv.Itoa(resourceVersion) - return listObj, nil -} - -// List returns a list object, with its resource version set. -func (f *FakePVControllerSource) List(options metav1.ListOptions) (runtime.Object, error) { - f.lock.RLock() - defer f.lock.RUnlock() - list, err := f.FakeControllerSource.getListItemsLocked() - if err != nil { - return nil, err - } - listObj := &v1.PersistentVolumeList{} - if err := meta.SetList(listObj, list); err != nil { - return nil, err - } - objMeta, err := metav1.ListMetaFor(listObj) - if err != nil { - return nil, err - } - resourceVersion := len(f.changes) - objMeta.ResourceVersion = strconv.Itoa(resourceVersion) - return listObj, nil -} - -// List returns a list object, with its resource version set. -func (f *FakePVCControllerSource) List(options metav1.ListOptions) (runtime.Object, error) { - f.lock.RLock() - defer f.lock.RUnlock() - list, err := f.FakeControllerSource.getListItemsLocked() - if err != nil { - return nil, err - } - listObj := &v1.PersistentVolumeClaimList{} - if err := meta.SetList(listObj, list); err != nil { - return nil, err - } - objMeta, err := metav1.ListMetaFor(listObj) - if err != nil { - return nil, err - } - resourceVersion := len(f.changes) - objMeta.ResourceVersion = strconv.Itoa(resourceVersion) - return listObj, nil -} - -// Watch returns a watch, which will be pre-populated with all changes -// after resourceVersion. -func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interface, error) { - f.lock.RLock() - defer f.lock.RUnlock() - rc, err := strconv.Atoi(options.ResourceVersion) - if err != nil { - return nil, err - } - if rc < len(f.changes) { - changes := []watch.Event{} - for _, c := range f.changes[rc:] { - // Must make a copy to allow clients to modify the - // object. Otherwise, if they make a change and write - // it back, they will inadvertently change the our - // canonical copy (in addition to racing with other - // clients). - objCopy, err := api.Scheme.DeepCopy(c.Object) - if err != nil { - return nil, err - } - changes = append(changes, watch.Event{Type: c.Type, Object: objCopy.(runtime.Object)}) - } - return f.Broadcaster.WatchWithPrefix(changes), nil - } else if rc > len(f.changes) { - return nil, errors.New("resource version in the future not supported by this fake") - } - return f.Broadcaster.Watch(), nil -} - -// Shutdown closes the underlying broadcaster, waiting for events to be -// delivered. It's an error to call any method after calling shutdown. This is -// enforced by Shutdown() leaving f locked. -func (f *FakeControllerSource) Shutdown() { - f.lock.Lock() // Purposely no unlock. - f.Broadcaster.Shutdown() -} diff --git a/pkg/controller/volume/persistentvolume/testing/fake_controller_source_test.go b/pkg/controller/volume/persistentvolume/testing/fake_controller_source_test.go deleted file mode 100644 index cbd07b89eae..00000000000 --- a/pkg/controller/volume/persistentvolume/testing/fake_controller_source_test.go +++ /dev/null @@ -1,96 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package framework - -import ( - "sync" - "testing" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/v1" -) - -// ensure the watch delivers the requested and only the requested items. -func consume(t *testing.T, w watch.Interface, rvs []string, done *sync.WaitGroup) { - defer done.Done() - for _, rv := range rvs { - got, ok := <-w.ResultChan() - if !ok { - t.Errorf("%#v: unexpected channel close, wanted %v", rvs, rv) - return - } - gotRV := got.Object.(*v1.Pod).ObjectMeta.ResourceVersion - if e, a := rv, gotRV; e != a { - t.Errorf("wanted %v, got %v", e, a) - } else { - t.Logf("Got %v as expected", gotRV) - } - } - // We should not get anything else. - got, open := <-w.ResultChan() - if open { - t.Errorf("%#v: unwanted object %#v", rvs, got) - } -} - -func TestRCNumber(t *testing.T) { - pod := func(name string) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - }, - } - } - - wg := &sync.WaitGroup{} - wg.Add(3) - - source := NewFakeControllerSource() - source.Add(pod("foo")) - source.Modify(pod("foo")) - source.Modify(pod("foo")) - - w, err := source.Watch(metav1.ListOptions{ResourceVersion: "1"}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - go consume(t, w, []string{"2", "3"}, wg) - - list, err := source.List(metav1.ListOptions{}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if e, a := "3", list.(*api.List).ResourceVersion; e != a { - t.Errorf("wanted %v, got %v", e, a) - } - - w2, err := source.Watch(metav1.ListOptions{ResourceVersion: "2"}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - go consume(t, w2, []string{"3"}, wg) - - w3, err := source.Watch(metav1.ListOptions{ResourceVersion: "3"}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - go consume(t, w3, []string{}, wg) - source.Shutdown() - wg.Wait() -} diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 41901ed763a..0605275e324 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -314,6 +314,7 @@ func ClusterRoles() []rbac.ClusterRole { rbac.NewRule("list", "watch").Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(), rbac.NewRule("list", "watch").Groups(autoscalingGroup).Resources("horizontalpodautoscalers").RuleOrDie(), rbac.NewRule("list", "watch").Groups(certificatesGroup).Resources("certificatesigningrequests").RuleOrDie(), + rbac.NewRule("list", "watch").Groups(storageGroup).Resources("storageclasses").RuleOrDie(), }, }, { diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml index 4e8403051b1..c001e8d502d 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml @@ -523,6 +523,13 @@ items: verbs: - list - watch + - apiGroups: + - storage.k8s.io + resources: + - storageclasses + verbs: + - list + - watch - apiVersion: rbac.authorization.k8s.io/v1beta1 kind: ClusterRole metadata: diff --git a/test/integration/volume/persistent_volumes_test.go b/test/integration/volume/persistent_volumes_test.go index 44f1196e9d0..218d3843a5e 100644 --- a/test/integration/volume/persistent_volumes_test.go +++ b/test/integration/volume/persistent_volumes_test.go @@ -36,6 +36,7 @@ import ( storage "k8s.io/kubernetes/pkg/apis/storage/v1beta1" storageutil "k8s.io/kubernetes/pkg/apis/storage/v1beta1/util" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume" "k8s.io/kubernetes/pkg/volume" @@ -110,7 +111,7 @@ func TestPersistentVolumeRecycler(t *testing.T) { ns := framework.CreateTestingNamespace("pv-recycler", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - testClient, ctrl, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) + testClient, ctrl, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) defer watchPV.Stop() defer watchPVC.Stop() @@ -119,6 +120,7 @@ func TestPersistentVolumeRecycler(t *testing.T) { defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{}) stopCh := make(chan struct{}) + informers.Start(stopCh) go ctrl.Run(stopCh) defer close(stopCh) @@ -164,7 +166,7 @@ func TestPersistentVolumeDeleter(t *testing.T) { ns := framework.CreateTestingNamespace("pv-deleter", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - testClient, ctrl, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) + testClient, ctrl, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) defer watchPV.Stop() defer watchPVC.Stop() @@ -173,6 +175,7 @@ func TestPersistentVolumeDeleter(t *testing.T) { defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{}) stopCh := make(chan struct{}) + informers.Start(stopCh) go ctrl.Run(stopCh) defer close(stopCh) @@ -223,7 +226,7 @@ func TestPersistentVolumeBindRace(t *testing.T) { ns := framework.CreateTestingNamespace("pv-bind-race", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - testClient, ctrl, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) + testClient, ctrl, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) defer watchPV.Stop() defer watchPVC.Stop() @@ -232,6 +235,7 @@ func TestPersistentVolumeBindRace(t *testing.T) { defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{}) stopCh := make(chan struct{}) + informers.Start(stopCh) go ctrl.Run(stopCh) defer close(stopCh) @@ -294,7 +298,7 @@ func TestPersistentVolumeClaimLabelSelector(t *testing.T) { ns := framework.CreateTestingNamespace("pvc-label-selector", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - testClient, controller, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) + testClient, controller, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) defer watchPV.Stop() defer watchPVC.Stop() @@ -303,6 +307,7 @@ func TestPersistentVolumeClaimLabelSelector(t *testing.T) { defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{}) stopCh := make(chan struct{}) + informers.Start(stopCh) go controller.Run(stopCh) defer close(stopCh) @@ -374,7 +379,7 @@ func TestPersistentVolumeClaimLabelSelectorMatchExpressions(t *testing.T) { ns := framework.CreateTestingNamespace("pvc-match-expressions", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - testClient, controller, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) + testClient, controller, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) defer watchPV.Stop() defer watchPVC.Stop() @@ -383,6 +388,7 @@ func TestPersistentVolumeClaimLabelSelectorMatchExpressions(t *testing.T) { defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{}) stopCh := make(chan struct{}) + informers.Start(stopCh) go controller.Run(stopCh) defer close(stopCh) @@ -473,7 +479,7 @@ func TestPersistentVolumeMultiPVs(t *testing.T) { ns := framework.CreateTestingNamespace("multi-pvs", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - testClient, controller, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) + testClient, controller, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) defer watchPV.Stop() defer watchPVC.Stop() @@ -482,6 +488,7 @@ func TestPersistentVolumeMultiPVs(t *testing.T) { defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{}) stopCh := make(chan struct{}) + informers.Start(stopCh) go controller.Run(stopCh) defer close(stopCh) @@ -562,7 +569,7 @@ func TestPersistentVolumeMultiPVsPVCs(t *testing.T) { ns := framework.CreateTestingNamespace("multi-pvs-pvcs", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - testClient, binder, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) + testClient, binder, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) defer watchPV.Stop() defer watchPVC.Stop() @@ -571,6 +578,7 @@ func TestPersistentVolumeMultiPVsPVCs(t *testing.T) { defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{}) controllerStopCh := make(chan struct{}) + informers.Start(controllerStopCh) go binder.Run(controllerStopCh) defer close(controllerStopCh) @@ -727,7 +735,7 @@ func TestPersistentVolumeControllerStartup(t *testing.T) { const shortSyncPeriod = 2 * time.Second syncPeriod := getSyncPeriod(shortSyncPeriod) - testClient, binder, watchPV, watchPVC := createClients(ns, t, s, shortSyncPeriod) + testClient, binder, informers, watchPV, watchPVC := createClients(ns, t, s, shortSyncPeriod) defer watchPV.Stop() defer watchPVC.Stop() @@ -784,6 +792,7 @@ func TestPersistentVolumeControllerStartup(t *testing.T) { // Start the controller when all PVs and PVCs are already saved in etcd stopCh := make(chan struct{}) + informers.Start(stopCh) go binder.Run(stopCh) defer close(stopCh) @@ -850,7 +859,7 @@ func TestPersistentVolumeProvisionMultiPVCs(t *testing.T) { ns := framework.CreateTestingNamespace("provision-multi-pvs", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - testClient, binder, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) + testClient, binder, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) defer watchPV.Stop() defer watchPVC.Stop() @@ -871,6 +880,7 @@ func TestPersistentVolumeProvisionMultiPVCs(t *testing.T) { testClient.Storage().StorageClasses().Create(&storageClass) stopCh := make(chan struct{}) + informers.Start(stopCh) go binder.Run(stopCh) defer close(stopCh) @@ -947,7 +957,7 @@ func TestPersistentVolumeMultiPVsDiffAccessModes(t *testing.T) { ns := framework.CreateTestingNamespace("multi-pvs-diff-access", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - testClient, controller, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) + testClient, controller, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) defer watchPV.Stop() defer watchPVC.Stop() @@ -956,6 +966,7 @@ func TestPersistentVolumeMultiPVsDiffAccessModes(t *testing.T) { defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{}) stopCh := make(chan struct{}) + informers.Start(stopCh) go controller.Run(stopCh) defer close(stopCh) @@ -1088,7 +1099,7 @@ func waitForAnyPersistentVolumeClaimPhase(w watch.Interface, phase v1.Persistent } } -func createClients(ns *v1.Namespace, t *testing.T, s *httptest.Server, syncPeriod time.Duration) (*clientset.Clientset, *persistentvolumecontroller.PersistentVolumeController, watch.Interface, watch.Interface) { +func createClients(ns *v1.Namespace, t *testing.T, s *httptest.Server, syncPeriod time.Duration) (*clientset.Clientset, *persistentvolumecontroller.PersistentVolumeController, informers.SharedInformerFactory, watch.Interface, watch.Interface) { // Use higher QPS and Burst, there is a test for race conditions which // creates many objects and default values were too low. binderClient := clientset.NewForConfigOrDie(&restclient.Config{ @@ -1119,12 +1130,16 @@ func createClients(ns *v1.Namespace, t *testing.T, s *httptest.Server, syncPerio } plugins := []volume.VolumePlugin{plugin} cloud := &fakecloud.FakeCloud{} + informers := informers.NewSharedInformerFactory(testClient, getSyncPeriod(syncPeriod)) ctrl := persistentvolumecontroller.NewController( persistentvolumecontroller.ControllerParameters{ - KubeClient: binderClient, - SyncPeriod: getSyncPeriod(syncPeriod), - VolumePlugins: plugins, - Cloud: cloud, + KubeClient: binderClient, + SyncPeriod: getSyncPeriod(syncPeriod), + VolumePlugins: plugins, + Cloud: cloud, + VolumeInformer: informers.Core().V1().PersistentVolumes(), + ClaimInformer: informers.Core().V1().PersistentVolumeClaims(), + ClassInformer: informers.Storage().V1beta1().StorageClasses(), EnableDynamicProvisioning: true, }) @@ -1137,7 +1152,7 @@ func createClients(ns *v1.Namespace, t *testing.T, s *httptest.Server, syncPerio t.Fatalf("Failed to watch PersistentVolumeClaims: %v", err) } - return testClient, ctrl, watchPV, watchPVC + return testClient, ctrl, informers, watchPV, watchPVC } func createPV(name, path, cap string, mode []v1.PersistentVolumeAccessMode, reclaim v1.PersistentVolumeReclaimPolicy) *v1.PersistentVolume {