From a61a437ef2d9be80884a3e63ee4fa3b103b2d3e4 Mon Sep 17 00:00:00 2001 From: Abdullah Gharaibeh Date: Mon, 10 Jun 2019 17:01:50 -0400 Subject: [PATCH] prefilter extension point implementation. --- pkg/scheduler/core/extender_test.go | 3 +- pkg/scheduler/core/generic_scheduler.go | 10 +- pkg/scheduler/core/generic_scheduler_test.go | 3 +- pkg/scheduler/framework/v1alpha1/framework.go | 37 ++++++ pkg/scheduler/framework/v1alpha1/interface.go | 15 +++ .../internal/queue/scheduling_queue_test.go | 4 + pkg/scheduler/scheduler.go | 6 +- pkg/scheduler/scheduler_test.go | 4 +- test/integration/scheduler/framework_test.go | 111 ++++++++++++++++++ 9 files changed, 183 insertions(+), 10 deletions(-) diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index 205e5b8ef33..15e799832e5 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" @@ -552,7 +553,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { schedulerapi.DefaultPercentageOfNodesToScore, false) podIgnored := &v1.Pod{} - result, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes))) + result, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)), framework.NewPluginContext()) if test.expectsErr { if err == nil { t.Errorf("Unexpected non-error, result %+v", result) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index c661a10abed..bcfe07ae1d1 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -126,7 +126,7 @@ func (f *FitError) Error() string { // onto machines. // TODO: Rename this type. type ScheduleAlgorithm interface { - Schedule(*v1.Pod, algorithm.NodeLister) (scheduleResult ScheduleResult, err error) + Schedule(*v1.Pod, algorithm.NodeLister, *framework.PluginContext) (scheduleResult ScheduleResult, err error) // Preempt receives scheduling errors for a pod and tries to create room for // the pod by preempting lower priority pods if possible. // It returns the node where preemption happened, a list of preempted pods, a @@ -181,7 +181,7 @@ func (g *genericScheduler) snapshot() error { // Schedule tries to schedule the given pod to one of the nodes in the node list. // If it succeeds, it will return the name of the node. // If it fails, it will return a FitError error with reasons. -func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) { +func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister, pluginContext *framework.PluginContext) (result ScheduleResult, err error) { trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name)) defer trace.LogIfLong(100 * time.Millisecond) @@ -189,6 +189,12 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister return result, err } + // Run "prefilter" plugins. + prefilterStatus := g.framework.RunPrefilterPlugins(pluginContext, pod) + if !prefilterStatus.IsSuccess() { + return result, prefilterStatus.AsError() + } + nodes, err := nodeLister.List() if err != nil { return result, err diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index fd9bc274461..23dc3bccd07 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -466,8 +466,7 @@ func TestGenericScheduler(t *testing.T) { false, schedulerapi.DefaultPercentageOfNodesToScore, false) - result, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes))) - + result, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)), framework.NewPluginContext()) if !reflect.DeepEqual(err, test.wErr) { t.Errorf("Unexpected error: %v, expected: %v", err, test.wErr) } diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index b59dd0c61a8..bf73965d6e6 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -36,6 +36,7 @@ type framework struct { waitingPods *waitingPodsMap plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance. queueSortPlugins []QueueSortPlugin + prefilterPlugins []PrefilterPlugin reservePlugins []ReservePlugin prebindPlugins []PrebindPlugin postbindPlugins []PostbindPlugin @@ -85,6 +86,20 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi f.plugins[name] = p } + if plugins.PreFilter != nil { + for _, pf := range plugins.PreFilter.Enabled { + if pg, ok := f.plugins[pf.Name]; ok { + p, ok := pg.(PrefilterPlugin) + if !ok { + return nil, fmt.Errorf("plugin %v does not extend prefilter plugin", pf.Name) + } + f.prefilterPlugins = append(f.prefilterPlugins, p) + } else { + return nil, fmt.Errorf("prefilter plugin %v does not exist", pf.Name) + } + } + } + if plugins.Reserve != nil { for _, r := range plugins.Reserve.Enabled { if pg, ok := f.plugins[r.Name]; ok { @@ -185,6 +200,28 @@ func (f *framework) QueueSortFunc() LessFunc { return f.queueSortPlugins[0].Less } +// RunPrefilterPlugins runs the set of configured prefilter plugins. It returns +// *Status and its code is set to non-success if any of the plugins returns +// anything but Success. If a non-success status is returned, then the scheduling +// cycle is aborted. +func (f *framework) RunPrefilterPlugins( + pc *PluginContext, pod *v1.Pod) *Status { + for _, pl := range f.prefilterPlugins { + status := pl.Prefilter(pc, pod) + if !status.IsSuccess() { + if status.Code() == Unschedulable { + msg := fmt.Sprintf("rejected by %v at prefilter: %v", pl.Name(), status.Message()) + klog.V(4).Infof(msg) + return NewStatus(status.Code(), msg) + } + msg := fmt.Sprintf("error while running %v prefilter plugin for pod %v: %v", pl.Name(), pod.Name, status.Message()) + klog.Error(msg) + return NewStatus(Error, msg) + } + } + return nil +} + // RunPrebindPlugins runs the set of configured prebind plugins. It returns a // failure (bool) if any of the plugins returns an error. It also returns an // error containing the rejection message or the error occurred in the plugin. diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index 119e7b87ace..fa29340ea73 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -126,6 +126,15 @@ type QueueSortPlugin interface { Less(*PodInfo, *PodInfo) bool } +// PrefilterPlugin is an interface that must be implemented by "prefilter" plugins. +// These plugins are called at the beginning of the scheduling cycle. +type PrefilterPlugin interface { + Plugin + // Prefilter is called at the beginning of the scheduling cycle. All prefilter + // plugins must return success or the pod will be rejected. + Prefilter(pc *PluginContext, p *v1.Pod) *Status +} + // ReservePlugin is an interface for Reserve plugins. These plugins are called // at the reservation point. These are meant to update the state of the plugin. // This concept used to be called 'assume' in the original scheduler. @@ -190,6 +199,12 @@ type Framework interface { // QueueSortFunc returns the function to sort pods in scheduling queue QueueSortFunc() LessFunc + // RunPrefilterPlugins runs the set of configured prefilter plugins. It returns + // *Status and its code is set to non-success if any of the plugins returns + // anything but Success. If a non-success status is returned, then the scheduling + // cycle is aborted. + RunPrefilterPlugins(pc *PluginContext, pod *v1.Pod) *Status + // RunPrebindPlugins runs the set of configured prebind plugins. It returns // *Status and its code is set to non-success if any of the plugins returns // anything but Success. If the Status code is "Unschedulable", it is diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index d61d372aaf0..903c55fa7a4 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -167,6 +167,10 @@ func (*fakeFramework) NodeInfoSnapshot() *internalcache.NodeInfoSnapshot { return nil } +func (*fakeFramework) RunPrefilterPlugins(pc *framework.PluginContext, pod *v1.Pod) *framework.Status { + return nil +} + func (*fakeFramework) RunPrebindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { return nil } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 9c30fe15309..8faa07c490e 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -280,8 +280,8 @@ func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason s // schedule implements the scheduling algorithm and returns the suggested result(host, // evaluated nodes number,feasible nodes number). -func (sched *Scheduler) schedule(pod *v1.Pod) (core.ScheduleResult, error) { - result, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister) +func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (core.ScheduleResult, error) { + result, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister, pluginContext) if err != nil { pod = pod.DeepCopy() sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error()) @@ -458,7 +458,7 @@ func (sched *Scheduler) scheduleOne() { // Synchronously attempt to find a fit for the pod. start := time.Now() pluginContext := framework.NewPluginContext() - scheduleResult, err := sched.schedule(pod) + scheduleResult, err := sched.schedule(pod, pluginContext) if err != nil { // schedule() may have failed because the pod would not fit on any host, so we try to // preempt, with the expectation that the next time the pod is tried for scheduling it diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index d96f8740d81..e027ddce673 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -58,7 +58,7 @@ import ( ) // EmptyFramework is an empty framework used in tests. -// Note: If the test runs in goroutine, please don't using this variable to avoid a race condition. +// Note: If the test runs in goroutine, please don't use this variable to avoid a race condition. var EmptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil, EmptyPluginConfig) // EmptyPluginConfig is an empty plugin config used in tests. @@ -159,7 +159,7 @@ type mockScheduler struct { err error } -func (es mockScheduler) Schedule(pod *v1.Pod, ml algorithm.NodeLister) (core.ScheduleResult, error) { +func (es mockScheduler) Schedule(pod *v1.Pod, ml algorithm.NodeLister, pc *framework.PluginContext) (core.ScheduleResult, error) { return es.result, es.err } diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index 98cd8ce4d24..78d9d00eb57 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -31,10 +31,13 @@ import ( // TesterPlugin is common ancestor for a test plugin that allows injection of // failures and some other test functionalities. type TesterPlugin struct { + numPrefilterCalled int numReserveCalled int numPrebindCalled int numPostbindCalled int numUnreserveCalled int + failPrefilter bool + rejectPrefilter bool failReserve bool failPrebind bool rejectPrebind bool @@ -46,6 +49,10 @@ type TesterPlugin struct { waitAndAllowPermit bool } +type PrefilterPlugin struct { + TesterPlugin +} + type ReservePlugin struct { TesterPlugin } @@ -68,6 +75,7 @@ type PermitPlugin struct { } const ( + prefilterPluginName = "prefilter-plugin" reservePluginName = "reserve-plugin" prebindPluginName = "prebind-plugin" unreservePluginName = "unreserve-plugin" @@ -75,6 +83,7 @@ const ( permitPluginName = "permit-plugin" ) +var _ = framework.PrefilterPlugin(&PrefilterPlugin{}) var _ = framework.ReservePlugin(&ReservePlugin{}) var _ = framework.PrebindPlugin(&PrebindPlugin{}) var _ = framework.PostbindPlugin(&PostbindPlugin{}) @@ -154,6 +163,30 @@ func NewPostbindPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framewo return ptbdPlugin, nil } +var pfPlugin = &PrefilterPlugin{} + +// Name returns name of the plugin. +func (pp *PrefilterPlugin) Name() string { + return prefilterPluginName +} + +// Prefilter is a test function that returns (true, nil) or errors for testing. +func (pp *PrefilterPlugin) Prefilter(pc *framework.PluginContext, pod *v1.Pod) *framework.Status { + pp.numPrefilterCalled++ + if pp.failPrefilter { + return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) + } + if pp.rejectPrefilter { + return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)) + } + return nil +} + +// NewPrebindPlugin is the factory for prebind plugin. +func NewPrefilterPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { + return pfPlugin, nil +} + var unresPlugin = &UnreservePlugin{} // Name returns name of the plugin. @@ -226,6 +259,84 @@ func NewPermitPlugin(_ *runtime.Unknown, fh framework.FrameworkHandle) (framewor return perPlugin, nil } +// TestPrefilterPlugin tests invocation of prefilter plugins. +func TestPrefilterPlugin(t *testing.T) { + // Create a plugin registry for testing. Register only a reserve plugin. + registry := framework.Registry{prefilterPluginName: NewPrefilterPlugin} + + // Setup initial prefilter plugin for testing. + prefilterPlugin := &schedulerconfig.Plugins{ + PreFilter: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: prefilterPluginName, + }, + }, + }, + } + // Set empty plugin config for testing + emptyPluginConfig := []schedulerconfig.PluginConfig{} + + // Create the master and the scheduler with the test plugin set. + context := initTestSchedulerWithOptions(t, + initTestMaster(t, "prefilter-plugin", nil), + false, nil, registry, prefilterPlugin, emptyPluginConfig, false, time.Second) + + defer cleanupTest(t, context) + + cs := context.clientSet + // Add a few nodes. + _, err := createNodes(cs, "test-node", nil, 2) + if err != nil { + t.Fatalf("Cannot create nodes: %v", err) + } + + tests := []struct { + fail bool + reject bool + }{ + { + fail: false, + reject: false, + }, + { + fail: true, + reject: false, + }, + { + fail: false, + reject: true, + }, + } + + for i, test := range tests { + pfPlugin.failPrefilter = test.fail + pfPlugin.rejectPrefilter = test.reject + // Create a best effort pod. + pod, err := createPausePod(cs, + initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name})) + if err != nil { + t.Errorf("Error while creating a test pod: %v", err) + } + + if test.reject || test.fail { + if err = waitForPodUnschedulable(cs, pod); err != nil { + t.Errorf("test #%v: Didn't expect the pod to be scheduled. error: %v", i, err) + } + } else { + if err = waitForPodToSchedule(cs, pod); err != nil { + t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err) + } + } + + if pfPlugin.numPrefilterCalled == 0 { + t.Errorf("Expected the prefilter plugin to be called.") + } + + cleanupPods(cs, t, []*v1.Pod{pod}) + } +} + // TestReservePlugin tests invocation of reserve plugins. func TestReservePlugin(t *testing.T) { // Create a plugin registry for testing. Register only a reserve plugin.