From dbc7d8ded0325988f29a6dbceaba6bd52bf10344 Mon Sep 17 00:00:00 2001 From: Chris Henzie Date: Tue, 22 Nov 2022 12:08:27 -0800 Subject: [PATCH] feat: support preemption for pods using ReadWriteOncePod PVCs PVCs using the ReadWriteOncePod access mode can only be referenced by a single pod. When a pod is scheduled that uses a ReadWriteOncePod PVC, return "Unschedulable" if the PVC is already in-use in the cluster. To support preemption, the "VolumeRestrictions" scheduler plugin computes cycle state during the PreFilter phase. This cycle state contains the number of references to the ReadWriteOncePod PVCs used by the pod-to-be-scheduled. During scheduler simulation (AddPod and RemovePod), we add and remove reference counts from the cycle state if they use any of these ReadWriteOncePod PVCs. In the Filter phase, the scheduler checks if there are any PVC reference conflicts, and returns "Unschedulable" if there is a conflict. This is a required feature for the ReadWriteOncePod beta. See for more context: https://github.com/kubernetes/enhancements/tree/master/keps/sig-storage/2485-read-write-once-pod-pv-access-mode#beta --- .../volumerestrictions/volume_restrictions.go | 198 ++++++++-- .../volume_restrictions_test.go | 100 +++-- .../scheduler/preemption/preemption_test.go | 354 ++++++++++++++++++ test/integration/util/util.go | 10 + 4 files changed, 604 insertions(+), 58 deletions(-) diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go index 6e7e8ed9794..6790e615943 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go @@ -18,6 +18,7 @@ package volumerestrictions import ( "context" + "fmt" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -40,17 +41,57 @@ type VolumeRestrictions struct { var _ framework.PreFilterPlugin = &VolumeRestrictions{} var _ framework.FilterPlugin = &VolumeRestrictions{} var _ framework.EnqueueExtensions = &VolumeRestrictions{} - -// Name is the name of the plugin used in the plugin registry and configurations. -const Name = names.VolumeRestrictions +var _ framework.StateData = &preFilterState{} const ( + // Name is the name of the plugin used in the plugin registry and configurations. + Name = names.VolumeRestrictions + // preFilterStateKey is the key in CycleState to VolumeRestrictions pre-computed data for Filtering. + // Using the name of the plugin will likely help us avoid collisions with other plugins. + preFilterStateKey = "PreFilter" + Name + // ErrReasonDiskConflict is used for NoDiskConflict predicate error. ErrReasonDiskConflict = "node(s) had no available disk" // ErrReasonReadWriteOncePodConflict is used when a pod is found using the same PVC with the ReadWriteOncePod access mode. ErrReasonReadWriteOncePodConflict = "node has pod using PersistentVolumeClaim with the same name and ReadWriteOncePod access mode" ) +// preFilterState computed at PreFilter and used at Filter. +type preFilterState struct { + // Names of the pod's volumes using the ReadWriteOncePod access mode. + readWriteOncePodPVCs sets.Set[string] + // The number of references to these ReadWriteOncePod volumes by scheduled pods. 
+ conflictingPVCRefCount int +} + +func (s *preFilterState) updateWithPod(podInfo *framework.PodInfo, multiplier int) { + s.conflictingPVCRefCount += multiplier * s.conflictingPVCRefCountForPod(podInfo) +} + +func (s *preFilterState) conflictingPVCRefCountForPod(podInfo *framework.PodInfo) int { + conflicts := 0 + for _, volume := range podInfo.Pod.Spec.Volumes { + if volume.PersistentVolumeClaim == nil { + continue + } + if s.readWriteOncePodPVCs.Has(volume.PersistentVolumeClaim.ClaimName) { + conflicts += 1 + } + } + return conflicts +} + +// Clone the prefilter state. +func (s *preFilterState) Clone() framework.StateData { + if s == nil { + return nil + } + return &preFilterState{ + readWriteOncePodPVCs: s.readWriteOncePodPVCs, + conflictingPVCRefCount: s.conflictingPVCRefCount, + } +} + // Name returns name of the plugin. It is used in logs, etc. func (pl *VolumeRestrictions) Name() string { return Name @@ -117,18 +158,87 @@ func haveOverlap(a1, a2 []string) bool { return false } +// PreFilter computes and stores cycleState containing details for enforcing ReadWriteOncePod. func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { - if pl.enableReadWriteOncePod { - return nil, pl.isReadWriteOncePodAccessModeConflict(ctx, pod) + if !pl.enableReadWriteOncePod { + return nil, nil } - return nil, framework.NewStatus(framework.Success) + + pvcs, err := pl.readWriteOncePodPVCsForPod(ctx, pod) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) + } + return nil, framework.AsStatus(err) + } + + s, err := pl.calPreFilterState(ctx, pod, pvcs) + if err != nil { + return nil, framework.AsStatus(err) + } + cycleState.Write(preFilterStateKey, s) + return nil, nil } -// isReadWriteOncePodAccessModeConflict checks if a pod uses a PVC with the ReadWriteOncePod access mode. -// This access mode restricts volume access to a single pod on a single node. Since only a single pod can -// use a ReadWriteOncePod PVC, mark any other pods attempting to use this PVC as UnschedulableAndUnresolvable. -// TODO(#103132): Mark pod as Unschedulable and add preemption logic. -func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(ctx context.Context, pod *v1.Pod) *framework.Status { +// AddPod from pre-computed data in cycleState. +func (pl *VolumeRestrictions) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { + if !pl.enableReadWriteOncePod { + return nil + } + state, err := getPreFilterState(cycleState) + if err != nil { + return framework.AsStatus(err) + } + state.updateWithPod(podInfoToAdd, 1) + return nil +} + +// RemovePod from pre-computed data in cycleState. +func (pl *VolumeRestrictions) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { + if !pl.enableReadWriteOncePod { + return nil + } + state, err := getPreFilterState(cycleState) + if err != nil { + return framework.AsStatus(err) + } + state.updateWithPod(podInfoToRemove, -1) + return nil +} + +func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) { + c, err := cycleState.Read(preFilterStateKey) + if err != nil { + // preFilterState doesn't exist, likely PreFilter wasn't invoked. 
+ return nil, fmt.Errorf("cannot read %q from cycleState", preFilterStateKey) + } + + s, ok := c.(*preFilterState) + if !ok { + return nil, fmt.Errorf("%+v convert to volumerestrictions.state error", c) + } + return s, nil +} + +// calPreFilterState computes preFilterState describing which PVCs use ReadWriteOncePod +// and which pods in the cluster are in conflict. +func (pl *VolumeRestrictions) calPreFilterState(ctx context.Context, pod *v1.Pod, pvcs sets.Set[string]) (*preFilterState, error) { + conflictingPVCRefCount := 0 + for pvc := range pvcs { + key := framework.GetNamespacedName(pod.Namespace, pvc) + if pl.sharedLister.StorageInfos().IsPVCUsedByPods(key) { + // There can only be at most one pod using the ReadWriteOncePod PVC. + conflictingPVCRefCount += 1 + } + } + return &preFilterState{ + readWriteOncePodPVCs: pvcs, + conflictingPVCRefCount: conflictingPVCRefCount, + }, nil +} + +func (pl *VolumeRestrictions) readWriteOncePodPVCsForPod(ctx context.Context, pod *v1.Pod) (sets.Set[string], error) { + pvcs := sets.New[string]() for _, volume := range pod.Spec.Volumes { if volume.PersistentVolumeClaim == nil { continue @@ -136,27 +246,50 @@ func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(ctx context.C pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName) if err != nil { - if apierrors.IsNotFound(err) { - return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) - } - return framework.AsStatus(err) + return nil, err } if !v1helper.ContainsAccessMode(pvc.Spec.AccessModes, v1.ReadWriteOncePod) { continue } + pvcs.Insert(pvc.Name) + } + return pvcs, nil +} - key := framework.GetNamespacedName(pod.Namespace, volume.PersistentVolumeClaim.ClaimName) - if pl.sharedLister.StorageInfos().IsPVCUsedByPods(key) { - return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict) +// Checks if scheduling the pod onto this node would cause any conflicts with +// existing volumes. +func satisfyVolumeConflicts(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool { + for i := range pod.Spec.Volumes { + v := &pod.Spec.Volumes[i] + // fast path if there is no conflict checking targets. + if v.GCEPersistentDisk == nil && v.AWSElasticBlockStore == nil && v.RBD == nil && v.ISCSI == nil { + continue + } + + for _, ev := range nodeInfo.Pods { + if isVolumeConflict(v, ev.Pod) { + return false + } } } + return true +} +// Checks if scheduling the pod would cause any ReadWriteOncePod PVC access mode conflicts. +func satisfyReadWriteOncePod(ctx context.Context, state *preFilterState) *framework.Status { + if state == nil { + return nil + } + if state.conflictingPVCRefCount > 0 { + return framework.NewStatus(framework.Unschedulable, ErrReasonReadWriteOncePodConflict) + } return nil } +// PreFilterExtensions returns prefilter extensions, pod add and remove. func (pl *VolumeRestrictions) PreFilterExtensions() framework.PreFilterExtensions { - return nil + return pl } // Filter invoked at the filter extension point. 
@@ -168,21 +301,20 @@ func (pl *VolumeRestrictions) PreFilterExtensions() framework.PreFilterExtension // - AWS EBS forbids any two pods mounting the same volume ID // - Ceph RBD forbids if any two pods share at least same monitor, and match pool and image, and the image is read-only // - ISCSI forbids if any two pods share at least same IQN and ISCSI volume is read-only -func (pl *VolumeRestrictions) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { - for i := range pod.Spec.Volumes { - v := &pod.Spec.Volumes[i] - // fast path if there is no conflict checking targets. - if v.GCEPersistentDisk == nil && v.AWSElasticBlockStore == nil && v.RBD == nil && v.ISCSI == nil { - continue - } - - for _, ev := range nodeInfo.Pods { - if isVolumeConflict(v, ev.Pod) { - return framework.NewStatus(framework.Unschedulable, ErrReasonDiskConflict) - } - } +// If the pod uses PVCs with the ReadWriteOncePod access mode, it evaluates if +// these PVCs are already in-use and if preemption will help. +func (pl *VolumeRestrictions) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + if !satisfyVolumeConflicts(pod, nodeInfo) { + return framework.NewStatus(framework.Unschedulable, ErrReasonDiskConflict) } - return nil + if !pl.enableReadWriteOncePod { + return nil + } + state, err := getPreFilterState(cycleState) + if err != nil { + return framework.AsStatus(err) + } + return satisfyReadWriteOncePod(ctx, state) } // EventsToRegister returns the possible events that may make a Pod diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go index 088eeb9d05d..7007db47616 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go @@ -21,6 +21,7 @@ import ( "reflect" "testing" + "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -69,7 +70,9 @@ func TestGCEDiskConflicts(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() p := newPlugin(ctx, t) - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) + cycleState := framework.NewCycleState() + p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod) + gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -111,7 +114,9 @@ func TestAWSDiskConflicts(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() p := newPlugin(ctx, t) - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) + cycleState := framework.NewCycleState() + p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod) + gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -159,7 +164,9 @@ func TestRBDDiskConflicts(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() p := newPlugin(ctx, t) - gotStatus := 
p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) + cycleState := framework.NewCycleState() + p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod) + gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -207,7 +214,9 @@ func TestISCSIDiskConflicts(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() p := newPlugin(ctx, t) - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) + cycleState := framework.NewCycleState() + p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod) + gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -219,7 +228,10 @@ func TestAccessModeConflicts(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ReadWriteOncePod, true)() // Required for querying lister for PVCs in the same namespace. - podWithReadWriteOncePodPVC := st.MakePod().Name("pod-with-rwop").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwop").Node("node-1").Obj() + podWithOnePVC := st.MakePod().Name("pod-with-one-pvc").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwop-1").Node("node-1").Obj() + podWithTwoPVCs := st.MakePod().Name("pod-with-two-pvcs").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwop-1").PVC("claim-with-rwop-2").Node("node-1").Obj() + podWithOneConflict := st.MakePod().Name("pod-with-one-conflict").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwop-1").Node("node-1").Obj() + podWithTwoConflicts := st.MakePod().Name("pod-with-two-conflicts").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwop-1").PVC("claim-with-rwop-2").Node("node-1").Obj() // Required for querying lister for PVCs in the same namespace. 
podWithReadWriteManyPVC := st.MakePod().Name("pod-with-rwx").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwx").Node("node-1").Obj() @@ -230,10 +242,19 @@ func TestAccessModeConflicts(t *testing.T) { }, } - readWriteOncePodPVC := &v1.PersistentVolumeClaim{ + readWriteOncePodPVC1 := &v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", - Name: "claim-with-rwop", + Name: "claim-with-rwop-1", + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}, + }, + } + readWriteOncePodPVC2 := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "claim-with-rwop-2", }, Spec: v1.PersistentVolumeClaimSpec{ AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}, @@ -252,47 +273,68 @@ func TestAccessModeConflicts(t *testing.T) { tests := []struct { name string pod *v1.Pod + nodeInfo *framework.NodeInfo existingPods []*v1.Pod existingNodes []*v1.Node existingPVCs []*v1.PersistentVolumeClaim enableReadWriteOncePod bool + preFilterWantStatus *framework.Status wantStatus *framework.Status }{ { name: "nothing", pod: &v1.Pod{}, + nodeInfo: framework.NewNodeInfo(), existingPods: []*v1.Pod{}, existingNodes: []*v1.Node{}, existingPVCs: []*v1.PersistentVolumeClaim{}, enableReadWriteOncePod: true, + preFilterWantStatus: nil, wantStatus: nil, }, { name: "failed to get PVC", - pod: podWithReadWriteOncePodPVC, + pod: podWithOnePVC, + nodeInfo: framework.NewNodeInfo(), existingPods: []*v1.Pod{}, existingNodes: []*v1.Node{}, existingPVCs: []*v1.PersistentVolumeClaim{}, enableReadWriteOncePod: true, - wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, "persistentvolumeclaim \"claim-with-rwop\" not found"), - }, - { - name: "no access mode conflict", - pod: podWithReadWriteOncePodPVC, - existingPods: []*v1.Pod{podWithReadWriteManyPVC}, - existingNodes: []*v1.Node{node}, - existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC, readWriteManyPVC}, - enableReadWriteOncePod: true, + preFilterWantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, "persistentvolumeclaim \"claim-with-rwop-1\" not found"), wantStatus: nil, }, { - name: "access mode conflict", - pod: podWithReadWriteOncePodPVC, - existingPods: []*v1.Pod{podWithReadWriteOncePodPVC, podWithReadWriteManyPVC}, + name: "no access mode conflict", + pod: podWithOnePVC, + nodeInfo: framework.NewNodeInfo(podWithReadWriteManyPVC), + existingPods: []*v1.Pod{podWithReadWriteManyPVC}, existingNodes: []*v1.Node{node}, - existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC, readWriteManyPVC}, + existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC1, readWriteManyPVC}, enableReadWriteOncePod: true, - wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict), + preFilterWantStatus: nil, + wantStatus: nil, + }, + { + name: "access mode conflict, unschedulable", + pod: podWithOneConflict, + nodeInfo: framework.NewNodeInfo(podWithOnePVC, podWithReadWriteManyPVC), + existingPods: []*v1.Pod{podWithOnePVC, podWithReadWriteManyPVC}, + existingNodes: []*v1.Node{node}, + existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC1, readWriteManyPVC}, + enableReadWriteOncePod: true, + preFilterWantStatus: nil, + wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonReadWriteOncePodConflict), + }, + { + name: "two conflicts, unschedulable", + pod: podWithTwoConflicts, + nodeInfo: framework.NewNodeInfo(podWithTwoPVCs, 
podWithReadWriteManyPVC), + existingPods: []*v1.Pod{podWithTwoPVCs, podWithReadWriteManyPVC}, + existingNodes: []*v1.Node{node}, + existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC1, readWriteOncePodPVC2, readWriteManyPVC}, + enableReadWriteOncePod: true, + preFilterWantStatus: nil, + wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonReadWriteOncePodConflict), }, } @@ -301,9 +343,17 @@ func TestAccessModeConflicts(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() p := newPluginWithListers(ctx, t, test.existingPods, test.existingNodes, test.existingPVCs, test.enableReadWriteOncePod) - _, gotStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), nil, test.pod) - if !reflect.DeepEqual(gotStatus, test.wantStatus) { - t.Errorf("status does not match: %+v, want: %+v", gotStatus, test.wantStatus) + cycleState := framework.NewCycleState() + _, preFilterGotStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod) + if diff := cmp.Diff(test.preFilterWantStatus, preFilterGotStatus); diff != "" { + t.Errorf("Unexpected PreFilter status (-want, +got): %s", diff) + } + // If PreFilter fails, then Filter will not run. + if test.preFilterWantStatus.IsSuccess() { + gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo) + if diff := cmp.Diff(test.wantStatus, gotStatus); diff != "" { + t.Errorf("Unexpected Filter status (-want, +got): %s", diff) + } } }) } diff --git a/test/integration/scheduler/preemption/preemption_test.go b/test/integration/scheduler/preemption/preemption_test.go index f33180cae2f..ceac2107db1 100644 --- a/test/integration/scheduler/preemption/preemption_test.go +++ b/test/integration/scheduler/preemption/preemption_test.go @@ -38,6 +38,7 @@ import ( clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/component-helpers/storage/volume" "k8s.io/klog/v2" configv1 "k8s.io/kube-scheduler/config/v1" podutil "k8s.io/kubernetes/pkg/api/v1/pod" @@ -46,6 +47,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler" configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" st "k8s.io/kubernetes/pkg/scheduler/testing" "k8s.io/kubernetes/plugin/pkg/admission/priority" @@ -1607,3 +1609,355 @@ func TestPreferNominatedNode(t *testing.T) { }) } } + +// TestReadWriteOncePodPreemption tests preemption scenarios for pods with +// ReadWriteOncePod PVCs. 
+func TestReadWriteOncePodPreemption(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ReadWriteOncePod, true)() + + cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ + Profiles: []configv1.KubeSchedulerProfile{{ + SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName), + Plugins: &configv1.Plugins{ + Filter: configv1.PluginSet{ + Enabled: []configv1.Plugin{ + {Name: volumerestrictions.Name}, + }, + }, + PreFilter: configv1.PluginSet{ + Enabled: []configv1.Plugin{ + {Name: volumerestrictions.Name}, + }, + }, + }, + }}, + }) + + testCtx := testutils.InitTestSchedulerWithOptions(t, + testutils.InitTestAPIServer(t, "preemption", nil), + 0, + scheduler.WithProfiles(cfg.Profiles...)) + testutils.SyncInformerFactory(testCtx) + go testCtx.Scheduler.Run(testCtx.Ctx) + + defer testutils.CleanupTest(t, testCtx) + cs := testCtx.ClientSet + + storage := v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}} + volType := v1.HostPathDirectoryOrCreate + pv1 := st.MakePersistentVolume(). + Name("pv-with-read-write-once-pod-1"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Capacity(storage.Requests). + HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/mnt1", Type: &volType}). + Obj() + pvc1 := st.MakePersistentVolumeClaim(). + Name("pvc-with-read-write-once-pod-1"). + Namespace(testCtx.NS.Name). + // Annotation and volume name required for PVC to be considered bound. + Annotation(volume.AnnBindCompleted, "true"). + VolumeName(pv1.Name). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Resources(storage). + Obj() + pv2 := st.MakePersistentVolume(). + Name("pv-with-read-write-once-pod-2"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Capacity(storage.Requests). + HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/mnt2", Type: &volType}). + Obj() + pvc2 := st.MakePersistentVolumeClaim(). + Name("pvc-with-read-write-once-pod-2"). + Namespace(testCtx.NS.Name). + // Annotation and volume name required for PVC to be considered bound. + Annotation(volume.AnnBindCompleted, "true"). + VolumeName(pv2.Name). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Resources(storage). 
+ Obj() + + tests := []struct { + name string + init func() error + existingPods []*v1.Pod + pod *v1.Pod + unresolvable bool + preemptedPodIndexes map[int]struct{} + cleanup func() error + }{ + { + name: "preempt single pod", + init: func() error { + _, err := testutils.CreatePV(cs, pv1) + if err != nil { + return fmt.Errorf("cannot create pv: %v", err) + } + _, err = testutils.CreatePVC(cs, pvc1) + if err != nil { + return fmt.Errorf("cannot create pvc: %v", err) + } + return nil + }, + existingPods: []*v1.Pod{ + initPausePod(&testutils.PausePodConfig{ + Name: "victim-pod", + Namespace: testCtx.NS.Name, + Priority: &lowPriority, + Volumes: []v1.Volume{{ + Name: "volume", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc1.Name, + }, + }, + }}, + }), + }, + pod: initPausePod(&testutils.PausePodConfig{ + Name: "preemptor-pod", + Namespace: testCtx.NS.Name, + Priority: &highPriority, + Volumes: []v1.Volume{{ + Name: "volume", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc1.Name, + }, + }, + }}, + }), + preemptedPodIndexes: map[int]struct{}{0: {}}, + cleanup: func() error { + if err := testutils.DeletePVC(cs, pvc1.Name, pvc1.Namespace); err != nil { + return fmt.Errorf("cannot delete pvc: %v", err) + } + if err := testutils.DeletePV(cs, pv1.Name); err != nil { + return fmt.Errorf("cannot delete pv: %v", err) + } + return nil + }, + }, + { + name: "preempt two pods", + init: func() error { + for _, pv := range []*v1.PersistentVolume{pv1, pv2} { + _, err := testutils.CreatePV(cs, pv) + if err != nil { + return fmt.Errorf("cannot create pv: %v", err) + } + } + for _, pvc := range []*v1.PersistentVolumeClaim{pvc1, pvc2} { + _, err := testutils.CreatePVC(cs, pvc) + if err != nil { + return fmt.Errorf("cannot create pvc: %v", err) + } + } + return nil + }, + existingPods: []*v1.Pod{ + initPausePod(&testutils.PausePodConfig{ + Name: "victim-pod-1", + Namespace: testCtx.NS.Name, + Priority: &lowPriority, + Volumes: []v1.Volume{{ + Name: "volume", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc1.Name, + }, + }, + }}, + }), + initPausePod(&testutils.PausePodConfig{ + Name: "victim-pod-2", + Namespace: testCtx.NS.Name, + Priority: &lowPriority, + Volumes: []v1.Volume{{ + Name: "volume", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc2.Name, + }, + }, + }}, + }), + }, + pod: initPausePod(&testutils.PausePodConfig{ + Name: "preemptor-pod", + Namespace: testCtx.NS.Name, + Priority: &highPriority, + Volumes: []v1.Volume{ + { + Name: "volume-1", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc1.Name, + }, + }, + }, + { + Name: "volume-2", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc2.Name, + }, + }, + }, + }, + }), + preemptedPodIndexes: map[int]struct{}{0: {}, 1: {}}, + cleanup: func() error { + for _, pvc := range []*v1.PersistentVolumeClaim{pvc1, pvc2} { + if err := testutils.DeletePVC(cs, pvc.Name, pvc.Namespace); err != nil { + return fmt.Errorf("cannot delete pvc: %v", err) + } + } + for _, pv := range []*v1.PersistentVolume{pv1, pv2} { + if err := testutils.DeletePV(cs, pv.Name); err != nil { + return fmt.Errorf("cannot delete pv: %v", err) + } + } + return nil + }, + }, + { + name: "preempt single pod with two 
volumes", + init: func() error { + for _, pv := range []*v1.PersistentVolume{pv1, pv2} { + _, err := testutils.CreatePV(cs, pv) + if err != nil { + return fmt.Errorf("cannot create pv: %v", err) + } + } + for _, pvc := range []*v1.PersistentVolumeClaim{pvc1, pvc2} { + _, err := testutils.CreatePVC(cs, pvc) + if err != nil { + return fmt.Errorf("cannot create pvc: %v", err) + } + } + return nil + }, + existingPods: []*v1.Pod{ + initPausePod(&testutils.PausePodConfig{ + Name: "victim-pod", + Namespace: testCtx.NS.Name, + Priority: &lowPriority, + Volumes: []v1.Volume{ + { + Name: "volume-1", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc1.Name, + }, + }, + }, + { + Name: "volume-2", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc2.Name, + }, + }, + }, + }, + }), + }, + pod: initPausePod(&testutils.PausePodConfig{ + Name: "preemptor-pod", + Namespace: testCtx.NS.Name, + Priority: &highPriority, + Volumes: []v1.Volume{ + { + Name: "volume-1", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc1.Name, + }, + }, + }, + { + Name: "volume-2", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc2.Name, + }, + }, + }, + }, + }), + preemptedPodIndexes: map[int]struct{}{0: {}}, + cleanup: func() error { + for _, pvc := range []*v1.PersistentVolumeClaim{pvc1, pvc2} { + if err := testutils.DeletePVC(cs, pvc.Name, pvc.Namespace); err != nil { + return fmt.Errorf("cannot delete pvc: %v", err) + } + } + for _, pv := range []*v1.PersistentVolume{pv1, pv2} { + if err := testutils.DeletePV(cs, pv.Name); err != nil { + return fmt.Errorf("cannot delete pv: %v", err) + } + } + return nil + }, + }, + } + + // Create a node with some resources and a label. + nodeRes := map[v1.ResourceName]string{ + v1.ResourcePods: "32", + v1.ResourceCPU: "500m", + v1.ResourceMemory: "500", + } + nodeObject := st.MakeNode().Name("node1").Capacity(nodeRes).Label("node", "node1").Obj() + if _, err := createNode(cs, nodeObject); err != nil { + t.Fatalf("Error creating node: %v", err) + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if err := test.init(); err != nil { + t.Fatalf("Error while initializing test: %v", err) + } + + pods := make([]*v1.Pod, len(test.existingPods)) + t.Cleanup(func() { + testutils.CleanupPods(cs, t, pods) + if err := test.cleanup(); err != nil { + t.Errorf("Error cleaning up test: %v", err) + } + }) + // Create and run existingPods. + for i, p := range test.existingPods { + var err error + pods[i], err = runPausePod(cs, p) + if err != nil { + t.Fatalf("Error running pause pod: %v", err) + } + } + // Create the "pod". + preemptor, err := createPausePod(cs, test.pod) + if err != nil { + t.Errorf("Error while creating high priority pod: %v", err) + } + pods = append(pods, preemptor) + // Wait for preemption of pods and make sure the other ones are not preempted. + for i, p := range pods { + if _, found := test.preemptedPodIndexes[i]; found { + if err = wait.Poll(time.Second, wait.ForeverTestTimeout, podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil { + t.Errorf("Pod %v/%v is not getting evicted.", p.Namespace, p.Name) + } + } else { + if p.DeletionTimestamp != nil { + t.Errorf("Didn't expect pod %v to get preempted.", p.Name) + } + } + } + // Also check that the preemptor pod gets the NominatedNodeName field set. 
+		if len(test.preemptedPodIndexes) > 0 {
+			if err := waitForNominatedNodeName(cs, preemptor); err != nil {
+				t.Errorf("NominatedNodeName field was not set for pod %v: %v", preemptor.Name, err)
+			}
+		}
+		})
+	}
+}
diff --git a/test/integration/util/util.go b/test/integration/util/util.go
index ae2fa61b8e7..b058b49ed3f 100644
--- a/test/integration/util/util.go
+++ b/test/integration/util/util.go
@@ -688,6 +688,16 @@ func CreatePV(cs clientset.Interface, pv *v1.PersistentVolume) (*v1.PersistentVo
 	return cs.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{})
 }
 
+// DeletePVC deletes the given PVC in the given namespace.
+func DeletePVC(cs clientset.Interface, pvcName string, nsName string) error {
+	return cs.CoreV1().PersistentVolumeClaims(nsName).Delete(context.TODO(), pvcName, *metav1.NewDeleteOptions(0))
+}
+
+// DeletePV deletes the given PV. PVs are cluster-scoped, so no namespace is needed.
+func DeletePV(cs clientset.Interface, pvName string) error {
+	return cs.CoreV1().PersistentVolumes().Delete(context.TODO(), pvName, *metav1.NewDeleteOptions(0))
+}
+
 // RunPausePod creates a pod with "Pause" image and the given config and waits
 // until it is scheduled. It returns its pointer and error status.
 func RunPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
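
Note (not part of the patch): the reference-counting mechanism described in the commit message can be summarized with the minimal, standalone Go sketch below. It is illustrative only; names such as rwopState, addPod, removePod, and schedulable are hypothetical stand-ins for the plugin's preFilterState, AddPod, RemovePod, and the Filter-phase check implemented in volume_restrictions.go above.

// refcount_sketch.go: a standalone illustration of the approach, decoupled from
// the scheduler framework. All type and function names here are hypothetical.
package main

import "fmt"

// pod is a simplified stand-in for *v1.Pod: a name plus the PVCs it references.
type pod struct {
	name string
	pvcs []string
}

// rwopState mirrors the idea of preFilterState: the ReadWriteOncePod PVCs used by
// the pod being scheduled, and how many already-scheduled pods reference any of them.
type rwopState struct {
	readWriteOncePodPVCs   map[string]bool
	conflictingPVCRefCount int
}

// refsForPod counts how many of p's PVCs are ReadWriteOncePod PVCs of the incoming pod.
func (s *rwopState) refsForPod(p pod) int {
	n := 0
	for _, claim := range p.pvcs {
		if s.readWriteOncePodPVCs[claim] {
			n++
		}
	}
	return n
}

// addPod and removePod correspond to the AddPod/RemovePod extensions used during
// preemption simulation: they adjust the conflict count as victims are (un)assumed.
func (s *rwopState) addPod(p pod)    { s.conflictingPVCRefCount += s.refsForPod(p) }
func (s *rwopState) removePod(p pod) { s.conflictingPVCRefCount -= s.refsForPod(p) }

// schedulable corresponds to the Filter-phase check: the pod fits only when no
// scheduled pod still references one of its ReadWriteOncePod PVCs.
func (s *rwopState) schedulable() bool { return s.conflictingPVCRefCount == 0 }

func main() {
	// The incoming pod uses one ReadWriteOncePod PVC already in use by a victim pod.
	state := &rwopState{
		readWriteOncePodPVCs:   map[string]bool{"claim-with-rwop-1": true},
		conflictingPVCRefCount: 1, // computed at PreFilter time from the cluster snapshot
	}
	victim := pod{name: "victim-pod", pvcs: []string{"claim-with-rwop-1"}}

	fmt.Println("before preemption, schedulable:", state.schedulable()) // false
	state.removePod(victim)                                             // simulate evicting the victim
	fmt.Println("after removing victim, schedulable:", state.schedulable()) // true
}

The design point the sketch highlights is why the Filter now returns "Unschedulable" instead of "UnschedulableAndUnresolvable": because RemovePod can drive the conflict count back to zero during simulation, the preemption logic can conclude that evicting the conflicting pod would make the incoming pod schedulable.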