Library for scheduler volume binding

This commit is contained in:
Michelle Au 2017-11-08 13:09:50 -08:00
parent b49a1ce1a4
commit fa6b62fa63
4 changed files with 1249 additions and 0 deletions

View File

@ -196,6 +196,8 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
if storedVer != requestedVer {
return true, obj, versionConflictError
}
// Don't modify the existing object
volume = volume.DeepCopy()
volume.ResourceVersion = strconv.Itoa(storedVer + 1)
} else {
return true, nil, fmt.Errorf("Cannot update volume %s: volume not found", volume.Name)
@ -220,6 +222,8 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
if storedVer != requestedVer {
return true, obj, versionConflictError
}
// Don't modify the existing object
claim = claim.DeepCopy()
claim.ResourceVersion = strconv.Itoa(storedVer + 1)
} else {
return true, nil, fmt.Errorf("Cannot update claim %s: claim not found", claim.Name)
@ -301,7 +305,12 @@ func (r *volumeReactor) checkVolumes(expectedVolumes []*v1.PersistentVolume) err
gotMap := make(map[string]*v1.PersistentVolume)
// Clear any ResourceVersion from both sets
for _, v := range expectedVolumes {
// Don't modify the existing object
v := v.DeepCopy()
v.ResourceVersion = ""
if v.Spec.ClaimRef != nil {
v.Spec.ClaimRef.ResourceVersion = ""
}
expectedMap[v.Name] = v
}
for _, v := range r.volumes {
@ -331,6 +340,8 @@ func (r *volumeReactor) checkClaims(expectedClaims []*v1.PersistentVolumeClaim)
expectedMap := make(map[string]*v1.PersistentVolumeClaim)
gotMap := make(map[string]*v1.PersistentVolumeClaim)
for _, c := range expectedClaims {
// Don't modify the existing object
c = c.DeepCopy()
c.ResourceVersion = ""
expectedMap[c.Name] = c
}

View File

@ -0,0 +1,420 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"fmt"
"sort"
"github.com/golang/glog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
// SchedulerVolumeBinder is used by the scheduler to handle PVC/PV binding
// and dynamic provisioning. The binding decisions are integrated into the pod scheduling
// workflow so that the PV NodeAffinity is also considered along with the pod's other
// scheduling requirements.
//
// This integrates into the existing default scheduler workflow as follows:
// 1. The scheduler takes a Pod off the scheduler queue and processes it serially:
// a. Invokes all predicate functions, parallelized across nodes. FindPodVolumes() is invoked here.
// b. Invokes all priority functions. Future/TBD
// c. Selects the best node for the Pod.
// d. Cache the node selection for the Pod. (Assume phase)
// i. If PVC binding is required, cache in-memory only:
// * Updated PV objects for prebinding to the corresponding PVCs.
// * For the pod, which PVs need API updates.
// AssumePodVolumes() is invoked here. Then BindPodVolumes() is called asynchronously by the
// scheduler. After BindPodVolumes() is complete, the Pod is added back to the scheduler queue
// to be processed again until all PVCs are bound.
// ii. If PVC binding is not required, cache the Pod->Node binding in the scheduler's pod cache,
// and asynchronously bind the Pod to the Node. This is handled in the scheduler and not here.
// 2. Once the assume operation is done, the scheduler processes the next Pod in the scheduler queue
// while the actual binding operation occurs in the background.
type SchedulerVolumeBinder interface {
// FindPodVolumes checks if all of a Pod's PVCs can be satisfied by the node.
//
// If a PVC is bound, it checks if the PV's NodeAffinity matches the Node.
// Otherwise, it tries to find an available PV to bind to the PVC.
//
// It returns true if there are matching PVs that can satisfy all of the Pod's PVCs, and returns true
// if bound volumes satisfy the PV NodeAffinity.
//
// This function is called by the volume binding scheduler predicate and can be called in parallel
FindPodVolumes(pod *v1.Pod, nodeName string) (unboundVolumesSatisified, boundVolumesSatisfied bool, err error)
// AssumePodVolumes will take the PV matches for unbound PVCs and update the PV cache assuming
// that the PV is prebound to the PVC.
//
// It returns true if all volumes are fully bound, and returns true if any volume binding API operation needs
// to be done afterwards.
//
// This function will modify assumedPod with the node name.
// This function is called serially.
AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound bool, bindingRequired bool, err error)
// BindPodVolumes will initiate the volume binding by making the API call to prebind the PV
// to its matching PVC.
//
// This function can be called in parallel.
BindPodVolumes(assumedPod *v1.Pod) error
// GetBindingsCache returns the cache used (if any) to store volume binding decisions.
GetBindingsCache() PodBindingCache
}
type volumeBinder struct {
ctrl *PersistentVolumeController
// TODO: Need AssumeCache for PVC for dynamic provisioning
pvcCache corelisters.PersistentVolumeClaimLister
nodeCache corelisters.NodeLister
pvCache PVAssumeCache
// Stores binding decisions that were made in FindPodVolumes for use in AssumePodVolumes.
// AssumePodVolumes modifies the bindings again for use in BindPodVolumes.
podBindingCache PodBindingCache
}
// NewVolumeBinder sets up all the caches needed for the scheduler to make volume binding decisions.
func NewVolumeBinder(
kubeClient clientset.Interface,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
nodeInformer coreinformers.NodeInformer,
storageClassInformer storageinformers.StorageClassInformer) SchedulerVolumeBinder {
// TODO: find better way...
ctrl := &PersistentVolumeController{
kubeClient: kubeClient,
classLister: storageClassInformer.Lister(),
}
b := &volumeBinder{
ctrl: ctrl,
pvcCache: pvcInformer.Lister(),
nodeCache: nodeInformer.Lister(),
pvCache: NewPVAssumeCache(pvInformer.Informer()),
podBindingCache: NewPodBindingCache(),
}
return b
}
func (b *volumeBinder) GetBindingsCache() PodBindingCache {
return b.podBindingCache
}
// FindPodVolumes caches the matching PVs per node in podBindingCache
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, nodeName string) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
podName := getPodName(pod)
glog.V(4).Infof("FindPodVolumes for pod %q, node %q", podName, nodeName)
// Initialize to true for pods that don't have volumes
unboundVolumesSatisfied = true
boundVolumesSatisfied = true
node, err := b.nodeCache.Get(nodeName)
if node == nil || err != nil {
return false, false, fmt.Errorf("error getting node %q: %v", nodeName, err)
}
// The pod's volumes need to be processed in one call to avoid the race condition where
// volumes can get bound in between calls.
boundClaims, unboundClaims, unboundClaimsImmediate, err := b.getPodVolumes(pod)
if err != nil {
return false, false, err
}
// Immediate claims should be bound
if len(unboundClaimsImmediate) > 0 {
return false, false, fmt.Errorf("pod has unbound PersistentVolumeClaims")
}
// Check PV node affinity on bound volumes
if len(boundClaims) > 0 {
boundVolumesSatisfied, err = b.checkBoundClaims(boundClaims, node, podName)
if err != nil {
return false, false, err
}
}
// Find PVs for unbound volumes
if len(unboundClaims) > 0 {
unboundVolumesSatisfied, err = b.findMatchingVolumes(pod, unboundClaims, node)
if err != nil {
return false, false, err
}
}
return unboundVolumesSatisfied, boundVolumesSatisfied, nil
}
// AssumePodVolumes will take the cached matching PVs in podBindingCache for the chosen node
// and update the pvCache with the new prebound PV. It will update podBindingCache again
// with the PVs that need an API update.
func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound, bindingRequired bool, err error) {
podName := getPodName(assumedPod)
glog.V(4).Infof("AssumePodVolumes for pod %q, node %q", podName, nodeName)
if allBound := b.arePodVolumesBound(assumedPod); allBound {
glog.V(4).Infof("AssumePodVolumes: all PVCs bound and nothing to do")
return true, false, nil
}
assumedPod.Spec.NodeName = nodeName
claimsToBind := b.podBindingCache.GetBindings(assumedPod, nodeName)
newBindings := []*bindingInfo{}
for _, binding := range claimsToBind {
newPV, dirty, err := b.ctrl.getBindVolumeToClaim(binding.pv, binding.pvc)
glog.V(5).Infof("AssumePodVolumes: getBindVolumeToClaim for PV %q, PVC %q. newPV %p, dirty %v, err: %v",
binding.pv.Name,
binding.pvc.Name,
newPV,
dirty,
err)
if err != nil {
b.revertAssumedPVs(newBindings)
return false, true, err
}
if dirty {
err = b.pvCache.Assume(newPV)
if err != nil {
b.revertAssumedPVs(newBindings)
return false, true, err
}
newBindings = append(newBindings, &bindingInfo{pv: newPV, pvc: binding.pvc})
}
}
if len(newBindings) == 0 {
// Don't update cached bindings if no API updates are needed. This can happen if we
// previously updated the PV object and are waiting for the PV controller to finish binding.
glog.V(4).Infof("AssumePodVolumes: PVs already assumed")
return false, false, nil
}
b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings)
return false, true, nil
}
// BindPodVolumes gets the cached bindings in podBindingCache and makes the API update for those PVs.
func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
glog.V(4).Infof("BindPodVolumes for pod %q", getPodName(assumedPod))
bindings := b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName)
// Do the actual prebinding. Let the PV controller take care of the rest
// There is no API rollback if the actual binding fails
for i, bindingInfo := range bindings {
_, err := b.ctrl.updateBindVolumeToClaim(bindingInfo.pv, bindingInfo.pvc, false)
if err != nil {
// only revert assumed cached updates for volumes we haven't successfully bound
b.revertAssumedPVs(bindings[i:])
return err
}
}
return nil
}
func getPodName(pod *v1.Pod) string {
return pod.Namespace + "/" + pod.Name
}
func getPVCName(pvc *v1.PersistentVolumeClaim) string {
return pvc.Namespace + "/" + pvc.Name
}
func (b *volumeBinder) isVolumeBound(namespace string, vol *v1.Volume, checkFullyBound bool) (bool, *v1.PersistentVolumeClaim, error) {
if vol.PersistentVolumeClaim == nil {
return true, nil, nil
}
pvcName := vol.PersistentVolumeClaim.ClaimName
pvc, err := b.pvcCache.PersistentVolumeClaims(namespace).Get(pvcName)
if err != nil || pvc == nil {
return false, nil, fmt.Errorf("error getting PVC %q: %v", pvcName, err)
}
pvName := pvc.Spec.VolumeName
if pvName != "" {
if checkFullyBound {
if metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted) {
glog.V(5).Infof("PVC %q is fully bound to PV %q", getPVCName(pvc), pvName)
return true, pvc, nil
} else {
glog.V(5).Infof("PVC %q is not fully bound to PV %q", getPVCName(pvc), pvName)
return false, pvc, nil
}
}
glog.V(5).Infof("PVC %q is bound or prebound to PV %q", getPVCName(pvc), pvName)
return true, pvc, nil
}
glog.V(5).Infof("PVC %q is not bound", getPVCName(pvc))
return false, pvc, nil
}
// arePodVolumesBound returns true if all volumes are fully bound
func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool {
for _, vol := range pod.Spec.Volumes {
if isBound, _, _ := b.isVolumeBound(pod.Namespace, &vol, true); !isBound {
// Pod has at least one PVC that needs binding
return false
}
}
return true
}
// getPodVolumes returns a pod's PVCs separated into bound (including prebound), unbound with delayed binding,
// and unbound with immediate binding
func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaims []*bindingInfo, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
boundClaims = []*v1.PersistentVolumeClaim{}
unboundClaimsImmediate = []*v1.PersistentVolumeClaim{}
unboundClaims = []*bindingInfo{}
for _, vol := range pod.Spec.Volumes {
volumeBound, pvc, err := b.isVolumeBound(pod.Namespace, &vol, false)
if err != nil {
return nil, nil, nil, err
}
if pvc == nil {
continue
}
if volumeBound {
boundClaims = append(boundClaims, pvc)
} else {
delayBinding, err := b.ctrl.shouldDelayBinding(pvc)
if err != nil {
return nil, nil, nil, err
}
if delayBinding {
// Scheduler path
unboundClaims = append(unboundClaims, &bindingInfo{pvc: pvc})
} else {
// Immediate binding should have already been bound
unboundClaimsImmediate = append(unboundClaimsImmediate, pvc)
}
}
}
return boundClaims, unboundClaims, unboundClaimsImmediate, nil
}
func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node *v1.Node, podName string) (bool, error) {
for _, pvc := range claims {
pvName := pvc.Spec.VolumeName
pv, err := b.pvCache.GetPV(pvName)
if err != nil {
return false, err
}
err = volumeutil.CheckNodeAffinity(pv, node.Labels)
if err != nil {
glog.V(4).Infof("PersistentVolume %q, Node %q mismatch for Pod %q: %v", pvName, node.Name, err.Error(), podName)
return false, nil
}
glog.V(5).Infof("PersistentVolume %q, Node %q matches for Pod %q", pvName, node.Name, podName)
}
glog.V(4).Infof("All volumes for Pod %q match with Node %q", podName, node.Name)
return true, nil
}
func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, err error) {
// Sort all the claims by increasing size request to get the smallest fits
sort.Sort(byPVCSize(claimsToBind))
allPVs := b.pvCache.ListPVs()
chosenPVs := map[string]*v1.PersistentVolume{}
for _, bindingInfo := range claimsToBind {
// Find a matching PV
bindingInfo.pv, err = findMatchingVolume(bindingInfo.pvc, allPVs, node, chosenPVs, true)
if err != nil {
return false, err
}
if bindingInfo.pv == nil {
glog.V(4).Infof("No matching volumes for PVC %q on node %q", getPVCName(bindingInfo.pvc), node.Name)
return false, nil
}
// matching PV needs to be excluded so we don't select it again
chosenPVs[bindingInfo.pv.Name] = bindingInfo.pv
}
// Mark cache with all the matches for each PVC for this node
b.podBindingCache.UpdateBindings(pod, node.Name, claimsToBind)
glog.V(4).Infof("Found matching volumes on node %q", node.Name)
return true, nil
}
func (b *volumeBinder) revertAssumedPVs(bindings []*bindingInfo) {
for _, bindingInfo := range bindings {
b.pvCache.Restore(bindingInfo.pv.Name)
}
}
type bindingInfo struct {
// Claim that needs to be bound
pvc *v1.PersistentVolumeClaim
// Proposed PV to bind to this claim
pv *v1.PersistentVolume
}
// Used in unit test errors
func (b bindingInfo) String() string {
pvcName := ""
pvName := ""
if b.pvc != nil {
pvcName = getPVCName(b.pvc)
}
if b.pv != nil {
pvName = b.pv.Name
}
return fmt.Sprintf("[PVC %q, PV %q]", pvcName, pvName)
}
type byPVCSize []*bindingInfo
func (a byPVCSize) Len() int {
return len(a)
}
func (a byPVCSize) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func (a byPVCSize) Less(i, j int) bool {
iSize := a[i].pvc.Spec.Resources.Requests[v1.ResourceStorage]
jSize := a[j].pvc.Spec.Resources.Requests[v1.ResourceStorage]
// return true if iSize is less than jSize
return iSize.Cmp(jSize) == -1
}

View File

@ -0,0 +1,63 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"k8s.io/api/core/v1"
)
type FakeVolumeBinderConfig struct {
AllBound bool
FindUnboundSatsified bool
FindBoundSatsified bool
FindErr error
AssumeBindingRequired bool
AssumeErr error
BindErr error
}
// NewVolumeBinder sets up all the caches needed for the scheduler to make
// topology-aware volume binding decisions.
func NewFakeVolumeBinder(config *FakeVolumeBinderConfig) *FakeVolumeBinder {
return &FakeVolumeBinder{
config: config,
}
}
type FakeVolumeBinder struct {
config *FakeVolumeBinderConfig
AssumeCalled bool
BindCalled bool
}
func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, nodeName string) (unboundVolumesSatisfied, boundVolumesSatsified bool, err error) {
return b.config.FindUnboundSatsified, b.config.FindBoundSatsified, b.config.FindErr
}
func (b *FakeVolumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (bool, bool, error) {
b.AssumeCalled = true
return b.config.AllBound, b.config.AssumeBindingRequired, b.config.AssumeErr
}
func (b *FakeVolumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
b.BindCalled = true
return b.config.BindErr
}
func (b *FakeVolumeBinder) GetBindingsCache() PodBindingCache {
return nil
}

View File

@ -0,0 +1,755 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"fmt"
"reflect"
"testing"
"github.com/golang/glog"
"k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/controller"
)
var (
unboundPVC = makeTestPVC("unbound-pvc", "1G", pvcUnbound, "", &waitClass)
unboundPVC2 = makeTestPVC("unbound-pvc2", "5G", pvcUnbound, "", &waitClass)
preboundPVC = makeTestPVC("prebound-pvc", "1G", pvcPrebound, "pv-node1a", &waitClass)
boundPVC = makeTestPVC("bound-pvc", "1G", pvcBound, "pv-bound", &waitClass)
boundPVC2 = makeTestPVC("bound-pvc2", "1G", pvcBound, "pv-bound2", &waitClass)
badPVC = makeBadPVC()
immediateUnboundPVC = makeTestPVC("immediate-unbound-pvc", "1G", pvcUnbound, "", &immediateClass)
immediateBoundPVC = makeTestPVC("immediate-bound-pvc", "1G", pvcBound, "pv-bound-immediate", &immediateClass)
pvNoNode = makeTestPV("pv-no-node", "", "1G", "1", nil, waitClass)
pvNode1a = makeTestPV("pv-node1a", "node1", "5G", "1", nil, waitClass)
pvNode1b = makeTestPV("pv-node1b", "node1", "10G", "1", nil, waitClass)
pvNode2 = makeTestPV("pv-node2", "node2", "1G", "1", nil, waitClass)
pvPrebound = makeTestPV("pv-prebound", "node1", "1G", "1", unboundPVC, waitClass)
pvBound = makeTestPV("pv-bound", "node1", "1G", "1", boundPVC, waitClass)
pvNode1aBound = makeTestPV("pv-node1a", "node1", "1G", "1", unboundPVC, waitClass)
pvNode1bBound = makeTestPV("pv-node1b", "node1", "5G", "1", unboundPVC2, waitClass)
pvNode1bBoundHigherVersion = makeTestPV("pv-node1b", "node1", "5G", "2", unboundPVC2, waitClass)
pvBoundImmediate = makeTestPV("pv-bound-immediate", "node1", "1G", "1", immediateBoundPVC, immediateClass)
pvBoundImmediateNode2 = makeTestPV("pv-bound-immediate", "node2", "1G", "1", immediateBoundPVC, immediateClass)
binding1a = makeBinding(unboundPVC, pvNode1a)
binding1b = makeBinding(unboundPVC2, pvNode1b)
bindingNoNode = makeBinding(unboundPVC, pvNoNode)
bindingBad = makeBinding(badPVC, pvNode1b)
binding1aBound = makeBinding(unboundPVC, pvNode1aBound)
binding1bBound = makeBinding(unboundPVC2, pvNode1bBound)
waitClass = "waitClass"
immediateClass = "immediateClass"
)
type testEnv struct {
client clientset.Interface
reactor *volumeReactor
binder SchedulerVolumeBinder
internalBinder *volumeBinder
internalPVCache *pvAssumeCache
internalPVCCache cache.Indexer
}
func newTestBinder(t *testing.T) *testEnv {
client := &fake.Clientset{}
reactor := newVolumeReactor(client, nil, nil, nil, nil)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
nodeInformer := informerFactory.Core().V1().Nodes()
classInformer := informerFactory.Storage().V1().StorageClasses()
binder := NewVolumeBinder(
client,
pvcInformer,
informerFactory.Core().V1().PersistentVolumes(),
nodeInformer,
classInformer)
// Add a node
err := nodeInformer.Informer().GetIndexer().Add(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
Labels: map[string]string{"key1": "node1"},
},
})
if err != nil {
t.Fatalf("Failed to add node to internal cache: %v", err)
}
// Add storageclasses
waitMode := storagev1.VolumeBindingWaitForFirstConsumer
immediateMode := storagev1.VolumeBindingImmediate
classes := []*storagev1.StorageClass{
{
ObjectMeta: metav1.ObjectMeta{
Name: waitClass,
},
VolumeBindingMode: &waitMode,
},
{
ObjectMeta: metav1.ObjectMeta{
Name: immediateClass,
},
VolumeBindingMode: &immediateMode,
},
}
for _, class := range classes {
if err = classInformer.Informer().GetIndexer().Add(class); err != nil {
t.Fatalf("Failed to add storage class to internal cache: %v", err)
}
}
// Get internal types
internalBinder, ok := binder.(*volumeBinder)
if !ok {
t.Fatalf("Failed to convert to internal binder")
}
pvCache := internalBinder.pvCache
internalPVCache, ok := pvCache.(*pvAssumeCache)
if !ok {
t.Fatalf("Failed to convert to internal PV cache")
}
return &testEnv{
client: client,
reactor: reactor,
binder: binder,
internalBinder: internalBinder,
internalPVCache: internalPVCache,
internalPVCCache: pvcInformer.Informer().GetIndexer(),
}
}
func (env *testEnv) initClaims(t *testing.T, pvcs []*v1.PersistentVolumeClaim) {
for _, pvc := range pvcs {
err := env.internalPVCCache.Add(pvc)
if err != nil {
t.Fatalf("Failed to add PVC %q to internal cache: %v", pvc.Name, err)
}
env.reactor.claims[pvc.Name] = pvc
}
}
func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.PersistentVolume) {
internalPVCache := env.internalPVCache
for _, pv := range cachedPVs {
internalPVCache.add(pv)
if apiPVs == nil {
env.reactor.volumes[pv.Name] = pv
}
}
for _, pv := range apiPVs {
env.reactor.volumes[pv.Name] = pv
}
}
func (env *testEnv) assumeVolumes(t *testing.T, name, node string, pod *v1.Pod, bindings []*bindingInfo) {
pvCache := env.internalBinder.pvCache
for _, binding := range bindings {
if err := pvCache.Assume(binding.pv); err != nil {
t.Fatalf("Failed to setup test %q: error: %v", name, err)
}
}
env.internalBinder.podBindingCache.UpdateBindings(pod, node, bindings)
}
func (env *testEnv) initPodCache(pod *v1.Pod, node string, bindings []*bindingInfo) {
cache := env.internalBinder.podBindingCache
cache.UpdateBindings(pod, node, bindings)
}
func (env *testEnv) validatePodCache(t *testing.T, name, node string, pod *v1.Pod, expectedBindings []*bindingInfo) {
cache := env.internalBinder.podBindingCache
bindings := cache.GetBindings(pod, node)
if !reflect.DeepEqual(expectedBindings, bindings) {
t.Errorf("Test %q failed: Expected bindings %+v, got %+v", name, expectedBindings, bindings)
}
}
func (env *testEnv) validateAssume(t *testing.T, name string, pod *v1.Pod, bindings []*bindingInfo) {
// TODO: Check binding cache
// Check pv cache
pvCache := env.internalBinder.pvCache
for _, b := range bindings {
pv, err := pvCache.GetPV(b.pv.Name)
if err != nil {
t.Errorf("Test %q failed: GetPV %q returned error: %v", name, b.pv.Name, err)
continue
}
if pv.Spec.ClaimRef == nil {
t.Errorf("Test %q failed: PV %q ClaimRef is nil", name, b.pv.Name)
continue
}
if pv.Spec.ClaimRef.Name != b.pvc.Name {
t.Errorf("Test %q failed: expected PV.ClaimRef.Name %q, got %q", name, b.pvc.Name, pv.Spec.ClaimRef.Name)
}
if pv.Spec.ClaimRef.Namespace != b.pvc.Namespace {
t.Errorf("Test %q failed: expected PV.ClaimRef.Namespace %q, got %q", name, b.pvc.Namespace, pv.Spec.ClaimRef.Namespace)
}
}
}
func (env *testEnv) validateFailedAssume(t *testing.T, name string, pod *v1.Pod, bindings []*bindingInfo) {
// All PVs have been unmodified in cache
pvCache := env.internalBinder.pvCache
for _, b := range bindings {
pv, _ := pvCache.GetPV(b.pv.Name)
// PV could be nil if it's missing from cache
if pv != nil && pv != b.pv {
t.Errorf("Test %q failed: PV %q was modified in cache", name, b.pv.Name)
}
}
}
func (env *testEnv) validateBind(
t *testing.T,
name string,
pod *v1.Pod,
expectedPVs []*v1.PersistentVolume,
expectedAPIPVs []*v1.PersistentVolume) {
// Check pv cache
pvCache := env.internalBinder.pvCache
for _, pv := range expectedPVs {
cachedPV, err := pvCache.GetPV(pv.Name)
if err != nil {
t.Errorf("Test %q failed: GetPV %q returned error: %v", name, pv.Name, err)
}
if !reflect.DeepEqual(cachedPV, pv) {
t.Errorf("Test %q failed: cached PV check failed [A-expected, B-got]:\n%s", name, diff.ObjectDiff(pv, cachedPV))
}
}
// Check reactor for API updates
if err := env.reactor.checkVolumes(expectedAPIPVs); err != nil {
t.Errorf("Test %q failed: API reactor validation failed: %v", name, err)
}
}
const (
pvcUnbound = iota
pvcPrebound
pvcBound
)
func makeTestPVC(name, size string, pvcBoundState int, pvName string, className *string) *v1.PersistentVolumeClaim {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "testns",
UID: types.UID("pvc-uid"),
ResourceVersion: "1",
SelfLink: testapi.Default.SelfLink("pvc", name),
},
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse(size),
},
},
StorageClassName: className,
},
}
switch pvcBoundState {
case pvcBound:
metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annBindCompleted, "yes")
fallthrough
case pvcPrebound:
pvc.Spec.VolumeName = pvName
}
return pvc
}
func makeBadPVC() *v1.PersistentVolumeClaim {
return &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "bad-pvc",
Namespace: "testns",
UID: types.UID("pvc-uid"),
ResourceVersion: "1",
// Don't include SefLink, so that GetReference will fail
},
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse("1G"),
},
},
StorageClassName: &waitClass,
},
}
}
func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentVolumeClaim, className string) *v1.PersistentVolume {
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: name,
ResourceVersion: version,
},
Spec: v1.PersistentVolumeSpec{
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse(capacity),
},
StorageClassName: className,
},
}
if node != "" {
pv.Annotations = getAnnotationWithNodeAffinity("key1", node)
}
if boundToPVC != nil {
pv.Spec.ClaimRef = &v1.ObjectReference{
Name: boundToPVC.Name,
Namespace: boundToPVC.Namespace,
UID: boundToPVC.UID,
}
metav1.SetMetaDataAnnotation(&pv.ObjectMeta, annBoundByController, "yes")
}
return pv
}
func makePod(pvcs []*v1.PersistentVolumeClaim) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "testns",
},
}
volumes := []v1.Volume{}
for i, pvc := range pvcs {
pvcVol := v1.Volume{
Name: fmt.Sprintf("vol%v", i),
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc.Name,
},
},
}
volumes = append(volumes, pvcVol)
}
pod.Spec.Volumes = volumes
pod.Spec.NodeName = "node1"
return pod
}
func makePodWithoutPVC() *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "testns",
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
},
},
},
},
}
return pod
}
func makeBinding(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) *bindingInfo {
return &bindingInfo{pvc: pvc, pv: pv}
}
func makeStringPtr(str string) *string {
s := fmt.Sprintf("%v", str)
return &s
}
func TestFindPodVolumes(t *testing.T) {
scenarios := map[string]struct {
// Inputs
pvs []*v1.PersistentVolume
podPVCs []*v1.PersistentVolumeClaim
// Defaults to node1
node string
// If nil, use pod PVCs
cachePVCs []*v1.PersistentVolumeClaim
// If nil, makePod with podPVCs
pod *v1.Pod
// Expected podBindingCache fields
expectedBindings []*bindingInfo
// Expected return values
expectedUnbound bool
expectedBound bool
shouldFail bool
}{
"no-volumes": {
pod: makePod(nil),
expectedUnbound: true,
expectedBound: true,
},
"no-pvcs": {
pod: makePodWithoutPVC(),
expectedUnbound: true,
expectedBound: true,
},
"pvc-not-found": {
cachePVCs: []*v1.PersistentVolumeClaim{},
podPVCs: []*v1.PersistentVolumeClaim{boundPVC},
expectedUnbound: false,
expectedBound: false,
shouldFail: true,
},
"bound-pvc": {
podPVCs: []*v1.PersistentVolumeClaim{boundPVC},
pvs: []*v1.PersistentVolume{pvBound},
expectedUnbound: true,
expectedBound: true,
},
"bound-pvc,pv-not-exists": {
podPVCs: []*v1.PersistentVolumeClaim{boundPVC},
expectedUnbound: false,
expectedBound: false,
shouldFail: true,
},
"prebound-pvc": {
podPVCs: []*v1.PersistentVolumeClaim{preboundPVC},
pvs: []*v1.PersistentVolume{pvNode1aBound},
expectedUnbound: true,
expectedBound: true,
},
"unbound-pvc,node-not-exists": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
node: "node12",
expectedUnbound: false,
expectedBound: false,
shouldFail: true,
},
"unbound-pvc,pv-same-node": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
pvs: []*v1.PersistentVolume{pvNode2, pvNode1a, pvNode1b},
expectedBindings: []*bindingInfo{binding1a},
expectedUnbound: true,
expectedBound: true,
},
"unbound-pvc,pv-different-node": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
pvs: []*v1.PersistentVolume{pvNode2},
expectedUnbound: false,
expectedBound: true,
},
"two-unbound-pvcs": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2},
pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
expectedBindings: []*bindingInfo{binding1a, binding1b},
expectedUnbound: true,
expectedBound: true,
},
"two-unbound-pvcs,order-by-size": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC2, unboundPVC},
pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
expectedBindings: []*bindingInfo{binding1a, binding1b},
expectedUnbound: true,
expectedBound: true,
},
"two-unbound-pvcs,partial-match": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2},
pvs: []*v1.PersistentVolume{pvNode1a},
expectedUnbound: false,
expectedBound: true,
},
"one-bound,one-unbound": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, boundPVC},
pvs: []*v1.PersistentVolume{pvBound, pvNode1a},
expectedBindings: []*bindingInfo{binding1a},
expectedUnbound: true,
expectedBound: true,
},
"one-bound,one-unbound,no-match": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, boundPVC},
pvs: []*v1.PersistentVolume{pvBound, pvNode2},
expectedUnbound: false,
expectedBound: true,
},
"one-prebound,one-unbound": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, preboundPVC},
pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
expectedBindings: []*bindingInfo{binding1a},
expectedUnbound: true,
expectedBound: true,
},
"immediate-bound-pvc": {
podPVCs: []*v1.PersistentVolumeClaim{immediateBoundPVC},
pvs: []*v1.PersistentVolume{pvBoundImmediate},
expectedUnbound: true,
expectedBound: true,
},
"immediate-bound-pvc-wrong-node": {
podPVCs: []*v1.PersistentVolumeClaim{immediateBoundPVC},
pvs: []*v1.PersistentVolume{pvBoundImmediateNode2},
expectedUnbound: true,
expectedBound: false,
},
"immediate-unbound-pvc": {
podPVCs: []*v1.PersistentVolumeClaim{immediateUnboundPVC},
expectedUnbound: false,
expectedBound: false,
shouldFail: true,
},
"immediate-unbound-pvc,delayed-mode-bound": {
podPVCs: []*v1.PersistentVolumeClaim{immediateUnboundPVC, boundPVC},
pvs: []*v1.PersistentVolume{pvBound},
expectedUnbound: false,
expectedBound: false,
shouldFail: true,
},
"immediate-unbound-pvc,delayed-mode-unbound": {
podPVCs: []*v1.PersistentVolumeClaim{immediateUnboundPVC, unboundPVC},
expectedUnbound: false,
expectedBound: false,
shouldFail: true,
},
}
// Set feature gate
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true")
defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false")
for name, scenario := range scenarios {
glog.V(5).Infof("Running test case %q", name)
// Setup
testEnv := newTestBinder(t)
testEnv.initVolumes(scenario.pvs, scenario.pvs)
if scenario.node == "" {
scenario.node = "node1"
}
// a. Init pvc cache
if scenario.cachePVCs == nil {
scenario.cachePVCs = scenario.podPVCs
}
testEnv.initClaims(t, scenario.cachePVCs)
// b. Generate pod with given claims
if scenario.pod == nil {
scenario.pod = makePod(scenario.podPVCs)
}
// Execute
unboundSatisfied, boundSatisfied, err := testEnv.binder.FindPodVolumes(scenario.pod, scenario.node)
// Validate
if !scenario.shouldFail && err != nil {
t.Errorf("Test %q failed: returned error: %v", name, err)
}
if scenario.shouldFail && err == nil {
t.Errorf("Test %q failed: returned success but expected error", name)
}
if boundSatisfied != scenario.expectedBound {
t.Errorf("Test %q failed: expected boundSatsified %v, got %v", name, scenario.expectedBound, boundSatisfied)
}
if unboundSatisfied != scenario.expectedUnbound {
t.Errorf("Test %q failed: expected unboundSatsified %v, got %v", name, scenario.expectedUnbound, unboundSatisfied)
}
testEnv.validatePodCache(t, name, scenario.node, scenario.pod, scenario.expectedBindings)
}
}
func TestAssumePodVolumes(t *testing.T) {
scenarios := map[string]struct {
// Inputs
podPVCs []*v1.PersistentVolumeClaim
pvs []*v1.PersistentVolume
bindings []*bindingInfo
// Expected return values
shouldFail bool
expectedBindingRequired bool
expectedAllBound bool
// if nil, use bindings
expectedBindings []*bindingInfo
}{
"all-bound": {
podPVCs: []*v1.PersistentVolumeClaim{boundPVC},
pvs: []*v1.PersistentVolume{pvBound},
expectedAllBound: true,
},
"prebound-pvc": {
podPVCs: []*v1.PersistentVolumeClaim{preboundPVC},
pvs: []*v1.PersistentVolume{pvNode1a},
},
"one-binding": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
bindings: []*bindingInfo{binding1a},
pvs: []*v1.PersistentVolume{pvNode1a},
expectedBindingRequired: true,
},
"two-bindings": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2},
bindings: []*bindingInfo{binding1a, binding1b},
pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
expectedBindingRequired: true,
},
"pv-already-bound": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
bindings: []*bindingInfo{binding1aBound},
pvs: []*v1.PersistentVolume{pvNode1aBound},
expectedBindingRequired: false,
expectedBindings: []*bindingInfo{},
},
"claimref-failed": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
bindings: []*bindingInfo{binding1a, bindingBad},
pvs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
shouldFail: true,
expectedBindingRequired: true,
},
"tmpupdate-failed": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC},
bindings: []*bindingInfo{binding1a, binding1b},
pvs: []*v1.PersistentVolume{pvNode1a},
shouldFail: true,
expectedBindingRequired: true,
},
}
for name, scenario := range scenarios {
glog.V(5).Infof("Running test case %q", name)
// Setup
testEnv := newTestBinder(t)
testEnv.initClaims(t, scenario.podPVCs)
pod := makePod(scenario.podPVCs)
testEnv.initPodCache(pod, "node1", scenario.bindings)
testEnv.initVolumes(scenario.pvs, scenario.pvs)
// Execute
allBound, bindingRequired, err := testEnv.binder.AssumePodVolumes(pod, "node1")
// Validate
if !scenario.shouldFail && err != nil {
t.Errorf("Test %q failed: returned error: %v", name, err)
}
if scenario.shouldFail && err == nil {
t.Errorf("Test %q failed: returned success but expected error", name)
}
if scenario.expectedBindingRequired != bindingRequired {
t.Errorf("Test %q failed: returned unexpected bindingRequired: %v", name, bindingRequired)
}
if scenario.expectedAllBound != allBound {
t.Errorf("Test %q failed: returned unexpected allBound: %v", name, allBound)
}
if scenario.expectedBindings == nil {
scenario.expectedBindings = scenario.bindings
}
if scenario.shouldFail {
testEnv.validateFailedAssume(t, name, pod, scenario.expectedBindings)
} else {
testEnv.validateAssume(t, name, pod, scenario.expectedBindings)
}
}
}
func TestBindPodVolumes(t *testing.T) {
scenarios := map[string]struct {
// Inputs
bindings []*bindingInfo
cachedPVs []*v1.PersistentVolume
// if nil, use cachedPVs
apiPVs []*v1.PersistentVolume
// Expected return values
shouldFail bool
expectedPVs []*v1.PersistentVolume
// if nil, use expectedPVs
expectedAPIPVs []*v1.PersistentVolume
}{
"all-bound": {},
"not-fully-bound": {
bindings: []*bindingInfo{},
},
"one-binding": {
bindings: []*bindingInfo{binding1aBound},
cachedPVs: []*v1.PersistentVolume{pvNode1a},
expectedPVs: []*v1.PersistentVolume{pvNode1aBound},
},
"two-bindings": {
bindings: []*bindingInfo{binding1aBound, binding1bBound},
cachedPVs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
expectedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBound},
},
"api-update-failed": {
bindings: []*bindingInfo{binding1aBound, binding1bBound},
cachedPVs: []*v1.PersistentVolume{pvNode1a, pvNode1b},
apiPVs: []*v1.PersistentVolume{pvNode1a, pvNode1bBoundHigherVersion},
expectedPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1b},
expectedAPIPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBoundHigherVersion},
shouldFail: true,
},
}
for name, scenario := range scenarios {
glog.V(5).Infof("Running test case %q", name)
// Setup
testEnv := newTestBinder(t)
pod := makePod(nil)
if scenario.apiPVs == nil {
scenario.apiPVs = scenario.cachedPVs
}
testEnv.initVolumes(scenario.cachedPVs, scenario.apiPVs)
testEnv.assumeVolumes(t, name, "node1", pod, scenario.bindings)
// Execute
err := testEnv.binder.BindPodVolumes(pod)
// Validate
if !scenario.shouldFail && err != nil {
t.Errorf("Test %q failed: returned error: %v", name, err)
}
if scenario.shouldFail && err == nil {
t.Errorf("Test %q failed: returned success but expected error", name)
}
if scenario.expectedAPIPVs == nil {
scenario.expectedAPIPVs = scenario.expectedPVs
}
testEnv.validateBind(t, name, pod, scenario.expectedPVs, scenario.expectedAPIPVs)
}
}