PV controller: resync informers manually

We want relatively short resync period of PV/PVCs and at the same time we
don't want to force such short resync to all shared informer consumers.
Therefore we need to make our own periodic resync.
This commit is contained in:
Jan Safranek 2017-07-17 13:39:08 +02:00
parent 1170b7c2a1
commit 0eface85e4
3 changed files with 31 additions and 6 deletions

View File

@ -164,6 +164,7 @@ type PersistentVolumeController struct {
volumePluginMgr vol.VolumePluginMgr
enableDynamicProvisioning bool
clusterName string
resyncPeriod time.Duration
// Cache of the last known version of volumes and claims. This cache is
// thread safe as long as the volumes/claims there are not modified, they

View File

@ -88,30 +88,29 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error)
createProvisionedPVInterval: createProvisionedPVInterval,
claimQueue: workqueue.NewNamed("claims"),
volumeQueue: workqueue.NewNamed("volumes"),
resyncPeriod: p.SyncPeriod,
}
if err := controller.volumePluginMgr.InitPlugins(p.VolumePlugins, controller); err != nil {
return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolume Controller: %v", err)
}
p.VolumeInformer.Informer().AddEventHandlerWithResyncPeriod(
p.VolumeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
},
p.SyncPeriod,
)
controller.volumeLister = p.VolumeInformer.Lister()
controller.volumeListerSynced = p.VolumeInformer.Informer().HasSynced
p.ClaimInformer.Informer().AddEventHandlerWithResyncPeriod(
p.ClaimInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
},
p.SyncPeriod,
)
controller.claimLister = p.ClaimInformer.Lister()
controller.claimListerSynced = p.ClaimInformer.Informer().HasSynced
@ -277,6 +276,7 @@ func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)
go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)
go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
go wait.Until(ctrl.claimWorker, time.Second, stopCh)
@ -398,6 +398,31 @@ func (ctrl *PersistentVolumeController) claimWorker() {
}
}
// resync supplements short resync period of shared informers - we don't want
// all consumers of PV/PVC shared informer to have a short resync period,
// therefore we do our own.
func (ctrl *PersistentVolumeController) resync() {
glog.V(4).Infof("resyncing PV controller")
pvcs, err := ctrl.claimLister.List(labels.NewSelector())
if err != nil {
glog.Warningf("cannot list claims: %s", err)
return
}
for _, pvc := range pvcs {
ctrl.enqueueWork(ctrl.claimQueue, pvc)
}
pvs, err := ctrl.volumeLister.List(labels.NewSelector())
if err != nil {
glog.Warningf("cannot list persistent volumes: %s", err)
return
}
for _, pv := range pvs {
ctrl.enqueueWork(ctrl.volumeQueue, pv)
}
}
// setClaimProvisioner saves
// claim.Annotations[annStorageProvisioner] = class.Provisioner
func (ctrl *PersistentVolumeController) setClaimProvisioner(claim *v1.PersistentVolumeClaim, class *storage.StorageClass) (*v1.PersistentVolumeClaim, error) {

View File

@ -145,8 +145,7 @@ func TestControllerSync(t *testing.T) {
}
// Simulate a periodic resync, just in case some events arrived in a
// wrong order.
ctrl.claims.Resync()
ctrl.volumes.store.Resync()
ctrl.resync()
err = reactor.waitTest(test)
if err != nil {