feat: support preemption for pods using ReadWriteOncePod PVCs

PVCs using the ReadWriteOncePod access mode can only be referenced by a
single pod. When a pod is scheduled that uses a ReadWriteOncePod PVC,
return "Unschedulable" if the PVC is already in-use in the cluster.

To support preemption, the "VolumeRestrictions" scheduler plugin
computes cycle state during the PreFilter phase. This cycle state
contains the number of references to the ReadWriteOncePod PVCs used by
the pod-to-be-scheduled.

During scheduler simulation (AddPod and RemovePod), we add and remove
reference counts from the cycle state if they use any of these
ReadWriteOncePod PVCs.

In the Filter phase, the scheduler checks if there are any PVC reference
conflicts, and returns "Unschedulable" if there is a conflict.

This is a required feature for the ReadWriteOncePod beta. See the following for more context:
https://github.com/kubernetes/enhancements/tree/master/keps/sig-storage/2485-read-write-once-pod-pv-access-mode#beta
This commit is contained in:
Chris Henzie
2022-11-22 12:08:27 -08:00
parent ad2a9f2f33
commit dbc7d8ded0
4 changed files with 604 additions and 58 deletions

View File

@@ -38,6 +38,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-helpers/storage/volume"
"k8s.io/klog/v2"
configv1 "k8s.io/kube-scheduler/config/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@@ -46,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler"
configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/plugin/pkg/admission/priority"
@@ -1607,3 +1609,355 @@ func TestPreferNominatedNode(t *testing.T) {
})
}
}
// TestReadWriteOncePodPreemption tests preemption scenarios for pods with
// ReadWriteOncePod PVCs. A higher-priority pod referencing an in-use
// ReadWriteOncePod PVC must be able to preempt the lower-priority pod(s)
// currently holding the claim(s).
func TestReadWriteOncePodPreemption(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ReadWriteOncePod, true)()
	// Only the VolumeRestrictions plugin is enabled for PreFilter/Filter: it
	// is the plugin responsible for detecting ReadWriteOncePod conflicts.
	cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
		Profiles: []configv1.KubeSchedulerProfile{{
			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
			Plugins: &configv1.Plugins{
				Filter: configv1.PluginSet{
					Enabled: []configv1.Plugin{
						{Name: volumerestrictions.Name},
					},
				},
				PreFilter: configv1.PluginSet{
					Enabled: []configv1.Plugin{
						{Name: volumerestrictions.Name},
					},
				},
			},
		}},
	})
	testCtx := testutils.InitTestSchedulerWithOptions(t,
		testutils.InitTestAPIServer(t, "preemption", nil),
		0,
		scheduler.WithProfiles(cfg.Profiles...))
	testutils.SyncInformerFactory(testCtx)
	go testCtx.Scheduler.Run(testCtx.Ctx)
	defer testutils.CleanupTest(t, testCtx)
	cs := testCtx.ClientSet

	storage := v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}
	volType := v1.HostPathDirectoryOrCreate
	// Two pre-bound ReadWriteOncePod PV/PVC pairs shared by the test cases.
	pv1 := st.MakePersistentVolume().
		Name("pv-with-read-write-once-pod-1").
		AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}).
		Capacity(storage.Requests).
		HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/mnt1", Type: &volType}).
		Obj()
	pvc1 := st.MakePersistentVolumeClaim().
		Name("pvc-with-read-write-once-pod-1").
		Namespace(testCtx.NS.Name).
		// Annotation and volume name required for PVC to be considered bound.
		Annotation(volume.AnnBindCompleted, "true").
		VolumeName(pv1.Name).
		AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}).
		Resources(storage).
		Obj()
	pv2 := st.MakePersistentVolume().
		Name("pv-with-read-write-once-pod-2").
		AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}).
		Capacity(storage.Requests).
		HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/mnt2", Type: &volType}).
		Obj()
	pvc2 := st.MakePersistentVolumeClaim().
		Name("pvc-with-read-write-once-pod-2").
		Namespace(testCtx.NS.Name).
		// Annotation and volume name required for PVC to be considered bound.
		Annotation(volume.AnnBindCompleted, "true").
		VolumeName(pv2.Name).
		AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}).
		Resources(storage).
		Obj()

	tests := []struct {
		name                string
		init                func() error    // creates the PVs/PVCs the case needs
		existingPods        []*v1.Pod       // low-priority victims scheduled first
		pod                 *v1.Pod         // high-priority preemptor
		unresolvable        bool
		preemptedPodIndexes map[int]struct{} // indexes into existingPods expected to be evicted
		cleanup             func() error    // deletes the PVs/PVCs created by init
	}{
		{
			name: "preempt single pod",
			init: func() error {
				_, err := testutils.CreatePV(cs, pv1)
				if err != nil {
					return fmt.Errorf("cannot create pv: %v", err)
				}
				_, err = testutils.CreatePVC(cs, pvc1)
				if err != nil {
					return fmt.Errorf("cannot create pvc: %v", err)
				}
				return nil
			},
			existingPods: []*v1.Pod{
				initPausePod(&testutils.PausePodConfig{
					Name:      "victim-pod",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Volumes: []v1.Volume{{
						Name: "volume",
						VolumeSource: v1.VolumeSource{
							PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
								ClaimName: pvc1.Name,
							},
						},
					}},
				}),
			},
			pod: initPausePod(&testutils.PausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.NS.Name,
				Priority:  &highPriority,
				Volumes: []v1.Volume{{
					Name: "volume",
					VolumeSource: v1.VolumeSource{
						PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
							ClaimName: pvc1.Name,
						},
					},
				}},
			}),
			preemptedPodIndexes: map[int]struct{}{0: {}},
			cleanup: func() error {
				if err := testutils.DeletePVC(cs, pvc1.Name, pvc1.Namespace); err != nil {
					return fmt.Errorf("cannot delete pvc: %v", err)
				}
				if err := testutils.DeletePV(cs, pv1.Name); err != nil {
					return fmt.Errorf("cannot delete pv: %v", err)
				}
				return nil
			},
		},
		{
			name: "preempt two pods",
			init: func() error {
				for _, pv := range []*v1.PersistentVolume{pv1, pv2} {
					_, err := testutils.CreatePV(cs, pv)
					if err != nil {
						return fmt.Errorf("cannot create pv: %v", err)
					}
				}
				for _, pvc := range []*v1.PersistentVolumeClaim{pvc1, pvc2} {
					_, err := testutils.CreatePVC(cs, pvc)
					if err != nil {
						return fmt.Errorf("cannot create pvc: %v", err)
					}
				}
				return nil
			},
			existingPods: []*v1.Pod{
				initPausePod(&testutils.PausePodConfig{
					Name:      "victim-pod-1",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Volumes: []v1.Volume{{
						Name: "volume",
						VolumeSource: v1.VolumeSource{
							PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
								ClaimName: pvc1.Name,
							},
						},
					}},
				}),
				initPausePod(&testutils.PausePodConfig{
					Name:      "victim-pod-2",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Volumes: []v1.Volume{{
						Name: "volume",
						VolumeSource: v1.VolumeSource{
							PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
								ClaimName: pvc2.Name,
							},
						},
					}},
				}),
			},
			// The preemptor references both PVCs, so both victims must go.
			pod: initPausePod(&testutils.PausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.NS.Name,
				Priority:  &highPriority,
				Volumes: []v1.Volume{
					{
						Name: "volume-1",
						VolumeSource: v1.VolumeSource{
							PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
								ClaimName: pvc1.Name,
							},
						},
					},
					{
						Name: "volume-2",
						VolumeSource: v1.VolumeSource{
							PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
								ClaimName: pvc2.Name,
							},
						},
					},
				},
			}),
			preemptedPodIndexes: map[int]struct{}{0: {}, 1: {}},
			cleanup: func() error {
				for _, pvc := range []*v1.PersistentVolumeClaim{pvc1, pvc2} {
					if err := testutils.DeletePVC(cs, pvc.Name, pvc.Namespace); err != nil {
						return fmt.Errorf("cannot delete pvc: %v", err)
					}
				}
				for _, pv := range []*v1.PersistentVolume{pv1, pv2} {
					if err := testutils.DeletePV(cs, pv.Name); err != nil {
						return fmt.Errorf("cannot delete pv: %v", err)
					}
				}
				return nil
			},
		},
		{
			name: "preempt single pod with two volumes",
			init: func() error {
				for _, pv := range []*v1.PersistentVolume{pv1, pv2} {
					_, err := testutils.CreatePV(cs, pv)
					if err != nil {
						return fmt.Errorf("cannot create pv: %v", err)
					}
				}
				for _, pvc := range []*v1.PersistentVolumeClaim{pvc1, pvc2} {
					_, err := testutils.CreatePVC(cs, pvc)
					if err != nil {
						return fmt.Errorf("cannot create pvc: %v", err)
					}
				}
				return nil
			},
			existingPods: []*v1.Pod{
				initPausePod(&testutils.PausePodConfig{
					Name:      "victim-pod",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Volumes: []v1.Volume{
						{
							Name: "volume-1",
							VolumeSource: v1.VolumeSource{
								PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
									ClaimName: pvc1.Name,
								},
							},
						},
						{
							Name: "volume-2",
							VolumeSource: v1.VolumeSource{
								PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
									ClaimName: pvc2.Name,
								},
							},
						},
					},
				}),
			},
			pod: initPausePod(&testutils.PausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.NS.Name,
				Priority:  &highPriority,
				Volumes: []v1.Volume{
					{
						Name: "volume-1",
						VolumeSource: v1.VolumeSource{
							PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
								ClaimName: pvc1.Name,
							},
						},
					},
					{
						Name: "volume-2",
						VolumeSource: v1.VolumeSource{
							PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
								ClaimName: pvc2.Name,
							},
						},
					},
				},
			}),
			preemptedPodIndexes: map[int]struct{}{0: {}},
			cleanup: func() error {
				for _, pvc := range []*v1.PersistentVolumeClaim{pvc1, pvc2} {
					if err := testutils.DeletePVC(cs, pvc.Name, pvc.Namespace); err != nil {
						return fmt.Errorf("cannot delete pvc: %v", err)
					}
				}
				for _, pv := range []*v1.PersistentVolume{pv1, pv2} {
					if err := testutils.DeletePV(cs, pv.Name); err != nil {
						return fmt.Errorf("cannot delete pv: %v", err)
					}
				}
				return nil
			},
		},
	}

	// Create a node with some resources and a label.
	nodeRes := map[v1.ResourceName]string{
		v1.ResourcePods:   "32",
		v1.ResourceCPU:    "500m",
		v1.ResourceMemory: "500",
	}
	nodeObject := st.MakeNode().Name("node1").Capacity(nodeRes).Label("node", "node1").Obj()
	if _, err := createNode(cs, nodeObject); err != nil {
		t.Fatalf("Error creating node: %v", err)
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			if err := test.init(); err != nil {
				t.Fatalf("Error while initializing test: %v", err)
			}

			pods := make([]*v1.Pod, len(test.existingPods))
			t.Cleanup(func() {
				testutils.CleanupPods(cs, t, pods)
				if err := test.cleanup(); err != nil {
					t.Errorf("Error cleaning up test: %v", err)
				}
			})
			// Create and run existingPods.
			for i, p := range test.existingPods {
				var err error
				pods[i], err = runPausePod(cs, p)
				if err != nil {
					t.Fatalf("Error running pause pod: %v", err)
				}
			}
			// Create the "pod".
			preemptor, err := createPausePod(cs, test.pod)
			if err != nil {
				// Fatalf (not Errorf): preemptor is nil on error, and the
				// checks below dereference it, which would panic.
				t.Fatalf("Error while creating high priority pod: %v", err)
			}
			pods = append(pods, preemptor)
			// Wait for preemption of pods and make sure the other ones are not preempted.
			for i, p := range pods {
				if _, found := test.preemptedPodIndexes[i]; found {
					if err = wait.Poll(time.Second, wait.ForeverTestTimeout, podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil {
						t.Errorf("Pod %v/%v is not getting evicted.", p.Namespace, p.Name)
					}
				} else {
					if p.DeletionTimestamp != nil {
						t.Errorf("Didn't expect pod %v to get preempted.", p.Name)
					}
				}
			}
			// Also check that the preemptor pod gets the NominatedNodeName field set.
			if len(test.preemptedPodIndexes) > 0 {
				if err := waitForNominatedNodeName(cs, preemptor); err != nil {
					t.Errorf("NominatedNodeName field was not set for pod %v: %v", preemptor.Name, err)
				}
			}
		})
	}
}