diff --git a/pkg/scheduler/framework/plugins/volumebinding/binder.go b/pkg/scheduler/framework/plugins/volumebinding/binder.go index ed98ba7da14..80f09b30ca8 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/binder.go +++ b/pkg/scheduler/framework/plugins/volumebinding/binder.go @@ -27,6 +27,7 @@ import ( v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" @@ -1048,12 +1049,16 @@ func (b *volumeBinder) hasEnoughCapacity(logger klog.Logger, provisioner string, } func capacitySufficient(capacity *storagev1.CSIStorageCapacity, sizeInBytes int64) bool { - limit := capacity.Capacity + limit := volumeLimit(capacity) + return limit != nil && limit.Value() >= sizeInBytes +} + +func volumeLimit(capacity *storagev1.CSIStorageCapacity) *resource.Quantity { if capacity.MaximumVolumeSize != nil { // Prefer MaximumVolumeSize if available, it is more precise. - limit = capacity.MaximumVolumeSize + return capacity.MaximumVolumeSize } - return limit != nil && limit.Value() >= sizeInBytes + return capacity.Capacity } func (b *volumeBinder) nodeHasAccess(logger klog.Logger, node *v1.Node, capacity *storagev1.CSIStorageCapacity) bool { diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go index c143c01b1d9..2dcb72fdccb 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go @@ -126,7 +126,7 @@ func (pl *VolumeBinding) EventsToRegister(_ context.Context) ([]framework.Cluste // When CSIStorageCapacity is enabled, pods may become schedulable // on CSI driver & storage capacity changes. {Event: framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.Add | framework.Update}}, - {Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.Add | framework.Update}}, + {Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterCSIStorageCapacityChange}, } return events, nil } @@ -228,6 +228,47 @@ func (pl *VolumeBinding) isSchedulableAfterStorageClassChange(logger klog.Logger return framework.QueueSkip, nil } +// isSchedulableAfterCSIStorageCapacityChange checks whether a CSIStorageCapacity event +// might make a Pod schedulable or not. +// Any CSIStorageCapacity addition and a CSIStorageCapacity update to volume limit +// (calculated based on capacity and maximumVolumeSize) might make a Pod schedulable. +// Note that an update to nodeTopology and storageClassName is not allowed and +// we don't have to consider while examining the update event. +func (pl *VolumeBinding) isSchedulableAfterCSIStorageCapacityChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + oldCap, newCap, err := util.As[*storagev1.CSIStorageCapacity](oldObj, newObj) + if err != nil { + return framework.Queue, err + } + + if oldCap == nil { + logger.V(5).Info( + "A new CSIStorageCapacity was created, which could make a Pod schedulable", + "Pod", klog.KObj(pod), + "CSIStorageCapacity", klog.KObj(newCap), + ) + return framework.Queue, nil + } + + oldLimit := volumeLimit(oldCap) + newLimit := volumeLimit(newCap) + + logger = klog.LoggerWithValues( + logger, + "Pod", klog.KObj(pod), + "CSIStorageCapacity", klog.KObj(newCap), + "volumeLimit(new)", newLimit, + "volumeLimit(old)", oldLimit, + ) + + if newLimit != nil && (oldLimit == nil || newLimit.Value() > oldLimit.Value()) { + logger.V(5).Info("VolumeLimit was increased, which could make a Pod schedulable") + return framework.Queue, nil + } + + logger.V(5).Info("CSIStorageCapacity was updated, but it doesn't make this pod schedulable") + return framework.QueueSkip, nil +} + // podHasPVCs returns 2 values: // - the first one to denote if the given "pod" has any PVC defined. // - the second one to return any error if the requested PVC is illegal. diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go index 2904d0cdab3..78ad84a8e57 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go @@ -1181,3 +1181,232 @@ func TestIsSchedulableAfterStorageClassChange(t *testing.T) { }) } } + +func TestIsSchedulableAfterCSIStorageCapacityChange(t *testing.T) { + table := []struct { + name string + pod *v1.Pod + oldCap interface{} + newCap interface{} + wantErr bool + expect framework.QueueingHint + }{ + { + name: "When a new CSIStorageCapacity is created, it returns Queue", + pod: makePod("pod-a").Pod, + oldCap: nil, + newCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + }, + wantErr: false, + expect: framework.Queue, + }, + { + name: "When the volume limit is increase(Capacity set), it returns Queue", + pod: makePod("pod-a").Pod, + oldCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + }, + newCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + Capacity: resource.NewQuantity(100, resource.DecimalSI), + }, + wantErr: false, + expect: framework.Queue, + }, + { + name: "When the volume limit is increase(MaximumVolumeSize set), it returns Queue", + pod: makePod("pod-a").Pod, + oldCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + }, + newCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + MaximumVolumeSize: resource.NewQuantity(100, resource.DecimalSI), + }, + wantErr: false, + expect: framework.Queue, + }, + { + name: "When the volume limit is increase(Capacity increase), it returns Queue", + pod: makePod("pod-a").Pod, + oldCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + Capacity: resource.NewQuantity(50, resource.DecimalSI), + }, + newCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + Capacity: resource.NewQuantity(100, resource.DecimalSI), + }, + wantErr: false, + expect: framework.Queue, + }, + { + name: "When the volume limit is increase(MaximumVolumeSize unset), it returns Queue", + pod: makePod("pod-a").Pod, + oldCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + Capacity: resource.NewQuantity(100, resource.DecimalSI), + MaximumVolumeSize: resource.NewQuantity(50, resource.DecimalSI), + }, + newCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + Capacity: resource.NewQuantity(100, resource.DecimalSI), + }, + wantErr: false, + expect: framework.Queue, + }, + { + name: "When the volume limit is increase(MaximumVolumeSize increase), it returns Queue", + pod: makePod("pod-a").Pod, + oldCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + Capacity: resource.NewQuantity(100, resource.DecimalSI), + MaximumVolumeSize: resource.NewQuantity(50, resource.DecimalSI), + }, + newCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + Capacity: resource.NewQuantity(100, resource.DecimalSI), + MaximumVolumeSize: resource.NewQuantity(60, resource.DecimalSI), + }, + wantErr: false, + expect: framework.Queue, + }, + { + name: "When there are no changes to the CSIStorageCapacity, it returns QueueSkip", + pod: makePod("pod-a").Pod, + oldCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + Capacity: resource.NewQuantity(100, resource.DecimalSI), + MaximumVolumeSize: resource.NewQuantity(50, resource.DecimalSI), + }, + newCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + Capacity: resource.NewQuantity(100, resource.DecimalSI), + MaximumVolumeSize: resource.NewQuantity(50, resource.DecimalSI), + }, + wantErr: false, + expect: framework.QueueSkip, + }, + { + name: "When the volume limit is equal(Capacity), it returns QueueSkip", + pod: makePod("pod-a").Pod, + oldCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + Capacity: resource.NewQuantity(100, resource.DecimalSI), + }, + newCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + Capacity: resource.NewQuantity(100, resource.DecimalSI), + }, + wantErr: false, + expect: framework.QueueSkip, + }, + { + name: "When the volume limit is equal(MaximumVolumeSize unset), it returns QueueSkip", + pod: makePod("pod-a").Pod, + oldCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + Capacity: resource.NewQuantity(100, resource.DecimalSI), + MaximumVolumeSize: resource.NewQuantity(50, resource.DecimalSI), + }, + newCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + Capacity: resource.NewQuantity(50, resource.DecimalSI), + }, + wantErr: false, + expect: framework.QueueSkip, + }, + { + name: "When the volume limit is decrease(Capacity), it returns QueueSkip", + pod: makePod("pod-a").Pod, + oldCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + Capacity: resource.NewQuantity(100, resource.DecimalSI), + }, + newCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + Capacity: resource.NewQuantity(90, resource.DecimalSI), + }, + wantErr: false, + expect: framework.QueueSkip, + }, + { + name: "When the volume limit is decrease(MaximumVolumeSize), it returns QueueSkip", + pod: makePod("pod-a").Pod, + oldCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + MaximumVolumeSize: resource.NewQuantity(100, resource.DecimalSI), + }, + newCap: &storagev1.CSIStorageCapacity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cap-a", + }, + MaximumVolumeSize: resource.NewQuantity(90, resource.DecimalSI), + }, + wantErr: false, + expect: framework.QueueSkip, + }, + { + name: "type conversion error", + oldCap: new(struct{}), + newCap: new(struct{}), + wantErr: true, + expect: framework.Queue, + }, + } + + for _, item := range table { + t.Run(item.name, func(t *testing.T) { + pl := &VolumeBinding{} + logger, _ := ktesting.NewTestContext(t) + qhint, err := pl.isSchedulableAfterCSIStorageCapacityChange(logger, item.pod, item.oldCap, item.newCap) + if (err != nil) != item.wantErr { + t.Errorf("error is unexpectedly returned or not returned from isSchedulableAfterCSIStorageCapacityChange. wantErr: %v actual error: %q", item.wantErr, err) + } + if qhint != item.expect { + t.Errorf("QHint does not match: %v, want: %v", qhint, item.expect) + } + }) + } +}