From 185ba08fcd5b046c167ee1a808a2d149c922f9ee Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Fri, 11 Sep 2020 13:52:30 -0700 Subject: [PATCH] Move podPassesBasicChecks() to VolumeBinding plugin --- pkg/scheduler/core/BUILD | 6 +-- pkg/scheduler/core/generic_scheduler.go | 50 ------------------- pkg/scheduler/core/generic_scheduler_test.go | 33 +++++++++--- .../framework/plugins/volumebinding/BUILD | 2 + .../plugins/volumebinding/volume_binding.go | 44 +++++++++++++--- .../volumebinding/volume_binding_test.go | 2 +- pkg/scheduler/scheduler_test.go | 5 +- 7 files changed, 73 insertions(+), 69 deletions(-) diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD index 6042c42a966..a4e0777e0b0 100644 --- a/pkg/scheduler/core/BUILD +++ b/pkg/scheduler/core/BUILD @@ -10,7 +10,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/api/v1/pod:go_default_library", - "//pkg/features:go_default_library", "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/framework/runtime:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", @@ -20,11 +19,8 @@ go_library( "//pkg/scheduler/profile:go_default_library", "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", - "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", @@ -40,12 +36,14 @@ go_test( ], embed = [":go_default_library"], deps = [ + "//pkg/controller/volume/persistentvolume/util:go_default_library", "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/framework/plugins/defaultbinder:go_default_library", "//pkg/scheduler/framework/plugins/noderesources:go_default_library", "//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library", "//pkg/scheduler/framework/plugins/queuesort:go_default_library", "//pkg/scheduler/framework/plugins/selectorspread:go_default_library", + "//pkg/scheduler/framework/plugins/volumebinding:go_default_library", "//pkg/scheduler/framework/runtime:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 711661d8a0a..5fe479c3e5d 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -29,12 +29,8 @@ import ( "k8s.io/klog/v2" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - utilfeature "k8s.io/apiserver/pkg/util/feature" - corelisters "k8s.io/client-go/listers/core/v1" extenderv1 "k8s.io/kube-scheduler/extender/v1" podutil "k8s.io/kubernetes/pkg/api/v1/pod" - "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" @@ -137,12 +133,6 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name}) defer trace.LogIfLong(100 * time.Millisecond) - pvcLister := prof.SharedInformerFactory().Core().V1().PersistentVolumeClaims().Lister() - if err := podPassesBasicChecks(pod, pvcLister); err != nil { - return result, err - } - trace.Step("Basic checks done") - if err := g.snapshot(); err != nil { return result, err } @@ -273,7 +263,6 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profil filteredNodesStatuses[n.Node().Name] = s } return nil, filteredNodesStatuses, nil - } feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses) @@ -584,45 +573,6 @@ func (g *genericScheduler) prioritizeNodes( return result, nil } -// podPassesBasicChecks makes sanity checks on the pod if it can be scheduled. -func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeClaimLister) error { - // Check PVCs used by the pod - namespace := pod.Namespace - manifest := &(pod.Spec) - for i := range manifest.Volumes { - volume := &manifest.Volumes[i] - var pvcName string - ephemeral := false - switch { - case volume.PersistentVolumeClaim != nil: - pvcName = volume.PersistentVolumeClaim.ClaimName - case volume.Ephemeral != nil && - utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume): - pvcName = pod.Name + "-" + volume.Name - ephemeral = true - default: - // Volume is not using a PVC, ignore - continue - } - pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(pvcName) - if err != nil { - // The error has already enough context ("persistentvolumeclaim "myclaim" not found") - return err - } - - if pvc.DeletionTimestamp != nil { - return fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name) - } - - if ephemeral && - !metav1.IsControlledBy(pvc, pod) { - return fmt.Errorf("persistentvolumeclaim %q was not created for the pod", pvc.Name) - } - } - - return nil -} - // NewGenericScheduler creates a genericScheduler object. func NewGenericScheduler( cache internalcache.Cache, diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index a4a168f4cd0..9ab27f0120e 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -21,6 +21,7 @@ import ( "fmt" "math" "strconv" + "strings" "testing" "time" @@ -33,12 +34,14 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" + pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" @@ -416,11 +419,17 @@ func TestGenericScheduler(t *testing.T) { // Pod with existing PVC registerPlugins: []st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterPreFilterPlugin(volumebinding.Name, volumebinding.New), st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, nodes: []string{"machine1", "machine2"}, - pvcs: []v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", UID: types.UID("existingPVC"), Namespace: v1.NamespaceDefault}}}, + pvcs: []v1.PersistentVolumeClaim{ + { + ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", UID: types.UID("existingPVC"), Namespace: v1.NamespaceDefault}, + Spec: v1.PersistentVolumeClaimSpec{VolumeName: "existingPV"}, + }, + }, pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore"), Namespace: v1.NamespaceDefault}, Spec: v1.PodSpec{ @@ -443,6 +452,7 @@ func TestGenericScheduler(t *testing.T) { // Pod with non existing PVC registerPlugins: []st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterPreFilterPlugin(volumebinding.Name, volumebinding.New), st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, @@ -468,6 +478,7 @@ func TestGenericScheduler(t *testing.T) { // Pod with deleting PVC registerPlugins: []st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterPreFilterPlugin(volumebinding.Name, volumebinding.New), st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), }, @@ -726,10 +737,16 @@ func TestGenericScheduler(t *testing.T) { cache.AddNode(node) } + ctx := context.Background() cs := clientsetfake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(cs, 0) - for i := range test.pvcs { - informerFactory.Core().V1().PersistentVolumeClaims().Informer().GetStore().Add(&test.pvcs[i]) + for _, pvc := range test.pvcs { + metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnBindCompleted, "true") + cs.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, &pvc, metav1.CreateOptions{}) + if pvName := pvc.Spec.VolumeName; pvName != "" { + pv := v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: pvName}} + cs.CoreV1().PersistentVolumes().Create(ctx, &pv, metav1.CreateOptions{}) + } } snapshot := internalcache.NewSnapshot(test.pods, nodes) fwk, err := st.NewFramework( @@ -749,9 +766,13 @@ func TestGenericScheduler(t *testing.T) { cache, snapshot, []framework.Extender{}, - schedulerapi.DefaultPercentageOfNodesToScore) - result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), test.pod) - if err != test.wErr && err.Error() != test.wErr.Error() { + schedulerapi.DefaultPercentageOfNodesToScore, + ) + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + + result, err := scheduler.Schedule(ctx, prof, framework.NewCycleState(), test.pod) + if err != test.wErr && !strings.Contains(err.Error(), test.wErr.Error()) { t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr) } if test.expectedHosts != nil && !test.expectedHosts.Has(result.SuggestedHost) { diff --git a/pkg/scheduler/framework/plugins/volumebinding/BUILD b/pkg/scheduler/framework/plugins/volumebinding/BUILD index dd70db1ea9d..745e6e6a219 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/BUILD +++ b/pkg/scheduler/framework/plugins/volumebinding/BUILD @@ -11,8 +11,10 @@ go_library( "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", ], ) diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go index 52c316de348..a8fa0256e80 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go @@ -23,8 +23,10 @@ import ( "time" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" utilfeature "k8s.io/apiserver/pkg/util/feature" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller/volume/scheduling" "k8s.io/kubernetes/pkg/features" @@ -62,6 +64,7 @@ func (d *stateData) Clone() framework.StateData { // Reserve and PreBind phases. type VolumeBinding struct { Binder scheduling.SchedulerVolumeBinder + PVCLister corelisters.PersistentVolumeClaimLister GenericEphemeralVolumeFeatureEnabled bool } @@ -78,14 +81,40 @@ func (pl *VolumeBinding) Name() string { return Name } -func (pl *VolumeBinding) podHasPVCs(pod *v1.Pod) bool { +// podHasPVCs returns 2 values: +// - the first one to denote if the given "pod" has any PVC defined. +// - the second one to return any error if the requested PVC is illegal. +func (pl *VolumeBinding) podHasPVCs(pod *v1.Pod) (bool, error) { + hasPVC := false for _, vol := range pod.Spec.Volumes { - if vol.PersistentVolumeClaim != nil || - pl.GenericEphemeralVolumeFeatureEnabled && vol.Ephemeral != nil { - return true + var pvcName string + ephemeral := false + switch { + case vol.PersistentVolumeClaim != nil: + pvcName = vol.PersistentVolumeClaim.ClaimName + case vol.Ephemeral != nil && pl.GenericEphemeralVolumeFeatureEnabled: + pvcName = pod.Name + "-" + vol.Name + ephemeral = true + default: + // Volume is not using a PVC, ignore + continue + } + hasPVC = true + pvc, err := pl.PVCLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName) + if err != nil { + // The error has already enough context ("persistentvolumeclaim "myclaim" not found") + return hasPVC, err + } + + if pvc.DeletionTimestamp != nil { + return hasPVC, fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name) + } + + if ephemeral && !metav1.IsControlledBy(pvc, pod) { + return hasPVC, fmt.Errorf("persistentvolumeclaim %q was not created for the pod", pvc.Name) } } - return false + return hasPVC, nil } // PreFilter invoked at the prefilter extension point to check if pod has all @@ -93,7 +122,9 @@ func (pl *VolumeBinding) podHasPVCs(pod *v1.Pod) bool { // UnschedulableAndUnresolvable is returned. func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status { // If pod does not reference any PVC, we don't need to do anything. - if !pl.podHasPVCs(pod) { + if hasPVC, err := pl.podHasPVCs(pod); err != nil { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) + } else if !hasPVC { state.Write(stateKey, &stateData{skip: true}) return nil } @@ -271,6 +302,7 @@ func New(plArgs runtime.Object, fh framework.FrameworkHandle) (framework.Plugin, binder := scheduling.NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second) return &VolumeBinding{ Binder: binder, + PVCLister: pvcInformer.Lister(), GenericEphemeralVolumeFeatureEnabled: utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume), }, nil } diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go index 14695cdb922..e602843ac4c 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go @@ -209,7 +209,7 @@ func TestVolumeBinding(t *testing.T) { name: "pvc not found", pod: makePod("pod-a", []string{"pvc-a"}), node: &v1.Node{}, - wantPreFilterStatus: framework.NewStatus(framework.Error, `error getting PVC "default/pvc-a": could not find v1.PersistentVolumeClaim "default/pvc-a"`), + wantPreFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "pvc-a" not found`), wantFilterStatus: nil, }, { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 3cdce85cfee..72b2312e8ac 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -862,13 +862,14 @@ func setupTestSchedulerWithVolumeBinding(volumeBinder scheduling.SchedulerVolume testPVC := v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "testPVC", Namespace: pod.Namespace, UID: types.UID("testPVC")}} client := clientsetfake.NewSimpleClientset(&testNode, &testPVC) informerFactory := informers.NewSharedInformerFactory(client, 0) - informerFactory.Core().V1().PersistentVolumeClaims().Informer().GetStore().Add(&testPVC) + pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() + pvcInformer.Informer().GetStore().Add(&testPVC) fns := []st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterPluginAsExtensions(volumebinding.Name, func(plArgs runtime.Object, handle framework.FrameworkHandle) (framework.Plugin, error) { - return &volumebinding.VolumeBinding{Binder: volumeBinder}, nil + return &volumebinding.VolumeBinding{Binder: volumeBinder, PVCLister: pvcInformer.Lister()}, nil }, "PreFilter", "Filter", "Reserve", "PreBind"), } s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, fns...)