diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go index 6e7e8ed9794..6790e615943 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go @@ -18,6 +18,7 @@ package volumerestrictions import ( "context" + "fmt" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -40,17 +41,57 @@ type VolumeRestrictions struct { var _ framework.PreFilterPlugin = &VolumeRestrictions{} var _ framework.FilterPlugin = &VolumeRestrictions{} var _ framework.EnqueueExtensions = &VolumeRestrictions{} - -// Name is the name of the plugin used in the plugin registry and configurations. -const Name = names.VolumeRestrictions +var _ framework.StateData = &preFilterState{} const ( + // Name is the name of the plugin used in the plugin registry and configurations. + Name = names.VolumeRestrictions + // preFilterStateKey is the key in CycleState to VolumeRestrictions pre-computed data for Filtering. + // Using the name of the plugin will likely help us avoid collisions with other plugins. + preFilterStateKey = "PreFilter" + Name + // ErrReasonDiskConflict is used for NoDiskConflict predicate error. ErrReasonDiskConflict = "node(s) had no available disk" // ErrReasonReadWriteOncePodConflict is used when a pod is found using the same PVC with the ReadWriteOncePod access mode. ErrReasonReadWriteOncePodConflict = "node has pod using PersistentVolumeClaim with the same name and ReadWriteOncePod access mode" ) +// preFilterState computed at PreFilter and used at Filter. +type preFilterState struct { + // Names of the pod's volumes using the ReadWriteOncePod access mode. + readWriteOncePodPVCs sets.Set[string] + // The number of references to these ReadWriteOncePod volumes by scheduled pods. + conflictingPVCRefCount int +} + +func (s *preFilterState) updateWithPod(podInfo *framework.PodInfo, multiplier int) { + s.conflictingPVCRefCount += multiplier * s.conflictingPVCRefCountForPod(podInfo) +} + +func (s *preFilterState) conflictingPVCRefCountForPod(podInfo *framework.PodInfo) int { + conflicts := 0 + for _, volume := range podInfo.Pod.Spec.Volumes { + if volume.PersistentVolumeClaim == nil { + continue + } + if s.readWriteOncePodPVCs.Has(volume.PersistentVolumeClaim.ClaimName) { + conflicts += 1 + } + } + return conflicts +} + +// Clone the prefilter state. +func (s *preFilterState) Clone() framework.StateData { + if s == nil { + return nil + } + return &preFilterState{ + readWriteOncePodPVCs: s.readWriteOncePodPVCs, + conflictingPVCRefCount: s.conflictingPVCRefCount, + } +} + // Name returns name of the plugin. It is used in logs, etc. func (pl *VolumeRestrictions) Name() string { return Name @@ -117,18 +158,87 @@ func haveOverlap(a1, a2 []string) bool { return false } +// PreFilter computes and stores cycleState containing details for enforcing ReadWriteOncePod. 
func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { - if pl.enableReadWriteOncePod { - return nil, pl.isReadWriteOncePodAccessModeConflict(ctx, pod) + if !pl.enableReadWriteOncePod { + return nil, nil } - return nil, framework.NewStatus(framework.Success) + + pvcs, err := pl.readWriteOncePodPVCsForPod(ctx, pod) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) + } + return nil, framework.AsStatus(err) + } + + s, err := pl.calPreFilterState(ctx, pod, pvcs) + if err != nil { + return nil, framework.AsStatus(err) + } + cycleState.Write(preFilterStateKey, s) + return nil, nil } -// isReadWriteOncePodAccessModeConflict checks if a pod uses a PVC with the ReadWriteOncePod access mode. -// This access mode restricts volume access to a single pod on a single node. Since only a single pod can -// use a ReadWriteOncePod PVC, mark any other pods attempting to use this PVC as UnschedulableAndUnresolvable. -// TODO(#103132): Mark pod as Unschedulable and add preemption logic. -func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(ctx context.Context, pod *v1.Pod) *framework.Status { +// AddPod from pre-computed data in cycleState. +func (pl *VolumeRestrictions) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { + if !pl.enableReadWriteOncePod { + return nil + } + state, err := getPreFilterState(cycleState) + if err != nil { + return framework.AsStatus(err) + } + state.updateWithPod(podInfoToAdd, 1) + return nil +} + +// RemovePod from pre-computed data in cycleState. +func (pl *VolumeRestrictions) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { + if !pl.enableReadWriteOncePod { + return nil + } + state, err := getPreFilterState(cycleState) + if err != nil { + return framework.AsStatus(err) + } + state.updateWithPod(podInfoToRemove, -1) + return nil +} + +func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) { + c, err := cycleState.Read(preFilterStateKey) + if err != nil { + // preFilterState doesn't exist, likely PreFilter wasn't invoked. + return nil, fmt.Errorf("cannot read %q from cycleState", preFilterStateKey) + } + + s, ok := c.(*preFilterState) + if !ok { + return nil, fmt.Errorf("%+v convert to volumerestrictions.state error", c) + } + return s, nil +} + +// calPreFilterState computes preFilterState describing which PVCs use ReadWriteOncePod +// and which pods in the cluster are in conflict. +func (pl *VolumeRestrictions) calPreFilterState(ctx context.Context, pod *v1.Pod, pvcs sets.Set[string]) (*preFilterState, error) { + conflictingPVCRefCount := 0 + for pvc := range pvcs { + key := framework.GetNamespacedName(pod.Namespace, pvc) + if pl.sharedLister.StorageInfos().IsPVCUsedByPods(key) { + // There can only be at most one pod using the ReadWriteOncePod PVC. 
+ conflictingPVCRefCount += 1 + } + } + return &preFilterState{ + readWriteOncePodPVCs: pvcs, + conflictingPVCRefCount: conflictingPVCRefCount, + }, nil +} + +func (pl *VolumeRestrictions) readWriteOncePodPVCsForPod(ctx context.Context, pod *v1.Pod) (sets.Set[string], error) { + pvcs := sets.New[string]() for _, volume := range pod.Spec.Volumes { if volume.PersistentVolumeClaim == nil { continue @@ -136,27 +246,50 @@ func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(ctx context.C pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName) if err != nil { - if apierrors.IsNotFound(err) { - return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) - } - return framework.AsStatus(err) + return nil, err } if !v1helper.ContainsAccessMode(pvc.Spec.AccessModes, v1.ReadWriteOncePod) { continue } + pvcs.Insert(pvc.Name) + } + return pvcs, nil +} - key := framework.GetNamespacedName(pod.Namespace, volume.PersistentVolumeClaim.ClaimName) - if pl.sharedLister.StorageInfos().IsPVCUsedByPods(key) { - return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict) +// Checks if scheduling the pod onto this node would cause any conflicts with +// existing volumes. +func satisfyVolumeConflicts(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool { + for i := range pod.Spec.Volumes { + v := &pod.Spec.Volumes[i] + // fast path if there is no conflict checking targets. + if v.GCEPersistentDisk == nil && v.AWSElasticBlockStore == nil && v.RBD == nil && v.ISCSI == nil { + continue + } + + for _, ev := range nodeInfo.Pods { + if isVolumeConflict(v, ev.Pod) { + return false + } } } + return true +} +// Checks if scheduling the pod would cause any ReadWriteOncePod PVC access mode conflicts. +func satisfyReadWriteOncePod(ctx context.Context, state *preFilterState) *framework.Status { + if state == nil { + return nil + } + if state.conflictingPVCRefCount > 0 { + return framework.NewStatus(framework.Unschedulable, ErrReasonReadWriteOncePodConflict) + } return nil } +// PreFilterExtensions returns prefilter extensions, pod add and remove. func (pl *VolumeRestrictions) PreFilterExtensions() framework.PreFilterExtensions { - return nil + return pl } // Filter invoked at the filter extension point. @@ -168,21 +301,20 @@ func (pl *VolumeRestrictions) PreFilterExtensions() framework.PreFilterExtension // - AWS EBS forbids any two pods mounting the same volume ID // - Ceph RBD forbids if any two pods share at least same monitor, and match pool and image, and the image is read-only // - ISCSI forbids if any two pods share at least same IQN and ISCSI volume is read-only -func (pl *VolumeRestrictions) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { - for i := range pod.Spec.Volumes { - v := &pod.Spec.Volumes[i] - // fast path if there is no conflict checking targets. - if v.GCEPersistentDisk == nil && v.AWSElasticBlockStore == nil && v.RBD == nil && v.ISCSI == nil { - continue - } - - for _, ev := range nodeInfo.Pods { - if isVolumeConflict(v, ev.Pod) { - return framework.NewStatus(framework.Unschedulable, ErrReasonDiskConflict) - } - } +// If the pod uses PVCs with the ReadWriteOncePod access mode, it evaluates if +// these PVCs are already in-use and if preemption will help. 
+func (pl *VolumeRestrictions) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + if !satisfyVolumeConflicts(pod, nodeInfo) { + return framework.NewStatus(framework.Unschedulable, ErrReasonDiskConflict) } - return nil + if !pl.enableReadWriteOncePod { + return nil + } + state, err := getPreFilterState(cycleState) + if err != nil { + return framework.AsStatus(err) + } + return satisfyReadWriteOncePod(ctx, state) } // EventsToRegister returns the possible events that may make a Pod diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go index 088eeb9d05d..7007db47616 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go @@ -21,6 +21,7 @@ import ( "reflect" "testing" + "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -69,7 +70,9 @@ func TestGCEDiskConflicts(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() p := newPlugin(ctx, t) - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) + cycleState := framework.NewCycleState() + p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod) + gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -111,7 +114,9 @@ func TestAWSDiskConflicts(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() p := newPlugin(ctx, t) - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) + cycleState := framework.NewCycleState() + p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod) + gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -159,7 +164,9 @@ func TestRBDDiskConflicts(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() p := newPlugin(ctx, t) - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) + cycleState := framework.NewCycleState() + p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod) + gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -207,7 +214,9 @@ func TestISCSIDiskConflicts(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() p := newPlugin(ctx, t) - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) + cycleState := framework.NewCycleState() + p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod) + gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -219,7 +228,10 @@ func TestAccessModeConflicts(t *testing.T) { defer 
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ReadWriteOncePod, true)() // Required for querying lister for PVCs in the same namespace. - podWithReadWriteOncePodPVC := st.MakePod().Name("pod-with-rwop").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwop").Node("node-1").Obj() + podWithOnePVC := st.MakePod().Name("pod-with-one-pvc").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwop-1").Node("node-1").Obj() + podWithTwoPVCs := st.MakePod().Name("pod-with-two-pvcs").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwop-1").PVC("claim-with-rwop-2").Node("node-1").Obj() + podWithOneConflict := st.MakePod().Name("pod-with-one-conflict").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwop-1").Node("node-1").Obj() + podWithTwoConflicts := st.MakePod().Name("pod-with-two-conflicts").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwop-1").PVC("claim-with-rwop-2").Node("node-1").Obj() // Required for querying lister for PVCs in the same namespace. podWithReadWriteManyPVC := st.MakePod().Name("pod-with-rwx").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwx").Node("node-1").Obj() @@ -230,10 +242,19 @@ func TestAccessModeConflicts(t *testing.T) { }, } - readWriteOncePodPVC := &v1.PersistentVolumeClaim{ + readWriteOncePodPVC1 := &v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", - Name: "claim-with-rwop", + Name: "claim-with-rwop-1", + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}, + }, + } + readWriteOncePodPVC2 := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "claim-with-rwop-2", }, Spec: v1.PersistentVolumeClaimSpec{ AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}, @@ -252,47 +273,68 @@ func TestAccessModeConflicts(t *testing.T) { tests := []struct { name string pod *v1.Pod + nodeInfo *framework.NodeInfo existingPods []*v1.Pod existingNodes []*v1.Node existingPVCs []*v1.PersistentVolumeClaim enableReadWriteOncePod bool + preFilterWantStatus *framework.Status wantStatus *framework.Status }{ { name: "nothing", pod: &v1.Pod{}, + nodeInfo: framework.NewNodeInfo(), existingPods: []*v1.Pod{}, existingNodes: []*v1.Node{}, existingPVCs: []*v1.PersistentVolumeClaim{}, enableReadWriteOncePod: true, + preFilterWantStatus: nil, wantStatus: nil, }, { name: "failed to get PVC", - pod: podWithReadWriteOncePodPVC, + pod: podWithOnePVC, + nodeInfo: framework.NewNodeInfo(), existingPods: []*v1.Pod{}, existingNodes: []*v1.Node{}, existingPVCs: []*v1.PersistentVolumeClaim{}, enableReadWriteOncePod: true, - wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, "persistentvolumeclaim \"claim-with-rwop\" not found"), - }, - { - name: "no access mode conflict", - pod: podWithReadWriteOncePodPVC, - existingPods: []*v1.Pod{podWithReadWriteManyPVC}, - existingNodes: []*v1.Node{node}, - existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC, readWriteManyPVC}, - enableReadWriteOncePod: true, + preFilterWantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, "persistentvolumeclaim \"claim-with-rwop-1\" not found"), wantStatus: nil, }, { - name: "access mode conflict", - pod: podWithReadWriteOncePodPVC, - existingPods: []*v1.Pod{podWithReadWriteOncePodPVC, podWithReadWriteManyPVC}, + name: "no access mode conflict", + pod: podWithOnePVC, + nodeInfo: framework.NewNodeInfo(podWithReadWriteManyPVC), + existingPods: []*v1.Pod{podWithReadWriteManyPVC}, existingNodes: 
[]*v1.Node{node}, - existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC, readWriteManyPVC}, + existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC1, readWriteManyPVC}, enableReadWriteOncePod: true, - wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict), + preFilterWantStatus: nil, + wantStatus: nil, + }, + { + name: "access mode conflict, unschedulable", + pod: podWithOneConflict, + nodeInfo: framework.NewNodeInfo(podWithOnePVC, podWithReadWriteManyPVC), + existingPods: []*v1.Pod{podWithOnePVC, podWithReadWriteManyPVC}, + existingNodes: []*v1.Node{node}, + existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC1, readWriteManyPVC}, + enableReadWriteOncePod: true, + preFilterWantStatus: nil, + wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonReadWriteOncePodConflict), + }, + { + name: "two conflicts, unschedulable", + pod: podWithTwoConflicts, + nodeInfo: framework.NewNodeInfo(podWithTwoPVCs, podWithReadWriteManyPVC), + existingPods: []*v1.Pod{podWithTwoPVCs, podWithReadWriteManyPVC}, + existingNodes: []*v1.Node{node}, + existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC1, readWriteOncePodPVC2, readWriteManyPVC}, + enableReadWriteOncePod: true, + preFilterWantStatus: nil, + wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonReadWriteOncePodConflict), }, } @@ -301,9 +343,17 @@ func TestAccessModeConflicts(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() p := newPluginWithListers(ctx, t, test.existingPods, test.existingNodes, test.existingPVCs, test.enableReadWriteOncePod) - _, gotStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), nil, test.pod) - if !reflect.DeepEqual(gotStatus, test.wantStatus) { - t.Errorf("status does not match: %+v, want: %+v", gotStatus, test.wantStatus) + cycleState := framework.NewCycleState() + _, preFilterGotStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod) + if diff := cmp.Diff(test.preFilterWantStatus, preFilterGotStatus); diff != "" { + t.Errorf("Unexpected PreFilter status (-want, +got): %s", diff) + } + // If PreFilter fails, then Filter will not run. 
+ if test.preFilterWantStatus.IsSuccess() { + gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo) + if diff := cmp.Diff(test.wantStatus, gotStatus); diff != "" { + t.Errorf("Unexpected Filter status (-want, +got): %s", diff) + } } }) } diff --git a/test/integration/scheduler/preemption/preemption_test.go b/test/integration/scheduler/preemption/preemption_test.go index f33180cae2f..ceac2107db1 100644 --- a/test/integration/scheduler/preemption/preemption_test.go +++ b/test/integration/scheduler/preemption/preemption_test.go @@ -38,6 +38,7 @@ import ( clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/component-helpers/storage/volume" "k8s.io/klog/v2" configv1 "k8s.io/kube-scheduler/config/v1" podutil "k8s.io/kubernetes/pkg/api/v1/pod" @@ -46,6 +47,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler" configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" st "k8s.io/kubernetes/pkg/scheduler/testing" "k8s.io/kubernetes/plugin/pkg/admission/priority" @@ -1607,3 +1609,355 @@ func TestPreferNominatedNode(t *testing.T) { }) } } + +// TestReadWriteOncePodPreemption tests preemption scenarios for pods with +// ReadWriteOncePod PVCs. +func TestReadWriteOncePodPreemption(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ReadWriteOncePod, true)() + + cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ + Profiles: []configv1.KubeSchedulerProfile{{ + SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName), + Plugins: &configv1.Plugins{ + Filter: configv1.PluginSet{ + Enabled: []configv1.Plugin{ + {Name: volumerestrictions.Name}, + }, + }, + PreFilter: configv1.PluginSet{ + Enabled: []configv1.Plugin{ + {Name: volumerestrictions.Name}, + }, + }, + }, + }}, + }) + + testCtx := testutils.InitTestSchedulerWithOptions(t, + testutils.InitTestAPIServer(t, "preemption", nil), + 0, + scheduler.WithProfiles(cfg.Profiles...)) + testutils.SyncInformerFactory(testCtx) + go testCtx.Scheduler.Run(testCtx.Ctx) + + defer testutils.CleanupTest(t, testCtx) + cs := testCtx.ClientSet + + storage := v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}} + volType := v1.HostPathDirectoryOrCreate + pv1 := st.MakePersistentVolume(). + Name("pv-with-read-write-once-pod-1"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Capacity(storage.Requests). + HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/mnt1", Type: &volType}). + Obj() + pvc1 := st.MakePersistentVolumeClaim(). + Name("pvc-with-read-write-once-pod-1"). + Namespace(testCtx.NS.Name). + // Annotation and volume name required for PVC to be considered bound. + Annotation(volume.AnnBindCompleted, "true"). + VolumeName(pv1.Name). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Resources(storage). + Obj() + pv2 := st.MakePersistentVolume(). + Name("pv-with-read-write-once-pod-2"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Capacity(storage.Requests). + HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/mnt2", Type: &volType}). + Obj() + pvc2 := st.MakePersistentVolumeClaim(). + Name("pvc-with-read-write-once-pod-2"). 
+ Namespace(testCtx.NS.Name). + // Annotation and volume name required for PVC to be considered bound. + Annotation(volume.AnnBindCompleted, "true"). + VolumeName(pv2.Name). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Resources(storage). + Obj() + + tests := []struct { + name string + init func() error + existingPods []*v1.Pod + pod *v1.Pod + unresolvable bool + preemptedPodIndexes map[int]struct{} + cleanup func() error + }{ + { + name: "preempt single pod", + init: func() error { + _, err := testutils.CreatePV(cs, pv1) + if err != nil { + return fmt.Errorf("cannot create pv: %v", err) + } + _, err = testutils.CreatePVC(cs, pvc1) + if err != nil { + return fmt.Errorf("cannot create pvc: %v", err) + } + return nil + }, + existingPods: []*v1.Pod{ + initPausePod(&testutils.PausePodConfig{ + Name: "victim-pod", + Namespace: testCtx.NS.Name, + Priority: &lowPriority, + Volumes: []v1.Volume{{ + Name: "volume", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc1.Name, + }, + }, + }}, + }), + }, + pod: initPausePod(&testutils.PausePodConfig{ + Name: "preemptor-pod", + Namespace: testCtx.NS.Name, + Priority: &highPriority, + Volumes: []v1.Volume{{ + Name: "volume", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc1.Name, + }, + }, + }}, + }), + preemptedPodIndexes: map[int]struct{}{0: {}}, + cleanup: func() error { + if err := testutils.DeletePVC(cs, pvc1.Name, pvc1.Namespace); err != nil { + return fmt.Errorf("cannot delete pvc: %v", err) + } + if err := testutils.DeletePV(cs, pv1.Name); err != nil { + return fmt.Errorf("cannot delete pv: %v", err) + } + return nil + }, + }, + { + name: "preempt two pods", + init: func() error { + for _, pv := range []*v1.PersistentVolume{pv1, pv2} { + _, err := testutils.CreatePV(cs, pv) + if err != nil { + return fmt.Errorf("cannot create pv: %v", err) + } + } + for _, pvc := range []*v1.PersistentVolumeClaim{pvc1, pvc2} { + _, err := testutils.CreatePVC(cs, pvc) + if err != nil { + return fmt.Errorf("cannot create pvc: %v", err) + } + } + return nil + }, + existingPods: []*v1.Pod{ + initPausePod(&testutils.PausePodConfig{ + Name: "victim-pod-1", + Namespace: testCtx.NS.Name, + Priority: &lowPriority, + Volumes: []v1.Volume{{ + Name: "volume", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc1.Name, + }, + }, + }}, + }), + initPausePod(&testutils.PausePodConfig{ + Name: "victim-pod-2", + Namespace: testCtx.NS.Name, + Priority: &lowPriority, + Volumes: []v1.Volume{{ + Name: "volume", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc2.Name, + }, + }, + }}, + }), + }, + pod: initPausePod(&testutils.PausePodConfig{ + Name: "preemptor-pod", + Namespace: testCtx.NS.Name, + Priority: &highPriority, + Volumes: []v1.Volume{ + { + Name: "volume-1", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc1.Name, + }, + }, + }, + { + Name: "volume-2", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc2.Name, + }, + }, + }, + }, + }), + preemptedPodIndexes: map[int]struct{}{0: {}, 1: {}}, + cleanup: func() error { + for _, pvc := range []*v1.PersistentVolumeClaim{pvc1, pvc2} { + if err := testutils.DeletePVC(cs, pvc.Name, pvc.Namespace); err != nil { + return fmt.Errorf("cannot 
delete pvc: %v", err) + } + } + for _, pv := range []*v1.PersistentVolume{pv1, pv2} { + if err := testutils.DeletePV(cs, pv.Name); err != nil { + return fmt.Errorf("cannot delete pv: %v", err) + } + } + return nil + }, + }, + { + name: "preempt single pod with two volumes", + init: func() error { + for _, pv := range []*v1.PersistentVolume{pv1, pv2} { + _, err := testutils.CreatePV(cs, pv) + if err != nil { + return fmt.Errorf("cannot create pv: %v", err) + } + } + for _, pvc := range []*v1.PersistentVolumeClaim{pvc1, pvc2} { + _, err := testutils.CreatePVC(cs, pvc) + if err != nil { + return fmt.Errorf("cannot create pvc: %v", err) + } + } + return nil + }, + existingPods: []*v1.Pod{ + initPausePod(&testutils.PausePodConfig{ + Name: "victim-pod", + Namespace: testCtx.NS.Name, + Priority: &lowPriority, + Volumes: []v1.Volume{ + { + Name: "volume-1", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc1.Name, + }, + }, + }, + { + Name: "volume-2", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc2.Name, + }, + }, + }, + }, + }), + }, + pod: initPausePod(&testutils.PausePodConfig{ + Name: "preemptor-pod", + Namespace: testCtx.NS.Name, + Priority: &highPriority, + Volumes: []v1.Volume{ + { + Name: "volume-1", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc1.Name, + }, + }, + }, + { + Name: "volume-2", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc2.Name, + }, + }, + }, + }, + }), + preemptedPodIndexes: map[int]struct{}{0: {}}, + cleanup: func() error { + for _, pvc := range []*v1.PersistentVolumeClaim{pvc1, pvc2} { + if err := testutils.DeletePVC(cs, pvc.Name, pvc.Namespace); err != nil { + return fmt.Errorf("cannot delete pvc: %v", err) + } + } + for _, pv := range []*v1.PersistentVolume{pv1, pv2} { + if err := testutils.DeletePV(cs, pv.Name); err != nil { + return fmt.Errorf("cannot delete pv: %v", err) + } + } + return nil + }, + }, + } + + // Create a node with some resources and a label. + nodeRes := map[v1.ResourceName]string{ + v1.ResourcePods: "32", + v1.ResourceCPU: "500m", + v1.ResourceMemory: "500", + } + nodeObject := st.MakeNode().Name("node1").Capacity(nodeRes).Label("node", "node1").Obj() + if _, err := createNode(cs, nodeObject); err != nil { + t.Fatalf("Error creating node: %v", err) + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if err := test.init(); err != nil { + t.Fatalf("Error while initializing test: %v", err) + } + + pods := make([]*v1.Pod, len(test.existingPods)) + t.Cleanup(func() { + testutils.CleanupPods(cs, t, pods) + if err := test.cleanup(); err != nil { + t.Errorf("Error cleaning up test: %v", err) + } + }) + // Create and run existingPods. + for i, p := range test.existingPods { + var err error + pods[i], err = runPausePod(cs, p) + if err != nil { + t.Fatalf("Error running pause pod: %v", err) + } + } + // Create the "pod". + preemptor, err := createPausePod(cs, test.pod) + if err != nil { + t.Errorf("Error while creating high priority pod: %v", err) + } + pods = append(pods, preemptor) + // Wait for preemption of pods and make sure the other ones are not preempted. 
+ for i, p := range pods { + if _, found := test.preemptedPodIndexes[i]; found { + if err = wait.Poll(time.Second, wait.ForeverTestTimeout, podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil { + t.Errorf("Pod %v/%v is not getting evicted.", p.Namespace, p.Name) + } + } else { + if p.DeletionTimestamp != nil { + t.Errorf("Didn't expect pod %v to get preempted.", p.Name) + } + } + } + // Also check that the preemptor pod gets the NominatedNodeName field set. + if len(test.preemptedPodIndexes) > 0 { + if err := waitForNominatedNodeName(cs, preemptor); err != nil { + t.Errorf("NominatedNodeName field was not set for pod %v: %v", preemptor.Name, err) + } + } + }) + } +} diff --git a/test/integration/util/util.go b/test/integration/util/util.go index ae2fa61b8e7..b058b49ed3f 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -688,6 +688,16 @@ func CreatePV(cs clientset.Interface, pv *v1.PersistentVolume) (*v1.PersistentVo return cs.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}) } +// DeletePVC deletes the given PVC in the given namespace. +func DeletePVC(cs clientset.Interface, pvcName string, nsName string) error { + return cs.CoreV1().PersistentVolumeClaims(nsName).Delete(context.TODO(), pvcName, *metav1.NewDeleteOptions(0)) +} + +// DeletePV deletes the given PV. PersistentVolumes are cluster-scoped, so no namespace is needed. +func DeletePV(cs clientset.Interface, pvName string) error { + return cs.CoreV1().PersistentVolumes().Delete(context.TODO(), pvName, *metav1.NewDeleteOptions(0)) +} + // RunPausePod creates a pod with "Pause" image and the given config and waits // until it is scheduled. It returns its pointer and error status. func RunPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
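
Reviewer note (outside the patch): for anyone unfamiliar with the PreFilterExtensions pattern, the following minimal, self-contained Go sketch illustrates the reference-counting idea this change relies on. The types here (toyState, updateWithPod's claims slice) are simplified stand-ins, not the real framework interfaces: PreFilter counts scheduled pods that already hold the incoming pod's ReadWriteOncePod claims, preemption simulates deleting a victim via RemovePod on a cloned state, and Filter only rejects the node while that count is non-zero. This is also why the Filter status changes from UnschedulableAndUnresolvable to Unschedulable: nodes that fail as UnschedulableAndUnresolvable are skipped by the default preemption PostFilter, so the old status made ReadWriteOncePod conflicts non-preemptible.

package main

import "fmt"

// toyState mirrors the shape of preFilterState: the set of ReadWriteOncePod
// claim names used by the incoming pod, plus a count of scheduled pods that
// currently reference any of them.
type toyState struct {
	rwopPVCs               map[string]bool
	conflictingPVCRefCount int
}

// clone copies the counter and shares the claim set, matching the Clone
// method in the patch (the set is never mutated after PreFilter).
func (s *toyState) clone() *toyState {
	return &toyState{rwopPVCs: s.rwopPVCs, conflictingPVCRefCount: s.conflictingPVCRefCount}
}

// updateWithPod adds (+1) or removes (-1) one pod's claim references,
// analogous to AddPod/RemovePod calling updateWithPod with a multiplier.
func (s *toyState) updateWithPod(claims []string, multiplier int) {
	for _, c := range claims {
		if s.rwopPVCs[c] {
			s.conflictingPVCRefCount += multiplier
		}
	}
}

func main() {
	// PreFilter: the incoming pod wants "claim-1", which one running pod already uses.
	state := &toyState{rwopPVCs: map[string]bool{"claim-1": true}, conflictingPVCRefCount: 1}

	// Filter without preemption: still in conflict, node is Unschedulable.
	fmt.Println("schedulable without preemption:", state.conflictingPVCRefCount == 0)

	// Preemption simulates removing the victim pod on a cloned state (RemovePod),
	// after which Filter would succeed and the victim can be evicted.
	sim := state.clone()
	sim.updateWithPod([]string{"claim-1"}, -1)
	fmt.Println("schedulable after simulating victim removal:", sim.conflictingPVCRefCount == 0)
}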