Merge pull request #82912 from ahg-g/ahg-prefilter-update

An interface that allows pre-filter plugins to update their pre-calculated status
This commit is contained in:
Kubernetes Prow Robot 2019-09-25 11:06:40 -07:00 committed by GitHub
commit 0b4cccc9d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 338 additions and 8 deletions

View File

@ -1024,7 +1024,9 @@ func (g *genericScheduler) selectNodesForPreemption(
if meta != nil {
metaCopy = meta.ShallowCopy()
}
pods, numPDBViolations, fits := g.selectVictimsOnNode(pluginContext, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
pluginContextClone := pluginContext.Clone()
pods, numPDBViolations, fits := g.selectVictimsOnNode(
pluginContextClone, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
if fits {
resultLock.Lock()
victims := schedulerapi.Victims{
@ -1117,6 +1119,10 @@ func (g *genericScheduler) selectVictimsOnNode(
return err
}
}
status := g.framework.RunPreFilterUpdaterRemovePod(pluginContext, pod, rp, nodeInfoCopy)
if !status.IsSuccess() {
return status.AsError()
}
return nil
}
addPod := func(ap *v1.Pod) error {
@ -1126,6 +1132,10 @@ func (g *genericScheduler) selectVictimsOnNode(
return err
}
}
status := g.framework.RunPreFilterUpdaterAddPod(pluginContext, pod, ap, nodeInfoCopy)
if !status.IsSuccess() {
return status.AsError()
}
return nil
}
// As the first step, remove all the lower priority pods from the node and

View File

@ -52,6 +52,7 @@ go_test(
deps = [
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/config/scheme:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",

View File

@ -53,8 +53,12 @@ func NewPluginContext() *PluginContext {
}
}
// Clone creates a copy of PluginContext and returns its pointer.
// Clone creates a copy of PluginContext and returns its pointer. Clone returns
// nil if the context being cloned is nil.
func (c *PluginContext) Clone() *PluginContext {
if c == nil {
return nil
}
copy := NewPluginContext()
for k, v := range c.storage {
copy.Write(k, v.Clone())

View File

@ -64,3 +64,11 @@ func TestPluginContextClone(t *testing.T) {
t.Errorf("cloned copy should not change, got %q, expected %q", v.data, data1)
}
}
func TestPluginContextCloneNil(t *testing.T) {
var pc *PluginContext
pcCopy := pc.Clone()
if pcCopy != nil {
t.Errorf("clone expected to be nil")
}
}

View File

@ -306,6 +306,46 @@ func (f *framework) RunPreFilterPlugins(
return nil
}
// RunPreFilterUpdaterAddPod calls the AddPod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
func (f *framework) RunPreFilterUpdaterAddPod(pc *PluginContext, podToSchedule *v1.Pod,
podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
for _, pl := range f.preFilterPlugins {
if updater := pl.Updater(); updater != nil {
status := updater.AddPod(pc, podToSchedule, podToAdd, nodeInfo)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running AddPod for plugin %q while scheduling pod %q: %v",
pl.Name(), podToSchedule.Name, status.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
}
}
return nil
}
// RunPreFilterUpdaterRemovePod calls the RemovePod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
func (f *framework) RunPreFilterUpdaterRemovePod(pc *PluginContext, podToSchedule *v1.Pod,
podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
for _, pl := range f.preFilterPlugins {
if updater := pl.Updater(); updater != nil {
status := updater.RemovePod(pc, podToSchedule, podToRemove, nodeInfo)
if !status.IsSuccess() {
msg := fmt.Sprintf("error while running RemovePod for plugin %q while scheduling pod %q: %v",
pl.Name(), podToSchedule.Name, status.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
}
}
return nil
}
// RunFilterPlugins runs the set of configured Filter plugins for pod on
// the given node. If any of these plugins doesn't return "Success", the
// given node is not suitable for running pod.

View File

@ -25,13 +25,16 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
const (
scoreWithNormalizePlugin1 = "score-with-normalize-plugin-1"
scoreWithNormalizePlugin2 = "score-with-normalize-plugin-2"
scorePlugin1 = "score-plugin-1"
pluginNotImplementingScore = "plugin-not-implementing-score"
scoreWithNormalizePlugin1 = "score-with-normalize-plugin-1"
scoreWithNormalizePlugin2 = "score-with-normalize-plugin-2"
scorePlugin1 = "score-plugin-1"
pluginNotImplementingScore = "plugin-not-implementing-score"
preFilterPluginName = "prefilter-plugin"
preFilterWithUpdaterPluginName = "prefilter-with-updater-plugin"
)
// TestScoreWithNormalizePlugin implements ScoreWithNormalizePlugin interface.
@ -105,6 +108,56 @@ func (pl *PluginNotImplementingScore) Name() string {
return pluginNotImplementingScore
}
// TestPreFilterPlugin only implements PreFilterPlugin interface.
type TestPreFilterPlugin struct {
PreFilterCalled int
}
func (pl *TestPreFilterPlugin) Name() string {
return preFilterPluginName
}
func (pl *TestPreFilterPlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status {
pl.PreFilterCalled++
return nil
}
func (pl *TestPreFilterPlugin) Updater() Updater {
return nil
}
// TestPreFilterWithUpdatePlugin implements Add/Remove interfaces.
type TestPreFilterWithUpdaterPlugin struct {
PreFilterCalled int
AddCalled int
RemoveCalled int
}
func (pl *TestPreFilterWithUpdaterPlugin) Name() string {
return preFilterWithUpdaterPluginName
}
func (pl *TestPreFilterWithUpdaterPlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status {
pl.PreFilterCalled++
return nil
}
func (pl *TestPreFilterWithUpdaterPlugin) AddPod(pc *PluginContext, podToSchedule *v1.Pod,
podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
pl.AddCalled++
return nil
}
func (pl *TestPreFilterWithUpdaterPlugin) RemovePod(pc *PluginContext, podToSchedule *v1.Pod,
podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
pl.RemoveCalled++
return nil
}
func (pl *TestPreFilterWithUpdaterPlugin) Updater() Updater {
return pl
}
var registry Registry = func() Registry {
r := make(Registry)
r.Register(scoreWithNormalizePlugin1, newScoreWithNormalizePlugin1)
@ -365,6 +418,44 @@ func TestRunScorePlugins(t *testing.T) {
}
}
func TestPreFilterPlugins(t *testing.T) {
preFilter1 := &TestPreFilterPlugin{}
preFilter2 := &TestPreFilterWithUpdaterPlugin{}
r := make(Registry)
r.Register(preFilterPluginName,
func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) {
return preFilter1, nil
})
r.Register(preFilterWithUpdaterPluginName,
func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) {
return preFilter2, nil
})
plugins := &config.Plugins{PreFilter: &config.PluginSet{Enabled: []config.Plugin{{Name: preFilterWithUpdaterPluginName}, {Name: preFilterPluginName}}}}
t.Run("TestPreFilterPlugin", func(t *testing.T) {
f, err := NewFramework(r, plugins, emptyArgs)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
f.RunPreFilterPlugins(nil, nil)
f.RunPreFilterUpdaterAddPod(nil, nil, nil, nil)
f.RunPreFilterUpdaterRemovePod(nil, nil, nil, nil)
if preFilter1.PreFilterCalled != 1 {
t.Errorf("preFilter1 called %v, expected: 1", preFilter1.PreFilterCalled)
}
if preFilter2.PreFilterCalled != 1 {
t.Errorf("preFilter2 called %v, expected: 1", preFilter2.PreFilterCalled)
}
if preFilter2.AddCalled != 1 {
t.Errorf("AddPod called %v, expected: 1", preFilter2.AddCalled)
}
if preFilter2.RemoveCalled != 1 {
t.Errorf("AddPod called %v, expected: 1", preFilter2.RemoveCalled)
}
})
}
func buildConfigDefaultWeights(ps ...string) *config.Plugins {
return buildConfigWithWeights(defaultWeights, ps...)
}

View File

@ -165,6 +165,18 @@ type QueueSortPlugin interface {
Less(*PodInfo, *PodInfo) bool
}
// Updater is an interface that is included in plugins that allow specifying
// callbacks to make incremental updates to its supposedly pre-calculated
// state.
type Updater interface {
// AddPod is called by the framework while trying to evaluate the impact
// of adding podToAdd to the node while scheduling podToSchedule.
AddPod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
// RemovePod is called by the framework while trying to evaluate the impact
// of removing podToRemove from the node while scheduling podToSchedule.
RemovePod(pc *PluginContext, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
}
// PreFilterPlugin is an interface that must be implemented by "prefilter" plugins.
// These plugins are called at the beginning of the scheduling cycle.
type PreFilterPlugin interface {
@ -172,6 +184,13 @@ type PreFilterPlugin interface {
// PreFilter is called at the beginning of the scheduling cycle. All PreFilter
// plugins must return success or the pod will be rejected.
PreFilter(pc *PluginContext, p *v1.Pod) *Status
// Updater returns an updater if the plugin implements one, or nil if it
// does not. A Pre-filter plugin can provide an updater to incrementally
// modify its pre-processed info. The framework guarantees that the updater
// AddPod/RemovePod functions will only be called after PreFilter,
// possibly on a cloned PluginContext, and may call those functions more than
// once before calling Filter again on a specific node.
Updater() Updater
}
// FilterPlugin is an interface for Filter plugins. These plugins are called at the
@ -326,6 +345,16 @@ type Framework interface {
// schedule the target pod.
RunFilterPlugins(pc *PluginContext, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
// RunPreFilterUpdaterAddPod calls the AddPod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
RunPreFilterUpdaterAddPod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
// RunPreFilterUpdaterRemovePod calls the RemovePod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
RunPreFilterUpdaterRemovePod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
// RunPostFilterPlugins runs the set of configured post-filter plugins. If any
// of these plugins returns any status other than "Success", the given node is
// rejected. The filteredNodeStatuses is the set of filtered nodes and their statuses.

View File

@ -175,6 +175,14 @@ func (*fakeFramework) RunFilterPlugins(pc *framework.PluginContext, pod *v1.Pod,
return nil
}
func (*fakeFramework) RunPreFilterUpdaterAddPod(pc *framework.PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
return nil
}
func (*fakeFramework) RunPreFilterUpdaterRemovePod(pc *framework.PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
return nil
}
func (*fakeFramework) RunScorePlugins(pc *framework.PluginContext, pod *v1.Pod, nodes []*v1.Node) (framework.PluginToNodeScores, *framework.Status) {
return nil, nil
}

View File

@ -326,6 +326,11 @@ func (pp *PreFilterPlugin) Name() string {
return prefilterPluginName
}
// Updater returns the updater interface.
func (pp *PreFilterPlugin) Updater() framework.Updater {
return nil
}
// PreFilter is a test function that returns (true, nil) or errors for testing.
func (pp *PreFilterPlugin) PreFilter(pc *framework.PluginContext, pod *v1.Pod) *framework.Status {
pp.numPreFilterCalled++

View File

@ -27,6 +27,7 @@ import (
policy "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
@ -38,6 +39,9 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/scheduling"
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/plugin/pkg/admission/priority"
testutils "k8s.io/kubernetes/test/utils"
@ -66,10 +70,80 @@ func waitForNominatedNodeName(cs clientset.Interface, pod *v1.Pod) error {
return waitForNominatedNodeNameWithTimeout(cs, pod, wait.ForeverTestTimeout)
}
const tokenFilterName = "token-filter"
type tokenFilter struct {
Tokens int
Unresolvable bool
}
// Name returns name of the plugin.
func (fp *tokenFilter) Name() string {
return tokenFilterName
}
func (fp *tokenFilter) Filter(pc *framework.PluginContext, pod *v1.Pod,
nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
if fp.Tokens > 0 {
fp.Tokens--
return nil
}
status := framework.Unschedulable
if fp.Unresolvable {
status = framework.UnschedulableAndUnresolvable
}
return framework.NewStatus(status, fmt.Sprintf("can't fit %v", pod.Name))
}
func (fp *tokenFilter) PreFilter(pc *framework.PluginContext, pod *v1.Pod) *framework.Status {
return nil
}
func (fp *tokenFilter) AddPod(pc *framework.PluginContext, podToSchedule *v1.Pod,
podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
fp.Tokens--
return nil
}
func (fp *tokenFilter) RemovePod(pc *framework.PluginContext, podToSchedule *v1.Pod,
podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
fp.Tokens++
return nil
}
func (fp *tokenFilter) Updater() framework.Updater {
return fp
}
var _ = framework.FilterPlugin(&tokenFilter{})
// TestPreemption tests a few preemption scenarios.
func TestPreemption(t *testing.T) {
// Initialize scheduler.
context := initTest(t, "preemption")
// Initialize scheduler with a filter plugin.
var filter tokenFilter
registry := framework.Registry{filterPluginName: func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) {
return &filter, nil
}}
plugin := &schedulerconfig.Plugins{
Filter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: filterPluginName,
},
},
},
PreFilter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: filterPluginName,
},
},
},
}
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "preemptiom", nil),
false, nil, registry, plugin, []schedulerconfig.PluginConfig{}, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
@ -78,14 +152,18 @@ func TestPreemption(t *testing.T) {
v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)},
}
maxTokens := 1000
tests := []struct {
description string
existingPods []*v1.Pod
pod *v1.Pod
initTokens int
unresolvable bool
preemptedPodIndexes map[int]struct{}
}{
{
description: "basic pod preemption",
initTokens: maxTokens,
existingPods: []*v1.Pod{
initPausePod(context.clientSet, &pausePodConfig{
Name: "victim-pod",
@ -108,8 +186,61 @@ func TestPreemption(t *testing.T) {
}),
preemptedPodIndexes: map[int]struct{}{0: {}},
},
{
description: "basic pod preemption with filter",
initTokens: 1,
existingPods: []*v1.Pod{
initPausePod(context.clientSet, &pausePodConfig{
Name: "victim-pod",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
},
}),
},
pod: initPausePod(cs, &pausePodConfig{
Name: "preemptor-pod",
Namespace: context.ns.Name,
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
},
}),
preemptedPodIndexes: map[int]struct{}{0: {}},
},
{
// same as the previous test, but the filter is unresolvable.
description: "basic pod preemption with unresolvable filter",
initTokens: 1,
unresolvable: true,
existingPods: []*v1.Pod{
initPausePod(context.clientSet, &pausePodConfig{
Name: "victim-pod",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
},
}),
},
pod: initPausePod(cs, &pausePodConfig{
Name: "preemptor-pod",
Namespace: context.ns.Name,
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
},
}),
preemptedPodIndexes: map[int]struct{}{},
},
{
description: "preemption is performed to satisfy anti-affinity",
initTokens: maxTokens,
existingPods: []*v1.Pod{
initPausePod(cs, &pausePodConfig{
Name: "pod-0", Namespace: context.ns.Name,
@ -173,6 +304,7 @@ func TestPreemption(t *testing.T) {
{
// This is similar to the previous case only pod-1 is high priority.
description: "preemption is not performed when anti-affinity is not satisfied",
initTokens: maxTokens,
existingPods: []*v1.Pod{
initPausePod(cs, &pausePodConfig{
Name: "pod-0", Namespace: context.ns.Name,
@ -254,6 +386,8 @@ func TestPreemption(t *testing.T) {
}
for _, test := range tests {
filter.Tokens = test.initTokens
filter.Unresolvable = test.unresolvable
pods := make([]*v1.Pod, len(test.existingPods))
// Create and run existingPods.
for i, p := range test.existingPods {