From 5fafae11d8dc33ae3120f2b686d762d4101d4ecc Mon Sep 17 00:00:00 2001 From: Yecheng Fu Date: Sat, 4 Aug 2018 12:05:55 +0800 Subject: [PATCH] attachdetach controller: attach volumes immediately when Pod's PVCs are bound - Use queue to process PVCs on add/update events - Index pods by PVC key then we don't need to iterate to find pods --- pkg/controller/volume/attachdetach/BUILD | 3 + .../attachdetach/attach_detach_controller.go | 127 ++++++++++++++++++ 2 files changed, 130 insertions(+) diff --git a/pkg/controller/volume/attachdetach/BUILD b/pkg/controller/volume/attachdetach/BUILD index 81ab0132587..6344b14e8cd 100644 --- a/pkg/controller/volume/attachdetach/BUILD +++ b/pkg/controller/volume/attachdetach/BUILD @@ -26,9 +26,11 @@ go_library( "//pkg/volume/util/volumepathhandler:go_default_library", "//staging/src/k8s.io/api/authentication/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", @@ -36,6 +38,7 @@ go_library( "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", + "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], ) diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index 95a114807e2..b63458c81ff 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -26,9 +26,11 @@ import ( "github.com/golang/glog" authenticationv1 "k8s.io/api/authentication/v1" "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -36,6 +38,7 @@ import ( corelisters "k8s.io/client-go/listers/core/v1" kcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" @@ -125,9 +128,11 @@ func NewAttachDetachController( pvsSynced: pvInformer.Informer().HasSynced, podLister: podInformer.Lister(), podsSynced: podInformer.Informer().HasSynced, + podIndexer: podInformer.Informer().GetIndexer(), nodeLister: nodeInformer.Lister(), nodesSynced: nodeInformer.Informer().HasSynced, cloud: cloud, + pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"), } if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil { @@ -179,15 +184,54 @@ func NewAttachDetachController( DeleteFunc: adc.podDelete, }) + // This custom indexer will index pods by its PVC keys. Then we don't need + // to iterate all pods every time to find pods which reference given PVC. + adc.podIndexer.AddIndexers(kcache.Indexers{ + pvcKeyIndex: indexByPVCKey, + }) + nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{ AddFunc: adc.nodeAdd, UpdateFunc: adc.nodeUpdate, DeleteFunc: adc.nodeDelete, }) + pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + adc.enqueuePVC(obj) + }, + UpdateFunc: func(old, new interface{}) { + adc.enqueuePVC(new) + }, + }) + return adc, nil } +const ( + pvcKeyIndex string = "pvcKey" +) + +// indexByPVCKey returns PVC keys for given pod. Note that the index is only +// used for attaching, so we are only interested in active pods with nodeName +// set. +func indexByPVCKey(obj interface{}) ([]string, error) { + pod, ok := obj.(*v1.Pod) + if !ok { + return []string{}, nil + } + if len(pod.Spec.NodeName) == 0 || volumeutil.IsPodTerminated(pod, pod.Status) { + return []string{}, nil + } + keys := []string{} + for _, podVolume := range pod.Spec.Volumes { + if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil { + keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName)) + } + } + return keys, nil +} + type attachDetachController struct { // kubeClient is the kube API client used by volumehost to communicate with // the API server. @@ -207,6 +251,7 @@ type attachDetachController struct { podLister corelisters.PodLister podsSynced kcache.InformerSynced + podIndexer kcache.Indexer nodeLister corelisters.NodeLister nodesSynced kcache.InformerSynced @@ -251,10 +296,14 @@ type attachDetachController struct { // recorder is used to record events in the API server recorder record.EventRecorder + + // pvcQueue is used to queue pvc objects + pvcQueue workqueue.RateLimitingInterface } func (adc *attachDetachController) Run(stopCh <-chan struct{}) { defer runtime.HandleCrash() + defer adc.pvcQueue.ShutDown() glog.Infof("Starting attach detach controller") defer glog.Infof("Shutting down attach detach controller") @@ -273,6 +322,7 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) { } go adc.reconciler.Run(stopCh) go adc.desiredStateOfWorldPopulator.Run(stopCh) + go wait.Until(adc.pvcWorker, time.Second, stopCh) metrics.Register(adc.pvcLister, adc.pvLister, adc.podLister, &adc.volumePluginMgr) <-stopCh @@ -486,6 +536,83 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) { adc.processVolumesInUse(nodeName, node.Status.VolumesInUse) } +func (adc *attachDetachController) enqueuePVC(obj interface{}) { + key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + return + } + adc.pvcQueue.Add(key) +} + +// pvcWorker processes items from pvcQueue +func (adc *attachDetachController) pvcWorker() { + for adc.processNextItem() { + } +} + +func (adc *attachDetachController) processNextItem() bool { + keyObj, shutdown := adc.pvcQueue.Get() + if shutdown { + return false + } + defer adc.pvcQueue.Done(keyObj) + + if err := adc.syncPVCByKey(keyObj.(string)); err != nil { + // Rather than wait for a full resync, re-add the key to the + // queue to be processed. + adc.pvcQueue.AddRateLimited(keyObj) + runtime.HandleError(fmt.Errorf("Failed to sync pvc %q, will retry again: %v", keyObj.(string), err)) + return true + } + + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + adc.pvcQueue.Forget(keyObj) + return true +} + +func (adc *attachDetachController) syncPVCByKey(key string) error { + glog.V(5).Infof("syncPVCByKey[%s]", key) + namespace, name, err := kcache.SplitMetaNamespaceKey(key) + if err != nil { + glog.V(4).Infof("error getting namespace & name of pvc %q to get pvc from informer: %v", key, err) + return nil + } + pvc, err := adc.pvcLister.PersistentVolumeClaims(namespace).Get(name) + if apierrors.IsNotFound(err) { + glog.V(4).Infof("error getting pvc %q from informer: %v", key, err) + return nil + } + if err != nil { + return err + } + + if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" { + // Skip unbound PVCs. + return nil + } + + objs, err := adc.podIndexer.ByIndex(pvcKeyIndex, key) + if err != nil { + return err + } + for _, obj := range objs { + pod, ok := obj.(*v1.Pod) + if !ok { + continue + } + volumeActionFlag := util.DetermineVolumeAction( + pod, + adc.desiredStateOfWorld, + true /* default volume action */) + + util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */ + adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister) + } + return nil +} + // processVolumesInUse processes the list of volumes marked as "in-use" // according to the specified Node's Status.VolumesInUse and updates the // corresponding volume in the actual state of the world to indicate that it is