diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go index da0f5b2fddb..e516c9f9df9 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller.go +++ b/pkg/controller/volume/persistentvolume/pv_controller.go @@ -164,6 +164,7 @@ type PersistentVolumeController struct { volumePluginMgr vol.VolumePluginMgr enableDynamicProvisioning bool clusterName string + resyncPeriod time.Duration // Cache of the last known version of volumes and claims. This cache is // thread safe as long as the volumes/claims there are not modified, they diff --git a/pkg/controller/volume/persistentvolume/pv_controller_base.go b/pkg/controller/volume/persistentvolume/pv_controller_base.go index 74259f8dfbc..cfbe26401ae 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller_base.go +++ b/pkg/controller/volume/persistentvolume/pv_controller_base.go @@ -87,30 +87,29 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error) createProvisionedPVInterval: createProvisionedPVInterval, claimQueue: workqueue.NewNamed("claims"), volumeQueue: workqueue.NewNamed("volumes"), + resyncPeriod: p.SyncPeriod, } if err := controller.volumePluginMgr.InitPlugins(p.VolumePlugins, controller); err != nil { return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolume Controller: %v", err) } - p.VolumeInformer.Informer().AddEventHandlerWithResyncPeriod( + p.VolumeInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) }, UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) }, DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) }, }, - p.SyncPeriod, ) controller.volumeLister = p.VolumeInformer.Lister() controller.volumeListerSynced = p.VolumeInformer.Informer().HasSynced - p.ClaimInformer.Informer().AddEventHandlerWithResyncPeriod( + p.ClaimInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) }, UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) }, DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) }, }, - p.SyncPeriod, ) controller.claimLister = p.ClaimInformer.Lister() controller.claimListerSynced = p.ClaimInformer.Informer().HasSynced @@ -276,6 +275,7 @@ func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) { ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister) + go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh) go wait.Until(ctrl.volumeWorker, time.Second, stopCh) go wait.Until(ctrl.claimWorker, time.Second, stopCh) @@ -397,6 +397,31 @@ func (ctrl *PersistentVolumeController) claimWorker() { } } +// resync supplements short resync period of shared informers - we don't want +// all consumers of PV/PVC shared informer to have a short resync period, +// therefore we do our own. +func (ctrl *PersistentVolumeController) resync() { + glog.V(4).Infof("resyncing PV controller") + + pvcs, err := ctrl.claimLister.List(labels.NewSelector()) + if err != nil { + glog.Warningf("cannot list claims: %s", err) + return + } + for _, pvc := range pvcs { + ctrl.enqueueWork(ctrl.claimQueue, pvc) + } + + pvs, err := ctrl.volumeLister.List(labels.NewSelector()) + if err != nil { + glog.Warningf("cannot list persistent volumes: %s", err) + return + } + for _, pv := range pvs { + ctrl.enqueueWork(ctrl.volumeQueue, pv) + } +} + // setClaimProvisioner saves // claim.Annotations[annStorageProvisioner] = class.Provisioner func (ctrl *PersistentVolumeController) setClaimProvisioner(claim *v1.PersistentVolumeClaim, class *storage.StorageClass) (*v1.PersistentVolumeClaim, error) { diff --git a/pkg/controller/volume/persistentvolume/pv_controller_test.go b/pkg/controller/volume/persistentvolume/pv_controller_test.go index 2169c30a4b9..41d5ce4b831 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller_test.go +++ b/pkg/controller/volume/persistentvolume/pv_controller_test.go @@ -145,8 +145,7 @@ func TestControllerSync(t *testing.T) { } // Simulate a periodic resync, just in case some events arrived in a // wrong order. - ctrl.claims.Resync() - ctrl.volumes.store.Resync() + ctrl.resync() err = reactor.waitTest(test) if err != nil {