From 13d87fbff8ce4030bce5209caedbb2e2d55e13bf Mon Sep 17 00:00:00 2001 From: Yecheng Fu Date: Tue, 8 Jan 2019 01:17:31 +0800 Subject: [PATCH 1/4] Make volume binder resilient to races - FindPodVolumes do not error if PVC is assumed with selected node - BindPodVolumes check against API objects --- pkg/controller/volume/persistentvolume/BUILD | 2 + .../volume/persistentvolume/pv_controller.go | 21 +- .../scheduler_assume_cache.go | 42 +++- .../persistentvolume/scheduler_binder.go | 228 ++++++++++++------ .../scheduler_binder_cache.go | 14 ++ 5 files changed, 226 insertions(+), 81 deletions(-) diff --git a/pkg/controller/volume/persistentvolume/BUILD b/pkg/controller/volume/persistentvolume/BUILD index d0c648f5cfb..8e3f145dab3 100644 --- a/pkg/controller/volume/persistentvolume/BUILD +++ b/pkg/controller/volume/persistentvolume/BUILD @@ -44,6 +44,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library", @@ -92,6 +93,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go index cd9143e2800..0c9534bca68 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller.go +++ b/pkg/controller/volume/persistentvolume/pv_controller.go @@ -285,15 +285,16 @@ func checkVolumeSatisfyClaim(volume *v1.PersistentVolume, claim *v1.PersistentVo return nil } -func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) { +func (ctrl *PersistentVolumeController) isDelayBindingProvisioning(claim *v1.PersistentVolumeClaim) bool { // When feature VolumeScheduling enabled, // Scheduler signal to the PV controller to start dynamic // provisioning by setting the "annSelectedNode" annotation // in the PVC - if _, ok := claim.Annotations[annSelectedNode]; ok { - return false, nil - } + _, ok := claim.Annotations[annSelectedNode] + return ok +} +func (ctrl *PersistentVolumeController) isDelayBindingMode(claim *v1.PersistentVolumeClaim) (bool, error) { className := v1helper.GetPersistentVolumeClaimClass(claim) if className == "" { return false, nil @@ -311,6 +312,18 @@ func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentV return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil } +// shouldDelayBinding returns true if binding of claim should be delayed, false otherwise. 
+// If binding of claim should be delayed, only claims pre-bound by the
+// scheduler (those carrying the annSelectedNode annotation) are processed.
+func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) {
+	// If the claim has already been assigned a node by the scheduler for
+	// dynamic provisioning, do not delay binding.
+	if ctrl.isDelayBindingProvisioning(claim) {
+		return false, nil
+	}
+
+	// Otherwise, delay binding if the claim is in delay binding mode.
+	return ctrl.isDelayBindingMode(claim)
+}
+
 // syncUnboundClaim is the main controller method to decide what to do with an
 // unbound claim.
 func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error {
diff --git a/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go b/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go
index 2cc50a91a5f..cd4bc88d7ca 100644
--- a/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go
+++ b/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go
@@ -42,6 +42,9 @@ type AssumeCache interface {
 	// Get the object by name
 	Get(objName string) (interface{}, error)
 
+	// Get the API object by name
+	GetAPIObj(objName string) (interface{}, error)
+
 	// List all the objects in the cache
 	List(indexObj interface{}) []interface{}
 }
@@ -250,6 +253,17 @@ func (c *assumeCache) Get(objName string) (interface{}, error) {
 	return objInfo.latestObj, nil
 }
 
+func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) {
+	c.rwMutex.RLock()
+	defer c.rwMutex.RUnlock()
+
+	objInfo, err := c.getObjInfo(objName)
+	if err != nil {
+		return nil, err
+	}
+	return objInfo.apiObj, nil
+}
+
 func (c *assumeCache) List(indexObj interface{}) []interface{} {
 	c.rwMutex.RLock()
 	defer c.rwMutex.RUnlock()
@@ -297,7 +311,7 @@ func (c *assumeCache) Assume(obj interface{}) error {
 	}
 
 	if newVersion < storedVersion {
-		return fmt.Errorf("%v %q is out of sync", c.description, name)
+		return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
 	}
 
 	// Only update the cached object
@@ -325,6 +339,7 @@ type PVAssumeCache interface {
 	AssumeCache
 
 	GetPV(pvName string) (*v1.PersistentVolume, error)
+	GetAPIPV(pvName string) (*v1.PersistentVolume, error)
 	ListPVs(storageClassName string) []*v1.PersistentVolume
 }
 
@@ -356,6 +371,18 @@ func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
 	return pv, nil
 }
 
+func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
+	obj, err := c.GetAPIObj(pvName)
+	if err != nil {
+		return nil, err
+	}
+	pv, ok := obj.(*v1.PersistentVolume)
+	if !ok {
+		return nil, &errWrongType{"v1.PersistentVolume", obj}
+	}
+	return pv, nil
+}
+
 func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
 	objs := c.List(&v1.PersistentVolume{
 		Spec: v1.PersistentVolumeSpec{
@@ -380,6 +407,7 @@ type PVCAssumeCache interface {
 
 	// GetPVC returns the PVC from the cache with given pvcKey.
// pvcKey is the result of MetaNamespaceKeyFunc on PVC obj GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) + GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) } type pvcAssumeCache struct { @@ -402,3 +430,15 @@ func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error } return pvc, nil } + +func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { + obj, err := c.GetAPIObj(pvcKey) + if err != nil { + return nil, err + } + pvc, ok := obj.(*v1.PersistentVolumeClaim) + if !ok { + return nil, &errWrongType{"v1.PersistentVolumeClaim", obj} + } + return pvc, nil +} diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder.go b/pkg/controller/volume/persistentvolume/scheduler_binder.go index 1d4918bbcd3..737c195611d 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder.go +++ b/pkg/controller/volume/persistentvolume/scheduler_binder.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/storage/etcd" coreinformers "k8s.io/client-go/informers/core/v1" storageinformers "k8s.io/client-go/informers/storage/v1" clientset "k8s.io/client-go/kubernetes" @@ -145,6 +146,27 @@ func (b *volumeBinder) GetBindingsCache() PodBindingCache { // This method intentionally takes in a *v1.Node object instead of using volumebinder.nodeInformer. // That's necessary because some operations will need to pass in to the predicate fake node objects. func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) { + var ( + matchedClaims []*bindingInfo + provisionedClaims []*v1.PersistentVolumeClaim + ) + defer func() { + // We recreate bindings for each new schedule loop. + // Although we do not distinguish nil from empty in this function, for + // easier testing, we normalize empty to nil. + if len(matchedClaims) == 0 { + matchedClaims = nil + } + if len(provisionedClaims) == 0 { + provisionedClaims = nil + } + // TODO merge into one atomic function + // Mark cache with all the matches for each PVC for this node + b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims) + // Mark cache with all the PVCs that need provisioning for this node + b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims) + }() + podName := getPodName(pod) // Warning: Below log needs high verbosity as it can be printed several times (#60933). 
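Before the hunk that follows, it is worth restating the claim-classification rule it
introduces. This is a condensed sketch, not code from the patch: annSelectedNode and
the claim slices mirror names used in this diff, while classifyClaims is a
hypothetical helper invented here purely for illustration:

	// classifyClaims splits a pod's delayed-binding claims for one candidate node.
	// A claim already carrying annSelectedNode has been handed to a provisioner:
	// if the annotation names a different node, the pod cannot fit on this node
	// at all (the fast path in the hunk below returns early); otherwise the claim
	// stays on the provisioning list and is never re-matched against existing PVs.
	func classifyClaims(claims []*v1.PersistentVolumeClaim, nodeName string) (toProvision, toMatch []*v1.PersistentVolumeClaim, nodeMismatch bool) {
		for _, claim := range claims {
			if selected, ok := claim.Annotations[annSelectedNode]; ok {
				if selected != nodeName {
					// Another node was already selected for provisioning.
					return nil, nil, true
				}
				toProvision = append(toProvision, claim)
				continue
			}
			toMatch = append(toMatch, claim)
		}
		return toProvision, toMatch, false
	}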
@@ -181,16 +203,39 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume } } + // Find matching volumes and node for unbound claims if len(claimsToBind) > 0 { - var claimsToProvision []*v1.PersistentVolumeClaim - unboundVolumesSatisfied, claimsToProvision, err = b.findMatchingVolumes(pod, claimsToBind, node) - if err != nil { - return false, false, err + var ( + claimsToFindMatching []*v1.PersistentVolumeClaim + claimsToProvision []*v1.PersistentVolumeClaim + ) + + // Filter out claims to provision + for _, claim := range claimsToBind { + if selectedNode, ok := claim.Annotations[annSelectedNode]; ok { + if selectedNode != node.Name { + // Fast path, skip unmatched node + return false, boundVolumesSatisfied, nil + } + claimsToProvision = append(claimsToProvision, claim) + } else { + claimsToFindMatching = append(claimsToFindMatching, claim) + } } - // Try to provision for unbound volumes - if !unboundVolumesSatisfied { - unboundVolumesSatisfied, err = b.checkVolumeProvisions(pod, claimsToProvision, node) + // Find matching volumes + if len(claimsToFindMatching) > 0 { + var unboundClaims []*v1.PersistentVolumeClaim + unboundVolumesSatisfied, matchedClaims, unboundClaims, err = b.findMatchingVolumes(pod, claimsToFindMatching, node) + if err != nil { + return false, false, err + } + claimsToProvision = append(claimsToProvision, unboundClaims...) + } + + // Check for claims to provision + if len(claimsToProvision) > 0 { + unboundVolumesSatisfied, provisionedClaims, err = b.checkVolumeProvisions(pod, claimsToProvision, node) if err != nil { return false, false, err } @@ -304,10 +349,8 @@ func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) (err error) { } return wait.Poll(time.Second, b.bindTimeout, func() (bool, error) { - // Get cached values every time in case the pod gets deleted - bindings = b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName) - claimsToProvision = b.podBindingCache.GetProvisionedPVCs(assumedPod, assumedPod.Spec.NodeName) - return b.checkBindings(assumedPod, bindings, claimsToProvision) + b, err := b.checkBindings(assumedPod, bindings, claimsToProvision) + return b, err }) } @@ -344,6 +387,7 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl var ( binding *bindingInfo + i int claim *v1.PersistentVolumeClaim ) @@ -352,18 +396,24 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl for _, binding = range bindings { klog.V(5).Infof("bindAPIUpdate: Pod %q, binding PV %q to PVC %q", podName, binding.pv.Name, binding.pvc.Name) // TODO: does it hurt if we make an api call and nothing needs to be updated? - if _, err := b.ctrl.updateBindVolumeToClaim(binding.pv, binding.pvc, false); err != nil { + if newPV, err := b.ctrl.updateBindVolumeToClaim(binding.pv, binding.pvc, false); err != nil { return err + } else { + // Save updated object from apiserver for later checking. + binding.pv = newPV } lastProcessedBinding++ } // Update claims objects to trigger volume provisioning. 
Let the PV controller take care of the rest
 	// PV controller is expected to signal back by removing related annotations if actual provisioning fails
-	for _, claim = range claimsToProvision {
+	for i, claim = range claimsToProvision {
 		klog.V(5).Infof("bindAPIUpdate: Pod %q, PVC %q", podName, getPVCName(claim))
-		if _, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
+		if newClaim, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
 			return err
+		} else {
+			// Save updated object from apiserver for later checking.
+			claimsToProvision[i] = newClaim
 		}
 		lastProcessedProvisioning++
 	}
 
 	return nil
 }
 
+var (
+	versioner = etcd.APIObjectVersioner{}
+)
+
 // checkBindings runs through all the PVCs in the Pod and checks:
 // * if the PVC is fully bound
 // * if there are any conditions that require binding to fail and be retried
 //
 // It returns true when all of the Pod's PVCs are fully bound, and error if
 // binding (and scheduling) needs to be retried
+// Note that it checks API objects rather than the PV/PVC cache: the cache may
+// be assumed again by the main scheduler loop, so we must check the latest
+// state in the API server, which is shared with the PV controller and
+// provisioners.
 func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) {
 	podName := getPodName(pod)
 	if bindings == nil {
@@ -391,13 +449,32 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
 		return false, fmt.Errorf("failed to get node %q: %v", pod.Spec.NodeName, err)
 	}
 
-	for _, binding := range bindings {
-		// Check for any conditions that might require scheduling retry
+	// Check for any conditions that might require scheduling retry
 
-		// Check if pv still exists
-		pv, err := b.pvCache.GetPV(binding.pv.Name)
-		if err != nil || pv == nil {
-			return false, fmt.Errorf("failed to check pv binding: %v", err)
+	// When the pod is removed from the scheduling queue, because of deletion
+	// or any other reason, the binding operation should be cancelled. There is
+	// no need to check the PV/PVC bindings any more.
+	// We check the pod binding cache here, which is cleared when the pod is
+	// removed from the scheduling queue.
+	if b.podBindingCache.GetDecisions(pod) == nil {
+		return false, fmt.Errorf("pod %q does not exist any more", podName)
+	}
+
+	for _, binding := range bindings {
+		pv, err := b.pvCache.GetAPIPV(binding.pv.Name)
+		if err != nil {
+			return false, fmt.Errorf("failed to check binding: %v", err)
+		}
+
+		pvc, err := b.pvcCache.GetAPIPVC(getPVCName(binding.pvc))
+		if err != nil {
+			return false, fmt.Errorf("failed to check binding: %v", err)
+		}
+
+		// Because we updated PV in apiserver, skip if API object is older
+		// and wait for new API object propagated from apiserver.
+		if versioner.CompareResourceVersion(binding.pv, pv) > 0 {
+			return false, nil
 		}
 
 		// Check PV's node affinity (the node might not have the proper label)
@@ -411,18 +488,21 @@
 		}
 
 		// Check if pvc is fully bound
-		if isBound, _, err := b.isPVCBound(binding.pvc.Namespace, binding.pvc.Name); !isBound || err != nil {
-			return false, err
+		if !b.isPVCFullyBound(pvc) {
+			return false, nil
 		}
-
-		// TODO; what if pvc is bound to the wrong pv? It means our assume cache should be reverted.
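The resource-version guard above is the heart of "check against API objects": after
bindAPIUpdate writes a PV or PVC, the informer-fed API copy in the assume cache can
lag behind, so an older copy means "not yet observable" and the binder polls again
instead of failing. The same comparison as a standalone sketch, reusing the
package-level versioner (etcd.APIObjectVersioner) declared in this patch; isStale is
a hypothetical name introduced only for illustration:

	// isStale reports whether the copy observed through the informer (apiPV)
	// is older than the copy the apiserver returned from our own update
	// (writtenPV). checkBindings answers (false, nil) in that case, so the
	// wait.Poll loop in BindPodVolumes retries until the caches catch up.
	func isStale(writtenPV, apiPV *v1.PersistentVolume) bool {
		return versioner.CompareResourceVersion(writtenPV, apiPV) > 0
	}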
- // Or will pv controller cleanup the pv.ClaimRef? } for _, claim := range claimsToProvision { - bound, pvc, err := b.isPVCBound(claim.Namespace, claim.Name) - if err != nil || pvc == nil { - return false, fmt.Errorf("failed to check pvc binding: %v", err) + pvc, err := b.pvcCache.GetAPIPVC(getPVCName(claim)) + if err != nil { + return false, fmt.Errorf("failed to check provisioning pvc: %v", err) + } + + // Because we updated PVC in apiserver, skip if API object is older + // and wait for new API object propagated from apiserver. + if versioner.CompareResourceVersion(claim, pvc) > 0 { + return false, nil } // Check if selectedNode annotation is still set @@ -436,7 +516,7 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim // If the PVC is bound to a PV, check its node affinity if pvc.Spec.VolumeName != "" { - pv, err := b.pvCache.GetPV(pvc.Spec.VolumeName) + pv, err := b.pvCache.GetAPIPV(pvc.Spec.VolumeName) if err != nil { return false, fmt.Errorf("failed to get pv %q from cache: %v", pvc.Spec.VolumeName, err) } @@ -445,7 +525,8 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim } } - if !bound { + // Check if pvc is fully bound + if !b.isPVCFullyBound(pvc) { return false, nil } } @@ -477,19 +558,21 @@ func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.Persiste return false, nil, fmt.Errorf("error getting PVC %q: %v", pvcKey, err) } - pvName := pvc.Spec.VolumeName - if pvName != "" { - if metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted) { - klog.V(5).Infof("PVC %q is fully bound to PV %q", pvcKey, pvName) - return true, pvc, nil + fullyBound := b.isPVCFullyBound(pvc) + if fullyBound { + klog.V(5).Infof("PVC %q is fully bound to PV %q", pvcKey, pvc.Spec.VolumeName) + } else { + if pvc.Spec.VolumeName != "" { + klog.V(5).Infof("PVC %q is not fully bound to PV %q", pvcKey, pvc.Spec.VolumeName) } else { - klog.V(5).Infof("PVC %q is not fully bound to PV %q", pvcKey, pvName) - return false, pvc, nil + klog.V(5).Infof("PVC %q is not bound", pvcKey) } } + return fullyBound, pvc, nil +} - klog.V(5).Infof("PVC %q is not bound", pvcKey) - return false, pvc, nil +func (b *volumeBinder) isPVCFullyBound(pvc *v1.PersistentVolumeClaim) bool { + return pvc.Spec.VolumeName != "" && metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted) } // arePodVolumesBound returns true if all volumes are fully bound @@ -503,12 +586,12 @@ func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool { return true } -// getPodVolumes returns a pod's PVCs separated into bound (including prebound), unbound with delayed binding, -// and unbound with immediate binding -func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaims []*bindingInfo, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) { +// getPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning) +// and unbound with immediate binding (including prebound) +func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaims []*v1.PersistentVolumeClaim, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) { boundClaims = []*v1.PersistentVolumeClaim{} unboundClaimsImmediate = []*v1.PersistentVolumeClaim{} - unboundClaims = []*bindingInfo{} + unboundClaims = []*v1.PersistentVolumeClaim{} for _, vol := range pod.Spec.Volumes { volumeBound, pvc, err := b.isVolumeBound(pod.Namespace, &vol) @@ -521,15 +604,16 @@ func (b 
*volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV if volumeBound { boundClaims = append(boundClaims, pvc) } else { - delayBinding, err := b.ctrl.shouldDelayBinding(pvc) + delayBindingMode, err := b.ctrl.isDelayBindingMode(pvc) if err != nil { return nil, nil, nil, err } // Prebound PVCs are treated as unbound immediate binding - if delayBinding && pvc.Spec.VolumeName == "" { + if delayBindingMode && pvc.Spec.VolumeName == "" { // Scheduler path - unboundClaims = append(unboundClaims, &bindingInfo{pvc: pvc}) + unboundClaims = append(unboundClaims, pvc) } else { + // !delayBindingMode || pvc.Spec.VolumeName != "" // Immediate binding should have already been bound unboundClaimsImmediate = append(unboundClaimsImmediate, pvc) } @@ -560,7 +644,7 @@ func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node // findMatchingVolumes tries to find matching volumes for given claims, // and return unbound claims for further provision. -func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, unboundClaims []*v1.PersistentVolumeClaim, err error) { +func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (foundMatches bool, matchedClaims []*bindingInfo, unboundClaims []*v1.PersistentVolumeClaim, err error) { podName := getPodName(pod) // Sort all the claims by increasing size request to get the smallest fits sort.Sort(byPVCSize(claimsToBind)) @@ -568,39 +652,34 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingI chosenPVs := map[string]*v1.PersistentVolume{} foundMatches = true - matchedClaims := []*bindingInfo{} + matchedClaims = []*bindingInfo{} - for _, bindingInfo := range claimsToBind { + for _, pvc := range claimsToBind { // Get storage class name from each PVC storageClassName := "" - storageClass := bindingInfo.pvc.Spec.StorageClassName + storageClass := pvc.Spec.StorageClassName if storageClass != nil { storageClassName = *storageClass } allPVs := b.pvCache.ListPVs(storageClassName) - pvcName := getPVCName(bindingInfo.pvc) + pvcName := getPVCName(pvc) // Find a matching PV - bindingInfo.pv, err = findMatchingVolume(bindingInfo.pvc, allPVs, node, chosenPVs, true) + pv, err := findMatchingVolume(pvc, allPVs, node, chosenPVs, true) if err != nil { - return false, nil, err + return false, nil, nil, err } - if bindingInfo.pv == nil { + if pv == nil { klog.V(4).Infof("No matching volumes for Pod %q, PVC %q on node %q", podName, pvcName, node.Name) - unboundClaims = append(unboundClaims, bindingInfo.pvc) + unboundClaims = append(unboundClaims, pvc) foundMatches = false continue } // matching PV needs to be excluded so we don't select it again - chosenPVs[bindingInfo.pv.Name] = bindingInfo.pv - matchedClaims = append(matchedClaims, bindingInfo) - klog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", bindingInfo.pv.Name, pvcName, node.Name, podName) - } - - // Mark cache with all the matches for each PVC for this node - if len(matchedClaims) > 0 { - b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims) + chosenPVs[pv.Name] = pv + matchedClaims = append(matchedClaims, &bindingInfo{pv: pv, pvc: pvc}) + klog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", pv.Name, pvcName, node.Name, podName) } if foundMatches { @@ -613,31 +692,31 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingI // checkVolumeProvisions checks given unbound claims 
(the claims have gone through func // findMatchingVolumes, and do not have matching volumes for binding), and return true // if all of the claims are eligible for dynamic provision. -func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, err error) { +func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, provisionedClaims []*v1.PersistentVolumeClaim, err error) { podName := getPodName(pod) - provisionedClaims := []*v1.PersistentVolumeClaim{} + provisionedClaims = []*v1.PersistentVolumeClaim{} for _, claim := range claimsToProvision { pvcName := getPVCName(claim) className := v1helper.GetPersistentVolumeClaimClass(claim) if className == "" { - return false, fmt.Errorf("no class for claim %q", pvcName) + return false, nil, fmt.Errorf("no class for claim %q", pvcName) } class, err := b.ctrl.classLister.Get(className) if err != nil { - return false, fmt.Errorf("failed to find storage class %q", className) + return false, nil, fmt.Errorf("failed to find storage class %q", className) } provisioner := class.Provisioner if provisioner == "" || provisioner == notSupportedProvisioner { klog.V(4).Infof("storage class %q of claim %q does not support dynamic provisioning", className, pvcName) - return false, nil + return false, nil, nil } // Check if the node can satisfy the topology requirement in the class if !v1helper.MatchTopologySelectorTerms(class.AllowedTopologies, labels.Set(node.Labels)) { klog.V(4).Infof("Node %q cannot satisfy provisioning topology requirements of claim %q", node.Name, pvcName) - return false, nil + return false, nil, nil } // TODO: Check if capacity of the node domain in the storage class @@ -648,10 +727,7 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v } klog.V(4).Infof("Provisioning for claims of pod %q that has no matching volumes on node %q ...", podName, node.Name) - // Mark cache with all the PVCs that need provisioning for this node - b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims) - - return true, nil + return true, provisionedClaims, nil } func (b *volumeBinder) revertAssumedPVs(bindings []*bindingInfo) { @@ -674,7 +750,7 @@ type bindingInfo struct { pv *v1.PersistentVolume } -type byPVCSize []*bindingInfo +type byPVCSize []*v1.PersistentVolumeClaim func (a byPVCSize) Len() int { return len(a) @@ -685,8 +761,8 @@ func (a byPVCSize) Swap(i, j int) { } func (a byPVCSize) Less(i, j int) bool { - iSize := a[i].pvc.Spec.Resources.Requests[v1.ResourceStorage] - jSize := a[j].pvc.Spec.Resources.Requests[v1.ResourceStorage] + iSize := a[i].Spec.Resources.Requests[v1.ResourceStorage] + jSize := a[j].Spec.Resources.Requests[v1.ResourceStorage] // return true if iSize is less than jSize return iSize.Cmp(jSize) == -1 } diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go b/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go index b95538304ad..f32c1269b54 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go +++ b/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go @@ -44,6 +44,9 @@ type PodBindingCache interface { // means that no provisioning operations are needed. GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim + // GetDecisions will return all cached decisions for the given pod. 
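+	// A nil result means the pod's decisions have been cleared, i.e. the pod
+	// was removed from the scheduling queue; checkBindings relies on this to
+	// cancel an in-flight binding operation.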
+ GetDecisions(pod *v1.Pod) nodeDecisions + // DeleteBindings will remove all cached bindings and provisionings for the given pod. // TODO: separate the func if it is needed to delete bindings/provisionings individually DeleteBindings(pod *v1.Pod) @@ -72,6 +75,17 @@ func NewPodBindingCache() PodBindingCache { return &podBindingCache{bindingDecisions: map[string]nodeDecisions{}} } +func (c *podBindingCache) GetDecisions(pod *v1.Pod) nodeDecisions { + c.rwMutex.Lock() + defer c.rwMutex.Unlock() + podName := getPodName(pod) + decisions, ok := c.bindingDecisions[podName] + if !ok { + return nil + } + return decisions +} + func (c *podBindingCache) DeleteBindings(pod *v1.Pod) { c.rwMutex.Lock() defer c.rwMutex.Unlock() From 8b94b9625bb7bbf0a1d9f8c03f715a4aa76ded1a Mon Sep 17 00:00:00 2001 From: Yecheng Fu Date: Tue, 8 Jan 2019 01:18:34 +0800 Subject: [PATCH 2/4] Make volume binder resilient to races: unit tests --- .../volume/persistentvolume/framework_test.go | 98 +++- .../persistentvolume/scheduler_binder_test.go | 510 ++++++++++++------ 2 files changed, 433 insertions(+), 175 deletions(-) diff --git a/pkg/controller/volume/persistentvolume/framework_test.go b/pkg/controller/volume/persistentvolume/framework_test.go index ef632cd5285..7552a2cb44f 100644 --- a/pkg/controller/volume/persistentvolume/framework_test.go +++ b/pkg/controller/volume/persistentvolume/framework_test.go @@ -29,11 +29,13 @@ import ( "k8s.io/klog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/wait" @@ -136,6 +138,7 @@ type volumeReactor struct { fakeClaimWatch *watch.FakeWatcher lock sync.Mutex errors []reactorError + watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher } // reactorError is an error that is returned by test reactor (=simulated @@ -189,11 +192,34 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj // Store the updated object to appropriate places. r.volumes[volume.Name] = volume + for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { + w.Add(volume) + } r.changedObjects = append(r.changedObjects, volume) r.changedSinceLastSync++ klog.V(4).Infof("created volume %s", volume.Name) return true, volume, nil + case action.Matches("create", "persistentvolumeclaims"): + obj := action.(core.UpdateAction).GetObject() + claim := obj.(*v1.PersistentVolumeClaim) + + // check the claim does not exist + _, found := r.claims[claim.Name] + if found { + return true, nil, fmt.Errorf("Cannot create claim %s: claim already exists", claim.Name) + } + + // Store the updated object to appropriate places. 
+ r.claims[claim.Name] = claim + for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { + w.Add(claim) + } + r.changedObjects = append(r.changedObjects, claim) + r.changedSinceLastSync++ + klog.V(4).Infof("created claim %s", claim.Name) + return true, claim, nil + case action.Matches("update", "persistentvolumes"): obj := action.(core.UpdateAction).GetObject() volume := obj.(*v1.PersistentVolume) @@ -206,6 +232,10 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj if storedVer != requestedVer { return true, obj, versionConflictError } + if reflect.DeepEqual(storedVolume, volume) { + klog.V(4).Infof("nothing updated volume %s", volume.Name) + return true, volume, nil + } // Don't modify the existing object volume = volume.DeepCopy() volume.ResourceVersion = strconv.Itoa(storedVer + 1) @@ -214,6 +244,9 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj } // Store the updated object to appropriate places. + for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { + w.Modify(volume) + } r.volumes[volume.Name] = volume r.changedObjects = append(r.changedObjects, volume) r.changedSinceLastSync++ @@ -232,6 +265,10 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj if storedVer != requestedVer { return true, obj, versionConflictError } + if reflect.DeepEqual(storedClaim, claim) { + klog.V(4).Infof("nothing updated claim %s", claim.Name) + return true, claim, nil + } // Don't modify the existing object claim = claim.DeepCopy() claim.ResourceVersion = strconv.Itoa(storedVer + 1) @@ -240,6 +277,9 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj } // Store the updated object to appropriate places. 
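+		// Notify all registered fake watchers first, so informer-backed
+		// caches receive the same Modify event a real apiserver would send.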
+ for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { + w.Modify(claim) + } r.claims[claim.Name] = claim r.changedObjects = append(r.changedObjects, claim) r.changedSinceLastSync++ @@ -251,18 +291,32 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj volume, found := r.volumes[name] if found { klog.V(4).Infof("GetVolume: found %s", volume.Name) - return true, volume, nil + return true, volume.DeepCopy(), nil } else { klog.V(4).Infof("GetVolume: volume %s not found", name) return true, nil, fmt.Errorf("Cannot find volume %s", name) } + case action.Matches("get", "persistentvolumeclaims"): + name := action.(core.GetAction).GetName() + claim, found := r.claims[name] + if found { + klog.V(4).Infof("GetClaim: found %s", claim.Name) + return true, claim.DeepCopy(), nil + } else { + klog.V(4).Infof("GetClaim: claim %s not found", name) + return true, nil, apierrs.NewNotFound(action.GetResource().GroupResource(), name) + } + case action.Matches("delete", "persistentvolumes"): name := action.(core.DeleteAction).GetName() klog.V(4).Infof("deleted volume %s", name) - _, found := r.volumes[name] + obj, found := r.volumes[name] if found { delete(r.volumes, name) + for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { + w.Delete(obj) + } r.changedSinceLastSync++ return true, nil, nil } else { @@ -272,9 +326,12 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj case action.Matches("delete", "persistentvolumeclaims"): name := action.(core.DeleteAction).GetName() klog.V(4).Infof("deleted claim %s", name) - _, found := r.volumes[name] + obj, found := r.claims[name] if found { delete(r.claims, name) + for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { + w.Delete(obj) + } r.changedSinceLastSync++ return true, nil, nil } else { @@ -285,6 +342,36 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj return false, nil, nil } +// Watch watches objects from the volumeReactor. Watch returns a channel which +// will push added / modified / deleted object. +func (r *volumeReactor) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) { + r.lock.Lock() + defer r.lock.Unlock() + + fakewatcher := watch.NewRaceFreeFake() + + if _, exists := r.watchers[gvr]; !exists { + r.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher) + } + r.watchers[gvr][ns] = append(r.watchers[gvr][ns], fakewatcher) + return fakewatcher, nil +} + +func (r *volumeReactor) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher { + watches := []*watch.RaceFreeFakeWatcher{} + if r.watchers[gvr] != nil { + if w := r.watchers[gvr][ns]; w != nil { + watches = append(watches, w...) + } + if ns != metav1.NamespaceAll { + if w := r.watchers[gvr][metav1.NamespaceAll]; w != nil { + watches = append(watches, w...) + } + } + } + return watches +} + // injectReactError returns an error when the test requested given action to // fail. nil is returned otherwise. 
func (r *volumeReactor) injectReactError(action core.Action) error { @@ -596,11 +683,14 @@ func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, fakeVolumeWatch: fakeVolumeWatch, fakeClaimWatch: fakeClaimWatch, errors: errors, + watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher), } client.AddReactor("create", "persistentvolumes", reactor.React) + client.AddReactor("create", "persistentvolumeclaims", reactor.React) client.AddReactor("update", "persistentvolumes", reactor.React) client.AddReactor("update", "persistentvolumeclaims", reactor.React) client.AddReactor("get", "persistentvolumes", reactor.React) + client.AddReactor("get", "persistentvolumeclaims", reactor.React) client.AddReactor("delete", "persistentvolumes", reactor.React) client.AddReactor("delete", "persistentvolumeclaims", reactor.React) diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_test.go b/pkg/controller/volume/persistentvolume/scheduler_binder_test.go index 789e464fc19..a3792ecec7c 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder_test.go +++ b/pkg/controller/volume/persistentvolume/scheduler_binder_test.go @@ -17,6 +17,7 @@ limitations under the License. package persistentvolume import ( + "context" "fmt" "reflect" "testing" @@ -28,10 +29,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/diff" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" "k8s.io/klog" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/controller" @@ -75,14 +79,6 @@ var ( pvBoundImmediate = makeTestPV("pv-bound-immediate", "node1", "1G", "1", immediateBoundPVC, immediateClass) pvBoundImmediateNode2 = makeTestPV("pv-bound-immediate", "node2", "1G", "1", immediateBoundPVC, immediateClass) - // PVC/PV bindings for manual binding - binding1a = makeBinding(unboundPVC, pvNode1a) - binding1b = makeBinding(unboundPVC2, pvNode1b) - bindingNoNode = makeBinding(unboundPVC, pvNoNode) - bindingBad = makeBinding(badPVC, pvNode1b) - binding1aBound = makeBinding(unboundPVC, pvNode1aBound) - binding1bBound = makeBinding(unboundPVC2, pvNode1bBound) - // storage class names waitClass = "waitClass" immediateClass = "immediateClass" @@ -109,15 +105,24 @@ type testEnv struct { internalPVCCache *pvcAssumeCache } -func newTestBinder(t *testing.T) *testEnv { +func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv { client := &fake.Clientset{} reactor := newVolumeReactor(client, nil, nil, nil, nil) + // TODO refactor all tests to use real watch mechanism, see #72327 + client.AddWatchReactor("*", func(action k8stesting.Action) (handled bool, ret watch.Interface, err error) { + gvr := action.GetResource() + ns := action.GetNamespace() + watch, err := reactor.Watch(gvr, ns) + if err != nil { + return false, nil, err + } + return true, watch, nil + }) informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) nodeInformer := informerFactory.Core().V1().Nodes() pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() classInformer := informerFactory.Storage().V1().StorageClasses() - binder := NewVolumeBinder( client, nodeInformer, @@ -126,6 +131,14 @@ func newTestBinder(t *testing.T) *testEnv { classInformer, 10*time.Second) + 
// Wait for informers cache sync + informerFactory.Start(stopCh) + for v, synced := range informerFactory.WaitForCacheSync(stopCh) { + if !synced { + klog.Fatalf("Error syncing informer for %v", v) + } + } + // Add storageclasses waitMode := storagev1.VolumeBindingWaitForFirstConsumer immediateMode := storagev1.VolumeBindingImmediate @@ -247,6 +260,66 @@ func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.P } +func (env *testEnv) updateVolumes(t *testing.T, pvs []*v1.PersistentVolume, waitCache bool) { + for _, pv := range pvs { + if _, err := env.client.CoreV1().PersistentVolumes().Update(pv); err != nil { + t.Fatalf("failed to update PV %q", pv.Name) + } + } + if waitCache { + wait.Poll(100*time.Millisecond, 3*time.Second, func() (bool, error) { + for _, pv := range pvs { + obj, err := env.internalPVCache.GetAPIObj(pv.Name) + if obj == nil || err != nil { + return false, nil + } + pvInCache, ok := obj.(*v1.PersistentVolume) + if !ok { + return false, fmt.Errorf("PV %s invalid object", pvInCache.Name) + } + return versioner.CompareResourceVersion(pvInCache, pv) == 0, nil + } + return true, nil + }) + } +} + +func (env *testEnv) updateClaims(t *testing.T, pvcs []*v1.PersistentVolumeClaim, waitCache bool) { + for _, pvc := range pvcs { + if _, err := env.client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Update(pvc); err != nil { + t.Fatalf("failed to update PVC %q", getPVCName(pvc)) + } + } + if waitCache { + wait.Poll(100*time.Millisecond, 3*time.Second, func() (bool, error) { + for _, pvc := range pvcs { + obj, err := env.internalPVCCache.GetAPIObj(getPVCName(pvc)) + if obj == nil || err != nil { + return false, nil + } + pvcInCache, ok := obj.(*v1.PersistentVolumeClaim) + if !ok { + return false, fmt.Errorf("PVC %s invalid object", pvcInCache.Name) + } + return versioner.CompareResourceVersion(pvcInCache, pvc) == 0, nil + } + return true, nil + }) + } +} + +func (env *testEnv) deleteVolumes(pvs []*v1.PersistentVolume) { + for _, pv := range pvs { + env.internalPVCache.delete(pv) + } +} + +func (env *testEnv) deleteClaims(pvcs []*v1.PersistentVolumeClaim) { + for _, pvc := range pvcs { + env.internalPVCCache.delete(pvc) + } +} + func (env *testEnv) assumeVolumes(t *testing.T, name, node string, pod *v1.Pod, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) { pvCache := env.internalBinder.pvCache for _, binding := range bindings { @@ -540,7 +613,7 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV func pvcSetSelectedNode(pvc *v1.PersistentVolumeClaim, node string) *v1.PersistentVolumeClaim { newPVC := pvc.DeepCopy() - metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annSelectedNode, node) + metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annSelectedNode, node) return newPVC } @@ -676,7 +749,7 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) { "unbound-pvc,pv-same-node": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, pvs: []*v1.PersistentVolume{pvNode2, pvNode1a, pvNode1b}, - expectedBindings: []*bindingInfo{binding1a}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)}, expectedUnbound: true, expectedBound: true, }, @@ -689,28 +762,28 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) { "two-unbound-pvcs": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2}, pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, - expectedBindings: []*bindingInfo{binding1a, binding1b}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a), makeBinding(unboundPVC2, 
pvNode1b)}, expectedUnbound: true, expectedBound: true, }, "two-unbound-pvcs,order-by-size": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC2, unboundPVC}, pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, - expectedBindings: []*bindingInfo{binding1a, binding1b}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a), makeBinding(unboundPVC2, pvNode1b)}, expectedUnbound: true, expectedBound: true, }, "two-unbound-pvcs,partial-match": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2}, pvs: []*v1.PersistentVolume{pvNode1a}, - expectedBindings: []*bindingInfo{binding1a}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)}, expectedUnbound: false, expectedBound: true, }, "one-bound,one-unbound": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, boundPVC}, pvs: []*v1.PersistentVolume{pvBound, pvNode1a}, - expectedBindings: []*bindingInfo{binding1a}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)}, expectedUnbound: true, expectedBound: true, }, @@ -767,11 +840,14 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) { }, } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for name, scenario := range scenarios { klog.V(5).Infof("Running test case %q", name) // Setup - testEnv := newTestBinder(t) + testEnv := newTestBinder(t, ctx.Done()) testEnv.initVolumes(scenario.pvs, scenario.pvs) // a. Init pvc cache @@ -833,7 +909,7 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) { "two-unbound-pvcs,one-matched,one-provisioned": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC}, pvs: []*v1.PersistentVolume{pvNode1a}, - expectedBindings: []*bindingInfo{binding1a}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)}, expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC}, expectedUnbound: true, expectedBound: true, @@ -845,6 +921,13 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) { expectedUnbound: true, expectedBound: true, }, + "one-binding,one-selected-node": { + podPVCs: []*v1.PersistentVolumeClaim{boundPVC, selectedNodePVC}, + pvs: []*v1.PersistentVolume{pvBound}, + expectedProvisions: []*v1.PersistentVolumeClaim{selectedNodePVC}, + expectedUnbound: true, + expectedBound: true, + }, "immediate-unbound-pvc": { podPVCs: []*v1.PersistentVolumeClaim{immediateUnboundPVC}, expectedUnbound: false, @@ -879,9 +962,12 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) { }, } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for name, scenario := range scenarios { // Setup - testEnv := newTestBinder(t) + testEnv := newTestBinder(t, ctx.Done()) testEnv.initVolumes(scenario.pvs, scenario.pvs) // a. 
Init pvc cache @@ -937,59 +1023,62 @@ func TestAssumePodVolumes(t *testing.T) { }, "one-binding": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, - bindings: []*bindingInfo{binding1a}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)}, pvs: []*v1.PersistentVolume{pvNode1a}, - expectedBindings: []*bindingInfo{binding1aBound}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, expectedProvisionings: []*v1.PersistentVolumeClaim{}, }, "two-bindings": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2}, - bindings: []*bindingInfo{binding1a, binding1b}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a), makeBinding(unboundPVC2, pvNode1b)}, pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, - expectedBindings: []*bindingInfo{binding1aBound, binding1bBound}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound), makeBinding(unboundPVC2, pvNode1bBound)}, expectedProvisionings: []*v1.PersistentVolumeClaim{}, }, "pv-already-bound": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, pvs: []*v1.PersistentVolume{pvNode1aBound}, - expectedBindings: []*bindingInfo{binding1aBound}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, expectedProvisionings: []*v1.PersistentVolumeClaim{}, }, "claimref-failed": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, - bindings: []*bindingInfo{binding1a, bindingBad}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a), makeBinding(badPVC, pvNode1b)}, pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, shouldFail: true, }, "tmpupdate-failed": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, - bindings: []*bindingInfo{binding1a, binding1b}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a), makeBinding(unboundPVC2, pvNode1b)}, pvs: []*v1.PersistentVolume{pvNode1a}, shouldFail: true, }, "one-binding, one-pvc-provisioned": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC}, - bindings: []*bindingInfo{binding1a}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)}, pvs: []*v1.PersistentVolume{pvNode1a}, provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, - expectedBindings: []*bindingInfo{binding1aBound}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, expectedProvisionings: []*v1.PersistentVolumeClaim{selectedNodePVC}, }, "one-binding, one-provision-tmpupdate-failed": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVCHigherVersion}, - bindings: []*bindingInfo{binding1a}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)}, pvs: []*v1.PersistentVolume{pvNode1a}, provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC2}, shouldFail: true, }, } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for name, scenario := range scenarios { klog.V(5).Infof("Running test case %q", name) // Setup - testEnv := newTestBinder(t) + testEnv := newTestBinder(t, ctx.Done()) testEnv.initClaims(scenario.podPVCs, scenario.podPVCs) pod := makePod(scenario.podPVCs) testEnv.initPodCache(pod, "node1", scenario.bindings, scenario.provisionedPVCs) @@ -1062,25 +1151,25 @@ func TestBindAPIUpdate(t *testing.T) { provisionedPVCs: []*v1.PersistentVolumeClaim{}, }, "one-binding": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, cachedPVs: []*v1.PersistentVolume{pvNode1a}, expectedPVs: 
[]*v1.PersistentVolume{pvNode1aBound}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, }, "two-bindings": { - bindings: []*bindingInfo{binding1aBound, binding1bBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound), makeBinding(unboundPVC2, pvNode1bBound)}, cachedPVs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, expectedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBound}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, }, "api-already-updated": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, expectedPVs: []*v1.PersistentVolume{pvNode1aBound}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, }, "api-update-failed": { - bindings: []*bindingInfo{binding1aBound, binding1bBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound), makeBinding(unboundPVC2, pvNode1bBound)}, cachedPVs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, apiPVs: []*v1.PersistentVolume{pvNode1a, pvNode1bBoundHigherVersion}, expectedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1b}, @@ -1104,7 +1193,7 @@ func TestBindAPIUpdate(t *testing.T) { shouldFail: true, }, "binding-succeed, provision-api-update-failed": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, cachedPVs: []*v1.PersistentVolume{pvNode1a}, expectedPVs: []*v1.PersistentVolume{pvNode1aBound}, provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), addProvisionAnn(provisionedPVC2)}, @@ -1115,11 +1204,15 @@ func TestBindAPIUpdate(t *testing.T) { shouldFail: true, }, } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for name, scenario := range scenarios { klog.V(4).Infof("Running test case %q", name) // Setup - testEnv := newTestBinder(t) + testEnv := newTestBinder(t, ctx.Done()) pod := makePod(nil) if scenario.apiPVs == nil { scenario.apiPVs = scenario.cachedPVs @@ -1155,11 +1248,19 @@ func TestBindAPIUpdate(t *testing.T) { func TestCheckBindings(t *testing.T) { scenarios := map[string]struct { // Inputs - bindings []*bindingInfo - cachedPVs []*v1.PersistentVolume + initPVs []*v1.PersistentVolume + initPVCs []*v1.PersistentVolumeClaim + bindings []*bindingInfo provisionedPVCs []*v1.PersistentVolumeClaim - cachedPVCs []*v1.PersistentVolumeClaim + + // api updates before checking + apiPVs []*v1.PersistentVolume + apiPVCs []*v1.PersistentVolumeClaim + + // delete objects before checking + deletePVs bool + deletePVCs bool // Expected return values shouldFail bool @@ -1182,108 +1283,144 @@ func TestCheckBindings(t *testing.T) { expectedBound: true, }, "binding-bound": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, - cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, - cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + initPVs: []*v1.PersistentVolume{pvNode1aBound}, + initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, expectedBound: true, }, "binding-prebound": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, - cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, - cachedPVCs: []*v1.PersistentVolumeClaim{preboundPVCNode1a}, + initPVs: []*v1.PersistentVolume{pvNode1aBound}, + initPVCs: []*v1.PersistentVolumeClaim{preboundPVCNode1a}, }, "binding-unbound": { - bindings: 
[]*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, - cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, - cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, + initPVs: []*v1.PersistentVolume{pvNode1aBound}, + initPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, }, "binding-pvc-not-exists": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, - cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, + initPVs: []*v1.PersistentVolume{pvNode1aBound}, shouldFail: true, }, "binding-pv-not-exists": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, - cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + initPVs: []*v1.PersistentVolume{pvNode1aBound}, + initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + deletePVs: true, shouldFail: true, }, "binding-claimref-nil": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, - cachedPVs: []*v1.PersistentVolume{pvNode1a}, - cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + initPVs: []*v1.PersistentVolume{pvNode1a}, + initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + apiPVs: []*v1.PersistentVolume{pvNode1a}, + apiPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, shouldFail: true, }, "binding-claimref-uid-empty": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, - cachedPVs: []*v1.PersistentVolume{pvRemoveClaimUID(pvNode1aBound)}, - cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + initPVs: []*v1.PersistentVolume{pvNode1aBound}, + initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + apiPVs: []*v1.PersistentVolume{pvRemoveClaimUID(pvNode1aBound)}, + apiPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, shouldFail: true, }, "binding-one-bound,one-unbound": { - bindings: []*bindingInfo{binding1aBound, binding1bBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound), makeBinding(unboundPVC2, pvNode1bBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, - cachedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBound}, - cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a, unboundPVC2}, + initPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBound}, + initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a, unboundPVC2}, }, "provisioning-pvc-bound": { bindings: []*bindingInfo{}, provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, - cachedPVs: []*v1.PersistentVolume{pvBound}, - cachedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVCBound)}, + initPVs: []*v1.PersistentVolume{pvBound}, + initPVCs: []*v1.PersistentVolumeClaim{provisionedPVCBound}, + apiPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVCBound)}, expectedBound: true, }, "provisioning-pvc-unbound": { bindings: []*bindingInfo{}, provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, - cachedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, + initPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, }, "provisioning-pvc-not-exists": { bindings: []*bindingInfo{}, provisionedPVCs: 
[]*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, + initPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, + deletePVCs: true, shouldFail: true, }, "provisioning-pvc-annotations-nil": { bindings: []*bindingInfo{}, provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, - cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, + initPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, + apiPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, shouldFail: true, }, "provisioning-pvc-selected-node-dropped": { bindings: []*bindingInfo{}, provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, - cachedPVCs: []*v1.PersistentVolumeClaim{pvcSetEmptyAnnotations(provisionedPVC)}, + initPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, + apiPVCs: []*v1.PersistentVolumeClaim{pvcSetEmptyAnnotations(provisionedPVC)}, shouldFail: true, }, "provisioning-pvc-selected-node-wrong-node": { + initPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, bindings: []*bindingInfo{}, provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, - cachedPVCs: []*v1.PersistentVolumeClaim{pvcSetSelectedNode(provisionedPVC, "wrong-node")}, + apiPVCs: []*v1.PersistentVolumeClaim{pvcSetSelectedNode(provisionedPVC, "wrong-node")}, shouldFail: true, }, "binding-bound-provisioning-unbound": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, - cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, - cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a, addProvisionAnn(provisionedPVC)}, + initPVs: []*v1.PersistentVolume{pvNode1aBound}, + initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a, addProvisionAnn(provisionedPVC)}, + }, + "tolerate-provisioning-pvc-bound-pv-not-found": { + initPVs: []*v1.PersistentVolume{pvNode1a}, + initPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, + bindings: []*bindingInfo{}, + provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, + apiPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVCBound)}, + deletePVs: true, }, } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for name, scenario := range scenarios { klog.V(4).Infof("Running test case %q", name) // Setup pod := makePod(nil) - testEnv := newTestBinder(t) + testEnv := newTestBinder(t, ctx.Done()) testEnv.initNodes([]*v1.Node{node1}) - testEnv.initVolumes(scenario.cachedPVs, nil) - testEnv.initClaims(scenario.cachedPVCs, nil) + testEnv.initVolumes(scenario.initPVs, nil) + testEnv.initClaims(scenario.initPVCs, nil) + testEnv.assumeVolumes(t, name, "node1", pod, scenario.bindings, scenario.provisionedPVCs) + + // Before execute + if scenario.deletePVs { + testEnv.deleteVolumes(scenario.initPVs) + } else { + testEnv.updateVolumes(t, scenario.apiPVs, true) + } + if scenario.deletePVCs { + testEnv.deleteClaims(scenario.initPVCs) + } else { + testEnv.updateClaims(t, scenario.apiPVCs, true) + } // Execute allBound, err := testEnv.internalBinder.checkBindings(pod, scenario.bindings, scenario.provisionedPVCs) @@ -1302,63 +1439,96 @@ func TestCheckBindings(t *testing.T) { } func TestBindPodVolumes(t *testing.T) { - scenarios := map[string]struct { + type scenarioType struct { // Inputs - // These tests only support a single pv and pvc and static binding - bindingsNil bool // Pass in nil bindings slice - binding *bindingInfo - cachedPVs []*v1.PersistentVolume - cachedPVCs 
-        provisionedPVCs []*v1.PersistentVolumeClaim
-        apiPVs          []*v1.PersistentVolume
-        nodes           []*v1.Node
+        bindingsNil bool // Pass in nil bindings slice
+
+        nodes []*v1.Node
+
+        // before assume
+        initPVs  []*v1.PersistentVolume
+        initPVCs []*v1.PersistentVolumeClaim
+
+        // assume PV & PVC with these binding results
+        binding          *bindingInfo
+        claimToProvision *v1.PersistentVolumeClaim
+
+        // API updates after assume, before bind
+        apiPV  *v1.PersistentVolume
+        apiPVC *v1.PersistentVolumeClaim
 
         // This function runs with a delay of 5 seconds
-        delayFunc func(*testing.T, *testEnv, *v1.Pod, *bindingInfo, []*v1.PersistentVolumeClaim)
+        delayFunc func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim)
 
         // Expected return values
         shouldFail bool
-    }{
+    }
+
+    scenarios := map[string]scenarioType{
         "nothing-to-bind-nil": {
             bindingsNil: true,
             shouldFail:  true,
         },
         "nothing-to-bind-empty": {},
         "already-bound": {
-            binding:    binding1aBound,
-            cachedPVs:  []*v1.PersistentVolume{pvNode1aBound},
-            cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
+            binding:  makeBinding(unboundPVC, pvNode1aBound),
+            initPVs:  []*v1.PersistentVolume{pvNode1aBound},
+            initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
         },
-        "binding-succeeds-after-time": {
-            binding:    binding1aBound,
-            cachedPVs:  []*v1.PersistentVolume{pvNode1a},
-            cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
-            delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
+        "binding-static-pv-succeeds-after-time": {
+            initPVs:    []*v1.PersistentVolume{pvNode1a},
+            initPVCs:   []*v1.PersistentVolumeClaim{unboundPVC},
+            binding:    makeBinding(unboundPVC, pvNode1aBound),
+            shouldFail: false, // Will succeed once the PV controller fully binds the PVC to this PV.
+            delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim) {
+                pvc := pvcs[0]
+                pv := pvs[0]
                 // Update PVC to be fully bound to PV
-                newPVC := binding.pvc.DeepCopy()
-                newPVC.ResourceVersion = "100"
-                newPVC.Spec.VolumeName = binding.pv.Name
+                newPVC := pvc.DeepCopy()
+                newPVC.Spec.VolumeName = pv.Name
                 metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
-
-                // Update pvc cache, fake client doesn't invoke informers
-                internalBinder, ok := testEnv.binder.(*volumeBinder)
-                if !ok {
-                    t.Fatalf("Failed to convert to internal binder")
+                if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil {
+                    t.Errorf("failed to update PVC %q: %v", newPVC.Name, err)
                 }
-
-                pvcCache := internalBinder.pvcCache
-                internalPVCCache, ok := pvcCache.(*pvcAssumeCache)
-                if !ok {
-                    t.Fatalf("Failed to convert to internal PVC cache")
-                }
-                internalPVCCache.add(newPVC)
             },
         },
+        "binding-dynamic-pv-succeeds-after-time": {
+            claimToProvision: pvcSetSelectedNode(provisionedPVC, "node1"),
+            initPVCs:         []*v1.PersistentVolumeClaim{provisionedPVC},
+            delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim) {
+                pvc := pvcs[0]
+                // Update PVC to be fully bound to PV
+                newPVC, err := testEnv.client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(pvc.Name, metav1.GetOptions{})
+                if err != nil {
+                    t.Errorf("failed to get PVC %q: %v", pvc.Name, err)
+                    return
+                }
+                dynamicPV := makeTestPV("dynamic-pv", "node1", "1G", "1", newPVC, waitClass)
+                dynamicPV, err = testEnv.client.CoreV1().PersistentVolumes().Create(dynamicPV)
+                if err != nil {
+                    t.Errorf("failed to create PV %q: %v", dynamicPV.Name, err)
+                    return
+                }
+                newPVC.Spec.VolumeName = dynamicPV.Name
+                metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
+                if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil {
+                    t.Errorf("failed to update PVC %q: %v", newPVC.Name, err)
+                }
+            },
+        },
+        "bound-by-pv-controller-before-bind": {
+            initPVs:    []*v1.PersistentVolume{pvNode1a},
+            initPVCs:   []*v1.PersistentVolumeClaim{unboundPVC},
+            binding:    makeBinding(unboundPVC, pvNode1aBound),
+            apiPV:      pvNode1aBound,
+            apiPVC:     boundPVCNode1a,
+            shouldFail: true, // bindAPIUpdate will fail because of an API conflict
+        },
         "pod-deleted-after-time": {
-            binding:    binding1aBound,
-            cachedPVs:  []*v1.PersistentVolume{pvNode1a},
-            cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
-            delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
+            binding:  makeBinding(unboundPVC, pvNode1aBound),
+            initPVs:  []*v1.PersistentVolume{pvNode1a},
+            initPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
+            delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim) {
                 bindingsCache := testEnv.binder.GetBindingsCache()
                 if bindingsCache == nil {
                     t.Fatalf("Failed to get bindings cache")
@@ -1376,107 +1546,103 @@ func TestBindPodVolumes(t *testing.T) {
             shouldFail: true,
         },
         "binding-times-out": {
-            binding:    binding1aBound,
-            cachedPVs:  []*v1.PersistentVolume{pvNode1a},
-            cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
+            binding:  makeBinding(unboundPVC, pvNode1aBound),
+            initPVs:  []*v1.PersistentVolume{pvNode1a},
+            initPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
             shouldFail: true,
         },
         "binding-fails": {
-            binding:    binding1bBound,
-            cachedPVs:  []*v1.PersistentVolume{pvNode1b},
-            apiPVs:     []*v1.PersistentVolume{pvNode1bBoundHigherVersion},
-            cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC2},
+            binding:  makeBinding(unboundPVC2, pvNode1bBound),
+            initPVs:  []*v1.PersistentVolume{pvNode1b},
+            initPVCs: []*v1.PersistentVolumeClaim{unboundPVC2},
             shouldFail: true,
         },
         "check-fails": {
-            binding:    binding1aBound,
-            cachedPVs:  []*v1.PersistentVolume{pvNode1a},
-            cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
-            delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
-                // Delete PVC
-                // Update pvc cache, fake client doesn't invoke informers
-                internalBinder, ok := testEnv.binder.(*volumeBinder)
-                if !ok {
-                    t.Fatalf("Failed to convert to internal binder")
+            binding:  makeBinding(unboundPVC, pvNode1aBound),
+            initPVs:  []*v1.PersistentVolume{pvNode1a},
+            initPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
+            delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim) {
+                pvc := pvcs[0]
+                // Deleting the PVC will make the check fail
+                if err := testEnv.client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, &metav1.DeleteOptions{}); err != nil {
+                    t.Errorf("failed to delete PVC %q: %v", pvc.Name, err)
                 }
-
-                pvcCache := internalBinder.pvcCache
-                internalPVCCache, ok := pvcCache.(*pvcAssumeCache)
-                if !ok {
-                    t.Fatalf("Failed to convert to internal PVC cache")
-                }
-                internalPVCCache.delete(binding.pvc)
             },
             shouldFail: true,
         },
         "node-affinity-fails": {
-            binding:    binding1aBound,
-            cachedPVs:  []*v1.PersistentVolume{pvNode1aBound},
-            cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
+            binding:  makeBinding(unboundPVC, pvNode1aBound),
+            initPVs:  []*v1.PersistentVolume{pvNode1aBound},
+            initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
             nodes:      []*v1.Node{node1NoLabels},
             shouldFail: true,
         },
         "node-affinity-fails-dynamic-provisioning": {
-            cachedPVs:       []*v1.PersistentVolume{pvNode1a, pvNode2},
-            cachedPVCs:      []*v1.PersistentVolumeClaim{selectedNodePVC},
-            provisionedPVCs: []*v1.PersistentVolumeClaim{selectedNodePVC},
-            nodes:           []*v1.Node{node1, node2},
-            delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
+            initPVs:          []*v1.PersistentVolume{pvNode1a, pvNode2},
+            initPVCs:         []*v1.PersistentVolumeClaim{selectedNodePVC},
+            claimToProvision: selectedNodePVC,
+            nodes:            []*v1.Node{node1, node2},
+            delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim) {
                 // Update PVC to be fully bound to a PV with a different node
                 newPVC := pvcs[0].DeepCopy()
-                newPVC.ResourceVersion = "100"
                 newPVC.Spec.VolumeName = pvNode2.Name
                 metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
-
-                // Update PVC cache, fake client doesn't invoke informers
-                internalBinder, ok := testEnv.binder.(*volumeBinder)
-                if !ok {
-                    t.Fatalf("Failed to convert to internal binder")
+                if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil {
+                    t.Errorf("failed to update PVC %q: %v", newPVC.Name, err)
                 }
-
-                pvcCache := internalBinder.pvcCache
-                internalPVCCache, ok := pvcCache.(*pvcAssumeCache)
-                if !ok {
-                    t.Fatalf("Failed to convert to internal PVC cache")
-                }
-                internalPVCCache.add(newPVC)
             },
             shouldFail: true,
         },
     }
 
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
     for name, scenario := range scenarios {
         klog.V(4).Infof("Running test case %q", name)
 
         // Setup
         pod := makePod(nil)
-        if scenario.apiPVs == nil {
-            scenario.apiPVs = scenario.cachedPVs
-        }
+        testEnv := newTestBinder(t, ctx.Done())
         if scenario.nodes == nil {
             scenario.nodes = []*v1.Node{node1}
         }
-        if scenario.provisionedPVCs == nil {
-            scenario.provisionedPVCs = []*v1.PersistentVolumeClaim{}
-        }
-        testEnv := newTestBinder(t)
         if !scenario.bindingsNil {
             bindings := []*bindingInfo{}
             if scenario.binding != nil {
                 bindings = []*bindingInfo{scenario.binding}
             }
+            claimsToProvision := []*v1.PersistentVolumeClaim{}
+            if scenario.claimToProvision != nil {
+                claimsToProvision = []*v1.PersistentVolumeClaim{scenario.claimToProvision}
+            }
             testEnv.initNodes(scenario.nodes)
-            testEnv.initVolumes(scenario.cachedPVs, scenario.apiPVs)
-            testEnv.initClaims(scenario.cachedPVCs, nil)
-            testEnv.assumeVolumes(t, name, "node1", pod, bindings, scenario.provisionedPVCs)
+            testEnv.initVolumes(scenario.initPVs, scenario.initPVs)
+            testEnv.initClaims(scenario.initPVCs, scenario.initPVCs)
+            testEnv.assumeVolumes(t, name, "node1", pod, bindings, claimsToProvision)
+        }
+
+        // Before execute
+        if scenario.apiPV != nil {
+            _, err := testEnv.client.CoreV1().PersistentVolumes().Update(scenario.apiPV)
+            if err != nil {
+                t.Fatalf("Test %q failed: failed to update PV %q: %v", name, scenario.apiPV.Name, err)
+            }
+        }
+        if scenario.apiPVC != nil {
+            _, err := testEnv.client.CoreV1().PersistentVolumeClaims(scenario.apiPVC.Namespace).Update(scenario.apiPVC)
+            if err != nil {
+                t.Fatalf("Test %q failed: failed to update PVC %q: %v", name, getPVCName(scenario.apiPVC), err)
+            }
         }
 
         if scenario.delayFunc != nil {
-            go func() {
+            go func(scenario scenarioType) {
                 time.Sleep(5 * time.Second)
+                // Sleep a while so that this runs after bindAPIUpdate in BindPodVolumes
                 klog.V(5).Infof("Running delay function")
-                scenario.delayFunc(t, testEnv, pod, scenario.binding, scenario.provisionedPVCs)
-            }()
+                scenario.delayFunc(t, testEnv, pod, scenario.initPVs, scenario.initPVCs)
+            }(scenario)
         }
 
         // Execute
@@ -1498,7 +1664,9 @@ func TestFindAssumeVolumes(t *testing.T) {
     pvs := []*v1.PersistentVolume{pvNode2, pvNode1a, pvNode1c}
 
     // Setup
-    testEnv := newTestBinder(t)
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    testEnv := newTestBinder(t, ctx.Done())
     testEnv.initVolumes(pvs, pvs)
     testEnv.initClaims(podPVCs, podPVCs)
     pod := makePod(podPVCs)
@@ -1548,6 +1716,6 @@ func TestFindAssumeVolumes(t *testing.T) {
         if !unboundSatisfied {
             t.Errorf("Test failed: couldn't find PVs for all PVCs")
         }
-        testEnv.validatePodCache(t, "after-assume", testNode.Name, pod, expectedBindings, []*v1.PersistentVolumeClaim{})
+        testEnv.validatePodCache(t, "after-assume", testNode.Name, pod, expectedBindings, nil)
     }
 }

From cfc8ef51d1f57f6dab061fa26fbc14db5dde14e4 Mon Sep 17 00:00:00 2001
From: Yecheng Fu
Date: Tue, 8 Jan 2019 01:19:23 +0800
Subject: [PATCH 3/4] Make volume binder resilient to races: scheduler change

There is no need to clear the stale pod binding cache during scheduling:
it is recreated at the beginning of each scheduling loop, and it is
cleared when the pod is removed from the scheduling queue.
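For context on why this cleanup can go away entirely: after this series,
BindPodVolumes itself keeps checking the assumed bindings against the API
objects until they are bound or the pod goes away. The sketch below is a
condensed illustration, not verbatim from this patch; helper names such as
getPodName and the exact poll interval and timeout field are assumptions.

    // Condensed sketch of BindPodVolumes after this series; see
    // scheduler_binder.go for the real implementation.
    func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
        // Read the bindings assumed for this pod during scheduling.
        bindings := b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName)
        claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, assumedPod.Spec.NodeName)

        // Kick off the PV/PVC API updates once.
        if err := b.bindAPIUpdate(getPodName(assumedPod), bindings, claimsToProvision); err != nil {
            return err
        }

        // Poll the API objects until everything is verified bound. The
        // per-pod cache entry is rebuilt on each scheduling attempt and
        // dropped with the pod, so there is no stale state to clear here.
        return wait.Poll(time.Second, b.bindTimeout, func() (bool, error) {
            return b.checkBindings(assumedPod, bindings, claimsToProvision)
        })
    }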
---
 pkg/scheduler/factory/factory.go | 11 -----------
 pkg/scheduler/scheduler.go       |  4 ----
 2 files changed, 15 deletions(-)

diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go
index bb2e3ee7a24..034aa3c4287 100644
--- a/pkg/scheduler/factory/factory.go
+++ b/pkg/scheduler/factory/factory.go
@@ -1105,7 +1105,6 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue
             Namespace: pod.Namespace,
             Name:      pod.Name,
         }
-        origPod := pod
 
         // When pod priority is enabled, we would like to place an unschedulable
         // pod in the unschedulable queue. This ensures that if the pod is nominated
@@ -1124,21 +1123,11 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue
                 if err == nil {
                     if len(pod.Spec.NodeName) == 0 {
                         podQueue.AddUnschedulableIfNotPresent(pod)
-                    } else {
-                        if c.volumeBinder != nil {
-                            // Volume binder only wants to keep unassigned pods
-                            c.volumeBinder.DeletePodBindings(pod)
-                        }
                     }
                     break
                 }
                 if errors.IsNotFound(err) {
                     klog.Warningf("A pod %v no longer exists", podID)
-
-                    if c.volumeBinder != nil {
-                        // Volume binder only wants to keep unassigned pods
-                        c.volumeBinder.DeletePodBindings(origPod)
-                    }
                     return
                 }
                 klog.Errorf("Error getting pod %v for retry: %v; retrying...", podID, err)
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 999f25b2bfe..af5552a3787 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -383,10 +383,6 @@ func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
             klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
         }
 
-        // Volumes may be bound by PV controller asynchronously, we must clear
-        // stale pod binding cache.
-        sched.config.VolumeBinder.DeletePodBindings(assumed)
-
         sched.recordSchedulingFailure(assumed, err, "VolumeBindingFailed", err.Error())
         return err
     }

From 1a62f53d3fc915a1c99156c8de049fedd2219a41 Mon Sep 17 00:00:00 2001
From: Yecheng Fu
Date: Tue, 8 Jan 2019 02:18:24 +0800
Subject: [PATCH 4/4] If a provisioning PVC's PV is not found, check again next time.

---
 .../volume/persistentvolume/scheduler_binder.go | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder.go b/pkg/controller/volume/persistentvolume/scheduler_binder.go
index 737c195611d..c39dd5e2668 100644
--- a/pkg/controller/volume/persistentvolume/scheduler_binder.go
+++ b/pkg/controller/volume/persistentvolume/scheduler_binder.go
@@ -518,7 +518,15 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
         if pvc.Spec.VolumeName != "" {
             pv, err := b.pvCache.GetAPIPV(pvc.Spec.VolumeName)
             if err != nil {
-                return false, fmt.Errorf("failed to get pv %q from cache: %v", pvc.Spec.VolumeName, err)
+                if _, ok := err.(*errNotFound); ok {
+                    // We tolerate NotFound here: the PV may be missing only
+                    // because of API delay, so we will check again next time.
+                    // If the PV is missing because it was deleted, the PVC
+                    // will eventually become unbound.
+                    return false, nil
+                } else {
+                    return false, fmt.Errorf("failed to get pv %q from cache: %v", pvc.Spec.VolumeName, err)
+                }
             }
             if err := volumeutil.CheckNodeAffinity(pv, node.Labels); err != nil {
                 return false, fmt.Errorf("pv %q node affinity doesn't match node %q: %v", pv.Name, node.Name, err)
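The err.(*errNotFound) assertion above relies on the PV assume cache
returning a dedicated not-found error from GetAPIPV. For reference, a
minimal sketch of what that error type presumably looks like (the real
definition lives in scheduler_assume_cache.go; field names here are
assumptions):

    // Sketch of the assume cache's not-found error; field names are
    // assumptions, see scheduler_assume_cache.go for the real type.
    type errNotFound struct {
        typeName   string
        objectName string
    }

    func (e *errNotFound) Error() string {
        return fmt.Sprintf("could not find %v %q", e.typeName, e.objectName)
    }

With this tolerated, a PV that has not yet landed in the informer cache
makes checkBindings return (false, nil), so the polling in BindPodVolumes
simply retries on the next tick instead of failing the pod's binding.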