mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
attachdetach controller: attach volumes immediately when Pod's PVCs are bound
- Use queue to process PVCs on add/update events - Index pods by PVC key then we don't need to iterate to find pods
This commit is contained in:
parent
23111ad414
commit
5fafae11d8
@ -26,9 +26,11 @@ go_library(
|
||||
"//pkg/volume/util/volumepathhandler:go_default_library",
|
||||
"//staging/src/k8s.io/api/authentication/v1:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
||||
@ -36,6 +38,7 @@ go_library(
|
||||
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@ -26,9 +26,11 @@ import (
|
||||
"github.com/golang/glog"
|
||||
authenticationv1 "k8s.io/api/authentication/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
@ -36,6 +38,7 @@ import (
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
kcache "k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
|
||||
@ -125,9 +128,11 @@ func NewAttachDetachController(
|
||||
pvsSynced: pvInformer.Informer().HasSynced,
|
||||
podLister: podInformer.Lister(),
|
||||
podsSynced: podInformer.Informer().HasSynced,
|
||||
podIndexer: podInformer.Informer().GetIndexer(),
|
||||
nodeLister: nodeInformer.Lister(),
|
||||
nodesSynced: nodeInformer.Informer().HasSynced,
|
||||
cloud: cloud,
|
||||
pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"),
|
||||
}
|
||||
|
||||
if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
|
||||
@ -179,15 +184,54 @@ func NewAttachDetachController(
|
||||
DeleteFunc: adc.podDelete,
|
||||
})
|
||||
|
||||
// This custom indexer will index pods by its PVC keys. Then we don't need
|
||||
// to iterate all pods every time to find pods which reference given PVC.
|
||||
adc.podIndexer.AddIndexers(kcache.Indexers{
|
||||
pvcKeyIndex: indexByPVCKey,
|
||||
})
|
||||
|
||||
nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
|
||||
AddFunc: adc.nodeAdd,
|
||||
UpdateFunc: adc.nodeUpdate,
|
||||
DeleteFunc: adc.nodeDelete,
|
||||
})
|
||||
|
||||
pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
adc.enqueuePVC(obj)
|
||||
},
|
||||
UpdateFunc: func(old, new interface{}) {
|
||||
adc.enqueuePVC(new)
|
||||
},
|
||||
})
|
||||
|
||||
return adc, nil
|
||||
}
|
||||
|
||||
const (
|
||||
pvcKeyIndex string = "pvcKey"
|
||||
)
|
||||
|
||||
// indexByPVCKey returns PVC keys for given pod. Note that the index is only
|
||||
// used for attaching, so we are only interested in active pods with nodeName
|
||||
// set.
|
||||
func indexByPVCKey(obj interface{}) ([]string, error) {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
return []string{}, nil
|
||||
}
|
||||
if len(pod.Spec.NodeName) == 0 || volumeutil.IsPodTerminated(pod, pod.Status) {
|
||||
return []string{}, nil
|
||||
}
|
||||
keys := []string{}
|
||||
for _, podVolume := range pod.Spec.Volumes {
|
||||
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
|
||||
keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName))
|
||||
}
|
||||
}
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
type attachDetachController struct {
|
||||
// kubeClient is the kube API client used by volumehost to communicate with
|
||||
// the API server.
|
||||
@ -207,6 +251,7 @@ type attachDetachController struct {
|
||||
|
||||
podLister corelisters.PodLister
|
||||
podsSynced kcache.InformerSynced
|
||||
podIndexer kcache.Indexer
|
||||
|
||||
nodeLister corelisters.NodeLister
|
||||
nodesSynced kcache.InformerSynced
|
||||
@ -251,10 +296,14 @@ type attachDetachController struct {
|
||||
|
||||
// recorder is used to record events in the API server
|
||||
recorder record.EventRecorder
|
||||
|
||||
// pvcQueue is used to queue pvc objects
|
||||
pvcQueue workqueue.RateLimitingInterface
|
||||
}
|
||||
|
||||
func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
|
||||
defer runtime.HandleCrash()
|
||||
defer adc.pvcQueue.ShutDown()
|
||||
|
||||
glog.Infof("Starting attach detach controller")
|
||||
defer glog.Infof("Shutting down attach detach controller")
|
||||
@ -273,6 +322,7 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
|
||||
}
|
||||
go adc.reconciler.Run(stopCh)
|
||||
go adc.desiredStateOfWorldPopulator.Run(stopCh)
|
||||
go wait.Until(adc.pvcWorker, time.Second, stopCh)
|
||||
metrics.Register(adc.pvcLister, adc.pvLister, adc.podLister, &adc.volumePluginMgr)
|
||||
|
||||
<-stopCh
|
||||
@ -486,6 +536,83 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) {
|
||||
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
|
||||
}
|
||||
|
||||
func (adc *attachDetachController) enqueuePVC(obj interface{}) {
|
||||
key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
||||
if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
|
||||
return
|
||||
}
|
||||
adc.pvcQueue.Add(key)
|
||||
}
|
||||
|
||||
// pvcWorker processes items from pvcQueue
|
||||
func (adc *attachDetachController) pvcWorker() {
|
||||
for adc.processNextItem() {
|
||||
}
|
||||
}
|
||||
|
||||
func (adc *attachDetachController) processNextItem() bool {
|
||||
keyObj, shutdown := adc.pvcQueue.Get()
|
||||
if shutdown {
|
||||
return false
|
||||
}
|
||||
defer adc.pvcQueue.Done(keyObj)
|
||||
|
||||
if err := adc.syncPVCByKey(keyObj.(string)); err != nil {
|
||||
// Rather than wait for a full resync, re-add the key to the
|
||||
// queue to be processed.
|
||||
adc.pvcQueue.AddRateLimited(keyObj)
|
||||
runtime.HandleError(fmt.Errorf("Failed to sync pvc %q, will retry again: %v", keyObj.(string), err))
|
||||
return true
|
||||
}
|
||||
|
||||
// Finally, if no error occurs we Forget this item so it does not
|
||||
// get queued again until another change happens.
|
||||
adc.pvcQueue.Forget(keyObj)
|
||||
return true
|
||||
}
|
||||
|
||||
func (adc *attachDetachController) syncPVCByKey(key string) error {
|
||||
glog.V(5).Infof("syncPVCByKey[%s]", key)
|
||||
namespace, name, err := kcache.SplitMetaNamespaceKey(key)
|
||||
if err != nil {
|
||||
glog.V(4).Infof("error getting namespace & name of pvc %q to get pvc from informer: %v", key, err)
|
||||
return nil
|
||||
}
|
||||
pvc, err := adc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
|
||||
if apierrors.IsNotFound(err) {
|
||||
glog.V(4).Infof("error getting pvc %q from informer: %v", key, err)
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
|
||||
// Skip unbound PVCs.
|
||||
return nil
|
||||
}
|
||||
|
||||
objs, err := adc.podIndexer.ByIndex(pvcKeyIndex, key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, obj := range objs {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
volumeActionFlag := util.DetermineVolumeAction(
|
||||
pod,
|
||||
adc.desiredStateOfWorld,
|
||||
true /* default volume action */)
|
||||
|
||||
util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
|
||||
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// processVolumesInUse processes the list of volumes marked as "in-use"
|
||||
// according to the specified Node's Status.VolumesInUse and updates the
|
||||
// corresponding volume in the actual state of the world to indicate that it is
|
||||
|
Loading…
Reference in New Issue
Block a user