Make volume binder resilient to races
- FindPodVolumes does not error if a PVC is assumed with a selected node
- BindPodVolumes checks against API objects
parent fdf381098b
commit 13d87fbff8
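The gist of the race: the binder writes updated PV/PVC objects to the apiserver, but its informer caches may still serve older copies, and the assume cache can be assumed again by the next scheduling cycle. The checks below therefore compare resource versions against the objects saved from the apiserver. A minimal sketch of that comparison, assuming the 1.13-era import path k8s.io/apiserver/pkg/storage/etcd (the versioner later moved packages); isNewerThanAPICopy is a hypothetical helper, not code from this commit:

    package main

    import (
    	"fmt"

    	v1 "k8s.io/api/core/v1"
    	"k8s.io/apiserver/pkg/storage/etcd"
    )

    var versioner = etcd.APIObjectVersioner{}

    // isNewerThanAPICopy reports whether the PV we saved after an apiserver
    // update is newer than the copy a cache is currently serving. If so, the
    // caller should keep polling instead of judging bindings from stale data.
    func isNewerThanAPICopy(saved, cached *v1.PersistentVolume) bool {
    	// CompareResourceVersion returns a value > 0 when the first object is newer.
    	return versioner.CompareResourceVersion(saved, cached) > 0
    }

    func main() {
    	saved := &v1.PersistentVolume{}
    	saved.ResourceVersion = "20"
    	cached := &v1.PersistentVolume{}
    	cached.ResourceVersion = "10"
    	fmt.Println(isNewerThanAPICopy(saved, cached)) // true
    }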
@@ -44,6 +44,7 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
@@ -92,6 +93,7 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
@@ -285,15 +285,16 @@ func checkVolumeSatisfyClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) error {
     return nil
 }
 
-func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) {
+func (ctrl *PersistentVolumeController) isDelayBindingProvisioning(claim *v1.PersistentVolumeClaim) bool {
     // When feature VolumeScheduling is enabled,
     // the scheduler signals to the PV controller to start dynamic
     // provisioning by setting the "annSelectedNode" annotation
     // in the PVC
-    if _, ok := claim.Annotations[annSelectedNode]; ok {
-        return false, nil
-    }
+    _, ok := claim.Annotations[annSelectedNode]
+    return ok
+}
 
+func (ctrl *PersistentVolumeController) isDelayBindingMode(claim *v1.PersistentVolumeClaim) (bool, error) {
     className := v1helper.GetPersistentVolumeClaimClass(claim)
     if className == "" {
         return false, nil
@@ -311,6 +312,18 @@ func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) {
     return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil
 }
 
+// shouldDelayBinding returns true if binding of the claim should be delayed, false otherwise.
+// If binding of the claim should be delayed, only claims prebound by the scheduler are bound.
+func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) {
+    // If the claim has already been assigned a node by the scheduler for dynamic provisioning.
+    if ctrl.isDelayBindingProvisioning(claim) {
+        return false, nil
+    }
+
+    // If the claim is in delay binding mode.
+    return ctrl.isDelayBindingMode(claim)
+}
+
 // syncUnboundClaim is the main controller method to decide what to do with an
 // unbound claim.
 func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error {
@@ -42,6 +42,9 @@ type AssumeCache interface {
     // Get the object by name
     Get(objName string) (interface{}, error)
 
+    // Get the API object by name
+    GetAPIObj(objName string) (interface{}, error)
+
     // List all the objects in the cache
     List(indexObj interface{}) []interface{}
 }
@@ -250,6 +253,17 @@ func (c *assumeCache) Get(objName string) (interface{}, error) {
     return objInfo.latestObj, nil
 }
 
+func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) {
+    c.rwMutex.RLock()
+    defer c.rwMutex.RUnlock()
+
+    objInfo, err := c.getObjInfo(objName)
+    if err != nil {
+        return nil, err
+    }
+    return objInfo.apiObj, nil
+}
+
 func (c *assumeCache) List(indexObj interface{}) []interface{} {
     c.rwMutex.RLock()
     defer c.rwMutex.RUnlock()
@@ -297,7 +311,7 @@ func (c *assumeCache) Assume(obj interface{}) error {
     }
 
     if newVersion < storedVersion {
-        return fmt.Errorf("%v %q is out of sync", c.description, name)
+        return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
     }
 
     // Only update the cached object
@@ -325,6 +339,7 @@ type PVAssumeCache interface {
     AssumeCache
 
     GetPV(pvName string) (*v1.PersistentVolume, error)
+    GetAPIPV(pvName string) (*v1.PersistentVolume, error)
     ListPVs(storageClassName string) []*v1.PersistentVolume
 }
 
@@ -356,6 +371,18 @@ func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
     return pv, nil
 }
 
+func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
+    obj, err := c.GetAPIObj(pvName)
+    if err != nil {
+        return nil, err
+    }
+    pv, ok := obj.(*v1.PersistentVolume)
+    if !ok {
+        return nil, &errWrongType{"v1.PersistentVolume", obj}
+    }
+    return pv, nil
+}
+
 func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
     objs := c.List(&v1.PersistentVolume{
         Spec: v1.PersistentVolumeSpec{
@@ -380,6 +407,7 @@ type PVCAssumeCache interface {
     // GetPVC returns the PVC from the cache with given pvcKey.
     // pvcKey is the result of MetaNamespaceKeyFunc on PVC obj
     GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
+    GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
 }
 
 type pvcAssumeCache struct {
@@ -402,3 +430,15 @@ func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
     }
     return pvc, nil
 }
+
+func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
+    obj, err := c.GetAPIObj(pvcKey)
+    if err != nil {
+        return nil, err
+    }
+    pvc, ok := obj.(*v1.PersistentVolumeClaim)
+    if !ok {
+        return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
+    }
+    return pvc, nil
+}
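For orientation, the assume cache now keeps two views of every object: the latest (possibly locally assumed) copy served by Get, and the last copy actually observed from the apiserver served by GetAPIObj. A toy, self-contained sketch of that shape under hypothetical names (strings stand in for API objects; this is not the real assumeCache implementation):

    package main

    import (
    	"fmt"
    	"sync"
    )

    // twoViewCache is a toy analogue of the assume cache above: Get serves the
    // optimistic view (which an Assume may have overwritten), while GetAPI
    // serves only what the watch last delivered.
    type twoViewCache struct {
    	mu      sync.RWMutex
    	latest  map[string]string // assumed copy or informer copy, whichever is newer
    	fromAPI map[string]string // informer copy only
    }

    func (c *twoViewCache) Assume(key, val string) {
    	c.mu.Lock()
    	defer c.mu.Unlock()
    	c.latest[key] = val // only the optimistic view changes
    }

    func (c *twoViewCache) ObserveFromAPI(key, val string) {
    	c.mu.Lock()
    	defer c.mu.Unlock()
    	c.latest[key] = val // a watch event refreshes both views
    	c.fromAPI[key] = val
    }

    func (c *twoViewCache) Get(key string) string {
    	c.mu.RLock()
    	defer c.mu.RUnlock()
    	return c.latest[key]
    }

    func (c *twoViewCache) GetAPI(key string) string {
    	c.mu.RLock()
    	defer c.mu.RUnlock()
    	return c.fromAPI[key]
    }

    func main() {
    	c := &twoViewCache{latest: map[string]string{}, fromAPI: map[string]string{}}
    	c.ObserveFromAPI("pv-1", "rv=10")
    	c.Assume("pv-1", "rv=11 (assumed)")
    	fmt.Println(c.Get("pv-1"))    // rv=11 (assumed)
    	fmt.Println(c.GetAPI("pv-1")) // rv=10
    }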
@@ -25,6 +25,7 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/util/wait"
+    "k8s.io/apiserver/pkg/storage/etcd"
     coreinformers "k8s.io/client-go/informers/core/v1"
     storageinformers "k8s.io/client-go/informers/storage/v1"
     clientset "k8s.io/client-go/kubernetes"
@@ -145,6 +146,27 @@ func (b *volumeBinder) GetBindingsCache() PodBindingCache {
 // This method intentionally takes in a *v1.Node object instead of using volumebinder.nodeInformer.
 // That's necessary because some operations will need to pass in to the predicate fake node objects.
 func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
+    var (
+        matchedClaims     []*bindingInfo
+        provisionedClaims []*v1.PersistentVolumeClaim
+    )
+    defer func() {
+        // We recreate bindings for each new schedule loop.
+        // Although we do not distinguish nil from empty in this function, for
+        // easier testing, we normalize empty to nil.
+        if len(matchedClaims) == 0 {
+            matchedClaims = nil
+        }
+        if len(provisionedClaims) == 0 {
+            provisionedClaims = nil
+        }
+        // TODO: merge into one atomic function
+        // Mark cache with all the matches for each PVC for this node
+        b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims)
+        // Mark cache with all the PVCs that need provisioning for this node
+        b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims)
+    }()
+
     podName := getPodName(pod)
 
     // Warning: Below log needs high verbosity as it can be printed several times (#60933).
@@ -181,16 +203,39 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
         }
     }
 
     // Find matching volumes and node for unbound claims
     if len(claimsToBind) > 0 {
-        var claimsToProvision []*v1.PersistentVolumeClaim
-        unboundVolumesSatisfied, claimsToProvision, err = b.findMatchingVolumes(pod, claimsToBind, node)
-        if err != nil {
-            return false, false, err
+        var (
+            claimsToFindMatching []*v1.PersistentVolumeClaim
+            claimsToProvision    []*v1.PersistentVolumeClaim
+        )
+
+        // Filter out claims to provision
+        for _, claim := range claimsToBind {
+            if selectedNode, ok := claim.Annotations[annSelectedNode]; ok {
+                if selectedNode != node.Name {
+                    // Fast path, skip unmatched node
+                    return false, boundVolumesSatisfied, nil
+                }
+                claimsToProvision = append(claimsToProvision, claim)
+            } else {
+                claimsToFindMatching = append(claimsToFindMatching, claim)
+            }
         }
 
-        // Try to provision for unbound volumes
-        if !unboundVolumesSatisfied {
-            unboundVolumesSatisfied, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
+        // Find matching volumes
+        if len(claimsToFindMatching) > 0 {
+            var unboundClaims []*v1.PersistentVolumeClaim
+            unboundVolumesSatisfied, matchedClaims, unboundClaims, err = b.findMatchingVolumes(pod, claimsToFindMatching, node)
+            if err != nil {
+                return false, false, err
+            }
+            claimsToProvision = append(claimsToProvision, unboundClaims...)
+        }
+
+        // Check for claims to provision
+        if len(claimsToProvision) > 0 {
+            unboundVolumesSatisfied, provisionedClaims, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
             if err != nil {
                 return false, false, err
             }
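The new filtering step partitions unbound claims by the scheduler's selected-node annotation before any matching work, with a fast path that disqualifies a node another claim is already pinned to. A standalone sketch of that partition with hypothetical types (the real code operates on *v1.PersistentVolumeClaim and the annSelectedNode annotation):

    package main

    import "fmt"

    // claim is a stand-in for *v1.PersistentVolumeClaim; selectedNode mimics
    // the annSelectedNode annotation set by the scheduler.
    type claim struct {
    	name         string
    	selectedNode string // "" means no node has been selected yet
    }

    // partition mirrors the loop above: claims already earmarked for this node
    // go to provisioning; a claim pinned to another node disqualifies the node
    // outright (fast path); the rest still need a matching PV.
    func partition(claims []claim, node string) (toProvision, toMatch []claim, nodeOK bool) {
    	for _, c := range claims {
    		switch {
    		case c.selectedNode == "":
    			toMatch = append(toMatch, c)
    		case c.selectedNode == node:
    			toProvision = append(toProvision, c)
    		default:
    			return nil, nil, false // fast path: another node was selected
    		}
    	}
    	return toProvision, toMatch, true
    }

    func main() {
    	claims := []claim{{"a", ""}, {"b", "node-1"}}
    	p, m, ok := partition(claims, "node-1")
    	fmt.Println(len(p), len(m), ok) // 1 1 true
    }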
@@ -304,10 +349,8 @@ func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) (err error) {
     }
 
     return wait.Poll(time.Second, b.bindTimeout, func() (bool, error) {
-        // Get cached values every time in case the pod gets deleted
-        bindings = b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName)
-        claimsToProvision = b.podBindingCache.GetProvisionedPVCs(assumedPod, assumedPod.Spec.NodeName)
-        return b.checkBindings(assumedPod, bindings, claimsToProvision)
+        b, err := b.checkBindings(assumedPod, bindings, claimsToProvision)
+        return b, err
     })
 }
 
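BindPodVolumes now confirms the result by polling the latest API state until binding completes or b.bindTimeout elapses. A minimal, runnable sketch of the same wait.Poll pattern, assuming k8s.io/apimachinery/pkg/util/wait (already imported by this file); checkOnce stands in for checkBindings and the timeout values are made up:

    package main

    import (
    	"fmt"
    	"time"

    	"k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
    	deadline := time.Now().Add(3 * time.Second)

    	// checkOnce stands in for checkBindings: done=true ends the poll with
    	// success, a non-nil error aborts it, and (false, nil) retries.
    	checkOnce := func() (done bool, err error) {
    		return time.Now().After(deadline), nil
    	}

    	// Poll every second until checkOnce succeeds or the timeout elapses,
    	// mirroring wait.Poll(time.Second, b.bindTimeout, ...) above.
    	if err := wait.Poll(time.Second, 10*time.Second, checkOnce); err != nil {
    		fmt.Println("binding not confirmed:", err)
    		return
    	}
    	fmt.Println("binding confirmed")
    }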
@@ -344,6 +387,7 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) error {
 
     var (
         binding *bindingInfo
+        i       int
         claim   *v1.PersistentVolumeClaim
     )
 
@@ -352,18 +396,24 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) error {
     for _, binding = range bindings {
         klog.V(5).Infof("bindAPIUpdate: Pod %q, binding PV %q to PVC %q", podName, binding.pv.Name, binding.pvc.Name)
         // TODO: does it hurt if we make an api call and nothing needs to be updated?
-        if _, err := b.ctrl.updateBindVolumeToClaim(binding.pv, binding.pvc, false); err != nil {
+        if newPV, err := b.ctrl.updateBindVolumeToClaim(binding.pv, binding.pvc, false); err != nil {
             return err
+        } else {
+            // Save updated object from apiserver for later checking.
+            binding.pv = newPV
         }
         lastProcessedBinding++
     }
 
     // Update claims objects to trigger volume provisioning. Let the PV controller take care of the rest
     // PV controller is expected to signal back by removing related annotations if actual provisioning fails
-    for _, claim = range claimsToProvision {
+    for i, claim = range claimsToProvision {
         klog.V(5).Infof("bindAPIUpdate: Pod %q, PVC %q", podName, getPVCName(claim))
-        if _, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
+        if newClaim, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
             return err
+        } else {
+            // Save updated object from apiserver for later checking.
+            claimsToProvision[i] = newClaim
         }
         lastProcessedProvisioning++
     }
@@ -371,12 +421,20 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) error {
     return nil
 }
 
+var (
+    versioner = etcd.APIObjectVersioner{}
+)
+
 // checkBindings runs through all the PVCs in the Pod and checks:
 // * if the PVC is fully bound
 // * if there are any conditions that require binding to fail and be retried
 //
 // It returns true when all of the Pod's PVCs are fully bound, and error if
 // binding (and scheduling) needs to be retried
+// Note that it checks on API objects, not the PV/PVC cache; the PV/PVC cache
+// can be assumed again in the main scheduler loop, so we must check the
+// latest state in the API server, which is shared with the PV controller and
+// provisioners
 func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) {
     podName := getPodName(pod)
     if bindings == nil {
@@ -391,13 +449,32 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) {
         return false, fmt.Errorf("failed to get node %q: %v", pod.Spec.NodeName, err)
     }
 
-    for _, binding := range bindings {
-        // Check for any conditions that might require scheduling retry
+    // Check for any conditions that might require scheduling retry
+
+    // When the pod is removed from the scheduling queue because of deletion or
+    // any other reason, the binding operation should be cancelled. There is no
+    // need to check PV/PVC bindings any more.
+    // We check the pod binding cache here, which will be cleared when the pod
+    // is removed from the scheduling queue.
+    if b.podBindingCache.GetDecisions(pod) == nil {
+        return false, fmt.Errorf("pod %q does not exist any more", podName)
+    }
 
-        // Check if pv still exists
-        pv, err := b.pvCache.GetPV(binding.pv.Name)
-        if err != nil || pv == nil {
-            return false, fmt.Errorf("failed to check pv binding: %v", err)
+    for _, binding := range bindings {
+        pv, err := b.pvCache.GetAPIPV(binding.pv.Name)
+        if err != nil {
+            return false, fmt.Errorf("failed to check binding: %v", err)
         }
 
+        pvc, err := b.pvcCache.GetAPIPVC(getPVCName(binding.pvc))
+        if err != nil {
+            return false, fmt.Errorf("failed to check binding: %v", err)
+        }
+
+        // Because we updated the PV in the apiserver, skip if the API object is
+        // older and wait for the new API object to propagate from the apiserver.
+        if versioner.CompareResourceVersion(binding.pv, pv) > 0 {
+            return false, nil
+        }
+
         // Check PV's node affinity (the node might not have the proper label)
@@ -411,18 +488,21 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) {
         }
 
         // Check if pvc is fully bound
-        if isBound, _, err := b.isPVCBound(binding.pvc.Namespace, binding.pvc.Name); !isBound || err != nil {
-            return false, err
+        if !b.isPVCFullyBound(pvc) {
+            return false, nil
         }
-
-        // TODO; what if pvc is bound to the wrong pv? It means our assume cache should be reverted.
-        // Or will pv controller cleanup the pv.ClaimRef?
     }
 
     for _, claim := range claimsToProvision {
-        bound, pvc, err := b.isPVCBound(claim.Namespace, claim.Name)
-        if err != nil || pvc == nil {
-            return false, fmt.Errorf("failed to check pvc binding: %v", err)
+        pvc, err := b.pvcCache.GetAPIPVC(getPVCName(claim))
+        if err != nil {
+            return false, fmt.Errorf("failed to check provisioning pvc: %v", err)
         }
 
+        // Because we updated the PVC in the apiserver, skip if the API object is
+        // older and wait for the new API object to propagate from the apiserver.
+        if versioner.CompareResourceVersion(claim, pvc) > 0 {
+            return false, nil
+        }
+
         // Check if selectedNode annotation is still set
@@ -436,7 +516,7 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) {
 
         // If the PVC is bound to a PV, check its node affinity
         if pvc.Spec.VolumeName != "" {
-            pv, err := b.pvCache.GetPV(pvc.Spec.VolumeName)
+            pv, err := b.pvCache.GetAPIPV(pvc.Spec.VolumeName)
             if err != nil {
                 return false, fmt.Errorf("failed to get pv %q from cache: %v", pvc.Spec.VolumeName, err)
             }
@@ -445,7 +525,8 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) {
             }
         }
 
-        if !bound {
+        // Check if pvc is fully bound
+        if !b.isPVCFullyBound(pvc) {
             return false, nil
         }
     }
@@ -477,19 +558,21 @@ func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.PersistentVolumeClaim, error) {
         return false, nil, fmt.Errorf("error getting PVC %q: %v", pvcKey, err)
     }
 
-    pvName := pvc.Spec.VolumeName
-    if pvName != "" {
-        if metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted) {
-            klog.V(5).Infof("PVC %q is fully bound to PV %q", pvcKey, pvName)
-            return true, pvc, nil
+    fullyBound := b.isPVCFullyBound(pvc)
+    if fullyBound {
+        klog.V(5).Infof("PVC %q is fully bound to PV %q", pvcKey, pvc.Spec.VolumeName)
+    } else {
+        if pvc.Spec.VolumeName != "" {
+            klog.V(5).Infof("PVC %q is not fully bound to PV %q", pvcKey, pvc.Spec.VolumeName)
         } else {
-            klog.V(5).Infof("PVC %q is not fully bound to PV %q", pvcKey, pvName)
-            return false, pvc, nil
+            klog.V(5).Infof("PVC %q is not bound", pvcKey)
         }
     }
+    return fullyBound, pvc, nil
+}
 
-    klog.V(5).Infof("PVC %q is not bound", pvcKey)
-    return false, pvc, nil
+func (b *volumeBinder) isPVCFullyBound(pvc *v1.PersistentVolumeClaim) bool {
+    return pvc.Spec.VolumeName != "" && metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted)
 }
 
 // arePodVolumesBound returns true if all volumes are fully bound
@@ -503,12 +586,12 @@ func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool {
     return true
 }
 
-// getPodVolumes returns a pod's PVCs separated into bound (including prebound), unbound with delayed binding,
-// and unbound with immediate binding
-func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaims []*bindingInfo, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
+// getPodVolumes returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning)
+// and unbound with immediate binding (including prebound)
+func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaims []*v1.PersistentVolumeClaim, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
     boundClaims = []*v1.PersistentVolumeClaim{}
     unboundClaimsImmediate = []*v1.PersistentVolumeClaim{}
-    unboundClaims = []*bindingInfo{}
+    unboundClaims = []*v1.PersistentVolumeClaim{}
 
     for _, vol := range pod.Spec.Volumes {
         volumeBound, pvc, err := b.isVolumeBound(pod.Namespace, &vol)
@@ -521,15 +604,16 @@ func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaims []*bindingInfo, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
         if volumeBound {
             boundClaims = append(boundClaims, pvc)
         } else {
-            delayBinding, err := b.ctrl.shouldDelayBinding(pvc)
+            delayBindingMode, err := b.ctrl.isDelayBindingMode(pvc)
             if err != nil {
                 return nil, nil, nil, err
             }
             // Prebound PVCs are treated as unbound immediate binding
-            if delayBinding && pvc.Spec.VolumeName == "" {
+            if delayBindingMode && pvc.Spec.VolumeName == "" {
                 // Scheduler path
-                unboundClaims = append(unboundClaims, &bindingInfo{pvc: pvc})
+                unboundClaims = append(unboundClaims, pvc)
             } else {
+                // !delayBindingMode || pvc.Spec.VolumeName != ""
                 // Immediate binding should have already been bound
                 unboundClaimsImmediate = append(unboundClaimsImmediate, pvc)
             }
@@ -560,7 +644,7 @@ func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node *v1.Node, podName string) (bool, error) {
 
 // findMatchingVolumes tries to find matching volumes for given claims,
 // and return unbound claims for further provision.
-func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, unboundClaims []*v1.PersistentVolumeClaim, err error) {
+func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.PersistentVolumeClaim, node *v1.Node) (foundMatches bool, matchedClaims []*bindingInfo, unboundClaims []*v1.PersistentVolumeClaim, err error) {
     podName := getPodName(pod)
     // Sort all the claims by increasing size request to get the smallest fits
     sort.Sort(byPVCSize(claimsToBind))
@@ -568,39 +652,34 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, unboundClaims []*v1.PersistentVolumeClaim, err error) {
     chosenPVs := map[string]*v1.PersistentVolume{}
 
     foundMatches = true
-    matchedClaims := []*bindingInfo{}
+    matchedClaims = []*bindingInfo{}
 
-    for _, bindingInfo := range claimsToBind {
+    for _, pvc := range claimsToBind {
         // Get storage class name from each PVC
         storageClassName := ""
-        storageClass := bindingInfo.pvc.Spec.StorageClassName
+        storageClass := pvc.Spec.StorageClassName
         if storageClass != nil {
             storageClassName = *storageClass
         }
         allPVs := b.pvCache.ListPVs(storageClassName)
-        pvcName := getPVCName(bindingInfo.pvc)
+        pvcName := getPVCName(pvc)
 
         // Find a matching PV
-        bindingInfo.pv, err = findMatchingVolume(bindingInfo.pvc, allPVs, node, chosenPVs, true)
+        pv, err := findMatchingVolume(pvc, allPVs, node, chosenPVs, true)
         if err != nil {
-            return false, nil, err
+            return false, nil, nil, err
         }
-        if bindingInfo.pv == nil {
+        if pv == nil {
             klog.V(4).Infof("No matching volumes for Pod %q, PVC %q on node %q", podName, pvcName, node.Name)
-            unboundClaims = append(unboundClaims, bindingInfo.pvc)
+            unboundClaims = append(unboundClaims, pvc)
             foundMatches = false
             continue
         }
 
         // matching PV needs to be excluded so we don't select it again
-        chosenPVs[bindingInfo.pv.Name] = bindingInfo.pv
-        matchedClaims = append(matchedClaims, bindingInfo)
-        klog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", bindingInfo.pv.Name, pvcName, node.Name, podName)
-    }
-
-    // Mark cache with all the matches for each PVC for this node
-    if len(matchedClaims) > 0 {
-        b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims)
+        chosenPVs[pv.Name] = pv
+        matchedClaims = append(matchedClaims, &bindingInfo{pv: pv, pvc: pvc})
+        klog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", pv.Name, pvcName, node.Name, podName)
     }
 
     if foundMatches {
@@ -613,31 +692,31 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, unboundClaims []*v1.PersistentVolumeClaim, err error) {
 // checkVolumeProvisions checks given unbound claims (the claims have gone through func
 // findMatchingVolumes, and do not have matching volumes for binding), and return true
 // if all of the claims are eligible for dynamic provision.
-func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, err error) {
+func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, provisionedClaims []*v1.PersistentVolumeClaim, err error) {
     podName := getPodName(pod)
-    provisionedClaims := []*v1.PersistentVolumeClaim{}
+    provisionedClaims = []*v1.PersistentVolumeClaim{}
 
     for _, claim := range claimsToProvision {
         pvcName := getPVCName(claim)
         className := v1helper.GetPersistentVolumeClaimClass(claim)
         if className == "" {
-            return false, fmt.Errorf("no class for claim %q", pvcName)
+            return false, nil, fmt.Errorf("no class for claim %q", pvcName)
         }
 
         class, err := b.ctrl.classLister.Get(className)
         if err != nil {
-            return false, fmt.Errorf("failed to find storage class %q", className)
+            return false, nil, fmt.Errorf("failed to find storage class %q", className)
        }
         provisioner := class.Provisioner
         if provisioner == "" || provisioner == notSupportedProvisioner {
             klog.V(4).Infof("storage class %q of claim %q does not support dynamic provisioning", className, pvcName)
-            return false, nil
+            return false, nil, nil
         }
 
         // Check if the node can satisfy the topology requirement in the class
         if !v1helper.MatchTopologySelectorTerms(class.AllowedTopologies, labels.Set(node.Labels)) {
             klog.V(4).Infof("Node %q cannot satisfy provisioning topology requirements of claim %q", node.Name, pvcName)
-            return false, nil
+            return false, nil, nil
         }
 
         // TODO: Check if capacity of the node domain in the storage class
@@ -648,10 +727,7 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, err error) {
     }
     klog.V(4).Infof("Provisioning for claims of pod %q that has no matching volumes on node %q ...", podName, node.Name)
 
-    // Mark cache with all the PVCs that need provisioning for this node
-    b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims)
-
-    return true, nil
+    return true, provisionedClaims, nil
 }
 
 func (b *volumeBinder) revertAssumedPVs(bindings []*bindingInfo) {
@@ -674,7 +750,7 @@ type bindingInfo struct {
     pv *v1.PersistentVolume
 }
 
-type byPVCSize []*bindingInfo
+type byPVCSize []*v1.PersistentVolumeClaim
 
 func (a byPVCSize) Len() int {
     return len(a)
@@ -685,8 +761,8 @@ func (a byPVCSize) Swap(i, j int) {
 }
 
 func (a byPVCSize) Less(i, j int) bool {
-    iSize := a[i].pvc.Spec.Resources.Requests[v1.ResourceStorage]
-    jSize := a[j].pvc.Spec.Resources.Requests[v1.ResourceStorage]
+    iSize := a[i].Spec.Resources.Requests[v1.ResourceStorage]
+    jSize := a[j].Spec.Resources.Requests[v1.ResourceStorage]
     // return true if iSize is less than jSize
     return iSize.Cmp(jSize) == -1
 }
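byPVCSize now sorts claims directly rather than bindingInfo wrappers, ordering by requested storage via resource.Quantity.Cmp. A small self-contained sketch of the same ascending sort, using bare quantities in place of PVCs:

    package main

    import (
    	"fmt"
    	"sort"

    	"k8s.io/apimachinery/pkg/api/resource"
    )

    // bySize sorts requested storage sizes ascending, the same ordering
    // byPVCSize gives claims so the smallest request is matched first.
    type bySize []resource.Quantity

    func (a bySize) Len() int           { return len(a) }
    func (a bySize) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
    func (a bySize) Less(i, j int) bool { return a[i].Cmp(a[j]) == -1 }

    func main() {
    	sizes := bySize{
    		resource.MustParse("10Gi"),
    		resource.MustParse("1Gi"),
    		resource.MustParse("5Gi"),
    	}
    	sort.Sort(sizes)
    	for _, s := range sizes {
    		fmt.Println(s.String()) // 1Gi, 5Gi, 10Gi
    	}
    }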
@@ -44,6 +44,9 @@ type PodBindingCache interface {
     // means that no provisioning operations are needed.
     GetProvisionedPVCs(pod *v1.Pod, node string) []*v1.PersistentVolumeClaim
 
+    // GetDecisions will return all cached decisions for the given pod.
+    GetDecisions(pod *v1.Pod) nodeDecisions
+
     // DeleteBindings will remove all cached bindings and provisionings for the given pod.
     // TODO: separate the func if it is needed to delete bindings/provisionings individually
     DeleteBindings(pod *v1.Pod)
@@ -72,6 +75,17 @@ func NewPodBindingCache() PodBindingCache {
     return &podBindingCache{bindingDecisions: map[string]nodeDecisions{}}
 }
 
+func (c *podBindingCache) GetDecisions(pod *v1.Pod) nodeDecisions {
+    c.rwMutex.Lock()
+    defer c.rwMutex.Unlock()
+    podName := getPodName(pod)
+    decisions, ok := c.bindingDecisions[podName]
+    if !ok {
+        return nil
+    }
+    return decisions
+}
+
 func (c *podBindingCache) DeleteBindings(pod *v1.Pod) {
     c.rwMutex.Lock()
     defer c.rwMutex.Unlock()