Merge pull request #116803 from mengjiao-liu/contextual-logging-scheduler-plugin-volumebinding

Migrated `pkg/scheduler/framework/plugins/volumebinding` to contextual logging
This commit is contained in:
Kubernetes Prow Robot 2023-09-27 15:04:38 -07:00 committed by GitHub
commit 9c5698f514
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 190 additions and 149 deletions

View File

@ -87,6 +87,10 @@ func (e *errObjectName) Error() string {
// Restore() sets the latest object pointer back to the informer object.
// Get/List() always returns the latest object pointer.
type assumeCache struct {
// The logger that was chosen when setting up the cache.
// Will be used for all operations.
logger klog.Logger
// Synchronizes updates to store
rwMutex sync.RWMutex
@ -129,8 +133,9 @@ func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
}
// NewAssumeCache creates an assume cache for general objects.
func NewAssumeCache(informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache {
func NewAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache {
c := &assumeCache{
logger: logger,
description: description,
indexFunc: indexFunc,
indexName: indexName,
@ -161,7 +166,7 @@ func (c *assumeCache) add(obj interface{}) {
name, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
klog.ErrorS(&errObjectName{err}, "Add failed")
c.logger.Error(&errObjectName{err}, "Add failed")
return
}
@ -171,29 +176,29 @@ func (c *assumeCache) add(obj interface{}) {
if objInfo, _ := c.getObjInfo(name); objInfo != nil {
newVersion, err := c.getObjVersion(name, obj)
if err != nil {
klog.ErrorS(err, "Add failed: couldn't get object version")
c.logger.Error(err, "Add failed: couldn't get object version")
return
}
storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
if err != nil {
klog.ErrorS(err, "Add failed: couldn't get stored object version")
c.logger.Error(err, "Add failed: couldn't get stored object version")
return
}
// Only update object if version is newer.
// This is so we don't override assumed objects due to informer resync.
if newVersion <= storedVersion {
klog.V(10).InfoS("Skip adding object to assume cache because version is not newer than storedVersion", "description", c.description, "cacheKey", name, "newVersion", newVersion, "storedVersion", storedVersion)
c.logger.V(10).Info("Skip adding object to assume cache because version is not newer than storedVersion", "description", c.description, "cacheKey", name, "newVersion", newVersion, "storedVersion", storedVersion)
return
}
}
objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj}
if err = c.store.Update(objInfo); err != nil {
klog.InfoS("Error occurred while updating stored object", "err", err)
c.logger.Info("Error occurred while updating stored object", "err", err)
} else {
klog.V(10).InfoS("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj)
c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj)
}
}
@ -208,7 +213,7 @@ func (c *assumeCache) delete(obj interface{}) {
name, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
klog.ErrorS(&errObjectName{err}, "Failed to delete")
c.logger.Error(&errObjectName{err}, "Failed to delete")
return
}
@ -218,7 +223,7 @@ func (c *assumeCache) delete(obj interface{}) {
objInfo := &objInfo{name: name}
err = c.store.Delete(objInfo)
if err != nil {
klog.ErrorS(err, "Failed to delete", "description", c.description, "cacheKey", name)
c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name)
}
}
@ -280,14 +285,14 @@ func (c *assumeCache) List(indexObj interface{}) []interface{} {
allObjs := []interface{}{}
objs, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj})
if err != nil {
klog.ErrorS(err, "List index error")
c.logger.Error(err, "List index error")
return nil
}
for _, obj := range objs {
objInfo, ok := obj.(*objInfo)
if !ok {
klog.ErrorS(&errWrongType{"objInfo", obj}, "List error")
c.logger.Error(&errWrongType{"objInfo", obj}, "List error")
continue
}
allObjs = append(allObjs, objInfo.latestObj)
@ -325,7 +330,7 @@ func (c *assumeCache) Assume(obj interface{}) error {
// Only update the cached object
objInfo.latestObj = obj
klog.V(4).InfoS("Assumed object", "description", c.description, "cacheKey", name, "version", newVersion)
c.logger.V(4).Info("Assumed object", "description", c.description, "cacheKey", name, "version", newVersion)
return nil
}
@ -336,10 +341,10 @@ func (c *assumeCache) Restore(objName string) {
objInfo, err := c.getObjInfo(objName)
if err != nil {
// This could be expected if object got deleted
klog.V(5).InfoS("Restore object", "description", c.description, "cacheKey", objName, "err", err)
c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err)
} else {
objInfo.latestObj = objInfo.apiObj
klog.V(4).InfoS("Restored object", "description", c.description, "cacheKey", objName)
c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName)
}
}
@ -354,6 +359,7 @@ type PVAssumeCache interface {
type pvAssumeCache struct {
AssumeCache
logger klog.Logger
}
func pvStorageClassIndexFunc(obj interface{}) ([]string, error) {
@ -364,8 +370,12 @@ func pvStorageClassIndexFunc(obj interface{}) ([]string, error) {
}
// NewPVAssumeCache creates a PV assume cache.
func NewPVAssumeCache(informer cache.SharedIndexInformer) PVAssumeCache {
return &pvAssumeCache{NewAssumeCache(informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc)}
func NewPVAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVAssumeCache {
logger = klog.LoggerWithName(logger, "PV Cache")
return &pvAssumeCache{
AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc),
logger: logger,
}
}
func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
@ -403,7 +413,7 @@ func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume
for _, obj := range objs {
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
klog.ErrorS(&errWrongType{"v1.PersistentVolume", obj}, "ListPVs")
c.logger.Error(&errWrongType{"v1.PersistentVolume", obj}, "ListPVs")
continue
}
pvs = append(pvs, pv)
@ -423,11 +433,16 @@ type PVCAssumeCache interface {
type pvcAssumeCache struct {
AssumeCache
logger klog.Logger
}
// NewPVCAssumeCache creates a PVC assume cache.
func NewPVCAssumeCache(informer cache.SharedIndexInformer) PVCAssumeCache {
return &pvcAssumeCache{NewAssumeCache(informer, "v1.PersistentVolumeClaim", "", nil)}
func NewPVCAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVCAssumeCache {
logger = klog.LoggerWithName(logger, "PVC Cache")
return &pvcAssumeCache{
AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolumeClaim", "", nil),
logger: logger,
}
}
func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {

View File

@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/component-helpers/storage/volume"
"k8s.io/klog/v2/ktesting"
)
func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) {
@ -53,6 +54,7 @@ func verifyPV(cache PVAssumeCache, name string, expectedPV *v1.PersistentVolume)
}
func TestAssumePV(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
scenarios := map[string]struct {
oldPV *v1.PersistentVolume
newPV *v1.PersistentVolume
@ -96,7 +98,7 @@ func TestAssumePV(t *testing.T) {
}
for name, scenario := range scenarios {
cache := NewPVAssumeCache(nil)
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
@ -130,7 +132,8 @@ func TestAssumePV(t *testing.T) {
}
func TestRestorePV(t *testing.T) {
cache := NewPVAssumeCache(nil)
logger, _ := ktesting.NewTestContext(t)
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
@ -170,7 +173,8 @@ func TestRestorePV(t *testing.T) {
}
func TestBasicPVCache(t *testing.T) {
cache := NewPVAssumeCache(nil)
logger, _ := ktesting.NewTestContext(t)
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
@ -214,7 +218,8 @@ func TestBasicPVCache(t *testing.T) {
}
func TestPVCacheWithStorageClasses(t *testing.T) {
cache := NewPVAssumeCache(nil)
logger, _ := ktesting.NewTestContext(t)
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
@ -260,7 +265,8 @@ func TestPVCacheWithStorageClasses(t *testing.T) {
}
func TestAssumeUpdatePVCache(t *testing.T) {
cache := NewPVAssumeCache(nil)
logger, _ := ktesting.NewTestContext(t)
cache := NewPVAssumeCache(logger, nil)
internalCache, ok := cache.(*pvAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
@ -315,6 +321,7 @@ func verifyPVC(cache PVCAssumeCache, pvcKey string, expectedPVC *v1.PersistentVo
}
func TestAssumePVC(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
scenarios := map[string]struct {
oldPVC *v1.PersistentVolumeClaim
newPVC *v1.PersistentVolumeClaim
@ -353,7 +360,7 @@ func TestAssumePVC(t *testing.T) {
}
for name, scenario := range scenarios {
cache := NewPVCAssumeCache(nil)
cache := NewPVCAssumeCache(logger, nil)
internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
@ -387,7 +394,8 @@ func TestAssumePVC(t *testing.T) {
}
func TestRestorePVC(t *testing.T) {
cache := NewPVCAssumeCache(nil)
logger, _ := ktesting.NewTestContext(t)
cache := NewPVCAssumeCache(logger, nil)
internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
@ -427,7 +435,8 @@ func TestRestorePVC(t *testing.T) {
}
func TestAssumeUpdatePVCCache(t *testing.T) {
cache := NewPVCAssumeCache(nil)
logger, _ := ktesting.NewTestContext(t)
cache := NewPVCAssumeCache(logger, nil)
internalCache, ok := cache.(*pvcAssumeCache).AssumeCache.(*assumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")

View File

@ -149,7 +149,7 @@ type InTreeToCSITranslator interface {
type SchedulerVolumeBinder interface {
// GetPodVolumeClaims returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning),
// unbound with immediate binding (including prebound) and PVs that belong to storage classes of unbound PVCs with delayed binding.
GetPodVolumeClaims(pod *v1.Pod) (podVolumeClaims *PodVolumeClaims, err error)
GetPodVolumeClaims(logger klog.Logger, pod *v1.Pod) (podVolumeClaims *PodVolumeClaims, err error)
// GetEligibleNodes checks the existing bound claims of the pod to determine if the list of nodes can be
// potentially reduced down to a subset of eligible nodes based on the bound claims which then can be used
@ -157,7 +157,7 @@ type SchedulerVolumeBinder interface {
//
// If eligibleNodes is 'nil', then it indicates that such eligible node reduction cannot be made
// and all nodes should be considered.
GetEligibleNodes(boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.Set[string])
GetEligibleNodes(logger klog.Logger, boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.Set[string])
// FindPodVolumes checks if all of a Pod's PVCs can be satisfied by the
// node and returns pod's volumes information.
@ -172,7 +172,7 @@ type SchedulerVolumeBinder interface {
// for volumes that still need to be created.
//
// This function is called by the scheduler VolumeBinding plugin and can be called in parallel
FindPodVolumes(pod *v1.Pod, podVolumeClaims *PodVolumeClaims, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error)
FindPodVolumes(logger klog.Logger, pod *v1.Pod, podVolumeClaims *PodVolumeClaims, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error)
// AssumePodVolumes will:
// 1. Take the PV matches for unbound PVCs and update the PV cache assuming
@ -183,7 +183,7 @@ type SchedulerVolumeBinder interface {
// It returns true if all volumes are fully bound
//
// This function is called serially.
AssumePodVolumes(assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (allFullyBound bool, err error)
AssumePodVolumes(logger klog.Logger, assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (allFullyBound bool, err error)
// RevertAssumedPodVolumes will revert assumed PV and PVC cache.
RevertAssumedPodVolumes(podVolumes *PodVolumes)
@ -244,6 +244,7 @@ type CapacityCheck struct {
//
// capacityCheck determines how storage capacity is checked (CSIStorageCapacity feature).
func NewVolumeBinder(
logger klog.Logger,
kubeClient clientset.Interface,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
@ -259,8 +260,8 @@ func NewVolumeBinder(
classLister: storageClassInformer.Lister(),
nodeLister: nodeInformer.Lister(),
csiNodeLister: csiNodeInformer.Lister(),
pvcCache: NewPVCAssumeCache(pvcInformer.Informer()),
pvCache: NewPVAssumeCache(pvInformer.Informer()),
pvcCache: NewPVCAssumeCache(logger, pvcInformer.Informer()),
pvCache: NewPVAssumeCache(logger, pvInformer.Informer()),
bindTimeout: bindTimeout,
translator: csitrans.New(),
}
@ -274,11 +275,11 @@ func NewVolumeBinder(
// FindPodVolumes finds the matching PVs for PVCs and nodes to provision PVs
// for the given pod and node. If the node does not fit, conflict reasons are
// returned.
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, podVolumeClaims *PodVolumeClaims, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) {
func (b *volumeBinder) FindPodVolumes(logger klog.Logger, pod *v1.Pod, podVolumeClaims *PodVolumeClaims, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) {
podVolumes = &PodVolumes{}
// Warning: Below log needs high verbosity as it can be printed several times (#60933).
klog.V(5).InfoS("FindPodVolumes", "pod", klog.KObj(pod), "node", klog.KObj(node))
logger.V(5).Info("FindPodVolumes", "pod", klog.KObj(pod), "node", klog.KObj(node))
// Initialize to true for pods that don't have volumes. These
// booleans get translated into reason strings when the function
@ -330,7 +331,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, podVolumeClaims *PodVolumeCla
// Check PV node affinity on bound volumes
if len(podVolumeClaims.boundClaims) > 0 {
boundVolumesSatisfied, boundPVsFound, err = b.checkBoundClaims(podVolumeClaims.boundClaims, node, pod)
boundVolumesSatisfied, boundPVsFound, err = b.checkBoundClaims(logger, podVolumeClaims.boundClaims, node, pod)
if err != nil {
return
}
@ -360,7 +361,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, podVolumeClaims *PodVolumeCla
// Find matching volumes
if len(claimsToFindMatching) > 0 {
var unboundClaims []*v1.PersistentVolumeClaim
unboundVolumesSatisfied, staticBindings, unboundClaims, err = b.findMatchingVolumes(pod, claimsToFindMatching, podVolumeClaims.unboundVolumesDelayBinding, node)
unboundVolumesSatisfied, staticBindings, unboundClaims, err = b.findMatchingVolumes(logger, pod, claimsToFindMatching, podVolumeClaims.unboundVolumesDelayBinding, node)
if err != nil {
return
}
@ -370,7 +371,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, podVolumeClaims *PodVolumeCla
// Check for claims to provision. This is the first time where we potentially
// find out that storage is not sufficient for the node.
if len(claimsToProvision) > 0 {
unboundVolumesSatisfied, sufficientStorage, dynamicProvisions, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
unboundVolumesSatisfied, sufficientStorage, dynamicProvisions, err = b.checkVolumeProvisions(logger, pod, claimsToProvision, node)
if err != nil {
return
}
@ -386,7 +387,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, podVolumeClaims *PodVolumeCla
//
// Returning 'nil' for eligibleNodes indicates that such eligible node reduction cannot be made and all nodes
// should be considered.
func (b *volumeBinder) GetEligibleNodes(boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.Set[string]) {
func (b *volumeBinder) GetEligibleNodes(logger klog.Logger, boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.Set[string]) {
if len(boundClaims) == 0 {
return
}
@ -419,12 +420,12 @@ func (b *volumeBinder) GetEligibleNodes(boundClaims []*v1.PersistentVolumeClaim)
}
if len(errs) > 0 {
klog.V(4).InfoS("GetEligibleNodes: one or more error occurred finding eligible nodes", "error", errs)
logger.V(4).Info("GetEligibleNodes: one or more error occurred finding eligible nodes", "error", errs)
return nil
}
if eligibleNodes != nil {
klog.V(4).InfoS("GetEligibleNodes: reduced down eligible nodes", "nodes", eligibleNodes)
logger.V(4).Info("GetEligibleNodes: reduced down eligible nodes", "nodes", eligibleNodes)
}
return
}
@ -434,16 +435,16 @@ func (b *volumeBinder) GetEligibleNodes(boundClaims []*v1.PersistentVolumeClaim)
// 1. Update the pvCache with the new prebound PV.
// 2. Update the pvcCache with the new PVCs with annotations set
// 3. Update PodVolumes again with cached API updates for PVs and PVCs.
func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (allFullyBound bool, err error) {
klog.V(4).InfoS("AssumePodVolumes", "pod", klog.KObj(assumedPod), "node", klog.KRef("", nodeName))
func (b *volumeBinder) AssumePodVolumes(logger klog.Logger, assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (allFullyBound bool, err error) {
logger.V(4).Info("AssumePodVolumes", "pod", klog.KObj(assumedPod), "node", klog.KRef("", nodeName))
defer func() {
if err != nil {
metrics.VolumeSchedulingStageFailed.WithLabelValues("assume").Inc()
}
}()
if allBound := b.arePodVolumesBound(assumedPod); allBound {
klog.V(4).InfoS("AssumePodVolumes: all PVCs bound and nothing to do", "pod", klog.KObj(assumedPod), "node", klog.KRef("", nodeName))
if allBound := b.arePodVolumesBound(logger, assumedPod); allBound {
logger.V(4).Info("AssumePodVolumes: all PVCs bound and nothing to do", "pod", klog.KObj(assumedPod), "node", klog.KRef("", nodeName))
return true, nil
}
@ -451,7 +452,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string, pod
newBindings := []*BindingInfo{}
for _, binding := range podVolumes.StaticBindings {
newPV, dirty, err := volume.GetBindVolumeToClaim(binding.pv, binding.pvc)
klog.V(5).InfoS("AssumePodVolumes: GetBindVolumeToClaim",
logger.V(5).Info("AssumePodVolumes: GetBindVolumeToClaim",
"pod", klog.KObj(assumedPod),
"PV", klog.KObj(binding.pv),
"PVC", klog.KObj(binding.pvc),
@ -459,7 +460,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string, pod
"dirty", dirty,
)
if err != nil {
klog.ErrorS(err, "AssumePodVolumes: fail to GetBindVolumeToClaim")
logger.Error(err, "AssumePodVolumes: fail to GetBindVolumeToClaim")
b.revertAssumedPVs(newBindings)
return false, err
}
@ -506,7 +507,8 @@ func (b *volumeBinder) RevertAssumedPodVolumes(podVolumes *PodVolumes) {
// makes the API update for those PVs/PVCs, and waits for the PVCs to be completely bound
// by the PV controller.
func (b *volumeBinder) BindPodVolumes(ctx context.Context, assumedPod *v1.Pod, podVolumes *PodVolumes) (err error) {
klog.V(4).InfoS("BindPodVolumes", "pod", klog.KObj(assumedPod), "node", klog.KRef("", assumedPod.Spec.NodeName))
logger := klog.FromContext(ctx)
logger.V(4).Info("BindPodVolumes", "pod", klog.KObj(assumedPod), "node", klog.KRef("", assumedPod.Spec.NodeName))
defer func() {
if err != nil {
@ -524,7 +526,7 @@ func (b *volumeBinder) BindPodVolumes(ctx context.Context, assumedPod *v1.Pod, p
}
err = wait.PollUntilContextTimeout(ctx, time.Second, b.bindTimeout, false, func(ctx context.Context) (bool, error) {
b, err := b.checkBindings(assumedPod, bindings, claimsToProvision)
b, err := b.checkBindings(logger, assumedPod, bindings, claimsToProvision)
return b, err
})
if err != nil {
@ -543,6 +545,7 @@ func getPVCName(pvc *v1.PersistentVolumeClaim) string {
// bindAPIUpdate makes the API update for those PVs/PVCs.
func (b *volumeBinder) bindAPIUpdate(ctx context.Context, pod *v1.Pod, bindings []*BindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) error {
logger := klog.FromContext(ctx)
podName := getPodName(pod)
if bindings == nil {
return fmt.Errorf("failed to get cached bindings for pod %q", podName)
@ -574,14 +577,14 @@ func (b *volumeBinder) bindAPIUpdate(ctx context.Context, pod *v1.Pod, bindings
// There is no API rollback if the actual binding fails
for _, binding = range bindings {
// TODO: does it hurt if we make an api call and nothing needs to be updated?
klog.V(5).InfoS("Updating PersistentVolume: binding to claim", "pod", klog.KObj(pod), "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc))
logger.V(5).Info("Updating PersistentVolume: binding to claim", "pod", klog.KObj(pod), "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc))
newPV, err := b.kubeClient.CoreV1().PersistentVolumes().Update(ctx, binding.pv, metav1.UpdateOptions{})
if err != nil {
klog.V(4).InfoS("Updating PersistentVolume: binding to claim failed", "pod", klog.KObj(pod), "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc), "err", err)
logger.V(4).Info("Updating PersistentVolume: binding to claim failed", "pod", klog.KObj(pod), "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc), "err", err)
return err
}
klog.V(2).InfoS("Updated PersistentVolume with claim. Waiting for binding to complete", "pod", klog.KObj(pod), "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc))
logger.V(2).Info("Updated PersistentVolume with claim. Waiting for binding to complete", "pod", klog.KObj(pod), "PV", klog.KObj(binding.pv), "PVC", klog.KObj(binding.pvc))
// Save updated object from apiserver for later checking.
binding.pv = newPV
lastProcessedBinding++
@ -590,10 +593,10 @@ func (b *volumeBinder) bindAPIUpdate(ctx context.Context, pod *v1.Pod, bindings
// Update claims objects to trigger volume provisioning. Let the PV controller take care of the rest
// PV controller is expected to signal back by removing related annotations if actual provisioning fails
for i, claim = range claimsToProvision {
klog.V(5).InfoS("Updating claims objects to trigger volume provisioning", "pod", klog.KObj(pod), "PVC", klog.KObj(claim))
logger.V(5).Info("Updating claims objects to trigger volume provisioning", "pod", klog.KObj(pod), "PVC", klog.KObj(claim))
newClaim, err := b.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
if err != nil {
klog.V(4).InfoS("Updating PersistentVolumeClaim: binding to volume failed", "PVC", klog.KObj(claim), "err", err)
logger.V(4).Info("Updating PersistentVolumeClaim: binding to volume failed", "PVC", klog.KObj(claim), "err", err)
return err
}
@ -619,7 +622,7 @@ var (
// PV/PVC cache can be assumed again in main scheduler loop, we must check
// latest state in API server which are shared with PV controller and
// provisioners
func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*BindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) {
func (b *volumeBinder) checkBindings(logger klog.Logger, pod *v1.Pod, bindings []*BindingInfo, claimsToProvision []*v1.PersistentVolumeClaim) (bool, error) {
podName := getPodName(pod)
if bindings == nil {
return false, fmt.Errorf("failed to get cached bindings for pod %q", podName)
@ -636,7 +639,7 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*BindingInfo, claim
csiNode, err := b.csiNodeLister.Get(node.Name)
if err != nil {
// TODO: return the error once CSINode is created by default
klog.V(4).InfoS("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
logger.V(4).Info("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
}
// Check for any conditions that might require scheduling retry
@ -648,7 +651,7 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*BindingInfo, claim
if apierrors.IsNotFound(err) {
return false, fmt.Errorf("pod does not exist any more: %w", err)
}
klog.ErrorS(err, "Failed to get pod from the lister", "pod", klog.KObj(pod))
logger.Error(err, "Failed to get pod from the lister", "pod", klog.KObj(pod))
}
for _, binding := range bindings {
@ -744,11 +747,11 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*BindingInfo, claim
}
// All pvs and pvcs that we operated on are bound
klog.V(2).InfoS("All PVCs for pod are bound", "pod", klog.KObj(pod))
logger.V(2).Info("All PVCs for pod are bound", "pod", klog.KObj(pod))
return true, nil
}
func (b *volumeBinder) isVolumeBound(pod *v1.Pod, vol *v1.Volume) (bound bool, pvc *v1.PersistentVolumeClaim, err error) {
func (b *volumeBinder) isVolumeBound(logger klog.Logger, pod *v1.Pod, vol *v1.Volume) (bound bool, pvc *v1.PersistentVolumeClaim, err error) {
pvcName := ""
isEphemeral := false
switch {
@ -763,7 +766,7 @@ func (b *volumeBinder) isVolumeBound(pod *v1.Pod, vol *v1.Volume) (bound bool, p
return true, nil, nil
}
bound, pvc, err = b.isPVCBound(pod.Namespace, pvcName)
bound, pvc, err = b.isPVCBound(logger, pod.Namespace, pvcName)
// ... the PVC must be owned by the pod.
if isEphemeral && err == nil && pvc != nil {
if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil {
@ -773,7 +776,7 @@ func (b *volumeBinder) isVolumeBound(pod *v1.Pod, vol *v1.Volume) (bound bool, p
return
}
func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.PersistentVolumeClaim, error) {
func (b *volumeBinder) isPVCBound(logger klog.Logger, namespace, pvcName string) (bool, *v1.PersistentVolumeClaim, error) {
claim := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcName,
@ -788,12 +791,12 @@ func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.Persiste
fullyBound := b.isPVCFullyBound(pvc)
if fullyBound {
klog.V(5).InfoS("PVC is fully bound to PV", "PVC", klog.KObj(pvc), "PV", klog.KRef("", pvc.Spec.VolumeName))
logger.V(5).Info("PVC is fully bound to PV", "PVC", klog.KObj(pvc), "PV", klog.KRef("", pvc.Spec.VolumeName))
} else {
if pvc.Spec.VolumeName != "" {
klog.V(5).InfoS("PVC is not fully bound to PV", "PVC", klog.KObj(pvc), "PV", klog.KRef("", pvc.Spec.VolumeName))
logger.V(5).Info("PVC is not fully bound to PV", "PVC", klog.KObj(pvc), "PV", klog.KRef("", pvc.Spec.VolumeName))
} else {
klog.V(5).InfoS("PVC is not bound", "PVC", klog.KObj(pvc))
logger.V(5).Info("PVC is not bound", "PVC", klog.KObj(pvc))
}
}
return fullyBound, pvc, nil
@ -804,9 +807,9 @@ func (b *volumeBinder) isPVCFullyBound(pvc *v1.PersistentVolumeClaim) bool {
}
// arePodVolumesBound returns true if all volumes are fully bound
func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool {
func (b *volumeBinder) arePodVolumesBound(logger klog.Logger, pod *v1.Pod) bool {
for _, vol := range pod.Spec.Volumes {
if isBound, _, _ := b.isVolumeBound(pod, &vol); !isBound {
if isBound, _, _ := b.isVolumeBound(logger, pod, &vol); !isBound {
// Pod has at least one PVC that needs binding
return false
}
@ -816,7 +819,7 @@ func (b *volumeBinder) arePodVolumesBound(pod *v1.Pod) bool {
// GetPodVolumeClaims returns a pod's PVCs separated into bound, unbound with delayed binding (including provisioning),
// unbound with immediate binding (including prebound) and PVs that belong to storage classes of unbound PVCs with delayed binding.
func (b *volumeBinder) GetPodVolumeClaims(pod *v1.Pod) (podVolumeClaims *PodVolumeClaims, err error) {
func (b *volumeBinder) GetPodVolumeClaims(logger klog.Logger, pod *v1.Pod) (podVolumeClaims *PodVolumeClaims, err error) {
podVolumeClaims = &PodVolumeClaims{
boundClaims: []*v1.PersistentVolumeClaim{},
unboundClaimsImmediate: []*v1.PersistentVolumeClaim{},
@ -824,7 +827,7 @@ func (b *volumeBinder) GetPodVolumeClaims(pod *v1.Pod) (podVolumeClaims *PodVolu
}
for _, vol := range pod.Spec.Volumes {
volumeBound, pvc, err := b.isVolumeBound(pod, &vol)
volumeBound, pvc, err := b.isVolumeBound(logger, pod, &vol)
if err != nil {
return podVolumeClaims, err
}
@ -859,11 +862,11 @@ func (b *volumeBinder) GetPodVolumeClaims(pod *v1.Pod) (podVolumeClaims *PodVolu
return podVolumeClaims, nil
}
func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node *v1.Node, pod *v1.Pod) (bool, bool, error) {
func (b *volumeBinder) checkBoundClaims(logger klog.Logger, claims []*v1.PersistentVolumeClaim, node *v1.Node, pod *v1.Pod) (bool, bool, error) {
csiNode, err := b.csiNodeLister.Get(node.Name)
if err != nil {
// TODO: return the error once CSINode is created by default
klog.V(4).InfoS("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
logger.V(4).Info("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
}
for _, pvc := range claims {
@ -883,19 +886,19 @@ func (b *volumeBinder) checkBoundClaims(claims []*v1.PersistentVolumeClaim, node
err = volume.CheckNodeAffinity(pv, node.Labels)
if err != nil {
klog.V(4).InfoS("PersistentVolume and node mismatch for pod", "PV", klog.KRef("", pvName), "node", klog.KObj(node), "pod", klog.KObj(pod), "err", err)
logger.V(4).Info("PersistentVolume and node mismatch for pod", "PV", klog.KRef("", pvName), "node", klog.KObj(node), "pod", klog.KObj(pod), "err", err)
return false, true, nil
}
klog.V(5).InfoS("PersistentVolume and node matches for pod", "PV", klog.KRef("", pvName), "node", klog.KObj(node), "pod", klog.KObj(pod))
logger.V(5).Info("PersistentVolume and node matches for pod", "PV", klog.KRef("", pvName), "node", klog.KObj(node), "pod", klog.KObj(pod))
}
klog.V(4).InfoS("All bound volumes for pod match with node", "pod", klog.KObj(pod), "node", klog.KObj(node))
logger.V(4).Info("All bound volumes for pod match with node", "pod", klog.KObj(pod), "node", klog.KObj(node))
return true, true, nil
}
// findMatchingVolumes tries to find matching volumes for given claims,
// and return unbound claims for further provision.
func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.PersistentVolumeClaim, unboundVolumesDelayBinding map[string][]*v1.PersistentVolume, node *v1.Node) (foundMatches bool, bindings []*BindingInfo, unboundClaims []*v1.PersistentVolumeClaim, err error) {
func (b *volumeBinder) findMatchingVolumes(logger klog.Logger, pod *v1.Pod, claimsToBind []*v1.PersistentVolumeClaim, unboundVolumesDelayBinding map[string][]*v1.PersistentVolume, node *v1.Node) (foundMatches bool, bindings []*BindingInfo, unboundClaims []*v1.PersistentVolumeClaim, err error) {
// Sort all the claims by increasing size request to get the smallest fits
sort.Sort(byPVCSize(claimsToBind))
@ -914,7 +917,7 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.Persi
return false, nil, nil, err
}
if pv == nil {
klog.V(4).InfoS("No matching volumes for pod", "pod", klog.KObj(pod), "PVC", klog.KObj(pvc), "node", klog.KObj(node))
logger.V(4).Info("No matching volumes for pod", "pod", klog.KObj(pod), "PVC", klog.KObj(pvc), "node", klog.KObj(node))
unboundClaims = append(unboundClaims, pvc)
foundMatches = false
continue
@ -923,11 +926,11 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.Persi
// matching PV needs to be excluded so we don't select it again
chosenPVs[pv.Name] = pv
bindings = append(bindings, &BindingInfo{pv: pv, pvc: pvc})
klog.V(5).InfoS("Found matching PV for PVC for pod", "PV", klog.KObj(pv), "PVC", klog.KObj(pvc), "node", klog.KObj(node), "pod", klog.KObj(pod))
logger.V(5).Info("Found matching PV for PVC for pod", "PV", klog.KObj(pv), "PVC", klog.KObj(pvc), "node", klog.KObj(node), "pod", klog.KObj(pod))
}
if foundMatches {
klog.V(4).InfoS("Found matching volumes for pod", "pod", klog.KObj(pod), "node", klog.KObj(node))
logger.V(4).Info("Found matching volumes for pod", "pod", klog.KObj(pod), "node", klog.KObj(node))
}
return
@ -936,7 +939,7 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.Persi
// checkVolumeProvisions checks given unbound claims (the claims have gone through func
// findMatchingVolumes, and do not have matching volumes for binding), and return true
// if all of the claims are eligible for dynamic provision.
func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied, sufficientStorage bool, dynamicProvisions []*v1.PersistentVolumeClaim, err error) {
func (b *volumeBinder) checkVolumeProvisions(logger klog.Logger, pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied, sufficientStorage bool, dynamicProvisions []*v1.PersistentVolumeClaim, err error) {
dynamicProvisions = []*v1.PersistentVolumeClaim{}
// We return early with provisionedClaims == nil if a check
@ -954,18 +957,18 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v
}
provisioner := class.Provisioner
if provisioner == "" || provisioner == volume.NotSupportedProvisioner {
klog.V(4).InfoS("Storage class of claim does not support dynamic provisioning", "storageClassName", className, "PVC", klog.KObj(claim))
logger.V(4).Info("Storage class of claim does not support dynamic provisioning", "storageClassName", className, "PVC", klog.KObj(claim))
return false, true, nil, nil
}
// Check if the node can satisfy the topology requirement in the class
if !v1helper.MatchTopologySelectorTerms(class.AllowedTopologies, labels.Set(node.Labels)) {
klog.V(4).InfoS("Node cannot satisfy provisioning topology requirements of claim", "node", klog.KObj(node), "PVC", klog.KObj(claim))
logger.V(4).Info("Node cannot satisfy provisioning topology requirements of claim", "node", klog.KObj(node), "PVC", klog.KObj(claim))
return false, true, nil, nil
}
// Check storage capacity.
sufficient, err := b.hasEnoughCapacity(provisioner, claim, class, node)
sufficient, err := b.hasEnoughCapacity(logger, provisioner, claim, class, node)
if err != nil {
return false, false, nil, err
}
@ -977,7 +980,7 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v
dynamicProvisions = append(dynamicProvisions, claim)
}
klog.V(4).InfoS("Provisioning for claims of pod that has no matching volumes...", "claimCount", len(claimsToProvision), "pod", klog.KObj(pod), "node", klog.KObj(node))
logger.V(4).Info("Provisioning for claims of pod that has no matching volumes...", "claimCount", len(claimsToProvision), "pod", klog.KObj(pod), "node", klog.KObj(node))
return true, true, dynamicProvisions, nil
}
@ -996,7 +999,7 @@ func (b *volumeBinder) revertAssumedPVCs(claims []*v1.PersistentVolumeClaim) {
// hasEnoughCapacity checks whether the provisioner has enough capacity left for a new volume of the given size
// that is available from the node.
func (b *volumeBinder) hasEnoughCapacity(provisioner string, claim *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass, node *v1.Node) (bool, error) {
func (b *volumeBinder) hasEnoughCapacity(logger klog.Logger, provisioner string, claim *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass, node *v1.Node) (bool, error) {
quantity, ok := claim.Spec.Resources.Requests[v1.ResourceStorage]
if !ok {
// No capacity to check for.
@ -1029,7 +1032,7 @@ func (b *volumeBinder) hasEnoughCapacity(provisioner string, claim *v1.Persisten
for _, capacity := range capacities {
if capacity.StorageClassName == storageClass.Name &&
capacitySufficient(capacity, sizeInBytes) &&
b.nodeHasAccess(node, capacity) {
b.nodeHasAccess(logger, node, capacity) {
// Enough capacity found.
return true, nil
}
@ -1037,7 +1040,7 @@ func (b *volumeBinder) hasEnoughCapacity(provisioner string, claim *v1.Persisten
// TODO (?): this doesn't give any information about which pools where considered and why
// they had to be rejected. Log that above? But that might be a lot of log output...
klog.V(4).InfoS("Node has no accessible CSIStorageCapacity with enough capacity for PVC",
logger.V(4).Info("Node has no accessible CSIStorageCapacity with enough capacity for PVC",
"node", klog.KObj(node), "PVC", klog.KObj(claim), "size", sizeInBytes, "storageClass", klog.KObj(storageClass))
return false, nil
}
@ -1051,7 +1054,7 @@ func capacitySufficient(capacity *storagev1.CSIStorageCapacity, sizeInBytes int6
return limit != nil && limit.Value() >= sizeInBytes
}
func (b *volumeBinder) nodeHasAccess(node *v1.Node, capacity *storagev1.CSIStorageCapacity) bool {
func (b *volumeBinder) nodeHasAccess(logger klog.Logger, node *v1.Node, capacity *storagev1.CSIStorageCapacity) bool {
if capacity.NodeTopology == nil {
// Unavailable
return false
@ -1059,7 +1062,7 @@ func (b *volumeBinder) nodeHasAccess(node *v1.Node, capacity *storagev1.CSIStora
// Only matching by label is supported.
selector, err := metav1.LabelSelectorAsSelector(capacity.NodeTopology)
if err != nil {
klog.ErrorS(err, "Unexpected error converting to a label selector", "nodeTopology", capacity.NodeTopology)
logger.Error(err, "Unexpected error converting to a label selector", "nodeTopology", capacity.NodeTopology)
return false
}
return selector.Matches(labels.Set(node.Labels))

View File

@ -44,6 +44,7 @@ import (
"k8s.io/component-helpers/storage/volume"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
_ "k8s.io/klog/v2/ktesting/init"
"k8s.io/kubernetes/pkg/controller"
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
)
@ -129,10 +130,6 @@ var (
zone1Labels = map[string]string{v1.LabelFailureDomainBetaZone: "us-east-1", v1.LabelFailureDomainBetaRegion: "us-east-1a"}
)
func init() {
klog.InitFlags(nil)
}
type testEnv struct {
client clientset.Interface
reactor *pvtesting.VolumeReactor
@ -149,9 +146,9 @@ type testEnv struct {
internalCSIStorageCapacityInformer storageinformers.CSIStorageCapacityInformer
}
func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv {
func newTestBinder(t *testing.T, ctx context.Context) *testEnv {
client := &fake.Clientset{}
_, ctx := ktesting.NewTestContext(t)
logger := klog.FromContext(ctx)
reactor := pvtesting.NewVolumeReactor(ctx, client, nil, nil, nil)
// TODO refactor all tests to use real watch mechanism, see #72327
client.AddWatchReactor("*", func(action k8stesting.Action) (handled bool, ret watch.Interface, err error) {
@ -177,6 +174,7 @@ func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv {
CSIStorageCapacityInformer: csiStorageCapacityInformer,
}
binder := NewVolumeBinder(
logger,
client,
podInformer,
nodeInformer,
@ -188,10 +186,10 @@ func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv {
10*time.Second)
// Wait for informers cache sync
informerFactory.Start(stopCh)
for v, synced := range informerFactory.WaitForCacheSync(stopCh) {
informerFactory.Start(ctx.Done())
for v, synced := range informerFactory.WaitForCacheSync(ctx.Done()) {
if !synced {
klog.ErrorS(nil, "Error syncing informer", "informer", v)
logger.Error(nil, "Error syncing informer", "informer", v)
os.Exit(1)
}
}
@ -846,15 +844,15 @@ func checkReasons(t *testing.T, actual, expected ConflictReasons) {
}
// findPodVolumes gets and finds volumes for given pod and node
func findPodVolumes(binder SchedulerVolumeBinder, pod *v1.Pod, node *v1.Node) (*PodVolumes, ConflictReasons, error) {
podVolumeClaims, err := binder.GetPodVolumeClaims(pod)
func findPodVolumes(logger klog.Logger, binder SchedulerVolumeBinder, pod *v1.Pod, node *v1.Node) (*PodVolumes, ConflictReasons, error) {
podVolumeClaims, err := binder.GetPodVolumeClaims(logger, pod)
if err != nil {
return nil, nil, err
}
if len(podVolumeClaims.unboundClaimsImmediate) > 0 {
return nil, nil, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims")
}
return binder.FindPodVolumes(pod, podVolumeClaims, node)
return binder.FindPodVolumes(logger, pod, podVolumeClaims, node)
}
func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
@ -1006,11 +1004,12 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
}
run := func(t *testing.T, scenario scenarioType, csiDriver *storagev1.CSIDriver) {
ctx, cancel := context.WithCancel(context.Background())
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Setup
testEnv := newTestBinder(t, ctx.Done())
testEnv := newTestBinder(t, ctx)
testEnv.initVolumes(scenario.pvs, scenario.pvs)
if csiDriver != nil {
testEnv.addCSIDriver(csiDriver)
@ -1031,7 +1030,7 @@ func TestFindPodVolumesWithoutProvisioning(t *testing.T) {
}
// Execute
podVolumes, reasons, err := findPodVolumes(testEnv.binder, scenario.pod, testNode)
podVolumes, reasons, err := findPodVolumes(logger, testEnv.binder, scenario.pod, testNode)
// Validate
if !scenario.shouldFail && err != nil {
@ -1133,11 +1132,12 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) {
}
run := func(t *testing.T, scenario scenarioType, csiDriver *storagev1.CSIDriver) {
ctx, cancel := context.WithCancel(context.Background())
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Setup
testEnv := newTestBinder(t, ctx.Done())
testEnv := newTestBinder(t, ctx)
testEnv.initVolumes(scenario.pvs, scenario.pvs)
if csiDriver != nil {
testEnv.addCSIDriver(csiDriver)
@ -1158,7 +1158,7 @@ func TestFindPodVolumesWithProvisioning(t *testing.T) {
}
// Execute
podVolumes, reasons, err := findPodVolumes(testEnv.binder, scenario.pod, testNode)
podVolumes, reasons, err := findPodVolumes(logger, testEnv.binder, scenario.pod, testNode)
// Validate
if !scenario.shouldFail && err != nil {
@ -1240,11 +1240,12 @@ func TestFindPodVolumesWithCSIMigration(t *testing.T) {
}
run := func(t *testing.T, scenario scenarioType) {
ctx, cancel := context.WithCancel(context.Background())
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Setup
testEnv := newTestBinder(t, ctx.Done())
testEnv := newTestBinder(t, ctx)
testEnv.initVolumes(scenario.pvs, scenario.pvs)
var node *v1.Node
@ -1274,7 +1275,7 @@ func TestFindPodVolumesWithCSIMigration(t *testing.T) {
}
// Execute
_, reasons, err := findPodVolumes(testEnv.binder, scenario.pod, node)
_, reasons, err := findPodVolumes(logger, testEnv.binder, scenario.pod, node)
// Validate
if !scenario.shouldFail && err != nil {
@ -1357,11 +1358,12 @@ func TestAssumePodVolumes(t *testing.T) {
}
run := func(t *testing.T, scenario scenarioType) {
ctx, cancel := context.WithCancel(context.Background())
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Setup
testEnv := newTestBinder(t, ctx.Done())
testEnv := newTestBinder(t, ctx)
testEnv.initClaims(scenario.podPVCs, scenario.podPVCs)
pod := makePod("test-pod").
withNamespace("testns").
@ -1374,7 +1376,7 @@ func TestAssumePodVolumes(t *testing.T) {
testEnv.initVolumes(scenario.pvs, scenario.pvs)
// Execute
allBound, err := testEnv.binder.AssumePodVolumes(pod, "node1", podVolumes)
allBound, err := testEnv.binder.AssumePodVolumes(logger, pod, "node1", podVolumes)
// Validate
if !scenario.shouldFail && err != nil {
@ -1406,7 +1408,8 @@ func TestAssumePodVolumes(t *testing.T) {
}
func TestRevertAssumedPodVolumes(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
podPVCs := []*v1.PersistentVolumeClaim{unboundPVC, provisionedPVC}
@ -1417,7 +1420,7 @@ func TestRevertAssumedPodVolumes(t *testing.T) {
expectedProvisionings := []*v1.PersistentVolumeClaim{selectedNodePVC}
// Setup
testEnv := newTestBinder(t, ctx.Done())
testEnv := newTestBinder(t, ctx)
testEnv.initClaims(podPVCs, podPVCs)
pod := makePod("test-pod").
withNamespace("testns").
@ -1429,7 +1432,7 @@ func TestRevertAssumedPodVolumes(t *testing.T) {
}
testEnv.initVolumes(pvs, pvs)
allbound, err := testEnv.binder.AssumePodVolumes(pod, "node1", podVolumes)
allbound, err := testEnv.binder.AssumePodVolumes(logger, pod, "node1", podVolumes)
if allbound || err != nil {
t.Errorf("No volumes are assumed")
}
@ -1534,11 +1537,12 @@ func TestBindAPIUpdate(t *testing.T) {
}
run := func(t *testing.T, scenario scenarioType) {
ctx, cancel := context.WithCancel(context.Background())
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Setup
testEnv := newTestBinder(t, ctx.Done())
testEnv := newTestBinder(t, ctx)
pod := makePod("test-pod").
withNamespace("testns").
withNodeName("node1").Pod
@ -1732,13 +1736,14 @@ func TestCheckBindings(t *testing.T) {
}
run := func(t *testing.T, scenario scenarioType) {
ctx, cancel := context.WithCancel(context.Background())
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Setup
pod := makePod("test-pod").
withNamespace("testns").
withNodeName("node1").Pod
testEnv := newTestBinder(t, ctx.Done())
testEnv := newTestBinder(t, ctx)
testEnv.internalPodInformer.Informer().GetIndexer().Add(pod)
testEnv.initNodes([]*v1.Node{node1})
testEnv.initVolumes(scenario.initPVs, nil)
@ -1762,7 +1767,7 @@ func TestCheckBindings(t *testing.T) {
}
// Execute
allBound, err := testEnv.internalBinder.checkBindings(pod, scenario.bindings, scenario.provisionedPVCs)
allBound, err := testEnv.internalBinder.checkBindings(logger, pod, scenario.bindings, scenario.provisionedPVCs)
// Validate
if !scenario.shouldFail && err != nil {
@ -1857,14 +1862,15 @@ func TestCheckBindingsWithCSIMigration(t *testing.T) {
}
run := func(t *testing.T, scenario scenarioType) {
ctx, cancel := context.WithCancel(context.Background())
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Setup
pod := makePod("test-pod").
withNamespace("testns").
withNodeName("node1").Pod
testEnv := newTestBinder(t, ctx.Done())
testEnv := newTestBinder(t, ctx)
testEnv.internalPodInformer.Informer().GetIndexer().Add(pod)
testEnv.initNodes(scenario.initNodes)
testEnv.initCSINodes(scenario.initCSINodes)
@ -1881,7 +1887,7 @@ func TestCheckBindingsWithCSIMigration(t *testing.T) {
}
// Execute
allBound, err := testEnv.internalBinder.checkBindings(pod, scenario.bindings, scenario.provisionedPVCs)
allBound, err := testEnv.internalBinder.checkBindings(logger, pod, scenario.bindings, scenario.provisionedPVCs)
// Validate
if !scenario.shouldFail && err != nil {
@ -2047,13 +2053,14 @@ func TestBindPodVolumes(t *testing.T) {
}
run := func(t *testing.T, scenario scenarioType) {
ctx, cancel := context.WithCancel(context.Background())
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Setup
pod := makePod("test-pod").
withNamespace("testns").
withNodeName("node1").Pod
testEnv := newTestBinder(t, ctx.Done())
testEnv := newTestBinder(t, ctx)
testEnv.internalPodInformer.Informer().GetIndexer().Add(pod)
if scenario.nodes == nil {
scenario.nodes = []*v1.Node{node1}
@ -2091,7 +2098,7 @@ func TestBindPodVolumes(t *testing.T) {
go func(scenario scenarioType) {
time.Sleep(5 * time.Second)
// Sleep a while to run after bindAPIUpdate in BindPodVolumes
klog.V(5).InfoS("Running delay function")
logger.V(5).Info("Running delay function")
scenario.delayFunc(t, ctx, testEnv, pod, scenario.initPVs, scenario.initPVCs)
}(scenario)
}
@ -2127,9 +2134,10 @@ func TestFindAssumeVolumes(t *testing.T) {
pvs := []*v1.PersistentVolume{pvNode2, pvNode1a, pvNode1c}
// Setup
ctx, cancel := context.WithCancel(context.Background())
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
testEnv := newTestBinder(t, ctx.Done())
testEnv := newTestBinder(t, ctx)
testEnv.initVolumes(pvs, pvs)
testEnv.initClaims(podPVCs, podPVCs)
pod := makePod("test-pod").
@ -2148,7 +2156,7 @@ func TestFindAssumeVolumes(t *testing.T) {
// Execute
// 1. Find matching PVs
podVolumes, reasons, err := findPodVolumes(testEnv.binder, pod, testNode)
podVolumes, reasons, err := findPodVolumes(logger, testEnv.binder, pod, testNode)
if err != nil {
t.Errorf("Test failed: FindPodVolumes returned error: %v", err)
}
@ -2158,7 +2166,7 @@ func TestFindAssumeVolumes(t *testing.T) {
expectedBindings := podVolumes.StaticBindings
// 2. Assume matches
allBound, err := testEnv.binder.AssumePodVolumes(pod, testNode.Name, podVolumes)
allBound, err := testEnv.binder.AssumePodVolumes(logger, pod, testNode.Name, podVolumes)
if err != nil {
t.Errorf("Test failed: AssumePodVolumes returned error: %v", err)
}
@ -2174,7 +2182,7 @@ func TestFindAssumeVolumes(t *testing.T) {
// This should always return the original chosen pv
// Run this many times in case sorting returns different orders for the two PVs.
for i := 0; i < 50; i++ {
podVolumes, reasons, err := findPodVolumes(testEnv.binder, pod, testNode)
podVolumes, reasons, err := findPodVolumes(logger, testEnv.binder, pod, testNode)
if err != nil {
t.Errorf("Test failed: FindPodVolumes returned error: %v", err)
}
@ -2283,11 +2291,12 @@ func TestCapacity(t *testing.T) {
}
run := func(t *testing.T, scenario scenarioType, optIn bool) {
ctx, cancel := context.WithCancel(context.Background())
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Setup: the driver has the feature enabled, but the scheduler might not.
testEnv := newTestBinder(t, ctx.Done())
testEnv := newTestBinder(t, ctx)
testEnv.addCSIDriver(makeCSIDriver(provisioner, optIn))
testEnv.addCSIStorageCapacities(scenario.capacities)
@ -2301,7 +2310,7 @@ func TestCapacity(t *testing.T) {
withPVCSVolume(scenario.pvcs).Pod
// Execute
podVolumes, reasons, err := findPodVolumes(testEnv.binder, pod, testNode)
podVolumes, reasons, err := findPodVolumes(logger, testEnv.binder, pod, testNode)
// Validate
shouldFail := scenario.shouldFail
@ -2431,18 +2440,19 @@ func TestGetEligibleNodes(t *testing.T) {
}
run := func(t *testing.T, scenario scenarioType) {
ctx, cancel := context.WithCancel(context.Background())
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Setup
testEnv := newTestBinder(t, ctx.Done())
testEnv := newTestBinder(t, ctx)
testEnv.initVolumes(scenario.pvs, scenario.pvs)
testEnv.initNodes(scenario.nodes)
testEnv.initClaims(scenario.pvcs, scenario.pvcs)
// Execute
eligibleNodes := testEnv.binder.GetEligibleNodes(scenario.pvcs)
eligibleNodes := testEnv.binder.GetEligibleNodes(logger, scenario.pvcs)
// Validate
if reflect.DeepEqual(scenario.eligibleNodes, eligibleNodes) {

View File

@ -21,6 +21,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
)
// FakeVolumeBinderConfig holds configurations for fake volume binder.
@ -50,22 +51,22 @@ type FakeVolumeBinder struct {
var _ SchedulerVolumeBinder = &FakeVolumeBinder{}
// GetPodVolumeClaims implements SchedulerVolumeBinder.GetPodVolumes.
func (b *FakeVolumeBinder) GetPodVolumeClaims(pod *v1.Pod) (podVolumeClaims *PodVolumeClaims, err error) {
func (b *FakeVolumeBinder) GetPodVolumeClaims(_ klog.Logger, pod *v1.Pod) (podVolumeClaims *PodVolumeClaims, err error) {
return &PodVolumeClaims{}, nil
}
// GetEligibleNodes implements SchedulerVolumeBinder.GetEligibleNodes.
func (b *FakeVolumeBinder) GetEligibleNodes(boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.Set[string]) {
func (b *FakeVolumeBinder) GetEligibleNodes(_ klog.Logger, boundClaims []*v1.PersistentVolumeClaim) (eligibleNodes sets.Set[string]) {
return nil
}
// FindPodVolumes implements SchedulerVolumeBinder.FindPodVolumes.
func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, _ *PodVolumeClaims, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) {
func (b *FakeVolumeBinder) FindPodVolumes(_ klog.Logger, pod *v1.Pod, _ *PodVolumeClaims, node *v1.Node) (podVolumes *PodVolumes, reasons ConflictReasons, err error) {
return nil, b.config.FindReasons, b.config.FindErr
}
// AssumePodVolumes implements SchedulerVolumeBinder.AssumePodVolumes.
func (b *FakeVolumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (bool, error) {
func (b *FakeVolumeBinder) AssumePodVolumes(_ klog.Logger, assumedPod *v1.Pod, nodeName string, podVolumes *PodVolumes) (bool, error) {
b.AssumeCalled = true
return b.config.AllBound, b.config.AssumeErr
}

View File

@ -161,6 +161,7 @@ func (pl *VolumeBinding) podHasPVCs(pod *v1.Pod) (bool, error) {
// immediate PVCs bound. If not all immediate PVCs are bound, an
// UnschedulableAndUnresolvable is returned.
func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
logger := klog.FromContext(ctx)
// If pod does not reference any PVC, we don't need to do anything.
if hasPVC, err := pl.podHasPVCs(pod); err != nil {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
@ -168,7 +169,7 @@ func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleSt
state.Write(stateKey, &stateData{})
return nil, framework.NewStatus(framework.Skip)
}
podVolumeClaims, err := pl.Binder.GetPodVolumeClaims(pod)
podVolumeClaims, err := pl.Binder.GetPodVolumeClaims(logger, pod)
if err != nil {
return nil, framework.AsStatus(err)
}
@ -182,7 +183,7 @@ func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleSt
}
// Attempt to reduce down the number of nodes to consider in subsequent scheduling stages if pod has bound claims.
var result *framework.PreFilterResult
if eligibleNodes := pl.Binder.GetEligibleNodes(podVolumeClaims.boundClaims); eligibleNodes != nil {
if eligibleNodes := pl.Binder.GetEligibleNodes(logger, podVolumeClaims.boundClaims); eligibleNodes != nil {
result = &framework.PreFilterResult{
NodeNames: eligibleNodes,
}
@ -232,6 +233,7 @@ func getStateData(cs *framework.CycleState) (*stateData, error) {
// The predicate returns true if all bound PVCs have compatible PVs with the node, and if all unbound
// PVCs can be matched with an available and node-compatible PV.
func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
logger := klog.FromContext(ctx)
node := nodeInfo.Node()
state, err := getStateData(cs)
@ -239,7 +241,7 @@ func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, p
return framework.AsStatus(err)
}
podVolumes, reasons, err := pl.Binder.FindPodVolumes(pod, state.podVolumeClaims, node)
podVolumes, reasons, err := pl.Binder.FindPodVolumes(logger, pod, state.podVolumeClaims, node)
if err != nil {
return framework.AsStatus(err)
@ -304,7 +306,7 @@ func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState,
// we don't need to hold the lock as only one node will be reserved for the given pod
podVolumes, ok := state.podVolumesByNode[nodeName]
if ok {
allBound, err := pl.Binder.AssumePodVolumes(pod, nodeName, podVolumes)
allBound, err := pl.Binder.AssumePodVolumes(klog.FromContext(ctx), pod, nodeName, podVolumes)
if err != nil {
return framework.AsStatus(err)
}
@ -335,13 +337,14 @@ func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState,
if !ok {
return framework.AsStatus(fmt.Errorf("no pod volumes found for node %q", nodeName))
}
klog.V(5).InfoS("Trying to bind volumes for pod", "pod", klog.KObj(pod))
logger := klog.FromContext(ctx)
logger.V(5).Info("Trying to bind volumes for pod", "pod", klog.KObj(pod))
err = pl.Binder.BindPodVolumes(ctx, pod, podVolumes)
if err != nil {
klog.V(1).InfoS("Failed to bind volumes for pod", "pod", klog.KObj(pod), "err", err)
logger.V(5).Info("Failed to bind volumes for pod", "pod", klog.KObj(pod), "err", err)
return framework.AsStatus(err)
}
klog.V(5).InfoS("Success binding volumes for pod", "pod", klog.KObj(pod))
logger.V(5).Info("Success binding volumes for pod", "pod", klog.KObj(pod))
return nil
}
@ -361,7 +364,7 @@ func (pl *VolumeBinding) Unreserve(ctx context.Context, cs *framework.CycleState
}
// New initializes a new plugin and returns it.
func New(_ context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
args, ok := plArgs.(*config.VolumeBindingArgs)
if !ok {
return nil, fmt.Errorf("want args to be of type VolumeBindingArgs, got %T", plArgs)
@ -381,7 +384,7 @@ func New(_ context.Context, plArgs runtime.Object, fh framework.Handle, fts feat
CSIDriverInformer: fh.SharedInformerFactory().Storage().V1().CSIDrivers(),
CSIStorageCapacityInformer: fh.SharedInformerFactory().Storage().V1().CSIStorageCapacities(),
}
binder := NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second)
binder := NewVolumeBinder(klog.FromContext(ctx), fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second)
// build score function
var scorer volumeCapacityScorer