Merge pull request #78477 from YoubingLi/filter

Fixes 78001 The implementation of Filter extension for the new framework
This commit is contained in:
Kubernetes Prow Robot 2019-07-22 16:48:53 -07:00 committed by GitHub
commit f31d786927
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 178 additions and 4 deletions

View File

@ -209,7 +209,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
trace.Step("Basic checks done")
startPredicateEvalTime := time.Now()
filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pluginContext, pod, nodes)
if err != nil {
return result, err
}
@ -460,7 +460,7 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginContext, pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
var filtered []*v1.Node
failedPredicateMap := FailedPredicateMap{}
@ -486,6 +486,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
checkNode := func(i int) {
nodeName := g.cache.NodeTree().Next()
fits, failedPredicates, err := podFitsOnNode(
pod,
meta,
@ -501,6 +502,19 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
return
}
if fits {
// Iterate each plugin to verify current node
status := g.framework.RunFilterPlugins(pluginContext, pod, nodeName)
if !status.IsSuccess() {
predicateResultLock.Lock()
failedPredicateMap[nodeName] = append(failedPredicateMap[nodeName],
predicates.NewFailureReason(status.Message()))
if status.Code() != framework.Unschedulable {
errs[status.Message()]++
}
predicateResultLock.Unlock()
return
}
length := atomic.AddInt32(&filteredLen, 1)
if length > numNodesToFind {
cancel()

View File

@ -506,7 +506,7 @@ func TestFindFitAllError(t *testing.T) {
nodes := makeNodeList([]string{"3", "2", "1"})
scheduler := makeScheduler(predicates, nodes)
_, predicateMap, err := scheduler.findNodesThatFit(&v1.Pod{}, nodes)
_, predicateMap, err := scheduler.findNodesThatFit(nil, &v1.Pod{}, nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -536,7 +536,7 @@ func TestFindFitSomeError(t *testing.T) {
scheduler := makeScheduler(predicates, nodes)
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
_, predicateMap, err := scheduler.findNodesThatFit(pod, nodes)
_, predicateMap, err := scheduler.findNodesThatFit(nil, pod, nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)

View File

@ -40,6 +40,7 @@ type framework struct {
pluginNameToWeightMap map[string]int
queueSortPlugins []QueueSortPlugin
prefilterPlugins []PrefilterPlugin
filterPlugins []FilterPlugin
scorePlugins []ScorePlugin
reservePlugins []ReservePlugin
prebindPlugins []PrebindPlugin
@ -113,6 +114,20 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
}
}
if plugins.Filter != nil {
for _, r := range plugins.Filter.Enabled {
if pg, ok := pluginsMap[r.Name]; ok {
p, ok := pg.(FilterPlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend filter plugin", r.Name)
}
f.filterPlugins = append(f.filterPlugins, p)
} else {
return nil, fmt.Errorf("filter plugin %v does not exist", r.Name)
}
}
}
if plugins.Score != nil {
for _, sc := range plugins.Score.Enabled {
if pg, ok := pluginsMap[sc.Name]; ok {
@ -263,6 +278,29 @@ func (f *framework) RunPrefilterPlugins(
return nil
}
// RunFilterPlugins runs the set of configured Filter plugins for pod on
// the given node. If any of these plugins doesn't return "Success", the
// given node is not suitable for running pod.
// Meanwhile, the failure message and status are set for the given node.
func (f *framework) RunFilterPlugins(pc *PluginContext,
pod *v1.Pod, nodeName string) *Status {
for _, p := range f.filterPlugins {
status := p.Filter(pc, pod, nodeName)
if !status.IsSuccess() {
if status.Code() != Unschedulable {
errMsg := fmt.Sprintf("RunFilterPlugins: error while running %s filter plugin for pod %s: %s",
p.Name(), pod.Name, status.Message())
klog.Error(errMsg)
return NewStatus(Error, errMsg)
}
return status
}
}
return nil
}
// RunScorePlugins runs the set of configured scoring plugins. It returns a map that
// stores for each scoring plugin name the corresponding NodeScoreList(s).
// It also returns *Status, which is set to non-success if any of the plugins returns

View File

@ -143,6 +143,23 @@ type PrefilterPlugin interface {
Prefilter(pc *PluginContext, p *v1.Pod) *Status
}
// FilterPlugin is an interface for Filter plugins. These plugins are called at the
// filter extension point for filtering out hosts that cannot run a pod.
// This concept used to be called 'predicate' in the original scheduler.
// These plugins should return "Success", "Unschedulable" or "Error" in Status.code.
// However, the scheduler accepts other valid codes as well.
// Anything other than "Success" will lead to exclusion of the given host from
// running the pod.
type FilterPlugin interface {
Plugin
// Filter is called by the scheduling framework.
// All FilterPlugins should return "Success" to declare that
// the given node fits the pod. If Filter doesn't return "Success",
// please refer scheduler/algorithm/predicates/error.go
// to set error message.
Filter(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
}
// ScorePlugin is an interface that must be implemented by "score" plugins to rank
// nodes that passed the filtering phase.
type ScorePlugin interface {
@ -236,6 +253,11 @@ type Framework interface {
// cycle is aborted.
RunPrefilterPlugins(pc *PluginContext, pod *v1.Pod) *Status
// RunFilterPlugins runs the set of configured filter plugins for pod on the
// given host. If any of these plugins returns any status other than "Success",
// the given node is not suitable for running the pod.
RunFilterPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
// RunScorePlugins runs the set of configured scoring plugins. It returns a map that
// stores for each scoring plugin name the corresponding NodeScoreList(s).
// It also returns *Status, which is set to non-success if any of the plugins returns

View File

@ -171,6 +171,10 @@ func (*fakeFramework) RunPrefilterPlugins(pc *framework.PluginContext, pod *v1.P
return nil
}
func (*fakeFramework) RunFilterPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
return nil
}
func (*fakeFramework) RunScorePlugins(pc *framework.PluginContext, pod *v1.Pod, nodes []*v1.Node) (framework.PluginToNodeScoreMap, *framework.Status) {
return nil, nil
}

View File

@ -42,6 +42,11 @@ type ScorePlugin struct {
highScoreNode string
}
type FilterPlugin struct {
numFilterCalled int
failFilter bool
}
type ReservePlugin struct {
numReserveCalled int
failReserve bool
@ -86,6 +91,7 @@ type PermitPlugin struct {
const (
prefilterPluginName = "prefilter-plugin"
scorePluginName = "score-plugin"
filterPluginName = "filter-plugin"
reservePluginName = "reserve-plugin"
prebindPluginName = "prebind-plugin"
unreservePluginName = "unreserve-plugin"
@ -95,6 +101,7 @@ const (
var _ = framework.PrefilterPlugin(&PrefilterPlugin{})
var _ = framework.ScorePlugin(&ScorePlugin{})
var _ = framework.FilterPlugin(&FilterPlugin{})
var _ = framework.ReservePlugin(&ReservePlugin{})
var _ = framework.PrebindPlugin(&PrebindPlugin{})
var _ = framework.BindPlugin(&BindPlugin{})
@ -137,6 +144,36 @@ func NewScorePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.
return scPlugin, nil
}
var filterPlugin = &FilterPlugin{}
// Name returns name of the plugin.
func (fp *FilterPlugin) Name() string {
return filterPluginName
}
// reset is used to reset filter plugin.
func (fp *FilterPlugin) reset() {
fp.numFilterCalled = 0
fp.failFilter = false
}
// Filter is a test function that returns an error or nil, depending on the
// value of "failFilter".
func (fp *FilterPlugin) Filter(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
fp.numFilterCalled++
if fp.failFilter {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
}
return nil
}
// NewFilterPlugin is the factory for filtler plugin.
func NewFilterPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return filterPlugin, nil
}
// Name returns name of the plugin.
func (rp *ReservePlugin) Name() string {
return reservePluginName
@ -1286,3 +1323,62 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) {
cleanupPods(cs, t, []*v1.Pod{waitingPod, signallingPod})
}
}
// TestFilterPlugin tests invocation of filter plugins.
func TestFilterPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a filter plugin.
registry := framework.Registry{filterPluginName: NewFilterPlugin}
// Setup initial filter plugin for testing.
plugin := &schedulerconfig.Plugins{
Filter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: filterPluginName,
},
},
},
}
// Set empty plugin config for testing
emptyPluginConfig := []schedulerconfig.PluginConfig{}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "filter-plugin", nil),
false, nil, registry, plugin, emptyPluginConfig, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
// Add a few nodes.
_, err := createNodes(cs, "test-node", nil, 2)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}
for _, fail := range []bool{false, true} {
filterPlugin.failFilter = fail
// Create a best effort pod.
pod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
if fail {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(cs, pod.Namespace, pod.Name)); err == nil {
t.Errorf("Didn't expect the pod to be scheduled.")
}
} else {
if err = waitForPodToSchedule(cs, pod); err != nil {
t.Errorf("Expected the pod to be scheduled. error: %v", err)
}
}
if filterPlugin.numFilterCalled == 0 {
t.Errorf("Expected the filter plugin to be called.")
}
filterPlugin.reset()
cleanupPods(cs, t, []*v1.Pod{pod})
}
}