From 3eb6c4d368e7f1fe4663b082b670306bf6352100 Mon Sep 17 00:00:00 2001 From: Mengjiao Liu Date: Thu, 21 Sep 2023 11:26:28 +0800 Subject: [PATCH] Migrated `pkg/scheduler/framework/plugins/volumebinding` to contextual logging --- .../plugins/volumebinding/assume_cache.go | 53 +++++--- .../volumebinding/assume_cache_test.go | 25 ++-- .../framework/plugins/volumebinding/binder.go | 123 +++++++++--------- .../plugins/volumebinding/binder_test.go | 108 ++++++++------- .../plugins/volumebinding/fake_binder.go | 9 +- .../plugins/volumebinding/volume_binding.go | 21 +-- 6 files changed, 190 insertions(+), 149 deletions(-) diff --git a/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go b/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go index 5b512776221..77b78d172b1 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go +++ b/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go @@ -87,6 +87,10 @@ func (e *errObjectName) Error() string { // Restore() sets the latest object pointer back to the informer object. // Get/List() always returns the latest object pointer. type assumeCache struct { + // The logger that was chosen when setting up the cache. + // Will be used for all operations. + logger klog.Logger + // Synchronizes updates to store rwMutex sync.RWMutex @@ -129,8 +133,9 @@ func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) { } // NewAssumeCache creates an assume cache for general objects. -func NewAssumeCache(informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache { +func NewAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache { c := &assumeCache{ + logger: logger, description: description, indexFunc: indexFunc, indexName: indexName, @@ -161,7 +166,7 @@ func (c *assumeCache) add(obj interface{}) { name, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { - klog.ErrorS(&errObjectName{err}, "Add failed") + c.logger.Error(&errObjectName{err}, "Add failed") return } @@ -171,29 +176,29 @@ func (c *assumeCache) add(obj interface{}) { if objInfo, _ := c.getObjInfo(name); objInfo != nil { newVersion, err := c.getObjVersion(name, obj) if err != nil { - klog.ErrorS(err, "Add failed: couldn't get object version") + c.logger.Error(err, "Add failed: couldn't get object version") return } storedVersion, err := c.getObjVersion(name, objInfo.latestObj) if err != nil { - klog.ErrorS(err, "Add failed: couldn't get stored object version") + c.logger.Error(err, "Add failed: couldn't get stored object version") return } // Only update object if version is newer. // This is so we don't override assumed objects due to informer resync. if newVersion <= storedVersion { - klog.V(10).InfoS("Skip adding object to assume cache because version is not newer than storedVersion", "description", c.description, "cacheKey", name, "newVersion", newVersion, "storedVersion", storedVersion) + c.logger.V(10).Info("Skip adding object to assume cache because version is not newer than storedVersion", "description", c.description, "cacheKey", name, "newVersion", newVersion, "storedVersion", storedVersion) return } } objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj} if err = c.store.Update(objInfo); err != nil { - klog.InfoS("Error occurred while updating stored object", "err", err) + c.logger.Info("Error occurred while updating stored object", "err", err) } else { - klog.V(10).InfoS("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj) + c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj) } } @@ -208,7 +213,7 @@ func (c *assumeCache) delete(obj interface{}) { name, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { - klog.ErrorS(&errObjectName{err}, "Failed to delete") + c.logger.Error(&errObjectName{err}, "Failed to delete") return } @@ -218,7 +223,7 @@ func (c *assumeCache) delete(obj interface{}) { objInfo := &objInfo{name: name} err = c.store.Delete(objInfo) if err != nil { - klog.ErrorS(err, "Failed to delete", "description", c.description, "cacheKey", name) + c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name) } } @@ -280,14 +285,14 @@ func (c *assumeCache) List(indexObj interface{}) []interface{} { allObjs := []interface{}{} objs, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj}) if err != nil { - klog.ErrorS(err, "List index error") + c.logger.Error(err, "List index error") return nil } for _, obj := range objs { objInfo, ok := obj.(*objInfo) if !ok { - klog.ErrorS(&errWrongType{"objInfo", obj}, "List error") + c.logger.Error(&errWrongType{"objInfo", obj}, "List error") continue } allObjs = append(allObjs, objInfo.latestObj) @@ -325,7 +330,7 @@ func (c *assumeCache) Assume(obj interface{}) error { // Only update the cached object objInfo.latestObj = obj - klog.V(4).InfoS("Assumed object", "description", c.description, "cacheKey", name, "version", newVersion) + c.logger.V(4).Info("Assumed object", "description", c.description, "cacheKey", name, "version", newVersion) return nil } @@ -336,10 +341,10 @@ func (c *assumeCache) Restore(objName string) { objInfo, err := c.getObjInfo(objName) if err != nil { // This could be expected if object got deleted - klog.V(5).InfoS("Restore object", "description", c.description, "cacheKey", objName, "err", err) + c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err) } else { objInfo.latestObj = objInfo.apiObj - klog.V(4).InfoS("Restored object", "description", c.description, "cacheKey", objName) + c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName) } } @@ -354,6 +359,7 @@ type PVAssumeCache interface { type pvAssumeCache struct { AssumeCache + logger klog.Logger } func pvStorageClassIndexFunc(obj interface{}) ([]string, error) { @@ -364,8 +370,12 @@ func pvStorageClassIndexFunc(obj interface{}) ([]string, error) { } // NewPVAssumeCache creates a PV assume cache. -func NewPVAssumeCache(informer cache.SharedIndexInformer) PVAssumeCache { - return &pvAssumeCache{NewAssumeCache(informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc)} +func NewPVAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVAssumeCache { + logger = klog.LoggerWithName(logger, "PV Cache") + return &pvAssumeCache{ + AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc), + logger: logger, + } } func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) { @@ -403,7 +413,7 @@ func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume for _, obj := range objs { pv, ok := obj.(*v1.PersistentVolume) if !ok { - klog.ErrorS(&errWrongType{"v1.PersistentVolume", obj}, "ListPVs") + c.logger.Error(&errWrongType{"v1.PersistentVolume", obj}, "ListPVs") continue } pvs = append(pvs, pv) @@ -423,11 +433,16 @@ type PVCAssumeCache interface { type pvcAssumeCache struct { AssumeCache + logger klog.Logger } // NewPVCAssumeCache creates a PVC assume cache. -func NewPVCAssumeCache(informer cache.SharedIndexInformer) PVCAssumeCache { - return &pvcAssumeCache{NewAssumeCache(informer, "v1.PersistentVolumeClaim", "", nil)} +func NewPVCAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVCAssumeCache { + logger = klog.LoggerWithName(logger, "PVC Cache") + return &pvcAssumeCache{ + AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolumeClaim", "", nil), + logger: logger, + } } func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { diff --git a/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go b/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go index 49fcb1cee85..7391a412f89 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go +++ b/pkg/scheduler/framework/plugins/volumebinding/assume_cache_test.go @@ -23,6 +23,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/component-helpers/storage/volume" + "k8s.io/klog/v2/ktesting" ) func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) { @@ -53,6 +54,7 @@ func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume) } func TestAssumePV(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) scenarios := map[string]struct { oldPV *v1.PersistentVolume newPV *v1.PersistentVolume @@ -96,7 +98,7 @@ func TestAssumePV(t *testing.T) { } for name, scenario := range scenarios { - cache := NewPVAssumeCache(nil) + cache := NewPVAssumeCache(logger, nil) internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) if !ok { t.Fatalf("Failed to get internal cache") @@ -130,7 +132,8 @@ func TestAssumePV(t *testing.T) { } func TestRestorePV(t *testing.T) { - cache := NewPVAssumeCache(nil) + logger, _ := ktesting.NewTestContext(t) + cache := NewPVAssumeCache(logger, nil) internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) if !ok { t.Fatalf("Failed to get internal cache") @@ -170,7 +173,8 @@ func TestRestorePV(t *testing.T) { } func TestBasicPVCache(t *testing.T) { - cache := NewPVAssumeCache(nil) + logger, _ := ktesting.NewTestContext(t) + cache := NewPVAssumeCache(logger, nil) internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) if !ok { t.Fatalf("Failed to get internal cache") @@ -214,7 +218,8 @@ func TestBasicPVCache(t *testing.T) { } func TestPVCacheWithStorageClasses(t *testing.T) { - cache := NewPVAssumeCache(nil) + logger, _ := ktesting.NewTestContext(t) + cache := NewPVAssumeCache(logger, nil) internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) if !ok { t.Fatalf("Failed to get internal cache") @@ -260,7 +265,8 @@ func TestPVCacheWithStorageClasses(t *testing.T) { } func TestAssumeUpdatePVCache(t *testing.T) { - cache := NewPVAssumeCache(nil) + logger, _ := ktesting.NewTestContext(t) + cache := NewPVAssumeCache(logger, nil) internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache) if !ok { t.Fatalf("Failed to get internal cache") @@ -315,6 +321,7 @@ func verifyPVC(cache PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVo } func TestAssumePVC(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) scenarios := map[string]struct { oldPVC *v1.PersistentVolumeClaim newPVC *v1.PersistentVolumeClaim @@ -353,7 +360,7 @@ func TestAssumePVC(t *testing.T) { } for name, scenario := range scenarios { - cache := NewPVCAssumeCache(nil) + cache := NewPVCAssumeCache(logger, nil) internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) if !ok { t.Fatalf("Failed to get internal cache") @@ -387,7 +394,8 @@ func TestAssumePVC(t *testing.T) { } func TestRestorePVC(t *testing.T) { - cache := NewPVCAssumeCache(nil) + logger, _ := ktesting.NewTestContext(t) + cache := NewPVCAssumeCache(logger, nil) internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) if !ok { t.Fatalf("Failed to get internal cache") @@ -427,7 +435,8 @@ func TestRestorePVC(t *testing.T) { } func TestAssumeUpdatePVCCache(t *testing.T) { - cache := NewPVCAssumeCache(nil) + logger, _ := ktesting.NewTestContext(t) + cache := NewPVCAssumeCache(logger, nil) internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache) if !ok { t.Fatalf("Failed to get internal cache") diff --git a/pkg/scheduler/framework/plugins/volumebinding/binder.go b/pkg/scheduler/framework/plugins/volumebinding/binder.go index d035b16721b..f6ce916c6bf 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/binder.go +++ b/pkg/scheduler/framework/plugins/volumebinding/binder.go @@ -149,7 +149,7 @@ type InTreeToCSITranslator interface { type SchedulerVolumeBinder interface { // GetPodVolumeClaims returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning), // unbound with immediate binding (including prebound) and PVs that belong to storage classes of unbound PVCs with delayed binding. - GetPodVolumeClaims(pod *v1.Pod) (podVolumeClaims *PodVolumeClaims, err error) + GetPodVolumeClaims(logger klog.Logger, pod *v1.Pod) (podVolumeClaims *PodVolumeClaims, err error) // GetEligibleNodes checks the existing bound claims of the pod to determine if the list of nodes can be // potentially reduced down to a subset of eligible nodes based on the bound claims which then can be used @@ -157,7 +157,7 @@ type SchedulerVolumeBinder interface { // // If eligibleNodes is 'nil', then it indicates that such eligible node reduction cannot be made // and all nodes should be considered. - GetEligibleNodes(boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.Set[string]) + GetEligibleNodes(logger klog.Logger, boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.Set[string]) // FindPodVolumes checks if all of a Pod's PVCs can be satisfied by the // node and returns pod's volumes information. @@ -172,7 +172,7 @@ type SchedulerVolumeBinder interface { // for volumes that still need to be created. // // This function is called by the scheduler VolumeBinding plugin and can be called in parallel - FindPodVolumes(pod *v1.Pod, podVolumeClaims *PodVolumeClaims, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) + FindPodVolumes(logger klog.Logger, pod *v1.Pod, podVolumeClaims *PodVolumeClaims, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) // AssumePodVolumes will: // 1. Take the PV matches for unbound PVCs and update the PV cache assuming @@ -183,7 +183,7 @@ type SchedulerVolumeBinder interface { // It returns true if all volumes are fully bound // // This function is called serially. - AssumePodVolumes(assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (allFullyBound bool, err error) + AssumePodVolumes(logger klog.Logger, assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (allFullyBound bool, err error) // RevertAssumedPodVolumes will revert assumed PV and PVC cache. RevertAssumedPodVolumes(podVolumes *PodVolumes) @@ -244,6 +244,7 @@ type CapacityCheck struct { // // capacityCheck determines how storage capacity is checked (CSIStorageCapacity feature). func NewVolumeBinder( + logger klog.Logger, kubeClient clientset.Interface, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, @@ -259,8 +260,8 @@ func NewVolumeBinder( classLister: storageClassInformer.Lister(), nodeLister: nodeInformer.Lister(), csiNodeLister: csiNodeInformer.Lister(), - pvcCache: NewPVCAssumeCache(pvcInformer.Informer()), - pvCache: NewPVAssumeCache(pvInformer.Informer()), + pvcCache: NewPVCAssumeCache(logger, pvcInformer.Informer()), + pvCache: NewPVAssumeCache(logger, pvInformer.Informer()), bindTimeout: bindTimeout, translator: csitrans.New(), } @@ -274,11 +275,11 @@ func NewVolumeBinder( // FindPodVolumes finds the matching PVs for PVCs and nodes to provision PVs // for the given pod and node. If the node does not fit, conflict reasons are // returned. -func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, podVolumeClaims *PodVolumeClaims, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) { +func (b *volumeBinder) FindPodVolumes(logger klog.Logger, pod *v1.Pod, podVolumeClaims *PodVolumeClaims, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) { podVolumes = &PodVolumes{} // Warning: Below log needs high verbosity as it can be printed several times (#60933). - klog.V(5).InfoS("FindPodVolumes", "pod", klog.KObj(pod), "node", klog.KObj(node)) + logger.V(5).Info("FindPodVolumes", "pod", klog.KObj(pod), "node", klog.KObj(node)) // Initialize to true for pods that don't have volumes. These // booleans get translated into reason strings when the function @@ -330,7 +331,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, podVolumeClaims *PodVolumeCla // Check PV node affinity on bound volumes if len(podVolumeClaims.boundClaims) > 0 { - boundVolumesSatisfied, boundPVsFound, err = b.checkBoundClaims(podVolumeClaims.boundClaims, node, pod) + boundVolumesSatisfied, boundPVsFound, err = b.checkBoundClaims(logger, podVolumeClaims.boundClaims, node, pod) if err != nil { return } @@ -360,7 +361,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, podVolumeClaims *PodVolumeCla // Find matching volumes if len(claimsToFindMatching) > 0 { var unboundClaims []*v1.PersistentVolumeClaim - unboundVolumesSatisfied, staticBindings, unboundClaims, err = b.findMatchingVolumes(pod, claimsToFindMatching, podVolumeClaims.unboundVolumesDelayBinding, node) + unboundVolumesSatisfied, staticBindings, unboundClaims, err = b.findMatchingVolumes(logger, pod, claimsToFindMatching, podVolumeClaims.unboundVolumesDelayBinding, node) if err != nil { return } @@ -370,7 +371,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, podVolumeClaims *PodVolumeCla // Check for claims to provision. This is the first time where we potentially // find out that storage is not sufficient for the node. if len(claimsToProvision) > 0 { - unboundVolumesSatisfied, sufficientStorage, dynamicProvisions, err = b.checkVolumeProvisions(pod, claimsToProvision, node) + unboundVolumesSatisfied, sufficientStorage, dynamicProvisions, err = b.checkVolumeProvisions(logger, pod, claimsToProvision, node) if err != nil { return } @@ -386,7 +387,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, podVolumeClaims *PodVolumeCla // // Returning 'nil' for eligibleNodes indicates that such eligible node reduction cannot be made and all nodes // should be considered. -func (b *volumeBinder) GetEligibleNodes(boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.Set[string]) { +func (b *volumeBinder) GetEligibleNodes(logger klog.Logger, boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.Set[string]) { if len(boundClaims) == 0 { return } @@ -419,12 +420,12 @@ func (b *volumeBinder) GetEligibleNodes(boundClaims []*v1.PersistentVolumeClaim) } if len(errs) > 0 { - klog.V(4).InfoS("GetEligibleNodes: one or more error occurred finding eligible nodes", "error", errs) + logger.V(4).Info("GetEligibleNodes: one or more error occurred finding eligible nodes", "error", errs) return nil } if eligibleNodes != nil { - klog.V(4).InfoS("GetEligibleNodes: reduced down eligible nodes", "nodes", eligibleNodes) + logger.V(4).Info("GetEligibleNodes: reduced down eligible nodes", "nodes", eligibleNodes) } return } @@ -434,16 +435,16 @@ func (b *volumeBinder) GetEligibleNodes(boundClaims []*v1.PersistentVolumeClaim) // 1. Update the pvCache with the new prebound PV. // 2. Update the pvcCache with the new PVCs with annotations set // 3. Update PodVolumes again with cached API updates for PVs and PVCs. -func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (allFullyBound bool, err error) { - klog.V(4).InfoS("AssumePodVolumes", "pod", klog.KObj(assumedPod), "node", klog.KRef("", nodeName)) +func (b *volumeBinder) AssumePodVolumes(logger klog.Logger, assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (allFullyBound bool, err error) { + logger.V(4).Info("AssumePodVolumes", "pod", klog.KObj(assumedPod), "node", klog.KRef("", nodeName)) defer func() { if err != nil { metrics.VolumeSchedulingStageFailed.WithLabelValues("assume").Inc() } }() - if allBound := b.arePodVolumesBound(assumedPod); allBound { - klog.V(4).InfoS("AssumePodVolumes: all PVCs bound and nothing to do", "pod", klog.KObj(assumedPod), "node", klog.KRef("", nodeName)) + if allBound := b.arePodVolumesBound(logger, assumedPod); allBound { + logger.V(4).Info("AssumePodVolumes: all PVCs bound and nothing to do", "pod", klog.KObj(assumedPod), "node", klog.KRef("", nodeName)) return true, nil } @@ -451,7 +452,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string, pod newBindings := []*BindingInfo{} for _, binding := range podVolumes.StaticBindings { newPV, dirty, err := volume.GetBindVolumeToClaim(binding.pv, binding.pvc) - klog.V(5).InfoS("AssumePodVolumes: GetBindVolumeToClaim", + logger.V(5).Info("AssumePodVolumes: GetBindVolumeToClaim", "pod", klog.KObj(assumedPod), "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc), @@ -459,7 +460,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string, pod "dirty", dirty, ) if err != nil { - klog.ErrorS(err, "AssumePodVolumes: fail to GetBindVolumeToClaim") + logger.Error(err, "AssumePodVolumes: fail to GetBindVolumeToClaim") b.revertAssumedPVs(newBindings) return false, err } @@ -506,7 +507,8 @@ func (b *volumeBinder) RevertAssumedPodVolumes(podVolumes *PodVolumes) { // makes the API update for those PVs/PVCs, and waits for the PVCs to be completely bound // by the PV controller. func (b *volumeBinder) BindPodVolumes(ctx context.Context, assumedPod *v1.Pod, podVolumes *PodVolumes) (err error) { - klog.V(4).InfoS("BindPodVolumes", "pod", klog.KObj(assumedPod), "node", klog.KRef("", assumedPod.Spec.NodeName)) + logger := klog.FromContext(ctx) + logger.V(4).Info("BindPodVolumes", "pod", klog.KObj(assumedPod), "node", klog.KRef("", assumedPod.Spec.NodeName)) defer func() { if err != nil { @@ -524,7 +526,7 @@ func (b *volumeBinder) BindPodVolumes(ctx context.Context, assumedPod *v1.Pod, p } err = wait.PollUntilContextTimeout(ctx, time.Second, b.bindTimeout, false, func(ctx context.Context) (bool, error) { - b, err := b.checkBindings(assumedPod, bindings, claimsToProvision) + b, err := b.checkBindings(logger, assumedPod, bindings, claimsToProvision) return b, err }) if err != nil { @@ -543,6 +545,7 @@ func getPVCName(pvc *v1.PersistentVolumeClaim) string { // bindAPIUpdate makes the API update for those PVs/PVCs. func (b *volumeBinder) bindAPIUpdate(ctx context.Context, pod *v1.Pod, bindings []*BindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) error { + logger := klog.FromContext(ctx) podName := getPodName(pod) if bindings == nil { return fmt.Errorf("failed to get cached bindings for pod %q", podName) @@ -574,14 +577,14 @@ func (b *volumeBinder) bindAPIUpdate(ctx context.Context, pod *v1.Pod, bindings // There is no API rollback if the actual binding fails for _, binding = range bindings { // TODO: does it hurt if we make an api call and nothing needs to be updated? - klog.V(5).InfoS("Updating PersistentVolume: binding to claim", "pod", klog.KObj(pod), "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc)) + logger.V(5).Info("Updating PersistentVolume: binding to claim", "pod", klog.KObj(pod), "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc)) newPV, err := b.kubeClient.CoreV1().PersistentVolumes().Update(ctx, binding.pv, metav1.UpdateOptions{}) if err != nil { - klog.V(4).InfoS("Updating PersistentVolume: binding to claim failed", "pod", klog.KObj(pod), "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc), "err", err) + logger.V(4).Info("Updating PersistentVolume: binding to claim failed", "pod", klog.KObj(pod), "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc), "err", err) return err } - klog.V(2).InfoS("Updated PersistentVolume with claim. Waiting for binding to complete", "pod", klog.KObj(pod), "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc)) + logger.V(2).Info("Updated PersistentVolume with claim. Waiting for binding to complete", "pod", klog.KObj(pod), "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc)) // Save updated object from apiserver for later checking. binding.pv = newPV lastProcessedBinding++ @@ -590,10 +593,10 @@ func (b *volumeBinder) bindAPIUpdate(ctx context.Context, pod *v1.Pod, bindings // Update claims objects to trigger volume provisioning. Let the PV controller take care of the rest // PV controller is expected to signal back by removing related annotations if actual provisioning fails for i, claim = range claimsToProvision { - klog.V(5).InfoS("Updating claims objects to trigger volume provisioning", "pod", klog.KObj(pod), "PVC", klog.KObj(claim)) + logger.V(5).Info("Updating claims objects to trigger volume provisioning", "pod", klog.KObj(pod), "PVC", klog.KObj(claim)) newClaim, err := b.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}) if err != nil { - klog.V(4).InfoS("Updating PersistentVolumeClaim: binding to volume failed", "PVC", klog.KObj(claim), "err", err) + logger.V(4).Info("Updating PersistentVolumeClaim: binding to volume failed", "PVC", klog.KObj(claim), "err", err) return err } @@ -619,7 +622,7 @@ var ( // PV/PVC cache can be assumed again in main scheduler loop, we must check // latest state in API server which are shared with PV controller and // provisioners -func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*BindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) { +func (b *volumeBinder) checkBindings(logger klog.Logger, pod *v1.Pod, bindings []*BindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) { podName := getPodName(pod) if bindings == nil { return false, fmt.Errorf("failed to get cached bindings for pod %q", podName) @@ -636,7 +639,7 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*BindingInfo, claim csiNode, err := b.csiNodeLister.Get(node.Name) if err != nil { // TODO: return the error once CSINode is created by default - klog.V(4).InfoS("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err) + logger.V(4).Info("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err) } // Check for any conditions that might require scheduling retry @@ -648,7 +651,7 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*BindingInfo, claim if apierrors.IsNotFound(err) { return false, fmt.Errorf("pod does not exist any more: %w", err) } - klog.ErrorS(err, "Failed to get pod from the lister", "pod", klog.KObj(pod)) + logger.Error(err, "Failed to get pod from the lister", "pod", klog.KObj(pod)) } for _, binding := range bindings { @@ -744,11 +747,11 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*BindingInfo, claim } // All pvs and pvcs that we operated on are bound - klog.V(2).InfoS("All PVCs for pod are bound", "pod", klog.KObj(pod)) + logger.V(2).Info("All PVCs for pod are bound", "pod", klog.KObj(pod)) return true, nil } -func (b *volumeBinder) isVolumeBound(pod *v1.Pod, vol *v1.Volume) (bound bool, pvc *v1.PersistentVolumeClaim, err error) { +func (b *volumeBinder) isVolumeBound(logger klog.Logger, pod *v1.Pod, vol *v1.Volume) (bound bool, pvc *v1.PersistentVolumeClaim, err error) { pvcName := "" isEphemeral := false switch { @@ -763,7 +766,7 @@ func (b *volumeBinder) isVolumeBound(pod *v1.Pod, vol *v1.Volume) (bound bool, p return true, nil, nil } - bound, pvc, err = b.isPVCBound(pod.Namespace, pvcName) + bound, pvc, err = b.isPVCBound(logger, pod.Namespace, pvcName) // ... the PVC must be owned by the pod. if isEphemeral && err == nil && pvc != nil { if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil { @@ -773,7 +776,7 @@ func (b *volumeBinder) isVolumeBound(pod *v1.Pod, vol *v1.Volume) (bound bool, p return } -func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.PersistentVolumeClaim, error) { +func (b *volumeBinder) isPVCBound(logger klog.Logger, namespace, pvcName string) (bool, *v1.PersistentVolumeClaim, error) { claim := &v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ Name: pvcName, @@ -788,12 +791,12 @@ func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.Persiste fullyBound := b.isPVCFullyBound(pvc) if fullyBound { - klog.V(5).InfoS("PVC is fully bound to PV", "PVC", klog.KObj(pvc), "PV", klog.KRef("", pvc.Spec.VolumeName)) + logger.V(5).Info("PVC is fully bound to PV", "PVC", klog.KObj(pvc), "PV", klog.KRef("", pvc.Spec.VolumeName)) } else { if pvc.Spec.VolumeName != "" { - klog.V(5).InfoS("PVC is not fully bound to PV", "PVC", klog.KObj(pvc), "PV", klog.KRef("", pvc.Spec.VolumeName)) + logger.V(5).Info("PVC is not fully bound to PV", "PVC", klog.KObj(pvc), "PV", klog.KRef("", pvc.Spec.VolumeName)) } else { - klog.V(5).InfoS("PVC is not bound", "PVC", klog.KObj(pvc)) + logger.V(5).Info("PVC is not bound", "PVC", klog.KObj(pvc)) } } return fullyBound, pvc, nil @@ -804,9 +807,9 @@ func (b *volumeBinder) isPVCFullyBound(pvc *v1.PersistentVolumeClaim) bool { } // arePodVolumesBound returns true if all volumes are fully bound -func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool { +func (b *volumeBinder) arePodVolumesBound(logger klog.Logger, pod *v1.Pod) bool { for _, vol := range pod.Spec.Volumes { - if isBound, _, _ := b.isVolumeBound(pod, &vol); !isBound { + if isBound, _, _ := b.isVolumeBound(logger, pod, &vol); !isBound { // Pod has at least one PVC that needs binding return false } @@ -816,7 +819,7 @@ func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool { // GetPodVolumeClaims returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning), // unbound with immediate binding (including prebound) and PVs that belong to storage classes of unbound PVCs with delayed binding. -func (b *volumeBinder) GetPodVolumeClaims(pod *v1.Pod) (podVolumeClaims *PodVolumeClaims, err error) { +func (b *volumeBinder) GetPodVolumeClaims(logger klog.Logger, pod *v1.Pod) (podVolumeClaims *PodVolumeClaims, err error) { podVolumeClaims = &PodVolumeClaims{ boundClaims: []*v1.PersistentVolumeClaim{}, unboundClaimsImmediate: []*v1.PersistentVolumeClaim{}, @@ -824,7 +827,7 @@ func (b *volumeBinder) GetPodVolumeClaims(pod *v1.Pod) (podVolumeClaims *PodVolu } for _, vol := range pod.Spec.Volumes { - volumeBound, pvc, err := b.isVolumeBound(pod, &vol) + volumeBound, pvc, err := b.isVolumeBound(logger, pod, &vol) if err != nil { return podVolumeClaims, err } @@ -859,11 +862,11 @@ func (b *volumeBinder) GetPodVolumeClaims(pod *v1.Pod) (podVolumeClaims *PodVolu return podVolumeClaims, nil } -func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node *v1.Node, pod *v1.Pod) (bool, bool, error) { +func (b *volumeBinder) checkBoundClaims(logger klog.Logger, claims []*v1.PersistentVolumeClaim, node *v1.Node, pod *v1.Pod) (bool, bool, error) { csiNode, err := b.csiNodeLister.Get(node.Name) if err != nil { // TODO: return the error once CSINode is created by default - klog.V(4).InfoS("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err) + logger.V(4).Info("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err) } for _, pvc := range claims { @@ -883,19 +886,19 @@ func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node err = volume.CheckNodeAffinity(pv, node.Labels) if err != nil { - klog.V(4).InfoS("PersistentVolume and node mismatch for pod", "PV", klog.KRef("", pvName), "node", klog.KObj(node), "pod", klog.KObj(pod), "err", err) + logger.V(4).Info("PersistentVolume and node mismatch for pod", "PV", klog.KRef("", pvName), "node", klog.KObj(node), "pod", klog.KObj(pod), "err", err) return false, true, nil } - klog.V(5).InfoS("PersistentVolume and node matches for pod", "PV", klog.KRef("", pvName), "node", klog.KObj(node), "pod", klog.KObj(pod)) + logger.V(5).Info("PersistentVolume and node matches for pod", "PV", klog.KRef("", pvName), "node", klog.KObj(node), "pod", klog.KObj(pod)) } - klog.V(4).InfoS("All bound volumes for pod match with node", "pod", klog.KObj(pod), "node", klog.KObj(node)) + logger.V(4).Info("All bound volumes for pod match with node", "pod", klog.KObj(pod), "node", klog.KObj(node)) return true, true, nil } // findMatchingVolumes tries to find matching volumes for given claims, // and return unbound claims for further provision. -func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.PersistentVolumeClaim, unboundVolumesDelayBinding map[string][]*v1.PersistentVolume, node *v1.Node) (foundMatches bool, bindings []*BindingInfo, unboundClaims []*v1.PersistentVolumeClaim, err error) { +func (b *volumeBinder) findMatchingVolumes(logger klog.Logger, pod *v1.Pod, claimsToBind []*v1.PersistentVolumeClaim, unboundVolumesDelayBinding map[string][]*v1.PersistentVolume, node *v1.Node) (foundMatches bool, bindings []*BindingInfo, unboundClaims []*v1.PersistentVolumeClaim, err error) { // Sort all the claims by increasing size request to get the smallest fits sort.Sort(byPVCSize(claimsToBind)) @@ -914,7 +917,7 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.Persi return false, nil, nil, err } if pv == nil { - klog.V(4).InfoS("No matching volumes for pod", "pod", klog.KObj(pod), "PVC", klog.KObj(pvc), "node", klog.KObj(node)) + logger.V(4).Info("No matching volumes for pod", "pod", klog.KObj(pod), "PVC", klog.KObj(pvc), "node", klog.KObj(node)) unboundClaims = append(unboundClaims, pvc) foundMatches = false continue @@ -923,11 +926,11 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.Persi // matching PV needs to be excluded so we don't select it again chosenPVs[pv.Name] = pv bindings = append(bindings, &BindingInfo{pv: pv, pvc: pvc}) - klog.V(5).InfoS("Found matching PV for PVC for pod", "PV", klog.KObj(pv), "PVC", klog.KObj(pvc), "node", klog.KObj(node), "pod", klog.KObj(pod)) + logger.V(5).Info("Found matching PV for PVC for pod", "PV", klog.KObj(pv), "PVC", klog.KObj(pvc), "node", klog.KObj(node), "pod", klog.KObj(pod)) } if foundMatches { - klog.V(4).InfoS("Found matching volumes for pod", "pod", klog.KObj(pod), "node", klog.KObj(node)) + logger.V(4).Info("Found matching volumes for pod", "pod", klog.KObj(pod), "node", klog.KObj(node)) } return @@ -936,7 +939,7 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.Persi // checkVolumeProvisions checks given unbound claims (the claims have gone through func // findMatchingVolumes, and do not have matching volumes for binding), and return true // if all of the claims are eligible for dynamic provision. -func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied, sufficientStorage bool, dynamicProvisions []*v1.PersistentVolumeClaim, err error) { +func (b *volumeBinder) checkVolumeProvisions(logger klog.Logger, pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied, sufficientStorage bool, dynamicProvisions []*v1.PersistentVolumeClaim, err error) { dynamicProvisions = []*v1.PersistentVolumeClaim{} // We return early with provisionedClaims == nil if a check @@ -954,18 +957,18 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v } provisioner := class.Provisioner if provisioner == "" || provisioner == volume.NotSupportedProvisioner { - klog.V(4).InfoS("Storage class of claim does not support dynamic provisioning", "storageClassName", className, "PVC", klog.KObj(claim)) + logger.V(4).Info("Storage class of claim does not support dynamic provisioning", "storageClassName", className, "PVC", klog.KObj(claim)) return false, true, nil, nil } // Check if the node can satisfy the topology requirement in the class if !v1helper.MatchTopologySelectorTerms(class.AllowedTopologies, labels.Set(node.Labels)) { - klog.V(4).InfoS("Node cannot satisfy provisioning topology requirements of claim", "node", klog.KObj(node), "PVC", klog.KObj(claim)) + logger.V(4).Info("Node cannot satisfy provisioning topology requirements of claim", "node", klog.KObj(node), "PVC", klog.KObj(claim)) return false, true, nil, nil } // Check storage capacity. - sufficient, err := b.hasEnoughCapacity(provisioner, claim, class, node) + sufficient, err := b.hasEnoughCapacity(logger, provisioner, claim, class, node) if err != nil { return false, false, nil, err } @@ -977,7 +980,7 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v dynamicProvisions = append(dynamicProvisions, claim) } - klog.V(4).InfoS("Provisioning for claims of pod that has no matching volumes...", "claimCount", len(claimsToProvision), "pod", klog.KObj(pod), "node", klog.KObj(node)) + logger.V(4).Info("Provisioning for claims of pod that has no matching volumes...", "claimCount", len(claimsToProvision), "pod", klog.KObj(pod), "node", klog.KObj(node)) return true, true, dynamicProvisions, nil } @@ -996,7 +999,7 @@ func (b *volumeBinder) revertAssumedPVCs(claims []*v1.PersistentVolumeClaim) { // hasEnoughCapacity checks whether the provisioner has enough capacity left for a new volume of the given size // that is available from the node. -func (b *volumeBinder) hasEnoughCapacity(provisioner string, claim *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass, node *v1.Node) (bool, error) { +func (b *volumeBinder) hasEnoughCapacity(logger klog.Logger, provisioner string, claim *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass, node *v1.Node) (bool, error) { quantity, ok := claim.Spec.Resources.Requests[v1.ResourceStorage] if !ok { // No capacity to check for. @@ -1029,7 +1032,7 @@ func (b *volumeBinder) hasEnoughCapacity(provisioner string, claim *v1.Persisten for _, capacity := range capacities { if capacity.StorageClassName == storageClass.Name && capacitySufficient(capacity, sizeInBytes) && - b.nodeHasAccess(node, capacity) { + b.nodeHasAccess(logger, node, capacity) { // Enough capacity found. return true, nil } @@ -1037,7 +1040,7 @@ func (b *volumeBinder) hasEnoughCapacity(provisioner string, claim *v1.Persisten // TODO (?): this doesn't give any information about which pools where considered and why // they had to be rejected. Log that above? But that might be a lot of log output... - klog.V(4).InfoS("Node has no accessible CSIStorageCapacity with enough capacity for PVC", + logger.V(4).Info("Node has no accessible CSIStorageCapacity with enough capacity for PVC", "node", klog.KObj(node), "PVC", klog.KObj(claim), "size", sizeInBytes, "storageClass", klog.KObj(storageClass)) return false, nil } @@ -1051,7 +1054,7 @@ func capacitySufficient(capacity *storagev1.CSIStorageCapacity, sizeInBytes int6 return limit != nil && limit.Value() >= sizeInBytes } -func (b *volumeBinder) nodeHasAccess(node *v1.Node, capacity *storagev1.CSIStorageCapacity) bool { +func (b *volumeBinder) nodeHasAccess(logger klog.Logger, node *v1.Node, capacity *storagev1.CSIStorageCapacity) bool { if capacity.NodeTopology == nil { // Unavailable return false @@ -1059,7 +1062,7 @@ func (b *volumeBinder) nodeHasAccess(node *v1.Node, capacity *storagev1.CSIStora // Only matching by label is supported. selector, err := metav1.LabelSelectorAsSelector(capacity.NodeTopology) if err != nil { - klog.ErrorS(err, "Unexpected error converting to a label selector", "nodeTopology", capacity.NodeTopology) + logger.Error(err, "Unexpected error converting to a label selector", "nodeTopology", capacity.NodeTopology) return false } return selector.Matches(labels.Set(node.Labels)) diff --git a/pkg/scheduler/framework/plugins/volumebinding/binder_test.go b/pkg/scheduler/framework/plugins/volumebinding/binder_test.go index f053900c75b..1746780ce2e 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/binder_test.go +++ b/pkg/scheduler/framework/plugins/volumebinding/binder_test.go @@ -44,6 +44,7 @@ import ( "k8s.io/component-helpers/storage/volume" "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" + _ "k8s.io/klog/v2/ktesting/init" "k8s.io/kubernetes/pkg/controller" pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing" ) @@ -129,10 +130,6 @@ var ( zone1Labels = map[string]string{v1.LabelFailureDomainBetaZone: "us-east-1", v1.LabelFailureDomainBetaRegion: "us-east-1a"} ) -func init() { - klog.InitFlags(nil) -} - type testEnv struct { client clientset.Interface reactor *pvtesting.VolumeReactor @@ -149,9 +146,9 @@ type testEnv struct { internalCSIStorageCapacityInformer storageinformers.CSIStorageCapacityInformer } -func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv { +func newTestBinder(t *testing.T, ctx context.Context) *testEnv { client := &fake.Clientset{} - _, ctx := ktesting.NewTestContext(t) + logger := klog.FromContext(ctx) reactor := pvtesting.NewVolumeReactor(ctx, client, nil, nil, nil) // TODO refactor all tests to use real watch mechanism, see #72327 client.AddWatchReactor("*", func(action k8stesting.Action) (handled bool, ret watch.Interface, err error) { @@ -177,6 +174,7 @@ func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv { CSIStorageCapacityInformer: csiStorageCapacityInformer, } binder := NewVolumeBinder( + logger, client, podInformer, nodeInformer, @@ -188,10 +186,10 @@ func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv { 10*time.Second) // Wait for informers cache sync - informerFactory.Start(stopCh) - for v, synced := range informerFactory.WaitForCacheSync(stopCh) { + informerFactory.Start(ctx.Done()) + for v, synced := range informerFactory.WaitForCacheSync(ctx.Done()) { if !synced { - klog.ErrorS(nil, "Error syncing informer", "informer", v) + logger.Error(nil, "Error syncing informer", "informer", v) os.Exit(1) } } @@ -846,15 +844,15 @@ func checkReasons(t *testing.T, actual, expected ConflictReasons) { } // findPodVolumes gets and finds volumes for given pod and node -func findPodVolumes(binder SchedulerVolumeBinder, pod *v1.Pod, node *v1.Node) (*PodVolumes, ConflictReasons, error) { - podVolumeClaims, err := binder.GetPodVolumeClaims(pod) +func findPodVolumes(logger klog.Logger, binder SchedulerVolumeBinder, pod *v1.Pod, node *v1.Node) (*PodVolumes, ConflictReasons, error) { + podVolumeClaims, err := binder.GetPodVolumeClaims(logger, pod) if err != nil { return nil, nil, err } if len(podVolumeClaims.unboundClaimsImmediate) > 0 { return nil, nil, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims") } - return binder.FindPodVolumes(pod, podVolumeClaims, node) + return binder.FindPodVolumes(logger, pod, podVolumeClaims, node) } func TestFindPodVolumesWithoutProvisioning(t *testing.T) { @@ -1006,11 +1004,12 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) { } run := func(t *testing.T, scenario scenarioType, csiDriver *storagev1.CSIDriver) { - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() // Setup - testEnv := newTestBinder(t, ctx.Done()) + testEnv := newTestBinder(t, ctx) testEnv.initVolumes(scenario.pvs, scenario.pvs) if csiDriver != nil { testEnv.addCSIDriver(csiDriver) @@ -1031,7 +1030,7 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) { } // Execute - podVolumes, reasons, err := findPodVolumes(testEnv.binder, scenario.pod, testNode) + podVolumes, reasons, err := findPodVolumes(logger, testEnv.binder, scenario.pod, testNode) // Validate if !scenario.shouldFail && err != nil { @@ -1133,11 +1132,12 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) { } run := func(t *testing.T, scenario scenarioType, csiDriver *storagev1.CSIDriver) { - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() // Setup - testEnv := newTestBinder(t, ctx.Done()) + testEnv := newTestBinder(t, ctx) testEnv.initVolumes(scenario.pvs, scenario.pvs) if csiDriver != nil { testEnv.addCSIDriver(csiDriver) @@ -1158,7 +1158,7 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) { } // Execute - podVolumes, reasons, err := findPodVolumes(testEnv.binder, scenario.pod, testNode) + podVolumes, reasons, err := findPodVolumes(logger, testEnv.binder, scenario.pod, testNode) // Validate if !scenario.shouldFail && err != nil { @@ -1240,11 +1240,12 @@ func TestFindPodVolumesWithCSIMigration(t *testing.T) { } run := func(t *testing.T, scenario scenarioType) { - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() // Setup - testEnv := newTestBinder(t, ctx.Done()) + testEnv := newTestBinder(t, ctx) testEnv.initVolumes(scenario.pvs, scenario.pvs) var node *v1.Node @@ -1274,7 +1275,7 @@ func TestFindPodVolumesWithCSIMigration(t *testing.T) { } // Execute - _, reasons, err := findPodVolumes(testEnv.binder, scenario.pod, node) + _, reasons, err := findPodVolumes(logger, testEnv.binder, scenario.pod, node) // Validate if !scenario.shouldFail && err != nil { @@ -1357,11 +1358,12 @@ func TestAssumePodVolumes(t *testing.T) { } run := func(t *testing.T, scenario scenarioType) { - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() // Setup - testEnv := newTestBinder(t, ctx.Done()) + testEnv := newTestBinder(t, ctx) testEnv.initClaims(scenario.podPVCs, scenario.podPVCs) pod := makePod("test-pod"). withNamespace("testns"). @@ -1374,7 +1376,7 @@ func TestAssumePodVolumes(t *testing.T) { testEnv.initVolumes(scenario.pvs, scenario.pvs) // Execute - allBound, err := testEnv.binder.AssumePodVolumes(pod, "node1", podVolumes) + allBound, err := testEnv.binder.AssumePodVolumes(logger, pod, "node1", podVolumes) // Validate if !scenario.shouldFail && err != nil { @@ -1406,7 +1408,8 @@ func TestAssumePodVolumes(t *testing.T) { } func TestRevertAssumedPodVolumes(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() podPVCs := []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC} @@ -1417,7 +1420,7 @@ func TestRevertAssumedPodVolumes(t *testing.T) { expectedProvisionings := []*v1.PersistentVolumeClaim{selectedNodePVC} // Setup - testEnv := newTestBinder(t, ctx.Done()) + testEnv := newTestBinder(t, ctx) testEnv.initClaims(podPVCs, podPVCs) pod := makePod("test-pod"). withNamespace("testns"). @@ -1429,7 +1432,7 @@ func TestRevertAssumedPodVolumes(t *testing.T) { } testEnv.initVolumes(pvs, pvs) - allbound, err := testEnv.binder.AssumePodVolumes(pod, "node1", podVolumes) + allbound, err := testEnv.binder.AssumePodVolumes(logger, pod, "node1", podVolumes) if allbound || err != nil { t.Errorf("No volumes are assumed") } @@ -1534,11 +1537,12 @@ func TestBindAPIUpdate(t *testing.T) { } run := func(t *testing.T, scenario scenarioType) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() // Setup - testEnv := newTestBinder(t, ctx.Done()) + testEnv := newTestBinder(t, ctx) pod := makePod("test-pod"). withNamespace("testns"). withNodeName("node1").Pod @@ -1732,13 +1736,14 @@ func TestCheckBindings(t *testing.T) { } run := func(t *testing.T, scenario scenarioType) { - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() // Setup pod := makePod("test-pod"). withNamespace("testns"). withNodeName("node1").Pod - testEnv := newTestBinder(t, ctx.Done()) + testEnv := newTestBinder(t, ctx) testEnv.internalPodInformer.Informer().GetIndexer().Add(pod) testEnv.initNodes([]*v1.Node{node1}) testEnv.initVolumes(scenario.initPVs, nil) @@ -1762,7 +1767,7 @@ func TestCheckBindings(t *testing.T) { } // Execute - allBound, err := testEnv.internalBinder.checkBindings(pod, scenario.bindings, scenario.provisionedPVCs) + allBound, err := testEnv.internalBinder.checkBindings(logger, pod, scenario.bindings, scenario.provisionedPVCs) // Validate if !scenario.shouldFail && err != nil { @@ -1857,14 +1862,15 @@ func TestCheckBindingsWithCSIMigration(t *testing.T) { } run := func(t *testing.T, scenario scenarioType) { - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() // Setup pod := makePod("test-pod"). withNamespace("testns"). withNodeName("node1").Pod - testEnv := newTestBinder(t, ctx.Done()) + testEnv := newTestBinder(t, ctx) testEnv.internalPodInformer.Informer().GetIndexer().Add(pod) testEnv.initNodes(scenario.initNodes) testEnv.initCSINodes(scenario.initCSINodes) @@ -1881,7 +1887,7 @@ func TestCheckBindingsWithCSIMigration(t *testing.T) { } // Execute - allBound, err := testEnv.internalBinder.checkBindings(pod, scenario.bindings, scenario.provisionedPVCs) + allBound, err := testEnv.internalBinder.checkBindings(logger, pod, scenario.bindings, scenario.provisionedPVCs) // Validate if !scenario.shouldFail && err != nil { @@ -2047,13 +2053,14 @@ func TestBindPodVolumes(t *testing.T) { } run := func(t *testing.T, scenario scenarioType) { - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() // Setup pod := makePod("test-pod"). withNamespace("testns"). withNodeName("node1").Pod - testEnv := newTestBinder(t, ctx.Done()) + testEnv := newTestBinder(t, ctx) testEnv.internalPodInformer.Informer().GetIndexer().Add(pod) if scenario.nodes == nil { scenario.nodes = []*v1.Node{node1} @@ -2091,7 +2098,7 @@ func TestBindPodVolumes(t *testing.T) { go func(scenario scenarioType) { time.Sleep(5 * time.Second) // Sleep a while to run after bindAPIUpdate in BindPodVolumes - klog.V(5).InfoS("Running delay function") + logger.V(5).Info("Running delay function") scenario.delayFunc(t, ctx, testEnv, pod, scenario.initPVs, scenario.initPVCs) }(scenario) } @@ -2127,9 +2134,10 @@ func TestFindAssumeVolumes(t *testing.T) { pvs := []*v1.PersistentVolume{pvNode2, pvNode1a, pvNode1c} // Setup - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() - testEnv := newTestBinder(t, ctx.Done()) + testEnv := newTestBinder(t, ctx) testEnv.initVolumes(pvs, pvs) testEnv.initClaims(podPVCs, podPVCs) pod := makePod("test-pod"). @@ -2148,7 +2156,7 @@ func TestFindAssumeVolumes(t *testing.T) { // Execute // 1. Find matching PVs - podVolumes, reasons, err := findPodVolumes(testEnv.binder, pod, testNode) + podVolumes, reasons, err := findPodVolumes(logger, testEnv.binder, pod, testNode) if err != nil { t.Errorf("Test failed: FindPodVolumes returned error: %v", err) } @@ -2158,7 +2166,7 @@ func TestFindAssumeVolumes(t *testing.T) { expectedBindings := podVolumes.StaticBindings // 2. Assume matches - allBound, err := testEnv.binder.AssumePodVolumes(pod, testNode.Name, podVolumes) + allBound, err := testEnv.binder.AssumePodVolumes(logger, pod, testNode.Name, podVolumes) if err != nil { t.Errorf("Test failed: AssumePodVolumes returned error: %v", err) } @@ -2174,7 +2182,7 @@ func TestFindAssumeVolumes(t *testing.T) { // This should always return the original chosen pv // Run this many times in case sorting returns different orders for the two PVs. for i := 0; i < 50; i++ { - podVolumes, reasons, err := findPodVolumes(testEnv.binder, pod, testNode) + podVolumes, reasons, err := findPodVolumes(logger, testEnv.binder, pod, testNode) if err != nil { t.Errorf("Test failed: FindPodVolumes returned error: %v", err) } @@ -2283,11 +2291,12 @@ func TestCapacity(t *testing.T) { } run := func(t *testing.T, scenario scenarioType, optIn bool) { - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() // Setup: the driver has the feature enabled, but the scheduler might not. - testEnv := newTestBinder(t, ctx.Done()) + testEnv := newTestBinder(t, ctx) testEnv.addCSIDriver(makeCSIDriver(provisioner, optIn)) testEnv.addCSIStorageCapacities(scenario.capacities) @@ -2301,7 +2310,7 @@ func TestCapacity(t *testing.T) { withPVCSVolume(scenario.pvcs).Pod // Execute - podVolumes, reasons, err := findPodVolumes(testEnv.binder, pod, testNode) + podVolumes, reasons, err := findPodVolumes(logger, testEnv.binder, pod, testNode) // Validate shouldFail := scenario.shouldFail @@ -2431,18 +2440,19 @@ func TestGetEligibleNodes(t *testing.T) { } run := func(t *testing.T, scenario scenarioType) { - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() // Setup - testEnv := newTestBinder(t, ctx.Done()) + testEnv := newTestBinder(t, ctx) testEnv.initVolumes(scenario.pvs, scenario.pvs) testEnv.initNodes(scenario.nodes) testEnv.initClaims(scenario.pvcs, scenario.pvcs) // Execute - eligibleNodes := testEnv.binder.GetEligibleNodes(scenario.pvcs) + eligibleNodes := testEnv.binder.GetEligibleNodes(logger, scenario.pvcs) // Validate if reflect.DeepEqual(scenario.eligibleNodes, eligibleNodes) { diff --git a/pkg/scheduler/framework/plugins/volumebinding/fake_binder.go b/pkg/scheduler/framework/plugins/volumebinding/fake_binder.go index b8e78b8bea1..667669c65b4 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/fake_binder.go +++ b/pkg/scheduler/framework/plugins/volumebinding/fake_binder.go @@ -21,6 +21,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" ) // FakeVolumeBinderConfig holds configurations for fake volume binder. @@ -50,22 +51,22 @@ type FakeVolumeBinder struct { var _ SchedulerVolumeBinder = &FakeVolumeBinder{} // GetPodVolumeClaims implements SchedulerVolumeBinder.GetPodVolumes. -func (b *FakeVolumeBinder) GetPodVolumeClaims(pod *v1.Pod) (podVolumeClaims *PodVolumeClaims, err error) { +func (b *FakeVolumeBinder) GetPodVolumeClaims(_ klog.Logger, pod *v1.Pod) (podVolumeClaims *PodVolumeClaims, err error) { return &PodVolumeClaims{}, nil } // GetEligibleNodes implements SchedulerVolumeBinder.GetEligibleNodes. -func (b *FakeVolumeBinder) GetEligibleNodes(boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.Set[string]) { +func (b *FakeVolumeBinder) GetEligibleNodes(_ klog.Logger, boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.Set[string]) { return nil } // FindPodVolumes implements SchedulerVolumeBinder.FindPodVolumes. -func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, _ *PodVolumeClaims, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) { +func (b *FakeVolumeBinder) FindPodVolumes(_ klog.Logger, pod *v1.Pod, _ *PodVolumeClaims, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) { return nil, b.config.FindReasons, b.config.FindErr } // AssumePodVolumes implements SchedulerVolumeBinder.AssumePodVolumes. -func (b *FakeVolumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (bool, error) { +func (b *FakeVolumeBinder) AssumePodVolumes(_ klog.Logger, assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (bool, error) { b.AssumeCalled = true return b.config.AllBound, b.config.AssumeErr } diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go index c54e4b89e3e..a8f3a596d17 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go @@ -161,6 +161,7 @@ func (pl *VolumeBinding) podHasPVCs(pod *v1.Pod) (bool, error) { // immediate PVCs bound. If not all immediate PVCs are bound, an // UnschedulableAndUnresolvable is returned. func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { + logger := klog.FromContext(ctx) // If pod does not reference any PVC, we don't need to do anything. if hasPVC, err := pl.podHasPVCs(pod); err != nil { return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) @@ -168,7 +169,7 @@ func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleSt state.Write(stateKey, &stateData{}) return nil, framework.NewStatus(framework.Skip) } - podVolumeClaims, err := pl.Binder.GetPodVolumeClaims(pod) + podVolumeClaims, err := pl.Binder.GetPodVolumeClaims(logger, pod) if err != nil { return nil, framework.AsStatus(err) } @@ -182,7 +183,7 @@ func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleSt } // Attempt to reduce down the number of nodes to consider in subsequent scheduling stages if pod has bound claims. var result *framework.PreFilterResult - if eligibleNodes := pl.Binder.GetEligibleNodes(podVolumeClaims.boundClaims); eligibleNodes != nil { + if eligibleNodes := pl.Binder.GetEligibleNodes(logger, podVolumeClaims.boundClaims); eligibleNodes != nil { result = &framework.PreFilterResult{ NodeNames: eligibleNodes, } @@ -232,6 +233,7 @@ func getStateData(cs *framework.CycleState) (*stateData, error) { // The predicate returns true if all bound PVCs have compatible PVs with the node, and if all unbound // PVCs can be matched with an available and node-compatible PV. func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + logger := klog.FromContext(ctx) node := nodeInfo.Node() state, err := getStateData(cs) @@ -239,7 +241,7 @@ func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, p return framework.AsStatus(err) } - podVolumes, reasons, err := pl.Binder.FindPodVolumes(pod, state.podVolumeClaims, node) + podVolumes, reasons, err := pl.Binder.FindPodVolumes(logger, pod, state.podVolumeClaims, node) if err != nil { return framework.AsStatus(err) @@ -304,7 +306,7 @@ func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, // we don't need to hold the lock as only one node will be reserved for the given pod podVolumes, ok := state.podVolumesByNode[nodeName] if ok { - allBound, err := pl.Binder.AssumePodVolumes(pod, nodeName, podVolumes) + allBound, err := pl.Binder.AssumePodVolumes(klog.FromContext(ctx), pod, nodeName, podVolumes) if err != nil { return framework.AsStatus(err) } @@ -335,13 +337,14 @@ func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState, if !ok { return framework.AsStatus(fmt.Errorf("no pod volumes found for node %q", nodeName)) } - klog.V(5).InfoS("Trying to bind volumes for pod", "pod", klog.KObj(pod)) + logger := klog.FromContext(ctx) + logger.V(5).Info("Trying to bind volumes for pod", "pod", klog.KObj(pod)) err = pl.Binder.BindPodVolumes(ctx, pod, podVolumes) if err != nil { - klog.V(1).InfoS("Failed to bind volumes for pod", "pod", klog.KObj(pod), "err", err) + logger.V(5).Info("Failed to bind volumes for pod", "pod", klog.KObj(pod), "err", err) return framework.AsStatus(err) } - klog.V(5).InfoS("Success binding volumes for pod", "pod", klog.KObj(pod)) + logger.V(5).Info("Success binding volumes for pod", "pod", klog.KObj(pod)) return nil } @@ -361,7 +364,7 @@ func (pl *VolumeBinding) Unreserve(ctx context.Context, cs *framework.CycleState } // New initializes a new plugin and returns it. -func New(_ context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) { +func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) { args, ok := plArgs.(*config.VolumeBindingArgs) if !ok { return nil, fmt.Errorf("want args to be of type VolumeBindingArgs, got %T", plArgs) @@ -381,7 +384,7 @@ func New(_ context.Context, plArgs runtime.Object, fh framework.Handle, fts feat CSIDriverInformer: fh.SharedInformerFactory().Storage().V1().CSIDrivers(), CSIStorageCapacityInformer: fh.SharedInformerFactory().Storage().V1().CSIStorageCapacities(), } - binder := NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second) + binder := NewVolumeBinder(klog.FromContext(ctx), fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second) // build score function var scorer volumeCapacityScorer