diff --git a/pkg/controller/volume/persistentvolume/BUILD b/pkg/controller/volume/persistentvolume/BUILD index d0c648f5cfb..8e3f145dab3 100644 --- a/pkg/controller/volume/persistentvolume/BUILD +++ b/pkg/controller/volume/persistentvolume/BUILD @@ -44,6 +44,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library", @@ -92,6 +93,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/pkg/controller/volume/persistentvolume/framework_test.go b/pkg/controller/volume/persistentvolume/framework_test.go index ef632cd5285..7552a2cb44f 100644 --- a/pkg/controller/volume/persistentvolume/framework_test.go +++ b/pkg/controller/volume/persistentvolume/framework_test.go @@ -29,11 +29,13 @@ import ( "k8s.io/klog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/wait" @@ -136,6 +138,7 @@ type volumeReactor struct { fakeClaimWatch *watch.FakeWatcher lock sync.Mutex errors []reactorError + watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher } // reactorError is an error that is returned by test reactor (=simulated @@ -189,11 +192,34 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj // Store the updated object to appropriate places. r.volumes[volume.Name] = volume + for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) { + w.Add(volume) + } r.changedObjects = append(r.changedObjects, volume) r.changedSinceLastSync++ klog.V(4).Infof("created volume %s", volume.Name) return true, volume, nil + case action.Matches("create", "persistentvolumeclaims"): + obj := action.(core.UpdateAction).GetObject() + claim := obj.(*v1.PersistentVolumeClaim) + + // check the claim does not exist + _, found := r.claims[claim.Name] + if found { + return true, nil, fmt.Errorf("Cannot create claim %s: claim already exists", claim.Name) + } + + // Store the updated object to appropriate places. 
+		r.claims[claim.Name] = claim
+		for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
+			w.Add(claim)
+		}
+		r.changedObjects = append(r.changedObjects, claim)
+		r.changedSinceLastSync++
+		klog.V(4).Infof("created claim %s", claim.Name)
+		return true, claim, nil
+
	case action.Matches("update", "persistentvolumes"):
		obj := action.(core.UpdateAction).GetObject()
		volume := obj.(*v1.PersistentVolume)
@@ -206,6 +232,10 @@
		if storedVer != requestedVer {
			return true, obj, versionConflictError
		}
+		if reflect.DeepEqual(storedVolume, volume) {
+			klog.V(4).Infof("volume %s not changed, nothing to update", volume.Name)
+			return true, volume, nil
+		}
		// Don't modify the existing object
		volume = volume.DeepCopy()
		volume.ResourceVersion = strconv.Itoa(storedVer + 1)
@@ -214,6 +244,9 @@
		}

		// Store the updated object to appropriate places.
+		for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
+			w.Modify(volume)
+		}
		r.volumes[volume.Name] = volume
		r.changedObjects = append(r.changedObjects, volume)
		r.changedSinceLastSync++
@@ -232,6 +265,10 @@
		if storedVer != requestedVer {
			return true, obj, versionConflictError
		}
+		if reflect.DeepEqual(storedClaim, claim) {
+			klog.V(4).Infof("claim %s not changed, nothing to update", claim.Name)
+			return true, claim, nil
+		}
		// Don't modify the existing object
		claim = claim.DeepCopy()
		claim.ResourceVersion = strconv.Itoa(storedVer + 1)
@@ -240,6 +277,9 @@
		}

		// Store the updated object to appropriate places.
+		for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
+			w.Modify(claim)
+		}
		r.claims[claim.Name] = claim
		r.changedObjects = append(r.changedObjects, claim)
		r.changedSinceLastSync++
@@ -251,18 +291,32 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
		volume, found := r.volumes[name]
		if found {
			klog.V(4).Infof("GetVolume: found %s", volume.Name)
-			return true, volume, nil
+			return true, volume.DeepCopy(), nil
		} else {
			klog.V(4).Infof("GetVolume: volume %s not found", name)
			return true, nil, fmt.Errorf("Cannot find volume %s", name)
		}

+	case action.Matches("get", "persistentvolumeclaims"):
+		name := action.(core.GetAction).GetName()
+		claim, found := r.claims[name]
+		if found {
+			klog.V(4).Infof("GetClaim: found %s", claim.Name)
+			return true, claim.DeepCopy(), nil
+		} else {
+			klog.V(4).Infof("GetClaim: claim %s not found", name)
+			return true, nil, apierrs.NewNotFound(action.GetResource().GroupResource(), name)
+		}
+
	case action.Matches("delete", "persistentvolumes"):
		name := action.(core.DeleteAction).GetName()
		klog.V(4).Infof("deleted volume %s", name)
-		_, found := r.volumes[name]
+		obj, found := r.volumes[name]
		if found {
			delete(r.volumes, name)
+			for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
+				w.Delete(obj)
+			}
			r.changedSinceLastSync++
			return true, nil, nil
		} else {
@@ -272,9 +326,12 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
	case action.Matches("delete", "persistentvolumeclaims"):
		name := action.(core.DeleteAction).GetName()
		klog.V(4).Infof("deleted claim %s", name)
-		_, found := r.volumes[name]
+		obj, found := r.claims[name]
		if found {
			delete(r.claims, name)
+			for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
+				w.Delete(obj)
+			}
			r.changedSinceLastSync++
			return true, nil, nil
		} else {
@@ -285,6 +342,36 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
	return false, nil, nil
}

+// Watch watches objects from the volumeReactor. Watch returns a channel that
+// pushes added / modified / deleted objects.
+func (r *volumeReactor) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	fakewatcher := watch.NewRaceFreeFake()
+
+	if _, exists := r.watchers[gvr]; !exists {
+		r.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher)
+	}
+	r.watchers[gvr][ns] = append(r.watchers[gvr][ns], fakewatcher)
+	return fakewatcher, nil
+}
+
+func (r *volumeReactor) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher {
+	watches := []*watch.RaceFreeFakeWatcher{}
+	if r.watchers[gvr] != nil {
+		if w := r.watchers[gvr][ns]; w != nil {
+			watches = append(watches, w...)
+		}
+		if ns != metav1.NamespaceAll {
+			if w := r.watchers[gvr][metav1.NamespaceAll]; w != nil {
+				watches = append(watches, w...)
+			}
+		}
+	}
+	return watches
+}
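As a reading aid (not part of the patch): a minimal sketch of how a test could consume the fake watch stream registered above. The reactor variable and the testing.T scaffolding are assumptions for illustration; the watch API calls are the real watch.Interface methods.

// Hypothetical usage of volumeReactor.Watch in a test.
gvr := v1.SchemeGroupVersion.WithResource("persistentvolumes")
w, err := reactor.Watch(gvr, metav1.NamespaceAll)
if err != nil {
	t.Fatalf("failed to start watch: %v", err)
}
defer w.Stop()
// Every create/update/delete that React mirrors via getWatches is pushed here.
select {
case evt := <-w.ResultChan():
	t.Logf("observed %s event for %T", evt.Type, evt.Object)
case <-time.After(wait.ForeverTestTimeout):
	t.Fatal("timed out waiting for a watch event")
}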
// injectReactError returns an error when the test requested given action to
// fail. nil is returned otherwise.
func (r *volumeReactor) injectReactError(action core.Action) error {
@@ -596,11 +683,14 @@ func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController,
		fakeVolumeWatch: fakeVolumeWatch,
		fakeClaimWatch:  fakeClaimWatch,
		errors:          errors,
+		watchers:        make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher),
	}
	client.AddReactor("create", "persistentvolumes", reactor.React)
+	client.AddReactor("create", "persistentvolumeclaims", reactor.React)
	client.AddReactor("update", "persistentvolumes", reactor.React)
	client.AddReactor("update", "persistentvolumeclaims", reactor.React)
	client.AddReactor("get", "persistentvolumes", reactor.React)
+	client.AddReactor("get", "persistentvolumeclaims", reactor.React)
	client.AddReactor("delete", "persistentvolumes", reactor.React)
	client.AddReactor("delete", "persistentvolumeclaims", reactor.React)
diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go
index c6e195d79ce..3fe0ed90c5a 100644
--- a/pkg/controller/volume/persistentvolume/pv_controller.go
+++ b/pkg/controller/volume/persistentvolume/pv_controller.go
@@ -285,15 +285,16 @@ func checkVolumeSatisfyClaim(volume *v1.PersistentVolume, claim *v1.PersistentVo
	return nil
}

-func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) {
+func (ctrl *PersistentVolumeController) isDelayBindingProvisioning(claim *v1.PersistentVolumeClaim) bool {
	// When feature VolumeScheduling enabled,
	// Scheduler signal to the PV controller to start dynamic
	// provisioning by setting the "annSelectedNode" annotation
	// in the PVC
-	if _, ok := claim.Annotations[annSelectedNode]; ok {
-		return false, nil
-	}
+	_, ok := claim.Annotations[annSelectedNode]
+	return ok
+}

+func (ctrl *PersistentVolumeController) isDelayBindingMode(claim *v1.PersistentVolumeClaim) (bool, error) {
	className := v1helper.GetPersistentVolumeClaimClass(claim)
	if className == "" {
		return false, nil
@@ -311,6 +312,18 @@ func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentV
	return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil
}

+// shouldDelayBinding returns true if binding of claim should be delayed, false otherwise.
+// Binding is delayed only for claims in delay binding mode for which the
+// scheduler has not yet selected a node.
+func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) {
+	// If claim has already been assigned a node by scheduler for dynamic provisioning.
+	if ctrl.isDelayBindingProvisioning(claim) {
+		return false, nil
+	}
+
+	// If claim is in delay binding mode.
+	return ctrl.isDelayBindingMode(claim)
+}
+
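As an aside (not part of the patch), the refactor above splits the old shouldDelayBinding into two predicates; their combination can be summarized as below, where ctrl and claim are placeholders:

// Sketch: effective decision table of the refactored shouldDelayBinding.
//
//   VolumeBindingMode        annSelectedNode on PVC   shouldDelayBinding
//   Immediate (or no class)  unset                    false
//   WaitForFirstConsumer     unset                    true  (wait for the scheduler)
//   WaitForFirstConsumer     set                      false (scheduler chose a node; provisioning may start)
delay, err := ctrl.shouldDelayBinding(claim)
if err == nil && !delay {
	// Proceed with binding or dynamic provisioning immediately.
}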
// syncUnboundClaim is the main controller method to decide what to do with an
// unbound claim.
func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error {
diff --git a/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go b/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go
index 2cc50a91a5f..cd4bc88d7ca 100644
--- a/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go
+++ b/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go
@@ -42,6 +42,9 @@ type AssumeCache interface {
	// Get the object by name
	Get(objName string) (interface{}, error)

+	// Get the API object by name. Unlike Get, this ignores any locally
+	// assumed update and returns the latest object seen from the informer.
+	GetAPIObj(objName string) (interface{}, error)
+
	// List all the objects in the cache
	List(indexObj interface{}) []interface{}
}
@@ -250,6 +253,17 @@ func (c *assumeCache) Get(objName string) (interface{}, error) {
	return objInfo.latestObj, nil
}

+func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) {
+	c.rwMutex.RLock()
+	defer c.rwMutex.RUnlock()
+
+	objInfo, err := c.getObjInfo(objName)
+	if err != nil {
+		return nil, err
+	}
+	return objInfo.apiObj, nil
+}
+
func (c *assumeCache) List(indexObj interface{}) []interface{} {
	c.rwMutex.RLock()
	defer c.rwMutex.RUnlock()
@@ -297,7 +311,7 @@ func (c *assumeCache) Assume(obj interface{}) error {
	}

	if newVersion < storedVersion {
-		return fmt.Errorf("%v %q is out of sync", c.description, name)
+		return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
	}

	// Only update the cached object
@@ -325,6 +339,7 @@ type PVAssumeCache interface {
	AssumeCache

	GetPV(pvName string) (*v1.PersistentVolume, error)
+	GetAPIPV(pvName string) (*v1.PersistentVolume, error)
	ListPVs(storageClassName string) []*v1.PersistentVolume
}

@@ -356,6 +371,18 @@ func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
	return pv, nil
}

+func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
+	obj, err := c.GetAPIObj(pvName)
+	if err != nil {
+		return nil, err
+	}
+	pv, ok := obj.(*v1.PersistentVolume)
+	if !ok {
+		return nil, &errWrongType{"v1.PersistentVolume", obj}
+	}
+	return pv, nil
+}
+
func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
	objs := c.List(&v1.PersistentVolume{
		Spec: v1.PersistentVolumeSpec{
@@ -380,6 +407,7 @@ type PVCAssumeCache interface {

	// GetPVC returns the PVC from the cache with given pvcKey.
// pvcKey is the result of MetaNamespaceKeyFunc on PVC obj GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) + GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) } type pvcAssumeCache struct { @@ -402,3 +430,15 @@ func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error } return pvc, nil } + +func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { + obj, err := c.GetAPIObj(pvcKey) + if err != nil { + return nil, err + } + pvc, ok := obj.(*v1.PersistentVolumeClaim) + if !ok { + return nil, &errWrongType{"v1.PersistentVolumeClaim", obj} + } + return pvc, nil +} diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder.go b/pkg/controller/volume/persistentvolume/scheduler_binder.go index 1d4918bbcd3..c39dd5e2668 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder.go +++ b/pkg/controller/volume/persistentvolume/scheduler_binder.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/storage/etcd" coreinformers "k8s.io/client-go/informers/core/v1" storageinformers "k8s.io/client-go/informers/storage/v1" clientset "k8s.io/client-go/kubernetes" @@ -145,6 +146,27 @@ func (b *volumeBinder) GetBindingsCache() PodBindingCache { // This method intentionally takes in a *v1.Node object instead of using volumebinder.nodeInformer. // That's necessary because some operations will need to pass in to the predicate fake node objects. func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) { + var ( + matchedClaims []*bindingInfo + provisionedClaims []*v1.PersistentVolumeClaim + ) + defer func() { + // We recreate bindings for each new schedule loop. + // Although we do not distinguish nil from empty in this function, for + // easier testing, we normalize empty to nil. + if len(matchedClaims) == 0 { + matchedClaims = nil + } + if len(provisionedClaims) == 0 { + provisionedClaims = nil + } + // TODO merge into one atomic function + // Mark cache with all the matches for each PVC for this node + b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims) + // Mark cache with all the PVCs that need provisioning for this node + b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims) + }() + podName := getPodName(pod) // Warning: Below log needs high verbosity as it can be printed several times (#60933). 
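For orientation (an illustrative sketch, not code from the patch): the assume cache now exposes two views of each object, and the binder compares them with the versioner introduced further below. pvCache here stands for any PVAssumeCache instance, and the PV name is a placeholder.

// The assumed view may contain a local, in-flight update; the API view is
// always the latest object delivered by the informer.
assumedPV, _ := pvCache.GetPV("pv-1")
apiPV, _ := pvCache.GetAPIPV("pv-1")
if versioner.CompareResourceVersion(assumedPV, apiPV) > 0 {
	// The informer has not caught up with our own update yet; retry later
	// rather than treating the older API object as a binding failure.
}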
@@ -181,16 +203,39 @@
		}
	}

+	// Find matching volumes and node for unbound claims
	if len(claimsToBind) > 0 {
-		var claimsToProvision []*v1.PersistentVolumeClaim
-		unboundVolumesSatisfied, claimsToProvision, err = b.findMatchingVolumes(pod, claimsToBind, node)
-		if err != nil {
-			return false, false, err
+		var (
+			claimsToFindMatching []*v1.PersistentVolumeClaim
+			claimsToProvision    []*v1.PersistentVolumeClaim
+		)
+
+		// Filter out claims to provision
+		for _, claim := range claimsToBind {
+			if selectedNode, ok := claim.Annotations[annSelectedNode]; ok {
+				if selectedNode != node.Name {
+					// Fast path, skip unmatched node
+					return false, boundVolumesSatisfied, nil
+				}
+				claimsToProvision = append(claimsToProvision, claim)
+			} else {
+				claimsToFindMatching = append(claimsToFindMatching, claim)
+			}
		}

-		// Try to provision for unbound volumes
-		if !unboundVolumesSatisfied {
-			unboundVolumesSatisfied, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
+		// Find matching volumes
+		if len(claimsToFindMatching) > 0 {
+			var unboundClaims []*v1.PersistentVolumeClaim
+			unboundVolumesSatisfied, matchedClaims, unboundClaims, err = b.findMatchingVolumes(pod, claimsToFindMatching, node)
+			if err != nil {
+				return false, false, err
+			}
+			claimsToProvision = append(claimsToProvision, unboundClaims...)
+		}
+
+		// Check for claims to provision
+		if len(claimsToProvision) > 0 {
+			unboundVolumesSatisfied, provisionedClaims, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
			if err != nil {
				return false, false, err
			}
@@ -304,10 +349,8 @@ func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) (err error) {
	}

	return wait.Poll(time.Second, b.bindTimeout, func() (bool, error) {
-		// Get cached values every time in case the pod gets deleted
-		bindings = b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName)
-		claimsToProvision = b.podBindingCache.GetProvisionedPVCs(assumedPod, assumedPod.Spec.NodeName)
-		return b.checkBindings(assumedPod, bindings, claimsToProvision)
+		done, err := b.checkBindings(assumedPod, bindings, claimsToProvision)
+		return done, err
	})
}

@@ -344,6 +387,7 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl

	var (
		binding *bindingInfo
+		i       int
		claim   *v1.PersistentVolumeClaim
	)

@@ -352,18 +396,24 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl
	for _, binding = range bindings {
		klog.V(5).Infof("bindAPIUpdate: Pod %q, binding PV %q to PVC %q", podName, binding.pv.Name, binding.pvc.Name)
		// TODO: does it hurt if we make an api call and nothing needs to be updated?
-		if _, err := b.ctrl.updateBindVolumeToClaim(binding.pv, binding.pvc, false); err != nil {
+		if newPV, err := b.ctrl.updateBindVolumeToClaim(binding.pv, binding.pvc, false); err != nil {
			return err
+		} else {
+			// Save updated object from apiserver for later checking.
+			binding.pv = newPV
		}
		lastProcessedBinding++
	}

	// Update claim objects to trigger volume provisioning. Let the PV controller take care of the rest.
	// The PV controller is expected to signal back by removing related annotations if actual provisioning fails.
-	for _, claim = range claimsToProvision {
+	for i, claim = range claimsToProvision {
		klog.V(5).Infof("bindAPIUpdate: Pod %q, PVC %q", podName, getPVCName(claim))
-		if _, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
+		if newClaim, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
			return err
+		} else {
+			// Save updated object from apiserver for later checking.
+			claimsToProvision[i] = newClaim
		}
		lastProcessedProvisioning++
	}
@@ -371,12 +421,20 @@
	return nil
}

+var (
+	versioner = etcd.APIObjectVersioner{}
+)
+
// checkBindings runs through all the PVCs in the Pod and checks:
// * if the PVC is fully bound
// * if there are any conditions that require binding to fail and be retried
//
// It returns true when all of the Pod's PVCs are fully bound, and error if
// binding (and scheduling) needs to be retried
+// Note that it checks the latest API objects instead of the PV/PVC cache,
+// because the cache may be assumed again by the main scheduler loop; we must
+// check the latest state in the API server, which is shared with the PV
+// controller and provisioners
func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) {
	podName := getPodName(pod)
	if bindings == nil {
@@ -391,13 +449,32 @@
		return false, fmt.Errorf("failed to get node %q: %v", pod.Spec.NodeName, err)
	}

-	for _, binding := range bindings {
-		// Check for any conditions that might require scheduling retry
+	// Check for any conditions that might require scheduling retry

-		// Check if pv still exists
-		pv, err := b.pvCache.GetPV(binding.pv.Name)
-		if err != nil || pv == nil {
-			return false, fmt.Errorf("failed to check pv binding: %v", err)
+	// When the pod is removed from the scheduling queue because of deletion or
+	// any other reason, the binding operation should be cancelled. There is no
+	// need to check PV/PVC bindings anymore.
+	// We check the pod binding cache here because it is cleared when the pod
+	// is removed from the scheduling queue.
+	if b.podBindingCache.GetDecisions(pod) == nil {
+		return false, fmt.Errorf("pod %q does not exist any more", podName)
+	}
+
+	for _, binding := range bindings {
+		pv, err := b.pvCache.GetAPIPV(binding.pv.Name)
+		if err != nil {
+			return false, fmt.Errorf("failed to check binding: %v", err)
+		}
+
+		pvc, err := b.pvcCache.GetAPIPVC(getPVCName(binding.pvc))
+		if err != nil {
+			return false, fmt.Errorf("failed to check binding: %v", err)
+		}
+
+		// Because we updated PV in apiserver, skip if API object is older
+		// and wait for new API object propagated from apiserver.
+		if versioner.CompareResourceVersion(binding.pv, pv) > 0 {
+			return false, nil
		}

		// Check PV's node affinity (the node might not have the proper label)
@@ -411,18 +488,21 @@
		}

		// Check if pvc is fully bound
-		if isBound, _, err := b.isPVCBound(binding.pvc.Namespace, binding.pvc.Name); !isBound || err != nil {
-			return false, err
+		if !b.isPVCFullyBound(pvc) {
+			return false, nil
		}
-
-		// TODO; what if pvc is bound to the wrong pv? It means our assume cache should be reverted.
- // Or will pv controller cleanup the pv.ClaimRef? } for _, claim := range claimsToProvision { - bound, pvc, err := b.isPVCBound(claim.Namespace, claim.Name) - if err != nil || pvc == nil { - return false, fmt.Errorf("failed to check pvc binding: %v", err) + pvc, err := b.pvcCache.GetAPIPVC(getPVCName(claim)) + if err != nil { + return false, fmt.Errorf("failed to check provisioning pvc: %v", err) + } + + // Because we updated PVC in apiserver, skip if API object is older + // and wait for new API object propagated from apiserver. + if versioner.CompareResourceVersion(claim, pvc) > 0 { + return false, nil } // Check if selectedNode annotation is still set @@ -436,16 +516,25 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim // If the PVC is bound to a PV, check its node affinity if pvc.Spec.VolumeName != "" { - pv, err := b.pvCache.GetPV(pvc.Spec.VolumeName) + pv, err := b.pvCache.GetAPIPV(pvc.Spec.VolumeName) if err != nil { - return false, fmt.Errorf("failed to get pv %q from cache: %v", pvc.Spec.VolumeName, err) + if _, ok := err.(*errNotFound); ok { + // We tolerate NotFound error here, because PV is possibly + // not found because of API delay, we can check next time. + // And if PV does not exist because it's deleted, PVC will + // be unbound eventually. + return false, nil + } else { + return false, fmt.Errorf("failed to get pv %q from cache: %v", pvc.Spec.VolumeName, err) + } } if err := volumeutil.CheckNodeAffinity(pv, node.Labels); err != nil { return false, fmt.Errorf("pv %q node affinity doesn't match node %q: %v", pv.Name, node.Name, err) } } - if !bound { + // Check if pvc is fully bound + if !b.isPVCFullyBound(pvc) { return false, nil } } @@ -477,19 +566,21 @@ func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.Persiste return false, nil, fmt.Errorf("error getting PVC %q: %v", pvcKey, err) } - pvName := pvc.Spec.VolumeName - if pvName != "" { - if metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted) { - klog.V(5).Infof("PVC %q is fully bound to PV %q", pvcKey, pvName) - return true, pvc, nil + fullyBound := b.isPVCFullyBound(pvc) + if fullyBound { + klog.V(5).Infof("PVC %q is fully bound to PV %q", pvcKey, pvc.Spec.VolumeName) + } else { + if pvc.Spec.VolumeName != "" { + klog.V(5).Infof("PVC %q is not fully bound to PV %q", pvcKey, pvc.Spec.VolumeName) } else { - klog.V(5).Infof("PVC %q is not fully bound to PV %q", pvcKey, pvName) - return false, pvc, nil + klog.V(5).Infof("PVC %q is not bound", pvcKey) } } + return fullyBound, pvc, nil +} - klog.V(5).Infof("PVC %q is not bound", pvcKey) - return false, pvc, nil +func (b *volumeBinder) isPVCFullyBound(pvc *v1.PersistentVolumeClaim) bool { + return pvc.Spec.VolumeName != "" && metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted) } // arePodVolumesBound returns true if all volumes are fully bound @@ -503,12 +594,12 @@ func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool { return true } -// getPodVolumes returns a pod's PVCs separated into bound (including prebound), unbound with delayed binding, -// and unbound with immediate binding -func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaims []*bindingInfo, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) { +// getPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning) +// and unbound with immediate binding (including prebound) +func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims 
[]*v1.PersistentVolumeClaim, unboundClaims []*v1.PersistentVolumeClaim, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) { boundClaims = []*v1.PersistentVolumeClaim{} unboundClaimsImmediate = []*v1.PersistentVolumeClaim{} - unboundClaims = []*bindingInfo{} + unboundClaims = []*v1.PersistentVolumeClaim{} for _, vol := range pod.Spec.Volumes { volumeBound, pvc, err := b.isVolumeBound(pod.Namespace, &vol) @@ -521,15 +612,16 @@ func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV if volumeBound { boundClaims = append(boundClaims, pvc) } else { - delayBinding, err := b.ctrl.shouldDelayBinding(pvc) + delayBindingMode, err := b.ctrl.isDelayBindingMode(pvc) if err != nil { return nil, nil, nil, err } // Prebound PVCs are treated as unbound immediate binding - if delayBinding && pvc.Spec.VolumeName == "" { + if delayBindingMode && pvc.Spec.VolumeName == "" { // Scheduler path - unboundClaims = append(unboundClaims, &bindingInfo{pvc: pvc}) + unboundClaims = append(unboundClaims, pvc) } else { + // !delayBindingMode || pvc.Spec.VolumeName != "" // Immediate binding should have already been bound unboundClaimsImmediate = append(unboundClaimsImmediate, pvc) } @@ -560,7 +652,7 @@ func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node // findMatchingVolumes tries to find matching volumes for given claims, // and return unbound claims for further provision. -func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, unboundClaims []*v1.PersistentVolumeClaim, err error) { +func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (foundMatches bool, matchedClaims []*bindingInfo, unboundClaims []*v1.PersistentVolumeClaim, err error) { podName := getPodName(pod) // Sort all the claims by increasing size request to get the smallest fits sort.Sort(byPVCSize(claimsToBind)) @@ -568,39 +660,34 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingI chosenPVs := map[string]*v1.PersistentVolume{} foundMatches = true - matchedClaims := []*bindingInfo{} + matchedClaims = []*bindingInfo{} - for _, bindingInfo := range claimsToBind { + for _, pvc := range claimsToBind { // Get storage class name from each PVC storageClassName := "" - storageClass := bindingInfo.pvc.Spec.StorageClassName + storageClass := pvc.Spec.StorageClassName if storageClass != nil { storageClassName = *storageClass } allPVs := b.pvCache.ListPVs(storageClassName) - pvcName := getPVCName(bindingInfo.pvc) + pvcName := getPVCName(pvc) // Find a matching PV - bindingInfo.pv, err = findMatchingVolume(bindingInfo.pvc, allPVs, node, chosenPVs, true) + pv, err := findMatchingVolume(pvc, allPVs, node, chosenPVs, true) if err != nil { - return false, nil, err + return false, nil, nil, err } - if bindingInfo.pv == nil { + if pv == nil { klog.V(4).Infof("No matching volumes for Pod %q, PVC %q on node %q", podName, pvcName, node.Name) - unboundClaims = append(unboundClaims, bindingInfo.pvc) + unboundClaims = append(unboundClaims, pvc) foundMatches = false continue } // matching PV needs to be excluded so we don't select it again - chosenPVs[bindingInfo.pv.Name] = bindingInfo.pv - matchedClaims = append(matchedClaims, bindingInfo) - klog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", bindingInfo.pv.Name, pvcName, node.Name, podName) - } - - // Mark cache with all the matches for each PVC for this node - if len(matchedClaims) > 0 { - 
b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims) + chosenPVs[pv.Name] = pv + matchedClaims = append(matchedClaims, &bindingInfo{pv: pv, pvc: pvc}) + klog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", pv.Name, pvcName, node.Name, podName) } if foundMatches { @@ -613,31 +700,31 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingI // checkVolumeProvisions checks given unbound claims (the claims have gone through func // findMatchingVolumes, and do not have matching volumes for binding), and return true // if all of the claims are eligible for dynamic provision. -func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, err error) { +func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, provisionedClaims []*v1.PersistentVolumeClaim, err error) { podName := getPodName(pod) - provisionedClaims := []*v1.PersistentVolumeClaim{} + provisionedClaims = []*v1.PersistentVolumeClaim{} for _, claim := range claimsToProvision { pvcName := getPVCName(claim) className := v1helper.GetPersistentVolumeClaimClass(claim) if className == "" { - return false, fmt.Errorf("no class for claim %q", pvcName) + return false, nil, fmt.Errorf("no class for claim %q", pvcName) } class, err := b.ctrl.classLister.Get(className) if err != nil { - return false, fmt.Errorf("failed to find storage class %q", className) + return false, nil, fmt.Errorf("failed to find storage class %q", className) } provisioner := class.Provisioner if provisioner == "" || provisioner == notSupportedProvisioner { klog.V(4).Infof("storage class %q of claim %q does not support dynamic provisioning", className, pvcName) - return false, nil + return false, nil, nil } // Check if the node can satisfy the topology requirement in the class if !v1helper.MatchTopologySelectorTerms(class.AllowedTopologies, labels.Set(node.Labels)) { klog.V(4).Infof("Node %q cannot satisfy provisioning topology requirements of claim %q", node.Name, pvcName) - return false, nil + return false, nil, nil } // TODO: Check if capacity of the node domain in the storage class @@ -648,10 +735,7 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v } klog.V(4).Infof("Provisioning for claims of pod %q that has no matching volumes on node %q ...", podName, node.Name) - // Mark cache with all the PVCs that need provisioning for this node - b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims) - - return true, nil + return true, provisionedClaims, nil } func (b *volumeBinder) revertAssumedPVs(bindings []*bindingInfo) { @@ -674,7 +758,7 @@ type bindingInfo struct { pv *v1.PersistentVolume } -type byPVCSize []*bindingInfo +type byPVCSize []*v1.PersistentVolumeClaim func (a byPVCSize) Len() int { return len(a) @@ -685,8 +769,8 @@ func (a byPVCSize) Swap(i, j int) { } func (a byPVCSize) Less(i, j int) bool { - iSize := a[i].pvc.Spec.Resources.Requests[v1.ResourceStorage] - jSize := a[j].pvc.Spec.Resources.Requests[v1.ResourceStorage] + iSize := a[i].Spec.Resources.Requests[v1.ResourceStorage] + jSize := a[j].Spec.Resources.Requests[v1.ResourceStorage] // return true if iSize is less than jSize return iSize.Cmp(jSize) == -1 } diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go b/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go index b95538304ad..f32c1269b54 
100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go +++ b/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go @@ -44,6 +44,9 @@ type PodBindingCache interface { // means that no provisioning operations are needed. GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim + // GetDecisions will return all cached decisions for the given pod. + GetDecisions(pod *v1.Pod) nodeDecisions + // DeleteBindings will remove all cached bindings and provisionings for the given pod. // TODO: separate the func if it is needed to delete bindings/provisionings individually DeleteBindings(pod *v1.Pod) @@ -72,6 +75,17 @@ func NewPodBindingCache() PodBindingCache { return &podBindingCache{bindingDecisions: map[string]nodeDecisions{}} } +func (c *podBindingCache) GetDecisions(pod *v1.Pod) nodeDecisions { + c.rwMutex.Lock() + defer c.rwMutex.Unlock() + podName := getPodName(pod) + decisions, ok := c.bindingDecisions[podName] + if !ok { + return nil + } + return decisions +} + func (c *podBindingCache) DeleteBindings(pod *v1.Pod) { c.rwMutex.Lock() defer c.rwMutex.Unlock() diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_test.go b/pkg/controller/volume/persistentvolume/scheduler_binder_test.go index 789e464fc19..a3792ecec7c 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder_test.go +++ b/pkg/controller/volume/persistentvolume/scheduler_binder_test.go @@ -17,6 +17,7 @@ limitations under the License. package persistentvolume import ( + "context" "fmt" "reflect" "testing" @@ -28,10 +29,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/diff" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" "k8s.io/klog" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/controller" @@ -75,14 +79,6 @@ var ( pvBoundImmediate = makeTestPV("pv-bound-immediate", "node1", "1G", "1", immediateBoundPVC, immediateClass) pvBoundImmediateNode2 = makeTestPV("pv-bound-immediate", "node2", "1G", "1", immediateBoundPVC, immediateClass) - // PVC/PV bindings for manual binding - binding1a = makeBinding(unboundPVC, pvNode1a) - binding1b = makeBinding(unboundPVC2, pvNode1b) - bindingNoNode = makeBinding(unboundPVC, pvNoNode) - bindingBad = makeBinding(badPVC, pvNode1b) - binding1aBound = makeBinding(unboundPVC, pvNode1aBound) - binding1bBound = makeBinding(unboundPVC2, pvNode1bBound) - // storage class names waitClass = "waitClass" immediateClass = "immediateClass" @@ -109,15 +105,24 @@ type testEnv struct { internalPVCCache *pvcAssumeCache } -func newTestBinder(t *testing.T) *testEnv { +func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv { client := &fake.Clientset{} reactor := newVolumeReactor(client, nil, nil, nil, nil) + // TODO refactor all tests to use real watch mechanism, see #72327 + client.AddWatchReactor("*", func(action k8stesting.Action) (handled bool, ret watch.Interface, err error) { + gvr := action.GetResource() + ns := action.GetNamespace() + watch, err := reactor.Watch(gvr, ns) + if err != nil { + return false, nil, err + } + return true, watch, nil + }) informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) nodeInformer := informerFactory.Core().V1().Nodes() pvcInformer := 
informerFactory.Core().V1().PersistentVolumeClaims()
	classInformer := informerFactory.Storage().V1().StorageClasses()
-
	binder := NewVolumeBinder(
		client,
		nodeInformer,
@@ -126,6 +131,14 @@ func newTestBinder(t *testing.T) *testEnv {
		classInformer,
		10*time.Second)

+	// Wait for informers cache sync
+	informerFactory.Start(stopCh)
+	for v, synced := range informerFactory.WaitForCacheSync(stopCh) {
+		if !synced {
+			klog.Fatalf("Error syncing informer for %v", v)
+		}
+	}
+
	// Add storageclasses
	waitMode := storagev1.VolumeBindingWaitForFirstConsumer
	immediateMode := storagev1.VolumeBindingImmediate
@@ -247,6 +260,66 @@ func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.P

}

+func (env *testEnv) updateVolumes(t *testing.T, pvs []*v1.PersistentVolume, waitCache bool) {
+	for _, pv := range pvs {
+		if _, err := env.client.CoreV1().PersistentVolumes().Update(pv); err != nil {
+			t.Fatalf("failed to update PV %q", pv.Name)
+		}
+	}
+	if waitCache {
+		wait.Poll(100*time.Millisecond, 3*time.Second, func() (bool, error) {
+			for _, pv := range pvs {
+				obj, err := env.internalPVCache.GetAPIObj(pv.Name)
+				if obj == nil || err != nil {
+					return false, nil
+				}
+				pvInCache, ok := obj.(*v1.PersistentVolume)
+				if !ok {
+					return false, fmt.Errorf("PV %s invalid object", pv.Name)
+				}
+				// Keep polling until every updated PV has propagated to the cache.
+				if versioner.CompareResourceVersion(pvInCache, pv) != 0 {
+					return false, nil
+				}
+			}
+			return true, nil
+		})
+	}
+}
+
+func (env *testEnv) updateClaims(t *testing.T, pvcs []*v1.PersistentVolumeClaim, waitCache bool) {
+	for _, pvc := range pvcs {
+		if _, err := env.client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Update(pvc); err != nil {
+			t.Fatalf("failed to update PVC %q", getPVCName(pvc))
+		}
+	}
+	if waitCache {
+		wait.Poll(100*time.Millisecond, 3*time.Second, func() (bool, error) {
+			for _, pvc := range pvcs {
+				obj, err := env.internalPVCCache.GetAPIObj(getPVCName(pvc))
+				if obj == nil || err != nil {
+					return false, nil
+				}
+				pvcInCache, ok := obj.(*v1.PersistentVolumeClaim)
+				if !ok {
+					return false, fmt.Errorf("PVC %s invalid object", getPVCName(pvc))
+				}
+				// Keep polling until every updated PVC has propagated to the cache.
+				if versioner.CompareResourceVersion(pvcInCache, pvc) != 0 {
+					return false, nil
+				}
+			}
+			return true, nil
+		})
+	}
+}
+
+func (env *testEnv) deleteVolumes(pvs []*v1.PersistentVolume) {
+	for _, pv := range pvs {
+		env.internalPVCache.delete(pv)
+	}
+}
+
+func (env *testEnv) deleteClaims(pvcs []*v1.PersistentVolumeClaim) {
+	for _, pvc := range pvcs {
+		env.internalPVCCache.delete(pvc)
+	}
+}
+
func (env *testEnv) assumeVolumes(t *testing.T, name, node string, pod *v1.Pod, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) {
	pvCache := env.internalBinder.pvCache
	for _, binding := range bindings {
@@ -540,7 +613,7 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV

func pvcSetSelectedNode(pvc *v1.PersistentVolumeClaim, node string) *v1.PersistentVolumeClaim {
	newPVC := pvc.DeepCopy()
-	metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annSelectedNode, node)
+	metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annSelectedNode, node)
	return newPVC
}

@@ -676,7 +749,7 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
		"unbound-pvc,pv-same-node": {
			podPVCs:          []*v1.PersistentVolumeClaim{unboundPVC},
			pvs:              []*v1.PersistentVolume{pvNode2, pvNode1a, pvNode1b},
-			expectedBindings: []*bindingInfo{binding1a},
+			expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)},
			expectedUnbound:  true,
			expectedBound:    true,
		},
@@ -689,28 +762,28 @@
		"two-unbound-pvcs": 
{ podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2}, pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, - expectedBindings: []*bindingInfo{binding1a, binding1b}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a), makeBinding(unboundPVC2, pvNode1b)}, expectedUnbound: true, expectedBound: true, }, "two-unbound-pvcs,order-by-size": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC2, unboundPVC}, pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, - expectedBindings: []*bindingInfo{binding1a, binding1b}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a), makeBinding(unboundPVC2, pvNode1b)}, expectedUnbound: true, expectedBound: true, }, "two-unbound-pvcs,partial-match": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2}, pvs: []*v1.PersistentVolume{pvNode1a}, - expectedBindings: []*bindingInfo{binding1a}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)}, expectedUnbound: false, expectedBound: true, }, "one-bound,one-unbound": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, boundPVC}, pvs: []*v1.PersistentVolume{pvBound, pvNode1a}, - expectedBindings: []*bindingInfo{binding1a}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)}, expectedUnbound: true, expectedBound: true, }, @@ -767,11 +840,14 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) { }, } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for name, scenario := range scenarios { klog.V(5).Infof("Running test case %q", name) // Setup - testEnv := newTestBinder(t) + testEnv := newTestBinder(t, ctx.Done()) testEnv.initVolumes(scenario.pvs, scenario.pvs) // a. Init pvc cache @@ -833,7 +909,7 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) { "two-unbound-pvcs,one-matched,one-provisioned": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC}, pvs: []*v1.PersistentVolume{pvNode1a}, - expectedBindings: []*bindingInfo{binding1a}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)}, expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC}, expectedUnbound: true, expectedBound: true, @@ -845,6 +921,13 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) { expectedUnbound: true, expectedBound: true, }, + "one-binding,one-selected-node": { + podPVCs: []*v1.PersistentVolumeClaim{boundPVC, selectedNodePVC}, + pvs: []*v1.PersistentVolume{pvBound}, + expectedProvisions: []*v1.PersistentVolumeClaim{selectedNodePVC}, + expectedUnbound: true, + expectedBound: true, + }, "immediate-unbound-pvc": { podPVCs: []*v1.PersistentVolumeClaim{immediateUnboundPVC}, expectedUnbound: false, @@ -879,9 +962,12 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) { }, } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for name, scenario := range scenarios { // Setup - testEnv := newTestBinder(t) + testEnv := newTestBinder(t, ctx.Done()) testEnv.initVolumes(scenario.pvs, scenario.pvs) // a. 
Init pvc cache @@ -937,59 +1023,62 @@ func TestAssumePodVolumes(t *testing.T) { }, "one-binding": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, - bindings: []*bindingInfo{binding1a}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)}, pvs: []*v1.PersistentVolume{pvNode1a}, - expectedBindings: []*bindingInfo{binding1aBound}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, expectedProvisionings: []*v1.PersistentVolumeClaim{}, }, "two-bindings": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2}, - bindings: []*bindingInfo{binding1a, binding1b}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a), makeBinding(unboundPVC2, pvNode1b)}, pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, - expectedBindings: []*bindingInfo{binding1aBound, binding1bBound}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound), makeBinding(unboundPVC2, pvNode1bBound)}, expectedProvisionings: []*v1.PersistentVolumeClaim{}, }, "pv-already-bound": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, pvs: []*v1.PersistentVolume{pvNode1aBound}, - expectedBindings: []*bindingInfo{binding1aBound}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, expectedProvisionings: []*v1.PersistentVolumeClaim{}, }, "claimref-failed": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, - bindings: []*bindingInfo{binding1a, bindingBad}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a), makeBinding(badPVC, pvNode1b)}, pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, shouldFail: true, }, "tmpupdate-failed": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, - bindings: []*bindingInfo{binding1a, binding1b}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a), makeBinding(unboundPVC2, pvNode1b)}, pvs: []*v1.PersistentVolume{pvNode1a}, shouldFail: true, }, "one-binding, one-pvc-provisioned": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC}, - bindings: []*bindingInfo{binding1a}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)}, pvs: []*v1.PersistentVolume{pvNode1a}, provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, - expectedBindings: []*bindingInfo{binding1aBound}, + expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, expectedProvisionings: []*v1.PersistentVolumeClaim{selectedNodePVC}, }, "one-binding, one-provision-tmpupdate-failed": { podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVCHigherVersion}, - bindings: []*bindingInfo{binding1a}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)}, pvs: []*v1.PersistentVolume{pvNode1a}, provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC2}, shouldFail: true, }, } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for name, scenario := range scenarios { klog.V(5).Infof("Running test case %q", name) // Setup - testEnv := newTestBinder(t) + testEnv := newTestBinder(t, ctx.Done()) testEnv.initClaims(scenario.podPVCs, scenario.podPVCs) pod := makePod(scenario.podPVCs) testEnv.initPodCache(pod, "node1", scenario.bindings, scenario.provisionedPVCs) @@ -1062,25 +1151,25 @@ func TestBindAPIUpdate(t *testing.T) { provisionedPVCs: []*v1.PersistentVolumeClaim{}, }, "one-binding": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, cachedPVs: []*v1.PersistentVolume{pvNode1a}, expectedPVs: 
[]*v1.PersistentVolume{pvNode1aBound}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, }, "two-bindings": { - bindings: []*bindingInfo{binding1aBound, binding1bBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound), makeBinding(unboundPVC2, pvNode1bBound)}, cachedPVs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, expectedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBound}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, }, "api-already-updated": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, expectedPVs: []*v1.PersistentVolume{pvNode1aBound}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, }, "api-update-failed": { - bindings: []*bindingInfo{binding1aBound, binding1bBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound), makeBinding(unboundPVC2, pvNode1bBound)}, cachedPVs: []*v1.PersistentVolume{pvNode1a, pvNode1b}, apiPVs: []*v1.PersistentVolume{pvNode1a, pvNode1bBoundHigherVersion}, expectedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1b}, @@ -1104,7 +1193,7 @@ func TestBindAPIUpdate(t *testing.T) { shouldFail: true, }, "binding-succeed, provision-api-update-failed": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, cachedPVs: []*v1.PersistentVolume{pvNode1a}, expectedPVs: []*v1.PersistentVolume{pvNode1aBound}, provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), addProvisionAnn(provisionedPVC2)}, @@ -1115,11 +1204,15 @@ func TestBindAPIUpdate(t *testing.T) { shouldFail: true, }, } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for name, scenario := range scenarios { klog.V(4).Infof("Running test case %q", name) // Setup - testEnv := newTestBinder(t) + testEnv := newTestBinder(t, ctx.Done()) pod := makePod(nil) if scenario.apiPVs == nil { scenario.apiPVs = scenario.cachedPVs @@ -1155,11 +1248,19 @@ func TestBindAPIUpdate(t *testing.T) { func TestCheckBindings(t *testing.T) { scenarios := map[string]struct { // Inputs - bindings []*bindingInfo - cachedPVs []*v1.PersistentVolume + initPVs []*v1.PersistentVolume + initPVCs []*v1.PersistentVolumeClaim + bindings []*bindingInfo provisionedPVCs []*v1.PersistentVolumeClaim - cachedPVCs []*v1.PersistentVolumeClaim + + // api updates before checking + apiPVs []*v1.PersistentVolume + apiPVCs []*v1.PersistentVolumeClaim + + // delete objects before checking + deletePVs bool + deletePVCs bool // Expected return values shouldFail bool @@ -1182,108 +1283,144 @@ func TestCheckBindings(t *testing.T) { expectedBound: true, }, "binding-bound": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, - cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, - cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + initPVs: []*v1.PersistentVolume{pvNode1aBound}, + initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, expectedBound: true, }, "binding-prebound": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, - cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, - cachedPVCs: []*v1.PersistentVolumeClaim{preboundPVCNode1a}, + initPVs: []*v1.PersistentVolume{pvNode1aBound}, + initPVCs: []*v1.PersistentVolumeClaim{preboundPVCNode1a}, }, "binding-unbound": { - bindings: 
[]*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, - cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, - cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, + initPVs: []*v1.PersistentVolume{pvNode1aBound}, + initPVCs: []*v1.PersistentVolumeClaim{unboundPVC}, }, "binding-pvc-not-exists": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, - cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, + initPVs: []*v1.PersistentVolume{pvNode1aBound}, shouldFail: true, }, "binding-pv-not-exists": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, - cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + initPVs: []*v1.PersistentVolume{pvNode1aBound}, + initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + deletePVs: true, shouldFail: true, }, "binding-claimref-nil": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, - cachedPVs: []*v1.PersistentVolume{pvNode1a}, - cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + initPVs: []*v1.PersistentVolume{pvNode1a}, + initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + apiPVs: []*v1.PersistentVolume{pvNode1a}, + apiPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, shouldFail: true, }, "binding-claimref-uid-empty": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, - cachedPVs: []*v1.PersistentVolume{pvRemoveClaimUID(pvNode1aBound)}, - cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + initPVs: []*v1.PersistentVolume{pvNode1aBound}, + initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, + apiPVs: []*v1.PersistentVolume{pvRemoveClaimUID(pvNode1aBound)}, + apiPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a}, shouldFail: true, }, "binding-one-bound,one-unbound": { - bindings: []*bindingInfo{binding1aBound, binding1bBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound), makeBinding(unboundPVC2, pvNode1bBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{}, - cachedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBound}, - cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a, unboundPVC2}, + initPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBound}, + initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a, unboundPVC2}, }, "provisioning-pvc-bound": { bindings: []*bindingInfo{}, provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, - cachedPVs: []*v1.PersistentVolume{pvBound}, - cachedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVCBound)}, + initPVs: []*v1.PersistentVolume{pvBound}, + initPVCs: []*v1.PersistentVolumeClaim{provisionedPVCBound}, + apiPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVCBound)}, expectedBound: true, }, "provisioning-pvc-unbound": { bindings: []*bindingInfo{}, provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, - cachedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, + initPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, }, "provisioning-pvc-not-exists": { bindings: []*bindingInfo{}, provisionedPVCs: 
[]*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, + initPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, + deletePVCs: true, shouldFail: true, }, "provisioning-pvc-annotations-nil": { bindings: []*bindingInfo{}, provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, - cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, + initPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, + apiPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, shouldFail: true, }, "provisioning-pvc-selected-node-dropped": { bindings: []*bindingInfo{}, provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, - cachedPVCs: []*v1.PersistentVolumeClaim{pvcSetEmptyAnnotations(provisionedPVC)}, + initPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, + apiPVCs: []*v1.PersistentVolumeClaim{pvcSetEmptyAnnotations(provisionedPVC)}, shouldFail: true, }, "provisioning-pvc-selected-node-wrong-node": { + initPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, bindings: []*bindingInfo{}, provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, - cachedPVCs: []*v1.PersistentVolumeClaim{pvcSetSelectedNode(provisionedPVC, "wrong-node")}, + apiPVCs: []*v1.PersistentVolumeClaim{pvcSetSelectedNode(provisionedPVC, "wrong-node")}, shouldFail: true, }, "binding-bound-provisioning-unbound": { - bindings: []*bindingInfo{binding1aBound}, + bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)}, provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, - cachedPVs: []*v1.PersistentVolume{pvNode1aBound}, - cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a, addProvisionAnn(provisionedPVC)}, + initPVs: []*v1.PersistentVolume{pvNode1aBound}, + initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a, addProvisionAnn(provisionedPVC)}, + }, + "tolerate-provisioning-pvc-bound-pv-not-found": { + initPVs: []*v1.PersistentVolume{pvNode1a}, + initPVCs: []*v1.PersistentVolumeClaim{provisionedPVC}, + bindings: []*bindingInfo{}, + provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)}, + apiPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVCBound)}, + deletePVs: true, }, } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for name, scenario := range scenarios { klog.V(4).Infof("Running test case %q", name) // Setup pod := makePod(nil) - testEnv := newTestBinder(t) + testEnv := newTestBinder(t, ctx.Done()) testEnv.initNodes([]*v1.Node{node1}) - testEnv.initVolumes(scenario.cachedPVs, nil) - testEnv.initClaims(scenario.cachedPVCs, nil) + testEnv.initVolumes(scenario.initPVs, nil) + testEnv.initClaims(scenario.initPVCs, nil) + testEnv.assumeVolumes(t, name, "node1", pod, scenario.bindings, scenario.provisionedPVCs) + + // Before execute + if scenario.deletePVs { + testEnv.deleteVolumes(scenario.initPVs) + } else { + testEnv.updateVolumes(t, scenario.apiPVs, true) + } + if scenario.deletePVCs { + testEnv.deleteClaims(scenario.initPVCs) + } else { + testEnv.updateClaims(t, scenario.apiPVCs, true) + } // Execute allBound, err := testEnv.internalBinder.checkBindings(pod, scenario.bindings, scenario.provisionedPVCs) @@ -1302,63 +1439,96 @@ func TestCheckBindings(t *testing.T) { } func TestBindPodVolumes(t *testing.T) { - scenarios := map[string]struct { + type scenarioType struct { // Inputs - // These tests only support a single pv and pvc and static binding - bindingsNil bool // Pass in nil bindings slice - binding *bindingInfo - cachedPVs []*v1.PersistentVolume - cachedPVCs 
-		provisionedPVCs []*v1.PersistentVolumeClaim
-		apiPVs      []*v1.PersistentVolume
-		nodes       []*v1.Node
+		bindingsNil bool // Pass in nil bindings slice
+
+		nodes []*v1.Node
+
+		// before assume
+		initPVs  []*v1.PersistentVolume
+		initPVCs []*v1.PersistentVolumeClaim
+
+		// assume PV & PVC with these binding results
+		binding          *bindingInfo
+		claimToProvision *v1.PersistentVolumeClaim
+
+		// API updates after assume, before bind
+		apiPV  *v1.PersistentVolume
+		apiPVC *v1.PersistentVolumeClaim
 
 		// This function runs with a delay of 5 seconds
-		delayFunc func(*testing.T, *testEnv, *v1.Pod, *bindingInfo, []*v1.PersistentVolumeClaim)
+		delayFunc func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim)
 
 		// Expected return values
 		shouldFail bool
-	}{
+	}
+
+	scenarios := map[string]scenarioType{
 		"nothing-to-bind-nil": {
 			bindingsNil: true,
 			shouldFail:  true,
 		},
 		"nothing-to-bind-empty": {},
 		"already-bound": {
-			binding:    binding1aBound,
-			cachedPVs:  []*v1.PersistentVolume{pvNode1aBound},
-			cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
+			binding:  makeBinding(unboundPVC, pvNode1aBound),
+			initPVs:  []*v1.PersistentVolume{pvNode1aBound},
+			initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
 		},
-		"binding-succeeds-after-time": {
-			binding:    binding1aBound,
-			cachedPVs:  []*v1.PersistentVolume{pvNode1a},
-			cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
-			delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
+		"binding-static-pv-succeeds-after-time": {
+			initPVs:    []*v1.PersistentVolume{pvNode1a},
+			initPVCs:   []*v1.PersistentVolumeClaim{unboundPVC},
+			binding:    makeBinding(unboundPVC, pvNode1aBound),
+			shouldFail: false, // Will succeed after the PVC is fully bound to this PV by the PV controller.
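+			// The delayFunc below plays the role of the PV controller: it
+			// completes the binding through the API server while
+			// BindPodVolumes is waiting.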
+			delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim) {
+				pvc := pvcs[0]
+				pv := pvs[0]
 				// Update PVC to be fully bound to PV
-				newPVC := binding.pvc.DeepCopy()
-				newPVC.ResourceVersion = "100"
-				newPVC.Spec.VolumeName = binding.pv.Name
+				newPVC := pvc.DeepCopy()
+				newPVC.Spec.VolumeName = pv.Name
 				metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
-
-				// Update pvc cache, fake client doesn't invoke informers
-				internalBinder, ok := testEnv.binder.(*volumeBinder)
-				if !ok {
-					t.Fatalf("Failed to convert to internal binder")
+				if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil {
+					t.Errorf("failed to update PVC %q: %v", newPVC.Name, err)
 				}
-
-				pvcCache := internalBinder.pvcCache
-				internalPVCCache, ok := pvcCache.(*pvcAssumeCache)
-				if !ok {
-					t.Fatalf("Failed to convert to internal PVC cache")
-				}
-				internalPVCCache.add(newPVC)
 			},
 		},
+		"binding-dynamic-pv-succeeds-after-time": {
+			claimToProvision: pvcSetSelectedNode(provisionedPVC, "node1"),
+			initPVCs:         []*v1.PersistentVolumeClaim{provisionedPVC},
+			delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim) {
+				pvc := pvcs[0]
+				// Update PVC to be fully bound to PV
+				newPVC, err := testEnv.client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(pvc.Name, metav1.GetOptions{})
+				if err != nil {
+					t.Errorf("failed to get PVC %q: %v", pvc.Name, err)
+					return
+				}
+				dynamicPV := makeTestPV("dynamic-pv", "node1", "1G", "1", newPVC, waitClass)
+				dynamicPV, err = testEnv.client.CoreV1().PersistentVolumes().Create(dynamicPV)
+				if err != nil {
+					t.Errorf("failed to create PV %q: %v", dynamicPV.Name, err)
+					return
+				}
+				newPVC.Spec.VolumeName = dynamicPV.Name
+				metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
+				if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil {
+					t.Errorf("failed to update PVC %q: %v", newPVC.Name, err)
+				}
+			},
+		},
+		"bound-by-pv-controller-before-bind": {
+			initPVs:    []*v1.PersistentVolume{pvNode1a},
+			initPVCs:   []*v1.PersistentVolumeClaim{unboundPVC},
+			binding:    makeBinding(unboundPVC, pvNode1aBound),
+			apiPV:      pvNode1aBound,
+			apiPVC:     boundPVCNode1a,
+			shouldFail: true, // bindAPIUpdate will fail because of an API conflict
+		},
 		"pod-deleted-after-time": {
-			binding:    binding1aBound,
-			cachedPVs:  []*v1.PersistentVolume{pvNode1a},
-			cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
-			delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
+			binding:  makeBinding(unboundPVC, pvNode1aBound),
+			initPVs:  []*v1.PersistentVolume{pvNode1a},
+			initPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
+			delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim) {
 				bindingsCache := testEnv.binder.GetBindingsCache()
 				if bindingsCache == nil {
 					t.Fatalf("Failed to get bindings cache")
@@ -1376,107 +1546,103 @@ func TestBindPodVolumes(t *testing.T) {
 			shouldFail: true,
 		},
 		"binding-times-out": {
-			binding:    binding1aBound,
-			cachedPVs:  []*v1.PersistentVolume{pvNode1a},
-			cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
+			binding:    makeBinding(unboundPVC, pvNode1aBound),
+			initPVs:    []*v1.PersistentVolume{pvNode1a},
+			initPVCs:   []*v1.PersistentVolumeClaim{unboundPVC},
 			shouldFail: true,
 		},
 		"binding-fails": {
-			binding:    binding1bBound,
-			cachedPVs:  []*v1.PersistentVolume{pvNode1b},
-			apiPVs:     []*v1.PersistentVolume{pvNode1bBoundHigherVersion},
-			cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC2},
+			binding:    makeBinding(unboundPVC2, pvNode1bBound),
+			initPVs:    []*v1.PersistentVolume{pvNode1b},
+			initPVCs:   []*v1.PersistentVolumeClaim{unboundPVC2},
 			shouldFail: true,
 		},
 		"check-fails": {
-			binding:    binding1aBound,
-			cachedPVs:  []*v1.PersistentVolume{pvNode1a},
-			cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
-			delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
-				// Delete PVC
-				// Update pvc cache, fake client doesn't invoke informers
-				internalBinder, ok := testEnv.binder.(*volumeBinder)
-				if !ok {
-					t.Fatalf("Failed to convert to internal binder")
+			binding:  makeBinding(unboundPVC, pvNode1aBound),
+			initPVs:  []*v1.PersistentVolume{pvNode1a},
+			initPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
+			delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim) {
+				pvc := pvcs[0]
+				// Deleting the PVC makes the subsequent check fail
+				if err := testEnv.client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, &metav1.DeleteOptions{}); err != nil {
+					t.Errorf("failed to delete PVC %q: %v", pvc.Name, err)
 				}
-
-				pvcCache := internalBinder.pvcCache
-				internalPVCCache, ok := pvcCache.(*pvcAssumeCache)
-				if !ok {
-					t.Fatalf("Failed to convert to internal PVC cache")
-				}
-				internalPVCCache.delete(binding.pvc)
 			},
 			shouldFail: true,
 		},
 		"node-affinity-fails": {
-			binding:    binding1aBound,
-			cachedPVs:  []*v1.PersistentVolume{pvNode1aBound},
-			cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
+			binding:    makeBinding(unboundPVC, pvNode1aBound),
+			initPVs:    []*v1.PersistentVolume{pvNode1aBound},
+			initPVCs:   []*v1.PersistentVolumeClaim{boundPVCNode1a},
 			nodes:      []*v1.Node{node1NoLabels},
 			shouldFail: true,
 		},
 		"node-affinity-fails-dynamic-provisioning": {
-			cachedPVs:       []*v1.PersistentVolume{pvNode1a, pvNode2},
-			cachedPVCs:      []*v1.PersistentVolumeClaim{selectedNodePVC},
-			provisionedPVCs: []*v1.PersistentVolumeClaim{selectedNodePVC},
-			nodes:           []*v1.Node{node1, node2},
-			delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
+			initPVs:          []*v1.PersistentVolume{pvNode1a, pvNode2},
+			initPVCs:         []*v1.PersistentVolumeClaim{selectedNodePVC},
+			claimToProvision: selectedNodePVC,
+			nodes:            []*v1.Node{node1, node2},
+			delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim) {
 				// Update PVC to be fully bound to a PV with a different node
 				newPVC := pvcs[0].DeepCopy()
-				newPVC.ResourceVersion = "100"
 				newPVC.Spec.VolumeName = pvNode2.Name
 				metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
-
-				// Update PVC cache, fake client doesn't invoke informers
-				internalBinder, ok := testEnv.binder.(*volumeBinder)
-				if !ok {
-					t.Fatalf("Failed to convert to internal binder")
+				if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil {
+					t.Errorf("failed to update PVC %q: %v", newPVC.Name, err)
 				}
-
-				pvcCache := internalBinder.pvcCache
-				internalPVCCache, ok := pvcCache.(*pvcAssumeCache)
-				if !ok {
-					t.Fatalf("Failed to convert to internal PVC cache")
-				}
-				internalPVCCache.add(newPVC)
 			},
 			shouldFail: true,
 		},
 	}
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	for name, scenario := range scenarios {
 		klog.V(4).Infof("Running test case %q", name)
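+		// Each scenario: seed PVs/PVCs, assume the bindings, optionally
+		// mutate the API objects, then run the bind and check the result
+		// against shouldFail.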
 
 		// Setup
 		pod := makePod(nil)
-		if scenario.apiPVs == nil {
-			scenario.apiPVs = scenario.cachedPVs
-		}
+		testEnv := newTestBinder(t, ctx.Done())
 		if scenario.nodes == nil {
 			scenario.nodes = []*v1.Node{node1}
 		}
-		if scenario.provisionedPVCs == nil {
-			scenario.provisionedPVCs = []*v1.PersistentVolumeClaim{}
-		}
-		testEnv := newTestBinder(t)
 		if !scenario.bindingsNil {
 			bindings := []*bindingInfo{}
 			if scenario.binding != nil {
 				bindings = []*bindingInfo{scenario.binding}
 			}
+			claimsToProvision := []*v1.PersistentVolumeClaim{}
+			if scenario.claimToProvision != nil {
+				claimsToProvision = []*v1.PersistentVolumeClaim{scenario.claimToProvision}
+			}
 			testEnv.initNodes(scenario.nodes)
-			testEnv.initVolumes(scenario.cachedPVs, scenario.apiPVs)
-			testEnv.initClaims(scenario.cachedPVCs, nil)
-			testEnv.assumeVolumes(t, name, "node1", pod, bindings, scenario.provisionedPVCs)
+			testEnv.initVolumes(scenario.initPVs, scenario.initPVs)
+			testEnv.initClaims(scenario.initPVCs, scenario.initPVCs)
+			testEnv.assumeVolumes(t, name, "node1", pod, bindings, claimsToProvision)
+		}
+
+		// Before Execute
+		if scenario.apiPV != nil {
+			_, err := testEnv.client.CoreV1().PersistentVolumes().Update(scenario.apiPV)
+			if err != nil {
+				t.Fatalf("Test %q failed: failed to update PV %q", name, scenario.apiPV.Name)
+			}
+		}
+		if scenario.apiPVC != nil {
+			_, err := testEnv.client.CoreV1().PersistentVolumeClaims(scenario.apiPVC.Namespace).Update(scenario.apiPVC)
+			if err != nil {
+				t.Fatalf("Test %q failed: failed to update PVC %q", name, getPVCName(scenario.apiPVC))
+			}
 		}
 
 		if scenario.delayFunc != nil {
-			go func() {
+			go func(scenario scenarioType) {
 				time.Sleep(5 * time.Second)
+				// Sleep a while so this runs after bindAPIUpdate in BindPodVolumes
 				klog.V(5).Infof("Running delay function")
-				scenario.delayFunc(t, testEnv, pod, scenario.binding, scenario.provisionedPVCs)
-			}()
+				scenario.delayFunc(t, testEnv, pod, scenario.initPVs, scenario.initPVCs)
+			}(scenario)
 		}
 
 		// Execute
@@ -1498,7 +1664,9 @@ func TestFindAssumeVolumes(t *testing.T) {
 	pvs := []*v1.PersistentVolume{pvNode2, pvNode1a, pvNode1c}
 
 	// Setup
-	testEnv := newTestBinder(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	testEnv := newTestBinder(t, ctx.Done())
 	testEnv.initVolumes(pvs, pvs)
 	testEnv.initClaims(podPVCs, podPVCs)
 	pod := makePod(podPVCs)
@@ -1548,6 +1716,6 @@ func TestFindAssumeVolumes(t *testing.T) {
 		if !unboundSatisfied {
 			t.Errorf("Test failed: couldn't find PVs for all PVCs")
 		}
-		testEnv.validatePodCache(t, "after-assume", testNode.Name, pod, expectedBindings, []*v1.PersistentVolumeClaim{})
+		testEnv.validatePodCache(t, "after-assume", testNode.Name, pod, expectedBindings, nil)
 	}
 }
diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go
index f82f727d607..91432afbbcf 100644
--- a/pkg/scheduler/factory/factory.go
+++ b/pkg/scheduler/factory/factory.go
@@ -1093,7 +1093,6 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue
 			Namespace: pod.Namespace,
 			Name:      pod.Name,
 		}
-		origPod := pod
 
 		// When pod priority is enabled, we would like to place an unschedulable
 		// pod in the unschedulable queue. This ensures that if the pod is nominated
@@ -1112,21 +1111,11 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue
 			if err == nil {
 				if len(pod.Spec.NodeName) == 0 {
 					podQueue.AddUnschedulableIfNotPresent(pod)
-				} else {
-					if c.volumeBinder != nil {
-						// Volume binder only wants to keep unassigned pods
-						c.volumeBinder.DeletePodBindings(pod)
-					}
 				}
 				break
 			}
 			if errors.IsNotFound(err) {
 				klog.Warningf("A pod %v no longer exists", podID)
-
-				if c.volumeBinder != nil {
-					// Volume binder only wants to keep unassigned pods
-					c.volumeBinder.DeletePodBindings(origPod)
-				}
 				return
 			}
 			klog.Errorf("Error getting pod %v for retry: %v; retrying...", podID, err)
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 38f8e282732..5aae42f798b 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -365,10 +365,6 @@ func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
 			klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
 		}
 
-		// Volumes may be bound by PV controller asynchronously, we must clear
-		// stale pod binding cache.
-		sched.config.VolumeBinder.DeletePodBindings(assumed)
-
 		sched.recordSchedulingFailure(assumed, err, "VolumeBindingFailed", err.Error())
 		return err
 	}