From 2cdc63aeaab603c61b48e95c044f1e4bb2f32548 Mon Sep 17 00:00:00 2001 From: Yecheng Fu Date: Wed, 1 Jul 2020 15:01:10 +0800 Subject: [PATCH] cleanup in volume scheduling pkg - remove duplicated function claimToClaimKey - cache Lister - don't add indexer for PVCAssumeCache --- .../scheduling/scheduler_assume_cache.go | 10 ++- .../volume/scheduling/scheduler_binder.go | 75 +++++++++---------- 2 files changed, 43 insertions(+), 42 deletions(-) diff --git a/pkg/controller/volume/scheduling/scheduler_assume_cache.go b/pkg/controller/volume/scheduling/scheduler_assume_cache.go index 88fbf45b7bc..355da12aab6 100644 --- a/pkg/controller/volume/scheduling/scheduler_assume_cache.go +++ b/pkg/controller/volume/scheduling/scheduler_assume_cache.go @@ -23,7 +23,7 @@ import ( "k8s.io/klog/v2" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/client-go/tools/cache" ) @@ -134,7 +134,11 @@ func NewAssumeCache(informer cache.SharedIndexInformer, description, indexName s indexFunc: indexFunc, indexName: indexName, } - c.store = cache.NewIndexer(objInfoKeyFunc, cache.Indexers{indexName: c.objInfoIndexFunc}) + indexers := cache.Indexers{} + if indexName != "" && indexFunc != nil { + indexers[indexName] = c.objInfoIndexFunc + } + c.store = cache.NewIndexer(objInfoKeyFunc, indexers) // Unit tests don't use informers if informer != nil { @@ -422,7 +426,7 @@ type pvcAssumeCache struct { // NewPVCAssumeCache creates a PVC assume cache. func NewPVCAssumeCache(informer cache.SharedIndexInformer) PVCAssumeCache { - return &pvcAssumeCache{NewAssumeCache(informer, "v1.PersistentVolumeClaim", "namespace", cache.MetaNamespaceIndexFunc)} + return &pvcAssumeCache{NewAssumeCache(informer, "v1.PersistentVolumeClaim", "", nil)} } func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { diff --git a/pkg/controller/volume/scheduling/scheduler_binder.go b/pkg/controller/volume/scheduling/scheduler_binder.go index 6dbab627f98..949db90e48c 100644 --- a/pkg/controller/volume/scheduling/scheduler_binder.go +++ b/pkg/controller/volume/scheduling/scheduler_binder.go @@ -115,7 +115,7 @@ type InTreeToCSITranslator interface { // PV controller to fully bind and provision the PVCs. If binding fails, the Pod is sent // back through the scheduler. // ii. After BindPodVolumes() is complete, then the scheduler does the final Pod->Node binding. -// 2. Once all the assume operations are done in d), the scheduler processes the next Pod in the scheduler queue +// 2. Once all the assume operations are done in e), the scheduler processes the next Pod in the scheduler queue // while the actual binding operation occurs in the background. type SchedulerVolumeBinder interface { // GetPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning) @@ -160,14 +160,15 @@ type SchedulerVolumeBinder interface { } type volumeBinder struct { - kubeClient clientset.Interface - classLister storagelisters.StorageClassLister + kubeClient clientset.Interface - podLister corelisters.PodLister - nodeInformer coreinformers.NodeInformer - csiNodeInformer storageinformers.CSINodeInformer - pvcCache PVCAssumeCache - pvCache PVAssumeCache + classLister storagelisters.StorageClassLister + podLister corelisters.PodLister + nodeLister corelisters.NodeLister + csiNodeLister storagelisters.CSINodeLister + + pvcCache PVCAssumeCache + pvCache PVAssumeCache // Amount of time to wait for the bind operation to succeed bindTimeout time.Duration @@ -186,15 +187,15 @@ func NewVolumeBinder( storageClassInformer storageinformers.StorageClassInformer, bindTimeout time.Duration) SchedulerVolumeBinder { return &volumeBinder{ - kubeClient: kubeClient, - podLister: podInformer.Lister(), - classLister: storageClassInformer.Lister(), - nodeInformer: nodeInformer, - csiNodeInformer: csiNodeInformer, - pvcCache: NewPVCAssumeCache(pvcInformer.Informer()), - pvCache: NewPVAssumeCache(pvInformer.Informer()), - bindTimeout: bindTimeout, - translator: csitrans.New(), + kubeClient: kubeClient, + podLister: podInformer.Lister(), + classLister: storageClassInformer.Lister(), + nodeLister: nodeInformer.Lister(), + csiNodeLister: csiNodeInformer.Lister(), + pvcCache: NewPVCAssumeCache(pvcInformer.Informer()), + pvCache: NewPVAssumeCache(pvInformer.Informer()), + bindTimeout: bindTimeout, + translator: csitrans.New(), } } @@ -234,20 +235,20 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []* }() var ( - matchedBindings []*BindingInfo - provisionedClaims []*v1.PersistentVolumeClaim + staticBindings []*BindingInfo + dynamicProvisions []*v1.PersistentVolumeClaim ) defer func() { // Although we do not distinguish nil from empty in this function, for // easier testing, we normalize empty to nil. - if len(matchedBindings) == 0 { - matchedBindings = nil + if len(staticBindings) == 0 { + staticBindings = nil } - if len(provisionedClaims) == 0 { - provisionedClaims = nil + if len(dynamicProvisions) == 0 { + dynamicProvisions = nil } - podVolumes.StaticBindings = matchedBindings - podVolumes.DynamicProvisions = provisionedClaims + podVolumes.StaticBindings = staticBindings + podVolumes.DynamicProvisions = dynamicProvisions }() // Check PV node affinity on bound volumes @@ -282,7 +283,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []* // Find matching volumes if len(claimsToFindMatching) > 0 { var unboundClaims []*v1.PersistentVolumeClaim - unboundVolumesSatisfied, matchedBindings, unboundClaims, err = b.findMatchingVolumes(pod, claimsToFindMatching, node) + unboundVolumesSatisfied, staticBindings, unboundClaims, err = b.findMatchingVolumes(pod, claimsToFindMatching, node) if err != nil { return } @@ -291,7 +292,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []* // Check for claims to provision if len(claimsToProvision) > 0 { - unboundVolumesSatisfied, provisionedClaims, err = b.checkVolumeProvisions(pod, claimsToProvision, node) + unboundVolumesSatisfied, dynamicProvisions, err = b.checkVolumeProvisions(pod, claimsToProvision, node) if err != nil { return } @@ -452,7 +453,7 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*BindingInfo, cl for _, binding = range bindings { klog.V(5).Infof("bindAPIUpdate: Pod %q, binding PV %q to PVC %q", podName, binding.pv.Name, binding.pvc.Name) // TODO: does it hurt if we make an api call and nothing needs to be updated? - claimKey := claimToClaimKey(binding.pvc) + claimKey := getPVCName(binding.pvc) klog.V(2).Infof("claim %q bound to volume %q", claimKey, binding.pv.Name) newPV, err := b.kubeClient.CoreV1().PersistentVolumes().Update(context.TODO(), binding.pv, metav1.UpdateOptions{}) if err != nil { @@ -504,12 +505,12 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*BindingInfo, claim return false, fmt.Errorf("failed to get cached claims to provision for pod %q", podName) } - node, err := b.nodeInformer.Lister().Get(pod.Spec.NodeName) + node, err := b.nodeLister.Get(pod.Spec.NodeName) if err != nil { return false, fmt.Errorf("failed to get node %q: %v", pod.Spec.NodeName, err) } - csiNode, err := b.csiNodeInformer.Lister().Get(node.Name) + csiNode, err := b.csiNodeLister.Get(node.Name) if err != nil { // TODO: return the error once CSINode is created by default klog.V(4).Infof("Could not get a CSINode object for the node %q: %v", node.Name, err) @@ -711,7 +712,7 @@ func (b *volumeBinder) GetPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV } func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node *v1.Node, podName string) (bool, error) { - csiNode, err := b.csiNodeInformer.Lister().Get(node.Name) + csiNode, err := b.csiNodeLister.Get(node.Name) if err != nil { // TODO: return the error once CSINode is created by default klog.V(4).Infof("Could not get a CSINode object for the node %q: %v", node.Name, err) @@ -786,9 +787,9 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.Persi // checkVolumeProvisions checks given unbound claims (the claims have gone through func // findMatchingVolumes, and do not have matching volumes for binding), and return true // if all of the claims are eligible for dynamic provision. -func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, provisionedClaims []*v1.PersistentVolumeClaim, err error) { +func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, dynamicProvisions []*v1.PersistentVolumeClaim, err error) { podName := getPodName(pod) - provisionedClaims = []*v1.PersistentVolumeClaim{} + dynamicProvisions = []*v1.PersistentVolumeClaim{} for _, claim := range claimsToProvision { pvcName := getPVCName(claim) @@ -816,12 +817,12 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v // TODO: Check if capacity of the node domain in the storage class // can satisfy resource requirement of given claim - provisionedClaims = append(provisionedClaims, claim) + dynamicProvisions = append(dynamicProvisions, claim) } klog.V(4).Infof("Provisioning for %d claims of pod %q that has no matching volumes on node %q ...", len(claimsToProvision), podName, node.Name) - return true, provisionedClaims, nil + return true, dynamicProvisions, nil } func (b *volumeBinder) revertAssumedPVs(bindings []*BindingInfo) { @@ -853,10 +854,6 @@ func (a byPVCSize) Less(i, j int) bool { return iSize.Cmp(jSize) == -1 } -func claimToClaimKey(claim *v1.PersistentVolumeClaim) string { - return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name) -} - // isCSIMigrationOnForPlugin checks if CSI migrartion is enabled for a given plugin. func isCSIMigrationOnForPlugin(pluginName string) bool { switch pluginName {