diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go index 48fb165ef0f..cc9b7a49d26 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" + "k8s.io/kubernetes/pkg/scheduler/util" volumeutil "k8s.io/kubernetes/pkg/volume/util" ) @@ -73,18 +74,42 @@ func (pl *CSILimits) Name() string { return CSIName } -// EventsToRegister returns the possible events that may make a Pod +// EventsToRegister returns the possible events that may make a Pod. // failed by this plugin schedulable. func (pl *CSILimits) EventsToRegister() []framework.ClusterEventWithHint { return []framework.ClusterEventWithHint{ // We don't register any `QueueingHintFn` intentionally // because any new CSINode could make pods that were rejected by CSI volumes schedulable. {Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add}}, - {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}}, + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted}, {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}}, } } +func (pl *CSILimits) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + deletedPod, _, err := util.As[*v1.Pod](oldObj, newObj) + if err != nil { + return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterPodDeleted: %w", err) + } + + if len(deletedPod.Spec.Volumes) == 0 { + return framework.QueueSkip, nil + } + + if deletedPod.Spec.NodeName == "" { + return framework.QueueSkip, nil + } + + for _, vol := range deletedPod.Spec.Volumes { + if vol.PersistentVolumeClaim != nil || vol.Ephemeral != nil || pl.translator.IsInlineMigratable(&vol) { + return framework.Queue, nil + } + } + + logger.V(5).Info("The deleted pod does not impact the scheduling of the unscheduled pod", "deletedPod", klog.KObj(pod), "pod", klog.KObj(deletedPod)) + return framework.QueueSkip, nil +} + // PreFilter invoked at the prefilter extension point // // If the pod haven't those types of volumes, we'll skip the Filter phase diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go index c18f8a8bb1b..d74d52b2597 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go @@ -641,6 +641,71 @@ func TestCSILimits(t *testing.T) { } } +func TestCSILimitsQHint(t *testing.T) { + podEbs := st.MakePod().PVC("csi-ebs.csi.aws.com-2") + + tests := []struct { + newPod *v1.Pod + deletedPod *v1.Pod + deletedPodNotScheduled bool + test string + wantQHint framework.QueueingHint + }{ + { + newPod: podEbs.Obj(), + deletedPod: st.MakePod().PVC("placeholder").Obj(), + test: "return a Queue when a deleted pod has a PVC", + wantQHint: framework.Queue, + }, + { + newPod: podEbs.Obj(), + deletedPod: st.MakePod().Volume(v1.Volume{VolumeSource: v1.VolumeSource{AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{}}}).Obj(), + test: "return a Queue when a deleted pod has a inline migratable volume", + wantQHint: framework.Queue, + }, + { + newPod: podEbs.Obj(), + deletedPod: st.MakePod().Obj(), + test: "return a QueueSkip when a deleted pod doesn't have any volume", + wantQHint: framework.QueueSkip, + }, + { + newPod: podEbs.Obj(), + deletedPod: st.MakePod().PVC("csi-ebs.csi.aws.com-0").Obj(), + deletedPodNotScheduled: true, + test: "return a QueueSkip when a deleted pod is not scheduled.", + wantQHint: framework.QueueSkip, + }, + } + + for _, test := range tests { + t.Run(test.test, func(t *testing.T) { + node, csiNode := getNodeWithPodAndVolumeLimits("csiNode", []*v1.Pod{}, 1, "") + if csiNode != nil { + enableMigrationOnNode(csiNode, csilibplugins.AWSEBSDriverName) + } + if !test.deletedPodNotScheduled { + test.deletedPod.Spec.NodeName = node.Node().Name + } else { + test.deletedPod.Spec.NodeName = "" + } + + p := &CSILimits{ + randomVolumeIDPrefix: rand.String(32), + translator: csitrans.New(), + } + logger, _ := ktesting.NewTestContext(t) + qhint, err := p.isSchedulableAfterPodDeleted(logger, test.newPod, test.deletedPod, nil) + if err != nil { + t.Errorf("isSchedulableAfterPodDeleted failed: %v", err) + } + if qhint != test.wantQHint { + t.Errorf("QHint does not match: %v, want: %v", qhint, test.wantQHint) + } + }) + } +} + func getFakeCSIPVLister(volumeName string, driverNames ...string) tf.PersistentVolumeLister { pvLister := tf.PersistentVolumeLister{} for _, driver := range driverNames {