Merge pull request #114051 from chrishenzie/rwop-preemption

[scheduler] Support preemption of pods using ReadWriteOncePod PVCs
Kubernetes Prow Robot authored on 2023-02-13 11:45:30 -08:00, committed by GitHub. Commit b8b18ecd85.
4 changed files with 604 additions and 58 deletions

View File

@@ -18,6 +18,7 @@ package volumerestrictions
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -40,17 +41,57 @@ type VolumeRestrictions struct {
var _ framework.PreFilterPlugin = &VolumeRestrictions{}
var _ framework.FilterPlugin = &VolumeRestrictions{}
var _ framework.EnqueueExtensions = &VolumeRestrictions{}
// Name is the name of the plugin used in the plugin registry and configurations.
const Name = names.VolumeRestrictions
var _ framework.StateData = &preFilterState{}
const (
// Name is the name of the plugin used in the plugin registry and configurations.
Name = names.VolumeRestrictions
// preFilterStateKey is the key in CycleState to VolumeRestrictions pre-computed data for Filtering.
// Using the name of the plugin will likely help us avoid collisions with other plugins.
preFilterStateKey = "PreFilter" + Name
// ErrReasonDiskConflict is used for NoDiskConflict predicate error.
ErrReasonDiskConflict = "node(s) had no available disk"
// ErrReasonReadWriteOncePodConflict is used when a pod is found using the same PVC with the ReadWriteOncePod access mode.
ErrReasonReadWriteOncePodConflict = "node has pod using PersistentVolumeClaim with the same name and ReadWriteOncePod access mode"
)
// preFilterState computed at PreFilter and used at Filter.
type preFilterState struct {
// Names of the pod's volumes using the ReadWriteOncePod access mode.
readWriteOncePodPVCs sets.Set[string]
// The number of references to these ReadWriteOncePod volumes by scheduled pods.
conflictingPVCRefCount int
}
func (s *preFilterState) updateWithPod(podInfo *framework.PodInfo, multiplier int) {
s.conflictingPVCRefCount += multiplier * s.conflictingPVCRefCountForPod(podInfo)
}
func (s *preFilterState) conflictingPVCRefCountForPod(podInfo *framework.PodInfo) int {
conflicts := 0
for _, volume := range podInfo.Pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
}
if s.readWriteOncePodPVCs.Has(volume.PersistentVolumeClaim.ClaimName) {
conflicts += 1
}
}
return conflicts
}
// Clone the prefilter state.
func (s *preFilterState) Clone() framework.StateData {
if s == nil {
return nil
}
return &preFilterState{
readWriteOncePodPVCs: s.readWriteOncePodPVCs,
conflictingPVCRefCount: s.conflictingPVCRefCount,
}
}
// Name returns name of the plugin. It is used in logs, etc.
func (pl *VolumeRestrictions) Name() string {
return Name
@@ -117,18 +158,87 @@ func haveOverlap(a1, a2 []string) bool {
return false
}
// PreFilter computes and stores cycleState containing details for enforcing ReadWriteOncePod.
func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
if pl.enableReadWriteOncePod {
return nil, pl.isReadWriteOncePodAccessModeConflict(ctx, pod)
if !pl.enableReadWriteOncePod {
return nil, nil
}
return nil, framework.NewStatus(framework.Success)
pvcs, err := pl.readWriteOncePodPVCsForPod(ctx, pod)
if err != nil {
if apierrors.IsNotFound(err) {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
}
return nil, framework.AsStatus(err)
}
s, err := pl.calPreFilterState(ctx, pod, pvcs)
if err != nil {
return nil, framework.AsStatus(err)
}
cycleState.Write(preFilterStateKey, s)
return nil, nil
}
// isReadWriteOncePodAccessModeConflict checks if a pod uses a PVC with the ReadWriteOncePod access mode.
// This access mode restricts volume access to a single pod on a single node. Since only a single pod can
// use a ReadWriteOncePod PVC, mark any other pods attempting to use this PVC as UnschedulableAndUnresolvable.
// TODO(#103132): Mark pod as Unschedulable and add preemption logic.
func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(ctx context.Context, pod *v1.Pod) *framework.Status {
// AddPod from pre-computed data in cycleState.
func (pl *VolumeRestrictions) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if !pl.enableReadWriteOncePod {
return nil
}
state, err := getPreFilterState(cycleState)
if err != nil {
return framework.AsStatus(err)
}
state.updateWithPod(podInfoToAdd, 1)
return nil
}
// RemovePod from pre-computed data in cycleState.
func (pl *VolumeRestrictions) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if !pl.enableReadWriteOncePod {
return nil
}
state, err := getPreFilterState(cycleState)
if err != nil {
return framework.AsStatus(err)
}
state.updateWithPod(podInfoToRemove, -1)
return nil
}
func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
c, err := cycleState.Read(preFilterStateKey)
if err != nil {
// preFilterState doesn't exist, likely PreFilter wasn't invoked.
return nil, fmt.Errorf("cannot read %q from cycleState", preFilterStateKey)
}
s, ok := c.(*preFilterState)
if !ok {
return nil, fmt.Errorf("%+v convert to volumerestrictions.state error", c)
}
return s, nil
}
// calPreFilterState computes preFilterState describing which PVCs use ReadWriteOncePod
// and which pods in the cluster are in conflict.
func (pl *VolumeRestrictions) calPreFilterState(ctx context.Context, pod *v1.Pod, pvcs sets.Set[string]) (*preFilterState, error) {
conflictingPVCRefCount := 0
for pvc := range pvcs {
key := framework.GetNamespacedName(pod.Namespace, pvc)
if pl.sharedLister.StorageInfos().IsPVCUsedByPods(key) {
// There can only be at most one pod using the ReadWriteOncePod PVC.
conflictingPVCRefCount += 1
}
}
return &preFilterState{
readWriteOncePodPVCs: pvcs,
conflictingPVCRefCount: conflictingPVCRefCount,
}, nil
}
func (pl *VolumeRestrictions) readWriteOncePodPVCsForPod(ctx context.Context, pod *v1.Pod) (sets.Set[string], error) {
pvcs := sets.New[string]()
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
@@ -136,27 +246,50 @@ func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(ctx context.C
pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName)
if err != nil {
if apierrors.IsNotFound(err) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
}
return framework.AsStatus(err)
return nil, err
}
if !v1helper.ContainsAccessMode(pvc.Spec.AccessModes, v1.ReadWriteOncePod) {
continue
}
pvcs.Insert(pvc.Name)
}
return pvcs, nil
}
key := framework.GetNamespacedName(pod.Namespace, volume.PersistentVolumeClaim.ClaimName)
if pl.sharedLister.StorageInfos().IsPVCUsedByPods(key) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict)
// Checks if scheduling the pod onto this node would cause any conflicts with
// existing volumes.
func satisfyVolumeConflicts(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool {
for i := range pod.Spec.Volumes {
v := &pod.Spec.Volumes[i]
// fast path if there is no conflict checking targets.
if v.GCEPersistentDisk == nil && v.AWSElasticBlockStore == nil && v.RBD == nil && v.ISCSI == nil {
continue
}
for _, ev := range nodeInfo.Pods {
if isVolumeConflict(v, ev.Pod) {
return false
}
}
}
return true
}
// Checks if scheduling the pod would cause any ReadWriteOncePod PVC access mode conflicts.
func satisfyReadWriteOncePod(ctx context.Context, state *preFilterState) *framework.Status {
if state == nil {
return nil
}
if state.conflictingPVCRefCount > 0 {
return framework.NewStatus(framework.Unschedulable, ErrReasonReadWriteOncePodConflict)
}
return nil
}
// PreFilterExtensions returns prefilter extensions, pod add and remove.
func (pl *VolumeRestrictions) PreFilterExtensions() framework.PreFilterExtensions {
return nil
return pl
}
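
Returning the plugin itself from PreFilterExtensions is what wires AddPod and RemovePod into preemption: when the preemption logic simulates evicting a victim, RemovePod decrements conflictingPVCRefCount, and a follow-up Filter on the adjusted state succeeds. Below is a minimal sketch of that interaction, written as if it were an Example in this package's test file; the example function and pod fixture are hypothetical and not part of this change.

// Hypothetical sketch, assuming it lives in the volumerestrictions package's
// test file so it can reach the unexported preFilterState; fixture names are
// made up for illustration.
func ExampleReadWriteOncePodPreemption() {
	// PreFilter for the preemptor records one conflicting reference, because
	// a scheduled pod already holds the ReadWriteOncePod claim.
	state := &preFilterState{
		readWriteOncePodPVCs:   sets.New[string]("claim-with-rwop"),
		conflictingPVCRefCount: 1,
	}

	// Filter reports Unschedulable (not UnschedulableAndUnresolvable), which
	// is what lets the preemption plugin go looking for victims.
	fmt.Println(satisfyReadWriteOncePod(context.Background(), state).IsSuccess())

	// Preemption dry-runs evicting the victim that mounts the same claim;
	// RemovePod funnels into updateWithPod with a -1 multiplier.
	victim := st.MakePod().Name("victim").PVC("claim-with-rwop").Obj()
	state.updateWithPod(&framework.PodInfo{Pod: victim}, -1)

	// With the victim gone from the simulated node, Filter succeeds.
	fmt.Println(satisfyReadWriteOncePod(context.Background(), state).IsSuccess())

	// Output:
	// false
	// true
}
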
// Filter invoked at the filter extension point.
@@ -168,21 +301,20 @@ func (pl *VolumeRestrictions) PreFilterExtensions() framework.PreFilterExtension
// - AWS EBS forbids any two pods mounting the same volume ID
// - Ceph RBD forbids if any two pods share at least same monitor, and match pool and image, and the image is read-only
// - ISCSI forbids if any two pods share at least same IQN and ISCSI volume is read-only
func (pl *VolumeRestrictions) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
for i := range pod.Spec.Volumes {
v := &pod.Spec.Volumes[i]
// fast path if there is no conflict checking targets.
if v.GCEPersistentDisk == nil && v.AWSElasticBlockStore == nil && v.RBD == nil && v.ISCSI == nil {
continue
}
for _, ev := range nodeInfo.Pods {
if isVolumeConflict(v, ev.Pod) {
return framework.NewStatus(framework.Unschedulable, ErrReasonDiskConflict)
}
}
// If the pod uses PVCs with the ReadWriteOncePod access mode, it evaluates if
// these PVCs are already in-use and if preemption will help.
func (pl *VolumeRestrictions) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
if !satisfyVolumeConflicts(pod, nodeInfo) {
return framework.NewStatus(framework.Unschedulable, ErrReasonDiskConflict)
}
return nil
if !pl.enableReadWriteOncePod {
return nil
}
state, err := getPreFilterState(cycleState)
if err != nil {
return framework.AsStatus(err)
}
return satisfyReadWriteOncePod(ctx, state)
}
// EventsToRegister returns the possible events that may make a Pod

View File

@@ -21,6 +21,7 @@ import (
"reflect"
"testing"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -69,7 +70,9 @@ func TestGCEDiskConflicts(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := newPlugin(ctx, t)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
cycleState := framework.NewCycleState()
p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod)
gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
}
@@ -111,7 +114,9 @@ func TestAWSDiskConflicts(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := newPlugin(ctx, t)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
cycleState := framework.NewCycleState()
p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod)
gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
}
@@ -159,7 +164,9 @@ func TestRBDDiskConflicts(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := newPlugin(ctx, t)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
cycleState := framework.NewCycleState()
p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod)
gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
}
@@ -207,7 +214,9 @@ func TestISCSIDiskConflicts(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := newPlugin(ctx, t)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
cycleState := framework.NewCycleState()
p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod)
gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
}
@@ -219,7 +228,10 @@ func TestAccessModeConflicts(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ReadWriteOncePod, true)()
// Required for querying lister for PVCs in the same namespace.
podWithReadWriteOncePodPVC := st.MakePod().Name("pod-with-rwop").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwop").Node("node-1").Obj()
podWithOnePVC := st.MakePod().Name("pod-with-one-pvc").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwop-1").Node("node-1").Obj()
podWithTwoPVCs := st.MakePod().Name("pod-with-two-pvcs").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwop-1").PVC("claim-with-rwop-2").Node("node-1").Obj()
podWithOneConflict := st.MakePod().Name("pod-with-one-conflict").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwop-1").Node("node-1").Obj()
podWithTwoConflicts := st.MakePod().Name("pod-with-two-conflicts").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwop-1").PVC("claim-with-rwop-2").Node("node-1").Obj()
// Required for querying lister for PVCs in the same namespace.
podWithReadWriteManyPVC := st.MakePod().Name("pod-with-rwx").Namespace(metav1.NamespaceDefault).PVC("claim-with-rwx").Node("node-1").Obj()
@@ -230,10 +242,19 @@ func TestAccessModeConflicts(t *testing.T) {
},
}
readWriteOncePodPVC := &v1.PersistentVolumeClaim{
readWriteOncePodPVC1 := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "claim-with-rwop",
Name: "claim-with-rwop-1",
},
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod},
},
}
readWriteOncePodPVC2 := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "claim-with-rwop-2",
},
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod},
@@ -252,47 +273,68 @@ func TestAccessModeConflicts(t *testing.T) {
tests := []struct {
name string
pod *v1.Pod
nodeInfo *framework.NodeInfo
existingPods []*v1.Pod
existingNodes []*v1.Node
existingPVCs []*v1.PersistentVolumeClaim
enableReadWriteOncePod bool
preFilterWantStatus *framework.Status
wantStatus *framework.Status
}{
{
name: "nothing",
pod: &v1.Pod{},
nodeInfo: framework.NewNodeInfo(),
existingPods: []*v1.Pod{},
existingNodes: []*v1.Node{},
existingPVCs: []*v1.PersistentVolumeClaim{},
enableReadWriteOncePod: true,
preFilterWantStatus: nil,
wantStatus: nil,
},
{
name: "failed to get PVC",
pod: podWithReadWriteOncePodPVC,
pod: podWithOnePVC,
nodeInfo: framework.NewNodeInfo(),
existingPods: []*v1.Pod{},
existingNodes: []*v1.Node{},
existingPVCs: []*v1.PersistentVolumeClaim{},
enableReadWriteOncePod: true,
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, "persistentvolumeclaim \"claim-with-rwop\" not found"),
},
{
name: "no access mode conflict",
pod: podWithReadWriteOncePodPVC,
existingPods: []*v1.Pod{podWithReadWriteManyPVC},
existingNodes: []*v1.Node{node},
existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC, readWriteManyPVC},
enableReadWriteOncePod: true,
preFilterWantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, "persistentvolumeclaim \"claim-with-rwop-1\" not found"),
wantStatus: nil,
},
{
name: "access mode conflict",
pod: podWithReadWriteOncePodPVC,
existingPods: []*v1.Pod{podWithReadWriteOncePodPVC, podWithReadWriteManyPVC},
name: "no access mode conflict",
pod: podWithOnePVC,
nodeInfo: framework.NewNodeInfo(podWithReadWriteManyPVC),
existingPods: []*v1.Pod{podWithReadWriteManyPVC},
existingNodes: []*v1.Node{node},
existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC, readWriteManyPVC},
existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC1, readWriteManyPVC},
enableReadWriteOncePod: true,
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict),
preFilterWantStatus: nil,
wantStatus: nil,
},
{
name: "access mode conflict, unschedulable",
pod: podWithOneConflict,
nodeInfo: framework.NewNodeInfo(podWithOnePVC, podWithReadWriteManyPVC),
existingPods: []*v1.Pod{podWithOnePVC, podWithReadWriteManyPVC},
existingNodes: []*v1.Node{node},
existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC1, readWriteManyPVC},
enableReadWriteOncePod: true,
preFilterWantStatus: nil,
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonReadWriteOncePodConflict),
},
{
name: "two conflicts, unschedulable",
pod: podWithTwoConflicts,
nodeInfo: framework.NewNodeInfo(podWithTwoPVCs, podWithReadWriteManyPVC),
existingPods: []*v1.Pod{podWithTwoPVCs, podWithReadWriteManyPVC},
existingNodes: []*v1.Node{node},
existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC1, readWriteOncePodPVC2, readWriteManyPVC},
enableReadWriteOncePod: true,
preFilterWantStatus: nil,
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonReadWriteOncePodConflict),
},
}
@@ -301,9 +343,17 @@ func TestAccessModeConflicts(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := newPluginWithListers(ctx, t, test.existingPods, test.existingNodes, test.existingPVCs, test.enableReadWriteOncePod)
_, gotStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), nil, test.pod)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %+v, want: %+v", gotStatus, test.wantStatus)
cycleState := framework.NewCycleState()
_, preFilterGotStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod)
if diff := cmp.Diff(test.preFilterWantStatus, preFilterGotStatus); diff != "" {
t.Errorf("Unexpected PreFilter status (-want, +got): %s", diff)
}
// If PreFilter fails, then Filter will not run.
if test.preFilterWantStatus.IsSuccess() {
gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo)
if diff := cmp.Diff(test.wantStatus, gotStatus); diff != "" {
t.Errorf("Unexpected Filter status (-want, +got): %s", diff)
}
}
})
}

View File

@@ -38,6 +38,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-helpers/storage/volume"
"k8s.io/klog/v2"
configv1 "k8s.io/kube-scheduler/config/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@@ -46,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler"
configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/plugin/pkg/admission/priority"
@@ -1607,3 +1609,355 @@ func TestPreferNominatedNode(t *testing.T) {
})
}
}
// TestReadWriteOncePodPreemption tests preemption scenarios for pods with
// ReadWriteOncePod PVCs.
func TestReadWriteOncePodPreemption(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ReadWriteOncePod, true)()
cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
Profiles: []configv1.KubeSchedulerProfile{{
SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
Plugins: &configv1.Plugins{
Filter: configv1.PluginSet{
Enabled: []configv1.Plugin{
{Name: volumerestrictions.Name},
},
},
PreFilter: configv1.PluginSet{
Enabled: []configv1.Plugin{
{Name: volumerestrictions.Name},
},
},
},
}},
})
testCtx := testutils.InitTestSchedulerWithOptions(t,
testutils.InitTestAPIServer(t, "preemption", nil),
0,
scheduler.WithProfiles(cfg.Profiles...))
testutils.SyncInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)
defer testutils.CleanupTest(t, testCtx)
cs := testCtx.ClientSet
storage := v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}
volType := v1.HostPathDirectoryOrCreate
pv1 := st.MakePersistentVolume().
Name("pv-with-read-write-once-pod-1").
AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}).
Capacity(storage.Requests).
HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/mnt1", Type: &volType}).
Obj()
pvc1 := st.MakePersistentVolumeClaim().
Name("pvc-with-read-write-once-pod-1").
Namespace(testCtx.NS.Name).
// Annotation and volume name required for PVC to be considered bound.
Annotation(volume.AnnBindCompleted, "true").
VolumeName(pv1.Name).
AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}).
Resources(storage).
Obj()
pv2 := st.MakePersistentVolume().
Name("pv-with-read-write-once-pod-2").
AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}).
Capacity(storage.Requests).
HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/mnt2", Type: &volType}).
Obj()
pvc2 := st.MakePersistentVolumeClaim().
Name("pvc-with-read-write-once-pod-2").
Namespace(testCtx.NS.Name).
// Annotation and volume name required for PVC to be considered bound.
Annotation(volume.AnnBindCompleted, "true").
VolumeName(pv2.Name).
AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}).
Resources(storage).
Obj()
tests := []struct {
name string
init func() error
existingPods []*v1.Pod
pod *v1.Pod
unresolvable bool
preemptedPodIndexes map[int]struct{}
cleanup func() error
}{
{
name: "preempt single pod",
init: func() error {
_, err := testutils.CreatePV(cs, pv1)
if err != nil {
return fmt.Errorf("cannot create pv: %v", err)
}
_, err = testutils.CreatePVC(cs, pvc1)
if err != nil {
return fmt.Errorf("cannot create pvc: %v", err)
}
return nil
},
existingPods: []*v1.Pod{
initPausePod(&testutils.PausePodConfig{
Name: "victim-pod",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
Volumes: []v1.Volume{{
Name: "volume",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc1.Name,
},
},
}},
}),
},
pod: initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
Volumes: []v1.Volume{{
Name: "volume",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc1.Name,
},
},
}},
}),
preemptedPodIndexes: map[int]struct{}{0: {}},
cleanup: func() error {
if err := testutils.DeletePVC(cs, pvc1.Name, pvc1.Namespace); err != nil {
return fmt.Errorf("cannot delete pvc: %v", err)
}
if err := testutils.DeletePV(cs, pv1.Name); err != nil {
return fmt.Errorf("cannot delete pv: %v", err)
}
return nil
},
},
{
name: "preempt two pods",
init: func() error {
for _, pv := range []*v1.PersistentVolume{pv1, pv2} {
_, err := testutils.CreatePV(cs, pv)
if err != nil {
return fmt.Errorf("cannot create pv: %v", err)
}
}
for _, pvc := range []*v1.PersistentVolumeClaim{pvc1, pvc2} {
_, err := testutils.CreatePVC(cs, pvc)
if err != nil {
return fmt.Errorf("cannot create pvc: %v", err)
}
}
return nil
},
existingPods: []*v1.Pod{
initPausePod(&testutils.PausePodConfig{
Name: "victim-pod-1",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
Volumes: []v1.Volume{{
Name: "volume",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc1.Name,
},
},
}},
}),
initPausePod(&testutils.PausePodConfig{
Name: "victim-pod-2",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
Volumes: []v1.Volume{{
Name: "volume",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc2.Name,
},
},
}},
}),
},
pod: initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
Volumes: []v1.Volume{
{
Name: "volume-1",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc1.Name,
},
},
},
{
Name: "volume-2",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc2.Name,
},
},
},
},
}),
preemptedPodIndexes: map[int]struct{}{0: {}, 1: {}},
cleanup: func() error {
for _, pvc := range []*v1.PersistentVolumeClaim{pvc1, pvc2} {
if err := testutils.DeletePVC(cs, pvc.Name, pvc.Namespace); err != nil {
return fmt.Errorf("cannot delete pvc: %v", err)
}
}
for _, pv := range []*v1.PersistentVolume{pv1, pv2} {
if err := testutils.DeletePV(cs, pv.Name); err != nil {
return fmt.Errorf("cannot delete pv: %v", err)
}
}
return nil
},
},
{
name: "preempt single pod with two volumes",
init: func() error {
for _, pv := range []*v1.PersistentVolume{pv1, pv2} {
_, err := testutils.CreatePV(cs, pv)
if err != nil {
return fmt.Errorf("cannot create pv: %v", err)
}
}
for _, pvc := range []*v1.PersistentVolumeClaim{pvc1, pvc2} {
_, err := testutils.CreatePVC(cs, pvc)
if err != nil {
return fmt.Errorf("cannot create pvc: %v", err)
}
}
return nil
},
existingPods: []*v1.Pod{
initPausePod(&testutils.PausePodConfig{
Name: "victim-pod",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
Volumes: []v1.Volume{
{
Name: "volume-1",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc1.Name,
},
},
},
{
Name: "volume-2",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc2.Name,
},
},
},
},
}),
},
pod: initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
Volumes: []v1.Volume{
{
Name: "volume-1",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc1.Name,
},
},
},
{
Name: "volume-2",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc2.Name,
},
},
},
},
}),
preemptedPodIndexes: map[int]struct{}{0: {}},
cleanup: func() error {
for _, pvc := range []*v1.PersistentVolumeClaim{pvc1, pvc2} {
if err := testutils.DeletePVC(cs, pvc.Name, pvc.Namespace); err != nil {
return fmt.Errorf("cannot delete pvc: %v", err)
}
}
for _, pv := range []*v1.PersistentVolume{pv1, pv2} {
if err := testutils.DeletePV(cs, pv.Name); err != nil {
return fmt.Errorf("cannot delete pv: %v", err)
}
}
return nil
},
},
}
// Create a node with some resources and a label.
nodeRes := map[v1.ResourceName]string{
v1.ResourcePods: "32",
v1.ResourceCPU: "500m",
v1.ResourceMemory: "500",
}
nodeObject := st.MakeNode().Name("node1").Capacity(nodeRes).Label("node", "node1").Obj()
if _, err := createNode(cs, nodeObject); err != nil {
t.Fatalf("Error creating node: %v", err)
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if err := test.init(); err != nil {
t.Fatalf("Error while initializing test: %v", err)
}
pods := make([]*v1.Pod, len(test.existingPods))
t.Cleanup(func() {
testutils.CleanupPods(cs, t, pods)
if err := test.cleanup(); err != nil {
t.Errorf("Error cleaning up test: %v", err)
}
})
// Create and run existingPods.
for i, p := range test.existingPods {
var err error
pods[i], err = runPausePod(cs, p)
if err != nil {
t.Fatalf("Error running pause pod: %v", err)
}
}
// Create the "pod".
preemptor, err := createPausePod(cs, test.pod)
if err != nil {
t.Errorf("Error while creating high priority pod: %v", err)
}
pods = append(pods, preemptor)
// Wait for preemption of pods and make sure the other ones are not preempted.
for i, p := range pods {
if _, found := test.preemptedPodIndexes[i]; found {
if err = wait.Poll(time.Second, wait.ForeverTestTimeout, podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil {
t.Errorf("Pod %v/%v is not getting evicted.", p.Namespace, p.Name)
}
} else {
if p.DeletionTimestamp != nil {
t.Errorf("Didn't expect pod %v to get preempted.", p.Name)
}
}
}
// Also check that the preemptor pod gets the NominatedNodeName field set.
if len(test.preemptedPodIndexes) > 0 {
if err := waitForNominatedNodeName(cs, preemptor); err != nil {
t.Errorf("NominatedNodeName field was not set for pod %v: %v", preemptor.Name, err)
}
}
})
}
}

View File

@@ -682,6 +682,16 @@ func CreatePV(cs clientset.Interface, pv *v1.PersistentVolume) (*v1.PersistentVo
return cs.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{})
}
// DeletePVC deletes the given PVC in the given namespace.
func DeletePVC(cs clientset.Interface, pvcName string, nsName string) error {
return cs.CoreV1().PersistentVolumeClaims(nsName).Delete(context.TODO(), pvcName, *metav1.NewDeleteOptions(0))
}
// DeletePV deletes the given PV in the given namespace.
func DeletePV(cs clientset.Interface, pvName string) error {
return cs.CoreV1().PersistentVolumes().Delete(context.TODO(), pvName, *metav1.NewDeleteOptions(0))
}
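
The two new deletion helpers mirror the existing CreatePV and CreatePVC utilities, and TestReadWriteOncePodPreemption above pairs them in its cleanup callbacks. A small sketch of that pairing follows; the cleanupVolumes helper is hypothetical, assuming it lives alongside the utilities in the same package, and is not part of this change.

// Hypothetical helper showing the intended use of DeletePVC and DeletePV;
// claims are removed before their volumes, mirroring the test cleanup above.
func cleanupVolumes(cs clientset.Interface, pvcs []*v1.PersistentVolumeClaim, pvs []*v1.PersistentVolume) error {
	for _, pvc := range pvcs {
		if err := DeletePVC(cs, pvc.Name, pvc.Namespace); err != nil {
			return fmt.Errorf("cannot delete pvc %s/%s: %v", pvc.Namespace, pvc.Name, err)
		}
	}
	for _, pv := range pvs {
		if err := DeletePV(cs, pv.Name); err != nil {
			return fmt.Errorf("cannot delete pv %s: %v", pv.Name, err)
		}
	}
	return nil
}
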
// RunPausePod creates a pod with "Pause" image and the given config and waits
// until it is scheduled. It returns its pointer and error status.
func RunPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {