Switch pv controller to shared informers

This commit is contained in:
Matthew Wong
2017-02-16 10:08:23 -05:00
parent 7d4383c6e1
commit 33f98d4db3
13 changed files with 216 additions and 635 deletions

View File

@@ -24,9 +24,9 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
@@ -36,6 +36,9 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
storage "k8s.io/kubernetes/pkg/apis/storage/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
storageinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/storage/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/goroutinemap"
@@ -51,15 +54,17 @@ import (
// ControllerParameters contains arguments for creation of a new
// PersistentVolume controller.
type ControllerParameters struct {
KubeClient clientset.Interface
SyncPeriod time.Duration
AlphaProvisioner vol.ProvisionableVolumePlugin
VolumePlugins []vol.VolumePlugin
Cloud cloudprovider.Interface
ClusterName string
VolumeSource, ClaimSource, ClassSource cache.ListerWatcher
EventRecorder record.EventRecorder
EnableDynamicProvisioning bool
KubeClient clientset.Interface
SyncPeriod time.Duration
AlphaProvisioner vol.ProvisionableVolumePlugin
VolumePlugins []vol.VolumePlugin
Cloud cloudprovider.Interface
ClusterName string
VolumeInformer coreinformers.PersistentVolumeInformer
ClaimInformer coreinformers.PersistentVolumeClaimInformer
ClassInformer storageinformers.StorageClassInformer
EventRecorder record.EventRecorder
EnableDynamicProvisioning bool
}
// NewController creates a new PersistentVolume controller
@@ -94,98 +99,47 @@ func NewController(p ControllerParameters) *PersistentVolumeController {
}
}
volumeSource := p.VolumeSource
if volumeSource == nil {
volumeSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return p.KubeClient.Core().PersistentVolumes().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return p.KubeClient.Core().PersistentVolumes().Watch(options)
},
}
}
controller.volumeSource = volumeSource
claimSource := p.ClaimSource
if claimSource == nil {
claimSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return p.KubeClient.Core().PersistentVolumeClaims(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return p.KubeClient.Core().PersistentVolumeClaims(metav1.NamespaceAll).Watch(options)
},
}
}
controller.claimSource = claimSource
classSource := p.ClassSource
if classSource == nil {
classSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return p.KubeClient.Storage().StorageClasses().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return p.KubeClient.Storage().StorageClasses().Watch(options)
},
}
}
controller.classSource = classSource
controller.volumeInformer, controller.volumeController = cache.NewIndexerInformer(
volumeSource,
&v1.PersistentVolume{},
p.SyncPeriod,
p.VolumeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
},
cache.Indexers{"accessmodes": accessModesIndexFunc},
)
controller.claimInformer, controller.claimController = cache.NewInformer(
claimSource,
&v1.PersistentVolumeClaim{},
p.SyncPeriod,
)
controller.volumeLister = p.VolumeInformer.Lister()
controller.volumeListerSynced = p.VolumeInformer.Informer().HasSynced
p.ClaimInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
},
)
// This is just a cache of StorageClass instances, no special actions are
// needed when a class is created/deleted/updated.
controller.classes = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
controller.classReflector = cache.NewReflector(
classSource,
&storage.StorageClass{},
controller.classes,
p.SyncPeriod,
)
controller.claimLister = p.ClaimInformer.Lister()
controller.claimListerSynced = p.ClaimInformer.Informer().HasSynced
controller.classLister = p.ClassInformer.Lister()
controller.classListerSynced = p.ClassInformer.Informer().HasSynced
return controller
}
// initializeCaches fills all controller caches with initial data from etcd in
// order to have the caches already filled when first addClaim/addVolume to
// perform initial synchronization of the controller.
func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSource cache.ListerWatcher) {
volumeListObj, err := volumeSource.List(metav1.ListOptions{})
func (ctrl *PersistentVolumeController) initializeCaches(volumeLister corelisters.PersistentVolumeLister, claimLister corelisters.PersistentVolumeClaimLister) {
volumeList, err := volumeLister.List(labels.Everything())
if err != nil {
glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
return
}
volumeList, ok := volumeListObj.(*v1.PersistentVolumeList)
if !ok {
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %#v", volumeListObj)
return
}
for _, volume := range volumeList.Items {
for _, volume := range volumeList {
// Ignore template volumes from kubernetes 1.2
deleted := ctrl.upgradeVolumeFrom1_2(&volume)
deleted := ctrl.upgradeVolumeFrom1_2(volume)
if !deleted {
clone, err := api.Scheme.DeepCopy(&volume)
clone, err := api.Scheme.DeepCopy(volume)
if err != nil {
glog.Errorf("error cloning volume %q: %v", volume.Name, err)
continue
@@ -195,20 +149,15 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour
}
}
claimListObj, err := claimSource.List(metav1.ListOptions{})
claimList, err := claimLister.List(labels.Everything())
if err != nil {
glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
return
}
claimList, ok := claimListObj.(*v1.PersistentVolumeClaimList)
if !ok {
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of claims, got: %#v", claimListObj)
return
}
for _, claim := range claimList.Items {
clone, err := api.Scheme.DeepCopy(&claim)
for _, claim := range claimList {
clone, err := api.Scheme.DeepCopy(claim)
if err != nil {
glog.Errorf("error cloning claim %q: %v", claimToClaimKey(&claim), err)
glog.Errorf("error cloning claim %q: %v", claimToClaimKey(claim), err)
continue
}
claimClone := clone.(*v1.PersistentVolumeClaim)
@@ -326,10 +275,11 @@ func (ctrl *PersistentVolumeController) deleteClaim(claim *v1.PersistentVolumeCl
// Run starts all of this controller's control loops
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
glog.V(1).Infof("starting PersistentVolumeController")
ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource)
go ctrl.volumeController.Run(stopCh)
go ctrl.claimController.Run(stopCh)
go ctrl.classReflector.RunUntil(stopCh)
if !cache.WaitForCacheSync(stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for volume caches to sync"))
return
}
ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)
go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
go wait.Until(ctrl.claimWorker, time.Second, stopCh)
@@ -351,27 +301,26 @@ func (ctrl *PersistentVolumeController) volumeWorker() {
key := keyObj.(string)
glog.V(5).Infof("volumeWorker[%s]", key)
volumeObj, found, err := ctrl.volumeInformer.GetByKey(key)
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.V(2).Infof("error getting volume %q from informer: %v", key, err)
glog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
return false
}
if found {
volume, err := ctrl.volumeLister.Get(name)
if err == nil {
// The volume still exists in informer cache, the event must have
// been add/update/sync
volume, ok := volumeObj.(*v1.PersistentVolume)
if !ok {
glog.Errorf("expected volume, got %+v", volumeObj)
return false
}
ctrl.updateVolume(volume)
return false
}
if !errors.IsNotFound(err) {
glog.V(2).Infof("error getting volume %q from informer: %v", key, err)
return false
}
// The volume is not in informer cache, the event must have been
// "delete"
volumeObj, found, err = ctrl.volumes.store.GetByKey(key)
volumeObj, found, err := ctrl.volumes.store.GetByKey(key)
if err != nil {
glog.V(2).Infof("error getting volume %q from cache: %v", key, err)
return false
@@ -410,26 +359,25 @@ func (ctrl *PersistentVolumeController) claimWorker() {
key := keyObj.(string)
glog.V(5).Infof("claimWorker[%s]", key)
claimObj, found, err := ctrl.claimInformer.GetByKey(key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
return false
}
claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
if err == nil {
// The claim still exists in informer cache, the event must have
// been add/update/sync
ctrl.updateClaim(claim)
return false
}
if !errors.IsNotFound(err) {
glog.V(2).Infof("error getting claim %q from informer: %v", key, err)
return false
}
if found {
// The claim still exists in informer cache, the event must have
// been add/update/sync
claim, ok := claimObj.(*v1.PersistentVolumeClaim)
if !ok {
glog.Errorf("expected claim, got %+v", claimObj)
return false
}
ctrl.updateClaim(claim)
return false
}
// The claim is not in informer cache, the event must have been "delete"
claimObj, found, err = ctrl.claims.GetByKey(key)
claimObj, found, err := ctrl.claims.GetByKey(key)
if err != nil {
glog.V(2).Infof("error getting claim %q from cache: %v", key, err)
return false