diff --git a/pkg/scheduler/testing/wrappers.go b/pkg/scheduler/testing/wrappers.go index 6ec85003535..15d331aaeb9 100644 --- a/pkg/scheduler/testing/wrappers.go +++ b/pkg/scheduler/testing/wrappers.go @@ -22,6 +22,7 @@ import ( v1 "k8s.io/api/core/v1" resourceapi "k8s.io/api/resource/v1alpha3" + storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -913,6 +914,23 @@ func (p *PersistentVolumeWrapper) NodeAffinityIn(key string, vals []string) *Per return p } +// Labels sets all {k,v} pair provided by `labels` to the pv. +func (p *PersistentVolumeWrapper) Labels(labels map[string]string) *PersistentVolumeWrapper { + for k, v := range labels { + p.Label(k, v) + } + return p +} + +// Label sets a {k,v} pair to the pv. +func (p *PersistentVolumeWrapper) Label(k, v string) *PersistentVolumeWrapper { + if p.PersistentVolume.ObjectMeta.Labels == nil { + p.PersistentVolume.ObjectMeta.Labels = make(map[string]string) + } + p.PersistentVolume.ObjectMeta.Labels[k] = v + return p +} + // ResourceClaimWrapper wraps a ResourceClaim inside. type ResourceClaimWrapper struct{ resourceapi.ResourceClaim } @@ -1133,3 +1151,34 @@ func (wrapper *ResourceSliceWrapper) Device(name string, attrs map[resourceapi.Q wrapper.Spec.Devices = append(wrapper.Spec.Devices, resourceapi.Device{Name: name, Basic: &resourceapi.BasicDevice{Attributes: attrs}}) return wrapper } + +// StorageClassWrapper wraps a StorageClass inside. +type StorageClassWrapper struct{ storagev1.StorageClass } + +// MakeStorageClass creates a StorageClass wrapper. +func MakeStorageClass() *StorageClassWrapper { + return &StorageClassWrapper{} +} + +// Obj returns the inner StorageClass. +func (s *StorageClassWrapper) Obj() *storagev1.StorageClass { + return &s.StorageClass +} + +// Name sets `n` as the name of the inner StorageClass. +func (s *StorageClassWrapper) Name(n string) *StorageClassWrapper { + s.SetName(n) + return s +} + +// VolumeBindingMode sets mode as the mode of the inner StorageClass. +func (s *StorageClassWrapper) VolumeBindingMode(mode storagev1.VolumeBindingMode) *StorageClassWrapper { + s.StorageClass.VolumeBindingMode = &mode + return s +} + +// Provisoner sets p as the provisioner of the inner StorageClass. +func (s *StorageClassWrapper) Provisioner(p string) *StorageClassWrapper { + s.StorageClass.Provisioner = p + return s +} diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index 4e5da499c1f..6c4fe776a6b 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -23,9 +23,11 @@ import ( "time" v1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -38,6 +40,7 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/component-helpers/storage/volume" "k8s.io/klog/v2" configv1 "k8s.io/kube-scheduler/config/v1" apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" @@ -194,6 +197,12 @@ func TestCoreResourceEnqueue(t *testing.T) { // Note that the scheduler won't schedule those Pods, // meaning, those Pods should be already scheduled basically; they should have .spec.nodename. initialPods []*v1.Pod + // initialPVCs are the list of PersistentVolumeClaims to be created at first. + // Note that PVs are automatically created following PVCs. + // Also, the namespace of pvcs is automatically filled in. + initialPVCs []*v1.PersistentVolumeClaim + // initialPVs are the list of PersistentVolume to be created at first. + initialPVs []*v1.PersistentVolume // pods are the list of Pods to be created. // All of them are expected to be unschedulable at first. pods []*v1.Pod @@ -905,7 +914,321 @@ func TestCoreResourceEnqueue(t *testing.T) { } return nil }, - wantRequeuedPods: sets.New("pod4"), + wantRequeuedPods: sets.New("pod4"), + }, + { + name: "Pod rejected with node by the VolumeZone plugin is requeued when the PV is added", + initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Label("node", "fake-node").Label(v1.LabelTopologyZone, "us-west1-a").Obj()}, + initialPVs: []*v1.PersistentVolume{ + st.MakePersistentVolume(). + Name("pv1"). + Labels(map[string]string{v1.LabelTopologyZone: "us-west1-a"}). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}). + Capacity(v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}). + HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: ptr.To(v1.HostPathDirectoryOrCreate)}). + Obj(), + }, + initialPVCs: []*v1.PersistentVolumeClaim{ + st.MakePersistentVolumeClaim(). + Name("pvc1"). + Annotation(volume.AnnBindCompleted, "true"). + VolumeName("pv1"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Resources(v1.VolumeResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}). + Obj(), + st.MakePersistentVolumeClaim(). + Name("pvc2"). + Annotation(volume.AnnBindCompleted, "true"). + VolumeName("pv2"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Resources(v1.VolumeResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}). + Obj(), + }, + initialPods: []*v1.Pod{ + st.MakePod().Name("pod1").Container("image").PVC("pvc1").Node("fake-node").Obj(), + }, + pods: []*v1.Pod{ + st.MakePod().Name("pod2").Container("image").PVC("pvc2").Obj(), + }, + triggerFn: func(testCtx *testutils.TestContext) error { + pv2 := st.MakePersistentVolume().Name("pv2").Label(v1.LabelTopologyZone, "us-west1-a"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}). + Capacity(v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}). + HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: ptr.To(v1.HostPathDirectoryOrCreate)}). + Obj() + if _, err := testCtx.ClientSet.CoreV1().PersistentVolumes().Create(testCtx.Ctx, pv2, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("failed to create pv2: %w", err) + } + return nil + }, + wantRequeuedPods: sets.New("pod2"), + enableSchedulingQueueHint: []bool{true}, + }, + { + name: "Pod rejected with node by the VolumeZone plugin is requeued when the PV is updated", + initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Label("node", "fake-node").Label(v1.LabelTopologyZone, "us-west1-a").Obj()}, + initialPVs: []*v1.PersistentVolume{ + st.MakePersistentVolume(). + Name("pv1"). + Labels(map[string]string{v1.LabelTopologyZone: "us-west1-a"}). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}). + Capacity(v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}). + HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: ptr.To(v1.HostPathDirectoryOrCreate)}). + Obj(), + st.MakePersistentVolume(). + Name("pv2"). + Labels(map[string]string{v1.LabelTopologyZone: "us-east1"}). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}). + Capacity(v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}). + HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: ptr.To(v1.HostPathDirectoryOrCreate)}). + Obj(), + }, + initialPVCs: []*v1.PersistentVolumeClaim{ + st.MakePersistentVolumeClaim(). + Name("pvc1"). + Annotation(volume.AnnBindCompleted, "true"). + VolumeName("pv1"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Resources(v1.VolumeResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}). + Obj(), + st.MakePersistentVolumeClaim(). + Name("pvc2"). + Annotation(volume.AnnBindCompleted, "true"). + VolumeName("pv2"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Resources(v1.VolumeResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}). + Obj(), + }, + initialPods: []*v1.Pod{ + st.MakePod().Name("pod1").Container("image").PVC("pvc1").Node("fake-node").Obj(), + }, + pods: []*v1.Pod{ + st.MakePod().Name("pod2").Container("image").PVC("pvc2").Obj(), + }, + triggerFn: func(testCtx *testutils.TestContext) error { + pv2 := st.MakePersistentVolume().Name("pv2").Label(v1.LabelTopologyZone, "us-west1-a"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}). + Capacity(v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}). + HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: ptr.To(v1.HostPathDirectoryOrCreate)}). + Obj() + if _, err := testCtx.ClientSet.CoreV1().PersistentVolumes().Update(testCtx.Ctx, pv2, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update pv2: %w", err) + } + return nil + }, + wantRequeuedPods: sets.New("pod2"), + enableSchedulingQueueHint: []bool{true}, + }, + { + name: "Pod rejected with node by the VolumeZone plugin is requeued when the PVC bound to the pod is added", + initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Label("node", "fake-node").Label(v1.LabelTopologyZone, "us-west1-a").Obj()}, + initialPVs: []*v1.PersistentVolume{ + st.MakePersistentVolume(). + Name("pv1"). + Labels(map[string]string{v1.LabelTopologyZone: "us-west1-a"}). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}). + Capacity(v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}). + HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: ptr.To(v1.HostPathDirectoryOrCreate)}). + Obj(), + st.MakePersistentVolume(). + Name("pv2"). + Labels(map[string]string{v1.LabelTopologyZone: "us-east1"}). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}). + Capacity(v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}). + HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: ptr.To(v1.HostPathDirectoryOrCreate)}). + Obj(), + }, + initialPVCs: []*v1.PersistentVolumeClaim{ + st.MakePersistentVolumeClaim(). + Name("pvc1"). + Annotation(volume.AnnBindCompleted, "true"). + VolumeName("pv1"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Resources(v1.VolumeResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}). + Obj(), + }, + initialPods: []*v1.Pod{ + st.MakePod().Name("pod1").Container("image").PVC("pvc1").Node("fake-node").Obj(), + }, + pods: []*v1.Pod{ + st.MakePod().Name("pod2").Container("image").PVC("pvc2").Obj(), + st.MakePod().Name("pod3").Container("image").PVC("pvc3").Obj(), + }, + triggerFn: func(testCtx *testutils.TestContext) error { + pvc2 := st.MakePersistentVolumeClaim(). + Name("pvc2"). + Annotation(volume.AnnBindCompleted, "true"). + VolumeName("pv2"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Resources(v1.VolumeResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}). + Obj() + if _, err := testCtx.ClientSet.CoreV1().PersistentVolumeClaims(testCtx.NS.Name).Create(testCtx.Ctx, pvc2, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("failed to add pvc2: %w", err) + } + return nil + }, + wantRequeuedPods: sets.New("pod2"), + enableSchedulingQueueHint: []bool{true}, + }, + { + name: "Pod rejected with node by the VolumeZone plugin is requeued when the PVC bound to the pod is updated", + initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Label("node", "fake-node").Label(v1.LabelTopologyZone, "us-west1-a").Obj()}, + initialPVs: []*v1.PersistentVolume{ + st.MakePersistentVolume(). + Name("pv1"). + Labels(map[string]string{v1.LabelTopologyZone: "us-west1-a"}). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}). + Capacity(v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}). + HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: ptr.To(v1.HostPathDirectoryOrCreate)}). + Obj(), + st.MakePersistentVolume(). + Name("pv2"). + Labels(map[string]string{v1.LabelTopologyZone: "us-east1"}). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}). + Capacity(v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}). + HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: ptr.To(v1.HostPathDirectoryOrCreate)}). + Obj(), + }, + initialPVCs: []*v1.PersistentVolumeClaim{ + st.MakePersistentVolumeClaim(). + Name("pvc1"). + Annotation(volume.AnnBindCompleted, "true"). + VolumeName("pv1"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Resources(v1.VolumeResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}). + Obj(), + st.MakePersistentVolumeClaim(). + Name("pvc2"). + Annotation(volume.AnnBindCompleted, "true"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Resources(v1.VolumeResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}). + Obj(), + }, + initialPods: []*v1.Pod{ + st.MakePod().Name("pod1").Container("image").PVC("pvc1").Node("fake-node").Obj(), + }, + pods: []*v1.Pod{ + st.MakePod().Name("pod2").Container("image").PVC("pvc2").Obj(), + st.MakePod().Name("pod3").Container("image").PVC("pvc3").Obj(), + }, + triggerFn: func(testCtx *testutils.TestContext) error { + pvc2 := st.MakePersistentVolumeClaim(). + Name("pvc2"). + Annotation(volume.AnnBindCompleted, "true"). + VolumeName("pv2"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Resources(v1.VolumeResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}). + Obj() + if _, err := testCtx.ClientSet.CoreV1().PersistentVolumeClaims(testCtx.NS.Name).Update(testCtx.Ctx, pvc2, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update pvc2: %w", err) + } + return nil + }, + wantRequeuedPods: sets.New("pod2"), + enableSchedulingQueueHint: []bool{true}, + }, + { + name: "Pod rejected with node by the VolumeZone plugin is requeued when the Storage class is added", + initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Label("node", "fake-node").Label(v1.LabelTopologyZone, "us-west1-a").Obj()}, + initialPVs: []*v1.PersistentVolume{ + st.MakePersistentVolume(). + Name("pv1"). + Labels(map[string]string{v1.LabelTopologyZone: "us-west1-a"}). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}). + Capacity(v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}). + HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: ptr.To(v1.HostPathDirectoryOrCreate)}). + Obj(), + }, + initialPVCs: []*v1.PersistentVolumeClaim{ + st.MakePersistentVolumeClaim(). + Name("pvc1"). + Annotation(volume.AnnBindCompleted, "true"). + VolumeName("pv1"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Resources(v1.VolumeResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}). + Obj(), + st.MakePersistentVolumeClaim(). + Name("pvc2"). + Annotation(volume.AnnBindCompleted, "true"). + VolumeName("pv2"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Resources(v1.VolumeResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}). + Obj(), + }, + initialPods: []*v1.Pod{ + st.MakePod().Name("pod1").Container("image").PVC("pvc1").Node("fake-node").Obj(), + }, + pods: []*v1.Pod{ + st.MakePod().Name("pod2").Container("image").PVC("pvc2").Obj(), + }, + triggerFn: func(testCtx *testutils.TestContext) error { + sc1 := st.MakeStorageClass(). + Name("sc1"). + VolumeBindingMode(storagev1.VolumeBindingWaitForFirstConsumer). + Provisioner("p"). + Obj() + if _, err := testCtx.ClientSet.StorageV1().StorageClasses().Create(testCtx.Ctx, sc1, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("failed to create sc1: %w", err) + } + return nil + }, + wantRequeuedPods: sets.New("pod2"), + enableSchedulingQueueHint: []bool{true}, + }, + { + name: "Pod rejected with node by the VolumeZone plugin is not requeued when the PV is updated but the topology is same", + initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Label("node", "fake-node").Label(v1.LabelTopologyZone, "us-west1-a").Obj()}, + initialPVs: []*v1.PersistentVolume{ + st.MakePersistentVolume(). + Name("pv1"). + Labels(map[string]string{v1.LabelTopologyZone: "us-west1-a"}). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}). + Capacity(v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}). + HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: ptr.To(v1.HostPathDirectoryOrCreate)}). + Obj(), + st.MakePersistentVolume(). + Name("pv2"). + Labels(map[string]string{v1.LabelTopologyZone: "us-east1"}). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}). + Capacity(v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}). + HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: ptr.To(v1.HostPathDirectoryOrCreate)}). + Obj(), + }, + initialPVCs: []*v1.PersistentVolumeClaim{ + st.MakePersistentVolumeClaim(). + Name("pvc1"). + Annotation(volume.AnnBindCompleted, "true"). + VolumeName("pv1"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Resources(v1.VolumeResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}). + Obj(), + st.MakePersistentVolumeClaim(). + Name("pvc2"). + Annotation(volume.AnnBindCompleted, "true"). + VolumeName("pv2"). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}). + Resources(v1.VolumeResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}). + Obj(), + }, + initialPods: []*v1.Pod{ + st.MakePod().Name("pod1").Container("image").PVC("pvc1").Node("fake-node").Obj(), + }, + pods: []*v1.Pod{ + st.MakePod().Name("pod2").Container("image").PVC("pvc2").Obj(), + }, + triggerFn: func(testCtx *testutils.TestContext) error { + pv2 := st.MakePersistentVolume().Name("pv2"). + Labels(map[string]string{v1.LabelTopologyZone: "us-east1", "unrelated": "unrelated"}). + AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}). + Capacity(v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}). + HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: ptr.To(v1.HostPathDirectoryOrCreate)}). + Obj() + if _, err := testCtx.ClientSet.CoreV1().PersistentVolumes().Update(testCtx.Ctx, pv2, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update pv2: %w", err) + } + return nil + }, + wantRequeuedPods: sets.Set[string]{}, enableSchedulingQueueHint: []bool{true}, }, } @@ -942,6 +1265,19 @@ func TestCoreResourceEnqueue(t *testing.T) { } } + for _, pv := range tt.initialPVs { + if _, err := testutils.CreatePV(cs, pv); err != nil { + t.Fatalf("Failed to create a PV %q: %v", pv.Name, err) + } + } + + for _, pvc := range tt.initialPVCs { + pvc.Namespace = ns + if _, err := testutils.CreatePVC(cs, pvc); err != nil { + t.Fatalf("Failed to create a PVC %q: %v", pvc.Name, err) + } + } + for _, pod := range tt.initialPods { if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil { t.Fatalf("Failed to create an initial Pod %q: %v", pod.Name, err)