diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index bbc6d2e7984..17491b6a52d 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -430,7 +430,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration), cloud, s.ClusterName, - sharedInformers.PersistentVolumes().Informer(), + nil, // volumeSource nil, // claimSource nil, // classSource nil, // eventRecorder diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index f2a38a9dfe3..9a79ef7919b 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -289,7 +289,7 @@ func (s *CMServer) Run(_ []string) error { if err != nil { glog.Fatalf("An backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err) } - volumeController := persistentvolumecontroller.NewPersistentVolumeControllerFromClient( + volumeController := persistentvolumecontroller.NewPersistentVolumeController( clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-binder")), s.PVClaimBinderSyncPeriod.Duration, alphaProvisioner, diff --git a/pkg/controller/volume/persistentvolume/controller.go b/pkg/controller/volume/persistentvolume/controller.go index ce85b498498..63c9d5a8e0f 100644 --- a/pkg/controller/volume/persistentvolume/controller.go +++ b/pkg/controller/volume/persistentvolume/controller.go @@ -154,8 +154,8 @@ const createProvisionedPVInterval = 10 * time.Second // framework.Controllers that watch PersistentVolume and PersistentVolumeClaim // changes. type PersistentVolumeController struct { - volumeController framework.ControllerInterface - pvInformer framework.SharedIndexInformer + volumeController *framework.Controller + volumeSource cache.ListerWatcher claimController *framework.Controller claimSource cache.ListerWatcher classReflector *cache.Reflector @@ -176,13 +176,6 @@ type PersistentVolumeController struct { claims cache.Store classes cache.Store - // isInformerInternal is true if the informer we hold is a personal informer, - // false if it is a shared informer. If we're using a normal shared informer, - // then the informer will be started for us. If we have a personal informer, - // we must start it ourselves. If you start the controller using - // NewPersistentVolumeController(passing SharedInformer), this will be false. - isInformerInternal bool - // Map of scheduled/running operations. runningOperations goroutinemap.GoRoutineMap diff --git a/pkg/controller/volume/persistentvolume/controller_base.go b/pkg/controller/volume/persistentvolume/controller_base.go index 2120424f03b..93fe8326f57 100644 --- a/pkg/controller/volume/persistentvolume/controller_base.go +++ b/pkg/controller/volume/persistentvolume/controller_base.go @@ -38,8 +38,6 @@ import ( "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" - "k8s.io/kubernetes/pkg/controller/framework/informers" - "k8s.io/kubernetes/pkg/util/wait" ) // This file contains the controller base functionality, i.e. framework to @@ -54,8 +52,7 @@ func NewPersistentVolumeController( volumePlugins []vol.VolumePlugin, cloud cloudprovider.Interface, clusterName string, - pvInformer framework.SharedIndexInformer, - claimSource, classSource cache.ListerWatcher, + volumeSource, claimSource, classSource cache.ListerWatcher, eventRecorder record.EventRecorder, enableDynamicProvisioning bool, ) *PersistentVolumeController { @@ -87,8 +84,17 @@ func NewPersistentVolumeController( } } - controller.pvInformer = pvInformer - controller.isInformerInternal = false + if volumeSource == nil { + volumeSource = &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return kubeClient.Core().PersistentVolumes().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return kubeClient.Core().PersistentVolumes().Watch(options) + }, + } + } + controller.volumeSource = volumeSource if claimSource == nil { claimSource = &cache.ListWatch{ @@ -114,8 +120,17 @@ func NewPersistentVolumeController( } controller.classSource = classSource - controller.volumeController = pvInformer.GetController() - + _, controller.volumeController = framework.NewIndexerInformer( + volumeSource, + &api.PersistentVolume{}, + syncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: controller.addVolume, + UpdateFunc: controller.updateVolume, + DeleteFunc: controller.deleteVolume, + }, + cache.Indexers{"accessmodes": accessModesIndexFunc}, + ) _, controller.claimController = framework.NewInformer( claimSource, &api.PersistentVolumeClaim{}, @@ -139,55 +154,25 @@ func NewPersistentVolumeController( return controller } -// NewPersistentVolumeControllerFromClient returns a new -// *PersistentVolumeController that runs its own informer. -func NewPersistentVolumeControllerFromClient( - kubeClient clientset.Interface, - syncPeriod time.Duration, - alphaProvisioner vol.ProvisionableVolumePlugin, - volumePlugins []vol.VolumePlugin, - cloud cloudprovider.Interface, - clusterName string, - volumeSource, claimSource, classSource cache.ListerWatcher, - eventRecorder record.EventRecorder, - enableDynamicProvisioning bool, -) *PersistentVolumeController { - pvInformer := informers.NewPVInformer(kubeClient, syncPeriod) - if volumeSource != nil { - pvInformer = framework.NewSharedIndexInformer(volumeSource, &api.PersistentVolume{}, syncPeriod, cache.Indexers{"accessmodes": accessModesIndexFunc}) - } - ctrl := NewPersistentVolumeController( - kubeClient, - syncPeriod, - alphaProvisioner, - volumePlugins, - cloud, - clusterName, - pvInformer, - claimSource, - classSource, - eventRecorder, - enableDynamicProvisioning, - ) - ctrl.isInformerInternal = true - - return ctrl -} - // initializeCaches fills all controller caches with initial data from etcd in // order to have the caches already filled when first addClaim/addVolume to // perform initial synchronization of the controller. -func (ctrl *PersistentVolumeController) initializeCaches(volumeStore cache.Store, claimSource cache.ListerWatcher) { - volumeList := volumeStore.List() - for _, obj := range volumeList { - volume, ok := obj.(*api.PersistentVolume) - if !ok { - glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %#v", obj) - } +func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSource cache.ListerWatcher) { + volumeListObj, err := volumeSource.List(api.ListOptions{}) + if err != nil { + glog.Errorf("PersistentVolumeController can't initialize caches: %v", err) + return + } + volumeList, ok := volumeListObj.(*api.PersistentVolumeList) + if !ok { + glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %#v", volumeListObj) + return + } + for _, volume := range volumeList.Items { // Ignore template volumes from kubernetes 1.2 - deleted := ctrl.upgradeVolumeFrom1_2(volume) + deleted := ctrl.upgradeVolumeFrom1_2(&volume) if !deleted { - clone, err := conversion.NewCloner().DeepCopy(volume) + clone, err := conversion.NewCloner().DeepCopy(&volume) if err != nil { glog.Errorf("error cloning volume %q: %v", volume.Name, err) continue @@ -459,21 +444,7 @@ func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) { // Run starts all of this controller's control loops func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) { glog.V(4).Infof("starting PersistentVolumeController") - if ctrl.isInformerInternal { - go ctrl.pvInformer.Run(stopCh) - // Wait to avoid data race between Run and AddEventHandler in tests - wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { - return ctrl.pvInformer.HasSynced(), nil - }) - } - ctrl.initializeCaches(ctrl.pvInformer.GetStore(), ctrl.claimSource) - // AddEventHandler will send synthetic add events which we don't want until - // we have initialized the caches - ctrl.pvInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{ - AddFunc: ctrl.addVolume, - UpdateFunc: ctrl.updateVolume, - DeleteFunc: ctrl.deleteVolume, - }) + ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource) go ctrl.volumeController.Run(stopCh) go ctrl.claimController.Run(stopCh) go ctrl.classReflector.RunUntil(stopCh) diff --git a/pkg/controller/volume/persistentvolume/framework_test.go b/pkg/controller/volume/persistentvolume/framework_test.go index e2319c56243..54ba1c0175c 100644 --- a/pkg/controller/volume/persistentvolume/framework_test.go +++ b/pkg/controller/volume/persistentvolume/framework_test.go @@ -594,7 +594,7 @@ func newTestController(kubeClient clientset.Interface, volumeSource, claimSource if classSource == nil { classSource = framework.NewFakeControllerSource() } - ctrl := NewPersistentVolumeControllerFromClient( + ctrl := NewPersistentVolumeController( kubeClient, 5*time.Second, // sync period nil, // alpha provisioner diff --git a/test/integration/persistentvolumes/persistent_volumes_test.go b/test/integration/persistentvolumes/persistent_volumes_test.go index f15fb45a22b..4706d78df3c 100644 --- a/test/integration/persistentvolumes/persistent_volumes_test.go +++ b/test/integration/persistentvolumes/persistent_volumes_test.go @@ -1126,7 +1126,7 @@ func createClients(ns *api.Namespace, t *testing.T, s *httptest.Server, syncPeri cloud := &fake_cloud.FakeCloud{} syncPeriod = getSyncPeriod(syncPeriod) - ctrl := persistentvolumecontroller.NewPersistentVolumeControllerFromClient( + ctrl := persistentvolumecontroller.NewPersistentVolumeController( binderClient, syncPeriod, nil, // alpha provisioner