Merge pull request #92684 from cofyc/volume-scheduling-cleanup

cleanup in volume scheduling: drop the unused namespace indexer from the PVC assume cache, store listers instead of informers in volumeBinder, rename FindPodVolumes locals to match the PodVolumes fields, and reuse getPVCName instead of the duplicate claimToClaimKey helper.
Kubernetes Prow Robot authored on 2020-07-02 04:17:38 -07:00 (committed by GitHub)
commit e37c04bd7c
2 changed files with 43 additions and 42 deletions


@@ -23,7 +23,7 @@ import (
 	"k8s.io/klog/v2"

-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/client-go/tools/cache"
 )
@@ -134,7 +134,11 @@ func NewAssumeCache(informer cache.SharedIndexInformer, description, indexName s
 		indexFunc: indexFunc,
 		indexName: indexName,
 	}
-	c.store = cache.NewIndexer(objInfoKeyFunc, cache.Indexers{indexName: c.objInfoIndexFunc})
+	indexers := cache.Indexers{}
+	if indexName != "" && indexFunc != nil {
+		indexers[indexName] = c.objInfoIndexFunc
+	}
+	c.store = cache.NewIndexer(objInfoKeyFunc, indexers)

 	// Unit tests don't use informers
 	if informer != nil {
@@ -422,7 +426,7 @@ type pvcAssumeCache struct {
 // NewPVCAssumeCache creates a PVC assume cache.
 func NewPVCAssumeCache(informer cache.SharedIndexInformer) PVCAssumeCache {
-	return &pvcAssumeCache{NewAssumeCache(informer, "v1.PersistentVolumeClaim", "namespace", cache.MetaNamespaceIndexFunc)}
+	return &pvcAssumeCache{NewAssumeCache(informer, "v1.PersistentVolumeClaim", "", nil)}
 }

 func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
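For context, here is a minimal, illustrative sketch of what the PVC cache's backing store amounts to after this change: with indexName "" and indexFunc nil, NewAssumeCache constructs its cache.Indexer with an empty Indexers map, so no namespace index is maintained and objects remain reachable by key. This is not the assume cache's actual internals; it substitutes cache.MetaNamespaceKeyFunc for the cache's private objInfoKeyFunc.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Empty Indexers map: this is what NewAssumeCache now builds when the
	// caller passes indexName="" and indexFunc=nil, as NewPVCAssumeCache does.
	store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})

	pvc := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "claim-1"},
	}
	if err := store.Add(pvc); err != nil {
		panic(err)
	}

	// Key-based lookups are unaffected; only ByIndex("namespace", ...) queries
	// go away, which is what makes dropping the indexer a safe cleanup.
	obj, found, _ := store.GetByKey("default/claim-1")
	if found {
		fmt.Println(obj.(*v1.PersistentVolumeClaim).Name) // claim-1
	}
}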


@@ -115,7 +115,7 @@ type InTreeToCSITranslator interface {
 // PV controller to fully bind and provision the PVCs. If binding fails, the Pod is sent
 // back through the scheduler.
 // ii. After BindPodVolumes() is complete, then the scheduler does the final Pod->Node binding.
-// 2. Once all the assume operations are done in d), the scheduler processes the next Pod in the scheduler queue
+// 2. Once all the assume operations are done in e), the scheduler processes the next Pod in the scheduler queue
 //    while the actual binding operation occurs in the background.
 type SchedulerVolumeBinder interface {
 	// GetPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning)
@@ -160,14 +160,15 @@ type SchedulerVolumeBinder interface {
 }

 type volumeBinder struct {
 	kubeClient clientset.Interface

-	classLister     storagelisters.StorageClassLister
-	podLister       corelisters.PodLister
-	nodeInformer    coreinformers.NodeInformer
-	csiNodeInformer storageinformers.CSINodeInformer
-	pvcCache        PVCAssumeCache
-	pvCache         PVAssumeCache
+	classLister   storagelisters.StorageClassLister
+	podLister     corelisters.PodLister
+	nodeLister    corelisters.NodeLister
+	csiNodeLister storagelisters.CSINodeLister
+
+	pvcCache PVCAssumeCache
+	pvCache  PVAssumeCache

 	// Amount of time to wait for the bind operation to succeed
 	bindTimeout time.Duration
@@ -186,15 +187,15 @@ func NewVolumeBinder(
 	storageClassInformer storageinformers.StorageClassInformer,
 	bindTimeout time.Duration) SchedulerVolumeBinder {
 	return &volumeBinder{
-		kubeClient:      kubeClient,
-		podLister:       podInformer.Lister(),
-		classLister:     storageClassInformer.Lister(),
-		nodeInformer:    nodeInformer,
-		csiNodeInformer: csiNodeInformer,
-		pvcCache:        NewPVCAssumeCache(pvcInformer.Informer()),
-		pvCache:         NewPVAssumeCache(pvInformer.Informer()),
-		bindTimeout:     bindTimeout,
-		translator:      csitrans.New(),
+		kubeClient:    kubeClient,
+		podLister:     podInformer.Lister(),
+		classLister:   storageClassInformer.Lister(),
+		nodeLister:    nodeInformer.Lister(),
+		csiNodeLister: csiNodeInformer.Lister(),
+		pvcCache:      NewPVCAssumeCache(pvcInformer.Informer()),
+		pvCache:       NewPVAssumeCache(pvInformer.Informer()),
+		bindTimeout:   bindTimeout,
+		translator:    csitrans.New(),
 	}
 }
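The constructor change is the other half of the struct change: the binder now captures read-only Listers from the informers it is handed, rather than holding the informers themselves. Below is a minimal sketch of that informer-to-lister pattern with client-go; the shared informer factory and fake clientset are illustrative assumptions, not part of this PR.

package main

import (
	"fmt"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// The factory owns informer lifecycle; the consumer keeps only Listers,
	// mirroring how volumeBinder now stores nodeLister and csiNodeLister
	// instead of the informer objects.
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)

	nodeLister := factory.Core().V1().Nodes().Lister()
	csiNodeLister := factory.Storage().V1().CSINodes().Lister()

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)

	// Cache-backed, read-only lookups; a missing object yields a NotFound error.
	if _, err := nodeLister.Get("node-1"); err != nil {
		fmt.Println("node lookup:", err)
	}
	if _, err := csiNodeLister.Get("node-1"); err != nil {
		fmt.Println("csinode lookup:", err)
	}
}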
@@ -234,20 +235,20 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*
 	}()

 	var (
-		matchedBindings   []*BindingInfo
-		provisionedClaims []*v1.PersistentVolumeClaim
+		staticBindings    []*BindingInfo
+		dynamicProvisions []*v1.PersistentVolumeClaim
 	)
 	defer func() {
 		// Although we do not distinguish nil from empty in this function, for
 		// easier testing, we normalize empty to nil.
-		if len(matchedBindings) == 0 {
-			matchedBindings = nil
+		if len(staticBindings) == 0 {
+			staticBindings = nil
 		}
-		if len(provisionedClaims) == 0 {
-			provisionedClaims = nil
+		if len(dynamicProvisions) == 0 {
+			dynamicProvisions = nil
 		}
-		podVolumes.StaticBindings = matchedBindings
-		podVolumes.DynamicProvisions = provisionedClaims
+		podVolumes.StaticBindings = staticBindings
+		podVolumes.DynamicProvisions = dynamicProvisions
 	}()

 	// Check PV node affinity on bound volumes
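The rename above is cosmetic: the locals now carry the same names as the PodVolumes fields they are copied into by the deferred function. For reference, a paraphrased sketch of those fields as used in this file (see the actual PodVolumes definition in this package for the authoritative version):

// Paraphrased sketch only; BindingInfo is this package's own type.
type PodVolumes struct {
	// StaticBindings are decisions to bind claims to matching pre-provisioned PVs.
	StaticBindings []*BindingInfo
	// DynamicProvisions are claims that will be dynamically provisioned on the node.
	DynamicProvisions []*v1.PersistentVolumeClaim
}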
@@ -282,7 +283,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*
 	// Find matching volumes
 	if len(claimsToFindMatching) > 0 {
 		var unboundClaims []*v1.PersistentVolumeClaim
-		unboundVolumesSatisfied, matchedBindings, unboundClaims, err = b.findMatchingVolumes(pod, claimsToFindMatching, node)
+		unboundVolumesSatisfied, staticBindings, unboundClaims, err = b.findMatchingVolumes(pod, claimsToFindMatching, node)
 		if err != nil {
 			return
 		}
@@ -291,7 +292,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*
 	// Check for claims to provision
 	if len(claimsToProvision) > 0 {
-		unboundVolumesSatisfied, provisionedClaims, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
+		unboundVolumesSatisfied, dynamicProvisions, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
 		if err != nil {
 			return
 		}
@@ -452,7 +453,7 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*BindingInfo, cl
 	for _, binding = range bindings {
 		klog.V(5).Infof("bindAPIUpdate: Pod %q, binding PV %q to PVC %q", podName, binding.pv.Name, binding.pvc.Name)
 		// TODO: does it hurt if we make an api call and nothing needs to be updated?
-		claimKey := claimToClaimKey(binding.pvc)
+		claimKey := getPVCName(binding.pvc)
 		klog.V(2).Infof("claim %q bound to volume %q", claimKey, binding.pv.Name)
 		newPV, err := b.kubeClient.CoreV1().PersistentVolumes().Update(context.TODO(), binding.pv, metav1.UpdateOptions{})
 		if err != nil {
@@ -504,12 +505,12 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*BindingInfo, claim
 		return false, fmt.Errorf("failed to get cached claims to provision for pod %q", podName)
 	}

-	node, err := b.nodeInformer.Lister().Get(pod.Spec.NodeName)
+	node, err := b.nodeLister.Get(pod.Spec.NodeName)
 	if err != nil {
 		return false, fmt.Errorf("failed to get node %q: %v", pod.Spec.NodeName, err)
 	}

-	csiNode, err := b.csiNodeInformer.Lister().Get(node.Name)
+	csiNode, err := b.csiNodeLister.Get(node.Name)
 	if err != nil {
 		// TODO: return the error once CSINode is created by default
 		klog.V(4).Infof("Could not get a CSINode object for the node %q: %v", node.Name, err)
@@ -711,7 +712,7 @@ func (b *volumeBinder) GetPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV
 }

 func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node *v1.Node, podName string) (bool, error) {
-	csiNode, err := b.csiNodeInformer.Lister().Get(node.Name)
+	csiNode, err := b.csiNodeLister.Get(node.Name)
 	if err != nil {
 		// TODO: return the error once CSINode is created by default
 		klog.V(4).Infof("Could not get a CSINode object for the node %q: %v", node.Name, err)
@@ -786,9 +787,9 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.Persi
 // checkVolumeProvisions checks given unbound claims (the claims have gone through func
 // findMatchingVolumes, and do not have matching volumes for binding), and return true
 // if all of the claims are eligible for dynamic provision.
-func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, provisionedClaims []*v1.PersistentVolumeClaim, err error) {
+func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, dynamicProvisions []*v1.PersistentVolumeClaim, err error) {
 	podName := getPodName(pod)
-	provisionedClaims = []*v1.PersistentVolumeClaim{}
+	dynamicProvisions = []*v1.PersistentVolumeClaim{}

 	for _, claim := range claimsToProvision {
 		pvcName := getPVCName(claim)
@@ -816,12 +817,12 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v
 		// TODO: Check if capacity of the node domain in the storage class
 		// can satisfy resource requirement of given claim

-		provisionedClaims = append(provisionedClaims, claim)
+		dynamicProvisions = append(dynamicProvisions, claim)
 	}

 	klog.V(4).Infof("Provisioning for %d claims of pod %q that has no matching volumes on node %q ...", len(claimsToProvision), podName, node.Name)

-	return true, provisionedClaims, nil
+	return true, dynamicProvisions, nil
 }
func (b *volumeBinder) revertAssumedPVs(bindings []*BindingInfo) { func (b *volumeBinder) revertAssumedPVs(bindings []*BindingInfo) {
@ -853,10 +854,6 @@ func (a byPVCSize) Less(i, j int) bool {
return iSize.Cmp(jSize) == -1 return iSize.Cmp(jSize) == -1
} }
func claimToClaimKey(claim *v1.PersistentVolumeClaim) string {
return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name)
}
// isCSIMigrationOnForPlugin checks if CSI migrartion is enabled for a given plugin. // isCSIMigrationOnForPlugin checks if CSI migrartion is enabled for a given plugin.
func isCSIMigrationOnForPlugin(pluginName string) bool { func isCSIMigrationOnForPlugin(pluginName string) bool {
switch pluginName { switch pluginName {
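The removed claimToClaimKey helper duplicated the "namespace/name" key that getPVCName already provides, which is why bindAPIUpdate can switch to getPVCName with no behavior change. A small, self-contained sketch of the equivalence follows; claimToClaimKey's body is copied from the removed lines, while getPVCName's body is an assumption for illustration (it is not shown in this diff).

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// claimToClaimKey is the helper removed by this PR (body copied from the diff).
func claimToClaimKey(claim *v1.PersistentVolumeClaim) string {
	return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name)
}

// getPVCName is the existing helper the binder uses instead; its body here is
// an assumption for illustration.
func getPVCName(pvc *v1.PersistentVolumeClaim) string {
	return pvc.Namespace + "/" + pvc.Name
}

func main() {
	pvc := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "claim-1"},
	}
	// Both produce the same "namespace/name" key, so the duplicate can be dropped.
	fmt.Println(claimToClaimKey(pvc) == getPVCName(pvc)) // true
}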