diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go
index 80c35f44f28..b11f8c30cb8 100644
--- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go
+++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go
@@ -25,10 +25,12 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
 	corelisters "k8s.io/client-go/listers/core/v1"
+	"k8s.io/klog/v2"
 	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
+	"k8s.io/kubernetes/pkg/scheduler/util"
 )
 
 // VolumeRestrictions is a plugin that checks volume restrictions.
@@ -321,8 +323,9 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHin
 		// Pods may fail to schedule because of volumes conflicting with other pods on same node.
 		// Once running pods are deleted and volumes have been released, the unschedulable pod will be schedulable.
 		// Due to immutable fields `spec.volumes`, pod update events are ignored.
-		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
+		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted},
 		// A new Node may make a pod schedulable.
+		// We intentionally don't set QueueingHint since all Node/Add events could make Pods schedulable.
 		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}},
 		// Pods may fail to schedule because the PVC it uses has not yet been created.
 		// This PVC is required to exist to check its access modes.
@@ -330,6 +333,27 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHin
 	}
 }
 
+// isSchedulableAfterPodDeleted is invoked whenever a pod is deleted.
+// It checks whether the deleted pod's volumes conflict with the volumes of the target pod; if they do, the deletion may have made the target pod schedulable.
+// TODO: If we observe good throughput, we will add a check for conflicts between the deleted Pod and the readWriteOncePodPVC of the current Pod.
+func (pl *VolumeRestrictions) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
+	deletedPod, _, err := util.As[*v1.Pod](oldObj, newObj)
+	if err != nil {
+		return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterPodDeleted: %w", err)
+	}
+
+	if deletedPod.Namespace != pod.Namespace {
+		return framework.QueueSkip, nil
+	}
+
+	nodeInfo := framework.NewNodeInfo(deletedPod)
+	if !satisfyVolumeConflicts(pod, nodeInfo) {
+		return framework.Queue, nil
+	}
+
+	return framework.QueueSkip, nil
+}
+
 // New initializes a new plugin and returns it.
 func New(_ context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
 	informerFactory := handle.SharedInformerFactory()
diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go
index 61f699779e4..8fe54ed00db 100644
--- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go
+++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go
@@ -24,6 +24,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/klog/v2/ktesting"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
@@ -478,6 +479,194 @@ func TestAccessModeConflicts(t *testing.T) {
 	}
 }
 
+func Test_isSchedulableAfterPodDeleted(t *testing.T) {
+	GCEDiskVolState := v1.Volume{
+		VolumeSource: v1.VolumeSource{
+			GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
+				PDName: "foo",
+			},
+		},
+	}
+	GCEDiskVolState2 := v1.Volume{
+		VolumeSource: v1.VolumeSource{
+			GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
+				PDName: "bar",
+			},
+		},
+	}
+
+	AWSDiskVolState := v1.Volume{
+		VolumeSource: v1.VolumeSource{
+			AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
+				VolumeID: "foo",
+			},
+		},
+	}
+	AWSDiskVolState2 := v1.Volume{
+		VolumeSource: v1.VolumeSource{
+			AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
+				VolumeID: "bar",
+			},
+		},
+	}
+
+	RBDDiskVolState := v1.Volume{
+		VolumeSource: v1.VolumeSource{
+			RBD: &v1.RBDVolumeSource{
+				CephMonitors: []string{"a", "b"},
+				RBDPool:      "foo",
+				RBDImage:     "bar",
+				FSType:       "ext4",
+			},
+		},
+	}
+	RBDDiskVolState2 := v1.Volume{
+		VolumeSource: v1.VolumeSource{
+			RBD: &v1.RBDVolumeSource{
+				CephMonitors: []string{"c", "d"},
+				RBDPool:      "foo",
+				RBDImage:     "bar",
+				FSType:       "ext4",
+			},
+		},
+	}
+
+	ISCSIDiskVolState := v1.Volume{
+		VolumeSource: v1.VolumeSource{
+			ISCSI: &v1.ISCSIVolumeSource{
+				TargetPortal: "127.0.0.1:3260",
+				IQN:          "iqn.2016-12.server:storage.target01",
+				FSType:       "ext4",
+				Lun:          0,
+			},
+		},
+	}
+	ISCSIDiskVolState2 := v1.Volume{
+		VolumeSource: v1.VolumeSource{
+			ISCSI: &v1.ISCSIVolumeSource{
+				TargetPortal: "127.0.0.1:3260",
+				IQN:          "iqn.2017-12.server:storage.target01",
+				FSType:       "ext4",
+				Lun:          0,
+			},
+		},
+	}
+
+	podGCEDisk := st.MakePod().Volume(GCEDiskVolState).Obj()
+	podGCEDiskConflicts := st.MakePod().Volume(GCEDiskVolState).Obj()
+	podGCEDiskNoConflicts := st.MakePod().Volume(GCEDiskVolState2).Obj()
+
+	podAWSDisk := st.MakePod().Volume(AWSDiskVolState).Obj()
+	podAWSDiskConflicts := st.MakePod().Volume(AWSDiskVolState).Obj()
+	podAWSDiskNoConflicts := st.MakePod().Volume(AWSDiskVolState2).Obj()
+
+	podRBDDiskDisk := st.MakePod().Volume(RBDDiskVolState).Obj()
+	podRBDDiskDiskConflicts := st.MakePod().Volume(RBDDiskVolState).Obj()
+	podRBDDiskNoConflicts := st.MakePod().Volume(RBDDiskVolState2).Obj()
+
+	podISCSIDiskDisk := st.MakePod().Volume(ISCSIDiskVolState).Obj()
+	podISCSIDiskConflicts := st.MakePod().Volume(ISCSIDiskVolState).Obj()
+	podISCSIDiskNoConflicts := st.MakePod().Volume(ISCSIDiskVolState2).Obj()
+
+	testcases := map[string]struct {
+		pod            *v1.Pod
+		oldObj, newObj interface{}
+		existingPods   []*v1.Pod
+		existingPVC    *v1.PersistentVolumeClaim
+		expectedHint   framework.QueueingHint
+		expectedErr    bool
+	}{
+		"queue-new-object-gcedisk-conflict": {
+			pod:          podGCEDisk,
+			oldObj:       podGCEDiskConflicts,
+			existingPods: []*v1.Pod{},
+			existingPVC:  &v1.PersistentVolumeClaim{},
+			expectedHint: framework.Queue,
+			expectedErr:  false,
+		},
+		"skip-new-object-gcedisk-no-conflict": {
+			pod:          podGCEDisk,
+			oldObj:       podGCEDiskNoConflicts,
+			existingPods: []*v1.Pod{},
+			existingPVC:  &v1.PersistentVolumeClaim{},
+			expectedHint: framework.QueueSkip,
+			expectedErr:  false,
+		},
+		"queue-new-object-awsdisk-conflict": {
+			pod:          podAWSDisk,
+			oldObj:       podAWSDiskConflicts,
+			existingPods: []*v1.Pod{},
+			existingPVC:  &v1.PersistentVolumeClaim{},
+			expectedHint: framework.Queue,
+			expectedErr:  false,
+		},
+		"skip-new-object-awsdisk-no-conflict": {
+			pod:          podAWSDisk,
+			oldObj:       podAWSDiskNoConflicts,
+			existingPods: []*v1.Pod{},
+			existingPVC:  &v1.PersistentVolumeClaim{},
+			expectedHint: framework.QueueSkip,
+			expectedErr:  false,
+		},
+		"queue-new-object-rbddisk-conflict": {
+			pod:          podRBDDiskDisk,
+			oldObj:       podRBDDiskDiskConflicts,
+			existingPods: []*v1.Pod{},
+			existingPVC:  &v1.PersistentVolumeClaim{},
+			expectedHint: framework.Queue,
+			expectedErr:  false,
+		},
+		"skip-new-object-rbddisk-no-conflict": {
+			pod:          podRBDDiskDisk,
+			oldObj:       podRBDDiskNoConflicts,
+			existingPods: []*v1.Pod{},
+			existingPVC:  &v1.PersistentVolumeClaim{},
+			expectedHint: framework.QueueSkip,
+			expectedErr:  false,
+		},
+		"queue-new-object-iscsidisk-conflict": {
+			pod:          podISCSIDiskDisk,
+			oldObj:       podISCSIDiskConflicts,
+			existingPods: []*v1.Pod{},
+			existingPVC:  &v1.PersistentVolumeClaim{},
+			expectedHint: framework.Queue,
+			expectedErr:  false,
+		},
+		"skip-new-object-iscsidisk-no-conflict": {
+			pod:          podISCSIDiskDisk,
+			oldObj:       podISCSIDiskNoConflicts,
+			existingPods: []*v1.Pod{},
+			existingPVC:  &v1.PersistentVolumeClaim{},
+			expectedHint: framework.QueueSkip,
+			expectedErr:  false,
+		},
+	}
+
+	for name, tc := range testcases {
+		t.Run(name, func(t *testing.T) {
+			logger, _ := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			p := newPluginWithListers(ctx, t, tc.existingPods, nil, []*v1.PersistentVolumeClaim{tc.existingPVC})
+
+			actualHint, err := p.(*VolumeRestrictions).isSchedulableAfterPodDeleted(logger, tc.pod, tc.oldObj, nil)
+			if tc.expectedErr {
+				if err == nil {
+					t.Error("Expect error, but got nil")
+				}
+				return
+			}
+			if err != nil {
+				t.Errorf("Unexpected error: %v", err)
+			}
+
+			if diff := cmp.Diff(tc.expectedHint, actualHint); diff != "" {
+				t.Errorf("Unexpected QueueingHint (-want, +got): %s", diff)
+			}
+		})
+	}
+}
+
 func newPlugin(ctx context.Context, t *testing.T) framework.Plugin {
 	return newPluginWithListers(ctx, t, nil, nil, nil)
 }
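Note on the util.As[*v1.Pod](oldObj, newObj) call used in isSchedulableAfterPodDeleted: it converts the untyped old/new objects attached to a cluster event into the concrete type, returning an error if either object has an unexpected type; for a Pod/Delete event only oldObj is set. The snippet below is a minimal, self-contained sketch of what such a generic helper typically looks like, not the scheduler's actual implementation; the helper name `as` and the `pod` struct are hypothetical stand-ins.

package main

import "fmt"

// as is a hypothetical stand-in for the scheduler's util.As helper: it
// type-asserts the old and new objects of a cluster event to the concrete
// type T. A nil object is allowed (e.g. newObj is nil on a delete event)
// and yields the zero value of T for that slot.
func as[T any](oldObj, newObj interface{}) (T, T, error) {
	var oldT, newT T
	if oldObj != nil {
		o, ok := oldObj.(T)
		if !ok {
			return oldT, newT, fmt.Errorf("unexpected old object type %T", oldObj)
		}
		oldT = o
	}
	if newObj != nil {
		n, ok := newObj.(T)
		if !ok {
			return oldT, newT, fmt.Errorf("unexpected new object type %T", newObj)
		}
		newT = n
	}
	return oldT, newT, nil
}

type pod struct{ name string }

func main() {
	// Delete events carry only the old object, mirroring how
	// isSchedulableAfterPodDeleted reads the first return value
	// and ignores the second.
	deleted, _, err := as[*pod](&pod{name: "deleted-pod"}, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(deleted.name) // prints "deleted-pod"
}

With this shape, a failed assertion surfaces as an error, which is why the hint function above returns framework.Queue together with a wrapped error rather than silently skipping the event.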