From 42cfda2f946aad3c366eb825153456ac285bb4f6 Mon Sep 17 00:00:00 2001
From: Wei Huang
Date: Mon, 6 Jul 2020 15:40:10 -0700
Subject: [PATCH 1/2] Remove pvcLister from genericScheduler

PVCLister can be fetched from sharedInformerFactory.
---
 pkg/scheduler/core/BUILD                     |  1 -
 pkg/scheduler/core/extender_test.go          |  2 +-
 pkg/scheduler/core/generic_scheduler.go      |  6 ++---
 pkg/scheduler/core/generic_scheduler_test.go | 28 +++++++++-----------
 pkg/scheduler/factory.go                     |  1 -
 pkg/scheduler/scheduler_test.go              | 10 ++++---
 6 files changed, 23 insertions(+), 25 deletions(-)

diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD
index 738289a2d8a..6042c42a966 100644
--- a/pkg/scheduler/core/BUILD
+++ b/pkg/scheduler/core/BUILD
@@ -48,7 +48,6 @@ go_test(
         "//pkg/scheduler/framework/plugins/selectorspread:go_default_library",
         "//pkg/scheduler/framework/runtime:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
-        "//pkg/scheduler/framework/v1alpha1/fake:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
         "//pkg/scheduler/internal/queue:go_default_library",
         "//pkg/scheduler/profile:go_default_library",
diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go
index 36ec0a4c19e..bbb9b770a3e 100644
--- a/pkg/scheduler/core/extender_test.go
+++ b/pkg/scheduler/core/extender_test.go
@@ -272,6 +272,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 			fwk, err := st.NewFramework(
 				test.registerPlugins,
 				runtime.WithClientSet(client),
+				runtime.WithInformerFactory(informerFactory),
 				runtime.WithPodNominator(internalqueue.NewPodNominator()),
 			)
 			if err != nil {
@@ -285,7 +286,6 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 				cache,
 				emptySnapshot,
 				extenders,
-				informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
 				schedulerapi.DefaultPercentageOfNodesToScore)
 			podIgnored := &v1.Pod{}
 			result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), podIgnored)
diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index 4e3f5910507..711661d8a0a 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -119,7 +119,6 @@ type genericScheduler struct {
 	cache                    internalcache.Cache
 	extenders                []framework.Extender
 	nodeInfoSnapshot         *internalcache.Snapshot
-	pvcLister                corelisters.PersistentVolumeClaimLister
 	percentageOfNodesToScore int32
 	nextStartNodeIndex       int
 }
@@ -138,7 +137,8 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
 	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
 	defer trace.LogIfLong(100 * time.Millisecond)
 
-	if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
+	pvcLister := prof.SharedInformerFactory().Core().V1().PersistentVolumeClaims().Lister()
+	if err := podPassesBasicChecks(pod, pvcLister); err != nil {
 		return result, err
 	}
 	trace.Step("Basic checks done")
@@ -628,13 +628,11 @@ func NewGenericScheduler(
 	cache internalcache.Cache,
 	nodeInfoSnapshot *internalcache.Snapshot,
 	extenders []framework.Extender,
-	pvcLister corelisters.PersistentVolumeClaimLister,
 	percentageOfNodesToScore int32) ScheduleAlgorithm {
 	return &genericScheduler{
 		cache:                    cache,
 		extenders:                extenders,
 		nodeInfoSnapshot:         nodeInfoSnapshot,
-		pvcLister:                pvcLister,
 		percentageOfNodesToScore: percentageOfNodesToScore,
 	}
 }
diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go
index 4c5847ec2f0..a4a168f4cd0 100644
--- a/pkg/scheduler/core/generic_scheduler_test.go
+++ b/pkg/scheduler/core/generic_scheduler_test.go
@@ -20,7 +20,6 @@ import (
 	"context"
 	"fmt"
 	"math"
-	"reflect"
 	"strconv"
 	"testing"
 	"time"
@@ -42,7 +41,6 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread"
 	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
-	fakeframework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1/fake"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
 	"k8s.io/kubernetes/pkg/scheduler/profile"
@@ -422,9 +420,9 @@ func TestGenericScheduler(t *testing.T) {
 				st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 			},
 			nodes: []string{"machine1", "machine2"},
-			pvcs:  []v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "existingPVC"}}},
+			pvcs:  []v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", UID: types.UID("existingPVC"), Namespace: v1.NamespaceDefault}}},
 			pod: &v1.Pod{
-				ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")},
+				ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore"), Namespace: v1.NamespaceDefault},
 				Spec: v1.PodSpec{
 					Volumes: []v1.Volume{
 						{
@@ -474,9 +472,9 @@ func TestGenericScheduler(t *testing.T) {
 				st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 			},
 			nodes: []string{"machine1", "machine2"},
-			pvcs:  []v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", DeletionTimestamp: &metav1.Time{}}}},
+			pvcs:  []v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", UID: types.UID("existingPVC"), Namespace: v1.NamespaceDefault, DeletionTimestamp: &metav1.Time{}}}},
 			pod: &v1.Pod{
-				ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")},
+				ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore"), Namespace: v1.NamespaceDefault},
 				Spec: v1.PodSpec{
 					Volumes: []v1.Volume{
 						{
@@ -728,10 +726,16 @@ func TestGenericScheduler(t *testing.T) {
 				cache.AddNode(node)
 			}
 
+			cs := clientsetfake.NewSimpleClientset()
+			informerFactory := informers.NewSharedInformerFactory(cs, 0)
+			for i := range test.pvcs {
+				informerFactory.Core().V1().PersistentVolumeClaims().Informer().GetStore().Add(&test.pvcs[i])
+			}
 			snapshot := internalcache.NewSnapshot(test.pods, nodes)
 			fwk, err := st.NewFramework(
 				test.registerPlugins,
 				frameworkruntime.WithSnapshotSharedLister(snapshot),
+				frameworkruntime.WithInformerFactory(informerFactory),
 				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
 			)
 			if err != nil {
@@ -741,19 +745,14 @@ func TestGenericScheduler(t *testing.T) {
 				Framework: fwk,
 			}
 
-			var pvcs []v1.PersistentVolumeClaim
-			pvcs = append(pvcs, test.pvcs...)
-			pvcLister := fakeframework.PersistentVolumeClaimLister(pvcs)
-
 			scheduler := NewGenericScheduler(
 				cache,
 				snapshot,
 				[]framework.Extender{},
-				pvcLister,
 				schedulerapi.DefaultPercentageOfNodesToScore)
 			result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), test.pod)
-			if !reflect.DeepEqual(err, test.wErr) {
-				t.Errorf("want: %v, got: %v", test.wErr, err)
+			if err != test.wErr && err.Error() != test.wErr.Error() {
+				t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr)
 			}
 			if test.expectedHosts != nil && !test.expectedHosts.Has(result.SuggestedHost) {
 				t.Errorf("Expected: %s, got: %s", test.expectedHosts, result.SuggestedHost)
@@ -775,7 +774,7 @@ func makeScheduler(nodes []*v1.Node) *genericScheduler {
 	s := NewGenericScheduler(
 		cache,
 		emptySnapshot,
-		nil, nil,
+		nil,
 		schedulerapi.DefaultPercentageOfNodesToScore)
 	cache.UpdateSnapshot(s.(*genericScheduler).nodeInfoSnapshot)
 	return s.(*genericScheduler)
@@ -1069,7 +1068,6 @@ func TestZeroRequest(t *testing.T) {
 		nil,
 		emptySnapshot,
 		[]framework.Extender{},
-		nil,
 		schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler)
 	scheduler.nodeInfoSnapshot = snapshot
 
diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go
index 2fe58aa68dd..249a0ee980e 100644
--- a/pkg/scheduler/factory.go
+++ b/pkg/scheduler/factory.go
@@ -180,7 +180,6 @@ func (c *Configurator) create() (*Scheduler, error) {
 		c.schedulerCache,
 		c.nodeInfoSnapshot,
 		extenders,
-		c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
 		c.percentageOfNodesToScore,
 	)
 
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 8c9a7e17424..3cdce85cfee 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -807,7 +807,12 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
 		return true, b, nil
 	})
 
-	fwk, _ := st.NewFramework(fns, frameworkruntime.WithClientSet(client), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()))
+	fwk, _ := st.NewFramework(
+		fns,
+		frameworkruntime.WithClientSet(client),
+		frameworkruntime.WithInformerFactory(informerFactory),
+		frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
+	)
 	prof := &profile.Profile{
 		Framework: fwk,
 		Recorder:  &events.FakeRecorder{},
@@ -824,7 +829,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
 		scache,
 		internalcache.NewEmptySnapshot(),
 		[]framework.Extender{},
-		informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
 		schedulerapi.DefaultPercentageOfNodesToScore,
 	)
 
@@ -858,6 +862,7 @@ func setupTestSchedulerWithVolumeBinding(volumeBinder scheduling.SchedulerVolume
 	testPVC := v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "testPVC", Namespace: pod.Namespace, UID: types.UID("testPVC")}}
 	client := clientsetfake.NewSimpleClientset(&testNode, &testPVC)
 	informerFactory := informers.NewSharedInformerFactory(client, 0)
+	informerFactory.Core().V1().PersistentVolumeClaims().Informer().GetStore().Add(&testPVC)
 
 	fns := []st.RegisterPluginFunc{
 		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
@@ -1172,7 +1177,6 @@ func TestSchedulerBinding(t *testing.T) {
 				scache,
 				nil,
 				test.extenders,
-				nil,
 				0,
 			)
 			sched := Scheduler{

From 185ba08fcd5b046c167ee1a808a2d149c922f9ee Mon Sep 17 00:00:00 2001
From: Wei Huang
Date: Fri, 11 Sep 2020 13:52:30 -0700
Subject: [PATCH 2/2] Move podPassesBasicChecks() to VolumeBinding plugin

---
 pkg/scheduler/core/BUILD                      |  6 +--
 pkg/scheduler/core/generic_scheduler.go       | 50 -------------------
 pkg/scheduler/core/generic_scheduler_test.go  | 33 +++++++++---
 .../framework/plugins/volumebinding/BUILD     |  2 +
 .../plugins/volumebinding/volume_binding.go   | 44 +++++++++++++---
 .../volumebinding/volume_binding_test.go      |  2 +-
 pkg/scheduler/scheduler_test.go               |  5 +-
 7 files changed, 73 insertions(+), 69 deletions(-)

diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD
index 6042c42a966..a4e0777e0b0 100644
--- a/pkg/scheduler/core/BUILD
+++ b/pkg/scheduler/core/BUILD
@@ -10,7 +10,6 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/api/v1/pod:go_default_library",
-        "//pkg/features:go_default_library",
         "//pkg/scheduler/apis/config:go_default_library",
         "//pkg/scheduler/framework/runtime:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
@@ -20,11 +19,8 @@ go_library(
         "//pkg/scheduler/profile:go_default_library",
         "//pkg/scheduler/util:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
-        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
-        "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/rest:go_default_library",
         "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
         "//vendor/k8s.io/klog/v2:go_default_library",
@@ -40,12 +36,14 @@ go_test(
     ],
     embed = [":go_default_library"],
    deps = [
+        "//pkg/controller/volume/persistentvolume/util:go_default_library",
         "//pkg/scheduler/apis/config:go_default_library",
         "//pkg/scheduler/framework/plugins/defaultbinder:go_default_library",
         "//pkg/scheduler/framework/plugins/noderesources:go_default_library",
         "//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library",
         "//pkg/scheduler/framework/plugins/queuesort:go_default_library",
         "//pkg/scheduler/framework/plugins/selectorspread:go_default_library",
+        "//pkg/scheduler/framework/plugins/volumebinding:go_default_library",
         "//pkg/scheduler/framework/runtime:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index 711661d8a0a..5fe479c3e5d 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -29,12 +29,8 @@ import (
 	"k8s.io/klog/v2"
 
 	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	utilfeature "k8s.io/apiserver/pkg/util/feature"
-	corelisters "k8s.io/client-go/listers/core/v1"
 	extenderv1 "k8s.io/kube-scheduler/extender/v1"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
-	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@@ -137,12 +133,6 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
 	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
 	defer trace.LogIfLong(100 * time.Millisecond)
 
-	pvcLister := prof.SharedInformerFactory().Core().V1().PersistentVolumeClaims().Lister()
-	if err := podPassesBasicChecks(pod, pvcLister); err != nil {
-		return result, err
-	}
-	trace.Step("Basic checks done")
-
 	if err := g.snapshot(); err != nil {
 		return result, err
 	}
@@ -273,7 +263,6 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profil
 			filteredNodesStatuses[n.Node().Name] = s
 		}
 		return nil, filteredNodesStatuses, nil
-
 	}
 
 	feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
@@ -584,45 +573,6 @@ func (g *genericScheduler) prioritizeNodes(
 	return result, nil
 }
 
-// podPassesBasicChecks makes sanity checks on the pod if it can be scheduled.
-func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeClaimLister) error {
-	// Check PVCs used by the pod
-	namespace := pod.Namespace
-	manifest := &(pod.Spec)
-	for i := range manifest.Volumes {
-		volume := &manifest.Volumes[i]
-		var pvcName string
-		ephemeral := false
-		switch {
-		case volume.PersistentVolumeClaim != nil:
-			pvcName = volume.PersistentVolumeClaim.ClaimName
-		case volume.Ephemeral != nil &&
-			utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume):
-			pvcName = pod.Name + "-" + volume.Name
-			ephemeral = true
-		default:
-			// Volume is not using a PVC, ignore
-			continue
-		}
-		pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
-		if err != nil {
-			// The error has already enough context ("persistentvolumeclaim "myclaim" not found")
-			return err
-		}
-
-		if pvc.DeletionTimestamp != nil {
-			return fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
-		}
-
-		if ephemeral &&
-			!metav1.IsControlledBy(pvc, pod) {
-			return fmt.Errorf("persistentvolumeclaim %q was not created for the pod", pvc.Name)
-		}
-	}
-
-	return nil
-}
-
 // NewGenericScheduler creates a genericScheduler object.
 func NewGenericScheduler(
 	cache internalcache.Cache,
diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go
index a4a168f4cd0..9ab27f0120e 100644
--- a/pkg/scheduler/core/generic_scheduler_test.go
+++ b/pkg/scheduler/core/generic_scheduler_test.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"math"
 	"strconv"
+	"strings"
 	"testing"
 	"time"
 
@@ -33,12 +34,14 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/informers"
 	clientsetfake "k8s.io/client-go/kubernetes/fake"
+	pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
 	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@@ -416,11 +419,17 @@ func TestGenericScheduler(t *testing.T) {
 			// Pod with existing PVC
 			registerPlugins: []st.RegisterPluginFunc{
 				st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
+				st.RegisterPreFilterPlugin(volumebinding.Name, volumebinding.New),
 				st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
 				st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 			},
 			nodes: []string{"machine1", "machine2"},
-			pvcs:  []v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", UID: types.UID("existingPVC"), Namespace: v1.NamespaceDefault}}},
+			pvcs: []v1.PersistentVolumeClaim{
+				{
+					ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", UID: types.UID("existingPVC"), Namespace: v1.NamespaceDefault},
+					Spec:       v1.PersistentVolumeClaimSpec{VolumeName: "existingPV"},
+				},
+			},
 			pod: &v1.Pod{
 				ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore"), Namespace: v1.NamespaceDefault},
 				Spec: v1.PodSpec{
@@ -443,6 +452,7 @@ func TestGenericScheduler(t *testing.T) {
 			// Pod with non existing PVC
 			registerPlugins: []st.RegisterPluginFunc{
 				st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
+				st.RegisterPreFilterPlugin(volumebinding.Name, volumebinding.New),
 				st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
 				st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 			},
@@ -468,6 +478,7 @@ func TestGenericScheduler(t *testing.T) {
 			// Pod with deleting PVC
 			registerPlugins: []st.RegisterPluginFunc{
 				st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
+				st.RegisterPreFilterPlugin(volumebinding.Name, volumebinding.New),
 				st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
 				st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 			},
@@ -726,10 +737,16 @@ func TestGenericScheduler(t *testing.T) {
 				cache.AddNode(node)
 			}
 
+			ctx := context.Background()
 			cs := clientsetfake.NewSimpleClientset()
 			informerFactory := informers.NewSharedInformerFactory(cs, 0)
-			for i := range test.pvcs {
-				informerFactory.Core().V1().PersistentVolumeClaims().Informer().GetStore().Add(&test.pvcs[i])
+			for _, pvc := range test.pvcs {
+				metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnBindCompleted, "true")
+				cs.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, &pvc, metav1.CreateOptions{})
+				if pvName := pvc.Spec.VolumeName; pvName != "" {
+					pv := v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: pvName}}
+					cs.CoreV1().PersistentVolumes().Create(ctx, &pv, metav1.CreateOptions{})
+				}
 			}
 			snapshot := internalcache.NewSnapshot(test.pods, nodes)
 			fwk, err := st.NewFramework(
@@ -749,9 +766,13 @@ func TestGenericScheduler(t *testing.T) {
 				cache,
 				snapshot,
 				[]framework.Extender{},
-				schedulerapi.DefaultPercentageOfNodesToScore)
-			result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), test.pod)
-			if err != test.wErr && err.Error() != test.wErr.Error() {
+				schedulerapi.DefaultPercentageOfNodesToScore,
+			)
+			informerFactory.Start(ctx.Done())
+			informerFactory.WaitForCacheSync(ctx.Done())
+
+			result, err := scheduler.Schedule(ctx, prof, framework.NewCycleState(), test.pod)
+			if err != test.wErr && !strings.Contains(err.Error(), test.wErr.Error()) {
 				t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr)
 			}
 			if test.expectedHosts != nil && !test.expectedHosts.Has(result.SuggestedHost) {
diff --git a/pkg/scheduler/framework/plugins/volumebinding/BUILD b/pkg/scheduler/framework/plugins/volumebinding/BUILD
index dd70db1ea9d..745e6e6a219 100644
--- a/pkg/scheduler/framework/plugins/volumebinding/BUILD
+++ b/pkg/scheduler/framework/plugins/volumebinding/BUILD
@@ -11,8 +11,10 @@ go_library(
         "//pkg/scheduler/apis/config:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
+        "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
         "//vendor/k8s.io/klog/v2:go_default_library",
     ],
 )
diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
index 52c316de348..a8fa0256e80 100644
--- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
+++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go
@@ -23,8 +23,10 @@ import (
 	"time"
 
 	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	corelisters "k8s.io/client-go/listers/core/v1"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/controller/volume/scheduling"
 	"k8s.io/kubernetes/pkg/features"
@@ -62,6 +64,7 @@ func (d *stateData) Clone() framework.StateData {
 // Reserve and PreBind phases.
 type VolumeBinding struct {
 	Binder                               scheduling.SchedulerVolumeBinder
+	PVCLister                            corelisters.PersistentVolumeClaimLister
 	GenericEphemeralVolumeFeatureEnabled bool
 }
 
@@ -78,14 +81,40 @@ func (pl *VolumeBinding) Name() string {
 	return Name
 }
 
-func (pl *VolumeBinding) podHasPVCs(pod *v1.Pod) bool {
+// podHasPVCs returns 2 values:
+// - the first one to denote if the given "pod" has any PVC defined.
+// - the second one to return any error if the requested PVC is illegal.
+func (pl *VolumeBinding) podHasPVCs(pod *v1.Pod) (bool, error) {
+	hasPVC := false
 	for _, vol := range pod.Spec.Volumes {
-		if vol.PersistentVolumeClaim != nil ||
-			pl.GenericEphemeralVolumeFeatureEnabled && vol.Ephemeral != nil {
-			return true
+		var pvcName string
+		ephemeral := false
+		switch {
+		case vol.PersistentVolumeClaim != nil:
+			pvcName = vol.PersistentVolumeClaim.ClaimName
+		case vol.Ephemeral != nil && pl.GenericEphemeralVolumeFeatureEnabled:
+			pvcName = pod.Name + "-" + vol.Name
+			ephemeral = true
+		default:
+			// Volume is not using a PVC, ignore
+			continue
+		}
+		hasPVC = true
+		pvc, err := pl.PVCLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
+		if err != nil {
+			// The error has already enough context ("persistentvolumeclaim "myclaim" not found")
+			return hasPVC, err
+		}
+
+		if pvc.DeletionTimestamp != nil {
+			return hasPVC, fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
+		}
+
+		if ephemeral && !metav1.IsControlledBy(pvc, pod) {
+			return hasPVC, fmt.Errorf("persistentvolumeclaim %q was not created for the pod", pvc.Name)
 		}
 	}
-	return false
+	return hasPVC, nil
 }
 
 // PreFilter invoked at the prefilter extension point to check if pod has all
@@ -93,7 +122,9 @@ func (pl *VolumeBinding) podHasPVCs(pod *v1.Pod) bool {
 // UnschedulableAndUnresolvable is returned.
 func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
 	// If pod does not reference any PVC, we don't need to do anything.
-	if !pl.podHasPVCs(pod) {
+	if hasPVC, err := pl.podHasPVCs(pod); err != nil {
+		return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
+	} else if !hasPVC {
 		state.Write(stateKey, &stateData{skip: true})
 		return nil
 	}
@@ -271,6 +302,7 @@ func New(plArgs runtime.Object, fh framework.FrameworkHandle) (framework.Plugin,
 	binder := scheduling.NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second)
 	return &VolumeBinding{
 		Binder:                               binder,
+		PVCLister:                            pvcInformer.Lister(),
 		GenericEphemeralVolumeFeatureEnabled: utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume),
 	}, nil
 }
diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go
index 14695cdb922..e602843ac4c 100644
--- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go
+++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go
@@ -209,7 +209,7 @@ func TestVolumeBinding(t *testing.T) {
 			name:                "pvc not found",
 			pod:                 makePod("pod-a", []string{"pvc-a"}),
 			node:                &v1.Node{},
-			wantPreFilterStatus: framework.NewStatus(framework.Error, `error getting PVC "default/pvc-a": could not find v1.PersistentVolumeClaim "default/pvc-a"`),
+			wantPreFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "pvc-a" not found`),
 			wantFilterStatus:    nil,
 		},
 		{
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 3cdce85cfee..72b2312e8ac 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -862,13 +862,14 @@ func setupTestSchedulerWithVolumeBinding(volumeBinder scheduling.SchedulerVolume
 	testPVC := v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "testPVC", Namespace: pod.Namespace, UID: types.UID("testPVC")}}
 	client := clientsetfake.NewSimpleClientset(&testNode, &testPVC)
 	informerFactory := informers.NewSharedInformerFactory(client, 0)
-	informerFactory.Core().V1().PersistentVolumeClaims().Informer().GetStore().Add(&testPVC)
+	pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
+	pvcInformer.Informer().GetStore().Add(&testPVC)
 
 	fns := []st.RegisterPluginFunc{
 		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
 		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 		st.RegisterPluginAsExtensions(volumebinding.Name, func(plArgs runtime.Object, handle framework.FrameworkHandle) (framework.Plugin, error) {
-			return &volumebinding.VolumeBinding{Binder: volumeBinder}, nil
+			return &volumebinding.VolumeBinding{Binder: volumeBinder, PVCLister: pvcInformer.Lister()}, nil
 		}, "PreFilter", "Filter", "Reserve", "PreBind"),
 	}
 	s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, fns...)
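
-- 
Note on patch 1: the core move is to stop caching a PVC lister on
genericScheduler and instead derive it from the shared informer factory at
the point of use. Below is a minimal, self-contained sketch of that
client-go pattern against a fake clientset; the package layout and
variable names are illustrative, not taken from the patch:

    package main

    import (
    	"fmt"

    	v1 "k8s.io/api/core/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/client-go/informers"
    	"k8s.io/client-go/kubernetes/fake"
    )

    func main() {
    	// Seed a fake clientset with one PVC, mirroring what the reworked
    	// tests do before handing the factory to the framework.
    	pvc := &v1.PersistentVolumeClaim{
    		ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", Namespace: v1.NamespaceDefault},
    	}
    	client := fake.NewSimpleClientset(pvc)
    	factory := informers.NewSharedInformerFactory(client, 0)

    	// Derive the lister from the factory on demand; this call also
    	// registers the PVC informer with the factory.
    	lister := factory.Core().V1().PersistentVolumeClaims().Lister()

    	// Informers must be started and synced before the lister sees
    	// data, which is why the tests gained Start/WaitForCacheSync.
    	stopCh := make(chan struct{})
    	defer close(stopCh)
    	factory.Start(stopCh)
    	factory.WaitForCacheSync(stopCh)

    	got, err := lister.PersistentVolumeClaims(v1.NamespaceDefault).Get("existingPVC")
    	if err != nil {
    		fmt.Println("lookup failed:", err)
    		return
    	}
    	fmt.Println("found PVC:", got.Name)
    }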
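Note on patch 2: the PVC sanity check now lives in the VolumeBinding
plugin's PreFilter, where an illegal PVC yields an
UnschedulableAndUnresolvable status rather than aborting Schedule() with a
plain error. The sketch below imitates the two-value shape of the reworked
podHasPVCs; it is an illustration under stated assumptions only (a plain
map stands in for the real PVCLister, and generic ephemeral volumes are
left out):

    package main

    import (
    	"fmt"

    	v1 "k8s.io/api/core/v1"
    )

    // hasPVCs reports whether the pod references any PVC at all and,
    // separately, surfaces an error for a claim that is missing or being
    // deleted -- the same split the plugin uses to decide between
    // "skip volume binding" and "mark the pod unschedulable".
    func hasPVCs(pod *v1.Pod, claims map[string]*v1.PersistentVolumeClaim) (bool, error) {
    	hasPVC := false
    	for _, vol := range pod.Spec.Volumes {
    		if vol.PersistentVolumeClaim == nil {
    			continue // volume is not PVC-backed, ignore
    		}
    		hasPVC = true
    		name := vol.PersistentVolumeClaim.ClaimName
    		pvc, ok := claims[name]
    		if !ok {
    			return hasPVC, fmt.Errorf("persistentvolumeclaim %q not found", name)
    		}
    		if pvc.DeletionTimestamp != nil {
    			return hasPVC, fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
    		}
    	}
    	return hasPVC, nil
    }

    func main() {
    	pod := &v1.Pod{
    		Spec: v1.PodSpec{Volumes: []v1.Volume{{
    			Name: "vol",
    			VolumeSource: v1.VolumeSource{
    				PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: "missing"},
    			},
    		}}},
    	}
    	// No claims registered: hasPVC is true and the error is non-nil,
    	// so PreFilter would return UnschedulableAndUnresolvable here.
    	hasPVC, err := hasPVCs(pod, nil)
    	fmt.Println(hasPVC, err)
    }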