diff --git a/pkg/controller/volume/persistentvolume/BUILD b/pkg/controller/volume/persistentvolume/BUILD
index d0c648f5cfb..8e3f145dab3 100644
--- a/pkg/controller/volume/persistentvolume/BUILD
+++ b/pkg/controller/volume/persistentvolume/BUILD
@@ -44,6 +44,7 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
@@ -92,6 +93,7 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go
index cd9143e2800..0c9534bca68 100644
--- a/pkg/controller/volume/persistentvolume/pv_controller.go
+++ b/pkg/controller/volume/persistentvolume/pv_controller.go
@@ -285,15 +285,16 @@ func checkVolumeSatisfyClaim(volume *v1.PersistentVolume, claim *v1.PersistentVo
 	return nil
 }
 
-func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) {
+func (ctrl *PersistentVolumeController) isDelayBindingProvisioning(claim *v1.PersistentVolumeClaim) bool {
 	// When the VolumeScheduling feature is enabled, the scheduler signals the
 	// PV controller to start dynamic provisioning by setting the
 	// "annSelectedNode" annotation on the PVC.
-	if _, ok := claim.Annotations[annSelectedNode]; ok {
-		return false, nil
-	}
+	_, ok := claim.Annotations[annSelectedNode]
+	return ok
+}
 
+func (ctrl *PersistentVolumeController) isDelayBindingMode(claim *v1.PersistentVolumeClaim) (bool, error) {
 	className := v1helper.GetPersistentVolumeClaimClass(claim)
 	if className == "" {
 		return false, nil
@@ -311,6 +312,18 @@ func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentV
 	return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil
 }
 
+// shouldDelayBinding returns true if binding of the claim should be delayed, false otherwise.
+// A claim whose binding is delayed is bound only after the scheduler pre-binds
+// it by setting the annSelectedNode annotation.
+func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) {
+	// The scheduler has already assigned a node to the claim for dynamic
+	// provisioning, so do not delay binding any further.
+	if ctrl.isDelayBindingProvisioning(claim) {
+		return false, nil
+	}
+
+	// Otherwise, delay binding if the claim is in delay-binding mode.
+	return ctrl.isDelayBindingMode(claim)
+}
+
 // syncUnboundClaim is the main controller method to decide what to do with an
 // unbound claim.
 func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error {
diff --git a/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go b/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go
index 2cc50a91a5f..cd4bc88d7ca 100644
--- a/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go
+++ b/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go
@@ -42,6 +42,9 @@ type AssumeCache interface {
 	// Get the object by name
 	Get(objName string) (interface{}, error)
 
+	// Get the API object by name, ignoring any assumed (in-flight) update
+	GetAPIObj(objName string) (interface{}, error)
+
 	// List all the objects in the cache
 	List(indexObj interface{}) []interface{}
 }
@@ -250,6 +253,17 @@ func (c *assumeCache) Get(objName string) (interface{}, error) {
 	return objInfo.latestObj, nil
 }
 
+func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) {
+	c.rwMutex.RLock()
+	defer c.rwMutex.RUnlock()
+
+	objInfo, err := c.getObjInfo(objName)
+	if err != nil {
+		return nil, err
+	}
+	return objInfo.apiObj, nil
+}
+
 func (c *assumeCache) List(indexObj interface{}) []interface{} {
 	c.rwMutex.RLock()
 	defer c.rwMutex.RUnlock()
@@ -297,7 +311,7 @@ func (c *assumeCache) Assume(obj interface{}) error {
 	}
 
 	if newVersion < storedVersion {
-		return fmt.Errorf("%v %q is out of sync", c.description, name)
+		return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
 	}
 
 	// Only update the cached object
@@ -325,6 +339,7 @@ type PVAssumeCache interface {
 	AssumeCache
 
 	GetPV(pvName string) (*v1.PersistentVolume, error)
+	GetAPIPV(pvName string) (*v1.PersistentVolume, error)
 	ListPVs(storageClassName string) []*v1.PersistentVolume
 }
@@ -356,6 +371,18 @@ func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
 	return pv, nil
 }
 
+func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
+	obj, err := c.GetAPIObj(pvName)
+	if err != nil {
+		return nil, err
+	}
+	pv, ok := obj.(*v1.PersistentVolume)
+	if !ok {
+		return nil, &errWrongType{"v1.PersistentVolume", obj}
+	}
+	return pv, nil
+}
+
 func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
 	objs := c.List(&v1.PersistentVolume{
 		Spec: v1.PersistentVolumeSpec{
@@ -380,6 +407,7 @@ type PVCAssumeCache interface {
 
 	// GetPVC returns the PVC from the cache with given pvcKey.
 	// pvcKey is the result of MetaNamespaceKeyFunc on PVC obj
 	GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
+	GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
 }
 
 type pvcAssumeCache struct {
@@ -402,3 +430,15 @@ func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error
 	}
 	return pvc, nil
 }
+
+func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
+	obj, err := c.GetAPIObj(pvcKey)
+	if err != nil {
+		return nil, err
+	}
+	pvc, ok := obj.(*v1.PersistentVolumeClaim)
+	if !ok {
+		return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
+	}
+	return pvc, nil
+}
diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder.go b/pkg/controller/volume/persistentvolume/scheduler_binder.go
index 1d4918bbcd3..737c195611d 100644
--- a/pkg/controller/volume/persistentvolume/scheduler_binder.go
+++ b/pkg/controller/volume/persistentvolume/scheduler_binder.go
@@ -25,6 +25,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apiserver/pkg/storage/etcd"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	storageinformers "k8s.io/client-go/informers/storage/v1"
 	clientset "k8s.io/client-go/kubernetes"
@@ -145,6 +146,27 @@ func (b *volumeBinder) GetBindingsCache() PodBindingCache {
 // This method intentionally takes in a *v1.Node object instead of using volumebinder.nodeInformer.
 // That's necessary because some operations need to pass fake node objects in to the predicate.
 func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
+	var (
+		matchedClaims     []*bindingInfo
+		provisionedClaims []*v1.PersistentVolumeClaim
+	)
+	defer func() {
+		// We recreate bindings for each new scheduling loop.
+		// Although we do not distinguish nil from empty in this function, for
+		// easier testing, we normalize empty to nil.
+		if len(matchedClaims) == 0 {
+			matchedClaims = nil
+		}
+		if len(provisionedClaims) == 0 {
+			provisionedClaims = nil
+		}
+		// TODO: merge into one atomic function
+		// Mark cache with all the matches for each PVC for this node
+		b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims)
+		// Mark cache with all the PVCs that need provisioning for this node
+		b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims)
+	}()
+
 	podName := getPodName(pod)
 
 	// Warning: Below log needs high verbosity as it can be printed several times (#60933).
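The pv_controller.go change above splits the old `shouldDelayBinding` into two reusable predicates: `isDelayBindingProvisioning` (has the scheduler already stamped `annSelectedNode`?) and `isDelayBindingMode` (is the claim's StorageClass `WaitForFirstConsumer`?). The sketch below is a minimal, self-contained model of how they compose; the `claim` struct and the `waitForFirstConsumer` map are hypothetical stand-ins for the real `v1.PersistentVolumeClaim` and StorageClass lister, and only the decision logic mirrors the patch:

```go
package main

import "fmt"

// annSelectedNode matches the annotation key used by the PV controller.
const annSelectedNode = "volume.kubernetes.io/selected-node"

// claim is a hypothetical stand-in for *v1.PersistentVolumeClaim.
type claim struct {
	annotations map[string]string
	className   string
}

// waitForFirstConsumer stands in for the StorageClass lister: it maps a class
// name to whether its VolumeBindingMode is WaitForFirstConsumer.
var waitForFirstConsumer = map[string]bool{
	"topology-aware": true,
	"immediate":      false,
}

// isDelayBindingProvisioning: the scheduler signals provisioning by setting
// annSelectedNode on the PVC.
func isDelayBindingProvisioning(c claim) bool {
	_, ok := c.annotations[annSelectedNode]
	return ok
}

// isDelayBindingMode: the claim's class asks to wait for the first consumer.
func isDelayBindingMode(c claim) bool {
	return waitForFirstConsumer[c.className]
}

// shouldDelayBinding composes the two: once a node has been selected,
// binding proceeds even for a WaitForFirstConsumer class.
func shouldDelayBinding(c claim) bool {
	if isDelayBindingProvisioning(c) {
		return false
	}
	return isDelayBindingMode(c)
}

func main() {
	pending := claim{className: "topology-aware"}
	selected := claim{
		className:   "topology-aware",
		annotations: map[string]string{annSelectedNode: "node-1"},
	}
	fmt.Println(shouldDelayBinding(pending))  // true: wait for the scheduler
	fmt.Println(shouldDelayBinding(selected)) // false: a node was selected
}
```

The split is what lets `getPodVolumes` (further down in this diff) ask only the mode question via `isDelayBindingMode`, while the PV controller keeps the combined check.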
@@ -181,16 +203,39 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
 		}
 	}
 
+	// Find matching volumes, or provisioning targets, for the unbound claims on this node
 	if len(claimsToBind) > 0 {
-		var claimsToProvision []*v1.PersistentVolumeClaim
-		unboundVolumesSatisfied, claimsToProvision, err = b.findMatchingVolumes(pod, claimsToBind, node)
-		if err != nil {
-			return false, false, err
+		var (
+			claimsToFindMatching []*v1.PersistentVolumeClaim
+			claimsToProvision    []*v1.PersistentVolumeClaim
+		)
+
+		// Filter out claims to provision
+		for _, claim := range claimsToBind {
+			if selectedNode, ok := claim.Annotations[annSelectedNode]; ok {
+				if selectedNode != node.Name {
+					// Fast path: skip nodes that do not match the selected node
+					return false, boundVolumesSatisfied, nil
+				}
+				claimsToProvision = append(claimsToProvision, claim)
+			} else {
+				claimsToFindMatching = append(claimsToFindMatching, claim)
+			}
 		}
 
-		// Try to provision for unbound volumes
-		if !unboundVolumesSatisfied {
-			unboundVolumesSatisfied, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
+		// Find matching volumes
+		if len(claimsToFindMatching) > 0 {
+			var unboundClaims []*v1.PersistentVolumeClaim
+			unboundVolumesSatisfied, matchedClaims, unboundClaims, err = b.findMatchingVolumes(pod, claimsToFindMatching, node)
+			if err != nil {
+				return false, false, err
+			}
+			claimsToProvision = append(claimsToProvision, unboundClaims...)
+		}
+
+		// Check for claims to provision
+		if len(claimsToProvision) > 0 {
+			unboundVolumesSatisfied, provisionedClaims, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
 			if err != nil {
 				return false, false, err
 			}
@@ -304,10 +349,8 @@ func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) (err error) {
 	}
 
 	return wait.Poll(time.Second, b.bindTimeout, func() (bool, error) {
-		// Get cached values every time in case the pod gets deleted
-		bindings = b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName)
-		claimsToProvision = b.podBindingCache.GetProvisionedPVCs(assumedPod, assumedPod.Spec.NodeName)
-		return b.checkBindings(assumedPod, bindings, claimsToProvision)
+		allBound, err := b.checkBindings(assumedPod, bindings, claimsToProvision)
+		return allBound, err
 	})
 }
@@ -344,6 +387,7 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl
 
 	var (
 		binding *bindingInfo
+		i       int
 		claim   *v1.PersistentVolumeClaim
 	)
@@ -352,18 +396,24 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl
 	for _, binding = range bindings {
 		klog.V(5).Infof("bindAPIUpdate: Pod %q, binding PV %q to PVC %q", podName, binding.pv.Name, binding.pvc.Name)
 		// TODO: does it hurt if we make an api call and nothing needs to be updated?
-		if _, err := b.ctrl.updateBindVolumeToClaim(binding.pv, binding.pvc, false); err != nil {
+		if newPV, err := b.ctrl.updateBindVolumeToClaim(binding.pv, binding.pvc, false); err != nil {
 			return err
+		} else {
+			// Save the updated object from the apiserver for later checking.
+			binding.pv = newPV
 		}
 		lastProcessedBinding++
 	}
 
 	// Update claims objects to trigger volume provisioning. Let the PV controller take care of the rest.
 	// The PV controller is expected to signal back by removing related annotations if actual provisioning fails.
-	for _, claim = range claimsToProvision {
+	for i, claim = range claimsToProvision {
 		klog.V(5).Infof("bindAPIUpdate: Pod %q, PVC %q", podName, getPVCName(claim))
-		if _, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
+		if newClaim, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
 			return err
+		} else {
+			// Save the updated object from the apiserver for later checking.
+			claimsToProvision[i] = newClaim
 		}
 		lastProcessedProvisioning++
 	}
@@ -371,12 +421,20 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl
 	return nil
 }
 
+var (
+	versioner = etcd.APIObjectVersioner{}
+)
+
 // checkBindings runs through all the PVCs in the Pod and checks:
 // * if the PVC is fully bound
 // * if there are any conditions that require binding to fail and be retried
 //
 // It returns true when all of the Pod's PVCs are fully bound, and error if
 // binding (and scheduling) needs to be retried
+// Note that it checks the latest API objects, not the PV/PVC assume cache:
+// the cached objects may be assumed again by the main scheduler loop, so we
+// must verify the latest state in the apiserver, which is shared with the PV
+// controller and provisioners.
 func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) {
 	podName := getPodName(pod)
 	if bindings == nil {
@@ -391,13 +449,32 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
 		return false, fmt.Errorf("failed to get node %q: %v", pod.Spec.NodeName, err)
 	}
 
-	for _, binding := range bindings {
-		// Check for any conditions that might require scheduling retry
+	// Check for any conditions that might require scheduling retry
 
-		// Check if pv still exists
-		pv, err := b.pvCache.GetPV(binding.pv.Name)
-		if err != nil || pv == nil {
-			return false, fmt.Errorf("failed to check pv binding: %v", err)
+	// When the pod is removed from the scheduling queue, because of deletion
+	// or any other reason, the binding operation should be cancelled; there is
+	// no need to check the PV/PVC bindings any more. The pod binding cache
+	// checked here is cleared when the pod is removed from the queue.
+	if b.podBindingCache.GetDecisions(pod) == nil {
+		return false, fmt.Errorf("pod %q does not exist any more", podName)
+	}
+
+	for _, binding := range bindings {
+		pv, err := b.pvCache.GetAPIPV(binding.pv.Name)
+		if err != nil {
+			return false, fmt.Errorf("failed to check binding: %v", err)
+		}
+
+		pvc, err := b.pvcCache.GetAPIPVC(getPVCName(binding.pvc))
+		if err != nil {
+			return false, fmt.Errorf("failed to check binding: %v", err)
+		}
+
+		// Because we updated the PV in the apiserver, skip if the API object
+		// is older and wait for the new object to propagate from the apiserver.
+		if versioner.CompareResourceVersion(binding.pv, pv) > 0 {
+			return false, nil
 		}
 
 		// Check PV's node affinity (the node might not have the proper label)
@@ -411,18 +488,21 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
 		}
 
 		// Check if pvc is fully bound
-		if isBound, _, err := b.isPVCBound(binding.pvc.Namespace, binding.pvc.Name); !isBound || err != nil {
-			return false, err
+		if !b.isPVCFullyBound(pvc) {
+			return false, nil
 		}
-
-		// TODO; what if pvc is bound to the wrong pv? It means our assume cache should be reverted.
-		// Or will pv controller cleanup the pv.ClaimRef?
 	}
 
 	for _, claim := range claimsToProvision {
-		bound, pvc, err := b.isPVCBound(claim.Namespace, claim.Name)
-		if err != nil || pvc == nil {
-			return false, fmt.Errorf("failed to check pvc binding: %v", err)
+		pvc, err := b.pvcCache.GetAPIPVC(getPVCName(claim))
+		if err != nil {
+			return false, fmt.Errorf("failed to check provisioning pvc: %v", err)
+		}
+
+		// Because we updated the PVC in the apiserver, skip if the API object
+		// is older and wait for the new object to propagate from the apiserver.
+		if versioner.CompareResourceVersion(claim, pvc) > 0 {
+			return false, nil
 		}
 
 		// Check if selectedNode annotation is still set
@@ -436,7 +516,7 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
 
 		// If the PVC is bound to a PV, check its node affinity
 		if pvc.Spec.VolumeName != "" {
-			pv, err := b.pvCache.GetPV(pvc.Spec.VolumeName)
+			pv, err := b.pvCache.GetAPIPV(pvc.Spec.VolumeName)
 			if err != nil {
 				return false, fmt.Errorf("failed to get pv %q from cache: %v", pvc.Spec.VolumeName, err)
 			}
@@ -445,7 +525,8 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
 			}
 		}
 
-		if !bound {
+		// Check if pvc is fully bound
+		if !b.isPVCFullyBound(pvc) {
 			return false, nil
 		}
 	}
@@ -477,19 +558,21 @@ func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.Persiste
 		return false, nil, fmt.Errorf("error getting PVC %q: %v", pvcKey, err)
 	}
 
-	pvName := pvc.Spec.VolumeName
-	if pvName != "" {
-		if metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted) {
-			klog.V(5).Infof("PVC %q is fully bound to PV %q", pvcKey, pvName)
-			return true, pvc, nil
+	fullyBound := b.isPVCFullyBound(pvc)
+	if fullyBound {
+		klog.V(5).Infof("PVC %q is fully bound to PV %q", pvcKey, pvc.Spec.VolumeName)
+	} else {
+		if pvc.Spec.VolumeName != "" {
+			klog.V(5).Infof("PVC %q is not fully bound to PV %q", pvcKey, pvc.Spec.VolumeName)
 		} else {
-			klog.V(5).Infof("PVC %q is not fully bound to PV %q", pvcKey, pvName)
-			return false, pvc, nil
+			klog.V(5).Infof("PVC %q is not bound", pvcKey)
 		}
 	}
+	return fullyBound, pvc, nil
+}
 
-	klog.V(5).Infof("PVC %q is not bound", pvcKey)
-	return false, pvc, nil
+func (b *volumeBinder) isPVCFullyBound(pvc *v1.PersistentVolumeClaim) bool {
+	return pvc.Spec.VolumeName != "" && metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted)
 }
 
 // arePodVolumesBound returns true if all volumes are fully bound
@@ -503,12 +586,12 @@ func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool {
 	return true
 }
 
-// getPodVolumes returns a pod's PVCs separated into bound (including prebound), unbound with delayed binding,
-// and unbound with immediate binding
-func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaims []*bindingInfo, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
+// getPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning),
+// and unbound with immediate binding (including prebound)
+func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaims []*v1.PersistentVolumeClaim, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
 	boundClaims = []*v1.PersistentVolumeClaim{}
 	unboundClaimsImmediate = []*v1.PersistentVolumeClaim{}
-	unboundClaims = []*bindingInfo{}
+	unboundClaims = []*v1.PersistentVolumeClaim{}
 
 	for _, vol := range pod.Spec.Volumes {
 		volumeBound, pvc, err := b.isVolumeBound(pod.Namespace, &vol)
@@ -521,15 +604,16 @@ func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV
 		if volumeBound {
 			boundClaims = append(boundClaims, pvc)
 		} else {
-			delayBinding, err := b.ctrl.shouldDelayBinding(pvc)
+			delayBindingMode, err := b.ctrl.isDelayBindingMode(pvc)
 			if err != nil {
 				return nil, nil, nil, err
 			}
 			// Prebound PVCs are treated as unbound immediate binding
-			if delayBinding && pvc.Spec.VolumeName == "" {
+			if delayBindingMode && pvc.Spec.VolumeName == "" {
 				// Scheduler path
-				unboundClaims = append(unboundClaims, &bindingInfo{pvc: pvc})
+				unboundClaims = append(unboundClaims, pvc)
 			} else {
+				// !delayBindingMode || pvc.Spec.VolumeName != ""
 				// Immediate binding should have already been bound
 				unboundClaimsImmediate = append(unboundClaimsImmediate, pvc)
 			}
@@ -560,7 +644,7 @@ func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node
 
-// findMatchingVolumes tries to find matching volumes for given claims,
-// and return unbound claims for further provision.
-func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, unboundClaims []*v1.PersistentVolumeClaim, err error) {
+// findMatchingVolumes tries to find matching volumes for the given claims, and
+// returns the matched claims plus any unbound claims that need provisioning.
+func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (foundMatches bool, matchedClaims []*bindingInfo, unboundClaims []*v1.PersistentVolumeClaim, err error) {
 	podName := getPodName(pod)
 	// Sort all the claims by increasing size request to get the smallest fits
 	sort.Sort(byPVCSize(claimsToBind))
@@ -568,39 +652,34 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingI
 
 	chosenPVs := map[string]*v1.PersistentVolume{}
 
 	foundMatches = true
-	matchedClaims := []*bindingInfo{}
+	matchedClaims = []*bindingInfo{}
 
-	for _, bindingInfo := range claimsToBind {
+	for _, pvc := range claimsToBind {
 		// Get storage class name from each PVC
 		storageClassName := ""
-		storageClass := bindingInfo.pvc.Spec.StorageClassName
+		storageClass := pvc.Spec.StorageClassName
 		if storageClass != nil {
 			storageClassName = *storageClass
 		}
 		allPVs := b.pvCache.ListPVs(storageClassName)
-		pvcName := getPVCName(bindingInfo.pvc)
+		pvcName := getPVCName(pvc)
 
 		// Find a matching PV
-		bindingInfo.pv, err = findMatchingVolume(bindingInfo.pvc, allPVs, node, chosenPVs, true)
+		pv, err := findMatchingVolume(pvc, allPVs, node, chosenPVs, true)
 		if err != nil {
-			return false, nil, err
+			return false, nil, nil, err
 		}
-		if bindingInfo.pv == nil {
+		if pv == nil {
 			klog.V(4).Infof("No matching volumes for Pod %q, PVC %q on node %q", podName, pvcName, node.Name)
-			unboundClaims = append(unboundClaims, bindingInfo.pvc)
+			unboundClaims = append(unboundClaims, pvc)
 			foundMatches = false
 			continue
 		}
 
 		// matching PV needs to be excluded so we don't select it again
-		chosenPVs[bindingInfo.pv.Name] = bindingInfo.pv
-		matchedClaims = append(matchedClaims, bindingInfo)
-		klog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", bindingInfo.pv.Name, pvcName, node.Name, podName)
-	}
-
-	// Mark cache with all the matches for each PVC for this node
-	if len(matchedClaims) > 0 {
-		b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims)
+		chosenPVs[pv.Name] = pv
+		matchedClaims = append(matchedClaims, &bindingInfo{pv: pv, pvc: pvc})
+		klog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", pv.Name, pvcName, node.Name, podName)
 	}
 
 	if foundMatches {
@@ -613,31 +692,31 @@
 // checkVolumeProvisions checks the given unbound claims (the claims have gone
 // through findMatchingVolumes and do not have matching volumes for binding),
 // and returns true if all of the claims are eligible for dynamic provisioning.
-func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, err error) {
+func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, provisionedClaims []*v1.PersistentVolumeClaim, err error) {
 	podName := getPodName(pod)
-	provisionedClaims := []*v1.PersistentVolumeClaim{}
+	provisionedClaims = []*v1.PersistentVolumeClaim{}
 
 	for _, claim := range claimsToProvision {
 		pvcName := getPVCName(claim)
 		className := v1helper.GetPersistentVolumeClaimClass(claim)
 		if className == "" {
-			return false, fmt.Errorf("no class for claim %q", pvcName)
+			return false, nil, fmt.Errorf("no class for claim %q", pvcName)
 		}
 
 		class, err := b.ctrl.classLister.Get(className)
 		if err != nil {
-			return false, fmt.Errorf("failed to find storage class %q", className)
+			return false, nil, fmt.Errorf("failed to find storage class %q", className)
 		}
 		provisioner := class.Provisioner
 		if provisioner == "" || provisioner == notSupportedProvisioner {
 			klog.V(4).Infof("storage class %q of claim %q does not support dynamic provisioning", className, pvcName)
-			return false, nil
+			return false, nil, nil
 		}
 
 		// Check if the node can satisfy the topology requirement in the class
 		if !v1helper.MatchTopologySelectorTerms(class.AllowedTopologies, labels.Set(node.Labels)) {
 			klog.V(4).Infof("Node %q cannot satisfy provisioning topology requirements of claim %q", node.Name, pvcName)
-			return false, nil
+			return false, nil, nil
 		}
 
 		// TODO: Check if capacity of the node domain in the storage class
@@ -648,10 +727,7 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v
 	}
 
 	klog.V(4).Infof("Provisioning for claims of pod %q that has no matching volumes on node %q ...", podName, node.Name)
 
-	// Mark cache with all the PVCs that need provisioning for this node
-	b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims)
-
-	return true, nil
+	return true, provisionedClaims, nil
 }
 
 func (b *volumeBinder) revertAssumedPVs(bindings []*bindingInfo) {
@@ -674,7 +750,7 @@ type bindingInfo struct {
 	pv *v1.PersistentVolume
 }
 
-type byPVCSize []*bindingInfo
+type byPVCSize []*v1.PersistentVolumeClaim
 
 func (a byPVCSize) Len() int {
 	return len(a)
@@ -685,8 +761,8 @@ func (a byPVCSize) Swap(i, j int) {
 }
 
 func (a byPVCSize) Less(i, j int) bool {
-	iSize := a[i].pvc.Spec.Resources.Requests[v1.ResourceStorage]
-	jSize := a[j].pvc.Spec.Resources.Requests[v1.ResourceStorage]
+	iSize := a[i].Spec.Resources.Requests[v1.ResourceStorage]
+	jSize := a[j].Spec.Resources.Requests[v1.ResourceStorage]
 	// return true if iSize is less than jSize
 	return iSize.Cmp(jSize) == -1
 }
diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go b/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go
index b95538304ad..f32c1269b54 100644
--- a/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go
+++ b/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go
@@ -44,6 +44,9 @@ type PodBindingCache interface {
 	// means that no provisioning operations are needed.
 	GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim
 
+	// GetDecisions will return all cached decisions for the given pod.
+	GetDecisions(pod *v1.Pod) nodeDecisions
+
 	// DeleteBindings will remove all cached bindings and provisionings for the given pod.
 	// TODO: separate the func if it is needed to delete bindings/provisionings individually
 	DeleteBindings(pod *v1.Pod)
@@ -72,6 +75,17 @@ func NewPodBindingCache() PodBindingCache {
 	return &podBindingCache{bindingDecisions: map[string]nodeDecisions{}}
 }
 
+func (c *podBindingCache) GetDecisions(pod *v1.Pod) nodeDecisions {
+	c.rwMutex.Lock()
+	defer c.rwMutex.Unlock()
+	podName := getPodName(pod)
+	decisions, ok := c.bindingDecisions[podName]
+	if !ok {
+		return nil
+	}
+	return decisions
+}
+
 func (c *podBindingCache) DeleteBindings(pod *v1.Pod) {
 	c.rwMutex.Lock()
 	defer c.rwMutex.Unlock()
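With all the pieces in place, `BindPodVolumes` polls `checkBindings`, which now validates the latest API objects: the resourceVersion guard distinguishes a lagging informer cache from a genuinely unbound PVC, and a cleared binding-decision cache turns into a permanent error. The sketch below models just that retry loop; `pvcState`, `writtenRV`, and `cachedRV` are hypothetical stand-ins (the real code compares whole objects with `etcd.APIObjectVersioner.CompareResourceVersion` and consults `GetDecisions` for cancellation):

```go
package main

import (
	"fmt"
	"strconv"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// pvcState is a hypothetical stand-in for the objects checkBindings inspects.
type pvcState struct {
	writtenRV string // resourceVersion returned by the apiserver on Update
	cachedRV  string // resourceVersion of the object in the informer cache
	bound     bool   // volumeName set and annBindCompleted present
}

// rv parses a resourceVersion as an unsigned integer, the same ordering
// etcd.APIObjectVersioner uses for comparisons.
func rv(s string) uint64 {
	v, _ := strconv.ParseUint(s, 10, 64)
	return v
}

// checkBindings mimics the retry contract: not done (but no error) while the
// cache still lags our own write or the PVC is not yet fully bound.
func checkBindings(claims []pvcState) (bool, error) {
	for _, c := range claims {
		if rv(c.writtenRV) > rv(c.cachedRV) {
			// Cache is older than the update we made; wait for it to catch up.
			return false, nil
		}
		if !c.bound {
			// The PV controller has not finished binding yet; keep polling.
			return false, nil
		}
	}
	return true, nil
}

func main() {
	claims := []pvcState{{writtenRV: "12", cachedRV: "11"}}
	attempts := 0
	err := wait.Poll(100*time.Millisecond, 5*time.Second, func() (bool, error) {
		attempts++
		if attempts == 3 {
			// Simulate the informer catching up and the bind completing.
			claims[0].cachedRV, claims[0].bound = "12", true
		}
		return checkBindings(claims)
	})
	fmt.Printf("bound after %d attempts, err = %v\n", attempts, err)
}
```

Returning `(false, nil)` on staleness rather than an error is the key design choice here: it converts the race between the scheduler and the PV controller into a retry instead of a binding failure.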