Merge pull request #78005 from ahg-g/ahg-perfilter

prefilter extension point implementation for the scheduler
This commit is contained in:
Kubernetes Prow Robot 2019-06-14 00:29:13 -07:00 committed by GitHub
commit d8695d06b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 183 additions and 10 deletions

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@ -552,7 +553,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
schedulerapi.DefaultPercentageOfNodesToScore, schedulerapi.DefaultPercentageOfNodesToScore,
false) false)
podIgnored := &v1.Pod{} podIgnored := &v1.Pod{}
result, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes))) result, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)), framework.NewPluginContext())
if test.expectsErr { if test.expectsErr {
if err == nil { if err == nil {
t.Errorf("Unexpected non-error, result %+v", result) t.Errorf("Unexpected non-error, result %+v", result)

View File

@ -126,7 +126,7 @@ func (f *FitError) Error() string {
// onto machines. // onto machines.
// TODO: Rename this type. // TODO: Rename this type.
type ScheduleAlgorithm interface { type ScheduleAlgorithm interface {
Schedule(*v1.Pod, algorithm.NodeLister) (scheduleResult ScheduleResult, err error) Schedule(*v1.Pod, algorithm.NodeLister, *framework.PluginContext) (scheduleResult ScheduleResult, err error)
// Preempt receives scheduling errors for a pod and tries to create room for // Preempt receives scheduling errors for a pod and tries to create room for
// the pod by preempting lower priority pods if possible. // the pod by preempting lower priority pods if possible.
// It returns the node where preemption happened, a list of preempted pods, a // It returns the node where preemption happened, a list of preempted pods, a
@ -181,7 +181,7 @@ func (g *genericScheduler) snapshot() error {
// Schedule tries to schedule the given pod to one of the nodes in the node list. // Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node. // If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons. // If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) { func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister, pluginContext *framework.PluginContext) (result ScheduleResult, err error) {
trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name)) trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
defer trace.LogIfLong(100 * time.Millisecond) defer trace.LogIfLong(100 * time.Millisecond)
@ -189,6 +189,12 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
return result, err return result, err
} }
// Run "prefilter" plugins.
prefilterStatus := g.framework.RunPrefilterPlugins(pluginContext, pod)
if !prefilterStatus.IsSuccess() {
return result, prefilterStatus.AsError()
}
nodes, err := nodeLister.List() nodes, err := nodeLister.List()
if err != nil { if err != nil {
return result, err return result, err

View File

@ -466,8 +466,7 @@ func TestGenericScheduler(t *testing.T) {
false, false,
schedulerapi.DefaultPercentageOfNodesToScore, schedulerapi.DefaultPercentageOfNodesToScore,
false) false)
result, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes))) result, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)), framework.NewPluginContext())
if !reflect.DeepEqual(err, test.wErr) { if !reflect.DeepEqual(err, test.wErr) {
t.Errorf("Unexpected error: %v, expected: %v", err, test.wErr) t.Errorf("Unexpected error: %v, expected: %v", err, test.wErr)
} }

View File

@ -36,6 +36,7 @@ type framework struct {
waitingPods *waitingPodsMap waitingPods *waitingPodsMap
plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance. plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance.
queueSortPlugins []QueueSortPlugin queueSortPlugins []QueueSortPlugin
prefilterPlugins []PrefilterPlugin
reservePlugins []ReservePlugin reservePlugins []ReservePlugin
prebindPlugins []PrebindPlugin prebindPlugins []PrebindPlugin
postbindPlugins []PostbindPlugin postbindPlugins []PostbindPlugin
@ -85,6 +86,20 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
f.plugins[name] = p f.plugins[name] = p
} }
if plugins.PreFilter != nil {
for _, pf := range plugins.PreFilter.Enabled {
if pg, ok := f.plugins[pf.Name]; ok {
p, ok := pg.(PrefilterPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend prefilter plugin", pf.Name)
}
f.prefilterPlugins = append(f.prefilterPlugins, p)
} else {
return nil, fmt.Errorf("prefilter plugin %v does not exist", pf.Name)
}
}
}
if plugins.Reserve != nil { if plugins.Reserve != nil {
for _, r := range plugins.Reserve.Enabled { for _, r := range plugins.Reserve.Enabled {
if pg, ok := f.plugins[r.Name]; ok { if pg, ok := f.plugins[r.Name]; ok {
@ -185,6 +200,28 @@ func (f *framework) QueueSortFunc() LessFunc {
return f.queueSortPlugins[0].Less return f.queueSortPlugins[0].Less
} }
// RunPrefilterPlugins runs the set of configured prefilter plugins. It returns
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted.
func (f *framework) RunPrefilterPlugins(
pc *PluginContext, pod *v1.Pod) *Status {
for _, pl := range f.prefilterPlugins {
status := pl.Prefilter(pc, pod)
if !status.IsSuccess() {
if status.Code() == Unschedulable {
msg := fmt.Sprintf("rejected by %v at prefilter: %v", pl.Name(), status.Message())
klog.V(4).Infof(msg)
return NewStatus(status.Code(), msg)
}
msg := fmt.Sprintf("error while running %v prefilter plugin for pod %v: %v", pl.Name(), pod.Name, status.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
}
return nil
}
// RunPrebindPlugins runs the set of configured prebind plugins. It returns a // RunPrebindPlugins runs the set of configured prebind plugins. It returns a
// failure (bool) if any of the plugins returns an error. It also returns an // failure (bool) if any of the plugins returns an error. It also returns an
// error containing the rejection message or the error occurred in the plugin. // error containing the rejection message or the error occurred in the plugin.

View File

@ -126,6 +126,15 @@ type QueueSortPlugin interface {
Less(*PodInfo, *PodInfo) bool Less(*PodInfo, *PodInfo) bool
} }
// PrefilterPlugin is an interface that must be implemented by "prefilter" plugins.
// These plugins are called at the beginning of the scheduling cycle.
type PrefilterPlugin interface {
Plugin
// Prefilter is called at the beginning of the scheduling cycle. All prefilter
// plugins must return success or the pod will be rejected.
Prefilter(pc *PluginContext, p *v1.Pod) *Status
}
// ReservePlugin is an interface for Reserve plugins. These plugins are called // ReservePlugin is an interface for Reserve plugins. These plugins are called
// at the reservation point. These are meant to update the state of the plugin. // at the reservation point. These are meant to update the state of the plugin.
// This concept used to be called 'assume' in the original scheduler. // This concept used to be called 'assume' in the original scheduler.
@ -190,6 +199,12 @@ type Framework interface {
// QueueSortFunc returns the function to sort pods in scheduling queue // QueueSortFunc returns the function to sort pods in scheduling queue
QueueSortFunc() LessFunc QueueSortFunc() LessFunc
// RunPrefilterPlugins runs the set of configured prefilter plugins. It returns
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted.
RunPrefilterPlugins(pc *PluginContext, pod *v1.Pod) *Status
// RunPrebindPlugins runs the set of configured prebind plugins. It returns // RunPrebindPlugins runs the set of configured prebind plugins. It returns
// *Status and its code is set to non-success if any of the plugins returns // *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If the Status code is "Unschedulable", it is // anything but Success. If the Status code is "Unschedulable", it is

View File

@ -167,6 +167,10 @@ func (*fakeFramework) NodeInfoSnapshot() *internalcache.NodeInfoSnapshot {
return nil return nil
} }
func (*fakeFramework) RunPrefilterPlugins(pc *framework.PluginContext, pod *v1.Pod) *framework.Status {
return nil
}
func (*fakeFramework) RunPrebindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { func (*fakeFramework) RunPrebindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
return nil return nil
} }

View File

@ -280,8 +280,8 @@ func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason s
// schedule implements the scheduling algorithm and returns the suggested result(host, // schedule implements the scheduling algorithm and returns the suggested result(host,
// evaluated nodes number,feasible nodes number). // evaluated nodes number,feasible nodes number).
func (sched *Scheduler) schedule(pod *v1.Pod) (core.ScheduleResult, error) { func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (core.ScheduleResult, error) {
result, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister) result, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister, pluginContext)
if err != nil { if err != nil {
pod = pod.DeepCopy() pod = pod.DeepCopy()
sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error()) sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error())
@ -458,7 +458,7 @@ func (sched *Scheduler) scheduleOne() {
// Synchronously attempt to find a fit for the pod. // Synchronously attempt to find a fit for the pod.
start := time.Now() start := time.Now()
pluginContext := framework.NewPluginContext() pluginContext := framework.NewPluginContext()
scheduleResult, err := sched.schedule(pod) scheduleResult, err := sched.schedule(pod, pluginContext)
if err != nil { if err != nil {
// schedule() may have failed because the pod would not fit on any host, so we try to // schedule() may have failed because the pod would not fit on any host, so we try to
// preempt, with the expectation that the next time the pod is tried for scheduling it // preempt, with the expectation that the next time the pod is tried for scheduling it

View File

@ -58,7 +58,7 @@ import (
) )
// EmptyFramework is an empty framework used in tests. // EmptyFramework is an empty framework used in tests.
// Note: If the test runs in goroutine, please don't using this variable to avoid a race condition. // Note: If the test runs in goroutine, please don't use this variable to avoid a race condition.
var EmptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil, EmptyPluginConfig) var EmptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil, EmptyPluginConfig)
// EmptyPluginConfig is an empty plugin config used in tests. // EmptyPluginConfig is an empty plugin config used in tests.
@ -159,7 +159,7 @@ type mockScheduler struct {
err error err error
} }
func (es mockScheduler) Schedule(pod *v1.Pod, ml algorithm.NodeLister) (core.ScheduleResult, error) { func (es mockScheduler) Schedule(pod *v1.Pod, ml algorithm.NodeLister, pc *framework.PluginContext) (core.ScheduleResult, error) {
return es.result, es.err return es.result, es.err
} }

View File

@ -31,10 +31,13 @@ import (
// TesterPlugin is common ancestor for a test plugin that allows injection of // TesterPlugin is common ancestor for a test plugin that allows injection of
// failures and some other test functionalities. // failures and some other test functionalities.
type TesterPlugin struct { type TesterPlugin struct {
numPrefilterCalled int
numReserveCalled int numReserveCalled int
numPrebindCalled int numPrebindCalled int
numPostbindCalled int numPostbindCalled int
numUnreserveCalled int numUnreserveCalled int
failPrefilter bool
rejectPrefilter bool
failReserve bool failReserve bool
failPrebind bool failPrebind bool
rejectPrebind bool rejectPrebind bool
@ -46,6 +49,10 @@ type TesterPlugin struct {
waitAndAllowPermit bool waitAndAllowPermit bool
} }
type PrefilterPlugin struct {
TesterPlugin
}
type ReservePlugin struct { type ReservePlugin struct {
TesterPlugin TesterPlugin
} }
@ -68,6 +75,7 @@ type PermitPlugin struct {
} }
const ( const (
prefilterPluginName = "prefilter-plugin"
reservePluginName = "reserve-plugin" reservePluginName = "reserve-plugin"
prebindPluginName = "prebind-plugin" prebindPluginName = "prebind-plugin"
unreservePluginName = "unreserve-plugin" unreservePluginName = "unreserve-plugin"
@ -75,6 +83,7 @@ const (
permitPluginName = "permit-plugin" permitPluginName = "permit-plugin"
) )
var _ = framework.PrefilterPlugin(&PrefilterPlugin{})
var _ = framework.ReservePlugin(&ReservePlugin{}) var _ = framework.ReservePlugin(&ReservePlugin{})
var _ = framework.PrebindPlugin(&PrebindPlugin{}) var _ = framework.PrebindPlugin(&PrebindPlugin{})
var _ = framework.PostbindPlugin(&PostbindPlugin{}) var _ = framework.PostbindPlugin(&PostbindPlugin{})
@ -154,6 +163,30 @@ func NewPostbindPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framewo
return ptbdPlugin, nil return ptbdPlugin, nil
} }
var pfPlugin = &PrefilterPlugin{}
// Name returns name of the plugin.
func (pp *PrefilterPlugin) Name() string {
return prefilterPluginName
}
// Prefilter is a test function that returns (true, nil) or errors for testing.
func (pp *PrefilterPlugin) Prefilter(pc *framework.PluginContext, pod *v1.Pod) *framework.Status {
pp.numPrefilterCalled++
if pp.failPrefilter {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
}
if pp.rejectPrefilter {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name))
}
return nil
}
// NewPrebindPlugin is the factory for prebind plugin.
func NewPrefilterPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return pfPlugin, nil
}
var unresPlugin = &UnreservePlugin{} var unresPlugin = &UnreservePlugin{}
// Name returns name of the plugin. // Name returns name of the plugin.
@ -226,6 +259,84 @@ func NewPermitPlugin(_ *runtime.Unknown, fh framework.FrameworkHandle) (framewor
return perPlugin, nil return perPlugin, nil
} }
// TestPrefilterPlugin tests invocation of prefilter plugins.
func TestPrefilterPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a reserve plugin.
registry := framework.Registry{prefilterPluginName: NewPrefilterPlugin}
// Setup initial prefilter plugin for testing.
prefilterPlugin := &schedulerconfig.Plugins{
PreFilter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: prefilterPluginName,
},
},
},
}
// Set empty plugin config for testing
emptyPluginConfig := []schedulerconfig.PluginConfig{}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "prefilter-plugin", nil),
false, nil, registry, prefilterPlugin, emptyPluginConfig, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
// Add a few nodes.
_, err := createNodes(cs, "test-node", nil, 2)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}
tests := []struct {
fail bool
reject bool
}{
{
fail: false,
reject: false,
},
{
fail: true,
reject: false,
},
{
fail: false,
reject: true,
},
}
for i, test := range tests {
pfPlugin.failPrefilter = test.fail
pfPlugin.rejectPrefilter = test.reject
// Create a best effort pod.
pod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
if test.reject || test.fail {
if err = waitForPodUnschedulable(cs, pod); err != nil {
t.Errorf("test #%v: Didn't expect the pod to be scheduled. error: %v", i, err)
}
} else {
if err = waitForPodToSchedule(cs, pod); err != nil {
t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err)
}
}
if pfPlugin.numPrefilterCalled == 0 {
t.Errorf("Expected the prefilter plugin to be called.")
}
cleanupPods(cs, t, []*v1.Pod{pod})
}
}
// TestReservePlugin tests invocation of reserve plugins. // TestReservePlugin tests invocation of reserve plugins.
func TestReservePlugin(t *testing.T) { func TestReservePlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a reserve plugin. // Create a plugin registry for testing. Register only a reserve plugin.