A consistent interface for plugin extended functionality

This commit is contained in:
notpad 2019-10-01 23:59:48 +08:00
parent 2ebcd2509c
commit fc55e74741
8 changed files with 85 additions and 72 deletions

View File

@ -1119,7 +1119,7 @@ func (g *genericScheduler) selectVictimsOnNode(
return err
}
}
status := g.framework.RunPreFilterUpdaterRemovePod(pluginContext, pod, rp, nodeInfoCopy)
status := g.framework.RunPreFilterExtensionRemovePod(pluginContext, pod, rp, nodeInfoCopy)
if !status.IsSuccess() {
return status.AsError()
}
@ -1132,7 +1132,7 @@ func (g *genericScheduler) selectVictimsOnNode(
return err
}
}
status := g.framework.RunPreFilterUpdaterAddPod(pluginContext, pod, ap, nodeInfoCopy)
status := g.framework.RunPreFilterExtensionAddPod(pluginContext, pod, ap, nodeInfoCopy)
if !status.IsSuccess() {
return status.AsError()
}

View File

@ -634,7 +634,7 @@ func (t *TestPlugin) Score(pc *framework.PluginContext, p *v1.Pod, nodeName stri
return 1, nil
}
func (t *TestPlugin) NormalizeScore(pc *framework.PluginContext, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
func (t *TestPlugin) Extensions() framework.ScoreExtensions {
return nil
}

View File

@ -297,40 +297,40 @@ func (f *framework) RunPreFilterPlugins(
return nil
}
// RunPreFilterUpdaterAddPod calls the AddPod interface for the set of configured
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
func (f *framework) RunPreFilterUpdaterAddPod(pc *PluginContext, podToSchedule *v1.Pod,
func (f *framework) RunPreFilterExtensionAddPod(pc *PluginContext, podToSchedule *v1.Pod,
podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
for _, pl := range f.preFilterPlugins {
if updater := pl.Updater(); updater != nil {
status := updater.AddPod(pc, podToSchedule, podToAdd, nodeInfo)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running AddPod for plugin %q while scheduling pod %q: %v",
pl.Name(), podToSchedule.Name, status.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
if pl.Extensions() == nil {
continue
}
if status := pl.Extensions().AddPod(pc, podToSchedule, podToAdd, nodeInfo); !status.IsSuccess() {
msg := fmt.Sprintf("error while running AddPod for plugin %q while scheduling pod %q: %v",
pl.Name(), podToSchedule.Name, status.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
}
return nil
}
// RunPreFilterUpdaterRemovePod calls the RemovePod interface for the set of configured
// RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
func (f *framework) RunPreFilterUpdaterRemovePod(pc *PluginContext, podToSchedule *v1.Pod,
func (f *framework) RunPreFilterExtensionRemovePod(pc *PluginContext, podToSchedule *v1.Pod,
podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
for _, pl := range f.preFilterPlugins {
if updater := pl.Updater(); updater != nil {
status := updater.RemovePod(pc, podToSchedule, podToRemove, nodeInfo)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running RemovePod for plugin %q while scheduling pod %q: %v",
pl.Name(), podToSchedule.Name, status.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
if pl.Extensions() == nil {
continue
}
if status := pl.Extensions().RemovePod(pc, podToSchedule, podToRemove, nodeInfo); !status.IsSuccess() {
msg := fmt.Sprintf("error while running RemovePod for plugin %q while scheduling pod %q: %v",
pl.Name(), podToSchedule.Name, status.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
}
@ -417,8 +417,10 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
workqueue.ParallelizeUntil(ctx, 16, len(f.scorePlugins), func(index int) {
pl := f.scorePlugins[index]
nodeScoreList := pluginToNodeScores[pl.Name()]
status := pl.NormalizeScore(pc, pod, nodeScoreList)
if !status.IsSuccess() {
if pl.Extensions() == nil {
return
}
if status := pl.Extensions().NormalizeScore(pc, pod, nodeScoreList); !status.IsSuccess() {
err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message())
errCh.SendErrorWithCancel(err, cancel)
return

View File

@ -29,12 +29,12 @@ import (
)
const (
scoreWithNormalizePlugin1 = "score-with-normalize-plugin-1"
scoreWithNormalizePlugin2 = "score-with-normalize-plugin-2"
scorePlugin1 = "score-plugin-1"
pluginNotImplementingScore = "plugin-not-implementing-score"
preFilterPluginName = "prefilter-plugin"
preFilterWithUpdaterPluginName = "prefilter-with-updater-plugin"
scoreWithNormalizePlugin1 = "score-with-normalize-plugin-1"
scoreWithNormalizePlugin2 = "score-with-normalize-plugin-2"
scorePlugin1 = "score-plugin-1"
pluginNotImplementingScore = "plugin-not-implementing-score"
preFilterPluginName = "prefilter-plugin"
preFilterWithExtensionsPluginName = "prefilter-with-extensions-plugin"
)
// TestScoreWithNormalizePlugin implements ScoreWithNormalizePlugin interface.
@ -87,6 +87,10 @@ func (pl *TestScoreWithNormalizePlugin) Score(pc *PluginContext, p *v1.Pod, node
return setScoreRes(pl.inj)
}
func (pl *TestScoreWithNormalizePlugin) Extensions() ScoreExtensions {
return pl
}
// TestScorePlugin only implements ScorePlugin interface.
type TestScorePlugin struct {
name string
@ -101,7 +105,7 @@ func (pl *TestScorePlugin) Score(pc *PluginContext, p *v1.Pod, nodeName string)
return setScoreRes(pl.inj)
}
func (pl *TestScorePlugin) NormalizeScore(pc *PluginContext, pod *v1.Pod, scores NodeScoreList) *Status {
func (pl *TestScorePlugin) Extensions() ScoreExtensions {
return nil
}
@ -126,39 +130,39 @@ func (pl *TestPreFilterPlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status {
return nil
}
func (pl *TestPreFilterPlugin) Updater() Updater {
func (pl *TestPreFilterPlugin) Extensions() PreFilterExtensions {
return nil
}
// TestPreFilterWithUpdatePlugin implements Add/Remove interfaces.
type TestPreFilterWithUpdaterPlugin struct {
// TestPreFilterWithExtensionsPlugin implements Add/Remove interfaces.
type TestPreFilterWithExtensionsPlugin struct {
PreFilterCalled int
AddCalled int
RemoveCalled int
}
func (pl *TestPreFilterWithUpdaterPlugin) Name() string {
return preFilterWithUpdaterPluginName
func (pl *TestPreFilterWithExtensionsPlugin) Name() string {
return preFilterWithExtensionsPluginName
}
func (pl *TestPreFilterWithUpdaterPlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status {
func (pl *TestPreFilterWithExtensionsPlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status {
pl.PreFilterCalled++
return nil
}
func (pl *TestPreFilterWithUpdaterPlugin) AddPod(pc *PluginContext, podToSchedule *v1.Pod,
func (pl *TestPreFilterWithExtensionsPlugin) AddPod(pc *PluginContext, podToSchedule *v1.Pod,
podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
pl.AddCalled++
return nil
}
func (pl *TestPreFilterWithUpdaterPlugin) RemovePod(pc *PluginContext, podToSchedule *v1.Pod,
func (pl *TestPreFilterWithExtensionsPlugin) RemovePod(pc *PluginContext, podToSchedule *v1.Pod,
podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
pl.RemoveCalled++
return nil
}
func (pl *TestPreFilterWithUpdaterPlugin) Updater() Updater {
func (pl *TestPreFilterWithExtensionsPlugin) Extensions() PreFilterExtensions {
return pl
}
@ -424,25 +428,25 @@ func TestRunScorePlugins(t *testing.T) {
func TestPreFilterPlugins(t *testing.T) {
preFilter1 := &TestPreFilterPlugin{}
preFilter2 := &TestPreFilterWithUpdaterPlugin{}
preFilter2 := &TestPreFilterWithExtensionsPlugin{}
r := make(Registry)
r.Register(preFilterPluginName,
func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) {
return preFilter1, nil
})
r.Register(preFilterWithUpdaterPluginName,
r.Register(preFilterWithExtensionsPluginName,
func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) {
return preFilter2, nil
})
plugins := &config.Plugins{PreFilter: &config.PluginSet{Enabled: []config.Plugin{{Name: preFilterWithUpdaterPluginName}, {Name: preFilterPluginName}}}}
plugins := &config.Plugins{PreFilter: &config.PluginSet{Enabled: []config.Plugin{{Name: preFilterWithExtensionsPluginName}, {Name: preFilterPluginName}}}}
t.Run("TestPreFilterPlugin", func(t *testing.T) {
f, err := NewFramework(r, plugins, emptyArgs)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
f.RunPreFilterPlugins(nil, nil)
f.RunPreFilterUpdaterAddPod(nil, nil, nil, nil)
f.RunPreFilterUpdaterRemovePod(nil, nil, nil, nil)
f.RunPreFilterExtensionAddPod(nil, nil, nil, nil)
f.RunPreFilterExtensionRemovePod(nil, nil, nil, nil)
if preFilter1.PreFilterCalled != 1 {
t.Errorf("preFilter1 called %v, expected: 1", preFilter1.PreFilterCalled)

View File

@ -165,10 +165,10 @@ type QueueSortPlugin interface {
Less(*PodInfo, *PodInfo) bool
}
// Updater is an interface that is included in plugins that allow specifying
// PreFilterExtensions is an interface that is included in plugins that allow specifying
// callbacks to make incremental updates to its supposedly pre-calculated
// state.
type Updater interface {
type PreFilterExtensions interface {
// AddPod is called by the framework while trying to evaluate the impact
// of adding podToAdd to the node while scheduling podToSchedule.
AddPod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
@ -184,13 +184,13 @@ type PreFilterPlugin interface {
// PreFilter is called at the beginning of the scheduling cycle. All PreFilter
// plugins must return success or the pod will be rejected.
PreFilter(pc *PluginContext, p *v1.Pod) *Status
// Updater returns an updater if the plugin implements one, or nil if it
// does not. A Pre-filter plugin can provide an updater to incrementally
// modify its pre-processed info. The framework guarantees that the updater
// AddPod/RemovePod functions will only be called after PreFilter,
// possibly on a cloned PluginContext, and may call those functions more than
// once before calling Filter again on a specific node.
Updater() Updater
// Extensions returns a PreFilterExtensions interface if the plugin implements one,
// or nil if it does not. A Pre-filter plugin can provide extensions to incrementally
// modify its pre-processed info. The framework guarantees that the extensions
// AddPod/RemovePod will only be called after PreFilter, possibly on a cloned
// PluginContext, and may call those functions more than once before calling
// Filter again on a specific node.
Extensions() PreFilterExtensions
}
// FilterPlugin is an interface for Filter plugins. These plugins are called at the
@ -230,6 +230,14 @@ type PostFilterPlugin interface {
PostFilter(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status
}
// ScoreExtensions is an interface for Score extended functionality.
type ScoreExtensions interface {
// NormalizeScore is called for all node scores produced by the same plugin's "Score"
// method. A successful run of NormalizeScore will update the scores list and return
// a success status.
NormalizeScore(pc *PluginContext, p *v1.Pod, scores NodeScoreList) *Status
}
// ScorePlugin is an interface that must be implemented by "score" plugins to rank
// nodes that passed the filtering phase.
type ScorePlugin interface {
@ -239,13 +247,8 @@ type ScorePlugin interface {
// the pod will be rejected.
Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status)
// NormalizeScore is called for all node scores produced by the same plugin's "Score"
// method. A successful run of NormalizeScore will update the scores list and return
// a success status.
//
// NOTE: This function is optional, and you could implement it as a no-op by simply
// returning nil.
NormalizeScore(pc *PluginContext, p *v1.Pod, scores NodeScoreList) *Status
// Extensions returns a ScoreExtensions interface if it implements one, or nil if does not.
Extensions() ScoreExtensions
}
// ReservePlugin is an interface for Reserve plugins. These plugins are called
@ -342,15 +345,15 @@ type Framework interface {
// schedule the target pod.
RunFilterPlugins(pc *PluginContext, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
// RunPreFilterUpdaterAddPod calls the AddPod interface for the set of configured
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
RunPreFilterUpdaterAddPod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
RunPreFilterExtensionAddPod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
// RunPreFilterUpdaterRemovePod calls the RemovePod interface for the set of configured
// RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
RunPreFilterUpdaterRemovePod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
RunPreFilterExtensionRemovePod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
// RunPostFilterPlugins runs the set of configured post-filter plugins. If any
// of these plugins returns any status other than "Success", the given node is

View File

@ -175,11 +175,11 @@ func (*fakeFramework) RunFilterPlugins(pc *framework.PluginContext, pod *v1.Pod,
return nil
}
func (*fakeFramework) RunPreFilterUpdaterAddPod(pc *framework.PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
func (*fakeFramework) RunPreFilterExtensionAddPod(pc *framework.PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
return nil
}
func (*fakeFramework) RunPreFilterUpdaterRemovePod(pc *framework.PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
func (*fakeFramework) RunPreFilterExtensionRemovePod(pc *framework.PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
return nil
}

View File

@ -162,7 +162,7 @@ func (sp *ScorePlugin) Score(pc *framework.PluginContext, p *v1.Pod, nodeName st
return score, nil
}
func (sp *ScorePlugin) NormalizeScore(pc *framework.PluginContext, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
func (sp *ScorePlugin) Extensions() framework.ScoreExtensions {
return nil
}
@ -189,6 +189,10 @@ func (sp *ScoreWithNormalizePlugin) NormalizeScore(pc *framework.PluginContext,
return nil
}
func (sp *ScoreWithNormalizePlugin) Extensions() framework.ScoreExtensions {
return sp
}
// Name returns name of the plugin.
func (fp *FilterPlugin) Name() string {
return filterPluginName
@ -330,8 +334,8 @@ func (pp *PreFilterPlugin) Name() string {
return prefilterPluginName
}
// Updater returns the updater interface.
func (pp *PreFilterPlugin) Updater() framework.Updater {
// Extensions returns the PreFilterExtensions interface.
func (pp *PreFilterPlugin) Extensions() framework.PreFilterExtensions {
return nil
}

View File

@ -111,7 +111,7 @@ func (fp *tokenFilter) RemovePod(pc *framework.PluginContext, podToSchedule *v1.
return nil
}
func (fp *tokenFilter) Updater() framework.Updater {
func (fp *tokenFilter) Extensions() framework.PreFilterExtensions {
return fp
}