prefilter extension point implementation.

This commit is contained in:
Abdullah Gharaibeh 2019-06-10 17:01:50 -04:00
parent d873167e8f
commit a61a437ef2
9 changed files with 183 additions and 10 deletions

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@ -552,7 +553,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
schedulerapi.DefaultPercentageOfNodesToScore,
false)
podIgnored := &v1.Pod{}
result, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
result, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)), framework.NewPluginContext())
if test.expectsErr {
if err == nil {
t.Errorf("Unexpected non-error, result %+v", result)

View File

@ -126,7 +126,7 @@ func (f *FitError) Error() string {
// onto machines.
// TODO: Rename this type.
type ScheduleAlgorithm interface {
Schedule(*v1.Pod, algorithm.NodeLister) (scheduleResult ScheduleResult, err error)
Schedule(*v1.Pod, algorithm.NodeLister, *framework.PluginContext) (scheduleResult ScheduleResult, err error)
// Preempt receives scheduling errors for a pod and tries to create room for
// the pod by preempting lower priority pods if possible.
// It returns the node where preemption happened, a list of preempted pods, a
@ -181,7 +181,7 @@ func (g *genericScheduler) snapshot() error {
// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister, pluginContext *framework.PluginContext) (result ScheduleResult, err error) {
trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
defer trace.LogIfLong(100 * time.Millisecond)
@ -189,6 +189,12 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
return result, err
}
// Run "prefilter" plugins.
prefilterStatus := g.framework.RunPrefilterPlugins(pluginContext, pod)
if !prefilterStatus.IsSuccess() {
return result, prefilterStatus.AsError()
}
nodes, err := nodeLister.List()
if err != nil {
return result, err

View File

@ -466,8 +466,7 @@ func TestGenericScheduler(t *testing.T) {
false,
schedulerapi.DefaultPercentageOfNodesToScore,
false)
result, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
result, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)), framework.NewPluginContext())
if !reflect.DeepEqual(err, test.wErr) {
t.Errorf("Unexpected error: %v, expected: %v", err, test.wErr)
}

View File

@ -36,6 +36,7 @@ type framework struct {
waitingPods *waitingPodsMap
plugins map[string]Plugin // a map of initialized plugins. Plugin name:plugin instance.
queueSortPlugins []QueueSortPlugin
prefilterPlugins []PrefilterPlugin
reservePlugins []ReservePlugin
prebindPlugins []PrebindPlugin
postbindPlugins []PostbindPlugin
@ -85,6 +86,20 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
f.plugins[name] = p
}
if plugins.PreFilter != nil {
for _, pf := range plugins.PreFilter.Enabled {
if pg, ok := f.plugins[pf.Name]; ok {
p, ok := pg.(PrefilterPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend prefilter plugin", pf.Name)
}
f.prefilterPlugins = append(f.prefilterPlugins, p)
} else {
return nil, fmt.Errorf("prefilter plugin %v does not exist", pf.Name)
}
}
}
if plugins.Reserve != nil {
for _, r := range plugins.Reserve.Enabled {
if pg, ok := f.plugins[r.Name]; ok {
@ -185,6 +200,28 @@ func (f *framework) QueueSortFunc() LessFunc {
return f.queueSortPlugins[0].Less
}
// RunPrefilterPlugins runs the configured prefilter plugins in the order they
// were enabled and stops at the first plugin that does not report success.
//
// It returns nil when every plugin succeeds (callers check the result with
// IsSuccess). If a plugin returns Unschedulable, the returned status keeps that
// code and carries a rejection message naming the plugin. Any other
// non-success code is reported as Error. A non-success result means the
// scheduling cycle is aborted.
func (f *framework) RunPrefilterPlugins(
	pc *PluginContext, pod *v1.Pod) *Status {
	for _, pl := range f.prefilterPlugins {
		status := pl.Prefilter(pc, pod)
		if status.IsSuccess() {
			continue
		}
		if status.Code() == Unschedulable {
			msg := fmt.Sprintf("rejected by %v at prefilter: %v", pl.Name(), status.Message())
			// msg embeds plugin-provided text, so log it as a plain argument
			// rather than as a format string: a '%' in the plugin message
			// must not be interpreted as a formatting verb.
			klog.V(4).Info(msg)
			return NewStatus(status.Code(), msg)
		}
		msg := fmt.Sprintf("error while running %v prefilter plugin for pod %v: %v", pl.Name(), pod.Name, status.Message())
		klog.Error(msg)
		return NewStatus(Error, msg)
	}
	return nil
}
// RunPrebindPlugins runs the set of configured prebind plugins. It returns a
// failure (bool) if any of the plugins returns an error. It also returns an
// error containing the rejection message or the error occurred in the plugin.

View File

@ -126,6 +126,15 @@ type QueueSortPlugin interface {
Less(*PodInfo, *PodInfo) bool
}
// PrefilterPlugin is an interface that must be implemented by "prefilter" plugins.
// These plugins are called at the beginning of the scheduling cycle.
type PrefilterPlugin interface {
// Plugin is embedded, so every prefilter plugin also provides its methods.
Plugin
// Prefilter is called at the beginning of the scheduling cycle. All prefilter
// plugins must return success or the pod will be rejected.
// pc is the plugin context handed to the scheduling attempt and p is the pod
// being scheduled. A nil *Status is what the framework's own stubs return
// for the success case.
Prefilter(pc *PluginContext, p *v1.Pod) *Status
}
// ReservePlugin is an interface for Reserve plugins. These plugins are called
// at the reservation point. These are meant to update the state of the plugin.
// This concept used to be called 'assume' in the original scheduler.
@ -190,6 +199,12 @@ type Framework interface {
// QueueSortFunc returns the function to sort pods in scheduling queue
QueueSortFunc() LessFunc
// RunPrefilterPlugins runs the set of configured prefilter plugins. It returns
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted.
RunPrefilterPlugins(pc *PluginContext, pod *v1.Pod) *Status
// RunPrebindPlugins runs the set of configured prebind plugins. It returns
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If the Status code is "Unschedulable", it is

View File

@ -167,6 +167,10 @@ func (*fakeFramework) NodeInfoSnapshot() *internalcache.NodeInfoSnapshot {
return nil
}
// RunPrefilterPlugins is a stub for the fake framework: it runs no plugins,
// ignores its arguments and always returns a nil status.
func (*fakeFramework) RunPrefilterPlugins(pc *framework.PluginContext, pod *v1.Pod) *framework.Status {
return nil
}
// RunPrebindPlugins is a stub for the fake framework: it runs no plugins,
// ignores its arguments and always returns a nil status.
func (*fakeFramework) RunPrebindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
return nil
}

View File

@ -280,8 +280,8 @@ func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason s
// schedule implements the scheduling algorithm and returns the suggested result(host,
// evaluated nodes number,feasible nodes number).
func (sched *Scheduler) schedule(pod *v1.Pod) (core.ScheduleResult, error) {
result, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (core.ScheduleResult, error) {
result, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister, pluginContext)
if err != nil {
pod = pod.DeepCopy()
sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error())
@ -458,7 +458,7 @@ func (sched *Scheduler) scheduleOne() {
// Synchronously attempt to find a fit for the pod.
start := time.Now()
pluginContext := framework.NewPluginContext()
scheduleResult, err := sched.schedule(pod)
scheduleResult, err := sched.schedule(pod, pluginContext)
if err != nil {
// schedule() may have failed because the pod would not fit on any host, so we try to
// preempt, with the expectation that the next time the pod is tried for scheduling it

View File

@ -58,7 +58,7 @@ import (
)
// EmptyFramework is an empty framework used in tests.
// Note: If the test runs in goroutine, please don't using this variable to avoid a race condition.
// Note: If the test runs in goroutine, please don't use this variable to avoid a race condition.
var EmptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil, EmptyPluginConfig)
// EmptyPluginConfig is an empty plugin config used in tests.
@ -159,7 +159,7 @@ type mockScheduler struct {
err error
}
func (es mockScheduler) Schedule(pod *v1.Pod, ml algorithm.NodeLister) (core.ScheduleResult, error) {
func (es mockScheduler) Schedule(pod *v1.Pod, ml algorithm.NodeLister, pc *framework.PluginContext) (core.ScheduleResult, error) {
return es.result, es.err
}

View File

@ -31,10 +31,13 @@ import (
// TesterPlugin is common ancestor for a test plugin that allows injection of
// failures and some other test functionalities.
type TesterPlugin struct {
numPrefilterCalled int
numReserveCalled int
numPrebindCalled int
numPostbindCalled int
numUnreserveCalled int
failPrefilter bool
rejectPrefilter bool
failReserve bool
failPrebind bool
rejectPrebind bool
@ -46,6 +49,10 @@ type TesterPlugin struct {
waitAndAllowPermit bool
}
// PrefilterPlugin is the test plugin for the prefilter extension point. It
// embeds TesterPlugin, which supplies the numPrefilterCalled counter and the
// failPrefilter / rejectPrefilter injection flags.
type PrefilterPlugin struct {
TesterPlugin
}
// ReservePlugin is the test plugin for the reserve extension point. It embeds
// TesterPlugin, which supplies the numReserveCalled counter and the
// failReserve injection flag.
type ReservePlugin struct {
TesterPlugin
}
@ -68,6 +75,7 @@ type PermitPlugin struct {
}
const (
prefilterPluginName = "prefilter-plugin"
reservePluginName = "reserve-plugin"
prebindPluginName = "prebind-plugin"
unreservePluginName = "unreserve-plugin"
@ -75,6 +83,7 @@ const (
permitPluginName = "permit-plugin"
)
var _ = framework.PrefilterPlugin(&PrefilterPlugin{})
var _ = framework.ReservePlugin(&ReservePlugin{})
var _ = framework.PrebindPlugin(&PrebindPlugin{})
var _ = framework.PostbindPlugin(&PostbindPlugin{})
@ -154,6 +163,30 @@ func NewPostbindPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framewo
return ptbdPlugin, nil
}
// pfPlugin is the single PrefilterPlugin instance handed out by
// NewPrefilterPlugin. Tests set its flags and read its counter directly, so
// its state is shared across test cases.
var pfPlugin = &PrefilterPlugin{}
// Name returns the name of the plugin, i.e. the prefilterPluginName constant
// under which the plugin is registered in tests.
func (pp *PrefilterPlugin) Name() string {
return prefilterPluginName
}
// Prefilter is the test implementation of the prefilter extension point. It
// counts every invocation and then returns a status chosen by the injection
// flags: an Error status when failPrefilter is set, otherwise an Unschedulable
// status when rejectPrefilter is set, otherwise nil.
func (pp *PrefilterPlugin) Prefilter(pc *framework.PluginContext, pod *v1.Pod) *framework.Status {
	pp.numPrefilterCalled++

	// failPrefilter takes precedence over rejectPrefilter when both are set.
	switch {
	case pp.failPrefilter:
		return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
	case pp.rejectPrefilter:
		return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name))
	default:
		return nil
	}
}
// NewPrefilterPlugin is the factory for the prefilter plugin. It ignores both
// arguments and always returns the shared pfPlugin instance with a nil error.
func NewPrefilterPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return pfPlugin, nil
}
var unresPlugin = &UnreservePlugin{}
// Name returns name of the plugin.
@ -226,6 +259,84 @@ func NewPermitPlugin(_ *runtime.Unknown, fh framework.FrameworkHandle) (framewor
return perPlugin, nil
}
// TestPrefilterPlugin tests invocation of prefilter plugins. For each
// combination of injected failure/rejection it creates a pod and verifies that
// the pod is scheduled only when the prefilter plugin succeeds, and that the
// plugin was actually invoked for that case.
func TestPrefilterPlugin(t *testing.T) {
	// Create a plugin registry for testing. Register only a prefilter plugin.
	registry := framework.Registry{prefilterPluginName: NewPrefilterPlugin}

	// Setup initial prefilter plugin for testing.
	prefilterPlugin := &schedulerconfig.Plugins{
		PreFilter: &schedulerconfig.PluginSet{
			Enabled: []schedulerconfig.Plugin{
				{
					Name: prefilterPluginName,
				},
			},
		},
	}
	// Set empty plugin config for testing
	emptyPluginConfig := []schedulerconfig.PluginConfig{}

	// Create the master and the scheduler with the test plugin set.
	context := initTestSchedulerWithOptions(t,
		initTestMaster(t, "prefilter-plugin", nil),
		false, nil, registry, prefilterPlugin, emptyPluginConfig, false, time.Second)
	defer cleanupTest(t, context)

	cs := context.clientSet
	// Add a few nodes.
	_, err := createNodes(cs, "test-node", nil, 2)
	if err != nil {
		t.Fatalf("Cannot create nodes: %v", err)
	}

	tests := []struct {
		fail   bool
		reject bool
	}{
		{
			fail:   false,
			reject: false,
		},
		{
			fail:   true,
			reject: false,
		},
		{
			fail:   false,
			reject: true,
		},
	}

	for i, test := range tests {
		pfPlugin.failPrefilter = test.fail
		pfPlugin.rejectPrefilter = test.reject
		// pfPlugin is shared across iterations; reset the counter so the
		// "was called" check below reflects this case only instead of being
		// satisfied by calls made for an earlier case.
		pfPlugin.numPrefilterCalled = 0

		// Create a best effort pod.
		pod, err := createPausePod(cs,
			initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
		if err != nil {
			// Without a created pod nothing below can be checked, so stop
			// here rather than passing an unusable pod to the wait helpers.
			t.Fatalf("Error while creating a test pod: %v", err)
		}

		if test.reject || test.fail {
			if err = waitForPodUnschedulable(cs, pod); err != nil {
				t.Errorf("test #%v: Didn't expect the pod to be scheduled. error: %v", i, err)
			}
		} else {
			if err = waitForPodToSchedule(cs, pod); err != nil {
				t.Errorf("test #%v: Expected the pod to be scheduled. error: %v", i, err)
			}
		}

		if pfPlugin.numPrefilterCalled == 0 {
			t.Errorf("Expected the prefilter plugin to be called.")
		}

		cleanupPods(cs, t, []*v1.Pod{pod})
	}
}
// TestReservePlugin tests invocation of reserve plugins.
func TestReservePlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a reserve plugin.