From f74b30868ce3a294391d066c5047a13d5eb491a6 Mon Sep 17 00:00:00 2001
From: "Bobby (Babak) Salamat"
Date: Thu, 8 Nov 2018 18:08:38 -0800
Subject: [PATCH] Add plugin invocation for 'reserve' and 'prebind' plugins to
 the scheduler.

---
 pkg/scheduler/core/extender_test.go          |   1 +
 pkg/scheduler/core/generic_scheduler.go      |   4 +
 pkg/scheduler/core/generic_scheduler_test.go |  28 +++++
 pkg/scheduler/factory/factory.go             |  11 ++
 pkg/scheduler/scheduler.go                   | 112 +++++++++++--------
 pkg/scheduler/scheduler_test.go              |   5 +
 pkg/scheduler/testutil.go                    |  24 ++++
 7 files changed, 140 insertions(+), 45 deletions(-)

diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go
index e71f8805d67..f1f17beb437 100644
--- a/pkg/scheduler/core/extender_test.go
+++ b/pkg/scheduler/core/extender_test.go
@@ -516,6 +516,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 			algorithm.EmptyPredicateMetadataProducer,
 			test.prioritizers,
 			algorithm.EmptyPriorityMetadataProducer,
+			emptyPluginSet,
 			extenders,
 			nil,
 			schedulertesting.FakePersistentVolumeClaimLister{},
diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index 8d5a73d1466..874ca53360d 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -44,6 +44,7 @@ import (
 	schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
 	"k8s.io/kubernetes/pkg/scheduler/metrics"
+	pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/util"
 	"k8s.io/kubernetes/pkg/scheduler/volumebinder"
 )
@@ -103,6 +104,7 @@ type genericScheduler struct {
 	priorityMetaProducer  algorithm.PriorityMetadataProducer
 	predicateMetaProducer algorithm.PredicateMetadataProducer
 	prioritizers          []algorithm.PriorityConfig
+	pluginSet             pluginsv1alpha1.PluginSet
 	extenders             []algorithm.SchedulerExtender
 	lastNodeIndex         uint64
 	alwaysCheckAllPredicates bool
@@ -1152,6 +1154,7 @@ func NewGenericScheduler(
 	predicateMetaProducer algorithm.PredicateMetadataProducer,
 	prioritizers []algorithm.PriorityConfig,
 	priorityMetaProducer algorithm.PriorityMetadataProducer,
+	pluginSet pluginsv1alpha1.PluginSet,
 	extenders []algorithm.SchedulerExtender,
 	volumeBinder *volumebinder.VolumeBinder,
 	pvcLister corelisters.PersistentVolumeClaimLister,
@@ -1168,6 +1171,7 @@
 		predicateMetaProducer: predicateMetaProducer,
 		prioritizers:          prioritizers,
 		priorityMetaProducer:  priorityMetaProducer,
+		pluginSet:             pluginSet,
 		extenders:             extenders,
 		cachedNodeInfoMap:     make(map[string]*schedulercache.NodeInfo),
 		volumeBinder:          volumeBinder,
diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go
index d0152a9fbb6..2b668796923 100644
--- a/pkg/scheduler/core/generic_scheduler_test.go
+++ b/pkg/scheduler/core/generic_scheduler_test.go
@@ -43,6 +43,7 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
 	schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
+	plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
 	schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
 )
 
@@ -135,6 +136,28 @@ func getNodeReducePriority(pod *v1.Pod, meta interface{}, nodeNameToInfo map[str
 	return nil
 }
 
+// EmptyPluginSet is a test plugin set used by the default scheduler.
+type EmptyPluginSet struct{}
+
+var _ plugins.PluginSet = EmptyPluginSet{}
+
+// ReservePlugins returns a slice of default reserve plugins.
+func (r EmptyPluginSet) ReservePlugins() []plugins.ReservePlugin {
+	return []plugins.ReservePlugin{}
+}
+
+// PrebindPlugins returns a slice of default prebind plugins.
+func (r EmptyPluginSet) PrebindPlugins() []plugins.PrebindPlugin {
+	return []plugins.PrebindPlugin{}
+}
+
+// Data returns a pointer to PluginData.
+func (r EmptyPluginSet) Data() *plugins.PluginData {
+	return &plugins.PluginData{}
+}
+
+var emptyPluginSet = &EmptyPluginSet{}
+
 func makeNodeList(nodeNames []string) []*v1.Node {
 	result := make([]*v1.Node, 0, len(nodeNames))
 	for _, nodeName := range nodeNames {
@@ -454,6 +477,7 @@ func TestGenericScheduler(t *testing.T) {
 				algorithm.EmptyPredicateMetadataProducer,
 				test.prioritizers,
 				algorithm.EmptyPriorityMetadataProducer,
+				emptyPluginSet,
 				[]algorithm.SchedulerExtender{},
 				nil,
 				pvcLister,
@@ -490,6 +514,7 @@ func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Nod
 		algorithm.EmptyPredicateMetadataProducer,
 		prioritizers,
 		algorithm.EmptyPriorityMetadataProducer,
+		emptyPluginSet,
 		nil, nil, nil, nil, false, false,
 		schedulerapi.DefaultPercentageOfNodesToScore)
 	cache.UpdateNodeNameToInfoMap(s.(*genericScheduler).cachedNodeInfoMap)
@@ -1416,6 +1441,7 @@ func TestPreempt(t *testing.T) {
 			algorithm.EmptyPredicateMetadataProducer,
 			[]algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}},
 			algorithm.EmptyPriorityMetadataProducer,
+			emptyPluginSet,
 			extenders,
 			nil,
 			schedulertesting.FakePersistentVolumeClaimLister{},
@@ -1543,6 +1569,7 @@ func TestCacheInvalidationRace(t *testing.T) {
 		algorithm.EmptyPredicateMetadataProducer,
 		prioritizers,
 		algorithm.EmptyPriorityMetadataProducer,
+		emptyPluginSet,
 		nil, nil, pvcLister, pdbLister,
 		true, false,
 		schedulerapi.DefaultPercentageOfNodesToScore)
@@ -1626,6 +1653,7 @@ func TestCacheInvalidationRace2(t *testing.T) {
 		algorithm.EmptyPredicateMetadataProducer,
 		prioritizers,
 		algorithm.EmptyPriorityMetadataProducer,
+		emptyPluginSet,
 		nil, nil, pvcLister, pdbLister,
 		true, false,
 		schedulerapi.DefaultPercentageOfNodesToScore)
diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go
index aa420bcb173..120806aef78 100644
--- a/pkg/scheduler/factory/factory.go
+++ b/pkg/scheduler/factory/factory.go
@@ -63,6 +63,8 @@ import (
 	schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
+	"k8s.io/kubernetes/pkg/scheduler/plugins"
+	pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/util"
 	"k8s.io/kubernetes/pkg/scheduler/volumebinder"
 )
@@ -109,6 +111,8 @@ type Config struct {
 	PodConditionUpdater PodConditionUpdater
 	// PodPreemptor is used to evict pods and update pod annotations.
 	PodPreemptor PodPreemptor
+	// PluginSet has a set of plugins and data used to run them.
+	PluginSet pluginsv1alpha1.PluginSet
 
 	// NextPod should be a function that blocks until the next pod
 	// is available. We don't use a channel for this, because scheduling
@@ -202,6 +206,8 @@ type configFactory struct {
 	pdbLister policylisters.PodDisruptionBudgetLister
 	// a means to list all StorageClasses
 	storageClassLister storagelisters.StorageClassLister
+	// pluginSet has a set of plugins and the context used for running them.
+	pluginSet pluginsv1alpha1.PluginSet
 
 	// Close this to stop all reflectors
 	StopEverything <-chan struct{}
@@ -1225,6 +1231,9 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
 		return nil, err
 	}
 
+	// TODO(bsalamat): the default registrar should be able to process config files.
+	c.pluginSet = plugins.NewDefaultPluginSet(pluginsv1alpha1.NewPluginContext(), &c.schedulerCache)
+
 	// Init equivalence class cache
 	if c.enableEquivalenceClassCache {
 		c.equivalencePodCache = equivalence.NewCache(predicates.Ordering())
@@ -1239,6 +1248,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
 		predicateMetaProducer,
 		priorityConfigs,
 		priorityMetaProducer,
+		c.pluginSet,
 		extenders,
 		c.volumeBinder,
 		c.pVCLister,
@@ -1258,6 +1268,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
 		GetBinder:           c.getBinderFunc(extenders),
 		PodConditionUpdater: &podConditionUpdater{c.client},
 		PodPreemptor:        &podPreemptor{c.client},
+		PluginSet:           c.pluginSet,
 		WaitForCacheSync: func() bool {
 			return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
 		},
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 0237036b1cd..8d63a997950 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -17,11 +17,14 @@ limitations under the License.
 package scheduler
 
 import (
+	"errors"
 	"fmt"
 	"io/ioutil"
 	"os"
 	"time"
 
+	"k8s.io/klog"
+
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -44,13 +47,13 @@ import (
 	schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	"k8s.io/kubernetes/pkg/scheduler/metrics"
 	"k8s.io/kubernetes/pkg/scheduler/util"
-
-	"k8s.io/klog"
 )
 
 const (
 	// BindTimeoutSeconds defines the default bind timeout
 	BindTimeoutSeconds = 100
+	// SchedulerError is the reason recorded for events when an error occurs while scheduling a pod.
+	SchedulerError = "SchedulerError"
 )
 
 // Scheduler watches for new unscheduled pods. It attempts to find
@@ -286,19 +289,26 @@ func (sched *Scheduler) Config() *factory.Config {
 	return sched.config
 }
 
+// recordSchedulingFailure records an event for the pod that indicates the
+// pod has failed to schedule.
+// NOTE: This function modifies "pod". "pod" should be copied before being passed.
+func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) {
+	sched.config.Error(pod, err)
+	sched.config.Recorder.Event(pod, v1.EventTypeWarning, "FailedScheduling", message)
+	sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
+		Type:    v1.PodScheduled,
+		Status:  v1.ConditionFalse,
+		Reason:  reason,
+		Message: err.Error(),
+	})
+}
+
 // schedule implements the scheduling algorithm and returns the suggested host.
 func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
 	host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
 	if err != nil {
 		pod = pod.DeepCopy()
-		sched.config.Error(pod, err)
-		sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "%v", err)
-		sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
-			Type:    v1.PodScheduled,
-			Status:  v1.ConditionFalse,
-			Reason:  v1.PodReasonUnschedulable,
-			Message: err.Error(),
-		})
+		sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error())
 		return "", err
 	}
 	return host, err
@@ -362,14 +372,8 @@ func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bo
 	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
 		allBound, err = sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
 		if err != nil {
-			sched.config.Error(assumed, err)
-			sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePodVolumes failed: %v", err)
-			sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
-				Type:    v1.PodScheduled,
-				Status:  v1.ConditionFalse,
-				Reason:  "SchedulerError",
-				Message: err.Error(),
-			})
+			sched.recordSchedulingFailure(assumed, err, SchedulerError,
+				fmt.Sprintf("AssumePodVolumes failed: %v", err))
 		}
 		// Invalidate ecache because assumed volumes could have affected the cached
 		// pvs for other pods
@@ -387,9 +391,6 @@
 // If binding errors, times out or gets undone, then an error will be returned to
 // retry scheduling.
 func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
-	var reason string
-	var eventType string
-
 	klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
 	err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
 	if err != nil {
@@ -404,15 +405,7 @@
 		// stale pod binding cache.
 		sched.config.VolumeBinder.DeletePodBindings(assumed)
 
-		reason = "VolumeBindingFailed"
-		eventType = v1.EventTypeWarning
-		sched.config.Error(assumed, err)
-		sched.config.Recorder.Eventf(assumed, eventType, "FailedScheduling", "%v", err)
-		sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
-			Type:   v1.PodScheduled,
-			Status: v1.ConditionFalse,
-			Reason: reason,
-		})
+		sched.recordSchedulingFailure(assumed, err, "VolumeBindingFailed", err.Error())
 		return err
 	}
 
@@ -441,14 +434,8 @@
 		// This relies on the fact that Error will check if the pod has been bound
 		// to a node and if so will not add it back to the unscheduled pods queue
 		// (otherwise this would cause an infinite loop).
-		sched.config.Error(assumed, err)
-		sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePod failed: %v", err)
-		sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
-			Type:    v1.PodScheduled,
-			Status:  v1.ConditionFalse,
-			Reason:  "SchedulerError",
-			Message: err.Error(),
-		})
+		sched.recordSchedulingFailure(assumed, err, SchedulerError,
+			fmt.Sprintf("AssumePod failed: %v", err))
 		return err
 	}
 	// if "assumed" is a nominated pod, we should remove it from internal cache
@@ -480,13 +467,8 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
 		if err := sched.config.SchedulerCache.ForgetPod(assumed); err != nil {
 			klog.Errorf("scheduler cache ForgetPod failed: %v", err)
 		}
-		sched.config.Error(assumed, err)
-		sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "Binding rejected: %v", err)
-		sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
-			Type:   v1.PodScheduled,
-			Status: v1.ConditionFalse,
-			Reason: "BindingRejected",
-		})
+		sched.recordSchedulingFailure(assumed, err, SchedulerError,
+			fmt.Sprintf("Binding rejected: %v", err))
 		return err
 	}
 
@@ -498,6 +480,12 @@
 
 // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
 func (sched *Scheduler) scheduleOne() {
+	plugins := sched.config.PluginSet
+	// Remove all plugin context data at the beginning of a scheduling cycle.
+	if plugins.Data().Ctx != nil {
+		plugins.Data().Ctx.Reset()
+	}
+
 	pod := sched.config.NextPod()
 	// pod could be nil when schedulerQueue is closed
 	if pod == nil {
@@ -554,6 +542,16 @@
 		return
 	}
 
+	// Run "reserve" plugins.
+	for _, pl := range plugins.ReservePlugins() {
+		if err := pl.Reserve(plugins, assumedPod, suggestedHost); err != nil {
+			klog.Errorf("error while running %v reserve plugin for pod %v: %v", pl.Name(), assumedPod.Name, err)
+			sched.recordSchedulingFailure(assumedPod, err, SchedulerError,
+				fmt.Sprintf("reserve plugin %v failed", pl.Name()))
+			metrics.PodScheduleErrors.Inc()
+			return
+		}
+	}
 	// assume modifies `assumedPod` by setting NodeName=suggestedHost
 	err = sched.assume(assumedPod, suggestedHost)
 	if err != nil {
@@ -573,6 +571,30 @@
 		}
 	}
 
+		// Run "prebind" plugins.
+		for _, pl := range plugins.PrebindPlugins() {
+			approved, err := pl.Prebind(plugins, assumedPod, suggestedHost)
+			if err != nil {
+				approved = false
+				klog.Errorf("error while running %v prebind plugin for pod %v: %v", pl.Name(), assumedPod.Name, err)
+				metrics.PodScheduleErrors.Inc()
+			}
+			if !approved {
+				sched.Cache().ForgetPod(assumedPod)
+				var reason string
+				if err == nil {
+					msg := fmt.Sprintf("prebind plugin %v rejected pod %v.", pl.Name(), assumedPod.Name)
+					klog.V(4).Infof(msg)
+					err = errors.New(msg)
+					reason = v1.PodReasonUnschedulable
+				} else {
+					reason = SchedulerError
+				}
+				sched.recordSchedulingFailure(assumedPod, err, reason, err.Error())
+				return
+			}
+		}
+
 		err := sched.bind(assumedPod, &v1.Binding{
 			ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
 			Target: v1.ObjectReference{
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 4dac1a7bd2d..0f2ea0b11f4 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -295,6 +295,7 @@ func TestScheduler(t *testing.T) {
 				NextPod: func() *v1.Pod {
 					return item.sendPod
 				},
+				PluginSet:    &EmptyPluginSet{},
 				Recorder:     eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}),
 				VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
 			},
@@ -643,6 +644,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
 		algorithm.EmptyPredicateMetadataProducer,
 		[]algorithm.PriorityConfig{},
 		algorithm.EmptyPriorityMetadataProducer,
+		&EmptyPluginSet{},
 		[]algorithm.SchedulerExtender{},
 		nil,
 		informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
@@ -672,6 +674,7 @@
 			Recorder:            &record.FakeRecorder{},
 			PodConditionUpdater: fakePodConditionUpdater{},
 			PodPreemptor:        fakePodPreemptor{},
+			PluginSet:           &EmptyPluginSet{},
 			VolumeBinder:        volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
 		},
 	}
@@ -694,6 +697,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
 		algorithm.EmptyPredicateMetadataProducer,
 		[]algorithm.PriorityConfig{},
 		algorithm.EmptyPriorityMetadataProducer,
+		&EmptyPluginSet{},
 		[]algorithm.SchedulerExtender{},
 		nil,
 		informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
@@ -727,6 +731,7 @@
 			PodConditionUpdater: fakePodConditionUpdater{},
 			PodPreemptor:        fakePodPreemptor{},
 			StopEverything:      stop,
+			PluginSet:           &EmptyPluginSet{},
 			VolumeBinder:        volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
 		},
 	}
diff --git a/pkg/scheduler/testutil.go b/pkg/scheduler/testutil.go
index 0c101a54573..b495fdefa16 100644
--- a/pkg/scheduler/testutil.go
+++ b/pkg/scheduler/testutil.go
@@ -27,6 +27,7 @@ import (
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
 	"k8s.io/kubernetes/pkg/scheduler/factory"
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
+	plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/util"
 )
 
@@ -89,3 +90,26 @@ func (fc *FakeConfigurator) CreateFromConfig(policy schedulerapi.Policy) (*facto
 func (fc *FakeConfigurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*factory.Config, error) {
 	return fc.Config, nil
 }
+
+// EmptyPluginSet is the default plugin set used by the default scheduler.
+type EmptyPluginSet struct{}
+
+var _ = plugins.PluginSet(EmptyPluginSet{})
+
+// ReservePlugins returns a slice of default reserve plugins.
+func (r EmptyPluginSet) ReservePlugins() []plugins.ReservePlugin {
+	return []plugins.ReservePlugin{}
+}
+
+// PrebindPlugins returns a slice of default prebind plugins.
+func (r EmptyPluginSet) PrebindPlugins() []plugins.PrebindPlugin {
+	return []plugins.PrebindPlugin{}
+}
+
+// Data returns a pointer to PluginData.
+func (r EmptyPluginSet) Data() *plugins.PluginData {
+	return &plugins.PluginData{
+		Ctx:            nil,
+		SchedulerCache: nil,
+	}
+}
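
Note (not part of the patch): the hunks above wire "reserve" and "prebind" invocation into scheduleOne, but the change does not include a concrete plugin. The sketch below illustrates what a plugin satisfying both call sites could look like. The method signatures are inferred from how scheduleOne calls pl.Name(), pl.Reserve(plugins, assumedPod, suggestedHost) and pl.Prebind(plugins, assumedPod, suggestedHost) in this patch; the package name, the podNameGate type, and its label key are illustrative assumptions, not code from this change.

package exampleplugin

import (
	"fmt"

	"k8s.io/api/core/v1"
	plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
)

// podNameGate is a hypothetical plugin that participates in both extension
// points added by this patch. It exists only to illustrate the call sites.
type podNameGate struct{}

// Name is the identifier scheduleOne uses in its log and error messages.
func (g podNameGate) Name() string {
	return "podNameGate"
}

// Reserve runs after a host has been suggested and before the pod is assumed.
// Returning a non-nil error makes scheduleOne record a SchedulerError and
// abort the scheduling cycle.
func (g podNameGate) Reserve(ps plugins.PluginSet, pod *v1.Pod, nodeName string) error {
	if pod == nil {
		return fmt.Errorf("nothing to reserve on node %q", nodeName)
	}
	return nil
}

// Prebind runs just before the asynchronous bind. Returning (false, nil)
// rejects the pod as unschedulable; a non-nil error is treated as a
// SchedulerError by scheduleOne.
func (g podNameGate) Prebind(ps plugins.PluginSet, pod *v1.Pod, nodeName string) (bool, error) {
	// Hypothetical opt-out label, purely for illustration.
	if pod.Labels["example.com/skip-bind"] == "true" {
		return false, nil
	}
	return true, nil
}

To be invoked, such a plugin would have to be returned from the ReservePlugins() and PrebindPlugins() methods of a PluginSet implementation, analogous to the DefaultPluginSet constructed in factory.go above; the EmptyPluginSet added in testutil.go simply returns empty slices, so the new loops in scheduleOne are no-ops in the tests.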