Merge pull request #72045 from cofyc/fix71928

Make volume binder resilient to races
This commit is contained in:
Kubernetes Prow Robot 2019-01-11 17:42:32 -08:00 committed by GitHub
commit ccb1e1f26d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 668 additions and 272 deletions

View File

@ -44,6 +44,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
@ -92,6 +93,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@ -29,11 +29,13 @@ import (
"k8s.io/klog"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
@ -136,6 +138,7 @@ type volumeReactor struct {
fakeClaimWatch *watch.FakeWatcher
lock sync.Mutex
errors []reactorError
watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher
}
// reactorError is an error that is returned by test reactor (=simulated
@ -189,11 +192,34 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
// Store the updated object to appropriate places.
r.volumes[volume.Name] = volume
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Add(volume)
}
r.changedObjects = append(r.changedObjects, volume)
r.changedSinceLastSync++
klog.V(4).Infof("created volume %s", volume.Name)
return true, volume, nil
case action.Matches("create", "persistentvolumeclaims"):
obj := action.(core.UpdateAction).GetObject()
claim := obj.(*v1.PersistentVolumeClaim)
// check the claim does not exist
_, found := r.claims[claim.Name]
if found {
return true, nil, fmt.Errorf("Cannot create claim %s: claim already exists", claim.Name)
}
// Store the updated object to appropriate places.
r.claims[claim.Name] = claim
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Add(claim)
}
r.changedObjects = append(r.changedObjects, claim)
r.changedSinceLastSync++
klog.V(4).Infof("created claim %s", claim.Name)
return true, claim, nil
case action.Matches("update", "persistentvolumes"):
obj := action.(core.UpdateAction).GetObject()
volume := obj.(*v1.PersistentVolume)
@ -206,6 +232,10 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
if storedVer != requestedVer {
return true, obj, versionConflictError
}
if reflect.DeepEqual(storedVolume, volume) {
klog.V(4).Infof("nothing updated volume %s", volume.Name)
return true, volume, nil
}
// Don't modify the existing object
volume = volume.DeepCopy()
volume.ResourceVersion = strconv.Itoa(storedVer + 1)
@ -214,6 +244,9 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
}
// Store the updated object to appropriate places.
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Modify(volume)
}
r.volumes[volume.Name] = volume
r.changedObjects = append(r.changedObjects, volume)
r.changedSinceLastSync++
@ -232,6 +265,10 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
if storedVer != requestedVer {
return true, obj, versionConflictError
}
if reflect.DeepEqual(storedClaim, claim) {
klog.V(4).Infof("nothing updated claim %s", claim.Name)
return true, claim, nil
}
// Don't modify the existing object
claim = claim.DeepCopy()
claim.ResourceVersion = strconv.Itoa(storedVer + 1)
@ -240,6 +277,9 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
}
// Store the updated object to appropriate places.
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Modify(claim)
}
r.claims[claim.Name] = claim
r.changedObjects = append(r.changedObjects, claim)
r.changedSinceLastSync++
@ -251,18 +291,32 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
volume, found := r.volumes[name]
if found {
klog.V(4).Infof("GetVolume: found %s", volume.Name)
return true, volume, nil
return true, volume.DeepCopy(), nil
} else {
klog.V(4).Infof("GetVolume: volume %s not found", name)
return true, nil, fmt.Errorf("Cannot find volume %s", name)
}
case action.Matches("get", "persistentvolumeclaims"):
name := action.(core.GetAction).GetName()
claim, found := r.claims[name]
if found {
klog.V(4).Infof("GetClaim: found %s", claim.Name)
return true, claim.DeepCopy(), nil
} else {
klog.V(4).Infof("GetClaim: claim %s not found", name)
return true, nil, apierrs.NewNotFound(action.GetResource().GroupResource(), name)
}
case action.Matches("delete", "persistentvolumes"):
name := action.(core.DeleteAction).GetName()
klog.V(4).Infof("deleted volume %s", name)
_, found := r.volumes[name]
obj, found := r.volumes[name]
if found {
delete(r.volumes, name)
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Delete(obj)
}
r.changedSinceLastSync++
return true, nil, nil
} else {
@ -272,9 +326,12 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
case action.Matches("delete", "persistentvolumeclaims"):
name := action.(core.DeleteAction).GetName()
klog.V(4).Infof("deleted claim %s", name)
_, found := r.volumes[name]
obj, found := r.claims[name]
if found {
delete(r.claims, name)
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Delete(obj)
}
r.changedSinceLastSync++
return true, nil, nil
} else {
@ -285,6 +342,36 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
return false, nil, nil
}
// Watch watches objects from the volumeReactor. Watch returns a channel which
// will push added / modified / deleted object.
func (r *volumeReactor) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) {
r.lock.Lock()
defer r.lock.Unlock()
fakewatcher := watch.NewRaceFreeFake()
if _, exists := r.watchers[gvr]; !exists {
r.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher)
}
r.watchers[gvr][ns] = append(r.watchers[gvr][ns], fakewatcher)
return fakewatcher, nil
}
func (r *volumeReactor) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher {
watches := []*watch.RaceFreeFakeWatcher{}
if r.watchers[gvr] != nil {
if w := r.watchers[gvr][ns]; w != nil {
watches = append(watches, w...)
}
if ns != metav1.NamespaceAll {
if w := r.watchers[gvr][metav1.NamespaceAll]; w != nil {
watches = append(watches, w...)
}
}
}
return watches
}
// injectReactError returns an error when the test requested given action to
// fail. nil is returned otherwise.
func (r *volumeReactor) injectReactError(action core.Action) error {
@ -596,11 +683,14 @@ func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController,
fakeVolumeWatch: fakeVolumeWatch,
fakeClaimWatch: fakeClaimWatch,
errors: errors,
watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher),
}
client.AddReactor("create", "persistentvolumes", reactor.React)
client.AddReactor("create", "persistentvolumeclaims", reactor.React)
client.AddReactor("update", "persistentvolumes", reactor.React)
client.AddReactor("update", "persistentvolumeclaims", reactor.React)
client.AddReactor("get", "persistentvolumes", reactor.React)
client.AddReactor("get", "persistentvolumeclaims", reactor.React)
client.AddReactor("delete", "persistentvolumes", reactor.React)
client.AddReactor("delete", "persistentvolumeclaims", reactor.React)

View File

@ -285,15 +285,16 @@ func checkVolumeSatisfyClaim(volume *v1.PersistentVolume, claim *v1.PersistentVo
return nil
}
func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) {
func (ctrl *PersistentVolumeController) isDelayBindingProvisioning(claim *v1.PersistentVolumeClaim) bool {
// When feature VolumeScheduling enabled,
// Scheduler signal to the PV controller to start dynamic
// provisioning by setting the "annSelectedNode" annotation
// in the PVC
if _, ok := claim.Annotations[annSelectedNode]; ok {
return false, nil
}
_, ok := claim.Annotations[annSelectedNode]
return ok
}
func (ctrl *PersistentVolumeController) isDelayBindingMode(claim *v1.PersistentVolumeClaim) (bool, error) {
className := v1helper.GetPersistentVolumeClaimClass(claim)
if className == "" {
return false, nil
@ -311,6 +312,18 @@ func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentV
return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil
}
// shouldDelayBinding returns true if binding of claim should be delayed, false otherwise.
// If binding of claim should be delayed, only claims pbound by scheduler
func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) {
// If claim has already been assigned a node by scheduler for dynamic provisioning.
if ctrl.isDelayBindingProvisioning(claim) {
return false, nil
}
// If claim is in delay binding mode.
return ctrl.isDelayBindingMode(claim)
}
// syncUnboundClaim is the main controller method to decide what to do with an
// unbound claim.
func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error {

View File

@ -42,6 +42,9 @@ type AssumeCache interface {
// Get the object by name
Get(objName string) (interface{}, error)
// Get the API object by name
GetAPIObj(objName string) (interface{}, error)
// List all the objects in the cache
List(indexObj interface{}) []interface{}
}
@ -250,6 +253,17 @@ func (c *assumeCache) Get(objName string) (interface{}, error) {
return objInfo.latestObj, nil
}
func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
objInfo, err := c.getObjInfo(objName)
if err != nil {
return nil, err
}
return objInfo.apiObj, nil
}
func (c *assumeCache) List(indexObj interface{}) []interface{} {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
@ -297,7 +311,7 @@ func (c *assumeCache) Assume(obj interface{}) error {
}
if newVersion < storedVersion {
return fmt.Errorf("%v %q is out of sync", c.description, name)
return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
}
// Only update the cached object
@ -325,6 +339,7 @@ type PVAssumeCache interface {
AssumeCache
GetPV(pvName string) (*v1.PersistentVolume, error)
GetAPIPV(pvName string) (*v1.PersistentVolume, error)
ListPVs(storageClassName string) []*v1.PersistentVolume
}
@ -356,6 +371,18 @@ func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
return pv, nil
}
func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
obj, err := c.GetAPIObj(pvName)
if err != nil {
return nil, err
}
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
return nil, &errWrongType{"v1.PersistentVolume", obj}
}
return pv, nil
}
func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
objs := c.List(&v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
@ -380,6 +407,7 @@ type PVCAssumeCache interface {
// GetPVC returns the PVC from the cache with given pvcKey.
// pvcKey is the result of MetaNamespaceKeyFunc on PVC obj
GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
}
type pvcAssumeCache struct {
@ -402,3 +430,15 @@ func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error
}
return pvc, nil
}
func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
obj, err := c.GetAPIObj(pvcKey)
if err != nil {
return nil, err
}
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
}
return pvc, nil
}

View File

@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage/etcd"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
@ -145,6 +146,27 @@ func (b *volumeBinder) GetBindingsCache() PodBindingCache {
// This method intentionally takes in a *v1.Node object instead of using volumebinder.nodeInformer.
// That's necessary because some operations will need to pass in to the predicate fake node objects.
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
var (
matchedClaims []*bindingInfo
provisionedClaims []*v1.PersistentVolumeClaim
)
defer func() {
// We recreate bindings for each new schedule loop.
// Although we do not distinguish nil from empty in this function, for
// easier testing, we normalize empty to nil.
if len(matchedClaims) == 0 {
matchedClaims = nil
}
if len(provisionedClaims) == 0 {
provisionedClaims = nil
}
// TODO merge into one atomic function
// Mark cache with all the matches for each PVC for this node
b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims)
// Mark cache with all the PVCs that need provisioning for this node
b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims)
}()
podName := getPodName(pod)
// Warning: Below log needs high verbosity as it can be printed several times (#60933).
@ -181,16 +203,39 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
}
}
// Find matching volumes and node for unbound claims
if len(claimsToBind) > 0 {
var claimsToProvision []*v1.PersistentVolumeClaim
unboundVolumesSatisfied, claimsToProvision, err = b.findMatchingVolumes(pod, claimsToBind, node)
if err != nil {
return false, false, err
var (
claimsToFindMatching []*v1.PersistentVolumeClaim
claimsToProvision []*v1.PersistentVolumeClaim
)
// Filter out claims to provision
for _, claim := range claimsToBind {
if selectedNode, ok := claim.Annotations[annSelectedNode]; ok {
if selectedNode != node.Name {
// Fast path, skip unmatched node
return false, boundVolumesSatisfied, nil
}
claimsToProvision = append(claimsToProvision, claim)
} else {
claimsToFindMatching = append(claimsToFindMatching, claim)
}
}
// Try to provision for unbound volumes
if !unboundVolumesSatisfied {
unboundVolumesSatisfied, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
// Find matching volumes
if len(claimsToFindMatching) > 0 {
var unboundClaims []*v1.PersistentVolumeClaim
unboundVolumesSatisfied, matchedClaims, unboundClaims, err = b.findMatchingVolumes(pod, claimsToFindMatching, node)
if err != nil {
return false, false, err
}
claimsToProvision = append(claimsToProvision, unboundClaims...)
}
// Check for claims to provision
if len(claimsToProvision) > 0 {
unboundVolumesSatisfied, provisionedClaims, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
if err != nil {
return false, false, err
}
@ -304,10 +349,8 @@ func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) (err error) {
}
return wait.Poll(time.Second, b.bindTimeout, func() (bool, error) {
// Get cached values every time in case the pod gets deleted
bindings = b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName)
claimsToProvision = b.podBindingCache.GetProvisionedPVCs(assumedPod, assumedPod.Spec.NodeName)
return b.checkBindings(assumedPod, bindings, claimsToProvision)
b, err := b.checkBindings(assumedPod, bindings, claimsToProvision)
return b, err
})
}
@ -344,6 +387,7 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl
var (
binding *bindingInfo
i int
claim *v1.PersistentVolumeClaim
)
@ -352,18 +396,24 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl
for _, binding = range bindings {
klog.V(5).Infof("bindAPIUpdate: Pod %q, binding PV %q to PVC %q", podName, binding.pv.Name, binding.pvc.Name)
// TODO: does it hurt if we make an api call and nothing needs to be updated?
if _, err := b.ctrl.updateBindVolumeToClaim(binding.pv, binding.pvc, false); err != nil {
if newPV, err := b.ctrl.updateBindVolumeToClaim(binding.pv, binding.pvc, false); err != nil {
return err
} else {
// Save updated object from apiserver for later checking.
binding.pv = newPV
}
lastProcessedBinding++
}
// Update claims objects to trigger volume provisioning. Let the PV controller take care of the rest
// PV controller is expect to signal back by removing related annotations if actual provisioning fails
for _, claim = range claimsToProvision {
for i, claim = range claimsToProvision {
klog.V(5).Infof("bindAPIUpdate: Pod %q, PVC %q", podName, getPVCName(claim))
if _, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
if newClaim, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
return err
} else {
// Save updated object from apiserver for later checking.
claimsToProvision[i] = newClaim
}
lastProcessedProvisioning++
}
@ -371,12 +421,20 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl
return nil
}
var (
versioner = etcd.APIObjectVersioner{}
)
// checkBindings runs through all the PVCs in the Pod and checks:
// * if the PVC is fully bound
// * if there are any conditions that require binding to fail and be retried
//
// It returns true when all of the Pod's PVCs are fully bound, and error if
// binding (and scheduling) needs to be retried
// Note that it checks on API objects not PV/PVC cache, this is because
// PV/PVC cache can be assumed again in main scheduler loop, we must check
// latest state in API server which are shared with PV controller and
// provisioners
func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) {
podName := getPodName(pod)
if bindings == nil {
@ -391,13 +449,32 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
return false, fmt.Errorf("failed to get node %q: %v", pod.Spec.NodeName, err)
}
for _, binding := range bindings {
// Check for any conditions that might require scheduling retry
// Check for any conditions that might require scheduling retry
// Check if pv still exists
pv, err := b.pvCache.GetPV(binding.pv.Name)
if err != nil || pv == nil {
return false, fmt.Errorf("failed to check pv binding: %v", err)
// When pod is removed from scheduling queue because of deletion or any
// other reasons, binding operation should be cancelled. There is no need
// to check PV/PVC bindings any more.
// We check pod binding cache here which will be cleared when pod is
// removed from scheduling queue.
if b.podBindingCache.GetDecisions(pod) == nil {
return false, fmt.Errorf("pod %q does not exist any more", podName)
}
for _, binding := range bindings {
pv, err := b.pvCache.GetAPIPV(binding.pv.Name)
if err != nil {
return false, fmt.Errorf("failed to check binding: %v", err)
}
pvc, err := b.pvcCache.GetAPIPVC(getPVCName(binding.pvc))
if err != nil {
return false, fmt.Errorf("failed to check binding: %v", err)
}
// Because we updated PV in apiserver, skip if API object is older
// and wait for new API object propagated from apiserver.
if versioner.CompareResourceVersion(binding.pv, pv) > 0 {
return false, nil
}
// Check PV's node affinity (the node might not have the proper label)
@ -411,18 +488,21 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
}
// Check if pvc is fully bound
if isBound, _, err := b.isPVCBound(binding.pvc.Namespace, binding.pvc.Name); !isBound || err != nil {
return false, err
if !b.isPVCFullyBound(pvc) {
return false, nil
}
// TODO; what if pvc is bound to the wrong pv? It means our assume cache should be reverted.
// Or will pv controller cleanup the pv.ClaimRef?
}
for _, claim := range claimsToProvision {
bound, pvc, err := b.isPVCBound(claim.Namespace, claim.Name)
if err != nil || pvc == nil {
return false, fmt.Errorf("failed to check pvc binding: %v", err)
pvc, err := b.pvcCache.GetAPIPVC(getPVCName(claim))
if err != nil {
return false, fmt.Errorf("failed to check provisioning pvc: %v", err)
}
// Because we updated PVC in apiserver, skip if API object is older
// and wait for new API object propagated from apiserver.
if versioner.CompareResourceVersion(claim, pvc) > 0 {
return false, nil
}
// Check if selectedNode annotation is still set
@ -436,16 +516,25 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
// If the PVC is bound to a PV, check its node affinity
if pvc.Spec.VolumeName != "" {
pv, err := b.pvCache.GetPV(pvc.Spec.VolumeName)
pv, err := b.pvCache.GetAPIPV(pvc.Spec.VolumeName)
if err != nil {
return false, fmt.Errorf("failed to get pv %q from cache: %v", pvc.Spec.VolumeName, err)
if _, ok := err.(*errNotFound); ok {
// We tolerate NotFound error here, because PV is possibly
// not found because of API delay, we can check next time.
// And if PV does not exist because it's deleted, PVC will
// be unbound eventually.
return false, nil
} else {
return false, fmt.Errorf("failed to get pv %q from cache: %v", pvc.Spec.VolumeName, err)
}
}
if err := volumeutil.CheckNodeAffinity(pv, node.Labels); err != nil {
return false, fmt.Errorf("pv %q node affinity doesn't match node %q: %v", pv.Name, node.Name, err)
}
}
if !bound {
// Check if pvc is fully bound
if !b.isPVCFullyBound(pvc) {
return false, nil
}
}
@ -477,19 +566,21 @@ func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.Persiste
return false, nil, fmt.Errorf("error getting PVC %q: %v", pvcKey, err)
}
pvName := pvc.Spec.VolumeName
if pvName != "" {
if metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted) {
klog.V(5).Infof("PVC %q is fully bound to PV %q", pvcKey, pvName)
return true, pvc, nil
fullyBound := b.isPVCFullyBound(pvc)
if fullyBound {
klog.V(5).Infof("PVC %q is fully bound to PV %q", pvcKey, pvc.Spec.VolumeName)
} else {
if pvc.Spec.VolumeName != "" {
klog.V(5).Infof("PVC %q is not fully bound to PV %q", pvcKey, pvc.Spec.VolumeName)
} else {
klog.V(5).Infof("PVC %q is not fully bound to PV %q", pvcKey, pvName)
return false, pvc, nil
klog.V(5).Infof("PVC %q is not bound", pvcKey)
}
}
return fullyBound, pvc, nil
}
klog.V(5).Infof("PVC %q is not bound", pvcKey)
return false, pvc, nil
func (b *volumeBinder) isPVCFullyBound(pvc *v1.PersistentVolumeClaim) bool {
return pvc.Spec.VolumeName != "" && metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted)
}
// arePodVolumesBound returns true if all volumes are fully bound
@ -503,12 +594,12 @@ func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool {
return true
}
// getPodVolumes returns a pod's PVCs separated into bound (including prebound), unbound with delayed binding,
// and unbound with immediate binding
func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaims []*bindingInfo, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
// getPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning)
// and unbound with immediate binding (including prebound)
func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaims []*v1.PersistentVolumeClaim, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
boundClaims = []*v1.PersistentVolumeClaim{}
unboundClaimsImmediate = []*v1.PersistentVolumeClaim{}
unboundClaims = []*bindingInfo{}
unboundClaims = []*v1.PersistentVolumeClaim{}
for _, vol := range pod.Spec.Volumes {
volumeBound, pvc, err := b.isVolumeBound(pod.Namespace, &vol)
@ -521,15 +612,16 @@ func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV
if volumeBound {
boundClaims = append(boundClaims, pvc)
} else {
delayBinding, err := b.ctrl.shouldDelayBinding(pvc)
delayBindingMode, err := b.ctrl.isDelayBindingMode(pvc)
if err != nil {
return nil, nil, nil, err
}
// Prebound PVCs are treated as unbound immediate binding
if delayBinding && pvc.Spec.VolumeName == "" {
if delayBindingMode && pvc.Spec.VolumeName == "" {
// Scheduler path
unboundClaims = append(unboundClaims, &bindingInfo{pvc: pvc})
unboundClaims = append(unboundClaims, pvc)
} else {
// !delayBindingMode || pvc.Spec.VolumeName != ""
// Immediate binding should have already been bound
unboundClaimsImmediate = append(unboundClaimsImmediate, pvc)
}
@ -560,7 +652,7 @@ func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node
// findMatchingVolumes tries to find matching volumes for given claims,
// and return unbound claims for further provision.
func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, unboundClaims []*v1.PersistentVolumeClaim, err error) {
func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (foundMatches bool, matchedClaims []*bindingInfo, unboundClaims []*v1.PersistentVolumeClaim, err error) {
podName := getPodName(pod)
// Sort all the claims by increasing size request to get the smallest fits
sort.Sort(byPVCSize(claimsToBind))
@ -568,39 +660,34 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingI
chosenPVs := map[string]*v1.PersistentVolume{}
foundMatches = true
matchedClaims := []*bindingInfo{}
matchedClaims = []*bindingInfo{}
for _, bindingInfo := range claimsToBind {
for _, pvc := range claimsToBind {
// Get storage class name from each PVC
storageClassName := ""
storageClass := bindingInfo.pvc.Spec.StorageClassName
storageClass := pvc.Spec.StorageClassName
if storageClass != nil {
storageClassName = *storageClass
}
allPVs := b.pvCache.ListPVs(storageClassName)
pvcName := getPVCName(bindingInfo.pvc)
pvcName := getPVCName(pvc)
// Find a matching PV
bindingInfo.pv, err = findMatchingVolume(bindingInfo.pvc, allPVs, node, chosenPVs, true)
pv, err := findMatchingVolume(pvc, allPVs, node, chosenPVs, true)
if err != nil {
return false, nil, err
return false, nil, nil, err
}
if bindingInfo.pv == nil {
if pv == nil {
klog.V(4).Infof("No matching volumes for Pod %q, PVC %q on node %q", podName, pvcName, node.Name)
unboundClaims = append(unboundClaims, bindingInfo.pvc)
unboundClaims = append(unboundClaims, pvc)
foundMatches = false
continue
}
// matching PV needs to be excluded so we don't select it again
chosenPVs[bindingInfo.pv.Name] = bindingInfo.pv
matchedClaims = append(matchedClaims, bindingInfo)
klog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", bindingInfo.pv.Name, pvcName, node.Name, podName)
}
// Mark cache with all the matches for each PVC for this node
if len(matchedClaims) > 0 {
b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims)
chosenPVs[pv.Name] = pv
matchedClaims = append(matchedClaims, &bindingInfo{pv: pv, pvc: pvc})
klog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", pv.Name, pvcName, node.Name, podName)
}
if foundMatches {
@ -613,31 +700,31 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingI
// checkVolumeProvisions checks given unbound claims (the claims have gone through func
// findMatchingVolumes, and do not have matching volumes for binding), and return true
// if all of the claims are eligible for dynamic provision.
func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, err error) {
func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, provisionedClaims []*v1.PersistentVolumeClaim, err error) {
podName := getPodName(pod)
provisionedClaims := []*v1.PersistentVolumeClaim{}
provisionedClaims = []*v1.PersistentVolumeClaim{}
for _, claim := range claimsToProvision {
pvcName := getPVCName(claim)
className := v1helper.GetPersistentVolumeClaimClass(claim)
if className == "" {
return false, fmt.Errorf("no class for claim %q", pvcName)
return false, nil, fmt.Errorf("no class for claim %q", pvcName)
}
class, err := b.ctrl.classLister.Get(className)
if err != nil {
return false, fmt.Errorf("failed to find storage class %q", className)
return false, nil, fmt.Errorf("failed to find storage class %q", className)
}
provisioner := class.Provisioner
if provisioner == "" || provisioner == notSupportedProvisioner {
klog.V(4).Infof("storage class %q of claim %q does not support dynamic provisioning", className, pvcName)
return false, nil
return false, nil, nil
}
// Check if the node can satisfy the topology requirement in the class
if !v1helper.MatchTopologySelectorTerms(class.AllowedTopologies, labels.Set(node.Labels)) {
klog.V(4).Infof("Node %q cannot satisfy provisioning topology requirements of claim %q", node.Name, pvcName)
return false, nil
return false, nil, nil
}
// TODO: Check if capacity of the node domain in the storage class
@ -648,10 +735,7 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v
}
klog.V(4).Infof("Provisioning for claims of pod %q that has no matching volumes on node %q ...", podName, node.Name)
// Mark cache with all the PVCs that need provisioning for this node
b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims)
return true, nil
return true, provisionedClaims, nil
}
func (b *volumeBinder) revertAssumedPVs(bindings []*bindingInfo) {
@ -674,7 +758,7 @@ type bindingInfo struct {
pv *v1.PersistentVolume
}
type byPVCSize []*bindingInfo
type byPVCSize []*v1.PersistentVolumeClaim
func (a byPVCSize) Len() int {
return len(a)
@ -685,8 +769,8 @@ func (a byPVCSize) Swap(i, j int) {
}
func (a byPVCSize) Less(i, j int) bool {
iSize := a[i].pvc.Spec.Resources.Requests[v1.ResourceStorage]
jSize := a[j].pvc.Spec.Resources.Requests[v1.ResourceStorage]
iSize := a[i].Spec.Resources.Requests[v1.ResourceStorage]
jSize := a[j].Spec.Resources.Requests[v1.ResourceStorage]
// return true if iSize is less than jSize
return iSize.Cmp(jSize) == -1
}

View File

@ -44,6 +44,9 @@ type PodBindingCache interface {
// means that no provisioning operations are needed.
GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim
// GetDecisions will return all cached decisions for the given pod.
GetDecisions(pod *v1.Pod) nodeDecisions
// DeleteBindings will remove all cached bindings and provisionings for the given pod.
// TODO: separate the func if it is needed to delete bindings/provisionings individually
DeleteBindings(pod *v1.Pod)
@ -72,6 +75,17 @@ func NewPodBindingCache() PodBindingCache {
return &podBindingCache{bindingDecisions: map[string]nodeDecisions{}}
}
func (c *podBindingCache) GetDecisions(pod *v1.Pod) nodeDecisions {
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
podName := getPodName(pod)
decisions, ok := c.bindingDecisions[podName]
if !ok {
return nil
}
return decisions
}
func (c *podBindingCache) DeleteBindings(pod *v1.Pod) {
c.rwMutex.Lock()
defer c.rwMutex.Unlock()

View File

@ -17,6 +17,7 @@ limitations under the License.
package persistentvolume
import (
"context"
"fmt"
"reflect"
"testing"
@ -28,10 +29,13 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/controller"
@ -75,14 +79,6 @@ var (
pvBoundImmediate = makeTestPV("pv-bound-immediate", "node1", "1G", "1", immediateBoundPVC, immediateClass)
pvBoundImmediateNode2 = makeTestPV("pv-bound-immediate", "node2", "1G", "1", immediateBoundPVC, immediateClass)
// PVC/PV bindings for manual binding
binding1a = makeBinding(unboundPVC, pvNode1a)
binding1b = makeBinding(unboundPVC2, pvNode1b)
bindingNoNode = makeBinding(unboundPVC, pvNoNode)
bindingBad = makeBinding(badPVC, pvNode1b)
binding1aBound = makeBinding(unboundPVC, pvNode1aBound)
binding1bBound = makeBinding(unboundPVC2, pvNode1bBound)
// storage class names
waitClass = "waitClass"
immediateClass = "immediateClass"
@ -109,15 +105,24 @@ type testEnv struct {
internalPVCCache *pvcAssumeCache
}
func newTestBinder(t *testing.T) *testEnv {
func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv {
client := &fake.Clientset{}
reactor := newVolumeReactor(client, nil, nil, nil, nil)
// TODO refactor all tests to use real watch mechanism, see #72327
client.AddWatchReactor("*", func(action k8stesting.Action) (handled bool, ret watch.Interface, err error) {
gvr := action.GetResource()
ns := action.GetNamespace()
watch, err := reactor.Watch(gvr, ns)
if err != nil {
return false, nil, err
}
return true, watch, nil
})
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
nodeInformer := informerFactory.Core().V1().Nodes()
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
classInformer := informerFactory.Storage().V1().StorageClasses()
binder := NewVolumeBinder(
client,
nodeInformer,
@ -126,6 +131,14 @@ func newTestBinder(t *testing.T) *testEnv {
classInformer,
10*time.Second)
// Wait for informers cache sync
informerFactory.Start(stopCh)
for v, synced := range informerFactory.WaitForCacheSync(stopCh) {
if !synced {
klog.Fatalf("Error syncing informer for %v", v)
}
}
// Add storageclasses
waitMode := storagev1.VolumeBindingWaitForFirstConsumer
immediateMode := storagev1.VolumeBindingImmediate
@ -247,6 +260,66 @@ func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.P
}
func (env *testEnv) updateVolumes(t *testing.T, pvs []*v1.PersistentVolume, waitCache bool) {
for _, pv := range pvs {
if _, err := env.client.CoreV1().PersistentVolumes().Update(pv); err != nil {
t.Fatalf("failed to update PV %q", pv.Name)
}
}
if waitCache {
wait.Poll(100*time.Millisecond, 3*time.Second, func() (bool, error) {
for _, pv := range pvs {
obj, err := env.internalPVCache.GetAPIObj(pv.Name)
if obj == nil || err != nil {
return false, nil
}
pvInCache, ok := obj.(*v1.PersistentVolume)
if !ok {
return false, fmt.Errorf("PV %s invalid object", pvInCache.Name)
}
return versioner.CompareResourceVersion(pvInCache, pv) == 0, nil
}
return true, nil
})
}
}
func (env *testEnv) updateClaims(t *testing.T, pvcs []*v1.PersistentVolumeClaim, waitCache bool) {
for _, pvc := range pvcs {
if _, err := env.client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Update(pvc); err != nil {
t.Fatalf("failed to update PVC %q", getPVCName(pvc))
}
}
if waitCache {
wait.Poll(100*time.Millisecond, 3*time.Second, func() (bool, error) {
for _, pvc := range pvcs {
obj, err := env.internalPVCCache.GetAPIObj(getPVCName(pvc))
if obj == nil || err != nil {
return false, nil
}
pvcInCache, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
return false, fmt.Errorf("PVC %s invalid object", pvcInCache.Name)
}
return versioner.CompareResourceVersion(pvcInCache, pvc) == 0, nil
}
return true, nil
})
}
}
func (env *testEnv) deleteVolumes(pvs []*v1.PersistentVolume) {
for _, pv := range pvs {
env.internalPVCache.delete(pv)
}
}
func (env *testEnv) deleteClaims(pvcs []*v1.PersistentVolumeClaim) {
for _, pvc := range pvcs {
env.internalPVCCache.delete(pvc)
}
}
func (env *testEnv) assumeVolumes(t *testing.T, name, node string, pod *v1.Pod, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) {
pvCache := env.internalBinder.pvCache
for _, binding := range bindings {
@ -540,7 +613,7 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV
func pvcSetSelectedNode(pvc *v1.PersistentVolumeClaim, node string) *v1.PersistentVolumeClaim {
newPVC := pvc.DeepCopy()
metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annSelectedNode, node)
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annSelectedNode, node)
return newPVC
}
@ -676,7 +749,7 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
"unbound-pvc,pv-same-node": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
pvs: []*v1.PersistentVolume{pvNode2, pvNode1a, pvNode1b},
expectedBindings: []*bindingInfo{binding1a},
expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)},
expectedUnbound: true,
expectedBound: true,
},
@ -689,28 +762,28 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
"two-unbound-pvcs": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2},
pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
expectedBindings: []*bindingInfo{binding1a, binding1b},
expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a), makeBinding(unboundPVC2, pvNode1b)},
expectedUnbound: true,
expectedBound: true,
},
"two-unbound-pvcs,order-by-size": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC2, unboundPVC},
pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
expectedBindings: []*bindingInfo{binding1a, binding1b},
expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a), makeBinding(unboundPVC2, pvNode1b)},
expectedUnbound: true,
expectedBound: true,
},
"two-unbound-pvcs,partial-match": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2},
pvs: []*v1.PersistentVolume{pvNode1a},
expectedBindings: []*bindingInfo{binding1a},
expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)},
expectedUnbound: false,
expectedBound: true,
},
"one-bound,one-unbound": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, boundPVC},
pvs: []*v1.PersistentVolume{pvBound, pvNode1a},
expectedBindings: []*bindingInfo{binding1a},
expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)},
expectedUnbound: true,
expectedBound: true,
},
@ -767,11 +840,14 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for name, scenario := range scenarios {
klog.V(5).Infof("Running test case %q", name)
// Setup
testEnv := newTestBinder(t)
testEnv := newTestBinder(t, ctx.Done())
testEnv.initVolumes(scenario.pvs, scenario.pvs)
// a. Init pvc cache
@ -833,7 +909,7 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) {
"two-unbound-pvcs,one-matched,one-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC},
pvs: []*v1.PersistentVolume{pvNode1a},
expectedBindings: []*bindingInfo{binding1a},
expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)},
expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedUnbound: true,
expectedBound: true,
@ -845,6 +921,13 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) {
expectedUnbound: true,
expectedBound: true,
},
"one-binding,one-selected-node": {
podPVCs: []*v1.PersistentVolumeClaim{boundPVC, selectedNodePVC},
pvs: []*v1.PersistentVolume{pvBound},
expectedProvisions: []*v1.PersistentVolumeClaim{selectedNodePVC},
expectedUnbound: true,
expectedBound: true,
},
"immediate-unbound-pvc": {
podPVCs: []*v1.PersistentVolumeClaim{immediateUnboundPVC},
expectedUnbound: false,
@ -879,9 +962,12 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) {
},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for name, scenario := range scenarios {
// Setup
testEnv := newTestBinder(t)
testEnv := newTestBinder(t, ctx.Done())
testEnv.initVolumes(scenario.pvs, scenario.pvs)
// a. Init pvc cache
@ -937,59 +1023,62 @@ func TestAssumePodVolumes(t *testing.T) {
},
"one-binding": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
bindings: []*bindingInfo{binding1a},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)},
pvs: []*v1.PersistentVolume{pvNode1a},
expectedBindings: []*bindingInfo{binding1aBound},
expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
expectedProvisionings: []*v1.PersistentVolumeClaim{},
},
"two-bindings": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2},
bindings: []*bindingInfo{binding1a, binding1b},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a), makeBinding(unboundPVC2, pvNode1b)},
pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
expectedBindings: []*bindingInfo{binding1aBound, binding1bBound},
expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound), makeBinding(unboundPVC2, pvNode1bBound)},
expectedProvisionings: []*v1.PersistentVolumeClaim{},
},
"pv-already-bound": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
bindings: []*bindingInfo{binding1aBound},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
pvs: []*v1.PersistentVolume{pvNode1aBound},
expectedBindings: []*bindingInfo{binding1aBound},
expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
expectedProvisionings: []*v1.PersistentVolumeClaim{},
},
"claimref-failed": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
bindings: []*bindingInfo{binding1a, bindingBad},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a), makeBinding(badPVC, pvNode1b)},
pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
shouldFail: true,
},
"tmpupdate-failed": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
bindings: []*bindingInfo{binding1a, binding1b},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a), makeBinding(unboundPVC2, pvNode1b)},
pvs: []*v1.PersistentVolume{pvNode1a},
shouldFail: true,
},
"one-binding, one-pvc-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC},
bindings: []*bindingInfo{binding1a},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)},
pvs: []*v1.PersistentVolume{pvNode1a},
provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedBindings: []*bindingInfo{binding1aBound},
expectedBindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
expectedProvisionings: []*v1.PersistentVolumeClaim{selectedNodePVC},
},
"one-binding, one-provision-tmpupdate-failed": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVCHigherVersion},
bindings: []*bindingInfo{binding1a},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1a)},
pvs: []*v1.PersistentVolume{pvNode1a},
provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC2},
shouldFail: true,
},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for name, scenario := range scenarios {
klog.V(5).Infof("Running test case %q", name)
// Setup
testEnv := newTestBinder(t)
testEnv := newTestBinder(t, ctx.Done())
testEnv.initClaims(scenario.podPVCs, scenario.podPVCs)
pod := makePod(scenario.podPVCs)
testEnv.initPodCache(pod, "node1", scenario.bindings, scenario.provisionedPVCs)
@ -1062,25 +1151,25 @@ func TestBindAPIUpdate(t *testing.T) {
provisionedPVCs: []*v1.PersistentVolumeClaim{},
},
"one-binding": {
bindings: []*bindingInfo{binding1aBound},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
cachedPVs: []*v1.PersistentVolume{pvNode1a},
expectedPVs: []*v1.PersistentVolume{pvNode1aBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
},
"two-bindings": {
bindings: []*bindingInfo{binding1aBound, binding1bBound},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound), makeBinding(unboundPVC2, pvNode1bBound)},
cachedPVs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
expectedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
},
"api-already-updated": {
bindings: []*bindingInfo{binding1aBound},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
expectedPVs: []*v1.PersistentVolume{pvNode1aBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
},
"api-update-failed": {
bindings: []*bindingInfo{binding1aBound, binding1bBound},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound), makeBinding(unboundPVC2, pvNode1bBound)},
cachedPVs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
apiPVs: []*v1.PersistentVolume{pvNode1a, pvNode1bBoundHigherVersion},
expectedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1b},
@ -1104,7 +1193,7 @@ func TestBindAPIUpdate(t *testing.T) {
shouldFail: true,
},
"binding-succeed, provision-api-update-failed": {
bindings: []*bindingInfo{binding1aBound},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
cachedPVs: []*v1.PersistentVolume{pvNode1a},
expectedPVs: []*v1.PersistentVolume{pvNode1aBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), addProvisionAnn(provisionedPVC2)},
@ -1115,11 +1204,15 @@ func TestBindAPIUpdate(t *testing.T) {
shouldFail: true,
},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for name, scenario := range scenarios {
klog.V(4).Infof("Running test case %q", name)
// Setup
testEnv := newTestBinder(t)
testEnv := newTestBinder(t, ctx.Done())
pod := makePod(nil)
if scenario.apiPVs == nil {
scenario.apiPVs = scenario.cachedPVs
@ -1155,11 +1248,19 @@ func TestBindAPIUpdate(t *testing.T) {
func TestCheckBindings(t *testing.T) {
scenarios := map[string]struct {
// Inputs
bindings []*bindingInfo
cachedPVs []*v1.PersistentVolume
initPVs []*v1.PersistentVolume
initPVCs []*v1.PersistentVolumeClaim
bindings []*bindingInfo
provisionedPVCs []*v1.PersistentVolumeClaim
cachedPVCs []*v1.PersistentVolumeClaim
// api updates before checking
apiPVs []*v1.PersistentVolume
apiPVCs []*v1.PersistentVolumeClaim
// delete objects before checking
deletePVs bool
deletePVCs bool
// Expected return values
shouldFail bool
@ -1182,108 +1283,144 @@ func TestCheckBindings(t *testing.T) {
expectedBound: true,
},
"binding-bound": {
bindings: []*bindingInfo{binding1aBound},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
initPVs: []*v1.PersistentVolume{pvNode1aBound},
initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
expectedBound: true,
},
"binding-prebound": {
bindings: []*bindingInfo{binding1aBound},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
cachedPVCs: []*v1.PersistentVolumeClaim{preboundPVCNode1a},
initPVs: []*v1.PersistentVolume{pvNode1aBound},
initPVCs: []*v1.PersistentVolumeClaim{preboundPVCNode1a},
},
"binding-unbound": {
bindings: []*bindingInfo{binding1aBound},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
initPVs: []*v1.PersistentVolume{pvNode1aBound},
initPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
},
"binding-pvc-not-exists": {
bindings: []*bindingInfo{binding1aBound},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
initPVs: []*v1.PersistentVolume{pvNode1aBound},
shouldFail: true,
},
"binding-pv-not-exists": {
bindings: []*bindingInfo{binding1aBound},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
initPVs: []*v1.PersistentVolume{pvNode1aBound},
initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
deletePVs: true,
shouldFail: true,
},
"binding-claimref-nil": {
bindings: []*bindingInfo{binding1aBound},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
cachedPVs: []*v1.PersistentVolume{pvNode1a},
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
initPVs: []*v1.PersistentVolume{pvNode1a},
initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
apiPVs: []*v1.PersistentVolume{pvNode1a},
apiPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
shouldFail: true,
},
"binding-claimref-uid-empty": {
bindings: []*bindingInfo{binding1aBound},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
cachedPVs: []*v1.PersistentVolume{pvRemoveClaimUID(pvNode1aBound)},
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
initPVs: []*v1.PersistentVolume{pvNode1aBound},
initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
apiPVs: []*v1.PersistentVolume{pvRemoveClaimUID(pvNode1aBound)},
apiPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
shouldFail: true,
},
"binding-one-bound,one-unbound": {
bindings: []*bindingInfo{binding1aBound, binding1bBound},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound), makeBinding(unboundPVC2, pvNode1bBound)},
provisionedPVCs: []*v1.PersistentVolumeClaim{},
cachedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBound},
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a, unboundPVC2},
initPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBound},
initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a, unboundPVC2},
},
"provisioning-pvc-bound": {
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
cachedPVs: []*v1.PersistentVolume{pvBound},
cachedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVCBound)},
initPVs: []*v1.PersistentVolume{pvBound},
initPVCs: []*v1.PersistentVolumeClaim{provisionedPVCBound},
apiPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVCBound)},
expectedBound: true,
},
"provisioning-pvc-unbound": {
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
cachedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
initPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
},
"provisioning-pvc-not-exists": {
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
initPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
deletePVCs: true,
shouldFail: true,
},
"provisioning-pvc-annotations-nil": {
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
initPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
apiPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
shouldFail: true,
},
"provisioning-pvc-selected-node-dropped": {
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
cachedPVCs: []*v1.PersistentVolumeClaim{pvcSetEmptyAnnotations(provisionedPVC)},
initPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
apiPVCs: []*v1.PersistentVolumeClaim{pvcSetEmptyAnnotations(provisionedPVC)},
shouldFail: true,
},
"provisioning-pvc-selected-node-wrong-node": {
initPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
cachedPVCs: []*v1.PersistentVolumeClaim{pvcSetSelectedNode(provisionedPVC, "wrong-node")},
apiPVCs: []*v1.PersistentVolumeClaim{pvcSetSelectedNode(provisionedPVC, "wrong-node")},
shouldFail: true,
},
"binding-bound-provisioning-unbound": {
bindings: []*bindingInfo{binding1aBound},
bindings: []*bindingInfo{makeBinding(unboundPVC, pvNode1aBound)},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a, addProvisionAnn(provisionedPVC)},
initPVs: []*v1.PersistentVolume{pvNode1aBound},
initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a, addProvisionAnn(provisionedPVC)},
},
"tolerate-provisioning-pvc-bound-pv-not-found": {
initPVs: []*v1.PersistentVolume{pvNode1a},
initPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
bindings: []*bindingInfo{},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
apiPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVCBound)},
deletePVs: true,
},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for name, scenario := range scenarios {
klog.V(4).Infof("Running test case %q", name)
// Setup
pod := makePod(nil)
testEnv := newTestBinder(t)
testEnv := newTestBinder(t, ctx.Done())
testEnv.initNodes([]*v1.Node{node1})
testEnv.initVolumes(scenario.cachedPVs, nil)
testEnv.initClaims(scenario.cachedPVCs, nil)
testEnv.initVolumes(scenario.initPVs, nil)
testEnv.initClaims(scenario.initPVCs, nil)
testEnv.assumeVolumes(t, name, "node1", pod, scenario.bindings, scenario.provisionedPVCs)
// Before execute
if scenario.deletePVs {
testEnv.deleteVolumes(scenario.initPVs)
} else {
testEnv.updateVolumes(t, scenario.apiPVs, true)
}
if scenario.deletePVCs {
testEnv.deleteClaims(scenario.initPVCs)
} else {
testEnv.updateClaims(t, scenario.apiPVCs, true)
}
// Execute
allBound, err := testEnv.internalBinder.checkBindings(pod, scenario.bindings, scenario.provisionedPVCs)
@ -1302,63 +1439,96 @@ func TestCheckBindings(t *testing.T) {
}
func TestBindPodVolumes(t *testing.T) {
scenarios := map[string]struct {
type scenarioType struct {
// Inputs
// These tests only support a single pv and pvc and static binding
bindingsNil bool // Pass in nil bindings slice
binding *bindingInfo
cachedPVs []*v1.PersistentVolume
cachedPVCs []*v1.PersistentVolumeClaim
provisionedPVCs []*v1.PersistentVolumeClaim
apiPVs []*v1.PersistentVolume
nodes []*v1.Node
bindingsNil bool // Pass in nil bindings slice
nodes []*v1.Node
// before assume
initPVs []*v1.PersistentVolume
initPVCs []*v1.PersistentVolumeClaim
// assume PV & PVC with these binding results
binding *bindingInfo
claimToProvision *v1.PersistentVolumeClaim
// API updates after assume before bind
apiPV *v1.PersistentVolume
apiPVC *v1.PersistentVolumeClaim
// This function runs with a delay of 5 seconds
delayFunc func(*testing.T, *testEnv, *v1.Pod, *bindingInfo, []*v1.PersistentVolumeClaim)
delayFunc func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim)
// Expected return values
shouldFail bool
}{
}
scenarios := map[string]scenarioType{
"nothing-to-bind-nil": {
bindingsNil: true,
shouldFail: true,
},
"nothing-to-bind-empty": {},
"already-bound": {
binding: binding1aBound,
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
binding: makeBinding(unboundPVC, pvNode1aBound),
initPVs: []*v1.PersistentVolume{pvNode1aBound},
initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
},
"binding-succeeds-after-time": {
binding: binding1aBound,
cachedPVs: []*v1.PersistentVolume{pvNode1a},
cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
"binding-static-pv-succeeds-after-time": {
initPVs: []*v1.PersistentVolume{pvNode1a},
initPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
binding: makeBinding(unboundPVC, pvNode1aBound),
shouldFail: false, // Will succeed after PVC is fully bound to this PV by pv controller.
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim) {
pvc := pvcs[0]
pv := pvs[0]
// Update PVC to be fully bound to PV
newPVC := binding.pvc.DeepCopy()
newPVC.ResourceVersion = "100"
newPVC.Spec.VolumeName = binding.pv.Name
newPVC := pvc.DeepCopy()
newPVC.Spec.VolumeName = pv.Name
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
// Update pvc cache, fake client doesn't invoke informers
internalBinder, ok := testEnv.binder.(*volumeBinder)
if !ok {
t.Fatalf("Failed to convert to internal binder")
if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil {
t.Errorf("failed to update PVC %q: %v", newPVC.Name, err)
}
pvcCache := internalBinder.pvcCache
internalPVCCache, ok := pvcCache.(*pvcAssumeCache)
if !ok {
t.Fatalf("Failed to convert to internal PVC cache")
}
internalPVCCache.add(newPVC)
},
},
"binding-dynamic-pv-succeeds-after-time": {
claimToProvision: pvcSetSelectedNode(provisionedPVC, "node1"),
initPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim) {
pvc := pvcs[0]
// Update PVC to be fully bound to PV
newPVC, err := testEnv.client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(pvc.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("failed to get PVC %q: %v", pvc.Name, err)
return
}
dynamicPV := makeTestPV("dynamic-pv", "node1", "1G", "1", newPVC, waitClass)
dynamicPV, err = testEnv.client.CoreV1().PersistentVolumes().Create(dynamicPV)
if err != nil {
t.Errorf("failed to create PV %q: %v", dynamicPV.Name, err)
return
}
newPVC.Spec.VolumeName = dynamicPV.Name
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil {
t.Errorf("failed to update PVC %q: %v", newPVC.Name, err)
}
},
},
"bound-by-pv-controller-before-bind": {
initPVs: []*v1.PersistentVolume{pvNode1a},
initPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
binding: makeBinding(unboundPVC, pvNode1aBound),
apiPV: pvNode1aBound,
apiPVC: boundPVCNode1a,
shouldFail: true, // bindAPIUpdate will fail because API conflict
},
"pod-deleted-after-time": {
binding: binding1aBound,
cachedPVs: []*v1.PersistentVolume{pvNode1a},
cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
binding: makeBinding(unboundPVC, pvNode1aBound),
initPVs: []*v1.PersistentVolume{pvNode1a},
initPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim) {
bindingsCache := testEnv.binder.GetBindingsCache()
if bindingsCache == nil {
t.Fatalf("Failed to get bindings cache")
@ -1376,107 +1546,103 @@ func TestBindPodVolumes(t *testing.T) {
shouldFail: true,
},
"binding-times-out": {
binding: binding1aBound,
cachedPVs: []*v1.PersistentVolume{pvNode1a},
cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
binding: makeBinding(unboundPVC, pvNode1aBound),
initPVs: []*v1.PersistentVolume{pvNode1a},
initPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
shouldFail: true,
},
"binding-fails": {
binding: binding1bBound,
cachedPVs: []*v1.PersistentVolume{pvNode1b},
apiPVs: []*v1.PersistentVolume{pvNode1bBoundHigherVersion},
cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC2},
binding: makeBinding(unboundPVC2, pvNode1bBound),
initPVs: []*v1.PersistentVolume{pvNode1b},
initPVCs: []*v1.PersistentVolumeClaim{unboundPVC2},
shouldFail: true,
},
"check-fails": {
binding: binding1aBound,
cachedPVs: []*v1.PersistentVolume{pvNode1a},
cachedPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
// Delete PVC
// Update pvc cache, fake client doesn't invoke informers
internalBinder, ok := testEnv.binder.(*volumeBinder)
if !ok {
t.Fatalf("Failed to convert to internal binder")
binding: makeBinding(unboundPVC, pvNode1aBound),
initPVs: []*v1.PersistentVolume{pvNode1a},
initPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim) {
pvc := pvcs[0]
// Delete PVC will fail check
if err := testEnv.client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, &metav1.DeleteOptions{}); err != nil {
t.Errorf("failed to delete PVC %q: %v", pvc.Name, err)
}
pvcCache := internalBinder.pvcCache
internalPVCCache, ok := pvcCache.(*pvcAssumeCache)
if !ok {
t.Fatalf("Failed to convert to internal PVC cache")
}
internalPVCCache.delete(binding.pvc)
},
shouldFail: true,
},
"node-affinity-fails": {
binding: binding1aBound,
cachedPVs: []*v1.PersistentVolume{pvNode1aBound},
cachedPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
binding: makeBinding(unboundPVC, pvNode1aBound),
initPVs: []*v1.PersistentVolume{pvNode1aBound},
initPVCs: []*v1.PersistentVolumeClaim{boundPVCNode1a},
nodes: []*v1.Node{node1NoLabels},
shouldFail: true,
},
"node-affinity-fails-dynamic-provisioning": {
cachedPVs: []*v1.PersistentVolume{pvNode1a, pvNode2},
cachedPVCs: []*v1.PersistentVolumeClaim{selectedNodePVC},
provisionedPVCs: []*v1.PersistentVolumeClaim{selectedNodePVC},
nodes: []*v1.Node{node1, node2},
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, binding *bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
initPVs: []*v1.PersistentVolume{pvNode1a, pvNode2},
initPVCs: []*v1.PersistentVolumeClaim{selectedNodePVC},
claimToProvision: selectedNodePVC,
nodes: []*v1.Node{node1, node2},
delayFunc: func(t *testing.T, testEnv *testEnv, pod *v1.Pod, pvs []*v1.PersistentVolume, pvcs []*v1.PersistentVolumeClaim) {
// Update PVC to be fully bound to a PV with a different node
newPVC := pvcs[0].DeepCopy()
newPVC.ResourceVersion = "100"
newPVC.Spec.VolumeName = pvNode2.Name
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
// Update PVC cache, fake client doesn't invoke informers
internalBinder, ok := testEnv.binder.(*volumeBinder)
if !ok {
t.Fatalf("Failed to convert to internal binder")
if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil {
t.Errorf("failed to update PVC %q: %v", newPVC.Name, err)
}
pvcCache := internalBinder.pvcCache
internalPVCCache, ok := pvcCache.(*pvcAssumeCache)
if !ok {
t.Fatalf("Failed to convert to internal PVC cache")
}
internalPVCCache.add(newPVC)
},
shouldFail: true,
},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for name, scenario := range scenarios {
klog.V(4).Infof("Running test case %q", name)
// Setup
pod := makePod(nil)
if scenario.apiPVs == nil {
scenario.apiPVs = scenario.cachedPVs
}
testEnv := newTestBinder(t, ctx.Done())
if scenario.nodes == nil {
scenario.nodes = []*v1.Node{node1}
}
if scenario.provisionedPVCs == nil {
scenario.provisionedPVCs = []*v1.PersistentVolumeClaim{}
}
testEnv := newTestBinder(t)
if !scenario.bindingsNil {
bindings := []*bindingInfo{}
if scenario.binding != nil {
bindings = []*bindingInfo{scenario.binding}
}
claimsToProvision := []*v1.PersistentVolumeClaim{}
if scenario.claimToProvision != nil {
claimsToProvision = []*v1.PersistentVolumeClaim{scenario.claimToProvision}
}
testEnv.initNodes(scenario.nodes)
testEnv.initVolumes(scenario.cachedPVs, scenario.apiPVs)
testEnv.initClaims(scenario.cachedPVCs, nil)
testEnv.assumeVolumes(t, name, "node1", pod, bindings, scenario.provisionedPVCs)
testEnv.initVolumes(scenario.initPVs, scenario.initPVs)
testEnv.initClaims(scenario.initPVCs, scenario.initPVCs)
testEnv.assumeVolumes(t, name, "node1", pod, bindings, claimsToProvision)
}
// Before Execute
if scenario.apiPV != nil {
_, err := testEnv.client.CoreV1().PersistentVolumes().Update(scenario.apiPV)
if err != nil {
t.Fatalf("Test %q failed: failed to update PV %q", name, scenario.apiPV.Name)
}
}
if scenario.apiPVC != nil {
_, err := testEnv.client.CoreV1().PersistentVolumeClaims(scenario.apiPVC.Namespace).Update(scenario.apiPVC)
if err != nil {
t.Fatalf("Test %q failed: failed to update PVC %q", name, getPVCName(scenario.apiPVC))
}
}
if scenario.delayFunc != nil {
go func() {
go func(scenario scenarioType) {
time.Sleep(5 * time.Second)
// Sleep a while to run after bindAPIUpdate in BindPodVolumes
klog.V(5).Infof("Running delay function")
scenario.delayFunc(t, testEnv, pod, scenario.binding, scenario.provisionedPVCs)
}()
scenario.delayFunc(t, testEnv, pod, scenario.initPVs, scenario.initPVCs)
}(scenario)
}
// Execute
@ -1498,7 +1664,9 @@ func TestFindAssumeVolumes(t *testing.T) {
pvs := []*v1.PersistentVolume{pvNode2, pvNode1a, pvNode1c}
// Setup
testEnv := newTestBinder(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testEnv := newTestBinder(t, ctx.Done())
testEnv.initVolumes(pvs, pvs)
testEnv.initClaims(podPVCs, podPVCs)
pod := makePod(podPVCs)
@ -1548,6 +1716,6 @@ func TestFindAssumeVolumes(t *testing.T) {
if !unboundSatisfied {
t.Errorf("Test failed: couldn't find PVs for all PVCs")
}
testEnv.validatePodCache(t, "after-assume", testNode.Name, pod, expectedBindings, []*v1.PersistentVolumeClaim{})
testEnv.validatePodCache(t, "after-assume", testNode.Name, pod, expectedBindings, nil)
}
}

View File

@ -1093,7 +1093,6 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue
Namespace: pod.Namespace,
Name: pod.Name,
}
origPod := pod
// When pod priority is enabled, we would like to place an unschedulable
// pod in the unschedulable queue. This ensures that if the pod is nominated
@ -1112,21 +1111,11 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue
if err == nil {
if len(pod.Spec.NodeName) == 0 {
podQueue.AddUnschedulableIfNotPresent(pod)
} else {
if c.volumeBinder != nil {
// Volume binder only wants to keep unassigned pods
c.volumeBinder.DeletePodBindings(pod)
}
}
break
}
if errors.IsNotFound(err) {
klog.Warningf("A pod %v no longer exists", podID)
if c.volumeBinder != nil {
// Volume binder only wants to keep unassigned pods
c.volumeBinder.DeletePodBindings(origPod)
}
return
}
klog.Errorf("Error getting pod %v for retry: %v; retrying...", podID, err)

View File

@ -365,10 +365,6 @@ func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// Volumes may be bound by PV controller asynchronously, we must clear
// stale pod binding cache.
sched.config.VolumeBinder.DeletePodBindings(assumed)
sched.recordSchedulingFailure(assumed, err, "VolumeBindingFailed", err.Error())
return err
}