From 32df4300ef61a58c196f3738fcb4750fc2f94573 Mon Sep 17 00:00:00 2001 From: Yecheng Fu Date: Tue, 26 May 2020 21:39:12 +0800 Subject: [PATCH] emit correct event when unbound delay binding claim is used by pod --- pkg/controller/volume/events/event.go | 1 + .../volume/persistentvolume/framework_test.go | 1 + .../volume/persistentvolume/pv_controller.go | 80 +++++++++++++++---- .../persistentvolume/pv_controller_base.go | 32 ++++++++ 4 files changed, 99 insertions(+), 15 deletions(-) diff --git a/pkg/controller/volume/events/event.go b/pkg/controller/volume/events/event.go index 1229403043f..06aaef249bf 100644 --- a/pkg/controller/volume/events/event.go +++ b/pkg/controller/volume/events/event.go @@ -30,5 +30,6 @@ const ( ProvisioningCleanupFailed = "ProvisioningCleanupFailed" ProvisioningSucceeded = "ProvisioningSucceeded" WaitForFirstConsumer = "WaitForFirstConsumer" + WaitForPodScheduled = "WaitForPodScheduled" ExternalExpanding = "ExternalExpanding" ) diff --git a/pkg/controller/volume/persistentvolume/framework_test.go b/pkg/controller/volume/persistentvolume/framework_test.go index e98032ede1c..4f6d556583f 100644 --- a/pkg/controller/volume/persistentvolume/framework_test.go +++ b/pkg/controller/volume/persistentvolume/framework_test.go @@ -657,6 +657,7 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) for _, pod := range pods { podIndexer.Add(pod) + ctrl.podIndexer.Add(pod) } ctrl.podLister = corelisters.NewPodLister(podIndexer) diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go index 4a64187d835..87f4c69fbec 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller.go +++ b/pkg/controller/volume/persistentvolume/pv_controller.go @@ -27,7 +27,6 @@ import ( storage "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" @@ -157,6 +156,7 @@ type PersistentVolumeController struct { classListerSynced cache.InformerSynced podLister corelisters.PodLister podListerSynced cache.InformerSynced + podIndexer cache.Indexer NodeLister corelisters.NodeLister NodeListerSynced cache.InformerSynced @@ -296,6 +296,31 @@ func checkVolumeSatisfyClaim(volume *v1.PersistentVolume, claim *v1.PersistentVo return nil } +// emitEventForUnboundDelayBindingClaim generates informative event for claim +// if it's in delay binding mode and not bound yet. +func (ctrl *PersistentVolumeController) emitEventForUnboundDelayBindingClaim(claim *v1.PersistentVolumeClaim) error { + reason := events.WaitForFirstConsumer + message := "waiting for first consumer to be created before binding" + podNames, err := ctrl.findNonScheduledPodsByPVC(claim) + if err != nil { + return err + } + if len(podNames) > 0 { + reason = events.WaitForPodScheduled + if len(podNames) > 1 { + // Although only one pod is taken into account in + // volume scheduling, more than one pods can reference + // the PVC at the same time. We can't know which pod is + // used in scheduling, all pods are included. + message = fmt.Sprintf("waiting for pods %s to be scheduled", strings.Join(podNames, ",")) + } else { + message = fmt.Sprintf("waiting for pod %s to be scheduled", podNames[0]) + } + } + ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, reason, message) + return nil +} + // syncUnboundClaim is the main controller method to decide what to do with an // unbound claim. func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error { @@ -320,7 +345,9 @@ func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVol // OBSERVATION: pvc is "Pending", will retry switch { case delayBinding && !pvutil.IsDelayBindingProvisioning(claim): - ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.WaitForFirstConsumer, "waiting for first consumer to be created before binding") + if err = ctrl.emitEventForUnboundDelayBindingClaim(claim); err != nil { + return err + } case v1helper.GetPersistentVolumeClaimClass(claim) != "": if err = ctrl.provisionClaim(claim); err != nil { return err @@ -1294,32 +1321,55 @@ func (ctrl *PersistentVolumeController) isVolumeReleased(volume *v1.PersistentVo return true, nil } +func (ctrl *PersistentVolumeController) findPodsByPVCKey(key string) ([]*v1.Pod, error) { + pods := []*v1.Pod{} + objs, err := ctrl.podIndexer.ByIndex(pvcKeyIndex, key) + if err != nil { + return pods, err + } + for _, obj := range objs { + pod, ok := obj.(*v1.Pod) + if !ok { + continue + } + pods = append(pods, pod) + } + return pods, err +} + // isVolumeUsed returns list of pods that use given PV. func (ctrl *PersistentVolumeController) isVolumeUsed(pv *v1.PersistentVolume) ([]string, bool, error) { if pv.Spec.ClaimRef == nil { return nil, false, nil } - claimName := pv.Spec.ClaimRef.Name - podNames := sets.NewString() - pods, err := ctrl.podLister.Pods(pv.Spec.ClaimRef.Namespace).List(labels.Everything()) + pvcKey := fmt.Sprintf("%s/%s", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name) + pods, err := ctrl.findPodsByPVCKey(pvcKey) if err != nil { - return nil, false, fmt.Errorf("error listing pods: %s", err) + return nil, false, fmt.Errorf("error finding pods by pvc %q: %s", pvcKey, err) } for _, pod := range pods { - if util.IsPodTerminated(pod, pod.Status) { - continue - } - for i := range pod.Spec.Volumes { - usedPV := &pod.Spec.Volumes[i] - if usedPV.PersistentVolumeClaim != nil && usedPV.PersistentVolumeClaim.ClaimName == claimName { - podNames.Insert(pod.Namespace + "/" + pod.Name) - } - } + podNames.Insert(pod.Namespace + "/" + pod.Name) } return podNames.List(), podNames.Len() != 0, nil } +// findNonScheduledPodsByPVC returns list of non-scheduled pods that reference given PVC. +func (ctrl *PersistentVolumeController) findNonScheduledPodsByPVC(pvc *v1.PersistentVolumeClaim) ([]string, error) { + pvcKey := fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name) + pods, err := ctrl.findPodsByPVCKey(pvcKey) + if err != nil { + return nil, err + } + podNames := []string{} + for _, pod := range pods { + if len(pod.Spec.NodeName) == 0 { + podNames = append(podNames, pod.Name) + } + } + return podNames, nil +} + // doDeleteVolume finds appropriate delete plugin and deletes given volume, returning // the volume plugin name. Also, it returns 'true', when the volume was deleted and // 'false' when the volume cannot be deleted because the deleter is external. No diff --git a/pkg/controller/volume/persistentvolume/pv_controller_base.go b/pkg/controller/volume/persistentvolume/pv_controller_base.go index 9d53e980758..a1b2730f38b 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller_base.go +++ b/pkg/controller/volume/persistentvolume/pv_controller_base.go @@ -46,6 +46,7 @@ import ( "k8s.io/kubernetes/pkg/util/goroutinemap" vol "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csimigration" + "k8s.io/kubernetes/pkg/volume/util" "k8s.io/klog/v2" ) @@ -126,10 +127,19 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error) controller.classLister = p.ClassInformer.Lister() controller.classListerSynced = p.ClassInformer.Informer().HasSynced controller.podLister = p.PodInformer.Lister() + controller.podIndexer = p.PodInformer.Informer().GetIndexer() controller.podListerSynced = p.PodInformer.Informer().HasSynced controller.NodeLister = p.NodeInformer.Lister() controller.NodeListerSynced = p.NodeInformer.Informer().HasSynced + // This custom indexer will index pods by its PVC keys. Then we don't need + // to iterate all pods every time to find pods which reference given PVC. + if err := controller.podIndexer.AddIndexers(cache.Indexers{ + pvcKeyIndex: indexByPVCKey, + }); err != nil { + return nil, fmt.Errorf("Could not initialize PersistentVolume Controller: %v", err) + } + csiTranslator := csitrans.New() controller.translator = csiTranslator controller.csiMigratedPluginManager = csimigration.NewPluginManager(csiTranslator) @@ -559,6 +569,28 @@ func (ctrl *PersistentVolumeController) setClaimProvisioner(claim *v1.Persistent // Stateless functions +const ( + pvcKeyIndex string = "pvc-key-index" +) + +// indexByPVCKey returns PVC keys for given pod +func indexByPVCKey(obj interface{}) ([]string, error) { + pod, ok := obj.(*v1.Pod) + if !ok { + return []string{}, nil + } + if util.IsPodTerminated(pod, pod.Status) { + return []string{}, nil + } + keys := []string{} + for _, podVolume := range pod.Spec.Volumes { + if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil { + keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName)) + } + } + return keys, nil +} + func getClaimStatusForLogging(claim *v1.PersistentVolumeClaim) string { bound := metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted) boundByController := metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController)