mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 05:57:25 +00:00
Merge pull request #49016 from jsafrane/pv-controller-no-resync
PV controller: resync informers manually merging to unblock the submit queue
This commit is contained in:
commit
5941f7b69f
@ -164,6 +164,7 @@ type PersistentVolumeController struct {
|
|||||||
volumePluginMgr vol.VolumePluginMgr
|
volumePluginMgr vol.VolumePluginMgr
|
||||||
enableDynamicProvisioning bool
|
enableDynamicProvisioning bool
|
||||||
clusterName string
|
clusterName string
|
||||||
|
resyncPeriod time.Duration
|
||||||
|
|
||||||
// Cache of the last known version of volumes and claims. This cache is
|
// Cache of the last known version of volumes and claims. This cache is
|
||||||
// thread safe as long as the volumes/claims there are not modified, they
|
// thread safe as long as the volumes/claims there are not modified, they
|
||||||
|
@ -87,30 +87,29 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error)
|
|||||||
createProvisionedPVInterval: createProvisionedPVInterval,
|
createProvisionedPVInterval: createProvisionedPVInterval,
|
||||||
claimQueue: workqueue.NewNamed("claims"),
|
claimQueue: workqueue.NewNamed("claims"),
|
||||||
volumeQueue: workqueue.NewNamed("volumes"),
|
volumeQueue: workqueue.NewNamed("volumes"),
|
||||||
|
resyncPeriod: p.SyncPeriod,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := controller.volumePluginMgr.InitPlugins(p.VolumePlugins, controller); err != nil {
|
if err := controller.volumePluginMgr.InitPlugins(p.VolumePlugins, controller); err != nil {
|
||||||
return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolume Controller: %v", err)
|
return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolume Controller: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.VolumeInformer.Informer().AddEventHandlerWithResyncPeriod(
|
p.VolumeInformer.Informer().AddEventHandler(
|
||||||
cache.ResourceEventHandlerFuncs{
|
cache.ResourceEventHandlerFuncs{
|
||||||
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
|
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
|
||||||
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
|
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
|
||||||
DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
|
DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
|
||||||
},
|
},
|
||||||
p.SyncPeriod,
|
|
||||||
)
|
)
|
||||||
controller.volumeLister = p.VolumeInformer.Lister()
|
controller.volumeLister = p.VolumeInformer.Lister()
|
||||||
controller.volumeListerSynced = p.VolumeInformer.Informer().HasSynced
|
controller.volumeListerSynced = p.VolumeInformer.Informer().HasSynced
|
||||||
|
|
||||||
p.ClaimInformer.Informer().AddEventHandlerWithResyncPeriod(
|
p.ClaimInformer.Informer().AddEventHandler(
|
||||||
cache.ResourceEventHandlerFuncs{
|
cache.ResourceEventHandlerFuncs{
|
||||||
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
|
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
|
||||||
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
|
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
|
||||||
DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
|
DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
|
||||||
},
|
},
|
||||||
p.SyncPeriod,
|
|
||||||
)
|
)
|
||||||
controller.claimLister = p.ClaimInformer.Lister()
|
controller.claimLister = p.ClaimInformer.Lister()
|
||||||
controller.claimListerSynced = p.ClaimInformer.Informer().HasSynced
|
controller.claimListerSynced = p.ClaimInformer.Informer().HasSynced
|
||||||
@ -276,6 +275,7 @@ func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
|
|||||||
|
|
||||||
ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)
|
ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)
|
||||||
|
|
||||||
|
go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)
|
||||||
go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
|
go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
|
||||||
go wait.Until(ctrl.claimWorker, time.Second, stopCh)
|
go wait.Until(ctrl.claimWorker, time.Second, stopCh)
|
||||||
|
|
||||||
@ -397,6 +397,31 @@ func (ctrl *PersistentVolumeController) claimWorker() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// resync supplements short resync period of shared informers - we don't want
|
||||||
|
// all consumers of PV/PVC shared informer to have a short resync period,
|
||||||
|
// therefore we do our own.
|
||||||
|
func (ctrl *PersistentVolumeController) resync() {
|
||||||
|
glog.V(4).Infof("resyncing PV controller")
|
||||||
|
|
||||||
|
pvcs, err := ctrl.claimLister.List(labels.NewSelector())
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("cannot list claims: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, pvc := range pvcs {
|
||||||
|
ctrl.enqueueWork(ctrl.claimQueue, pvc)
|
||||||
|
}
|
||||||
|
|
||||||
|
pvs, err := ctrl.volumeLister.List(labels.NewSelector())
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("cannot list persistent volumes: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, pv := range pvs {
|
||||||
|
ctrl.enqueueWork(ctrl.volumeQueue, pv)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// setClaimProvisioner saves
|
// setClaimProvisioner saves
|
||||||
// claim.Annotations[annStorageProvisioner] = class.Provisioner
|
// claim.Annotations[annStorageProvisioner] = class.Provisioner
|
||||||
func (ctrl *PersistentVolumeController) setClaimProvisioner(claim *v1.PersistentVolumeClaim, class *storage.StorageClass) (*v1.PersistentVolumeClaim, error) {
|
func (ctrl *PersistentVolumeController) setClaimProvisioner(claim *v1.PersistentVolumeClaim, class *storage.StorageClass) (*v1.PersistentVolumeClaim, error) {
|
||||||
|
@ -145,8 +145,7 @@ func TestControllerSync(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// Simulate a periodic resync, just in case some events arrived in a
|
// Simulate a periodic resync, just in case some events arrived in a
|
||||||
// wrong order.
|
// wrong order.
|
||||||
ctrl.claims.Resync()
|
ctrl.resync()
|
||||||
ctrl.volumes.store.Resync()
|
|
||||||
|
|
||||||
err = reactor.waitTest(test)
|
err = reactor.waitTest(test)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user