Merge pull request #91455 from cofyc/fix88229

Emit correct event when unbound delay binding claim is used by pod
This commit is contained in:
Kubernetes Prow Robot 2020-06-02 18:11:36 -07:00 committed by GitHub
commit 2bf8e27737
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 99 additions and 15 deletions

View File

@ -30,5 +30,6 @@ const (
ProvisioningCleanupFailed = "ProvisioningCleanupFailed" ProvisioningCleanupFailed = "ProvisioningCleanupFailed"
ProvisioningSucceeded = "ProvisioningSucceeded" ProvisioningSucceeded = "ProvisioningSucceeded"
WaitForFirstConsumer = "WaitForFirstConsumer" WaitForFirstConsumer = "WaitForFirstConsumer"
WaitForPodScheduled = "WaitForPodScheduled"
ExternalExpanding = "ExternalExpanding" ExternalExpanding = "ExternalExpanding"
) )

View File

@ -657,6 +657,7 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag
podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for _, pod := range pods { for _, pod := range pods {
podIndexer.Add(pod) podIndexer.Add(pod)
ctrl.podIndexer.Add(pod)
} }
ctrl.podLister = corelisters.NewPodLister(podIndexer) ctrl.podLister = corelisters.NewPodLister(podIndexer)

View File

@ -27,7 +27,6 @@ import (
storage "k8s.io/api/storage/v1" storage "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
@ -157,6 +156,7 @@ type PersistentVolumeController struct {
classListerSynced cache.InformerSynced classListerSynced cache.InformerSynced
podLister corelisters.PodLister podLister corelisters.PodLister
podListerSynced cache.InformerSynced podListerSynced cache.InformerSynced
podIndexer cache.Indexer
NodeLister corelisters.NodeLister NodeLister corelisters.NodeLister
NodeListerSynced cache.InformerSynced NodeListerSynced cache.InformerSynced
@ -296,6 +296,31 @@ func checkVolumeSatisfyClaim(volume *v1.PersistentVolume, claim *v1.PersistentVo
return nil return nil
} }
// emitEventForUnboundDelayBindingClaim generates informative event for claim
// if it's in delay binding mode and not bound yet.
func (ctrl *PersistentVolumeController) emitEventForUnboundDelayBindingClaim(claim *v1.PersistentVolumeClaim) error {
reason := events.WaitForFirstConsumer
message := "waiting for first consumer to be created before binding"
podNames, err := ctrl.findNonScheduledPodsByPVC(claim)
if err != nil {
return err
}
if len(podNames) > 0 {
reason = events.WaitForPodScheduled
if len(podNames) > 1 {
// Although only one pod is taken into account in
// volume scheduling, more than one pods can reference
// the PVC at the same time. We can't know which pod is
// used in scheduling, all pods are included.
message = fmt.Sprintf("waiting for pods %s to be scheduled", strings.Join(podNames, ","))
} else {
message = fmt.Sprintf("waiting for pod %s to be scheduled", podNames[0])
}
}
ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, reason, message)
return nil
}
// syncUnboundClaim is the main controller method to decide what to do with an // syncUnboundClaim is the main controller method to decide what to do with an
// unbound claim. // unbound claim.
func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error { func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error {
@ -320,7 +345,9 @@ func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVol
// OBSERVATION: pvc is "Pending", will retry // OBSERVATION: pvc is "Pending", will retry
switch { switch {
case delayBinding && !pvutil.IsDelayBindingProvisioning(claim): case delayBinding && !pvutil.IsDelayBindingProvisioning(claim):
ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.WaitForFirstConsumer, "waiting for first consumer to be created before binding") if err = ctrl.emitEventForUnboundDelayBindingClaim(claim); err != nil {
return err
}
case v1helper.GetPersistentVolumeClaimClass(claim) != "": case v1helper.GetPersistentVolumeClaimClass(claim) != "":
if err = ctrl.provisionClaim(claim); err != nil { if err = ctrl.provisionClaim(claim); err != nil {
return err return err
@ -1294,32 +1321,55 @@ func (ctrl *PersistentVolumeController) isVolumeReleased(volume *v1.PersistentVo
return true, nil return true, nil
} }
func (ctrl *PersistentVolumeController) findPodsByPVCKey(key string) ([]*v1.Pod, error) {
pods := []*v1.Pod{}
objs, err := ctrl.podIndexer.ByIndex(pvcKeyIndex, key)
if err != nil {
return pods, err
}
for _, obj := range objs {
pod, ok := obj.(*v1.Pod)
if !ok {
continue
}
pods = append(pods, pod)
}
return pods, err
}
// isVolumeUsed returns list of pods that use given PV. // isVolumeUsed returns list of pods that use given PV.
func (ctrl *PersistentVolumeController) isVolumeUsed(pv *v1.PersistentVolume) ([]string, bool, error) { func (ctrl *PersistentVolumeController) isVolumeUsed(pv *v1.PersistentVolume) ([]string, bool, error) {
if pv.Spec.ClaimRef == nil { if pv.Spec.ClaimRef == nil {
return nil, false, nil return nil, false, nil
} }
claimName := pv.Spec.ClaimRef.Name
podNames := sets.NewString() podNames := sets.NewString()
pods, err := ctrl.podLister.Pods(pv.Spec.ClaimRef.Namespace).List(labels.Everything()) pvcKey := fmt.Sprintf("%s/%s", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)
pods, err := ctrl.findPodsByPVCKey(pvcKey)
if err != nil { if err != nil {
return nil, false, fmt.Errorf("error listing pods: %s", err) return nil, false, fmt.Errorf("error finding pods by pvc %q: %s", pvcKey, err)
} }
for _, pod := range pods { for _, pod := range pods {
if util.IsPodTerminated(pod, pod.Status) { podNames.Insert(pod.Namespace + "/" + pod.Name)
continue
}
for i := range pod.Spec.Volumes {
usedPV := &pod.Spec.Volumes[i]
if usedPV.PersistentVolumeClaim != nil && usedPV.PersistentVolumeClaim.ClaimName == claimName {
podNames.Insert(pod.Namespace + "/" + pod.Name)
}
}
} }
return podNames.List(), podNames.Len() != 0, nil return podNames.List(), podNames.Len() != 0, nil
} }
// findNonScheduledPodsByPVC returns list of non-scheduled pods that reference given PVC.
func (ctrl *PersistentVolumeController) findNonScheduledPodsByPVC(pvc *v1.PersistentVolumeClaim) ([]string, error) {
pvcKey := fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name)
pods, err := ctrl.findPodsByPVCKey(pvcKey)
if err != nil {
return nil, err
}
podNames := []string{}
for _, pod := range pods {
if len(pod.Spec.NodeName) == 0 {
podNames = append(podNames, pod.Name)
}
}
return podNames, nil
}
// doDeleteVolume finds appropriate delete plugin and deletes given volume, returning // doDeleteVolume finds appropriate delete plugin and deletes given volume, returning
// the volume plugin name. Also, it returns 'true', when the volume was deleted and // the volume plugin name. Also, it returns 'true', when the volume was deleted and
// 'false' when the volume cannot be deleted because the deleter is external. No // 'false' when the volume cannot be deleted because the deleter is external. No

View File

@ -46,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/util/goroutinemap" "k8s.io/kubernetes/pkg/util/goroutinemap"
vol "k8s.io/kubernetes/pkg/volume" vol "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csimigration" "k8s.io/kubernetes/pkg/volume/csimigration"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/klog/v2" "k8s.io/klog/v2"
) )
@ -126,10 +127,19 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error)
controller.classLister = p.ClassInformer.Lister() controller.classLister = p.ClassInformer.Lister()
controller.classListerSynced = p.ClassInformer.Informer().HasSynced controller.classListerSynced = p.ClassInformer.Informer().HasSynced
controller.podLister = p.PodInformer.Lister() controller.podLister = p.PodInformer.Lister()
controller.podIndexer = p.PodInformer.Informer().GetIndexer()
controller.podListerSynced = p.PodInformer.Informer().HasSynced controller.podListerSynced = p.PodInformer.Informer().HasSynced
controller.NodeLister = p.NodeInformer.Lister() controller.NodeLister = p.NodeInformer.Lister()
controller.NodeListerSynced = p.NodeInformer.Informer().HasSynced controller.NodeListerSynced = p.NodeInformer.Informer().HasSynced
// This custom indexer will index pods by its PVC keys. Then we don't need
// to iterate all pods every time to find pods which reference given PVC.
if err := controller.podIndexer.AddIndexers(cache.Indexers{
pvcKeyIndex: indexByPVCKey,
}); err != nil {
return nil, fmt.Errorf("Could not initialize PersistentVolume Controller: %v", err)
}
csiTranslator := csitrans.New() csiTranslator := csitrans.New()
controller.translator = csiTranslator controller.translator = csiTranslator
controller.csiMigratedPluginManager = csimigration.NewPluginManager(csiTranslator) controller.csiMigratedPluginManager = csimigration.NewPluginManager(csiTranslator)
@ -559,6 +569,28 @@ func (ctrl *PersistentVolumeController) setClaimProvisioner(claim *v1.Persistent
// Stateless functions // Stateless functions
const (
pvcKeyIndex string = "pvc-key-index"
)
// indexByPVCKey returns PVC keys for given pod
func indexByPVCKey(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
if util.IsPodTerminated(pod, pod.Status) {
return []string{}, nil
}
keys := []string{}
for _, podVolume := range pod.Spec.Volumes {
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName))
}
}
return keys, nil
}
func getClaimStatusForLogging(claim *v1.PersistentVolumeClaim) string { func getClaimStatusForLogging(claim *v1.PersistentVolumeClaim) string {
bound := metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted) bound := metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted)
boundByController := metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController) boundByController := metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController)