mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Share pod volume binding cache via framework.CycleState
This commit is contained in:
parent
db8a88721e
commit
ee4d7410be
@ -25,6 +25,7 @@ import (
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
storagev1 "k8s.io/api/storage/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
@ -34,6 +35,7 @@ import (
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
storageinformers "k8s.io/client-go/informers/storage/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
storagelisters "k8s.io/client-go/listers/storage/v1"
|
||||
csitrans "k8s.io/csi-translation-lib"
|
||||
csiplugins "k8s.io/csi-translation-lib/plugins"
|
||||
@ -63,6 +65,24 @@ const (
|
||||
ErrReasonNodeConflict ConflictReason = "node(s) had volume node affinity conflict"
|
||||
)
|
||||
|
||||
// BindingInfo holds a binding between PV and PVC.
|
||||
type BindingInfo struct {
|
||||
// PVC that needs to be bound
|
||||
pvc *v1.PersistentVolumeClaim
|
||||
|
||||
// Proposed PV to bind to this PVC
|
||||
pv *v1.PersistentVolume
|
||||
}
|
||||
|
||||
// PodVolumes holds pod's volumes information used in volume scheduling.
|
||||
type PodVolumes struct {
|
||||
// StaticBindings are binding decisions for PVCs which can be bound to
|
||||
// pre-provisioned static PVs.
|
||||
StaticBindings []*BindingInfo
|
||||
// DynamicProvisions are PVCs that require dynamic provisioning
|
||||
DynamicProvisions []*v1.PersistentVolumeClaim
|
||||
}
|
||||
|
||||
// InTreeToCSITranslator contains methods required to check migratable status
|
||||
// and perform translations from InTree PV's to CSI
|
||||
type InTreeToCSITranslator interface {
|
||||
@ -102,7 +122,8 @@ type SchedulerVolumeBinder interface {
|
||||
// and unbound with immediate binding (including prebound)
|
||||
GetPodVolumes(pod *v1.Pod) (boundClaims, unboundClaimsDelayBinding, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error)
|
||||
|
||||
// FindPodVolumes checks if all of a Pod's PVCs can be satisfied by the node.
|
||||
// FindPodVolumes checks if all of a Pod's PVCs can be satisfied by the
|
||||
// node and returns pod's volumes information.
|
||||
//
|
||||
// If a PVC is bound, it checks if the PV's NodeAffinity matches the Node.
|
||||
// Otherwise, it tries to find an available PV to bind to the PVC.
|
||||
@ -110,8 +131,8 @@ type SchedulerVolumeBinder interface {
|
||||
// It returns an error when something went wrong or a list of reasons why the node is
|
||||
// (currently) not usable for the pod.
|
||||
//
|
||||
// This function is called by the volume binding scheduler predicate and can be called in parallel
|
||||
FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (reasons ConflictReasons, err error)
|
||||
// This function is called by the scheduler VolumeBinding plugin and can be called in parallel
|
||||
FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error)
|
||||
|
||||
// AssumePodVolumes will:
|
||||
// 1. Take the PV matches for unbound PVCs and update the PV cache assuming
|
||||
@ -122,10 +143,10 @@ type SchedulerVolumeBinder interface {
|
||||
// It returns true if all volumes are fully bound
|
||||
//
|
||||
// This function is called serially.
|
||||
AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound bool, err error)
|
||||
AssumePodVolumes(assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (allFullyBound bool, err error)
|
||||
|
||||
// RevertAssumedPodVolumes will revert assumed PV and PVC cache.
|
||||
RevertAssumedPodVolumes(assumedPod *v1.Pod, nodeName string)
|
||||
RevertAssumedPodVolumes(podVolumes *PodVolumes)
|
||||
|
||||
// BindPodVolumes will:
|
||||
// 1. Initiate the volume binding by making the API call to prebind the PV
|
||||
@ -135,28 +156,19 @@ type SchedulerVolumeBinder interface {
|
||||
// 3. Wait for PVCs to be completely bound by the PV controller
|
||||
//
|
||||
// This function can be called in parallel.
|
||||
BindPodVolumes(assumedPod *v1.Pod) error
|
||||
|
||||
// GetBindingsCache returns the cache used (if any) to store volume binding decisions.
|
||||
GetBindingsCache() PodBindingCache
|
||||
|
||||
// DeletePodBindings will delete pod's bindingDecisions in podBindingCache.
|
||||
DeletePodBindings(pod *v1.Pod)
|
||||
BindPodVolumes(assumedPod *v1.Pod, podVolumes *PodVolumes) error
|
||||
}
|
||||
|
||||
type volumeBinder struct {
|
||||
kubeClient clientset.Interface
|
||||
classLister storagelisters.StorageClassLister
|
||||
|
||||
podLister corelisters.PodLister
|
||||
nodeInformer coreinformers.NodeInformer
|
||||
csiNodeInformer storageinformers.CSINodeInformer
|
||||
pvcCache PVCAssumeCache
|
||||
pvCache PVAssumeCache
|
||||
|
||||
// Stores binding decisions that were made in FindPodVolumes for use in AssumePodVolumes.
|
||||
// AssumePodVolumes modifies the bindings again for use in BindPodVolumes.
|
||||
podBindingCache PodBindingCache
|
||||
|
||||
// Amount of time to wait for the bind operation to succeed
|
||||
bindTimeout time.Duration
|
||||
|
||||
@ -166,44 +178,31 @@ type volumeBinder struct {
|
||||
// NewVolumeBinder sets up all the caches needed for the scheduler to make volume binding decisions.
|
||||
func NewVolumeBinder(
|
||||
kubeClient clientset.Interface,
|
||||
podInformer coreinformers.PodInformer,
|
||||
nodeInformer coreinformers.NodeInformer,
|
||||
csiNodeInformer storageinformers.CSINodeInformer,
|
||||
pvcInformer coreinformers.PersistentVolumeClaimInformer,
|
||||
pvInformer coreinformers.PersistentVolumeInformer,
|
||||
storageClassInformer storageinformers.StorageClassInformer,
|
||||
bindTimeout time.Duration) SchedulerVolumeBinder {
|
||||
|
||||
b := &volumeBinder{
|
||||
return &volumeBinder{
|
||||
kubeClient: kubeClient,
|
||||
podLister: podInformer.Lister(),
|
||||
classLister: storageClassInformer.Lister(),
|
||||
nodeInformer: nodeInformer,
|
||||
csiNodeInformer: csiNodeInformer,
|
||||
pvcCache: NewPVCAssumeCache(pvcInformer.Informer()),
|
||||
pvCache: NewPVAssumeCache(pvInformer.Informer()),
|
||||
podBindingCache: NewPodBindingCache(),
|
||||
bindTimeout: bindTimeout,
|
||||
translator: csitrans.New(),
|
||||
}
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *volumeBinder) GetBindingsCache() PodBindingCache {
|
||||
return b.podBindingCache
|
||||
}
|
||||
|
||||
// DeletePodBindings will delete pod's bindingDecisions in podBindingCache.
|
||||
func (b *volumeBinder) DeletePodBindings(pod *v1.Pod) {
|
||||
cache := b.podBindingCache
|
||||
if pod != nil {
|
||||
cache.DeleteBindings(pod)
|
||||
}
|
||||
}
|
||||
|
||||
// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache.
|
||||
// This method intentionally takes in a *v1.Node object instead of using volumebinder.nodeInformer.
|
||||
// That's necessary because some operations will need to pass in to the predicate fake node objects.
|
||||
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (reasons ConflictReasons, err error) {
|
||||
// FindPodVolumes finds the matching PVs for PVCs and nodes to provision PVs
|
||||
// for the given pod and node. If the node does not fit, conflict reasons are
|
||||
// returned.
|
||||
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) {
|
||||
podVolumes = &PodVolumes{}
|
||||
podName := getPodName(pod)
|
||||
|
||||
// Warning: Below log needs high verbosity as it can be printed several times (#60933).
|
||||
@ -235,16 +234,10 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*
|
||||
}()
|
||||
|
||||
var (
|
||||
matchedBindings []*bindingInfo
|
||||
matchedBindings []*BindingInfo
|
||||
provisionedClaims []*v1.PersistentVolumeClaim
|
||||
)
|
||||
defer func() {
|
||||
// We recreate bindings for each new schedule loop.
|
||||
if len(matchedBindings) == 0 && len(provisionedClaims) == 0 {
|
||||
// Clear cache if no claims to bind or provision for this node.
|
||||
b.podBindingCache.ClearBindings(pod, node.Name)
|
||||
return
|
||||
}
|
||||
// Although we do not distinguish nil from empty in this function, for
|
||||
// easier testing, we normalize empty to nil.
|
||||
if len(matchedBindings) == 0 {
|
||||
@ -253,15 +246,15 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*
|
||||
if len(provisionedClaims) == 0 {
|
||||
provisionedClaims = nil
|
||||
}
|
||||
// Mark cache with all matched and provisioned claims for this node
|
||||
b.podBindingCache.UpdateBindings(pod, node.Name, matchedBindings, provisionedClaims)
|
||||
podVolumes.StaticBindings = matchedBindings
|
||||
podVolumes.DynamicProvisions = provisionedClaims
|
||||
}()
|
||||
|
||||
// Check PV node affinity on bound volumes
|
||||
if len(boundClaims) > 0 {
|
||||
boundVolumesSatisfied, err = b.checkBoundClaims(boundClaims, node, podName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@ -291,7 +284,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*
|
||||
var unboundClaims []*v1.PersistentVolumeClaim
|
||||
unboundVolumesSatisfied, matchedBindings, unboundClaims, err = b.findMatchingVolumes(pod, claimsToFindMatching, node)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return
|
||||
}
|
||||
claimsToProvision = append(claimsToProvision, unboundClaims...)
|
||||
}
|
||||
@ -300,7 +293,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*
|
||||
if len(claimsToProvision) > 0 {
|
||||
unboundVolumesSatisfied, provisionedClaims, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -308,12 +301,12 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, boundClaims, claimsToBind []*
|
||||
return
|
||||
}
|
||||
|
||||
// AssumePodVolumes will take the cached matching PVs and PVCs to provision
|
||||
// in podBindingCache for the chosen node, and:
|
||||
// AssumePodVolumes will take the matching PVs and PVCs to provision in pod's
|
||||
// volume information for the chosen node, and:
|
||||
// 1. Update the pvCache with the new prebound PV.
|
||||
// 2. Update the pvcCache with the new PVCs with annotations set
|
||||
// 3. Update podBindingCache again with cached API updates for PVs and PVCs.
|
||||
func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound bool, err error) {
|
||||
// 3. Update PodVolumes again with cached API updates for PVs and PVCs.
|
||||
func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (allFullyBound bool, err error) {
|
||||
podName := getPodName(assumedPod)
|
||||
|
||||
klog.V(4).Infof("AssumePodVolumes for pod %q, node %q", podName, nodeName)
|
||||
@ -330,12 +323,9 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
|
||||
return true, nil
|
||||
}
|
||||
|
||||
claimsToBind := b.podBindingCache.GetBindings(assumedPod, nodeName)
|
||||
claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, nodeName)
|
||||
|
||||
// Assume PV
|
||||
newBindings := []*bindingInfo{}
|
||||
for _, binding := range claimsToBind {
|
||||
newBindings := []*BindingInfo{}
|
||||
for _, binding := range podVolumes.StaticBindings {
|
||||
newPV, dirty, err := pvutil.GetBindVolumeToClaim(binding.pv, binding.pvc)
|
||||
klog.V(5).Infof("AssumePodVolumes: GetBindVolumeToClaim for pod %q, PV %q, PVC %q. newPV %p, dirty %v, err: %v",
|
||||
podName,
|
||||
@ -356,12 +346,12 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
newBindings = append(newBindings, &bindingInfo{pv: newPV, pvc: binding.pvc})
|
||||
newBindings = append(newBindings, &BindingInfo{pv: newPV, pvc: binding.pvc})
|
||||
}
|
||||
|
||||
// Assume PVCs
|
||||
newProvisionedPVCs := []*v1.PersistentVolumeClaim{}
|
||||
for _, claim := range claimsToProvision {
|
||||
for _, claim := range podVolumes.DynamicProvisions {
|
||||
// The claims from method args can be pointing to watcher cache. We must not
|
||||
// modify these, therefore create a copy.
|
||||
claimClone := claim.DeepCopy()
|
||||
@ -376,24 +366,21 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
|
||||
newProvisionedPVCs = append(newProvisionedPVCs, claimClone)
|
||||
}
|
||||
|
||||
// Update cache with the assumed pvcs and pvs
|
||||
// Even if length is zero, update the cache with an empty slice to indicate that no
|
||||
// operations are needed
|
||||
b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings, newProvisionedPVCs)
|
||||
|
||||
podVolumes.StaticBindings = newBindings
|
||||
podVolumes.DynamicProvisions = newProvisionedPVCs
|
||||
return
|
||||
}
|
||||
|
||||
// RevertAssumedPodVolumes will revert assumed PV and PVC cache.
|
||||
func (b *volumeBinder) RevertAssumedPodVolumes(assumedPod *v1.Pod, nodeName string) {
|
||||
b.revertAssumedPVs(b.podBindingCache.GetBindings(assumedPod, nodeName))
|
||||
b.revertAssumedPVCs(b.podBindingCache.GetProvisionedPVCs(assumedPod, nodeName))
|
||||
func (b *volumeBinder) RevertAssumedPodVolumes(podVolumes *PodVolumes) {
|
||||
b.revertAssumedPVs(podVolumes.StaticBindings)
|
||||
b.revertAssumedPVCs(podVolumes.DynamicProvisions)
|
||||
}
|
||||
|
||||
// BindPodVolumes gets the cached bindings and PVCs to provision in podBindingCache,
|
||||
// BindPodVolumes gets the cached bindings and PVCs to provision in pod's volumes information,
|
||||
// makes the API update for those PVs/PVCs, and waits for the PVCs to be completely bound
|
||||
// by the PV controller.
|
||||
func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) (err error) {
|
||||
func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod, podVolumes *PodVolumes) (err error) {
|
||||
podName := getPodName(assumedPod)
|
||||
klog.V(4).Infof("BindPodVolumes for pod %q, node %q", podName, assumedPod.Spec.NodeName)
|
||||
|
||||
@ -405,8 +392,8 @@ func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) (err error) {
|
||||
}
|
||||
}()
|
||||
|
||||
bindings := b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName)
|
||||
claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, assumedPod.Spec.NodeName)
|
||||
bindings := podVolumes.StaticBindings
|
||||
claimsToProvision := podVolumes.DynamicProvisions
|
||||
|
||||
// Start API operations
|
||||
err = b.bindAPIUpdate(podName, bindings, claimsToProvision)
|
||||
@ -432,9 +419,8 @@ func getPVCName(pvc *v1.PersistentVolumeClaim) string {
|
||||
return pvc.Namespace + "/" + pvc.Name
|
||||
}
|
||||
|
||||
// bindAPIUpdate gets the cached bindings and PVCs to provision in podBindingCache
|
||||
// and makes the API update for those PVs/PVCs.
|
||||
func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) error {
|
||||
// bindAPIUpdate makes the API updates for the given PV bindings and PVCs to provision.
|
||||
func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*BindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) error {
|
||||
if bindings == nil {
|
||||
return fmt.Errorf("failed to get cached bindings for pod %q", podName)
|
||||
}
|
||||
@ -456,7 +442,7 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl
|
||||
}()
|
||||
|
||||
var (
|
||||
binding *bindingInfo
|
||||
binding *BindingInfo
|
||||
i int
|
||||
claim *v1.PersistentVolumeClaim
|
||||
)
|
||||
@ -509,7 +495,7 @@ var (
|
||||
// PV/PVC cache can be assumed again in main scheduler loop, we must check
|
||||
// latest state in API server which are shared with PV controller and
|
||||
// provisioners
|
||||
func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) {
|
||||
func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*BindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) {
|
||||
podName := getPodName(pod)
|
||||
if bindings == nil {
|
||||
return false, fmt.Errorf("failed to get cached bindings for pod %q", podName)
|
||||
@ -531,13 +517,14 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
|
||||
|
||||
// Check for any conditions that might require scheduling retry
|
||||
|
||||
// When pod is removed from scheduling queue because of deletion or any
|
||||
// other reasons, binding operation should be cancelled. There is no need
|
||||
// to check PV/PVC bindings any more.
|
||||
// We check pod binding cache here which will be cleared when pod is
|
||||
// removed from scheduling queue.
|
||||
if b.podBindingCache.GetDecisions(pod) == nil {
|
||||
return false, fmt.Errorf("pod %q does not exist any more", podName)
|
||||
// When pod is deleted, binding operation should be cancelled. There is no
|
||||
// need to check PV/PVC bindings any more.
|
||||
_, err = b.podLister.Pods(pod.Namespace).Get(pod.Name)
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
return false, fmt.Errorf("pod %q does not exist any more", podName)
|
||||
}
|
||||
klog.Errorf("failed to get pod %s/%s from the lister: %v", pod.Namespace, pod.Name, err)
|
||||
}
|
||||
|
||||
for _, binding := range bindings {
|
||||
@ -756,7 +743,7 @@ func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node
|
||||
|
||||
// findMatchingVolumes tries to find matching volumes for given claims,
|
||||
// and return unbound claims for further provision.
|
||||
func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (foundMatches bool, bindings []*bindingInfo, unboundClaims []*v1.PersistentVolumeClaim, err error) {
|
||||
func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (foundMatches bool, bindings []*BindingInfo, unboundClaims []*v1.PersistentVolumeClaim, err error) {
|
||||
podName := getPodName(pod)
|
||||
// Sort all the claims by increasing size request to get the smallest fits
|
||||
sort.Sort(byPVCSize(claimsToBind))
|
||||
@ -785,7 +772,7 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.Persi
|
||||
|
||||
// matching PV needs to be excluded so we don't select it again
|
||||
chosenPVs[pv.Name] = pv
|
||||
bindings = append(bindings, &bindingInfo{pv: pv, pvc: pvc})
|
||||
bindings = append(bindings, &BindingInfo{pv: pv, pvc: pvc})
|
||||
klog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", pv.Name, pvcName, node.Name, podName)
|
||||
}
|
||||
|
||||
@ -837,9 +824,9 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v
|
||||
return true, provisionedClaims, nil
|
||||
}
|
||||
|
||||
func (b *volumeBinder) revertAssumedPVs(bindings []*bindingInfo) {
|
||||
for _, bindingInfo := range bindings {
|
||||
b.pvCache.Restore(bindingInfo.pv.Name)
|
||||
func (b *volumeBinder) revertAssumedPVs(bindings []*BindingInfo) {
|
||||
for _, BindingInfo := range bindings {
|
||||
b.pvCache.Restore(BindingInfo.pv.Name)
|
||||
}
|
||||
}
|
||||
|
||||
@ -849,14 +836,6 @@ func (b *volumeBinder) revertAssumedPVCs(claims []*v1.PersistentVolumeClaim) {
|
||||
}
|
||||
}
|
||||
|
||||
type bindingInfo struct {
|
||||
// Claim that needs to be bound
|
||||
pvc *v1.PersistentVolumeClaim
|
||||
|
||||
// Proposed PV to bind to this claim
|
||||
pv *v1.PersistentVolume
|
||||
}
|
||||
|
||||
type byPVCSize []*v1.PersistentVolumeClaim
|
||||
|
||||
func (a byPVCSize) Len() int {
|
||||
|
@ -1,167 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package scheduling
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/scheduling/metrics"
|
||||
)
|
||||
|
||||
// PodBindingCache stores PV binding decisions per pod per node.
|
||||
// Pod entries are removed when the Pod is deleted or updated to
|
||||
// no longer be schedulable.
|
||||
type PodBindingCache interface {
|
||||
// UpdateBindings will update the cache with the given bindings for the
|
||||
// pod and node.
|
||||
UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo, provisionings []*v1.PersistentVolumeClaim)
|
||||
|
||||
// ClearBindings will clear the cached bindings for the given pod and node.
|
||||
ClearBindings(pod *v1.Pod, node string)
|
||||
|
||||
// GetBindings will return the cached bindings for the given pod and node.
|
||||
// A nil return value means that the entry was not found. An empty slice
|
||||
// means that no binding operations are needed.
|
||||
GetBindings(pod *v1.Pod, node string) []*bindingInfo
|
||||
|
||||
// A nil return value means that the entry was not found. An empty slice
|
||||
// means that no provisioning operations are needed.
|
||||
GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim
|
||||
|
||||
// GetDecisions will return all cached decisions for the given pod.
|
||||
GetDecisions(pod *v1.Pod) nodeDecisions
|
||||
|
||||
// DeleteBindings will remove all cached bindings and provisionings for the given pod.
|
||||
// TODO: separate the func if it is needed to delete bindings/provisionings individually
|
||||
DeleteBindings(pod *v1.Pod)
|
||||
}
|
||||
|
||||
type podBindingCache struct {
|
||||
// synchronizes bindingDecisions
|
||||
rwMutex sync.RWMutex
|
||||
|
||||
// Key = pod name
|
||||
// Value = nodeDecisions
|
||||
bindingDecisions map[string]nodeDecisions
|
||||
}
|
||||
|
||||
// Key = nodeName
|
||||
// Value = bindings & provisioned PVCs of the node
|
||||
type nodeDecisions map[string]nodeDecision
|
||||
|
||||
// A decision includes bindingInfo and provisioned PVCs of the node
|
||||
type nodeDecision struct {
|
||||
bindings []*bindingInfo
|
||||
provisionings []*v1.PersistentVolumeClaim
|
||||
}
|
||||
|
||||
// NewPodBindingCache creates a pod binding cache.
|
||||
func NewPodBindingCache() PodBindingCache {
|
||||
return &podBindingCache{bindingDecisions: map[string]nodeDecisions{}}
|
||||
}
|
||||
|
||||
func (c *podBindingCache) GetDecisions(pod *v1.Pod) nodeDecisions {
|
||||
c.rwMutex.RLock()
|
||||
defer c.rwMutex.RUnlock()
|
||||
podName := getPodName(pod)
|
||||
decisions, ok := c.bindingDecisions[podName]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return decisions
|
||||
}
|
||||
|
||||
func (c *podBindingCache) DeleteBindings(pod *v1.Pod) {
|
||||
c.rwMutex.Lock()
|
||||
defer c.rwMutex.Unlock()
|
||||
|
||||
podName := getPodName(pod)
|
||||
|
||||
if _, ok := c.bindingDecisions[podName]; ok {
|
||||
delete(c.bindingDecisions, podName)
|
||||
metrics.VolumeBindingRequestSchedulerBinderCache.WithLabelValues("delete").Inc()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *podBindingCache) UpdateBindings(pod *v1.Pod, node string, bindings []*bindingInfo, pvcs []*v1.PersistentVolumeClaim) {
|
||||
c.rwMutex.Lock()
|
||||
defer c.rwMutex.Unlock()
|
||||
|
||||
podName := getPodName(pod)
|
||||
decisions, ok := c.bindingDecisions[podName]
|
||||
if !ok {
|
||||
decisions = nodeDecisions{}
|
||||
c.bindingDecisions[podName] = decisions
|
||||
}
|
||||
decision, ok := decisions[node]
|
||||
if !ok {
|
||||
decision = nodeDecision{
|
||||
bindings: bindings,
|
||||
provisionings: pvcs,
|
||||
}
|
||||
metrics.VolumeBindingRequestSchedulerBinderCache.WithLabelValues("add").Inc()
|
||||
} else {
|
||||
decision.bindings = bindings
|
||||
decision.provisionings = pvcs
|
||||
}
|
||||
decisions[node] = decision
|
||||
}
|
||||
|
||||
func (c *podBindingCache) GetBindings(pod *v1.Pod, node string) []*bindingInfo {
|
||||
c.rwMutex.RLock()
|
||||
defer c.rwMutex.RUnlock()
|
||||
|
||||
podName := getPodName(pod)
|
||||
decisions, ok := c.bindingDecisions[podName]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
decision, ok := decisions[node]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return decision.bindings
|
||||
}
|
||||
|
||||
func (c *podBindingCache) GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim {
|
||||
c.rwMutex.RLock()
|
||||
defer c.rwMutex.RUnlock()
|
||||
|
||||
podName := getPodName(pod)
|
||||
decisions, ok := c.bindingDecisions[podName]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
decision, ok := decisions[node]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return decision.provisionings
|
||||
}
|
||||
|
||||
func (c *podBindingCache) ClearBindings(pod *v1.Pod, node string) {
|
||||
c.rwMutex.Lock()
|
||||
defer c.rwMutex.Unlock()
|
||||
|
||||
podName := getPodName(pod)
|
||||
decisions, ok := c.bindingDecisions[podName]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
delete(decisions, node)
|
||||
}
|
@ -16,7 +16,7 @@ limitations under the License.
|
||||
|
||||
package scheduling
|
||||
|
||||
import "k8s.io/api/core/v1"
|
||||
import v1 "k8s.io/api/core/v1"
|
||||
|
||||
// FakeVolumeBinderConfig holds configurations for fake volume binder.
|
||||
type FakeVolumeBinderConfig struct {
|
||||
@ -48,29 +48,21 @@ func (b *FakeVolumeBinder) GetPodVolumes(pod *v1.Pod) (boundClaims, unboundClaim
|
||||
}
|
||||
|
||||
// FindPodVolumes implements SchedulerVolumeBinder.FindPodVolumes.
|
||||
func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, _, _ []*v1.PersistentVolumeClaim, node *v1.Node) (reasons ConflictReasons, err error) {
|
||||
return b.config.FindReasons, b.config.FindErr
|
||||
func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, _, _ []*v1.PersistentVolumeClaim, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) {
|
||||
return nil, b.config.FindReasons, b.config.FindErr
|
||||
}
|
||||
|
||||
// AssumePodVolumes implements SchedulerVolumeBinder.AssumePodVolumes.
|
||||
func (b *FakeVolumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (bool, error) {
|
||||
func (b *FakeVolumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (bool, error) {
|
||||
b.AssumeCalled = true
|
||||
return b.config.AllBound, b.config.AssumeErr
|
||||
}
|
||||
|
||||
// RevertAssumedPodVolumes implements SchedulerVolumeBinder.RevertAssumedPodVolumes
|
||||
func (b *FakeVolumeBinder) RevertAssumedPodVolumes(assumedPod *v1.Pod, nodeName string) {}
|
||||
func (b *FakeVolumeBinder) RevertAssumedPodVolumes(_ *PodVolumes) {}
|
||||
|
||||
// BindPodVolumes implements SchedulerVolumeBinder.BindPodVolumes.
|
||||
func (b *FakeVolumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
|
||||
func (b *FakeVolumeBinder) BindPodVolumes(assumedPod *v1.Pod, podVolumes *PodVolumes) error {
|
||||
b.BindCalled = true
|
||||
return b.config.BindErr
|
||||
}
|
||||
|
||||
// GetBindingsCache implements SchedulerVolumeBinder.GetBindingsCache.
|
||||
func (b *FakeVolumeBinder) GetBindingsCache() PodBindingCache {
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeletePodBindings implements SchedulerVolumeBinder.DeletePodBindings.
|
||||
func (b *FakeVolumeBinder) DeletePodBindings(pod *v1.Pod) {}
|
||||
|
@ -157,11 +157,6 @@ func getDefaultConfig() *schedulerapi.Plugins {
|
||||
{Name: defaultbinder.Name},
|
||||
},
|
||||
},
|
||||
PostBind: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: volumebinding.Name},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -274,7 +274,6 @@ func NewLegacyRegistry() *LegacyRegistry {
|
||||
plugins.Reserve = appendToPluginSet(plugins.Reserve, volumebinding.Name, nil)
|
||||
plugins.PreBind = appendToPluginSet(plugins.PreBind, volumebinding.Name, nil)
|
||||
plugins.Unreserve = appendToPluginSet(plugins.Unreserve, volumebinding.Name, nil)
|
||||
plugins.PostBind = appendToPluginSet(plugins.PostBind, volumebinding.Name, nil)
|
||||
return
|
||||
})
|
||||
registry.registerPredicateConfigProducer(NoDiskConflictPred,
|
||||
|
@ -20,12 +20,11 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
|
||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
@ -39,11 +38,21 @@ const (
|
||||
stateKey framework.StateKey = Name
|
||||
)
|
||||
|
||||
// The state is initialized in the PreFilter phase. Because we save the pointer in
|
||||
// framework.CycleState, in the later phases we don't need to call Write method
|
||||
// to update the value
|
||||
type stateData struct {
|
||||
skip bool // set true if pod does not have PVCs
|
||||
boundClaims []*v1.PersistentVolumeClaim
|
||||
claimsToBind []*v1.PersistentVolumeClaim
|
||||
allBound bool
|
||||
// podVolumesByNode holds the pod's volume information found in the Filter
|
||||
// phase for each node
|
||||
// it's initialized in the PreFilter phase
|
||||
podVolumesByNode map[string]*scheduling.PodVolumes
|
||||
// guards podVolumesByNode in the Filter phase, as the method may be called
|
||||
// in parallel
|
||||
podVolumesByNodeMutex sync.Mutex
|
||||
}
|
||||
|
||||
func (d *stateData) Clone() framework.StateData {
|
||||
@ -52,12 +61,7 @@ func (d *stateData) Clone() framework.StateData {
|
||||
|
||||
// VolumeBinding is a plugin that binds pod volumes in scheduling.
|
||||
// In the Filter phase, pod binding cache is created for the pod and used in
|
||||
// Reserve and PreBind phases. Pod binding cache will be cleared at
|
||||
// Unreserve and PostBind extension points. However, if pod fails before
|
||||
// the Reserve phase and is deleted from the apiserver later, its pod binding
|
||||
// cache cannot be cleared at plugin extension points. We register an
|
||||
// event handler to clear pod binding cache when the pod is deleted to
|
||||
// prevent memory leaking.
|
||||
// Reserve and PreBind phases.
|
||||
type VolumeBinding struct {
|
||||
Binder scheduling.SchedulerVolumeBinder
|
||||
}
|
||||
@ -67,7 +71,6 @@ var _ framework.FilterPlugin = &VolumeBinding{}
|
||||
var _ framework.ReservePlugin = &VolumeBinding{}
|
||||
var _ framework.PreBindPlugin = &VolumeBinding{}
|
||||
var _ framework.UnreservePlugin = &VolumeBinding{}
|
||||
var _ framework.PostBindPlugin = &VolumeBinding{}
|
||||
|
||||
// Name is the name of the plugin used in Registry and configurations.
|
||||
const Name = "VolumeBinding"
|
||||
@ -90,7 +93,7 @@ func podHasPVCs(pod *v1.Pod) bool {
|
||||
// immediate PVCs bound. If not all immediate PVCs are bound, an
|
||||
// UnschedulableAndUnresolvable is returned.
|
||||
func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
|
||||
// If pod does not request any PVC, we don't need to do anything.
|
||||
// If pod does not reference any PVC, we don't need to do anything.
|
||||
if !podHasPVCs(pod) {
|
||||
state.Write(stateKey, &stateData{skip: true})
|
||||
return nil
|
||||
@ -107,7 +110,7 @@ func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleSt
|
||||
status.AppendReason("pod has unbound immediate PersistentVolumeClaims")
|
||||
return status
|
||||
}
|
||||
state.Write(stateKey, &stateData{boundClaims: boundClaims, claimsToBind: claimsToBind})
|
||||
state.Write(stateKey, &stateData{boundClaims: boundClaims, claimsToBind: claimsToBind, podVolumesByNode: make(map[string]*scheduling.PodVolumes)})
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -155,7 +158,7 @@ func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, p
|
||||
return nil
|
||||
}
|
||||
|
||||
reasons, err := pl.Binder.FindPodVolumes(pod, state.boundClaims, state.claimsToBind, node)
|
||||
podVolumes, reasons, err := pl.Binder.FindPodVolumes(pod, state.boundClaims, state.claimsToBind, node)
|
||||
|
||||
if err != nil {
|
||||
return framework.NewStatus(framework.Error, err.Error())
|
||||
@ -168,16 +171,31 @@ func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, p
|
||||
}
|
||||
return status
|
||||
}
|
||||
|
||||
state.podVolumesByNodeMutex.Lock()
|
||||
state.podVolumesByNode[node.Name] = podVolumes
|
||||
state.podVolumesByNodeMutex.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reserve reserves volumes of pod and saves binding status in cycle state.
|
||||
func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
allBound, err := pl.Binder.AssumePodVolumes(pod, nodeName)
|
||||
state, err := getStateData(cs)
|
||||
if err != nil {
|
||||
return framework.NewStatus(framework.Error, err.Error())
|
||||
}
|
||||
cs.Write(stateKey, &stateData{allBound: allBound})
|
||||
// we don't need to hold the lock as only one node will be reserved for the given pod
|
||||
podVolumes, ok := state.podVolumesByNode[nodeName]
|
||||
if ok {
|
||||
allBound, err := pl.Binder.AssumePodVolumes(pod, nodeName, podVolumes)
|
||||
if err != nil {
|
||||
return framework.NewStatus(framework.Error, err.Error())
|
||||
}
|
||||
state.allBound = allBound
|
||||
} else {
|
||||
// an entry may not exist if the pod does not reference any PVC
|
||||
state.allBound = true
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -195,8 +213,13 @@ func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState,
|
||||
// no need to bind volumes
|
||||
return nil
|
||||
}
|
||||
// we don't need to hold the lock as only one node will be pre-bound for the given pod
|
||||
podVolumes, ok := s.podVolumesByNode[nodeName]
|
||||
if !ok {
|
||||
return framework.NewStatus(framework.Error, fmt.Sprintf("no pod volumes found for node %q", nodeName))
|
||||
}
|
||||
klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", pod.Namespace, pod.Name)
|
||||
err = pl.Binder.BindPodVolumes(pod)
|
||||
err = pl.Binder.BindPodVolumes(pod, podVolumes)
|
||||
if err != nil {
|
||||
klog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", pod.Namespace, pod.Name, err)
|
||||
return framework.NewStatus(framework.Error, err.Error())
|
||||
@ -205,17 +228,19 @@ func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState,
|
||||
return nil
|
||||
}
|
||||
|
||||
// Unreserve clears assumed PV and PVC cache and pod binding state.
|
||||
// It's idempotent, and does nothing if no cache or binding state found for the given pod.
|
||||
// Unreserve clears assumed PV and PVC cache.
|
||||
// It's idempotent, and does nothing if no cache found for the given pod.
|
||||
func (pl *VolumeBinding) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
|
||||
pl.Binder.RevertAssumedPodVolumes(pod, nodeName)
|
||||
pl.Binder.DeletePodBindings(pod)
|
||||
return
|
||||
}
|
||||
|
||||
// PostBind is called after a pod is successfully bound.
|
||||
func (pl *VolumeBinding) PostBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
|
||||
pl.Binder.DeletePodBindings(pod)
|
||||
s, err := getStateData(cs)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// we don't need to hold the lock as only one node may be unreserved for the given pod
|
||||
podVolumes, ok := s.podVolumesByNode[nodeName]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
pl.Binder.RevertAssumedPodVolumes(podVolumes)
|
||||
return
|
||||
}
|
||||
|
||||
@ -228,37 +253,13 @@ func New(plArgs runtime.Object, fh framework.FrameworkHandle) (framework.Plugin,
|
||||
if err := validateArgs(args); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
podInformer := fh.SharedInformerFactory().Core().V1().Pods()
|
||||
nodeInformer := fh.SharedInformerFactory().Core().V1().Nodes()
|
||||
pvcInformer := fh.SharedInformerFactory().Core().V1().PersistentVolumeClaims()
|
||||
pvInformer := fh.SharedInformerFactory().Core().V1().PersistentVolumes()
|
||||
storageClassInformer := fh.SharedInformerFactory().Storage().V1().StorageClasses()
|
||||
csiNodeInformer := fh.SharedInformerFactory().Storage().V1().CSINodes()
|
||||
binder := scheduling.NewVolumeBinder(fh.ClientSet(), nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
|
||||
// TODO(#90962) Because pod volume binding cache in SchedulerVolumeBinder is
|
||||
// used only in current scheduling cycle, we can share it via
|
||||
// framework.CycleState, then we don't need to register this event handler
|
||||
// and Unreserve/PostBind extension points to clear pod volume binding
|
||||
// cache.
|
||||
fh.SharedInformerFactory().Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
var pod *v1.Pod
|
||||
switch t := obj.(type) {
|
||||
case *v1.Pod:
|
||||
pod = obj.(*v1.Pod)
|
||||
case cache.DeletedFinalStateUnknown:
|
||||
var ok bool
|
||||
pod, ok = t.Obj.(*v1.Pod)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod", obj))
|
||||
return
|
||||
}
|
||||
default:
|
||||
utilruntime.HandleError(fmt.Errorf("unable to handle object %T", obj))
|
||||
return
|
||||
}
|
||||
binder.DeletePodBindings(pod)
|
||||
},
|
||||
})
|
||||
binder := scheduling.NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
|
||||
return &VolumeBinding{
|
||||
Binder: binder,
|
||||
}, nil
|
||||
|
Loading…
Reference in New Issue
Block a user