Add dynamic provisioning process

lichuqiang 2018-04-09 14:37:41 +08:00
parent 91d403f384
commit 95b530366a
5 changed files with 492 additions and 82 deletions

View File

@ -135,6 +135,14 @@ const annDynamicallyProvisioned = "pv.kubernetes.io/provisioned-by"
// a volume for this PVC.
const annStorageProvisioner = "volume.beta.kubernetes.io/storage-provisioner"
// This annotation is added to a PVC that has been triggered by scheduler to
// be dynamically provisioned. Its value is the name of the selected node.
const annSelectedNode = "volume.alpha.kubernetes.io/selected-node"
// If the provisioner name in a storage class is set to "kubernetes.io/no-provisioner",
// then dynamic provisioning is not supported by the storage.
const notSupportedProvisioner = "kubernetes.io/no-provisioner"
// CloudVolumeCreatedForClaimNamespaceTag is a name of a tag attached to a real volume in cloud (e.g. AWS EBS or GCE PD)
// with namespace of a persistent volume claim used to create this volume.
const CloudVolumeCreatedForClaimNamespaceTag = "kubernetes.io/created-for/pvc/namespace"
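A minimal illustration (not part of this commit) of how a consumer of the new annotation, such as an external provisioner, might read the scheduler's node selection; the helper name is hypothetical:

func selectedNodeFor(claim *v1.PersistentVolumeClaim) (string, bool) {
	// annSelectedNode is the constant defined above; an empty value means the
	// scheduler has not yet chosen a node for this claim.
	node, ok := claim.Annotations[annSelectedNode]
	return node, ok && node != ""
}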

View File

@ -24,10 +24,12 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1" storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
volumeutil "k8s.io/kubernetes/pkg/volume/util" volumeutil "k8s.io/kubernetes/pkg/volume/util"
) )
@ -58,24 +60,30 @@ type SchedulerVolumeBinder interface {
// If a PVC is bound, it checks if the PV's NodeAffinity matches the Node.
// Otherwise, it tries to find an available PV to bind to the PVC.
//
// It returns true if all of the Pod's PVCs have matching PVs or can be dynamically provisioned,
// and returns true if bound volumes satisfy the PV NodeAffinity.
//
// This function is called by the volume binding scheduler predicate and can be called in parallel
FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisified, boundVolumesSatisfied bool, err error)
// AssumePodVolumes will:
// 1. Take the PV matches for unbound PVCs and update the PV cache assuming
// that the PV is prebound to the PVC.
// 2. Take the PVCs that need provisioning and update the PVC cache with related
// annotations set.
//
// It returns true if all volumes are fully bound, and returns true if any volume binding/provisioning
// API operation needs to be done afterwards.
//
// This function will modify assumedPod with the node name.
// This function is called serially.
AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound bool, bindingRequired bool, err error)
// BindPodVolumes will:
// 1. Initiate the volume binding by making the API call to prebind the PV
// to its matching PVC.
// 2. Trigger the volume provisioning by making the API call to set related
// annotations on the PVC.
//
// This function can be called in parallel.
BindPodVolumes(assumedPod *v1.Pod) error
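For orientation, a minimal sketch (not from this commit; the wiring, variable names, and error text are assumptions) of how a caller is expected to chain the three methods above:

func schedulePodVolumes(binder SchedulerVolumeBinder, pod *v1.Pod, node *v1.Node) error {
	// Predicate phase: may run in parallel across candidate nodes.
	unboundOK, boundOK, err := binder.FindPodVolumes(pod, node)
	if err != nil {
		return err
	}
	if !unboundOK || !boundOK {
		return fmt.Errorf("node %q cannot satisfy the volumes of pod %q", node.Name, pod.Name)
	}
	// Assume phase: serial; records the binding/provisioning decisions in the caches.
	allBound, bindingRequired, err := binder.AssumePodVolumes(pod, node.Name)
	if err != nil {
		return err
	}
	// Bind phase: issues the PV prebind and PVC provisioning API updates.
	if !allBound && bindingRequired {
		return binder.BindPodVolumes(pod)
	}
	return nil
}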
@ -87,8 +95,7 @@ type SchedulerVolumeBinder interface {
type volumeBinder struct {
ctrl *PersistentVolumeController
pvcCache PVCAssumeCache
pvCache PVAssumeCache
// Stores binding decisions that were made in FindPodVolumes for use in AssumePodVolumes.
@ -111,7 +118,7 @@ func NewVolumeBinder(
b := &volumeBinder{ b := &volumeBinder{
ctrl: ctrl, ctrl: ctrl,
pvcCache: pvcInformer.Lister(), pvcCache: NewPVCAssumeCache(pvcInformer.Informer()),
pvCache: NewPVAssumeCache(pvInformer.Informer()), pvCache: NewPVAssumeCache(pvInformer.Informer()),
podBindingCache: NewPodBindingCache(), podBindingCache: NewPodBindingCache(),
} }
@ -123,7 +130,7 @@ func (b *volumeBinder) GetBindingsCache() PodBindingCache {
return b.podBindingCache
}
// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
podName := getPodName(pod)
@ -135,8 +142,8 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
boundVolumesSatisfied = true
// The pod's volumes need to be processed in one call to avoid the race condition where
// volumes can get bound/provisioned in between calls.
boundClaims, claimsToBind, unboundClaimsImmediate, err := b.getPodVolumes(pod)
if err != nil {
return false, false, err
}
@ -154,20 +161,32 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
}
}
if len(claimsToBind) > 0 {
var claimsToProvision []*v1.PersistentVolumeClaim
unboundVolumesSatisfied, claimsToProvision, err = b.findMatchingVolumes(pod, claimsToBind, node)
if err != nil {
return false, false, err
}
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicProvisioningScheduling) {
// Try to provision for unbound volumes
if !unboundVolumesSatisfied {
unboundVolumesSatisfied, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
if err != nil {
return false, false, err
}
}
}
}
return unboundVolumesSatisfied, boundVolumesSatisfied, nil
}
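As a side note, the per-node decisions computed above are retrievable later from the binding cache; a hedged sketch of the accessors that the Assume/Bind steps rely on (the wrapper function itself is illustrative, not from this commit):

func cachedDecisions(c PodBindingCache, pod *v1.Pod, nodeName string) ([]*bindingInfo, []*v1.PersistentVolumeClaim) {
	bindings := c.GetBindings(pod, nodeName)          // PVs matched to unbound PVCs
	provisions := c.GetProvisionedPVCs(pod, nodeName) // PVCs that still need dynamic provisioning
	return bindings, provisions
}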
// AssumePodVolumes will take the cached matching PVs and PVCs to provision
// in podBindingCache for the chosen node, and:
// 1. Update the pvCache with the new prebound PV.
// 2. Update the pvcCache with the PVCs that need provisioning, with related annotations set.
// It will update podBindingCache again with the PVs and PVCs that need an API update.
func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound, bindingRequired bool, err error) {
podName := getPodName(assumedPod)
@ -179,6 +198,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
}
assumedPod.Spec.NodeName = nodeName
// Assume PV
claimsToBind := b.podBindingCache.GetBindings(assumedPod, nodeName)
newBindings := []*bindingInfo{}
@ -206,23 +226,48 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
}
}
// Don't update cached bindings if no API updates are needed. This can happen if we
// previously updated the PV object and are waiting for the PV controller to finish binding.
if len(newBindings) != 0 {
bindingRequired = true
b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings)
}
// Assume PVCs
claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, nodeName)
newProvisionedPVCs := []*v1.PersistentVolumeClaim{}
for _, claim := range claimsToProvision {
// The claims from method args can be pointing to watcher cache. We must not
// modify these, therefore create a copy.
claimClone := claim.DeepCopy()
metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, annSelectedNode, nodeName)
err = b.pvcCache.Assume(claimClone)
if err != nil {
b.revertAssumedPVs(newBindings)
b.revertAssumedPVCs(newProvisionedPVCs)
return
}
newProvisionedPVCs = append(newProvisionedPVCs, claimClone)
}
if len(newProvisionedPVCs) != 0 {
bindingRequired = true
b.podBindingCache.UpdateProvisionedPVCs(assumedPod, nodeName, newProvisionedPVCs)
}
return
}
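A small standalone illustration of the copy-then-annotate step used in the loop above (the claim values are made up): metav1.SetMetaDataAnnotation initializes the Annotations map when it is nil, so the deep copy is safe to annotate even if the cached claim carried no annotations.

claim := &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "pvc-1", Namespace: "testns"}}
clone := claim.DeepCopy()
metav1.SetMetaDataAnnotation(&clone.ObjectMeta, annSelectedNode, "node1")
// claim.Annotations is still nil; clone now carries the selected-node annotation.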
// BindPodVolumes gets the cached bindings and PVCs to provision in podBindingCache
// and makes the API update for those PVs/PVCs.
func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
podName := getPodName(assumedPod)
glog.V(4).Infof("BindPodVolumes for pod %q", podName)
bindings := b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName)
claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, assumedPod.Spec.NodeName)
// Do the actual prebinding. Let the PV controller take care of the rest
// There is no API rollback if the actual binding fails
@ -232,6 +277,20 @@ func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
if err != nil {
// only revert assumed cached updates for volumes we haven't successfully bound
b.revertAssumedPVs(bindings[i:])
// Revert all of the assumed cached updates for claims,
// since no actual API update will be done
b.revertAssumedPVCs(claimsToProvision)
return err
}
}
// Update claims objects to trigger volume provisioning. Let the PV controller take care of the rest.
// The PV controller is expected to signal back by removing related annotations if actual provisioning fails.
for i, claim := range claimsToProvision {
if _, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
glog.V(4).Infof("updating PersistentVolumeClaim[%s] failed: %v", getPVCName(claim), err)
// only revert assumed cached updates for claims we haven't successfully updated
b.revertAssumedPVCs(claimsToProvision[i:])
return err
}
}
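A hedged sketch of the "signal back" contract referenced in the comment above; the controller side is not part of this commit, and the helper below only illustrates the idea of clearing the annotation so the pod can be rescheduled onto a different node:

func clearSelectedNode(kubeClient clientset.Interface, claim *v1.PersistentVolumeClaim) error {
	clone := claim.DeepCopy()
	delete(clone.Annotations, annSelectedNode) // remove the scheduler's node selection
	_, err := kubeClient.CoreV1().PersistentVolumeClaims(clone.Namespace).Update(clone)
	return err
}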
@ -253,7 +312,13 @@ func (b *volumeBinder) isVolumeBound(namespace string, vol *v1.Volume, checkFull
}
pvcName := vol.PersistentVolumeClaim.ClaimName
claim := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcName,
Namespace: namespace,
},
}
pvc, err := b.pvcCache.GetPVC(getPVCName(claim))
if err != nil || pvc == nil {
return false, nil, fmt.Errorf("error getting PVC %q: %v", pvcName, err)
}
@ -342,14 +407,18 @@ func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node
return true, nil
}
// findMatchingVolumes tries to find matching volumes for the given claims,
// and returns unbound claims for further provisioning.
func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, unboundClaims []*v1.PersistentVolumeClaim, err error) {
podName := getPodName(pod)
// Sort all the claims by increasing size request to get the smallest fits
sort.Sort(byPVCSize(claimsToBind))
chosenPVs := map[string]*v1.PersistentVolume{}
foundMatches = true
matchedClaims := []*bindingInfo{}
for _, bindingInfo := range claimsToBind {
// Get storage class name from each PVC
storageClassName := ""
@ -362,21 +431,68 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingI
// Find a matching PV
bindingInfo.pv, err = findMatchingVolume(bindingInfo.pvc, allPVs, node, chosenPVs, true)
if err != nil {
return false, nil, err
}
if bindingInfo.pv == nil {
glog.V(4).Infof("No matching volumes for Pod %q, PVC %q on node %q", podName, getPVCName(bindingInfo.pvc), node.Name)
unboundClaims = append(unboundClaims, bindingInfo.pvc)
foundMatches = false
continue
}
// matching PV needs to be excluded so we don't select it again
chosenPVs[bindingInfo.pv.Name] = bindingInfo.pv
matchedClaims = append(matchedClaims, bindingInfo)
glog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", bindingInfo.pv.Name, getPVCName(bindingInfo.pvc), node.Name, podName)
}
// Mark cache with all the matches for each PVC for this node
if len(matchedClaims) > 0 {
b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims)
}
if foundMatches {
glog.V(4).Infof("Found matching volumes for pod %q on node %q", podName, node.Name)
}
return
}
// checkVolumeProvisions checks the given unbound claims (claims that have gone through
// findMatchingVolumes and have no matching volumes to bind to) and returns true
// if all of the claims are eligible for dynamic provisioning.
func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, err error) {
podName := getPodName(pod)
provisionedClaims := []*v1.PersistentVolumeClaim{}
for _, claim := range claimsToProvision {
className := v1helper.GetPersistentVolumeClaimClass(claim)
if className == "" {
return false, fmt.Errorf("no class for claim %q", getPVCName(claim))
}
class, err := b.ctrl.classLister.Get(className)
if err != nil {
return false, fmt.Errorf("failed to find storage class %q", className)
}
provisioner := class.Provisioner
if provisioner == "" || provisioner == notSupportedProvisioner {
glog.V(4).Infof("storage class %q of claim %q does not support dynamic provisioning", className, getPVCName(claim))
return false, nil
}
// TODO: Check if the node can satisfy the topology requirement in the class
// TODO: Check if capacity of the node domain in the storage class
// can satisfy resource requirement of given claim
provisionedClaims = append(provisionedClaims, claim)
}
glog.V(4).Infof("Provisioning for claims of pod %q that has no matching volumes on node %q ...", podName, node.Name)
// Mark cache with all the PVCs that need provisioning for this node
b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims)
return true, nil
}
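For reference, the kind of StorageClass that checkVolumeProvisions rejects, expressed as a Go fixture similar to the one added to the tests below (field values are illustrative; storagev1 stands for k8s.io/api/storage/v1 and waitMode for the WaitForFirstConsumer binding mode used in the tests):

noProvisionClass := &storagev1.StorageClass{
	ObjectMeta:        metav1.ObjectMeta{Name: "local-storage"},
	Provisioner:       "kubernetes.io/no-provisioner", // matches notSupportedProvisioner above
	VolumeBindingMode: &waitMode,
}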
@ -387,6 +503,12 @@ func (b *volumeBinder) revertAssumedPVs(bindings []*bindingInfo) {
}
}
func (b *volumeBinder) revertAssumedPVCs(claims []*v1.PersistentVolumeClaim) {
for _, claim := range claims {
b.pvcCache.Restore(getPVCName(claim))
}
}
type bindingInfo struct {
// Claim that needs to be bound
pvc *v1.PersistentVolumeClaim

View File

@ -33,20 +33,23 @@ import (
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
) )
var (
unboundPVC = makeTestPVC("unbound-pvc", "1G", pvcUnbound, "", "1", &waitClass)
unboundPVC2 = makeTestPVC("unbound-pvc2", "5G", pvcUnbound, "", "1", &waitClass)
preboundPVC = makeTestPVC("prebound-pvc", "1G", pvcPrebound, "pv-node1a", "1", &waitClass)
boundPVC = makeTestPVC("bound-pvc", "1G", pvcBound, "pv-bound", "1", &waitClass)
boundPVC2 = makeTestPVC("bound-pvc2", "1G", pvcBound, "pv-bound2", "1", &waitClass)
badPVC = makeBadPVC()
immediateUnboundPVC = makeTestPVC("immediate-unbound-pvc", "1G", pvcUnbound, "", "1", &immediateClass)
immediateBoundPVC = makeTestPVC("immediate-bound-pvc", "1G", pvcBound, "pv-bound-immediate", "1", &immediateClass)
provisionedPVC = makeTestPVC("provisioned-pvc", "1Gi", pvcUnbound, "", "1", &waitClass)
provisionedPVC2 = makeTestPVC("provisioned-pvc2", "1Gi", pvcUnbound, "", "1", &waitClass)
provisionedPVCHigherVersion = makeTestPVC("provisioned-pvc2", "1Gi", pvcUnbound, "", "2", &waitClass)
noProvisionerPVC = makeTestPVC("no-provisioner-pvc", "1Gi", pvcUnbound, "", "1", &provisionNotSupportClass)
pvNoNode = makeTestPV("pv-no-node", "", "1G", "1", nil, waitClass)
pvNode1a = makeTestPV("pv-node1a", "node1", "5G", "1", nil, waitClass)
@ -70,8 +73,10 @@ var (
waitClass = "waitClass"
immediateClass = "immediateClass"
provisionNotSupportClass = "provisionNotSupportedClass"
nodeLabelKey = "nodeKey"
nodeLabelValue = "node1"
)
type testEnv struct {
@ -80,7 +85,7 @@ type testEnv struct {
binder SchedulerVolumeBinder
internalBinder *volumeBinder
internalPVCache *pvAssumeCache
internalPVCCache *pvcAssumeCache
}
func newTestBinder(t *testing.T) *testEnv {
@ -106,6 +111,7 @@ func newTestBinder(t *testing.T) *testEnv {
Name: waitClass,
},
VolumeBindingMode: &waitMode,
Provisioner: "test-provisioner",
},
{
ObjectMeta: metav1.ObjectMeta{
@ -113,6 +119,13 @@ func newTestBinder(t *testing.T) *testEnv {
},
VolumeBindingMode: &immediateMode,
},
{
ObjectMeta: metav1.ObjectMeta{
Name: provisionNotSupportClass,
},
VolumeBindingMode: &waitMode,
Provisioner: "kubernetes.io/no-provisioner",
},
}
for _, class := range classes {
if err := classInformer.Informer().GetIndexer().Add(class); err != nil {
@ -132,22 +145,31 @@ func newTestBinder(t *testing.T) *testEnv {
t.Fatalf("Failed to convert to internal PV cache") t.Fatalf("Failed to convert to internal PV cache")
} }
pvcCache := internalBinder.pvcCache
internalPVCCache, ok := pvcCache.(*pvcAssumeCache)
if !ok {
t.Fatalf("Failed to convert to internal PVC cache")
}
return &testEnv{ return &testEnv{
client: client, client: client,
reactor: reactor, reactor: reactor,
binder: binder, binder: binder,
internalBinder: internalBinder, internalBinder: internalBinder,
internalPVCache: internalPVCache, internalPVCache: internalPVCache,
internalPVCCache: pvcInformer.Informer().GetIndexer(), internalPVCCache: internalPVCCache,
} }
} }
func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs []*v1.PersistentVolumeClaim) {
internalPVCCache := env.internalPVCCache
for _, pvc := range cachedPVCs {
internalPVCCache.add(pvc)
if apiPVCs == nil {
env.reactor.claims[pvc.Name] = pvc
}
}
for _, pvc := range apiPVCs {
env.reactor.claims[pvc.Name] = pvc
}
}
@ -166,7 +188,7 @@ func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.P
}
func (env *testEnv) assumeVolumes(t *testing.T, name, node string, pod *v1.Pod, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) {
pvCache := env.internalBinder.pvCache
for _, binding := range bindings {
if err := pvCache.Assume(binding.pv); err != nil {
@ -175,20 +197,38 @@ func (env *testEnv) assumeVolumes(t *testing.T, name, node string, pod *v1.Pod,
}
env.internalBinder.podBindingCache.UpdateBindings(pod, node, bindings)
pvcCache := env.internalBinder.pvcCache
for _, pvc := range provisionings {
if err := pvcCache.Assume(pvc); err != nil {
t.Fatalf("Failed to setup test %q: error: %v", name, err)
}
}
env.internalBinder.podBindingCache.UpdateProvisionedPVCs(pod, node, provisionings)
}
func (env *testEnv) initPodCache(pod *v1.Pod, node string, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) {
cache := env.internalBinder.podBindingCache
cache.UpdateBindings(pod, node, bindings)
cache.UpdateProvisionedPVCs(pod, node, provisionings)
}
func (env *testEnv) validatePodCache(t *testing.T, name, node string, pod *v1.Pod, expectedBindings []*bindingInfo, expectedProvisionings []*v1.PersistentVolumeClaim) {
cache := env.internalBinder.podBindingCache
bindings := cache.GetBindings(pod, node)
if !reflect.DeepEqual(expectedBindings, bindings) {
t.Errorf("Test %q failed: Expected bindings %+v, got %+v", name, expectedBindings, bindings)
}
provisionedClaims := cache.GetProvisionedPVCs(pod, node)
if !reflect.DeepEqual(expectedProvisionings, provisionedClaims) {
t.Errorf("Test %q failed: Expected provisionings %+v, got %+v", name, expectedProvisionings, provisionedClaims)
}
}
func (env *testEnv) getPodBindings(t *testing.T, name, node string, pod *v1.Pod) []*bindingInfo {
@ -196,7 +236,7 @@ func (env *testEnv) getPodBindings(t *testing.T, name, node string, pod *v1.Pod)
return cache.GetBindings(pod, node)
}
func (env *testEnv) validateAssume(t *testing.T, name string, pod *v1.Pod, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) {
// TODO: Check binding cache
// Check pv cache
@ -218,9 +258,23 @@ func (env *testEnv) validateAssume(t *testing.T, name string, pod *v1.Pod, bindi
t.Errorf("Test %q failed: expected PV.ClaimRef.Namespace %q, got %q", name, b.pvc.Namespace, pv.Spec.ClaimRef.Namespace) t.Errorf("Test %q failed: expected PV.ClaimRef.Namespace %q, got %q", name, b.pvc.Namespace, pv.Spec.ClaimRef.Namespace)
} }
} }
// Check pvc cache
pvcCache := env.internalBinder.pvcCache
for _, p := range provisionings {
pvcKey := getPVCName(p)
pvc, err := pvcCache.GetPVC(pvcKey)
if err != nil {
t.Errorf("Test %q failed: GetPVC %q returned error: %v", name, pvcKey, err)
continue
}
if pvc.Annotations[annSelectedNode] != nodeLabelValue {
t.Errorf("Test %q failed: expected annSelectedNode of pvc %q to be %q, but got %q", name, pvcKey, nodeLabelValue, pvc.Annotations[annSelectedNode])
}
}
}
func (env *testEnv) validateFailedAssume(t *testing.T, name string, pod *v1.Pod, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim) {
// All PVs have been unmodified in cache
pvCache := env.internalBinder.pvCache
for _, b := range bindings {
@ -230,6 +284,20 @@ func (env *testEnv) validateFailedAssume(t *testing.T, name string, pod *v1.Pod,
t.Errorf("Test %q failed: PV %q was modified in cache", name, b.pv.Name) t.Errorf("Test %q failed: PV %q was modified in cache", name, b.pv.Name)
} }
} }
// Check pvc cache
pvcCache := env.internalBinder.pvcCache
for _, p := range provisionings {
pvcKey := getPVCName(p)
pvc, err := pvcCache.GetPVC(pvcKey)
if err != nil {
t.Errorf("Test %q failed: GetPVC %q returned error: %v", name, pvcKey, err)
continue
}
if pvc.Annotations[annSelectedNode] != "" {
t.Errorf("Test %q failed: expected annSelectedNode of pvc %q empty, but got %q", name, pvcKey, pvc.Annotations[annSelectedNode])
}
}
} }
func (env *testEnv) validateBind(
@ -257,20 +325,46 @@ func (env *testEnv) validateBind(
}
}
func (env *testEnv) validateProvision(
t *testing.T,
name string,
pod *v1.Pod,
expectedPVCs []*v1.PersistentVolumeClaim,
expectedAPIPVCs []*v1.PersistentVolumeClaim) {
// Check pvc cache
pvcCache := env.internalBinder.pvcCache
for _, pvc := range expectedPVCs {
cachedPVC, err := pvcCache.GetPVC(getPVCName(pvc))
if err != nil {
t.Errorf("Test %q failed: GetPVC %q returned error: %v", name, getPVCName(pvc), err)
}
if !reflect.DeepEqual(cachedPVC, pvc) {
t.Errorf("Test %q failed: cached PVC check failed [A-expected, B-got]:\n%s", name, diff.ObjectDiff(pvc, cachedPVC))
}
}
// Check reactor for API updates
if err := env.reactor.checkClaims(expectedAPIPVCs); err != nil {
t.Errorf("Test %q failed: API reactor validation failed: %v", name, err)
}
}
const (
pvcUnbound = iota
pvcPrebound
pvcBound
)
func makeTestPVC(name, size string, pvcBoundState int, pvName, resourceVersion string, className *string) *v1.PersistentVolumeClaim {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "testns",
UID: types.UID("pvc-uid"),
ResourceVersion: resourceVersion,
SelfLink: testapi.Default.SelfLink("pvc", name),
Annotations: map[string]string{},
},
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
@ -389,7 +483,15 @@ func makeBinding(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) *bindin
return &bindingInfo{pvc: pvc, pv: pv} return &bindingInfo{pvc: pvc, pv: pv}
} }
func addProvisionAnn(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim {
res := pvc.DeepCopy()
// Add provision related annotations
res.Annotations[annSelectedNode] = nodeLabelValue
return res
}
func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
scenarios := map[string]struct {
// Inputs
pvs []*v1.PersistentVolume
@ -472,6 +574,7 @@ func TestFindPodVolumes(t *testing.T) {
"two-unbound-pvcs,partial-match": { "two-unbound-pvcs,partial-match": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2}, podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, unboundPVC2},
pvs: []*v1.PersistentVolume{pvNode1a}, pvs: []*v1.PersistentVolume{pvNode1a},
expectedBindings: []*bindingInfo{binding1a},
expectedUnbound: false, expectedUnbound: false,
expectedBound: true, expectedBound: true,
}, },
@ -552,7 +655,7 @@ func TestFindPodVolumes(t *testing.T) {
if scenario.cachePVCs == nil {
scenario.cachePVCs = scenario.podPVCs
}
testEnv.initClaims(scenario.cachePVCs, scenario.cachePVCs)
// b. Generate pod with given claims
if scenario.pod == nil {
@ -575,7 +678,116 @@ func TestFindPodVolumes(t *testing.T) {
if unboundSatisfied != scenario.expectedUnbound {
t.Errorf("Test %q failed: expected unboundSatsified %v, got %v", name, scenario.expectedUnbound, unboundSatisfied)
}
testEnv.validatePodCache(t, name, testNode.Name, scenario.pod, scenario.expectedBindings, nil)
}
}
func TestFindPodVolumesWithProvisioning(t *testing.T) {
scenarios := map[string]struct {
// Inputs
pvs []*v1.PersistentVolume
podPVCs []*v1.PersistentVolumeClaim
// If nil, use pod PVCs
cachePVCs []*v1.PersistentVolumeClaim
// If nil, makePod with podPVCs
pod *v1.Pod
// Expected podBindingCache fields
expectedBindings []*bindingInfo
expectedProvisions []*v1.PersistentVolumeClaim
// Expected return values
expectedUnbound bool
expectedBound bool
shouldFail bool
}{
"one-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedUnbound: true,
expectedBound: true,
},
"two-unbound-pvcs,one-matched,one-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC},
pvs: []*v1.PersistentVolume{pvNode1a},
expectedBindings: []*bindingInfo{binding1a},
expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedUnbound: true,
expectedBound: true,
},
"one-bound,one-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{boundPVC, provisionedPVC},
pvs: []*v1.PersistentVolume{pvBound},
expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedUnbound: true,
expectedBound: true,
},
"immediate-unbound-pvc": {
podPVCs: []*v1.PersistentVolumeClaim{immediateUnboundPVC},
expectedUnbound: false,
expectedBound: false,
shouldFail: true,
},
"one-immediate-bound,one-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{immediateBoundPVC, provisionedPVC},
pvs: []*v1.PersistentVolume{pvBoundImmediate},
expectedProvisions: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedUnbound: true,
expectedBound: true,
},
"invalid-provisioner": {
podPVCs: []*v1.PersistentVolumeClaim{noProvisionerPVC},
expectedUnbound: false,
expectedBound: true,
},
}
// Set VolumeScheduling and DynamicProvisioningScheduling feature gate
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true,DynamicProvisioningScheduling=true")
defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false,DynamicProvisioningScheduling=false")
testNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
Labels: map[string]string{
nodeLabelKey: "node1",
},
},
}
for name, scenario := range scenarios {
// Setup
testEnv := newTestBinder(t)
testEnv.initVolumes(scenario.pvs, scenario.pvs)
// a. Init pvc cache
if scenario.cachePVCs == nil {
scenario.cachePVCs = scenario.podPVCs
}
testEnv.initClaims(scenario.cachePVCs, scenario.cachePVCs)
// b. Generate pod with given claims
if scenario.pod == nil {
scenario.pod = makePod(scenario.podPVCs)
}
// Execute
unboundSatisfied, boundSatisfied, err := testEnv.binder.FindPodVolumes(scenario.pod, testNode)
// Validate
if !scenario.shouldFail && err != nil {
t.Errorf("Test %q failed: returned error: %v", name, err)
}
if scenario.shouldFail && err == nil {
t.Errorf("Test %q failed: returned success but expected error", name)
}
if boundSatisfied != scenario.expectedBound {
t.Errorf("Test %q failed: expected boundSatsified %v, got %v", name, scenario.expectedBound, boundSatisfied)
}
if unboundSatisfied != scenario.expectedUnbound {
t.Errorf("Test %q failed: expected unboundSatsified %v, got %v", name, scenario.expectedUnbound, unboundSatisfied)
}
testEnv.validatePodCache(t, name, testNode.Name, scenario.pod, scenario.expectedBindings, scenario.expectedProvisions)
} }
} }
@ -585,6 +797,7 @@ func TestAssumePodVolumes(t *testing.T) {
podPVCs []*v1.PersistentVolumeClaim
pvs []*v1.PersistentVolume
bindings []*bindingInfo
provisionedPVCs []*v1.PersistentVolumeClaim
// Expected return values
shouldFail bool
@ -636,6 +849,21 @@ func TestAssumePodVolumes(t *testing.T) {
shouldFail: true,
expectedBindingRequired: true,
},
"one-binding, one-pvc-provisioned": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC},
bindings: []*bindingInfo{binding1a},
pvs: []*v1.PersistentVolume{pvNode1a},
provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedBindingRequired: true,
},
"one-binding, one-provision-tmpupdate-failed": {
podPVCs: []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVCHigherVersion},
bindings: []*bindingInfo{binding1a},
pvs: []*v1.PersistentVolume{pvNode1a},
provisionedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC2},
shouldFail: true,
expectedBindingRequired: true,
},
}
for name, scenario := range scenarios {
@ -643,9 +871,9 @@ func TestAssumePodVolumes(t *testing.T) {
// Setup
testEnv := newTestBinder(t)
testEnv.initClaims(scenario.podPVCs, scenario.podPVCs)
pod := makePod(scenario.podPVCs)
testEnv.initPodCache(pod, "node1", scenario.bindings, scenario.provisionedPVCs)
testEnv.initVolumes(scenario.pvs, scenario.pvs)
// Execute
@ -668,9 +896,9 @@ func TestAssumePodVolumes(t *testing.T) {
scenario.expectedBindings = scenario.bindings
}
if scenario.shouldFail {
testEnv.validateFailedAssume(t, name, pod, scenario.expectedBindings, scenario.provisionedPVCs)
} else {
testEnv.validateAssume(t, name, pod, scenario.expectedBindings, scenario.provisionedPVCs)
}
}
}
@ -683,11 +911,20 @@ func TestBindPodVolumes(t *testing.T) {
// if nil, use cachedPVs
apiPVs []*v1.PersistentVolume
provisionedPVCs []*v1.PersistentVolumeClaim
cachedPVCs []*v1.PersistentVolumeClaim
// if nil, use cachedPVCs
apiPVCs []*v1.PersistentVolumeClaim
// Expected return values
shouldFail bool
expectedPVs []*v1.PersistentVolume
// if nil, use expectedPVs
expectedAPIPVs []*v1.PersistentVolume
expectedPVCs []*v1.PersistentVolumeClaim
// if nil, use expectedPVCs
expectedAPIPVCs []*v1.PersistentVolumeClaim
}{
"all-bound": {},
"not-fully-bound": {
@ -711,6 +948,30 @@ func TestBindPodVolumes(t *testing.T) {
expectedAPIPVs: []*v1.PersistentVolume{pvNode1aBound, pvNode1bBoundHigherVersion},
shouldFail: true,
},
"one-provisioned-pvc": {
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC},
expectedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC)},
},
"provision-api-update-failed": {
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), addProvisionAnn(provisionedPVC2)},
cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC, provisionedPVC2},
apiPVCs: []*v1.PersistentVolumeClaim{provisionedPVC, provisionedPVCHigherVersion},
expectedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), provisionedPVC2},
expectedAPIPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), provisionedPVCHigherVersion},
shouldFail: true,
},
"bingding-succeed, provision-api-update-failed": {
bindings: []*bindingInfo{binding1aBound},
cachedPVs: []*v1.PersistentVolume{pvNode1a},
expectedPVs: []*v1.PersistentVolume{pvNode1aBound},
provisionedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), addProvisionAnn(provisionedPVC2)},
cachedPVCs: []*v1.PersistentVolumeClaim{provisionedPVC, provisionedPVC2},
apiPVCs: []*v1.PersistentVolumeClaim{provisionedPVC, provisionedPVCHigherVersion},
expectedPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), provisionedPVC2},
expectedAPIPVCs: []*v1.PersistentVolumeClaim{addProvisionAnn(provisionedPVC), provisionedPVCHigherVersion},
shouldFail: true,
},
}
for name, scenario := range scenarios {
glog.V(5).Infof("Running test case %q", name)
@ -721,8 +982,12 @@ func TestBindPodVolumes(t *testing.T) {
if scenario.apiPVs == nil {
scenario.apiPVs = scenario.cachedPVs
}
if scenario.apiPVCs == nil {
scenario.apiPVCs = scenario.cachedPVCs
}
testEnv.initVolumes(scenario.cachedPVs, scenario.apiPVs)
testEnv.initClaims(scenario.cachedPVCs, scenario.apiPVCs)
testEnv.assumeVolumes(t, name, "node1", pod, scenario.bindings, scenario.provisionedPVCs)
// Execute
err := testEnv.binder.BindPodVolumes(pod)
@ -737,7 +1002,11 @@ func TestBindPodVolumes(t *testing.T) {
if scenario.expectedAPIPVs == nil {
scenario.expectedAPIPVs = scenario.expectedPVs
}
if scenario.expectedAPIPVCs == nil {
scenario.expectedAPIPVCs = scenario.expectedPVCs
}
testEnv.validateBind(t, name, pod, scenario.expectedPVs, scenario.expectedAPIPVs)
testEnv.validateProvision(t, name, pod, scenario.expectedPVCs, scenario.expectedAPIPVCs)
}
} }
@ -753,7 +1022,7 @@ func TestFindAssumeVolumes(t *testing.T) {
// Setup
testEnv := newTestBinder(t)
testEnv.initVolumes(pvs, pvs)
testEnv.initClaims(podPVCs, podPVCs)
pod := makePod(podPVCs)
testNode := &v1.Node{
@ -787,7 +1056,7 @@ func TestFindAssumeVolumes(t *testing.T) {
if !bindingRequired {
t.Errorf("Test failed: binding not required")
}
testEnv.validateAssume(t, "assume", pod, expectedBindings, nil)
// After assume, claimref should be set on pv
expectedBindings = testEnv.getPodBindings(t, "after-assume", testNode.Name, pod)
@ -803,6 +1072,6 @@ func TestFindAssumeVolumes(t *testing.T) {
if !unboundSatisfied {
t.Errorf("Test failed: couldn't find PVs for all PVCs")
}
testEnv.validatePodCache(t, "after-assume", testNode.Name, pod, expectedBindings, nil)
}
}

View File

@ -279,6 +279,12 @@ const (
// A node which has closer cpu,memory utilization and volume count is favoured by scheduler
// while making decisions.
BalanceAttachedNodeVolumes utilfeature.Feature = "BalanceAttachedNodeVolumes"
// owner: @lichuqiang
// alpha: v1.11
//
// Extend the default scheduler to be aware of volume topology and handle PV provisioning
DynamicProvisioningScheduling utilfeature.Feature = "DynamicProvisioningScheduling"
)
func init() {
@ -327,6 +333,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
RunAsGroup: {Default: false, PreRelease: utilfeature.Alpha},
VolumeSubpath: {Default: true, PreRelease: utilfeature.GA},
BalanceAttachedNodeVolumes: {Default: false, PreRelease: utilfeature.Alpha},
DynamicProvisioningScheduling: {Default: false, PreRelease: utilfeature.Alpha},
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:
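A hedged example of enabling both alpha gates in a test, mirroring the pattern used in the binder tests above (error handling is added here for clarity; production components would enable them through their --feature-gates flag instead):

if err := utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true,DynamicProvisioningScheduling=true"); err != nil {
	t.Fatalf("failed to enable feature gates: %v", err)
}
defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false,DynamicProvisioningScheduling=false")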

View File

@ -506,12 +506,16 @@ func ClusterRoles() []rbacv1.ClusterRole {
}
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
rules := []rbacv1.PolicyRule{
rbacv1helpers.NewRule(ReadUpdate...).Groups(legacyGroup).Resources("persistentvolumes").RuleOrDie(),
rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("storageclasses").RuleOrDie(),
}
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicProvisioningScheduling) {
rules = append(rules, rbacv1helpers.NewRule(ReadUpdate...).Groups(legacyGroup).Resources("persistentvolumeclaims").RuleOrDie())
}
roles = append(roles, rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: "system:volume-scheduler"},
Rules: rules,
})
}
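Roughly, with both gates enabled, the resulting role is equivalent to the sketch below; the concrete verb lists behind the Read/ReadUpdate helpers are assumptions here, not taken from this diff:

role := rbacv1.ClusterRole{
	ObjectMeta: metav1.ObjectMeta{Name: "system:volume-scheduler"},
	Rules: []rbacv1.PolicyRule{
		{APIGroups: []string{""}, Resources: []string{"persistentvolumes"}, Verbs: []string{"get", "list", "watch", "update", "patch"}},
		{APIGroups: []string{"storage.k8s.io"}, Resources: []string{"storageclasses"}, Verbs: []string{"get", "list", "watch"}},
		{APIGroups: []string{""}, Resources: []string{"persistentvolumeclaims"}, Verbs: []string{"get", "list", "watch", "update", "patch"}},
	},
}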