diff --git a/pkg/controller/volume/persistentvolume/framework_test.go b/pkg/controller/volume/persistentvolume/framework_test.go
index 1e347ae9ffb..5c165abd946 100644
--- a/pkg/controller/volume/persistentvolume/framework_test.go
+++ b/pkg/controller/volume/persistentvolume/framework_test.go
@@ -196,6 +196,8 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
 			if storedVer != requestedVer {
 				return true, obj, versionConflictError
 			}
+			// Don't modify the existing object
+			volume = volume.DeepCopy()
 			volume.ResourceVersion = strconv.Itoa(storedVer + 1)
 		} else {
 			return true, nil, fmt.Errorf("Cannot update volume %s: volume not found", volume.Name)
@@ -220,6 +222,8 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
 			if storedVer != requestedVer {
 				return true, obj, versionConflictError
 			}
+			// Don't modify the existing object
+			claim = claim.DeepCopy()
 			claim.ResourceVersion = strconv.Itoa(storedVer + 1)
 		} else {
 			return true, nil, fmt.Errorf("Cannot update claim %s: claim not found", claim.Name)
@@ -301,7 +305,12 @@ func (r *volumeReactor) checkVolumes(expectedVolumes []*v1.PersistentVolume) err
 	gotMap := make(map[string]*v1.PersistentVolume)
 	// Clear any ResourceVersion from both sets
 	for _, v := range expectedVolumes {
+		// Don't modify the existing object
+		v := v.DeepCopy()
 		v.ResourceVersion = ""
+		if v.Spec.ClaimRef != nil {
+			v.Spec.ClaimRef.ResourceVersion = ""
+		}
 		expectedMap[v.Name] = v
 	}
 	for _, v := range r.volumes {
@@ -331,6 +340,8 @@ func (r *volumeReactor) checkClaims(expectedClaims []*v1.PersistentVolumeClaim)
 	expectedMap := make(map[string]*v1.PersistentVolumeClaim)
 	gotMap := make(map[string]*v1.PersistentVolumeClaim)
 	for _, c := range expectedClaims {
+		// Don't modify the existing object
+		c = c.DeepCopy()
 		c.ResourceVersion = ""
 		expectedMap[c.Name] = c
 	}
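All four hunks above apply the same rule: an object read from the reactor's shared store must be deep-copied before it is mutated, otherwise the test would silently edit the stored copy too. A minimal sketch of the pattern (illustrative only; clearVersions is not a helper from this patch):

func clearVersions(stored *v1.PersistentVolume) *v1.PersistentVolume {
	// Mutate a copy, never the object still held by the shared store.
	v := stored.DeepCopy()
	v.ResourceVersion = ""
	if v.Spec.ClaimRef != nil {
		v.Spec.ClaimRef.ResourceVersion = ""
	}
	return v
}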
diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder.go b/pkg/controller/volume/persistentvolume/scheduler_binder.go
new file mode 100644
index 00000000000..7edd1ef459d
--- /dev/null
+++ b/pkg/controller/volume/persistentvolume/scheduler_binder.go
@@ -0,0 +1,420 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package persistentvolume
+
+import (
+	"fmt"
+	"sort"
+
+	"github.com/golang/glog"
+
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	coreinformers "k8s.io/client-go/informers/core/v1"
+	storageinformers "k8s.io/client-go/informers/storage/v1"
+	clientset "k8s.io/client-go/kubernetes"
+	corelisters "k8s.io/client-go/listers/core/v1"
+	volumeutil "k8s.io/kubernetes/pkg/volume/util"
+)
+
+// SchedulerVolumeBinder is used by the scheduler to handle PVC/PV binding
+// and dynamic provisioning.  The binding decisions are integrated into the pod scheduling
+// workflow so that the PV NodeAffinity is also considered along with the pod's other
+// scheduling requirements.
+//
+// This integrates into the existing default scheduler workflow as follows:
+// 1. The scheduler takes a Pod off the scheduler queue and processes it serially:
+//    a. Invokes all predicate functions, parallelized across nodes.  FindPodVolumes() is invoked here.
+//    b. Invokes all priority functions.  Future/TBD
+//    c. Selects the best node for the Pod.
+//    d. Caches the node selection for the Pod. (Assume phase)
+//       i.  If PVC binding is required, cache in-memory only:
+//           * Updated PV objects for prebinding to the corresponding PVCs.
+//           * For the pod, which PVs need API updates.
+//           AssumePodVolumes() is invoked here.  Then BindPodVolumes() is called asynchronously by the
+//           scheduler.  After BindPodVolumes() is complete, the Pod is added back to the scheduler queue
+//           to be processed again until all PVCs are bound.
+//       ii. If PVC binding is not required, cache the Pod->Node binding in the scheduler's pod cache,
+//           and asynchronously bind the Pod to the Node.  This is handled in the scheduler and not here.
+// 2. Once the assume operation is done, the scheduler processes the next Pod in the scheduler queue
+//    while the actual binding operation occurs in the background.
+type SchedulerVolumeBinder interface {
+	// FindPodVolumes checks if all of a Pod's PVCs can be satisfied by the node.
+	//
+	// If a PVC is bound, it checks if the PV's NodeAffinity matches the Node.
+	// Otherwise, it tries to find an available PV to bind to the PVC.
+	//
+	// It returns unboundVolumesSatisfied=true if matching PVs can satisfy all of the Pod's
+	// unbound PVCs, and boundVolumesSatisfied=true if all of the bound PVCs' NodeAffinity
+	// matches the Node.
+	//
+	// This function is called by the volume binding scheduler predicate and can be called in parallel.
+	FindPodVolumes(pod *v1.Pod, nodeName string) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error)
+
+	// AssumePodVolumes will take the PV matches for unbound PVCs and update the PV cache assuming
+	// that the PV is prebound to the PVC.
+	//
+	// It returns allFullyBound=true if all volumes are already fully bound, and
+	// bindingRequired=true if any volume binding API operation still needs to be done afterwards.
+	//
+	// This function will modify assumedPod with the node name.
+	// This function is called serially.
+	AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound bool, bindingRequired bool, err error)
+
+	// BindPodVolumes will initiate the volume binding by making the API call to prebind the PV
+	// to its matching PVC.
+	//
+	// This function can be called in parallel.
+	BindPodVolumes(assumedPod *v1.Pod) error
+
+	// GetBindingsCache returns the cache used (if any) to store volume binding decisions.
+	GetBindingsCache() PodBindingCache
+}
+
+type volumeBinder struct {
+	ctrl *PersistentVolumeController
+
+	// TODO: Need AssumeCache for PVC for dynamic provisioning
+	pvcCache  corelisters.PersistentVolumeClaimLister
+	nodeCache corelisters.NodeLister
+	pvCache   PVAssumeCache
+
+	// Stores binding decisions that were made in FindPodVolumes for use in AssumePodVolumes.
+	// AssumePodVolumes modifies the bindings again for use in BindPodVolumes.
+	podBindingCache PodBindingCache
+}
+
+// NewVolumeBinder sets up all the caches needed for the scheduler to make volume binding decisions.
+func NewVolumeBinder(
+	kubeClient clientset.Interface,
+	pvcInformer coreinformers.PersistentVolumeClaimInformer,
+	pvInformer coreinformers.PersistentVolumeInformer,
+	nodeInformer coreinformers.NodeInformer,
+	storageClassInformer storageinformers.StorageClassInformer) SchedulerVolumeBinder {

+	// TODO: find better way...
+	ctrl := &PersistentVolumeController{
+		kubeClient:  kubeClient,
+		classLister: storageClassInformer.Lister(),
+	}
+
+	b := &volumeBinder{
+		ctrl:            ctrl,
+		pvcCache:        pvcInformer.Lister(),
+		nodeCache:       nodeInformer.Lister(),
+		pvCache:         NewPVAssumeCache(pvInformer.Informer()),
+		podBindingCache: NewPodBindingCache(),
+	}
+
+	return b
+}
+
+func (b *volumeBinder) GetBindingsCache() PodBindingCache {
+	return b.podBindingCache
+}
+
+// FindPodVolumes caches the matching PVs per node in podBindingCache
+func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, nodeName string) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
+	podName := getPodName(pod)
+
+	glog.V(4).Infof("FindPodVolumes for pod %q, node %q", podName, nodeName)
+
+	// Initialize to true for pods that don't have volumes
+	unboundVolumesSatisfied = true
+	boundVolumesSatisfied = true
+
+	node, err := b.nodeCache.Get(nodeName)
+	if node == nil || err != nil {
+		return false, false, fmt.Errorf("error getting node %q: %v", nodeName, err)
+	}
+
+	// The pod's volumes need to be processed in one call to avoid the race condition where
+	// volumes can get bound in between calls.
+	boundClaims, unboundClaims, unboundClaimsImmediate, err := b.getPodVolumes(pod)
+	if err != nil {
+		return false, false, err
+	}
+
+	// Immediate claims should already be bound
+	if len(unboundClaimsImmediate) > 0 {
+		return false, false, fmt.Errorf("pod has unbound PersistentVolumeClaims")
+	}
+
+	// Check PV node affinity on bound volumes
+	if len(boundClaims) > 0 {
+		boundVolumesSatisfied, err = b.checkBoundClaims(boundClaims, node, podName)
+		if err != nil {
+			return false, false, err
+		}
+	}
+
+	// Find PVs for unbound volumes
+	if len(unboundClaims) > 0 {
+		unboundVolumesSatisfied, err = b.findMatchingVolumes(pod, unboundClaims, node)
+		if err != nil {
+			return false, false, err
+		}
+	}
+
+	return unboundVolumesSatisfied, boundVolumesSatisfied, nil
+}
+
+// AssumePodVolumes will take the cached matching PVs in podBindingCache for the chosen node
+// and update the pvCache with the new prebound PV.  It will update podBindingCache again
+// with the PVs that need an API update.
+func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound, bindingRequired bool, err error) {
+	podName := getPodName(assumedPod)
+
+	glog.V(4).Infof("AssumePodVolumes for pod %q, node %q", podName, nodeName)
+
+	if allBound := b.arePodVolumesBound(assumedPod); allBound {
+		glog.V(4).Infof("AssumePodVolumes: all PVCs bound and nothing to do")
+		return true, false, nil
+	}
+
+	assumedPod.Spec.NodeName = nodeName
+	claimsToBind := b.podBindingCache.GetBindings(assumedPod, nodeName)
+	newBindings := []*bindingInfo{}
+
+	for _, binding := range claimsToBind {
+		newPV, dirty, err := b.ctrl.getBindVolumeToClaim(binding.pv, binding.pvc)
+		glog.V(5).Infof("AssumePodVolumes: getBindVolumeToClaim for PV %q, PVC %q.  newPV %p, dirty %v, err: %v",
+			binding.pv.Name,
+			binding.pvc.Name,
+			newPV,
+			dirty,
+			err)
+		if err != nil {
+			b.revertAssumedPVs(newBindings)
+			return false, true, err
+		}
+		if dirty {
+			err = b.pvCache.Assume(newPV)
+			if err != nil {
+				b.revertAssumedPVs(newBindings)
+				return false, true, err
+			}
+
+			newBindings = append(newBindings, &bindingInfo{pv: newPV, pvc: binding.pvc})
+		}
+	}
+
+	if len(newBindings) == 0 {
+		// Don't update cached bindings if no API updates are needed.  This can happen if we
+		// previously updated the PV object and are waiting for the PV controller to finish binding.
+		glog.V(4).Infof("AssumePodVolumes: PVs already assumed")
+		return false, false, nil
+	}
+	b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings)
+
+	return false, true, nil
+}
+
+// BindPodVolumes gets the cached bindings in podBindingCache and makes the API update for those PVs.
+func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
+	glog.V(4).Infof("BindPodVolumes for pod %q", getPodName(assumedPod))
+
+	bindings := b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName)
+
+	// Do the actual prebinding.  Let the PV controller take care of the rest.
+	// There is no API rollback if the actual binding fails.
+	for i, bindingInfo := range bindings {
+		_, err := b.ctrl.updateBindVolumeToClaim(bindingInfo.pv, bindingInfo.pvc, false)
+		if err != nil {
+			// Only revert assumed cached updates for volumes we haven't successfully bound
+			b.revertAssumedPVs(bindings[i:])
+			return err
+		}
+	}
+
+	return nil
+}
+
+func getPodName(pod *v1.Pod) string {
+	return pod.Namespace + "/" + pod.Name
+}
+
+func getPVCName(pvc *v1.PersistentVolumeClaim) string {
+	return pvc.Namespace + "/" + pvc.Name
+}
+
+func (b *volumeBinder) isVolumeBound(namespace string, vol *v1.Volume, checkFullyBound bool) (bool, *v1.PersistentVolumeClaim, error) {
+	if vol.PersistentVolumeClaim == nil {
+		return true, nil, nil
+	}
+
+	pvcName := vol.PersistentVolumeClaim.ClaimName
+	pvc, err := b.pvcCache.PersistentVolumeClaims(namespace).Get(pvcName)
+	if err != nil || pvc == nil {
+		return false, nil, fmt.Errorf("error getting PVC %q: %v", pvcName, err)
+	}
+
+	pvName := pvc.Spec.VolumeName
+	if pvName != "" {
+		if checkFullyBound {
+			if metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted) {
+				glog.V(5).Infof("PVC %q is fully bound to PV %q", getPVCName(pvc), pvName)
+				return true, pvc, nil
+			} else {
+				glog.V(5).Infof("PVC %q is not fully bound to PV %q", getPVCName(pvc), pvName)
+				return false, pvc, nil
+			}
+		}
+		glog.V(5).Infof("PVC %q is bound or prebound to PV %q", getPVCName(pvc), pvName)
+		return true, pvc, nil
+	}
+
+	glog.V(5).Infof("PVC %q is not bound", getPVCName(pvc))
+	return false, pvc, nil
+}
+
+// arePodVolumesBound returns true if all volumes are fully bound
+func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool {
+	for _, vol := range pod.Spec.Volumes {
+		if isBound, _, _ := b.isVolumeBound(pod.Namespace, &vol, true); !isBound {
+			// Pod has at least one PVC that needs binding
+			return false
+		}
+	}
+	return true
+}
+
+// getPodVolumes returns a pod's PVCs separated into bound (including prebound), unbound with delayed binding,
+// and unbound with immediate binding
+func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaims []*bindingInfo, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
+	boundClaims = []*v1.PersistentVolumeClaim{}
+	unboundClaimsImmediate = []*v1.PersistentVolumeClaim{}
+	unboundClaims = []*bindingInfo{}
+
+	for _, vol := range pod.Spec.Volumes {
+		volumeBound, pvc, err := b.isVolumeBound(pod.Namespace, &vol, false)
+		if err != nil {
+			return nil, nil, nil, err
+		}
+		if pvc == nil {
+			continue
+		}
+		if volumeBound {
+			boundClaims = append(boundClaims, pvc)
+		} else {
+			delayBinding, err := b.ctrl.shouldDelayBinding(pvc)
+			if err != nil {
+				return nil, nil, nil, err
+			}
+			if delayBinding {
+				// Scheduler path
+				unboundClaims = append(unboundClaims, &bindingInfo{pvc: pvc})
+			} else {
+				// Immediate binding should have already been bound
+				unboundClaimsImmediate = append(unboundClaimsImmediate, pvc)
+			}
+		}
+	}
+	return boundClaims, unboundClaims, unboundClaimsImmediate, nil
+}
+
+func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node *v1.Node, podName string) (bool, error) {
+	for _, pvc := range claims {
+		pvName := pvc.Spec.VolumeName
+		pv, err := b.pvCache.GetPV(pvName)
+		if err != nil {
+			return false, err
+		}
+
+		err = volumeutil.CheckNodeAffinity(pv, node.Labels)
+		if err != nil {
+			glog.V(4).Infof("PersistentVolume %q, Node %q mismatch for Pod %q: %v", pvName, node.Name, podName, err)
+			return false, nil
+		}
+		glog.V(5).Infof("PersistentVolume %q, Node %q matches for Pod %q", pvName, node.Name, podName)
+	}
+
+	glog.V(4).Infof("All volumes for Pod %q match with Node %q", podName, node.Name)
+	return true, nil
+}
+
+func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, err error) {
+	// Sort all the claims by increasing size request to get the smallest fits
+	sort.Sort(byPVCSize(claimsToBind))
+
+	allPVs := b.pvCache.ListPVs()
+	chosenPVs := map[string]*v1.PersistentVolume{}
+
+	for _, bindingInfo := range claimsToBind {
+		// Find a matching PV
+		bindingInfo.pv, err = findMatchingVolume(bindingInfo.pvc, allPVs, node, chosenPVs, true)
+		if err != nil {
+			return false, err
+		}
+		if bindingInfo.pv == nil {
+			glog.V(4).Infof("No matching volumes for PVC %q on node %q", getPVCName(bindingInfo.pvc), node.Name)
+			return false, nil
+		}
+
+		// The matching PV needs to be excluded so we don't select it again
+		chosenPVs[bindingInfo.pv.Name] = bindingInfo.pv
+	}
+
+	// Mark cache with all the matches for each PVC for this node
+	b.podBindingCache.UpdateBindings(pod, node.Name, claimsToBind)
+	glog.V(4).Infof("Found matching volumes on node %q", node.Name)
+
+	return true, nil
+}
+
+func (b *volumeBinder) revertAssumedPVs(bindings []*bindingInfo) {
+	for _, bindingInfo := range bindings {
+		b.pvCache.Restore(bindingInfo.pv.Name)
+	}
+}
+
+type bindingInfo struct {
+	// Claim that needs to be bound
+	pvc *v1.PersistentVolumeClaim
+
+	// Proposed PV to bind to this claim
+	pv *v1.PersistentVolume
+}
+
+// Used in unit test errors
+func (b bindingInfo) String() string {
+	pvcName := ""
+	pvName := ""
+	if b.pvc != nil {
+		pvcName = getPVCName(b.pvc)
+	}
+	if b.pv != nil {
+		pvName = b.pv.Name
+	}
+	return fmt.Sprintf("[PVC %q, PV %q]", pvcName, pvName)
+}
+
+type byPVCSize []*bindingInfo
+
+func (a byPVCSize) Len() int {
+	return len(a)
+}
+
+func (a byPVCSize) Swap(i, j int) {
+	a[i], a[j] = a[j], a[i]
+}
+
+func (a byPVCSize) Less(i, j int) bool {
+	iSize := a[i].pvc.Spec.Resources.Requests[v1.ResourceStorage]
+	jSize := a[j].pvc.Spec.Resources.Requests[v1.ResourceStorage]
+	// return true if iSize is less than jSize
+	return iSize.Cmp(jSize) == -1
+}
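The three interface methods above are meant to be driven in a fixed order by the scheduler. A minimal sketch of that driving loop (simplified; the real integration lives in the scheduler, and scheduleOnePod is a stand-in, not code from this patch):

// scheduleOnePod shows the intended Find -> Assume -> Bind ordering.
func scheduleOnePod(binder SchedulerVolumeBinder, pod *v1.Pod, nodeName string) error {
	// Predicate phase: can this node satisfy the pod's volumes?
	unboundOK, boundOK, err := binder.FindPodVolumes(pod, nodeName)
	if err != nil {
		return err
	}
	if !unboundOK || !boundOK {
		return fmt.Errorf("node %q cannot satisfy pod volumes", nodeName)
	}

	// Assume phase: update the in-memory PV cache only.
	allBound, bindingRequired, err := binder.AssumePodVolumes(pod, nodeName)
	if err != nil || allBound {
		return err
	}

	// Bind phase: issue the PV API updates; the scheduler normally does this asynchronously.
	if bindingRequired {
		return binder.BindPodVolumes(pod)
	}
	return nil
}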
diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_fake.go b/pkg/controller/volume/persistentvolume/scheduler_binder_fake.go
new file mode 100644
index 00000000000..2810276b161
--- /dev/null
+++ b/pkg/controller/volume/persistentvolume/scheduler_binder_fake.go
@@ -0,0 +1,63 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package persistentvolume
+
+import (
+	"k8s.io/api/core/v1"
+)
+
+type FakeVolumeBinderConfig struct {
+	AllBound              bool
+	FindUnboundSatisfied  bool
+	FindBoundSatisfied    bool
+	FindErr               error
+	AssumeBindingRequired bool
+	AssumeErr             error
+	BindErr               error
+}
+
+// NewFakeVolumeBinder returns a FakeVolumeBinder that stubs out all binding
+// decisions with the canned results in config.
+func NewFakeVolumeBinder(config *FakeVolumeBinderConfig) *FakeVolumeBinder {
+	return &FakeVolumeBinder{
+		config: config,
+	}
+}
+
+type FakeVolumeBinder struct {
+	config       *FakeVolumeBinderConfig
+	AssumeCalled bool
+	BindCalled   bool
+}
+
+func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, nodeName string) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
+	return b.config.FindUnboundSatisfied, b.config.FindBoundSatisfied, b.config.FindErr
+}
+
+func (b *FakeVolumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (bool, bool, error) {
+	b.AssumeCalled = true
+	return b.config.AllBound, b.config.AssumeBindingRequired, b.config.AssumeErr
+}
+
+func (b *FakeVolumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
+	b.BindCalled = true
+	return b.config.BindErr
+}
+
+func (b *FakeVolumeBinder) GetBindingsCache() PodBindingCache {
+	return nil
+}
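FakeVolumeBinder lets scheduler tests exercise the volume-binding code path without informers or an API server. A minimal sketch of how a test might use it (the test and its assertions are illustrative, not part of this patch):

func TestFakeBinderStub(t *testing.T) {
	// Pretend every pod's volumes can be satisfied and an API binding is required.
	binder := NewFakeVolumeBinder(&FakeVolumeBinderConfig{
		FindUnboundSatisfied:  true,
		FindBoundSatisfied:    true,
		AssumeBindingRequired: true,
	})

	pod := &v1.Pod{}
	if _, _, err := binder.AssumePodVolumes(pod, "node1"); err != nil {
		t.Fatal(err)
	}
	if !binder.AssumeCalled {
		t.Error("expected AssumePodVolumes call to be recorded")
	}
}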
diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_test.go b/pkg/controller/volume/persistentvolume/scheduler_binder_test.go
new file mode 100644
index 00000000000..c5f33f0409d
--- /dev/null
+++ b/pkg/controller/volume/persistentvolume/scheduler_binder_test.go
@@ -0,0 +1,755 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package persistentvolume
+
+import (
+	"fmt"
+	"reflect"
+	"testing"
+
+	"github.com/golang/glog"
+
+	"k8s.io/api/core/v1"
+	storagev1 "k8s.io/api/storage/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/diff"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	"k8s.io/client-go/informers"
+	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/kubernetes/pkg/api/testapi"
+	"k8s.io/kubernetes/pkg/controller"
+)
+
+var (
+	unboundPVC          = makeTestPVC("unbound-pvc", "1G", pvcUnbound, "", &waitClass)
+	unboundPVC2         = makeTestPVC("unbound-pvc2", "5G", pvcUnbound, "", &waitClass)
+	preboundPVC         = makeTestPVC("prebound-pvc", "1G", pvcPrebound, "pv-node1a", &waitClass)
+	boundPVC            = makeTestPVC("bound-pvc", "1G", pvcBound, "pv-bound", &waitClass)
+	boundPVC2           = makeTestPVC("bound-pvc2", "1G", pvcBound, "pv-bound2", &waitClass)
+	badPVC              = makeBadPVC()
+	immediateUnboundPVC = makeTestPVC("immediate-unbound-pvc", "1G", pvcUnbound, "", &immediateClass)
+	immediateBoundPVC   = makeTestPVC("immediate-bound-pvc", "1G", pvcBound, "pv-bound-immediate", &immediateClass)
+
+	pvNoNode                   = makeTestPV("pv-no-node", "", "1G", "1", nil, waitClass)
+	pvNode1a                   = makeTestPV("pv-node1a", "node1", "5G", "1", nil, waitClass)
+	pvNode1b                   = makeTestPV("pv-node1b", "node1", "10G", "1", nil, waitClass)
+	pvNode2                    = makeTestPV("pv-node2", "node2", "1G", "1", nil, waitClass)
+	pvPrebound                 = makeTestPV("pv-prebound", "node1", "1G", "1", unboundPVC, waitClass)
+	pvBound                    = makeTestPV("pv-bound", "node1", "1G", "1", boundPVC, waitClass)
+	pvNode1aBound              = makeTestPV("pv-node1a", "node1", "1G", "1", unboundPVC, waitClass)
+	pvNode1bBound              = makeTestPV("pv-node1b", "node1", "5G", "1", unboundPVC2, waitClass)
+	pvNode1bBoundHigherVersion = makeTestPV("pv-node1b", "node1", "5G", "2", unboundPVC2, waitClass)
+	pvBoundImmediate           = makeTestPV("pv-bound-immediate", "node1", "1G", "1", immediateBoundPVC, immediateClass)
+	pvBoundImmediateNode2      = makeTestPV("pv-bound-immediate", "node2", "1G", "1", immediateBoundPVC, immediateClass)
+
+	binding1a      = makeBinding(unboundPVC, pvNode1a)
+	binding1b      = makeBinding(unboundPVC2, pvNode1b)
+	bindingNoNode  = makeBinding(unboundPVC, pvNoNode)
+	bindingBad     = makeBinding(badPVC, pvNode1b)
+	binding1aBound = makeBinding(unboundPVC, pvNode1aBound)
+	binding1bBound = makeBinding(unboundPVC2, pvNode1bBound)
+
+	waitClass      = "waitClass"
+	immediateClass = "immediateClass"
+)
+
+type testEnv struct {
+	client           clientset.Interface
+	reactor          *volumeReactor
+	binder           SchedulerVolumeBinder
+	internalBinder   *volumeBinder
+	internalPVCache  *pvAssumeCache
+	internalPVCCache cache.Indexer
+}
+
+func newTestBinder(t *testing.T) *testEnv {
+	client := &fake.Clientset{}
+	reactor := newVolumeReactor(client, nil, nil, nil, nil)
+	informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
+
+	pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
+	nodeInformer := informerFactory.Core().V1().Nodes()
+	classInformer := informerFactory.Storage().V1().StorageClasses()
+
+	binder := NewVolumeBinder(
+		client,
+		pvcInformer,
+		informerFactory.Core().V1().PersistentVolumes(),
+		nodeInformer,
+		classInformer)
+
+	// Add a node
+	err := nodeInformer.Informer().GetIndexer().Add(&v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:   "node1",
+			Labels: map[string]string{"key1": "node1"},
+		},
+	})
+	if err != nil {
+		t.Fatalf("Failed to add node to internal cache: %v", err)
+	}
+
+	// Add storageclasses
+	waitMode := storagev1.VolumeBindingWaitForFirstConsumer
+	immediateMode := storagev1.VolumeBindingImmediate
+	classes := []*storagev1.StorageClass{
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: waitClass,
+			},
+			VolumeBindingMode: &waitMode,
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: immediateClass,
+			},
+			VolumeBindingMode: &immediateMode,
+		},
+	}
+	for _, class := range classes {
+		if err = classInformer.Informer().GetIndexer().Add(class); err != nil {
+			t.Fatalf("Failed to add storage class to internal cache: %v", err)
+		}
+	}
+
+	// Get internal types
+	internalBinder, ok := binder.(*volumeBinder)
+	if !ok {
+		t.Fatalf("Failed to convert to internal binder")
+	}
+
+	pvCache := internalBinder.pvCache
+	internalPVCache, ok := pvCache.(*pvAssumeCache)
+	if !ok {
+		t.Fatalf("Failed to convert to internal PV cache")
+	}
+
+	return &testEnv{
+		client:           client,
+		reactor:          reactor,
+		binder:           binder,
+		internalBinder:   internalBinder,
+		internalPVCache:  internalPVCache,
+		internalPVCCache: pvcInformer.Informer().GetIndexer(),
+	}
+}
+
+func (env *testEnv) initClaims(t *testing.T, pvcs []*v1.PersistentVolumeClaim) {
+	for _, pvc := range pvcs {
+		err := env.internalPVCCache.Add(pvc)
+		if err != nil {
+			t.Fatalf("Failed to add PVC %q to internal cache: %v", pvc.Name, err)
+		}
+		env.reactor.claims[pvc.Name] = pvc
+	}
+}
+
+func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.PersistentVolume) {
+	internalPVCache := env.internalPVCache
+	for _, pv := range cachedPVs {
+		internalPVCache.add(pv)
+		if apiPVs == nil {
+			env.reactor.volumes[pv.Name] = pv
+		}
+	}
+	for _, pv := range apiPVs {
+		env.reactor.volumes[pv.Name] = pv
+	}
+}
+
+func (env *testEnv) assumeVolumes(t *testing.T, name, node string, pod *v1.Pod, bindings []*bindingInfo) {
+	pvCache := env.internalBinder.pvCache
+	for _, binding := range bindings {
+		if err := pvCache.Assume(binding.pv); err != nil {
+			t.Fatalf("Failed to setup test %q: error: %v", name, err)
+		}
+	}
+
+	env.internalBinder.podBindingCache.UpdateBindings(pod, node, bindings)
+}
+
+func (env *testEnv) initPodCache(pod *v1.Pod, node string, bindings []*bindingInfo) {
+	cache := env.internalBinder.podBindingCache
+	cache.UpdateBindings(pod, node, bindings)
+}
+
+func (env *testEnv) validatePodCache(t *testing.T, name, node string, pod *v1.Pod, expectedBindings []*bindingInfo) {
+	cache := env.internalBinder.podBindingCache
+	bindings := cache.GetBindings(pod, node)
+
+	if !reflect.DeepEqual(expectedBindings, bindings) {
+		t.Errorf("Test %q failed: Expected bindings %+v, got %+v", name, expectedBindings, bindings)
+	}
+}
+
+func (env *testEnv) validateAssume(t *testing.T, name string, pod *v1.Pod, bindings []*bindingInfo) {
+	// TODO: Check binding cache
+
+	// Check pv cache
+	pvCache := env.internalBinder.pvCache
+	for _, b := range bindings {
+		pv, err := pvCache.GetPV(b.pv.Name)
+		if err != nil {
+			t.Errorf("Test %q failed: GetPV %q returned error: %v", name, b.pv.Name, err)
+			continue
+		}
+		if pv.Spec.ClaimRef == nil {
+			t.Errorf("Test %q failed: PV %q ClaimRef is nil", name, b.pv.Name)
+			continue
+		}
+		if pv.Spec.ClaimRef.Name != b.pvc.Name {
+			t.Errorf("Test %q failed: expected PV.ClaimRef.Name %q, got %q", name, b.pvc.Name, pv.Spec.ClaimRef.Name)
+		}
+		if pv.Spec.ClaimRef.Namespace != b.pvc.Namespace {
+			t.Errorf("Test %q failed: expected PV.ClaimRef.Namespace %q, got %q", name, b.pvc.Namespace, pv.Spec.ClaimRef.Namespace)
+		}
+	}
+}
+
+func (env *testEnv) validateFailedAssume(t *testing.T, name string, pod *v1.Pod, bindings []*bindingInfo) {
+	// All PVs should remain unmodified in the cache
+	pvCache := env.internalBinder.pvCache
+	for _, b := range bindings {
+		pv, _ := pvCache.GetPV(b.pv.Name)
+		// PV could be nil if it's missing from cache
+		if pv != nil && pv != b.pv {
+			t.Errorf("Test %q failed: PV %q was modified in cache", name, b.pv.Name)
+		}
+	}
+}
+
+func (env *testEnv) validateBind(
+	t *testing.T,
+	name string,
+	pod *v1.Pod,
+	expectedPVs []*v1.PersistentVolume,
+	expectedAPIPVs []*v1.PersistentVolume) {
+
+	// Check pv cache
+	pvCache := env.internalBinder.pvCache
+	for _, pv := range expectedPVs {
+		cachedPV, err := pvCache.GetPV(pv.Name)
+		if err != nil {
+			t.Errorf("Test %q failed: GetPV %q returned error: %v", name, pv.Name, err)
+		}
+		if !reflect.DeepEqual(cachedPV, pv) {
+			t.Errorf("Test %q failed: cached PV check failed [A-expected, B-got]:\n%s", name, diff.ObjectDiff(pv, cachedPV))
+		}
+	}
+
+	// Check reactor for API updates
+	if err := env.reactor.checkVolumes(expectedAPIPVs); err != nil {
+		t.Errorf("Test %q failed: API reactor validation failed: %v", name, err)
+	}
+}
+
+const (
+	pvcUnbound = iota
+	pvcPrebound
+	pvcBound
+)
+
+func makeTestPVC(name, size string, pvcBoundState int, pvName string, className *string) *v1.PersistentVolumeClaim {
+	pvc := &v1.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            name,
+			Namespace:       "testns",
+			UID:             types.UID("pvc-uid"),
+			ResourceVersion: "1",
+			SelfLink:        testapi.Default.SelfLink("pvc", name),
+		},
+		Spec: v1.PersistentVolumeClaimSpec{
+			Resources: v1.ResourceRequirements{
+				Requests: v1.ResourceList{
+					v1.ResourceName(v1.ResourceStorage): resource.MustParse(size),
+				},
+			},
+			StorageClassName: className,
+		},
+	}
+
+	switch pvcBoundState {
+	case pvcBound:
+		metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annBindCompleted, "yes")
+		fallthrough
+	case pvcPrebound:
+		pvc.Spec.VolumeName = pvName
+	}
+	return pvc
+}
+
+func makeBadPVC() *v1.PersistentVolumeClaim {
+	return &v1.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            "bad-pvc",
+			Namespace:       "testns",
+			UID:             types.UID("pvc-uid"),
+			ResourceVersion: "1",
+			// Don't include SelfLink, so that GetReference will fail
+		},
+		Spec: v1.PersistentVolumeClaimSpec{
+			Resources: v1.ResourceRequirements{
+				Requests: v1.ResourceList{
+					v1.ResourceName(v1.ResourceStorage): resource.MustParse("1G"),
+				},
+			},
+			StorageClassName: &waitClass,
+		},
+	}
+}
+
+func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentVolumeClaim, className string) *v1.PersistentVolume {
+	pv := &v1.PersistentVolume{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            name,
+			ResourceVersion: version,
+		},
+		Spec: v1.PersistentVolumeSpec{
+			Capacity: v1.ResourceList{
+				v1.ResourceName(v1.ResourceStorage): resource.MustParse(capacity),
+			},
+			StorageClassName: className,
+		},
+	}
+	if node != "" {
+		pv.Annotations = getAnnotationWithNodeAffinity("key1", node)
+	}
+
+	if boundToPVC != nil {
+		pv.Spec.ClaimRef = &v1.ObjectReference{
+			Name:      boundToPVC.Name,
+			Namespace: boundToPVC.Namespace,
+			UID:       boundToPVC.UID,
+		}
+		metav1.SetMetaDataAnnotation(&pv.ObjectMeta, annBoundByController, "yes")
+	}
+
+	return pv
+}
+
+func makePod(pvcs []*v1.PersistentVolumeClaim) *v1.Pod {
+	pod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-pod",
+			Namespace: "testns",
+		},
+	}
+
+	volumes := []v1.Volume{}
+	for i, pvc := range pvcs {
+		pvcVol := v1.Volume{
+			Name: fmt.Sprintf("vol%v", i),
+			VolumeSource: v1.VolumeSource{
+				PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
+					ClaimName: pvc.Name,
+				},
+			},
+		}
+		volumes = append(volumes, pvcVol)
+	}
+	pod.Spec.Volumes = volumes
+	pod.Spec.NodeName = "node1"
+	return pod
+}
+
+func makePodWithoutPVC() *v1.Pod {
+	pod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-pod",
+			Namespace: "testns",
+		},
+		Spec: v1.PodSpec{
+			Volumes: []v1.Volume{
+				{
+					VolumeSource: v1.VolumeSource{
+						EmptyDir: &v1.EmptyDirVolumeSource{},
+					},
+				},
+			},
+		},
+	}
+	return pod
+}
+
+func makeBinding(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) *bindingInfo {
+	return &bindingInfo{pvc: pvc, pv: pv}
+}
+
+func makeStringPtr(str string) *string {
+	// Copy the string so the pointer doesn't alias the caller's variable
+	s := str
+	return &s
+}
+
+func TestFindPodVolumes(t *testing.T) {
+	scenarios := map[string]struct {
+		// Inputs
+		pvs     []*v1.PersistentVolume
+		podPVCs []*v1.PersistentVolumeClaim
+		// Defaults to node1
+		node string
+		// If nil, use pod PVCs
+		cachePVCs []*v1.PersistentVolumeClaim
+		// If nil, makePod with podPVCs
+		pod *v1.Pod
+
+		// Expected podBindingCache fields
+		expectedBindings []*bindingInfo
+
+		// Expected return values
+		expectedUnbound bool
+		expectedBound   bool
+		shouldFail      bool
+	}{
+		"no-volumes": {
+			pod:             makePod(nil),
+			expectedUnbound: true,
+			expectedBound:   true,
+		},
+		"no-pvcs": {
+			pod:             makePodWithoutPVC(),
+			expectedUnbound: true,
+			expectedBound:   true,
+		},
+		"pvc-not-found": {
+			cachePVCs:       []*v1.PersistentVolumeClaim{},
+			podPVCs:         []*v1.PersistentVolumeClaim{boundPVC},
+			expectedUnbound: false,
+			expectedBound:   false,
+			shouldFail:      true,
+		},
+		"bound-pvc": {
+			podPVCs:         []*v1.PersistentVolumeClaim{boundPVC},
+			pvs:             []*v1.PersistentVolume{pvBound},
+			expectedUnbound: true,
+			expectedBound:   true,
+		},
+		"bound-pvc,pv-not-exists": {
+			podPVCs:         []*v1.PersistentVolumeClaim{boundPVC},
+			expectedUnbound: false,
+			expectedBound:   false,
+			shouldFail:      true,
+		},
+		"prebound-pvc": {
+			podPVCs:         []*v1.PersistentVolumeClaim{preboundPVC},
+			pvs:             []*v1.PersistentVolume{pvNode1aBound},
+			expectedUnbound: true,
+			expectedBound:   true,
+		},
+		"unbound-pvc,node-not-exists": {
+			podPVCs:         []*v1.PersistentVolumeClaim{unboundPVC},
+			node:            "node12",
+			expectedUnbound: false,
+			expectedBound:   false,
+			shouldFail:      true,
+		},
+		"unbound-pvc,pv-same-node": {
+			podPVCs:          []*v1.PersistentVolumeClaim{unboundPVC},
+			pvs:              []*v1.PersistentVolume{pvNode2, pvNode1a, pvNode1b},
+			expectedBindings: []*bindingInfo{binding1a},
+			expectedUnbound:  true,
+			expectedBound:    true,
+		},
+		"unbound-pvc,pv-different-node": {
+			podPVCs:         []*v1.PersistentVolumeClaim{unboundPVC},
+			pvs:             []*v1.PersistentVolume{pvNode2},
+			expectedUnbound: false,
+			expectedBound:   true,
+		},
+		"two-unbound-pvcs": {
+			podPVCs:          []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2},
+			pvs:              []*v1.PersistentVolume{pvNode1a, pvNode1b},
+			expectedBindings: []*bindingInfo{binding1a, binding1b},
+			expectedUnbound:  true,
+			expectedBound:    true,
+		},
+		"two-unbound-pvcs,order-by-size": {
+			podPVCs:          []*v1.PersistentVolumeClaim{unboundPVC2, unboundPVC},
+			pvs:              []*v1.PersistentVolume{pvNode1a, pvNode1b},
+			expectedBindings: []*bindingInfo{binding1a, binding1b},
+			expectedUnbound:  true,
+			expectedBound:    true,
+		},
+		"two-unbound-pvcs,partial-match": {
+			podPVCs:         []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2},
+			pvs:             []*v1.PersistentVolume{pvNode1a},
+			expectedUnbound: false,
+			expectedBound:   true,
+		},
+		"one-bound,one-unbound": {
+			podPVCs:          []*v1.PersistentVolumeClaim{unboundPVC, boundPVC},
+			pvs:              []*v1.PersistentVolume{pvBound, pvNode1a},
+			expectedBindings: []*bindingInfo{binding1a},
+			expectedUnbound:  true,
+			expectedBound:    true,
+		},
+		"one-bound,one-unbound,no-match": {
+			podPVCs:         []*v1.PersistentVolumeClaim{unboundPVC, boundPVC},
+			pvs:             []*v1.PersistentVolume{pvBound, pvNode2},
+			expectedUnbound: false,
+			expectedBound:   true,
+		},
+		"one-prebound,one-unbound": {
+			podPVCs:          []*v1.PersistentVolumeClaim{unboundPVC, preboundPVC},
+			pvs:              []*v1.PersistentVolume{pvNode1a, pvNode1b},
+			expectedBindings: []*bindingInfo{binding1a},
+			expectedUnbound:  true,
+			expectedBound:    true,
+		},
+		"immediate-bound-pvc": {
+			podPVCs:         []*v1.PersistentVolumeClaim{immediateBoundPVC},
+			pvs:             []*v1.PersistentVolume{pvBoundImmediate},
+			expectedUnbound: true,
+			expectedBound:   true,
+		},
+		"immediate-bound-pvc-wrong-node": {
+			podPVCs:         []*v1.PersistentVolumeClaim{immediateBoundPVC},
+			pvs:             []*v1.PersistentVolume{pvBoundImmediateNode2},
+			expectedUnbound: true,
+			expectedBound:   false,
+		},
+		"immediate-unbound-pvc": {
+			podPVCs:         []*v1.PersistentVolumeClaim{immediateUnboundPVC},
+			expectedUnbound: false,
+			expectedBound:   false,
+			shouldFail:      true,
+		},
+		"immediate-unbound-pvc,delayed-mode-bound": {
+			podPVCs:         []*v1.PersistentVolumeClaim{immediateUnboundPVC, boundPVC},
+			pvs:             []*v1.PersistentVolume{pvBound},
+			expectedUnbound: false,
+			expectedBound:   false,
+			shouldFail:      true,
+		},
+		"immediate-unbound-pvc,delayed-mode-unbound": {
+			podPVCs:         []*v1.PersistentVolumeClaim{immediateUnboundPVC, unboundPVC},
+			expectedUnbound: false,
+			expectedBound:   false,
+			shouldFail:      true,
+		},
+	}
+
+	// Set feature gate
+	utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true")
+	defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false")
+
+	for name, scenario := range scenarios {
+		glog.V(5).Infof("Running test case %q", name)
+
+		// Setup
+		testEnv := newTestBinder(t)
+		testEnv.initVolumes(scenario.pvs, scenario.pvs)
+		if scenario.node == "" {
+			scenario.node = "node1"
+		}
+
+		// a. Init pvc cache
+		if scenario.cachePVCs == nil {
+			scenario.cachePVCs = scenario.podPVCs
+		}
+		testEnv.initClaims(t, scenario.cachePVCs)
+
+		// b. Generate pod with given claims
+		if scenario.pod == nil {
+			scenario.pod = makePod(scenario.podPVCs)
+		}
+
+		// Execute
+		unboundSatisfied, boundSatisfied, err := testEnv.binder.FindPodVolumes(scenario.pod, scenario.node)
+
+		// Validate
+		if !scenario.shouldFail && err != nil {
+			t.Errorf("Test %q failed: returned error: %v", name, err)
+		}
+		if scenario.shouldFail && err == nil {
+			t.Errorf("Test %q failed: returned success but expected error", name)
+		}
+		if boundSatisfied != scenario.expectedBound {
+			t.Errorf("Test %q failed: expected boundSatisfied %v, got %v", name, scenario.expectedBound, boundSatisfied)
+		}
+		if unboundSatisfied != scenario.expectedUnbound {
+			t.Errorf("Test %q failed: expected unboundSatisfied %v, got %v", name, scenario.expectedUnbound, unboundSatisfied)
+		}
+		testEnv.validatePodCache(t, name, scenario.node, scenario.pod, scenario.expectedBindings)
+	}
+}
+
+func TestAssumePodVolumes(t *testing.T) {
+	scenarios := map[string]struct {
+		// Inputs
+		podPVCs  []*v1.PersistentVolumeClaim
+		pvs      []*v1.PersistentVolume
+		bindings []*bindingInfo
+
+		// Expected return values
+		shouldFail              bool
+		expectedBindingRequired bool
+		expectedAllBound        bool
+
+		// if nil, use bindings
+		expectedBindings []*bindingInfo
+	}{
+		"all-bound": {
+			podPVCs:          []*v1.PersistentVolumeClaim{boundPVC},
+			pvs:              []*v1.PersistentVolume{pvBound},
+			expectedAllBound: true,
+		},
+		"prebound-pvc": {
+			podPVCs: []*v1.PersistentVolumeClaim{preboundPVC},
+			pvs:     []*v1.PersistentVolume{pvNode1a},
+		},
+		"one-binding": {
+			podPVCs:                 []*v1.PersistentVolumeClaim{unboundPVC},
+			bindings:                []*bindingInfo{binding1a},
+			pvs:                     []*v1.PersistentVolume{pvNode1a},
+			expectedBindingRequired: true,
+		},
+		"two-bindings": {
+			podPVCs:                 []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2},
+			bindings:                []*bindingInfo{binding1a, binding1b},
+			pvs:                     []*v1.PersistentVolume{pvNode1a, pvNode1b},
+			expectedBindingRequired: true,
+		},
+		"pv-already-bound": {
+			podPVCs:                 []*v1.PersistentVolumeClaim{unboundPVC},
+			bindings:                []*bindingInfo{binding1aBound},
+			pvs:                     []*v1.PersistentVolume{pvNode1aBound},
+			expectedBindingRequired: false,
+			expectedBindings:        []*bindingInfo{},
+		},
+		"claimref-failed": {
+			podPVCs:                 []*v1.PersistentVolumeClaim{unboundPVC},
+			bindings:                []*bindingInfo{binding1a, bindingBad},
+			pvs:                     []*v1.PersistentVolume{pvNode1a, pvNode1b},
+			shouldFail:              true,
+			expectedBindingRequired: true,
+		},
+		"tmpupdate-failed": {
+			podPVCs:                 []*v1.PersistentVolumeClaim{unboundPVC},
+			bindings:                []*bindingInfo{binding1a, binding1b},
+			pvs:                     []*v1.PersistentVolume{pvNode1a},
+			shouldFail:              true,
+			expectedBindingRequired: true,
+		},
+	}
+
+	for name, scenario := range scenarios {
+		glog.V(5).Infof("Running test case %q", name)
+
+		// Setup
+		testEnv := newTestBinder(t)
+		testEnv.initClaims(t, scenario.podPVCs)
+		pod := makePod(scenario.podPVCs)
+		testEnv.initPodCache(pod, "node1", scenario.bindings)
+		testEnv.initVolumes(scenario.pvs, scenario.pvs)
+
+		// Execute
+		allBound, bindingRequired, err := testEnv.binder.AssumePodVolumes(pod, "node1")
+
+		// Validate
+		if !scenario.shouldFail && err != nil {
+			t.Errorf("Test %q failed: returned error: %v", name, err)
+		}
+		if scenario.shouldFail && err == nil {
+			t.Errorf("Test %q failed: returned success but expected error", name)
+		}
+		if scenario.expectedBindingRequired != bindingRequired {
+			t.Errorf("Test %q failed: returned unexpected bindingRequired: %v", name, bindingRequired)
+		}
+		if scenario.expectedAllBound != allBound {
+			t.Errorf("Test %q failed: returned unexpected allBound: %v", name, allBound)
+		}
+		if scenario.expectedBindings == nil {
+			scenario.expectedBindings = scenario.bindings
+		}
+		if scenario.shouldFail {
+			testEnv.validateFailedAssume(t, name, pod, scenario.expectedBindings)
+		} else {
+			testEnv.validateAssume(t, name, pod, scenario.expectedBindings)
+		}
+	}
+}
+
+func TestBindPodVolumes(t *testing.T) {
+	scenarios := map[string]struct {
+		// Inputs
+		bindings  []*bindingInfo
+		cachedPVs []*v1.PersistentVolume
+		// if nil, use cachedPVs
+		apiPVs []*v1.PersistentVolume
+
+		// Expected return values
+		shouldFail  bool
+		expectedPVs []*v1.PersistentVolume
+		// if nil, use expectedPVs
+		expectedAPIPVs []*v1.PersistentVolume
+	}{
+		"all-bound": {},
+		"not-fully-bound": {
+			bindings: []*bindingInfo{},
+		},
+		"one-binding": {
+			bindings:    []*bindingInfo{binding1aBound},
+			cachedPVs:   []*v1.PersistentVolume{pvNode1a},
+			expectedPVs: []*v1.PersistentVolume{pvNode1aBound},
+		},
+		"two-bindings": {
+			bindings:    []*bindingInfo{binding1aBound, binding1bBound},
+			cachedPVs:   []*v1.PersistentVolume{pvNode1a, pvNode1b},
+			expectedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBound},
+		},
+		"api-update-failed": {
+			bindings:       []*bindingInfo{binding1aBound, binding1bBound},
+			cachedPVs:      []*v1.PersistentVolume{pvNode1a, pvNode1b},
+			apiPVs:         []*v1.PersistentVolume{pvNode1a, pvNode1bBoundHigherVersion},
+			expectedPVs:    []*v1.PersistentVolume{pvNode1aBound, pvNode1b},
+			expectedAPIPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBoundHigherVersion},
+			shouldFail:     true,
+		},
+	}
+	for name, scenario := range scenarios {
+		glog.V(5).Infof("Running test case %q", name)
+
+		// Setup
+		testEnv := newTestBinder(t)
+		pod := makePod(nil)
+		if scenario.apiPVs == nil {
+			scenario.apiPVs = scenario.cachedPVs
+		}
+		testEnv.initVolumes(scenario.cachedPVs, scenario.apiPVs)
+		testEnv.assumeVolumes(t, name, "node1", pod, scenario.bindings)
+
+		// Execute
+		err := testEnv.binder.BindPodVolumes(pod)
+
+		// Validate
+		if !scenario.shouldFail && err != nil {
+			t.Errorf("Test %q failed: returned error: %v", name, err)
+		}
+		if scenario.shouldFail && err == nil {
+			t.Errorf("Test %q failed: returned success but expected error", name)
+		}
+		if scenario.expectedAPIPVs == nil {
+			scenario.expectedAPIPVs = scenario.expectedPVs
+		}
+		testEnv.validateBind(t, name, pod, scenario.expectedPVs, scenario.expectedAPIPVs)
+	}
+}
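byPVCSize orders claims with resource.Quantity.Cmp, which compares the parsed values rather than the strings. A small illustrative check (a standalone sketch, not part of the patch):

func ExampleQuantityCmp() {
	small := resource.MustParse("1G")
	big := resource.MustParse("5G")
	// Cmp returns -1, 0, or 1; -1 means the receiver is smaller,
	// so the "1G" claim sorts before the "5G" claim in byPVCSize.
	fmt.Println(small.Cmp(big))
	// Output: -1
}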